第 12 章 应用转换以修改使用 Apache Kafka 交换的消息
Debezium 提供了几个单一消息转换(SMT),您可以使用它来修改更改事件记录。您可以配置连接器以应用转换,在将记录发送到 Apache Kafka 前修改记录。您还可以将 Debezium SMTs 应用到接收器连接器,以在连接器从 Kafka 主题读取前修改记录。
如果只想将转换应用到特定消息,您可以配置 Kafka Connect predicate 来定义应用 SMT 的条件。
Debezium 提供以下 SMT:
- 主题路由器 SMT
- 根据应用于原始主题名称的正则表达式,将事件记录重新路由到特定的主题。
- 基于内容的路由器 SMT
- 根据事件内容重新路由指定的更改事件记录。
- 消息过滤 SMT
- 允许您将事件记录的子集传播到目标 Kafka 主题。根据事件记录的内容,转换会对连接器发出的更改事件记录应用正则表达式。只有与表达式匹配的记录才会写入目标主题。其他记录将被忽略。
- 新的记录状态提取 SMT
- 以简化的格式创建 Debezium 更改事件记录的复杂结构。简化的结构启用由无法使用原始结构的接收器连接器处理。
- MongoDB 新记录状态提取
- 简化了 Debezium MongoDB 连接器更改事件记录的复杂结构。简化的结构启用由无法使用原始事件结构的接收器连接器处理。
- outbox 事件路由器 SMT
- 提供对开箱即用模式的支持,以启用多个服务间的安全可靠的数据交换。
- MongoDB outbox 事件路由器 SMT
- 支持将 outbox 模式与 MongoDB 连接器搭配使用,以启用多个服务间的安全可靠的数据交换。
12.1. 使用 SMT predicates 有选择地应用转换
当您为连接器配置单个消息转换(SMT)时,您可以为转换定义 predicate。predicate 指定如何将转换条件应用到连接器进程的消息子集。您可以为源连接器(如 Debezium)或 sink 连接器分配 predicates 以转换。
12.1.1. 关于 SMT predicates
Debezium 提供多个单消息转换(SMT),可用于在 Kafka Connect 将记录保存到 Kafka 主题前修改事件记录。默认情况下,当您为 Debezium 连接器配置其中一个 SMT 时,Kafka Connect 会将这个转换应用到连接器发出的每个记录。但是,您可能有一些要有选择地应用转换的实例,以便它只修改共享通用特征的事件消息子集。
例如,对于 Debezium 连接器,您可能希望仅在来自特定表的事件消息或包含特定标头键的事件消息上运行转换。在运行 Apache Kafka 2.6 或更高版本的环境中,您可以在转换中附加 predicate 语句,以指示 Kafka Connect 只对某些记录应用 SMT。在 predicate 中,您可以指定 Kafka Connect 用来评估它处理的每个消息的条件。当 Debezium 连接器发出更改事件消息时,Kafka Connect 会根据配置的 predicate 条件检查消息。如果事件消息的条件为 true,Kafka Connect 会应用转换,然后将消息写入 Kafka 主题。与条件不匹配的消息会原封发送到 Kafka。
这种情况与您为接收器连接器 SMT 定义的 predicates 类似。连接器从 Kafka 主题读取信息,Kafka Connect 根据 predicate 条件评估信息。如果消息与条件匹配,Kafka Connect 会应用转换,然后将信息传递给接收器连接器。
定义了 predicate 后,您可以重复使用它并将其应用到多个转换。predicates 还包含一个 negate
选项,可用于反转 predicate,以便 predicate 条件仅应用到与 predicate 语句中定义的条件 不匹配的 记录。您可以使用 negate
选项将 predicate 与基于负条件的其他转换配对。
predicate 元素
predicates 包括以下元素:
-
predicates
前缀 -
别名(例如,
isOutboxTable
) -
类型(例如
org.apache.kafka.connect.transforms.predicates.TopicNameMatches
)。Kafka Connect 提供了一组默认的 predicate 类型,您可以通过定义自己的自定义 predicates 来补充。 - 根据 predicate 的类型(例如,正则表达式命名模式)
默认 predicate 类型
以下 predicate 类型默认可用:
- HasHeaderKey
- 在您希望 Kafka Connect 评估的事件消息中指定标头中的键名称。对于包含具有指定名称的标头键的任何记录,predicate 会评估为 true。
- RecordIsTombstone
匹配 Kafka tombstone 记录。对于具有
null
值的任何记录,predicate 会评估为true
。将此 predicate 与过滤器 SMT 结合使用,以删除 tombstone 记录。此 predicate 没有配置参数。Kafka 中的 tombstone 是一个记录,其中包含一个具有 0 字节、
null
有效负载的键。当 Debezium 连接器处理源数据库中的 delete 操作时,连接器会为删除操作发送两个更改事件:-
删除操作(
"op" : "d"
)事件,它提供数据库记录的先前值。 具有相同键但为
null
值的 tombstone 事件。tombstone 代表行的删除标记。当为 Kafka 启用 日志 压缩时,在压缩 Kafka 过程中会删除共享与 tombstone 相同的键的所有事件。日志压缩会定期进行,压缩间隔由主题的
delete.retention.ms
设置控制。虽然可以 配置 Debezium,使其不会发出 tombstone 事件,但最好允许 Debezium 在日志压缩过程中发出 tombstones 来保持预期的行为。禁止 tombstones 会阻止 Kafka 在日志压缩过程中删除删除密钥的记录。如果您的环境包含无法处理 tombstones 的接收器连接器,您可以将接收器连接器配置为使用带有
RecordIsTombstone
predicate 的 SMT 来过滤 tombstone 记录。
-
删除操作(
- TopicNameMatches
- 指定 Kafka Connect 匹配的主题名称的正则表达式。对于与指定的正则表达式匹配的连接器记录,predicate 为 true。使用此 predicate 根据源表的名称将 SMT 应用到记录。
12.1.2. 定义 SMT predicates
默认情况下,Kafka Connect 将 Debezium 连接器配置中的每个单个消息转换应用到它从 Debezium 接收的每个更改事件记录。从 Apache Kafka 2.6 开始,您可以在控制 Kafka Connect 如何应用转换的连接器配置中为转换定义 SMT predicate。predicate 语句定义了 Kafka Connect 将转换应用到 Debezium 发送的事件记录的条件。Kafka Connect 评估 predicate 声明,然后有选择地将 SMT 应用到与 predicate 中定义的条件匹配的记录子集。配置 Kafka Connect predicates 与配置转换类似。您可以指定 predicate 别名,将别名与转换相关联,然后定义 predicate 的类型和配置。
先决条件
- Debezium 环境运行 Apache Kafka 2.6 或更高版本。
- 为 Debezium 连接器配置 SMT。
流程
-
在 Debezium 连接器配置中,为
predicates
参数指定 predicate 别名,例如IsOutboxTable
。 通过将 predicate 别名附加到连接器配置中的转换别名,将 predicate 别名与您要有条件应用的转换别名相关联:
transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>
例如:
transforms.outbox.predicate=IsOutboxTable
通过指定其 type 并为配置参数提供值来配置 predicate。
对于类型,请指定 Kafka Connect 中的以下默认类型之一:
- HasHeaderKey
- RecordIsTombstone
TopicNameMatches
例如:
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
对于 TopicNameMatch 或
HasHeaderKey
predicates,为您要匹配的主题或标头名称指定一个正则表达式。例如:
predicates.IsOutboxTable.pattern=outbox.event.*
如果要对条件进行必要,请将
negate
关键字附加到转换别名,并将它设置为true
。例如:
transforms.outbox.negate=true
以上属性反转 predicate 匹配的一组记录,因此 Kafka Connect 会将转换应用到与 predicate 中指定的条件不匹配的任何记录。
示例:用于 outbox 事件路由器转换的 TopicNameMatch predicate
以下示例显示了 Debezium 连接器配置,它将 outbox 事件路由器转换应用到 Debezium 发送到 Kafka outbox.event.order
主题的消息。
由于 TopicNameMatch
predicate 仅针对来自 outbox 表中的消息评估为 true,所以转换不会应用到源自数据库中其他表的消息。
transforms=outbox transforms.outbox.predicate=IsOutboxTable transforms.outbox.type=io.debezium.transforms.outbox.EventRouter predicates=IsOutboxTable predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsOutboxTable.pattern=outbox.event.*
12.1.3. 忽略 tombstone 事件
您可以控制 Debezium 是否发出 tombstone 事件,以及 Kafka 保留的时长。根据您的数据管道,您可能想要为连接器设置 tombstones.on.delete
属性,以便 Debezium 不会发出 tombstone 事件。
是否启用 Debezium 发送 tombstones,具体取决于如何在您的环境中消耗主题以及接收器消费者的特性。有些接收器连接器依赖于 tombstone 事件从下游数据存储中删除记录。如果接收器连接器依赖 tombstone 记录来指示何时删除下游数据存储中的记录,请配置 Debezium 来发出它们。
当您将 Debezium 配置为生成 tombstones 时,需要进一步的配置以确保接收器连接器接收 tombstone 事件。必须设置主题的保留策略,以便连接器在 Kafka 在日志压缩过程中删除事件消息的时间。在压缩前保留 tombstones 的时长,由主题的 delete.retention.ms
属性控制。
默认情况下,连接器的 tombstones.on.delete
属性设置为 true
,以便连接器在每次删除事件后生成一个 tombstone。如果将属性设置为 false
以防止 Debezium 将 tombstone 记录保存到 Kafka 主题,则没有 tombstone 记录可能会导致意外的结果。Kafka 在日志压缩过程中依赖于 tombstone 来删除与已删除密钥相关的记录。
如果您需要支持无法处理 null 值记录的接收器连接器或下游 Kafka 用户,而不是防止 Debezium 发送 tombstones,请考虑使用 RecordIsTombstone
predicate 类型为连接器配置 SMT,以便在用户读取前删除 tombstone 信息。
流程
要防止 Debezium 为删除的数据库记录发出 tombstone 事件,请将连接器的
tombstones.on.delete
选项设置为false
。例如:
“tombstones.on.delete”: “false”