第 12 章 应用转换以修改使用 Apache Kafka 交换的消息


Debezium 提供多个消息转换(SMT),可用于修改事件记录。您可以配置连接器来应用一个转换,在将记录发送到 Apache Kafka 前修改记录。您还可以将 Debezium SMT 应用到接收器连接器,以便在连接器从 Kafka 主题读取记录前修改记录。

如果要 只把转换应用到特定的消息,您可以配置 Kafka Connect predicate 来定义应用 SMT 的条件。

Debezium 提供以下 SMT:

主题路由器 SMT
根据应用于原始主题名称的正则表达式,将事件记录重新设置为特定主题。
基于内容的路由器 SMT
根据事件内容重新路由指定的更改事件记录。
事件记录更改 SMT
增强事件消息,以识别值在数据库操作后更改或保持不变的字段
消息过滤 SMT
允许您将事件记录的子集传播到目标 Kafka 主题。转换对连接器发出的更改事件记录(根据事件记录的内容)应用正则表达式。只有与表达式匹配的记录才会写入目标主题。其他记录将被忽略。
HeaderToValue SMT
从事件记录中提取指定的标头字段,然后将标头字段复制或移到事件记录中的值。
新的记录状态提取 SMT
将 Debezium 更改事件记录的复杂结构扁平化为简化的格式。简化的结构允许由接收器连接器处理,这些连接器无法使用原始结构。
MongoDB 新记录状态提取
简化 Debezium MongoDB 连接器更改事件记录的复杂结构。简化的结构允许由接收器连接器处理,这些连接器无法使用原始事件结构。
outbox 事件路由器 SMT
提供对 outbox 模式的支持,以启用在多个服务间的安全可靠数据交换。
MongoDB outbox 事件路由器 SMT
支持将 outbox 模式与 MongoDB 连接器一起使用,以启用在多个服务间的安全可靠数据交换。
分区路由 SMT
根据一个或多个指定有效负载字段的值将事件路由到特定的目标分区。
时区转换器 SMT
将事件记录中的 Debezium 和 Kafka Connect 时间戳字段转换为指定时区。

12.1. 使用 SMT predicates 有选择地应用转换

当您为连接器配置单个消息转换(SMT)时,您可以为转换定义 predicate。predicate 指定如何将转换条件应用到连接器进程的消息的子集。您可以分配 predicates 来转换您为源连接器配置,如 Debezium 或 sink 连接器。

12.1.1. 关于 SMT predicates

Debezium 提供多个消息转换(SMT),可用于在 Kafka Connect 将记录保存到 Kafka 主题前修改事件记录。默认情况下,当您为 Debezium 连接器配置其中一个 SMT 时,Kafka Connect 会将该转换应用到连接器发出的每个记录。但是,您可能有实例有选择地应用转换,以便它只修改共享共同特征的事件消息的子集。

例如,对于 Debezium 连接器,您可能希望仅在来自特定表的事件消息或包含特定标头键的事件信息上运行转换。在运行 Apache Kafka 2.6 或更高版本的环境中,您可以将 predicate 语句附加到转换中,以指示 Kafka Connect 只将 SMT 应用到特定的记录。在 predicate 中,您可以指定 Kafka Connect 用来评估它处理的每个消息的条件。当 Debezium 连接器发出更改事件消息时,Kafka Connect 会根据配置的 predicate 条件检查消息。如果事件消息条件为 true,Kafka Connect 会应用转换,然后将消息写入 Kafka 主题。与条件不匹配的消息会不修改地发送到 Kafka。

这种情况与您为接收器连接器 SMT 定义的 predicates 类似。连接器从 Kafka 主题读取信息,Kafka Connect 根据 predicate 条件评估信息。如果消息与条件匹配,Kafka Connect 会应用转换,然后将信息传递给接收器连接器。

在定义了 predicate 后,您可以重复使用它并将其应用到多个转换。predicates 还包括可用于反转 predicate 的 negate 选项,以便将 predicate 条件应用到与 predicate 语句中定义的条件 不匹配的 记录。您可以使用 negate 选项将 predicate 与基于负条件的其他转换配对。

predicate 元素

predicates 包含以下元素:

  • predicates 前缀
  • alias (例如,isOutboxTable
  • type (例如,org.apache.kafka.connect.transforms.predicates.TopicNameMatches)。Kafka Connect 提供了一组默认的 predicate 类型,您可以通过定义自己的自定义 predicates 来补充。
  • condition 语句和任何其他配置属性,具体取决于 predicate 的类型(例如,正则表达式命名模式)

默认 predicate 类型

默认提供以下 predicate 类型:

HasHeaderKey
在您希望 Kafka Connect 要评估的事件消息中指定一个键名称。对于包含具有指定名称的标头键的任何记录,predicate 会评估为 true。
RecordIsTombstone

匹配 Kafka tombstone 记录。对于具有 null 值的任何记录,predicate 会评估为 true。将此 predicate 与过滤器 SMT 结合使用,以删除 tombstone 记录。这个 predicate 没有配置参数。

Kafka 中的 tombstone 是一个记录,它有一个带有 0 字节、null 有效负载的密钥。当 Debezium 连接器在源数据库中处理 delete 操作时,连接器会为 delete 操作发出两个更改事件:

  • 提供数据库记录的先前值的删除操作(op" : "d")事件。
  • 有一个 tombstone 事件,它具有相同的键,但有一个 null 值。

    tombstone 代表行的一个删除标记。当为 Kafka 启用 日志 压缩时,在压缩 Kafka 过程中会删除与 tombstone 共享的所有事件。日志压缩会定期进行,由主题的 delete.retention.ms 设置控制的压缩间隔。

    虽然可以 配置 Debezium,使其不发出 tombstone 事件,但最好允许 Debezium 发出 tombstones 在日志压缩过程中维护预期的行为。限制 tombstones 可防止 Kafka 在日志压缩过程中删除删除密钥的记录。如果您的环境包含无法处理 tombstones 的接收器连接器,您可以将 sink 连接器配置为使用带有 RecordIsTombstone predicate 的 SMT 来过滤 tombstone 记录。

TopicNameMatches
指定您要 Kafka Connect 匹配的主题名称的正则表达式。连接器名称与指定正则表达式匹配的连接器记录为 true。使用此 predicate 根据源表的名称将 SMT 应用到记录。

12.1.2. 定义 SMT predicates

默认情况下,Kafka Connect 会将 Debezium 连接器配置中的每个单一消息转换应用到它从 Debezium 接收的每个更改事件记录。从 Apache Kafka 2.6 开始,您可以在控制 Kafka Connect 应用转换的连接器配置中定义 SMT predicate。predicate 语句定义了 Kafka Connect 将转换应用到 Debezium 发出的事件记录的条件。Kafka Connect 评估 predicate 语句,然后有选择地将 SMT 应用到与 predicate 中定义的条件匹配的记录子集。配置 Kafka Connect predicates 与配置转换类似。您可以指定 predicate 别名,将别名与转换关联,然后定义 predicate 的类型和配置。

先决条件

  • Debezium 环境运行 Apache Kafka 2.6 或更高版本。
  • 为 Debezium 连接器配置 SMT。

流程

  1. 在 Debezium 连接器配置中,为 predicates 参数指定 predicate 别名,例如 IsOutboxTable
  2. 通过将 predicate 别名附加到连接器配置中的转换中,将 predicate 别名与您要有条件地应用的转换相关联:

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    例如:

    transforms.outbox.predicate=IsOutboxTable
  3. 通过指定类型并为配置参数提供值来配置 predicate。

    1. 对于类型,指定 Kafka Connect 中提供的以下默认类型之一:

      • HasHeaderKey
      • RecordIsTombstone
      • TopicNameMatches

        例如:

        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. 对于 TopicNameMatch 或 HasHeaderKey predicates,请为您要匹配的主题或标头名称指定一个正则表达式。

      例如:

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. 如果要对条件进行求值,请将 negate 关键字附加到转换别名,并将它设为 true

    例如:

    transforms.outbox.negate=true

    前面的属性会反转 predicate 匹配的记录集合,以便 Kafka Connect 将转换应用到与 predicate 中指定的条件不匹配的任何记录。

示例: outbox 事件路由器转换的 TopicNameMatch predicate

以下示例显示了一个 Debezium 连接器配置,它只会将 outbox 事件路由器转换为 Debezium 发送到 Kafka outbox.event.order 主题的消息。

由于只有对于来自 outbox 表 (outbox.event.*) 的信息 TopicNameMatch predicate 被评估为 true,所以源来自数据库中其他表的消息不会进行转换。

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

12.1.3. 忽略 tombstone 事件

您可以控制 Debezium 是否发出 tombstone 事件,以及 Kafka 如何保留它们。根据您的数据管道,您可能需要为连接器设置 tombstones.on.delete 属性,以便 Debezium 不会发出 tombstone 事件。

是否启用 Debezium 来发出 tombstones 取决于环境中如何消耗主题,以及 sink 消费者的特性。一些接收器(sink)连接器依赖于 tombstone 事件从下游数据存储中删除记录。如果接收器连接器依赖 tombstone 记录来指示何时删除下游数据存储中的记录,请将 Debezium 配置为发出它们。

当您将 Debezium 配置为生成 tombstones 时,需要进一步配置以确保接收器连接器接收 tombstone 事件。必须设置主题的保留策略,以便连接器在 Kafka 在日志压缩过程中删除事件信息前有时间读取事件信息。主题在压缩前保留 tombstones 的时间长度由主题的 delete.retention.ms 属性控制。

默认情况下,连接器的 tombstones.on.delete 属性被设置为 true,以便连接器在每个 delete 事件后生成一个 tombstone。如果将属性设置为 false 以防止 Debezium 将 tombstone 记录保存到 Kafka 主题,没有 tombstone 记录可能会导致意外的后果。Kafka 在日志压缩过程中依赖于 tombstone 来删除与已删除密钥相关的记录。

如果您需要支持无法使用 null 值处理记录的接收器连接器或下游 Kafka 用户,而不是防止 Debezium 发出 tombstones,请考虑使用 RecordIsTombstone predicate 类型在消费者读取它们前删除 tombstone 信息。

流程

  • 要防止 Debezium 为删除的数据库记录发出 tombstone 事件,请将连接器的 tombstones.on.delete 选项设置为 false

    例如:

    “tombstones.on.delete”: “false”
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.