7.3. Debezium PostgreSQL 连接器数据更改事件的描述
Debezium PostgreSQL 连接器会为每行级别 INSERT
、UPDATE
和 DELETE
操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。
以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema
字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为 名称与事件原始表相同的名称的主题。
从 Kafka 0.10 开始,Kafka 可以使用创建消息 的时间戳 (由制作者记录)或由 Kafka 写入日志来记录事件键和值。
PostgreSQL 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及模式和表名称中的每个字符都必须是一个字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。
如果逻辑服务器名称、模式名称或表名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。
详情包括在以下主题中:
7.3.1. 关于 Debezium PostgreSQL 更改事件中的键
对于给定表,更改事件的密钥具有结构,其中包含创建事件时表的主键中的每个列的字段。或者,如果表将 REPLICA IDENTITY
设置为 FULL
或 USING INDEX
,则每个唯一键约束都有一个字段。
考虑在 公共
数据库模式中定义的 客户
表,以及该表的更改事件密钥示例。
表示例
CREATE TABLE customers ( id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id) );
更改事件键示例
如果 topic.prefix
连接器配置属性的值为 PostgreSQL_server
,而 客户
表的每个更改事件都有相同的密钥结构,在 JSON 中类似如下:
{ "schema": { 1 "type": "struct", "name": "PostgreSQL_server.public.customers.Key", 2 "optional": false, 3 "fields": [ 4 { "name": "id", "index": "0", "schema": { "type": "INT32", "optional": "false" } } ] }, "payload": { 5 "id": "1" }, }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 |
2 |
|
定义密钥有效负载结构的 schema 名称。这个模式描述了更改的表的主键的结构。键架构名称的格式为 connector-name.database-name.table-name.
|
3 |
|
指明 event 键必须在其 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的行的键。在本例中,键包含一个 |
虽然 column.exclude.list
和 column.include.list
连接器配置属性只允许您只捕获表列的子集,但主或唯一键中的所有列始终包含在事件键中。
如果表没有主键或唯一键,则更改事件的密钥为 null。没有主或唯一键约束的表中的行无法唯一标识。
7.3.2. 关于 Debezium PostgreSQL 更改事件中的值
更改事件中的值比键稍微复杂。与键一样,该值具有 schema
部分和 payload
部分。schema
部分包含描述 payload
部分的 Envelope
结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例:
CREATE TABLE customers ( id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id) );
对这个表的更改事件的值部分因 REPLICA IDENTITY
设置和事件所针对的操作而异。
以下部分详细如下:
副本身份
REPLICA IDENTITY 是特定于 PostgreSQL 的表级设置,它决定了 UPDATE
和 DELETE
事件的逻辑解码插件可用的信息量。更具体地说,当发生 UPDATE
或 DELETE
事件时,设置 REPLICA IDENTITY
控制哪些(若有)信息用于之前涉及的表列的值。
REPLICA IDENTITY
有 4 个可能的值:
DEFAULT
- 如果该表有主密钥,则默认行为是UPDATE
和DELETE
事件包含表的主键列的以前的值。对于UPDATE
事件,只有带有更改值的主键列会被存在。如果表没有主密钥,则连接器不会为该表发出
UPDATE
或DELETE
事件。对于没有主密钥的表,连接器只发出 创建事件。通常,没有主键的表用于将消息附加到表末尾,这意味着UPDATE
和DELETE
事件不可用。-
NOTHING
-UPDATE
和DELETE
操作的事件不包含任何表列之前值的任何信息。 -
FULL
- Emitted events forUPDATE
和DELETE
操作包含表中所有列的先前值。 -
INDEX
index-name - Emitted events forUPDATE
和DELETE
操作包含指定索引中包含的列的以前的值。UPDATE
事件还包含带有更新值的索引列。
创建 事件
以下示例显示了连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "int64", "optional": true, "field": "txId" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "int64", "optional": true, "field": "xmin" } ], "optional": false, "name": "io.debezium.connector.postgresql.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "PostgreSQL_server.inventory.customers.Envelope" 4 }, "payload": { 5 "before": null, 6 "after": { 7 "id": 1, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 8 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "sequence": "[\"24023119\",\"24023128\"]" "schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null }, "op": "c", 9 "ts_ms": 1559033904863 10 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 值的 schema,用于描述值有效负载的结构。每次连接器为特定表生成的更改时,更改事件的值模式都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值的实际数据。这是更改事件提供的信息。 |
6 |
|
指定事件发生前行状态的可选字段。当 注意
此字段是否取决于每个表的 |
7 |
|
指定事件发生后行状态的可选字段。在本例中, |
8 |
| 描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序,以及事件是否为同一事务的一部分。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的必要字符串。在本例中,
|
10 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新 事件
示例 customer 表中更新的更改事件值与该表的 create 事件相同。同样,事件的 payload 具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是连接器在
customers
表中为更新生成的更改事件值示例:
{ "schema": { ... }, "payload": { "before": { 1 "id": 1 }, "after": { 2 "id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null }, "op": "u", 4 "ts_ms": 1465584025523 5 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
包含数据库提交前行中的值的可选字段。在本例中,只有主键列 |
2 |
|
指定事件发生后行状态的可选字段。在本例中, |
3 |
|
描述事件源元数据的必需字段。
|
4 |
|
描述操作类型的强制字符串。在 update 事件值中, |
5 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新行的主/唯一键的列会更改行键的值。当键更改时,Debebe 会输出 三个 事件: DELETE
事件和带有行的旧键的 tombstone 事件,后跟带有行的新键的事件。下一小节中提供了详细信息。
主密钥更新
更改行主密钥字段的 UPDATE
操作被称为主密钥更改。对于主键更改,为了发送 UPDATE
事件记录,连接器会为旧密钥和新(更新)密钥的 CREATE
事件记录发送 DELETE
事件记录。这些事件具有常见的结构和内容,以及每个事件都有一个与主密钥更改相关的消息标头:
-
DELETE
事件记录具有__debezium.newkey
作为消息标头。此标头的值是更新行的新主密钥。 -
CREATE
事件记录具有__debezium.oldkey
作为消息标头。此标头的值是更新的行具有的先前(旧的)主密钥。
删除 事件
delete 更改事件中的值与为同一表的 create 和 update 事件相同的 schema
部分。示例 customer
表的 delete 事件中 payload
部分类似如下:
{ "schema": { ... }, "payload": { "before": { 1 "id": 1 }, "after": null, 2 "source": { 3 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null }, "op": "d", 4 "ts_ms": 1465581902461 5 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定事件发生前行状态的可选字段。在 delete 事件值中, |
2 |
|
指定事件发生后行状态的可选字段。在 delete 事件值中, |
3 |
|
描述事件源元数据的必需字段。在一个 delete 事件值中,
|
4 |
|
描述操作类型的强制字符串。 |
5 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
删除 更改事件记录为消费者提供处理此行删除所需的信息。
对于消费者能够处理为没有主密钥的表生成的 删除 事件,请将表的 REPLICA IDENTITY
设置为 FULL
。当表没有主密钥,并且表的 REPLICA IDENTITY
被设置为 DEFAULT
或 NOTHING
时,删除 事件没有 before
字段。
PostgreSQL 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
tombstone 事件
删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,如果 Kafka 删除具有相同键的所有消息,消息值必须是 null
。为了实现此目的,PostgreSQL 连接器遵循一个 delete 事件,其中包含一个特殊的tombstone 事件,它有相同的键但值为 null
。
截断 事件
一个 截断 的更改事件信号,表示表已被截断。本例中 message 键为 null
,消息值类似如下:
{ "schema": { ... }, "payload": { "source": { 1 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null }, "op": "t", 2 "ts_ms": 1559033904961 3 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述事件源元数据的必需字段。在 truncate 事件值中,
|
2 |
|
描述操作类型的强制字符串。 |
3 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
如果单个 TRUNCATE
语句应用到多个表,则会发出每个截断表的截断更改事件记录。
请注意,由于 truncate 事件代表对整个表所做的更改,且没有消息键,除非您正在使用单个分区的主题,否则不会排序与表相关的更改事件(创建、更新 等)和 截断 事件。例如,当这些事件从不同的分区读取时,消费者只能在该表的 truncate 事件后收到 update 事件。
此事件类型只支持 Postgres 14+ 上的 pgoutput
插件(Postgres Documentation)
消息 事件信号,一个通用逻辑解码消息已被直接插入到 WAL 中,通常使用 pg_logical_emit_message
函数。message 键是一个 Struct
,本例中为一个名为 prefix
的单个字段,在插入消息时指定前缀。message 值类似如下用于事务消息:
{ "schema": { ... }, "payload": { "source": { 1 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "", "table": "", "txId": 556, "lsn": 46523128, "xmin": null }, "op": "m", 2 "ts_ms": 1559033904961, 3 "message": { 4 "prefix": "foo", "content": "Ymfy" } } }
与其他事件类型不同,非事务消息将不会有任何关联的 BEGIN
或 END
事务事件。对于非事务消息,Message 值类似如下:
{ "schema": { ... }, "payload": { "source": { 1 "version": "2.1.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "", "table": "", "lsn": 46523128, "xmin": null }, "op": "m", 2 "ts_ms": 1559033904961 3 "message": { 4 "prefix": "foo", "content": "Ymfy" } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述事件源元数据的必需字段。在 message 事件值中,
|
2 |
|
描述操作类型的强制字符串。 |
3 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。
对于非事务 消息 事件, |
4 |
| 包含消息元数据的字段
|