2.2.2. 验证 Debezium 连接器是否正在运行
如果连接器正确启动且无错误,则会为将连接器配置为捕获的每个表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您需要从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否已填充了用于读取操作("op":"r")的事件,该事件是否会在每个表的初始快照中生成。
前提条件
- 一个 Debezium 连接器被部署到 OpenShift 上的 AMQ Streams 中。
-
已安装 OpenShift
oc
CLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector
资源的状态:在 OpenShift Container Platform Web 控制台中:
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaConnector
。 - 在 KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-mysql。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
输入以下命令:
oc describe KafkaConnector <connector-name> -n <project>
例如,
oc describe KafkaConnector inventory-connector-mysql -n debezium
该命令返回类似以下输出的状态信息:
例 2.3.
KafkaConnector
resource statusName: inventory-connector-mysql Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-mysql Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory_connector_mysql inventory_connector_mysql.inventory.addresses inventory_connector_mysql.inventory.customers inventory_connector_mysql.inventory.geom inventory_connector_mysql.inventory.orders inventory_connector_mysql.inventory.products inventory_connector_mysql.inventory.products_on_hand Events: <none>
验证连接器是否创建了 Kafka 主题:
从 OpenShift Container Platform Web 控制台。
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaTopic
。 - 在 KafkaTopics 列表中,点您要检查的主题名称,例如 inventory-connector-mysql.inventory.orders--ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
输入以下命令:
oc get kafkatopics
该命令返回类似以下输出的状态信息:
例 2.4.
KafkaTopic
resource statusNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-mysql---a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
例如,
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_mysql.inventory.products_on_hand
指定主题名称的格式与步骤 1 中的
oc describe
命令相同,如inventory_connector_mysql.inventory.addresses
。对于主题中的每个事件,命令会返回类似以下输出的信息:
例 2.5. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.7.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
在前面的示例中,
有效负载
值显示连接器快照从表清单中生成读取(
.op" ="r"
)事件。products_on_handproduct_id
记录的"前"
状态为空
,表示记录不存在之前的值。"after"
状态显示带有product_id
101
的项目的3
数量
。
您可以使用多个 Kafka Connect 服务集群和多个 Kafka 集群运行 Debezium。您可以部署到 Kafka Connect 集群的连接器数量取决于数据库事件的卷和率。
后续步骤
有关部署特定连接器的更多信息,请参阅 Debezium 用户指南中的以下主题: