3.2. 更新数据库并查看 更新 事件
现在,您已看到 Debezium MySQL 连接器如何捕获 inventory
数据库中的 create 事件,现在您将更改其中一个记录并查看连接器如何捕获它。
通过完成此步骤,您将了解如何查找数据库提交中更改的详细信息,以及如何比较更改事件,以确定更改事件何时与其他更改相关。
流程
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
查看更新的
customers
表:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
切换到运行
kafka-console-consumer
的终端,以查看 一个新的 第五个事件。通过更改
customers
表中的记录,Debezium MySQL 连接器会生成新事件。您应该会看到两个新 JSON 文档:一个用于事件 的密钥,另一个用于新事件 的值。以下是 更新 事件 的密钥 的详细信息(用于可读性的格式):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }
这个密钥 与之前事件 的密钥 相同。
以下是新事件 的值。
schema
部分没有更改,因此仅显示payload
部分(用于可读性的格式):{ "schema": {...}, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "name": "2.3.4.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", 4 "ts_ms": 1486501486308 5 } }
通过查看
payload
部分,您可以了解 update 事件的几个重要事项:-
通过比较
before
和after
结构,您可以确定由于提交,在受影响行中实际更改的内容。 -
通过查看
源
结构,您可以找到有关 MySQL 的更改记录的信息(提供可追溯性)。 -
通过将事件的
payload
部分与同一主题(或不同的主题)中的其他事件进行比较,您可以确定事件是之前、之后还是作为与另一个事件相同的 MySQL 提交的一部分。
-
通过比较