13.7. 从 Debezium 更改事件中提取源记录 after 状态
Debezium 连接器发出数据更改消息来代表它们从源数据库捕获的每个操作。连接器发送到 Apache Kafka 的消息有一个复杂的结构,它代表原始数据库事件的详情。
虽然这种复杂的消息格式准确了解有关系统中发生更改的详细信息,但格式可能不适用于某些下游用户。sink 连接器或 Kafka 生态系统的其他部分可能需要格式化信息,以便以简化的扁平化结构呈现字段名称和值。
要简化 Debezium 连接器生成的事件记录格式,您可以使用 Debezium 事件扁平化单一消息转换(SMT)。配置转换以支持需要 Kafka 记录的格式比连接器生成的默认格式更简单。根据您的特定用例,您可以将 SMT 应用到 Debezium 连接器,或应用到消耗 Debezium 连接器生成的消息的接收器连接器。要启用 Apache Kafka 以原始格式保留 Debezium 更改事件信息,请为接收器连接器配置 SMT。
事件扁平化转换是一个 Kafka Connect SMT。
本章中的信息描述了基于 SQL 的数据库连接器的事件扁平化单一消息转换(SMT)。有关 Debezium MongoDB 连接器等效的 SMT 的详情,请参考 MongoDB New Document State Extraction。
以下主题提供详情:
13.7.1. Debezium 更改事件结构的描述
Debezium 生成具有复杂结构的数据更改事件。每个事件由三个部分组成:
元数据,其中包括但不仅限于:
- 更改数据的操作类型。
- 源信息,如数据库的名称以及所发生更改的表。
- 在进行更改时标识的时间戳。
- 可选的事务信息。
- 更改前的行数据
- 更改后的行数据
以下示例显示了 UPDATE
更改事件的消息结构的一部分:
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
有关连接器更改事件结构的更多信息,请参阅连接器的文档。
当事件扁平化 SMT 处理上例中的消息后,它简化了消息格式,如下例所示:
{ "field1" : "newvalue1", "field2" : "newvalue2" }
13.7.2. Debezium 事件扁平化转换的行为
事件扁平化 SMT 从 Kafka 记录中的 Debezium 更改事件中提取 after
字段。SMT 仅将原始更改事件替换为其 after
字段,以创建简单的 Kafka 记录。
您可以为 Debezium 连接器或消耗 Debezium 连接器发送消息的接收器连接器配置事件扁平化 SMT。为接收器连接器配置事件扁平化的优点在于 Apache Kafka 中存储的记录包含整个 Debezium 更改事件。将 SMT 应用到源或接收器连接器的决定取决于您的特定用例。
您可以将转换配置为执行以下操作之一:
- 将更改事件中的元数据添加到简化的 Kafka 记录中。默认行为是 SMT 不添加元数据。
-
在流中保留包含
DELETE
操作更改事件的 Kafka 记录。默认行为是 SMT 丢弃DELETE
操作更改事件的 Kafka 记录,因为大多数消费者还没有处理它们。
数据库 DELETE
操作会导致 Debezium 生成两个 Kafka 记录:
-
包含
"op": "d"、
before
行数据以及一些其他字段的记录。 -
有一个 tombstone 记录,其键与已删除行和值
null
相同。此记录是 Apache Kafka 的标记。表示 日志压缩 可以删除具有此密钥的所有记录。
您可以将事件扁平化 SMT 配置为执行以下操作之一,而不是丢弃包含 before
行数据的记录:
-
在流中保留记录,并编辑它使其只有
"value": "null"
字段。 -
在流中保留记录,并编辑它,使其包含一个
value
字段,其中包含带有添加"__deleted": "true"
条目的before
字段中的键/值对。
同样,您可以配置事件扁平化 SMT 以在流中保留 tombstone 记录,而不是丢弃 tombstone 记录。
13.7.3. 配置 Debezium 事件扁平化转换
通过在连接器的配置中添加 SMT 配置详情,在 Kafka Connect 源或 sink 连接器中配置 Debezium 事件扁平化 SMT。例如,要获取转换的默认行为,请在没有指定任何选项的情况下将其添加到连接器配置中,如下例所示:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
与任何 Kafka Connect 连接器配置一样,您可以将 transformations=
设置为多个用逗号分开的 SMT 别名,按您希望 Kafka Connect 应用 SMT 的顺序设置。
以下 .properties
示例设置几个事件扁平化 SMT 选项:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=rewrite transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
-
在事件流中为
DELETE
操作保留 tombstone 记录。 delete.handling.mode=rewrite
对于
DELETE
操作,请通过扁平化更改事件中的value
字段来编辑 Kafka 记录。value
字段直接包含before
字段中的键/值对。SMT 添加__deleted
并将其设置为true
,例如:"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
-
在简化的 Kafka 记录中添加
table
和lsn
字段的更改事件元数据。
自定义配置
连接器可能会发出许多类型的事件消息(heartbeat 消息、tombstone 消息或有关事务或模式更改的元数据信息)。要将转换应用到事件子集,您可以定义一个 SMT predicate 语句,该语句有选择地将转换 应用到特定的事件。
13.7.4. 在 Kafka 记录中添加 Debezium 元数据示例
您可以配置事件扁平化 SMT,将原始更改事件元数据添加到简化的 Kafka 记录中。例如,您可能希望简化的记录的标头或值包含以下任一内容:
- 进行更改的操作类型
- 已更改的数据库或表的名称
- 特定于连接器的字段,如 Postgres LSN 字段
要在简化的 Kafka 记录标头中添加元数据,请指定 add.headers
选项。要在简化的 Kafka 记录值中添加元数据,请指定 add.fields
选项。每个选项都使用以逗号分隔的更改事件字段名称列表。不要指定空格。当存在重复的字段名称时,若要为其中一个字段添加元数据,请指定 struct 和字段。例如:
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields=op,table,lsn,source.ts_ms transforms.unwrap.add.headers=db transforms.unwrap.delete.handling.mode=rewrite
使用这个配置,简化的 Kafka 记录将包含如下内容:
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
另外,简化的 Kafka 记录也会有一个 __db
标头。
在简化的 Kafka 记录中,SMT 使用双下划线前缀 metadata 字段名称。当您指定 struct 时,SMT 也在 struct 名称和字段名称之间插入下划线。
要在用于 DELETE
操作的简化 Kafka 记录中添加元数据,还必须配置 delete.handling.mode=rewrite
。
13.7.5. 用于有选择地应用事件扁平化转换的选项
除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括心跳消息,以及有关 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。
有关如何有选择地应用 SMT 的更多信息,请参阅为转换配置 SMT predicate。
13.7.6. 用于配置 Debezium 事件扁平化转换的选项
下表描述了您可以指定配置事件扁平化 SMT 的选项。
选项 | 默认 | 描述 |
---|---|---|
|
Debezium 为每个 | |
|
Debezium 为每个 | |
要使用行数据来确定要将记录路由到的主题,请将此选项设置为 | ||
__ (double-underscore) | 将此可选字符串设为前缀字段。 | |
将这个选项设置为以逗号分隔的列表,没有空格,使用 metadata 字段添加到简化的 Kafka 记录值中。当存在重复的字段名称时,若要为其中一个字段添加元数据,请指定 struct 以及字段,如 | ||
__ (double-underscore) | 将此可选字符串设为前缀标头。 | |
将这个选项设置为以逗号分隔的列表,没有空格,使用 metadata 字段添加到简化的 Kafka 记录标头中。当存在重复的字段名称时,若要为其中一个字段添加元数据,请指定 struct 以及字段,如 | ||
用于列出您要从输出消息丢弃的源消息中的字段名称的 Kafka 消息标头名称。 | ||
|
指定是否希望 SMT 从事件键中删除 | |
|
指定是否希望 SMT 删除 |