12.3. 根据事件内容将更改事件记录路由到主题


默认情况下,Debebe 将从表读取的所有更改事件流传输到单个静态主题。然而,在有些情况下,您可能希望根据事件内容将所选事件重新路由到其他主题。基于内容的路由消息的过程在 基于内容的路由 消息模式中进行了描述。要在 Debezium 中应用此模式,您可以使用基于内容的路由 单个消息转换 (SMT)来编写为每个事件评估的表达式。根据评估事件的方式,SMT 会将事件消息路由到原始目标主题,或者将其重新路由到您在表达式中指定的主题。

重要

Debezium 基于内容的路由 SMT 是一个技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围

虽然可以使用 Java 创建自定义 SMT 来编码路由逻辑,但使用自定义编码的 SMT 具有其缺陷。例如:

  • 需要预先编译转换并将其部署到 Kafka Connect。
  • 每个更改都需要代码重新编译和重新部署,从而造成不灵活的操作。

基于内容的路由 SMT 支持与 JSR 223 集成的脚本语言(用于 Java™ 平台)。

Debezium 不附带 JSR 223 API 的任何实现。要将表达式语言与 Debezium 搭配使用,您必须为语言下载 JSR 223 脚本引擎实施。根据您用于部署 Debezium 的方法,您可以自动从 Maven Central 下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实施所使用的任何其他 JAR 文件中。

12.3.1. 设置 Debezium 基于 content-routing SMT

为安全起见,Debezium 连接器存档不包含基于内容的路由 SMT。相反,它会在单独的工件中提供 debezium-scripting-2.1.4.Final.tar.gz

如果通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,要使用过滤器 SMT,您必须将 SMT 工件明确添加到 Kafka Connect 环境中。当使用 AMQ Streams 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在路由 SMT 后,允许向实例添加连接器的任何用户都可以运行脚本表达式。要确保脚本表达式只能由授权用户运行,请务必在添加路由 SMT 前保护 Kafka Connect 实例及其配置接口。

如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则适用以下步骤。如果使用 AMQ Streams 创建 Kafka Connect 镜像,请按照连接器部署主题中的说明操作。

流程

  1. 从浏览器中,打开 Debezium 下载站点的红帽构建,并下载 Debezium 脚本 SMT 存档(debezium-scripting-2.1.4.Final.tar.gz)。
  2. 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
  3. 获取 JSR-223 脚本引擎实现,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
  4. 重启 Kafka Connect 进程以获取新的 JAR 文件。

Groovy 语言在 classpath 上需要以下库:

  • groovy
  • groovy-json (可选)
  • groovy-jsr223

JavaScript 语言在 classpath 上需要以下库:

  • graalvm.js
  • graalvm.js.scriptengine

要将 Debezium 连接器配置为根据事件内容路由更改事件记录,您可以在连接器的 Kafka Connect 配置中配置 ContentBasedRouter SMT。

配置基于内容的路由 SMT 要求您指定一个定义过滤条件的正则表达式。在配置中,您可以创建一个定义路由条件的正则表达式。表达式定义了用于评估事件记录的模式。它还指定目的地主题的名称,其中路由与模式匹配的事件。您指定的模式可能会指定一个事件类型,如表插入、更新或删除操作。您还可以定义一个与特定列或行中值匹配的模式。

例如,要将所有更新(u)记录重新路由到 updates 主题,您可以在连接器配置中添加以下配置:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...
Copy to Clipboard Toggle word wrap

前面的例子指定了使用 Groovy 表达式语言。

与模式不匹配的记录将路由到默认主题。

自定义配置

前面的示例演示了一个简单的 SMT 配置,它旨在只处理 DML 事件,其中包含 op 字段。连接器可能会发出的其他类型的消息(心跳消息、tombstone 消息或有关事务或模式更改的元数据消息)不包含此字段。为了避免处理失败,您可以定义一个 SMT predicate 语句,该语句只会将转换应用到 特定事件。

Debezium 将某些变量绑定到 SMT 的评估上下文中。当您创建表达式来指定控制路由目的地的条件时,SMT 可以查找并解释这些变量的值,以评估表达式中的条件。

下表列出了 Debezium 绑定到基于内容路由 SMT 的评估上下文的变量:

Expand
表 12.2. 基于内容的路由表达式变量
名称描述类型

key

消息的一个键。

org.apache.kafka.connect​.data​.Struct

value

消息的值。

org.apache.kafka.connect​.data​.Struct

keySchema

message 键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

字符串

标头

消息标头的 Java 映射。key 字段是标头名称。headers 变量公开以下属性:

  • (类型为 Object
  • 模式 (类型为 org.apache.kafka​.connect​.data​.Schema

java.util.Map​<String,​ io.debezium​.transforms​.scripting​.RecordHeader>

表达式可以在其变量上调用任意方法。表达式应解析为布尔值,用来决定 SMT 如何处理消息。当表达式中的路由条件评估为 true 时,会保留消息。当路由条件评估为 false 时,会删除消息。

表达式不应产生任何副作用。也就是说,不应修改他们通过的任何变量。

除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

您表达基于内容的路由条件的方式取决于您使用的脚本语言。例如,如 基本配置示例所示,当您使用 Groovy 作为表达式语言时,以下表达式会将所有更新(u)记录重新路由到 更新 主题,而将其他记录路由到默认主题:

value.op == 'u' ? 'updates' : null
Copy to Clipboard Toggle word wrap

其他语言使用不同的方法表达相同的条件。

提示

Debezium MongoDB 连接器将 afterpatch 字段作为序列化 JSON 文档而不是作为结构发出。
要将 ContentBasedRouting SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段取消回单独的文档。
您可以使用表达式中的 JSON 解析器为每个数组项目生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到 classpath 中,然后添加一个表达式,如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'

JavaScript

当使用 JavaScript 作为表达式语言时,您可以调用 StructVirtualMachineget () 方法来指定基于内容的路由条件,如下例所示:

value.get('op') == 'u' ? 'updates' : null
Copy to Clipboard Toggle word wrap

带有 Graal.js 的 JavaScript

当使用带有 Graal.js 的 JavaScript 来创建基于内容的路由条件时,您可以使用与 Groovy 一起使用的方法类似。例如:

value.op == 'u' ? 'updates' : null
Copy to Clipboard Toggle word wrap

12.3.6. 配置基于内容的路由转换的选项

Expand

属性

默认

描述

topic.regex

 

可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用条件逻辑。如果目标主题的名称与 topic.regex 中的值匹配,则转换会在将事件传递给主题之前应用条件逻辑。如果主题的名称与 topic.regex 中的值不匹配,则 SMT 会将事件传递给主题未修改。

language

 

写入表达式的语言。必须以 jsr223. 例如: jsr223.groovyjsr223.graal.js。Debezium 仅支持通过 JSR 223 API 进行引导("Java ™ Platform" 跟踪)。

topic.expression

 

要为每个消息评估的表达式。必须评估一个 String 值,其中出现非空将消息重新路由到新主题,而 null 值会将消息路由到默认主题。

null.handling.mode

keep

指定转换如何处理 null (tombstone)信息。您可以指定以下选项之一:

keep
(默认)传递消息。
drop
完全删除消息。
评估
将条件逻辑应用到消息。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat