搜索

13.4. 从 Debezium 事件记录中提取字段级别的更改

download PDF

Debezium 数据更改事件具有复杂的结构,可提供大量信息。然而,在某些情况下,在下游消费者处理 Debezium 更改事件信息前,它需要有关原始数据库更改结果的信息。为了增强事件消息,其中包含数据库操作如何修改源数据库中的字段的详细信息,Debebe 提供 ExtractChangedRecordState 单个消息转换(SMT)。

事件更改转换是一个 Kafka Connect SMT

13.4.1. Debezium 更改事件结构的描述

Debezium 生成具有复杂结构的数据更改事件。每个事件由以下部分组成:

  • 元数据,其中包括但不仅限于以下类型:

    • 更改数据的操作类型。
    • 源信息,如数据库的名称以及所发生更改的表。
    • 在进行更改时标识的时间戳。
    • 可选的事务信息。
  • 更改前的行数据。
  • 更改后行数据。

以下示例显示了典型的 Debezium UPDATE 更改事件结构的一部分:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

上例中的消息的复杂格式提供有关源数据库中发生更改的详细信息。但是,格式可能不适用于某些下游用户。sink 连接器或 Kafka 生态系统的其他部分可能会预期信息明确标识数据库操作更改或保持不变的字段。ExtractChangedRecordState SMT 将标头添加到更改事件消息中,以识别由数据库操作修改的字段,以及保持不变的字段。

13.4.2. Debezium 事件的行为更改 SMT

事件更改 SMT 在 Kafka 记录中从 Debezium UPDATE 更改事件中提取 beforeafter 字段。转换会检查事件状态结构的 beforeafter,以识别操作更改的字段,以及保持不变的字段。根据连接器配置,转换会生成修改的事件消息,添加消息标头来列出更改的字段、更改的字段或两者。如果事件代表 INSERTDELETE,则这个单一消息转换无效。

您可以为 Debezium 连接器配置事件更改 SMT,或使用 Debezium 连接器发送的消息的接收器连接器。如果您希望 Apache Kafka 保留整个原始 Debezium 更改事件,请为接收器连接器配置 SMT。将 SMT 应用到源或接收器连接器的决定取决于您的特定用例。

根据您的用例,您可以通过执行以下任一任务来配置转换来修改原始消息:

  • 通过在用户配置的 header.changed.name 标头中列出它们,来识别由 UPDATE 事件更改的字段。
  • 通过在用户配置的 header.unchanged.name 标头中列出它们,识别 UPDATE 事件未更改的字段。

13.4.3. 配置 Debezium 事件更改 SMT

您可以通过在连接器的配置中添加 SMT 配置详情,为 Kafka Connect 源或 sink 连接器配置 Debezium 事件更改 SMT。要获取默认行为,不添加任何标头,请将转换添加到连接器配置中,如下例所示:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState

与任何 Kafka Connect 连接器配置一样,您可以将 transformations= 设置为多个用逗号分开的 SMT 别名,按您希望 Kafka Connect 应用 SMT 的顺序设置。

以下示例中的连接器配置为事件更改 SMT 设置几个选项:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged
header.changed.name
用于存储数据库操作更改的字段的逗号分隔列表的 Kafka 消息标头名称。
header.unchanged.name
Kafka 消息标头名称,用于存储在数据库操作后保持不变的字段列表。

自定义配置

连接器可能会发出许多类型的事件消息(heartbeat 消息、tombstone 消息或有关事务或模式更改的元数据信息)。要将转换应用到事件子集,您可以定义一个 SMT predicate 语句,该语句有选择地将转换 应用到特定的事件。

13.4.4. 用于有选择地应用事件更改转换的选项

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括心跳消息,以及有关 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

有关如何有选择地应用 SMT 的更多信息,请参阅为转换配置 SMT predicate

13.4.5. Debezium 事件的配置选项的描述会更改 SMT

下表描述了您可以指定配置事件更改 SMT 的选项。

表 13.3. 事件更改 SMT 配置选项的描述
选项默认描述

header.changed.name

 

用于存储数据库操作更改的字段的逗号分隔列表的 Kafka 消息标头名称。

header.unchanged.name

 

Kafka 消息标头名称,用于存储在数据库操作后保持不变的字段列表。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.