2.4. 验证部署
连接器启动后,它会执行配置的数据库的快照,并为您指定的每个表创建主题。
先决条件
根据 第 2.3 节 “在 Red Hat Enterprise Linux 中使用 AMQ Streams 部署 Debezium” 中的说明,您在 Red Hat Enterprise Linux 上部署了连接器。
在主机上的终端窗口中输入以下命令请求 Kafka Connect API 中的连接器列表:
$ curl -H "Accept:application/json" localhost:8083/connectors/
查询返回部署的连接器的名称,例如:
["inventory-connector"]
在主机上的终端窗口中输入以下命令查看连接器正在运行的任务:
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
该命令返回类似以下示例的输出:
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
显示 Kafka 集群中的主题列表。
在终端窗口中进入/opt/kafka/bin/
并运行以下 shell 脚本:./kafka-topics.sh --bootstrap-server=localhost:9092 --list
Kafka 代理返回连接器创建的主题列表。可用的主题取决于连接器的
snapshot.mode
、snapshot.include.collection.list
和table.include.list
配置属性的设置。默认情况下,连接器会为数据库中的每个非系统表创建一个主题。查看主题的内容。
在终端窗口中,进入到/opt/kafka/bin/
,并运行kafka-console-consumer.sh
shell 脚本,以显示上一命令返回的其中一个主题的内容:
例如:
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
对于主题中的每个事件,命令会返回类似以下示例的信息:
例 2.1. 集成更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
在前面的示例中,
有效负载
值显示连接器快照从表inventory.products_on_hand
生成读取(op" ="r"
)事件。product_id
记录的"before"
状态为null
,表示该记录不存在之前的值。"after"
状态对于product_id
为101
的项目的quantity
显示为3
。
后续步骤
有关每个连接器可用的配置设置的信息,并了解如何配置源数据库以启用更改数据捕获,请参阅 Debezium 用户指南。