12.3. 根据事件内容将更改事件记录路由到主题
默认情况下,Debebe 将从表读取的所有更改事件流传输到单个静态主题。然而,在有些情况下,您可能希望根据事件内容将所选事件重新路由到其他主题。基于内容的路由消息的过程在 基于内容的路由 消息模式中进行了描述。要在 Debezium 中应用此模式,您可以使用基于内容的路由 单个消息转换 (SMT)来编写为每个事件评估的表达式。根据评估事件的方式,SMT 会将事件消息路由到原始目标主题,或者将其重新路由到您在表达式中指定的主题。
Debezium 基于内容的路由 SMT 是一个技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
虽然可以使用 Java 创建自定义 SMT 来编码路由逻辑,但使用自定义编码的 SMT 具有其缺陷。例如:
- 需要预先编译转换并将其部署到 Kafka Connect。
- 每个更改都需要代码重新编译和重新部署,从而造成不灵活的操作。
基于内容的路由 SMT 支持与 JSR 223 集成的脚本语言(用于 Java™ 平台)。
Debezium 不附带 JSR 223 API 的任何实现。要将表达式语言与 Debezium 搭配使用,您必须为语言下载 JSR 223 脚本引擎实施。根据您用于部署 Debezium 的方法,您可以自动从 Maven Central 下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实施所使用的任何其他 JAR 文件中。
12.3.1. 设置 Debezium 基于 content-routing SMT 复制链接链接已复制到粘贴板!
为安全起见,Debezium 连接器存档不包含基于内容的路由 SMT。相反,它会在单独的工件中提供 debezium-scripting-2.1.4.Final.tar.gz。
如果通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,要使用过滤器 SMT,您必须将 SMT 工件明确添加到 Kafka Connect 环境中。当使用 AMQ Streams 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在路由 SMT 后,允许向实例添加连接器的任何用户都可以运行脚本表达式。要确保脚本表达式只能由授权用户运行,请务必在添加路由 SMT 前保护 Kafka Connect 实例及其配置接口。
如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则适用以下步骤。如果使用 AMQ Streams 创建 Kafka Connect 镜像,请按照连接器部署主题中的说明操作。
流程
-
从浏览器中,打开 Debezium 下载站点的红帽构建,并下载 Debezium 脚本 SMT 存档(
debezium-scripting-2.1.4.Final.tar.gz)。 - 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
- 获取 JSR-223 脚本引擎实现,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
- 重启 Kafka Connect 进程以获取新的 JAR 文件。
Groovy 语言在 classpath 上需要以下库:
-
groovy -
groovy-json(可选) -
groovy-jsr223
JavaScript 语言在 classpath 上需要以下库:
-
graalvm.js -
graalvm.js.scriptengine
12.3.2. 示例: Debezium 基本基于内容的路由配置 复制链接链接已复制到粘贴板!
要将 Debezium 连接器配置为根据事件内容路由更改事件记录,您可以在连接器的 Kafka Connect 配置中配置 ContentBasedRouter SMT。
配置基于内容的路由 SMT 要求您指定一个定义过滤条件的正则表达式。在配置中,您可以创建一个定义路由条件的正则表达式。表达式定义了用于评估事件记录的模式。它还指定目的地主题的名称,其中路由与模式匹配的事件。您指定的模式可能会指定一个事件类型,如表插入、更新或删除操作。您还可以定义一个与特定列或行中值匹配的模式。
例如,要将所有更新(u)记录重新路由到 updates 主题,您可以在连接器配置中添加以下配置:
前面的例子指定了使用 Groovy 表达式语言。
与模式不匹配的记录将路由到默认主题。
自定义配置
前面的示例演示了一个简单的 SMT 配置,它旨在只处理 DML 事件,其中包含 op 字段。连接器可能会发出的其他类型的消息(心跳消息、tombstone 消息或有关事务或模式更改的元数据消息)不包含此字段。为了避免处理失败,您可以定义一个 SMT predicate 语句,该语句只会将转换应用到 特定事件。
12.3.3. 在 Debezium 内容的路由表达式中使用的变量 复制链接链接已复制到粘贴板!
Debezium 将某些变量绑定到 SMT 的评估上下文中。当您创建表达式来指定控制路由目的地的条件时,SMT 可以查找并解释这些变量的值,以评估表达式中的条件。
下表列出了 Debezium 绑定到基于内容路由 SMT 的评估上下文的变量:
| 名称 | 描述 | 类型 |
|---|---|---|
|
| 消息的一个键。 |
|
|
| 消息的值。 |
|
|
| message 键的 schema。 |
|
|
| 消息值的 schema。 |
|
|
| 目标主题的名称。 | 字符串 |
|
|
消息标头的 Java 映射。key 字段是标头名称。
|
|
表达式可以在其变量上调用任意方法。表达式应解析为布尔值,用来决定 SMT 如何处理消息。当表达式中的路由条件评估为 true 时,会保留消息。当路由条件评估为 false 时,会删除消息。
表达式不应产生任何副作用。也就是说,不应修改他们通过的任何变量。
12.3.4. 用于有选择地应用基于内容的路由转换的选项 复制链接链接已复制到粘贴板!
除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT predicate。
- 对 SMT 使用 topic.regex 配置选项。
12.3.5. 为其他脚本语言配置基于内容的路由条件 复制链接链接已复制到粘贴板!
您表达基于内容的路由条件的方式取决于您使用的脚本语言。例如,如 基本配置示例所示,当您使用 Groovy 作为表达式语言时,以下表达式会将所有更新(u)记录重新路由到 更新 主题,而将其他记录路由到默认主题:
value.op == 'u' ? 'updates' : null
value.op == 'u' ? 'updates' : null
其他语言使用不同的方法表达相同的条件。
Debezium MongoDB 连接器将 after 和 patch 字段作为序列化 JSON 文档而不是作为结构发出。
要将 ContentBasedRouting SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段取消回单独的文档。
您可以使用表达式中的 JSON 解析器为每个数组项目生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到 classpath 中,然后添加一个表达式,如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'。
JavaScript
当使用 JavaScript 作为表达式语言时,您可以调用 StructVirtualMachineget () 方法来指定基于内容的路由条件,如下例所示:
value.get('op') == 'u' ? 'updates' : null
value.get('op') == 'u' ? 'updates' : null
带有 Graal.js 的 JavaScript
当使用带有 Graal.js 的 JavaScript 来创建基于内容的路由条件时,您可以使用与 Groovy 一起使用的方法类似。例如:
value.op == 'u' ? 'updates' : null
value.op == 'u' ? 'updates' : null
12.3.6. 配置基于内容的路由转换的选项 复制链接链接已复制到粘贴板!
| 属性 | 默认 | 描述 |
|
可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用条件逻辑。如果目标主题的名称与 | ||
|
写入表达式的语言。必须以 | ||
|
要为每个消息评估的表达式。必须评估一个 | ||
|
|
指定转换如何处理
|