7.7. 从 Debezium 更改事件中提取源记录 after 状态


Debezium 连接器会发送数据更改消息,以表示它们从源数据库捕获的每个操作。连接器发送到 Apache Kafka 的信息有一个复杂的结构,它代表了原始数据库事件的详情。

虽然这种复杂的消息格式准确有关系统中发生的更改的详细信息,但格式可能不适用于一些下游用户。sink 连接器或 Kafka 生态系统的其他部分可能需要格式化消息,以便以简化的扁平化结构呈现字段名称和值。

要简化 Debezium 连接器生成的事件记录格式,您可以使用 Debezium 事件扁平化单个消息转换(SMT)。将转换配置为支持要求 Kafka 记录采用的格式,其格式比连接器生成的默认格式更简单。根据具体用例,您可以将 SMT 应用到 Debezium 连接器,或应用于使用 Debezium 连接器生成消息的接收器连接器。要启用 Apache Kafka 以原始格式保留 Debezium 更改事件信息,请为接收器连接器配置 SMT。

事件扁平化转换是一个 Kafka Connect SMT

注意

本章中的信息描述了 Debezium 基于 SQL 的数据库连接器的事件扁平化单一消息转换(SMT)。有关 Debezium MongoDB 连接器对等的 SMT 的详情,请参考 MongoDB New Document State Extraction

以下主题提供详情:

7.7.1. Debezium 更改事件结构的描述

Debezium 生成具有复杂结构的数据更改事件。每个事件由三个部分组成:

  • 元数据,包括但不限于:

    • 更改数据的操作类型。
    • 源信息,如数据库的名称以及发生更改的表。
    • 标识更改时间的时间戳。
    • 可选的事务信息。
  • 更改前的行数据
  • 更改后的行数据

以下示例显示了 UPDATE 更改事件的消息结构的一部分:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}
Copy to Clipboard Toggle word wrap

有关连接器更改事件结构的更多信息,请参阅连接器的文档。

在事件扁平化 SMT 处理上例中的消息后,它会简化消息格式,如下例所示:

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}
Copy to Clipboard Toggle word wrap

7.7.2. Debezium 事件扁平化转换的行为

事件扁平化 SMT 在 Kafka 记录中从 Debezium 更改事件中提取 after 字段。SMT 将原始更改事件替换为其 after 字段,以创建简单的 Kafka 记录。

您可以为 Debezium 连接器或使用 Debezium 连接器发出的消息的接收器连接器配置事件扁平化 SMT。为接收器连接器配置事件扁平化的好处是,存储在 Apache Kafka 中的记录包含整个 Debezium 更改事件。将 SMT 应用到源或接收器连接器的决定取决于您的特定用例。

您可以将转换配置为进行以下任一操作:

  • 将更改事件的元数据添加到简化的 Kafka 记录。默认行为是 SMT 不添加元数据。
  • 在流中保留包含 DELETE 操作更改事件的 Kafka 记录。默认行为是,SMT 会丢弃 DELETE 操作更改事件的 Kafka 记录,因为大多数用户还无法处理它们。

数据库 DELETE 操作会导致 Debezium 生成两个 Kafka 记录:

  • 包含 "op": "d"、 在行数据 和一些其他字段的记录。
  • 一个 tombstone 记录,其键与已删除行和 null 值相同。这个记录是 Apache Kafka 的标记。表示 日志压缩 可以删除此密钥的所有记录。

您可以将事件扁平化 SMT 配置为执行以下操作之一,而不是丢弃包含 before 行数据的记录:

  • 将记录保存在流中,并编辑它使其只有 "value": "null" 字段。
  • 将记录保存在流中,并编辑它的一个 value 字段,该字段包含 before 字段中带有添加 "__deleted": "true" 条目的 key/value 对。

同样,您不需要丢弃 tombstone 记录,您可以配置事件扁平化 SMT,以便在流中保留 tombstone 记录。

7.7.3. 配置 Debezium 事件扁平化转换

通过在连接器的配置中添加 SMT 配置详情,在 Kafka Connect 源或接收器连接器中配置 Debezium 事件扁平化 SMT。例如,要获取转换的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
Copy to Clipboard Toggle word wrap

与任何 Kafka Connect 连接器配置一样,您可以按照您希望 Kafka Connect 应用 SMT 的顺序将 transforms= 设置为多个,用逗号分开的 SMT 别名。

以下 .properties 示例设置几个事件扁平化 SMT 选项:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
Copy to Clipboard Toggle word wrap
drop.tombstones=false
在事件流中为 DELETE 操作保留 tombstone 记录。
delete.handling.mode=rewrite

对于 DELETE 操作,请通过扁平化 change 事件中的 value 字段来编辑 Kafka 记录。value 字段直接包含 before 字段中的键/值对。SMT 添加 __deleted 并将其设置为 true,例如:

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}
Copy to Clipboard Toggle word wrap
add.fields=table,lsn
在简化的 Kafka 记录中添加 表和 lsn 字段的更改事件元数据。

自定义配置

连接器可能会发出许多类型的事件消息(heartbeat 消息、tombstone 消息或有关事务或模式更改的元数据消息)。要将转换应用到事件的子集,您可以定义 SMT predicate 语句,该语句仅有选择地将转换 应用到特定的事件。

7.7.4. 在 Kafka 记录中添加 Debezium 元数据示例

您可以配置事件扁平化 SMT,将原始更改事件元数据添加到简化的 Kafka 记录中。例如,您可能希望简化的记录的标头或值使其包含以下任意一个:

  • 进行更改的操作类型
  • 已更改的数据库或表的名称
  • 特定于连接器的字段,如 Postgres LSN 字段

要在简化的 Kafka 记录标头中添加元数据,请指定 add.headers 选项。要在简化的 Kafka 记录的值中添加元数据,请指定 add.fields 选项。每个选项都使用以逗号分隔的更改事件字段名称列表。不要指定空格。当存在重复的字段名称时,要为其中一个字段添加元数据,请指定 struct 和 字段。例如:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite
Copy to Clipboard Toggle word wrap

使用这个配置,简化的 Kafka 记录将包含如下内容:

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}
Copy to Clipboard Toggle word wrap

另外,简化的 Kafka 记录会有一个 __db 标头。

在简化的 Kafka 记录中,SMT 会用双下划线作为 metadata 字段名称前缀。当您指定 struct 时,SMT 也会在 struct 名称和字段名称间插入下划线。

要将元数据添加到用于 DELETE 操作的简化 Kafka 记录中,还必须配置 delete.handling.mode=rewrite

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

有关如何有选择地应用 SMT 的更多信息,请参阅为转换配置 SMT predicate

7.7.6. 配置 Debezium 事件扁平化转换的选项

下表描述了您可以指定配置事件扁平化 SMT 的选项。

Expand
表 7.7. 事件扁平化 SMT 配置选项的描述
选项默认描述

drop.tombstones

true

Debezium 为每个 DELETE 操作生成一个 tombstone 记录。默认行为是事件扁平化 SMT 从流中删除 tombstone 记录。要在流中保留 tombstone 记录,请指定 drop.tombstones=false

注意

计划在以后的发行版本中删除这个选项。在这一位置,使用 delete.tombstone.handling.mode 选项。

delete.handling​.mode

drop

Debezium 为每个 DELETE 操作生成更改事件记录。默认行为是事件扁平化 SMT 会从流中删除这些记录。要在流中保留 DELETE 操作的 Kafka 记录,将 delete.handling.mode 设置为 nonerewrite

指定 none 在流中保留更改事件记录。记录仅包含 "value": "null "。

指定 rewrite 在流中保留更改事件记录,并编辑记录,使其具有 字段的 key/value 对,并将 __deleted: true 添加到 这是指明该记录已被删除的另一种方式。

当您指定 重写 时,DELETE 操作的更新简化记录可能全部需要跟踪删除的记录。您可以考虑接受 Debezium 连接器创建的 tombstone 记录的默认行为。

注意

计划在以后的发行版本中删除这个选项。在这一位置,使用 delete.tombstone.handling.mode 选项。

delete.tombstone.handling.mode

没有默认值

Debezium 为每个 DELETE 操作生成更改事件记录。此设置决定了事件扁平化 SMT 如何处理流中的 DELETE 事件。

注意

这个选项的设置优先于您可以为已弃用的 drop.tombstonesdelete.handling.mode 选项配置的任何冲突设置。

设置以下选项之一:

drop
SMT 从流中删除 DELETE 事件和 TOMBSTONE
tombstone (默认)
SMT 在流中保留 TOMBSTONE 记录。TOMBSTONE 记录仅包含以下值: "value": "null "。
rewrite

SMT 在流中保留更改事件记录,并进行以下更改:

  • 在记录中添加一个 value 字段,其中包含原始记录的 before 字段中的键/值对。
  • 在记录的值中添加 __deleted: true
  • 删除 TOMBSTONE 记录。

    此设置提供了另一种表示该记录已被删除的方法。

rewrite-with-tombstone
当您选择 rewrite 选项时,SMT 的行为相同,但它也会保留 TOMBSTONE 记录。

route.by.field

 

要使用行数据来确定要将记录路由到的主题,请将此选项设置为 after field 属性。SMT 将记录路由到名称与指定 after field 属性的值匹配的主题。对于 DELETE 操作,将此选项设置为 before field 属性。

例如,route.by.field=destination 的配置将记录路由到主题,其名称是 after.destination 的值。默认行为是,Debezium 连接器将每个更改事件记录发送到一个主题,其名称由数据库的名称以及进行更改的表的名称发送。

如果要在接收器连接器上配置事件扁平化 SMT,当目标主题名称指定使用简化的更改事件记录更新的数据库表的名称时,设置此选项可能会很有用。如果您的用例的主题名称不正确,您可以配置 route.by.field 来重新路由事件。

add.fields.prefix

__ (double-underscore)

将此可选字符串设置为前缀字段。

add.fields

 

将这个选项设置为用逗号分开的列表,没有空格,元数据字段添加到简化的 Kafka 记录的值中。当存在重复的字段名称时,要为其中一个字段添加元数据,请指定 struct 和字段,如 source.ts_ms

另外,您可以通过 <field name& gt;:<new field name& gt; 覆盖字段名称,例如:如 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP 等新字段名称。请注意,新字段名称是区分大小写的。

当 SMT 在简化的记录的值中添加元数据字段时,它会使用双下划线为每个元数据字段名称添加前缀。对于 struct 规格,SMT 还在 struct 名称和字段名称之间插入一个下划线。

如果您指定了不在更改事件记录中的字段,则 SMT 仍然会将字段添加到记录的值中。

add.headers.prefix

__ (double-underscore)

将此可选字符串设置为前缀标头。

add.headers

 

将这个选项设置为用逗号分开的列表,没有空格,元数据字段添加到简化的 Kafka 记录标头中。当存在重复的字段名称时,要为其中一个字段添加元数据,请指定 struct 和字段,如 source.ts_ms

另外,您可以通过 <field name& gt;:<new field name& gt; 覆盖字段名称,例如:如 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP 等新字段名称。请注意,新字段名称是区分大小写的。

当 SMT 在简化的记录的标头中添加元数据字段时,它会使用双下划线为每个元数据字段名称添加前缀。对于 struct 规格,SMT 还在 struct 名称和字段名称之间插入一个下划线。

如果您指定了一个不在更改事件记录中的字段,则 SMT 不会在标头中添加该字段。

drop.fields.header.name

 

要从输出消息中丢弃的源消息中用于列出字段名称的 Kafka 消息标头名称。

drop.fields.from.key

false

指定是否希望 SMT 从事件键中删除 drop.fields.header.name 中列出的字段。

drop.fields.keep.schema.compatible

true

指定是否希望 SMT 删除 drop.fields.header.name 配置属性中包含的非可选字段。

默认情况下,SMT 只移除标记为可选 的字段。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat