第 4 章 查看更改事件
部署 Debezium MySQL 连接器后,它会开始捕获对 inventory 数据库的更改。
当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中的表之一。每个主题的名称都以数据库服务器的名称 dbserver1 开头。
连接器写入以下 Kafka 主题:
dbserver1- 适用于正在捕获更改的表的 DDL 语句的 schema 更改主题。
dbserver1.inventory.products-
接收
inventory数据库中products表的更改事件记录。 dbserver1.inventory.products_on_hand-
接收
inventory数据库中products_on_hand表的更改事件记录。 dbserver1.inventory.customers-
接收
inventory数据库中customers表的更改事件记录。 dbserver1.inventory.orders-
接收
inventory数据库中订购表的更改事件记录。
本教程的其余部分将检查 dbserver1.inventory.customers Kafka 主题。查看更多主题时,您将看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。
教程包含以下部分:
4.1. 查看 创建事件 复制链接链接已复制到粘贴板!
通过查看 dbserver1.inventory.customers 主题,您可以看到 MySQL 连接器如何在 inventory 数据库中捕获 create 事件。在这种情况下,创建事件 会捕获添加到数据库中的新客户。
流程
打开一个新终端,并使用
kafka-console-consumer来消耗主题开头的dbserver1.inventory.customers主题。此命令在运行 Kafka (
my-cluster-kafka-0)的 Pod 中运行一个简单的消费者(kafka-console-consumer.sh):oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customersCopy to Clipboard Copied! Toggle word wrap Toggle overflow 消费者返回四个消息(以 JSON 格式),在
customers表中每行都对应一个信息。每个消息都包含对应表行的事件记录。每个事件有两个 JSON 文档:一个 key 和一个 value。键对应于行的主键,而值则显示行的详细信息(行包含的字段、每个字段的值以及行执行的操作类型)。
对于最后一个事件,请查看 密钥 的详细信息。
以下是最后一次事件 的密钥 的详细信息(为可读性格式化):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 事件有两个部分:
schema和payload。模式包含一个 Kafka Connect 模式,描述有效负载中的内容。在这种情况下,有效负载是名为dbserver1.inventory.customers.Key的struct,它是可选的,且有一个必填字段(类型为int32的id)。payload是一个单一的id字段,它的值为1004。通过查看事件的 key,您可以看到此事件应用到
inventory.customers表中的id主键栏的值为1004的行。查看同一事件 值 的详细信息。
事件 的值 显示行已创建,并描述了它包含的内容(本例中为
id、first_name、last_name以及插入行的电子邮件)。以下是最后一次事件 的值 的详细信息(格式为可读性):
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 事件的这一部分时间较长,但与事件 的密钥 一样,它还具有
schema和payload。schema包括了一个 Kafka Connect 方案,称为dbserver1.inventory.customers.Envelope(version 1),它可以包括五个字段:op-
包含描述操作类型的字符串值的必填字段。MySQL 连接器的值是
c用于创建(或插入)、u用于 update,d表示删除,r用于读取(在快照的情况下)。 before-
存在的可选字段(如果存在)包含事件 发生前 行的状态。该结构由
dbserver1.inventory.customers.ValueKafka Connect 模式描述,dbserver1连接器用于inventory.customers表中的所有行。 after-
存在的可选字段(如果存在)包含事件发生 后 行的状态。该结构与
之前使用的dbserver1.inventory.customers.ValueKafka Connect schema 相同。 source-
一个必需的字段,包含描述事件源元数据的结构(在 MySQL 中,包含几个字段:连接器名称、记录事件的
binlog文件的名称、事件出现的binlog的位置,事件中显示的位置(如果有多个字段)、受影响的数据库和表的名称,进行更改的 MySQL 线程 ID,这个事件是快照的一部分,如果可用,则 MySQL 服务器 ID 和时间戳(以秒为单位)。 ts_ms- 可选字段,如果存在,包含运行 Kafka Connect 任务的 JVM 中的时间(使用系统时钟),该字段处理事件。
注意事件的 JSON 表示比它们描述的行要长。这是因为,在每个事件键和值中,Kafka Connect 都附带了描述 有效负载的 schema。随着时间的推移,这种结构可能会改变。但是,在事件本身中拥有 key 和 value 的 schema 可让您更轻松地使用应用程序了解消息,特别是随着时间推移而变化的信息。
Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 语句并更新其 Kafka Connect 模式。这是每个事件的结构与事件发生时源自的表的唯一方式。但是,包含单个表的所有事件的 Kafka 主题可能会具有与表定义的每个状态对应的事件。
JSON 转换程序在每个消息中包含键和值模式,因此它会产生非常详细的事件。
将事件的 key 和 value 方案与
inventory数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 这表明您检查的事件记录与数据库中记录匹配的记录。