7.3. 根据事件内容将事件记录路由到主题


默认情况下,Debezium 将从表读取的所有更改事件流到单个静态主题。但是,在某些情况下,您可能希望根据事件内容将所选事件重新路由到其他主题。基于内容的路由消息的过程 在基于 Content 的路由 消息传递模式中描述。要在 Debezium 中应用此模式,您可以使用基于内容的路由 单一消息转换 (SMT)来编写为每个事件评估的表达式。根据事件的评估方式,SMT 会将事件消息路由到原始目标主题,或者将其重新路由到您在表达式中指定的主题。

虽然可以使用 Java 创建自定义 SMT 来编码路由逻辑,但使用自定义代码的 SMT 有它的缺陷。例如:

  • 需要提前编译转换并将其部署到 Kafka Connect。
  • 每个更改都需要重新编译和重新部署代码,从而导致无法灵活的操作。

基于内容的路由 SMT 支持与 JSR 223 集成的脚本语言(Java™ 平台强制执行)。

Debezium 不附带 JSR 223 API 的任何实现。要将表达式语言与 Debezium 搭配使用,您必须下载语言的 JSR 223 脚本引擎实施。根据用于部署 Debezium 的方法,您可以从 Maven Central 自动下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。

7.3.1. 设置 Debezium content-based-routing SMT

为了安全起见,Debezium 连接器存档中不包含基于内容的路由 SMT。相反,它在单独的工件( debezium-scripting-2.7.3.Final.tar.gz )中提供。

如果您通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,以便使用过滤器 SMT,您必须将 SMT 工件明确添加到 Kafka Connect 环境中。当使用 Streams for Apache Kafka 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在路由 SMT 后,任何允许向实例添加连接器的用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请务必在添加路由 SMT 前保护 Kafka Connect 实例及其配置接口。

如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则适用以下步骤。如果您使用 Streams for Apache Kafka 创建 Kafka Connect 镜像,请按照连接器的部署主题中的说明操作。

流程

  1. 在浏览器中,打开 Software Downloads,并下载 Debezium 脚本 SMT 归档(debezium-scripting-2.7.3.Final.tar.gz)。
  2. 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
  3. 获取 JSR-223 脚本引擎实施,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
  4. 重启 Kafka Connect 进程以获取新的 JAR 文件。

Groovy 语言在 classpath 上需要以下库:

  • groovy
  • groovy-json (可选)
  • groovy-jsr223

JavaScript 语言需要 classpath 上的以下库:

  • graalvm.js
  • graalvm.js.scriptengine

7.3.2. 示例:Debezium 基本基于内容的路由配置

要将 Debezium 连接器配置为根据事件内容路由更改事件记录,您可以在连接器的 Kafka Connect 配置中配置 ContentBasedRouter SMT。

配置基于内容的路由 SMT 要求您指定一个定义过滤条件的正则表达式。在配置中,您要创建一个定义路由条件的正则表达式。表达式定义了用于评估事件记录的模式。它还指定与模式匹配的事件所路由的目标主题的名称。您指定的模式可能会指定事件类型,如表插入、更新或删除操作。您还可以定义一个与特定列或行中值匹配的模式。

例如,要将所有更新(u)记录重新路由到 更新 主题,您可以在连接器配置中添加以下配置:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

前面的示例指定了使用 Groovy 表达式语言。

与模式匹配的记录会路由到默认主题。

自定义配置

前面的示例演示了一个简单的 SMT 配置,它旨在仅处理 DML 事件,其中包含 op 字段。连接器可能会发出的其他类型的消息(heartbeat 消息、tombstone 消息或有关事务或模式更改的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT predicate 语句,该语句仅有选择地将转换 应用到特定的事件。

Debezium 将特定的变量绑定到 SMT 的评估上下文中。当您创建表达式来指定控制路由目的地的条件时,SMT 可以查找并解释这些变量的值来评估表达式中的条件。

下表列出了 Debezium 绑定到基于内容的路由 SMT 的评估上下文的变量:

Expand
表 7.2. 基于内容的路由表达式变量
名称描述类型

key

消息的密钥。

org.apache.kafka.connect​.data​.Struct

value

消息值。

org.apache.kafka.connect​.data​.Struct

keySchema

消息键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

字符串

标头

消息标头的 Java 映射。key 字段是标头名称。headers 变量公开以下属性:

  • (类型为 Object
  • schema (of type org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String,​ io.debezium​.transforms​.scripting​.RecordHeader>

表达式可以对其变量调用任意方法。表达式应解析为布尔值,用于决定 SMT 如何组成消息。当表达式中的路由条件评估为 true 时,消息会被保留。当路由条件评估为 false 时,消息会被删除。

表达式不应产生任何副作用。也就是说,它们不应该修改它们通过的任何变量。

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

您表达基于内容的路由条件的方式取决于您使用的脚本语言。例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会将所有更新(u)记录路由到 更新 主题,同时将其他记录路由到默认主题:

value.op == 'u' ? 'updates' : null

其他语言使用不同的方法来表达相同的条件。

提示

Debezium MongoDB 连接器会将 afterpatch 字段作为序列化 JSON 文档而不是结构发送。
要将 ContentBasedRouting SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段解压缩到单独的文档中。
您可以使用表达式中的 JSON 解析器为每个数组项目生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到 classpath 中,然后添加一个表达式,如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'.

JavaScript

当使用 JavaScript 作为表达式语言时,您可以调用 Struct""get () 方法来指定基于内容的路由条件,如下例所示:

value.get('op') == 'u' ? 'updates' : null

JavaScript with Graal.js

当使用 JavaScript 和 Graal.js 创建基于内容的路由条件时,您可以使用类似 Groovy 的方法。例如:

value.op == 'u' ? 'updates' : null

7.3.6. 配置基于内容的路由转换的选项

Expand

属性

默认

描述

topic.regex

 

可选正则表达式,用于评估事件的目标主题名称,以确定是否应用条件逻辑。如果目标主题的名称与 topic.regex 中的值匹配,则转换会在将事件传递给主题前应用条件逻辑。如果主题的名称与 topic.regex 中的值不匹配,则 SMT 会将事件传递给未修改的主题。

language

 

编写表达式的语言。必须以 jsr223. 开始,例如 jsr223.groovyjsr223.graal.js。Debezium 仅支持通过 JSR 223 API ("Java ™ 平台") 进行引导。

topic.expression

 

要为每个消息评估的表达式。必须评估一个 String 值,其中结果为非null 将消息重新路由到新主题,而 null 值会将消息路由到默认主题。

null.handling.mode

keep

指定转换如何处理 null (tombstone)消息。您可以指定以下选项之一:

keep
(默认)通过传递信息。
drop
完全删除消息。
评估
将条件逻辑应用到消息。
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部