4.2. 更新数据库并查看 更新 事件
现在,您已了解 Debezium MySQL 连接器如何捕获 inventory
数据库中的 create 事件,现在您将更改其中一个记录并查看连接器如何捕获它。
通过完成此步骤,您将了解如何查找数据库提交中变化的详细信息,以及如何比较更改事件以确定更改事件与其他更改相关。
流程
在运行 MySQL 命令行客户端的终端中,运行以下命令:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
查看更新的
客户
表:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
切换到运行
kafka-console-consumer
的终端,以查看 一个新的 五个事件。通过更改
customers
表中的记录,Debebe MySQL 连接器会生成一个新事件。您应该会看到两个新的 JSON 文档:一个用于事件 键,一个用于新事件 的值。以下是 更新 事件 的密钥 详情(用于可读性的格式):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
这个密钥 与之前事件 的密钥 相同。
以下是新事件 的值。
schema
部分没有更改,因此只显示payload
部分(用于可读性的格式):{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "name": "2.1.4.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", 4 "ts_ms": 1486501486308 5 } }
通过查看
payload
部分,您可以了解有关 更新 事件的几个重要事项:-
通过比较
before
和after
结构,您可以确定受影响的行中实际更改的内容,因为提交的原因。 -
通过查看
源
结构,您可以查找有关 MySQL 已更改记录的信息(提供可追溯性)。 -
通过将事件的
payload
部分与同一主题(或不同的主题)中的其他事件进行比较,您可以确定事件是否在之前、之后或作为另一个事件相同的 MySQL 提交的一部分。
-
通过比较