3.2. 更新数据库并查看 更新 事件
现在,您已经了解了 Debezium MySQL 连接器如何捕获 清单数据库中创建事件
,现在您将更改其中一个记录,并查看连接器如何捕获它。
通过完成此步骤,您将了解如何查找数据库提交中更改的详细信息,以及如何比较更改事件以确定与更改相关的变化。
流程
在运行 MySQL 命令行客户端的终端中,运行以下声明:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
查看更新的
客户
表:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
切换到运行
kafka-console-consumer
的终端,以查看新 的第五个事件。通过在
客户
表中更改记录,Debezium MySQL 连接器会生成新的事件。您应该会看到两个新的 JSON 文档:一个用于事件 的密钥,另一个用于新事件 的值。以下是 更新 事件 的密钥 详情(为可读性而格式化):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
这个密钥 与之前的事件 的密钥 相同。
这里是新事件 的值为。
schema
部分中没有显示任何更改,因此仅显示payload
部分(为可读性格式化):{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "name": "1.9.7.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", 4 "ts_ms": 1486501486308 5 } }
通过查看
payload
部分,您可以了解有关 更新 事件的几个重要事项:-
通过比较
before
和after
结构,您可以确定由于提交,在受影响的行中实际更改了哪些内容。 -
通过查看
源
结构,您可以查找有关 MySQL 更改记录的信息(可用可追溯性)。 -
通过将事件的
payload
部分与同一主题中的其他事件进行比较(或不同的主题),您可以确定事件是否发生在之前、之后还是作为与另一个事件相同的 MySQL 提交的一部分。
-
通过比较