6.2. Debezium MySQL 连接器数据更改事件的描述
Debezium MySQL 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件包含一个键和值。键的结构和值取决于已更改的表。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您正在使用 schema registry,用户可以使用该模式 ID 从 registry 获取 schema。这使得每个事件都自包含。
以下框架 JSON 显示更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect converter,决定更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于更改事件中。同样,只有在您配置转换器来生成它时,事件密钥和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,更改事件具有此结构:
{
"schema": {
...
},
"payload": {
...
},
"schema": {
...
},
"payload": {
...
},
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
第一个 |
| 2 |
|
第一个 |
| 3 |
|
第二个 |
| 4 |
|
第二个 |
默认情况下,连接器流将事件记录改为与事件原始表相同的主题。请参阅 主题名称。
MySQL 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,它将使用下划线字符替换。
如果逻辑服务器名称、数据库名称或表名称包含无效字符,且唯一与另一个名称区分名称的字符无效,这可能会导致意外冲突冲突,从而被下划线替换。
详情包括在以下主题中:
6.2.1. 关于 Debezium MySQL 中的键更改事件 复制链接链接已复制到粘贴板!
更改事件的密钥包含更改表的密钥和更改行的实际键的 schema。当连接器创建事件时,schema 及其对应有效负载都会包含更改表的 PRIMARY KEY (或唯一约束)中每个列的字段。
考虑以下 客户 表,后跟此表的更改事件键的示例。
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
每次捕获 customer 表的更改事件都有相同的事件关键模式。只要 customers 表有以前的定义,可以捕获 customer 表更改的事件都有以下关键结构:在 JSON 中,它类似如下:
{
"schema": {
"type": "struct",
"name": "mysql-server-1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1001
}
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 |
| 2 |
|
定义密钥有效负载结构的模式名称。这个 schema 描述了已更改的表的主键的结构。键模式名称的格式是 connector-name.database-name.table-name.
|
| 3 |
|
指明 event 键是否必须在其 |
| 4 |
|
指定 |
| 5 |
|
包含生成此更改事件的行的密钥。在本例中,键 包含一个 |
6.2.2. 关于 Debezium MySQL 更改事件中的值 复制链接链接已复制到粘贴板!
更改事件中的值比键复杂一些。与键一样,该值有一个 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。为创建、更新或删除数据的操作更改事件,它们都有一个带有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例表:
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
对这个表的更改事件的值部分描述:
创建 事件
以下示例显示了一个更改事件的值部分,连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope"
},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "2.3.7.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
}
}
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
| 值的 schema,用于描述值有效负载的结构。当连接器为特定表生成的每次更改事件中,更改事件的值模式都是相同的。 |
| 2 |
|
在 |
| 3 |
|
|
| 4 |
|
|
| 5 |
|
值的实际数据。这是更改事件提供的信息。 |
| 6 |
|
描述导致连接器生成事件的操作类型的强制字符串。在本例中,
|
| 7 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
| 8 |
|
指定事件发生前行状态的可选字段。当 |
| 9 |
|
指定事件发生后行状态的可选字段。在本例中, |
| 10 |
| 描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序以及事件是否为同一事务的一部分。源元数据包括:
如果启用了 |
更新 事件
示例 customers 表中一个更新的改变事件的值有与那个表的 create 事件相同的模式。同样,事件值有效负载具有相同的结构。但是,事件值有效负载在 update 事件中包含不同的值。以下是连接器为 customer 表中更新生成的更改事件值 的示例 :
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "2.3.7.Final",
"name": "mysql-server-1",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "u",
"ts_ms": 1465581029523
}
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
指定事件发生前行状态的可选字段。在 update 事件值中, |
| 2 |
|
指定事件发生后行状态的可选字段。您可以比较 |
| 3 |
|
描述事件源元数据的必需字段。
如果启用了 |
| 4 |
|
描述操作类型的强制字符串。在 update 事件值中, |
| 5 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新行 primary/unique 键的列会更改行的键值。当键更改时,Debezium 会输出 三个 事件:一个 DELETE 事件,以及一个带有行的旧键的 tombstone 事件,后跟一个带有行的新键的事件。详情在下一节中。
主密钥更新
更改行的主键字段的 UPDATE 操作称为主密钥更改。对于主键更改,以代替 UPDATE 事件记录,连接器为旧密钥发出 DELETE 事件记录,并为新的(updated)密钥的 CREATE 事件记录。这些事件具有常见的结构和内容,另外,每个事件都有与主密钥更改相关的消息标头:
-
DELETE事件记录具有__debezium.newkey作为消息标头。此标头的值是更新行的新主键。 -
CREATE事件记录具有__debezium.oldkey作为消息标头。此标头的值是更新行所具有的前一个主键(旧的)主键。
删除 事件
delete 更改事件中的值与为同一表的 create 和 update 事件相同的 schema 部分。示例 customer 表的 delete 事件中 payload 部分类似如下:
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "2.3.7.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581902300,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 805,
"row": 0,
"thread": 7,
"query": "DELETE FROM customers WHERE id=1004"
},
"op": "d",
"ts_ms": 1465581902461
}
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
指定事件发生前行状态的可选字段。在一个 delete 事件值中, |
| 2 |
|
指定事件发生后行状态的可选字段。在 delete 事件值中, |
| 3 |
|
描述事件源元数据的必需字段。在一个 delete 事件值中,
如果启用了 |
| 4 |
|
描述操作类型的强制字符串。 |
| 5 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
删除 更改事件记录为消费者提供处理此行删除所需的信息。包含旧值,因为有些用户可能需要它们才能正确处理删除。
MySQL 连接器事件被设计为使用 Kafka 日志压缩。只要保留每个密钥的最新消息,日志压缩就会启用删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
tombstone 事件
删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须为 null。为了实现此目的,在 Debezium 的 MySQL 连接器发出 一个 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但有一个 null 值。
Truncate 事件
截断 更改事件信号,表示表已被截断。在这种情况下,message 键为 null,message 值类似如下:
{
"schema": { ... },
"payload": {
"source": {
"version": "2.3.7.Final",
"name": "mysql-server-1",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "t",
"ts_ms": 1465581029523
}
}
| 项 | 字段名称 | 描述 |
|---|---|---|
| 1 |
|
描述事件源元数据的必需字段。在 truncate 事件值中,
|
| 2 |
|
描述操作类型的强制字符串。 |
| 3 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。
+ |
如果单个 TRUNCATE 语句应用到多个表,则会为每个删节的表发出一个 truncate 更改事件记录。
请注意,由于 truncate 事件代表对整个表进行的更改,且没有消息密钥,除非您使用单个分区的主题,否则与表相关的更改事件没有排序保证(创建、更新 等)和 截断 该表的事件。例如,当这些事件从不同的分区读取时,消费者只能在该表的 truncate 事件后收到 update 事件。