第 7 章 应用转换来修改与 Apache Kafka 交换的消息


Debezium 提供多个消息转换(SMT),可用于修改更改事件记录。您可以将连接器配置为应用在将记录发送到 Apache Kafka 前修改记录的转换。您还可以将 Debezium SMTs 应用到接收器连接器,在连接器从 Kafka 主题读取前修改记录。

如果要 只对特定信息应用转换,您可以配置 Kafka Connect predicate 来定义应用 SMT 的条件。

Debezium 提供以下 SMT:

主题路由器 SMT
根据应用于原始主题名称的正则表达式,将更改事件记录重新路由到特定的主题。
基于内容的路由器 SMT
根据事件内容重新路由指定的更改事件记录。
事件记录更改 SMT
增强了事件消息,以识别值在数据库操作后更改或保持不变的字段
消息过滤 SMT
允许您将事件记录子集传播到目标 Kafka 主题。根据事件记录的内容,转换将正则表达式应用到连接器发出的更改事件记录。只有与表达式匹配的记录才会写入目标主题。其他记录将被忽略。
HeaderToValue SMT
从事件记录中提取指定的标头字段,然后将标头字段或移到事件记录中的值。
新的记录状态提取 SMT
将 Debezium 更改事件记录的复杂结构扁平化为简化的格式。简化的结构允许由不能使用原始结构的接收器连接器进行处理。
MongoDB new record state extraction
简化 Debezium MongoDB 连接器更改事件记录的复杂结构。简化的结构允许由 sink 连接器处理,这些连接器无法使用原始事件结构。
outbox 事件路由器 SMT
提供对 outbox 模式的支持,以便在多个服务之间实现安全可靠的数据交换。
MongoDB outbox 事件路由器 SMT
提供对将 outbox 模式与 MongoDB 连接器搭配使用的支持,以便在多个服务中启用安全可靠的数据交换。
分区路由 SMT
根据一个或多个指定有效负载字段的值将事件路由到特定的目标分区。
时区转换器 SMT
将事件记录中的 Debezium 和 Kafka Connect 时间戳字段转换为指定的时区。

7.1. 使用 SMT predicates 有选择地应用转换

当您为连接器配置单个消息转换(SMT)时,您可以为转换定义 predicate。predicate 指定如何将转换条件应用到连接器进程的消息的子集。您可以将 predicates 分配给为源连接器配置的转换,如 Debezium 或 sink 连接器。

7.1.1. 关于 SMT predicates

Debezium 提供多个消息转换(SMT),您可以在 Kafka Connect 将记录保存到 Kafka 主题前修改事件记录。默认情况下,当您为 Debezium 连接器配置其中一个 SMT 时,Kafka Connect 会将该转换应用到连接器发出的每个记录。但是,您可能希望有选择地应用转换的实例,以便仅修改共享通用特征更改事件消息的子集。

例如,对于 Debezium 连接器,您可能需要仅在来自特定表或包含特定标头键的事件消息上运行转换。在运行 Apache Kafka 2.6 或更高版本的环境中,您可以将 predicate 语句附加到转换中,以指示 Kafka Connect 仅将 SMT 应用到某些记录。在 predicate 中,您可以指定 Kafka Connect 用来评估其进程的每个消息的条件。当 Debezium 连接器发出更改事件消息时,Kafka Connect 会针对配置的 predicate 条件检查消息。如果事件消息的条件为 true,Kafka Connect 应用转换,然后将消息写入 Kafka 主题。与条件不匹配的消息将发送到 Kafka,未修改。

这种情况与您为接收器连接器 SMT 定义的 predicates 类似。连接器从 Kafka 主题读取信息,Kafka Connect 会根据 predicate 条件评估信息。如果消息与条件匹配,Kafka Connect 会应用转换,然后将信息传递给接收器连接器。

在定义了 predicate 后,您可以重复使用它并将其应用到多个转换中。predicates 还包含一个 negate 选项,您可以使用它来反转 predicate,以便 predicate 条件仅应用到与 predicate 语句中定义的条件 不匹配的 记录。您可以使用 negate 选项将 predicate 与基于负条件的其他转换进行排序。

predicate 元素

predicates 包括以下元素:

  • predicates 前缀
  • 别名(例如,isOutboxTable
  • type (例如,org.apache.kafka.connect.transforms.predicates.TopicNameMatches)。Kafka Connect 提供了一组默认 predicate 类型,您可以通过定义自己的自定义 predicates 来补充它们。
  • condition 语句和任何其他配置属性,具体取决于 predicate 的类型(例如,正则表达式命名模式)

默认 predicate 类型

以下 predicate 类型默认可用:

HasHeaderKey
在您要评估 Kafka Connect 的事件消息中指定标头中的键名称。predicate 针对包含指定名称的标头键的任何记录评估为 true。
RecordIsTombstone

匹配 Kafka tombstone 记录。对于具有 null 值的任何记录,predicate 会评估为 true。将此 predicate 与过滤器 SMT 结合使用,以删除 tombstone 记录。此 predicate 没有配置参数。

Kafka 中的 tombstone 是一个记录,它有一个带有一个 0 字节的 null 有效负载的密钥。当 Debezium 连接器在源数据库中处理 delete 操作时,连接器为 delete 操作发出两个更改事件:

  • 一个删除操作("op" : "d")事件提供数据库记录之前的值。
  • 一个具有相同键但 null 值的 tombstone 事件。

    tombstone 代表行的删除标记。当为 Kafka 启用 日志 压缩时,在压缩 Kafka 过程中会删除共享与 tombstone 相同的键的所有事件。日志压缩会定期进行,由 delete.retention.ms 设置控制的压缩间隔。

    虽然可以 配置 Debezium,使其不会发出 tombstone 事件,但最好允许 Debezium 在日志压缩过程中发出 tombstones 来维护预期的行为。防止 tombstones 可防止 Kafka 在日志压缩过程中删除已删除密钥的记录。如果您的环境包含无法处理 tombstones 的接收器连接器,您可以将接收器连接器配置为使用带有 RecordIsTombstone predicate 的 SMT 来过滤 tombstone 记录。

TopicNameMatches
指定您希望 Kafka Connect 匹配的主题名称的正则表达式。对于与指定正则表达式匹配的主题名称,则 predicate 的连接器记录是 true。使用此 predicate 根据源表的名称对记录应用 SMT。

7.1.2. 定义 SMT predicates

默认情况下,Kafka Connect 将 Debezium 连接器配置中的每个消息转换应用到它从 Debezium 接收的每个更改事件记录。从 Apache Kafka 2.6 开始,您可以在控制 Kafka Connect 如何应用转换的连接器配置中定义 SMT predicate。predicate 语句定义 Kafka Connect 将转换应用到 Debezium 发出的事件记录的条件。Kafka Connect 评估 predicate 语句,然后选择性地将 SMT 应用到与 predicate 中定义的条件匹配的记录子集。配置 Kafka Connect predicates 与配置转换类似。您可以指定一个 predicate 别名,将别名与转换关联,然后定义 predicate 的类型和配置。

先决条件

  • Debezium 环境运行 Apache Kafka 2.6 或更高版本。
  • 为 Debezium 连接器配置 SMT。

流程

  1. 在 Debezium 连接器配置中,为 predicates 参数指定一个 predicate 别名,如 IsOutboxTable
  2. 将 predicate 别名与您要有条件应用的转换关联,方法是将 predicate 别名附加到连接器配置中的转换别名中:

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    例如:

    transforms.outbox.predicate=IsOutboxTable
  3. 通过指定类型并为配置参数提供值来配置 predicate。

    1. 对于类型,指定 Kafka Connect 中可用的以下默认类型之一:

      • HasHeaderKey
      • RecordIsTombstone
      • TopicNameMatches

        例如:

        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. 对于 TopicNameMatch 或 HasHeaderKey predicates,请为您要匹配的主题或标头名称指定一个正则表达式。

      例如:

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. 如果要对条件进行求反,请将 negate 关键字附加到转换别名中,并将其设置为 true

    例如:

    transforms.outbox.negate=true

    前面的属性反转 predicate 匹配的一组记录,以便 Kafka Connect 将转换应用到与 predicate 中指定的条件不匹配的任何记录。

示例:用于 outbox 事件路由器转换的主题名称Match predicate

以下示例显示了一个 Debezium 连接器配置,它仅将 outbox 事件路由器转换应用到 Debezium 向 Kafka outbox.event.order 主题发送的信息。

由于只有对于来自 outbox 表 (outbox.event.*) 的信息 TopicNameMatch predicate 被评估为 true,所以源来自数据库中其他表的消息不会进行转换。

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

7.1.3. 忽略 tombstone 事件

您可以控制 Debezium 是否发出 tombstone 事件,以及 Kafka 保留它们的时长。根据您的数据管道,您可能需要为连接器设置 tombstones.on.delete 属性,以便 Debezium 不会发出 tombstone 事件。

是否启用 Debezium 来发出 tombstones 取决于您环境中如何使用主题以及 sink 消费者的特性。有些接收器连接器依赖于 tombstone 事件从下游数据存储中删除记录。如果接收器连接器依赖 tombstone 记录来指示何时删除下游数据存储中的记录,请将 Debezium 配置为发出它们。

当您将 Debezium 配置为生成 tombstones 时,需要进一步配置以确保接收器连接器收到 tombstone 事件。必须设置主题的保留策略,以便连接器在日志压缩过程中删除事件信息前有时间读取事件信息。主题在压缩控制前保留 tombstones 的时间长度,由主题的 delete.retention.ms 属性控制。

默认情况下,连接器的 tombstones.on.delete 属性设置为 true,以便连接器在每个删除事件后生成一个 tombstone。如果将属性设置为 false,以防止 Debezium 将 tombstone 记录保存到 Kafka 主题,则没有 tombstone 记录可能会导致意外的后果。Kafka 在日志压缩过程中依赖于 tombstone 来删除与已删除密钥相关的记录。

如果您需要支持接收器连接器或下游 Kafka 用户,它们无法使用 null 值处理记录,而不是阻止 Debezium 发出 tombstones,请考虑为带有 RecordIsTombstone predicate 类型的连接器配置 SMT,以便在消费者读取它们前删除 tombstone 消息。

流程

  • 要防止 Debezium 为删除的数据库记录发出 tombstone 事件,请将连接器的 tombstones.on.delete 选项设置为 false

    例如:

    “tombstones.on.delete”: “false”
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.