第 3 章 查看更改事件
部署 Debezium MySQL 连接器后,它开始捕获 清单
数据库的更改。
当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中的一个表。每个主题的名称都以数据库服务器 dbserver1
的名称开头。
连接器写入以下 Kafka 主题:
dbserver1
- 架构更改主题为应用于要捕获的更改的 DDL 语句将写入到其中。
dbserver1.inventory.products
-
接收
inventory
数据库中 product表
的更改事件记录。 dbserver1.inventory.products_on_hand
-
接收
inventory
数据库中 product_on_hand
表的更改事件记录。 dbserver1.inventory.customers
-
接收
库存
数据库中客户
表的更改事件记录。 dbserver1.inventory.orders
-
接收
清单
数据库中的排序
表的更改事件记录。
本教程的其余部分检查 dbserver1.inventory.customers
Kafka 主题。正如您在主题中仔细查看,您会看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。
教程包含以下部分:
3.1. 查看 创建事件
通过查看 dbserver1.inventory.customers
主题,您可以看到 MySQL 连接器如何获取在 inventory
数据库中捕获的事件。在这种情况下,创建事件 会捕获向数据库添加新客户。
流程
打开一个新终端,再使用
kafka-console-consumer
来消耗主题开头的dbserver1.inventory.customers
主题。该命令在运行 Kafka (
my-cluster-kafka-0
)的 Pod 中运行一个简单的使用者(kafka-console-consumer.sh
):$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
使用者返回四条消息(采用 JSON 格式),
客户
表中的每一行都会返回一条消息。每个消息都包含相应表行的事件记录。每个事件都有两个 JSON 文档:一个 键和值 。键与行的主键对应,值显示行的详情(包括行的字段、每个字段的值以及在该行中执行的操作类型)。
对于最后的事件,请查看 密钥 的详细信息。
以下是最后一次事件 的密钥 的详细信息(为可读性而格式化):
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
该事件有两个部分:一个
schema
和一个有效负载
。模式
包含一个 Kafka Connect 模式,描述有效负载中的内容。在本例中,有效负载是名为dbserver1.inventory.customers.Key
的struct
,它是可选的,并且有一个必填字段(类型为int32
)。有效负载
有一个id
字段,值为1004
。通过查看事件 的密钥,您可以看到此事件应用到
清单中的行。其
表。id
主键列值为1004
的客户检查同一事件 的值 的详细信息。
事件 的值 显示创建了行,并描述其所包含的内容(本例中为
id
、first_name
、last_name
,以及插入行的电子邮件
)。以下是最后一次事件 的值 的详细信息(为可读性而格式化):
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "1.9.7.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691 } }
此部分事件已长长,但与事件 的关键 相似,它还具有架构和
有效载荷
。该架构包含一个名为
dbserver1.inventory.customers.Envelope
(版本 1)的 Kafka Connect 模式,它可以包含五个字段:op
-
包含描述操作类型的字符串值的必需字段。MySQL 连接器的值为
c
,用于创建(或插入)、u
用于更新、d
表示删除,以及r
表示读取(在快照的情况下)。 before
-
可选字段(如果存在)包含事件 发生前 行的状态。该结构将由
dbserver1.
Kafka Connect 模式描述,inventory.customers
.Valuedbserver1
连接器用于清单.customers 表中的所有行。 after
-
可选字段(如果存在)包含事件发生 后 行的状态。结构由
前面
使用的相同dbserver1.inventory.customers.Value
Kafka Connect 模式描述。 source
-
一个必需字段,其中包含一个描述事件的源元数据的结构(在 MySQL 中包含几个字段):连接器名称、正在记录该事件的
binlog
文件的名称、事件所出现的binlog
文件中的位置、事件中的行(如果是多个字段)、受影响数据库和表的名称,以及受影响的数据库和表的名称。 进行更改的 MySQL 线程 ID (此事件是否为快照的一部分),以及 MySQL 服务器 ID 以及以秒为单位的时间戳。 ts_ms
- 如果存在可选字段,其中包含运行 Kafka Connect 任务的 JVM 时间(使用系统时钟),其中可处理该事件。
注意事件的 JSON 表示比它们描述的行要长。这是因为,带有每个事件键和值,Kafka Connect 会发布描述 有效负载的 schema。这种结构可能会随着时间推移而变化。但是,拥有键和事件本身的模式,使得使用应用程序更容易理解消息,特别是随着时间发展而变化。
Debezium MySQL 连接器根据数据库表的结构来构建这些架构。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 声明并更新其 Kafka 连接模式。这是每个事件的结构与来自事件发生时所源自于其的唯一方式。但是,包含单个表的所有事件的 Kafka 主题可能具有与表定义每个状态对应的事件。
JSON 转换器包含每条消息中的键和值 schema,因此它会生成非常详细的事件。
将事件 的键和值 模式与
inventory
数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下声明:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
这表明您检查的事件记录与数据库中的记录相匹配。