第 3 章 查看更改事件
部署 Debezium MySQL 连接器后,它会开始捕获对 inventory
数据库的更改。
当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中一个表。每个主题的名称以数据库服务器的名称开头,dbserver1
。
连接器写入以下 Kafka 主题:
dbserver1
- 架构更改主题,其中 DDL 语句适用于正在捕获更改的表。
dbserver1.inventory.products
-
接收
inventory
数据库中 product表
的更改事件记录。 dbserver1.inventory.products_on_hand
-
接收
inventory
数据库中 product_on_hand
表的更改事件记录。 dbserver1.inventory.customers
-
接收
inventory
数据库中客户
表的更改事件记录。 dbserver1.inventory.orders
-
接收
清单
数据库中订单
表的变更事件记录。
本教程的其余部分检查 dbserver1.inventory.customers
Kafka 主题。当您仔细查看主题时,您会看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。
教程包含以下部分:
3.1. 查看 创建事件
查看 dbserver1.inventory.customers
主题,您可以看到 MySQL 连接器如何在 inventory
数据库中捕获创建事件。在这种情况下,创建 的事件捕获正添加到数据库中的新客户。
流程
打开一个新终端,再使用
kafka-console-consumer
从主题开始使用 dbserver1.inventory.
customers 主题。此命令在运行 Kafka(
my-cluster-kafka-0
)的 Pod 中运行一个简单的使用者(kafka-console-consumer.sh
):$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
使用者返回四个消息(采用 JSON 格式),每个消息对应
客户
表中的每一行。每个消息都包含相应表行的事件记录。每个事件都有两个 JSON 文档: 一个键 和 一个值。键与行的主键对应,值显示行的详细信息(行包含的字段、每个字段的值以及行上执行的操作类型)。
对于最后一个事件,请查看 键 的详细信息。
以下是最后一次事件 的密钥 的详细信息(便于阅读):
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
该事件有两个部分:
schema
和payload
。该模式
包含一个 Kafka 连接模式,描述有效负载中的内容。在本例中,有效负载是名为dbserver1.inventory.customers.Key
的构造,它是可选的,并且有一个必填字段(类型为int32
)。有效负载
具有单个id
字段,值设为1004
。通过查看事件 的密钥,您可以看到此事件适用于
inventory.customers
表(其id
主键列的值为1004
的行)。检查同一事件 的值 的详细信息。
事件 的值 显示创建了行,并描述它包含的内容(在本例中,
id
、first_name
、last_name
以及插入行的电子邮件
)。以下是最后一次事件 的值 的详细信息(便于阅读):
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "1.9.5.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691 } }
这个部分事件比,但与事件 的密钥 一样,它也具有
schema
和有效负载
。该架构包含名为dbserver1.inventory.customers.Envelope
(版本 1)的 Kafka Connect 模式,它包含五个字段:op
-
包含描述操作类型的字符串值的必需字段。MySQL 连接器的值为
c
,用于创建(或插入)、u
表示 update、d
表示删除,以及r
以便读取(在快照中)。 before
-
可选字段(如果存在)包含事件 发生前 行的状态。其结构将由
dbserver1.
Kafka Connect 模式进行描述,该模式在 inventory.customers 表中使用inventory.customers
.Valuedbserver1
连接器。 之后
-
可选字段(如果存在)包含事件发生 后 行的状态。结构由
前面
使用的相同dbserver1.inventory.customers.Value
Kafka Connect 模式描述。 source
-
一个必需字段,其中包含描述事件的源元数据的结构,该元素是 MySQL 的源元数据,包含多个字段:连接器名称、记录事件的
binlog
文件的名称、该binlog
文件中的位置,即事件内的行(如果有多个字段),受影响的数据库和表的名称, 进行更改的 MySQL 线程 ID,无论此事件是快照的一部分,如果可用,则 MySQL 服务器 ID 以及以秒为单位的时间戳。 ts_ms
- 一个可选字段(如果存在)包含时间(在运行 Kafka Connect 任务的 JVM 中)以及连接器处理事件的 JVM 中。
注意事件的 JSON 表示要比它们描述的行要长。这是因为,对于每个事件键和值,Kafka Connect 提供了描述 有效负载的 schema。随着时间的推移,这个结构可能会改变。但是,在事件本身中使用键和值的 schema 使得使用应用程序更容易理解消息,特别是随着时间推移而变化。
Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 声明并更新其 Kafka Connect 模式。这是构建每个事件的唯一方法,与在事件发生时所源自的表完全相同。但是,包含单一表的所有事件的 Kafka 主题可能会具有与表定义的每个状态对应的事件。
JSON converter 在每个消息中包含键和值模式,因此它会生成非常详细的事件。
将事件 的键和值 模式与
inventory
数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
这表明您检查的事件记录与数据库中的记录相匹配。