7.5. 过滤 Debezium 更改事件记录


默认情况下,Debezium 提供它接收到 Kafka 代理的每个数据更改事件。但是,在很多情况下,您可能只对生成者发出的事件的子集感兴趣。要只处理与您相关的记录,Debezium 提供了 过滤器 单个消息转换 (SMT)。

虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义代码的 SMT 有它的缺陷。例如:

  • 需要提前编译转换并将其部署到 Kafka Connect。
  • 每个更改都需要重新编译和重新部署代码,从而导致无法灵活的操作。

过滤器 SMT 支持与 JSR 223 集成的脚本语言(Java™ 平台强制执行)。

Debezium 不附带 JSR 223 API 的任何实现。要将表达式语言与 Debezium 搭配使用,您必须下载语言的 JSR 223 脚本引擎实施。根据用于部署 Debezium 的方法,您可以从 Maven Central 自动下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。

7.5.1. 设置 Debezium 过滤器 SMT

为了安全起见,Debezium 连接器存档中不包含过滤器 SMT。相反,它在单独的工件( debezium-scripting-2.7.3.Final.tar.gz )中提供。

如果您通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,以便使用过滤器 SMT,您必须明确下载 SMT 归档,并与连接器插件一起部署文件。当使用 Streams for Apache Kafka 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在过滤器 SMT 后,任何允许向实例添加连接器的用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请务必在添加过滤器 SMT 前保护 Kafka Connect 实例及其配置接口。

如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则适用以下步骤。如果您使用 Streams for Apache Kafka 创建 Kafka Connect 镜像,请按照连接器的部署主题中的说明操作。

流程

  1. 在浏览器中,打开 Software Downloads,并下载 Debezium 脚本 SMT 归档(debezium-scripting-2.7.3.Final.tar.gz)。
  2. 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
  3. 获取 JSR-223 脚本引擎实施,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
  4. 重启 Kafka Connect 进程以获取新的 JAR 文件。

Groovy 语言在 classpath 上需要以下库:

  • groovy
  • groovy-json (可选)
  • groovy-jsr223

JavaScript 语言需要 classpath 上的以下库:

  • graalvm.js
  • graalvm.js.scriptengine

7.5.2. 示例:Debezium 基本过滤器 SMT 配置

您可以在 Debezium 连接器的 Kafka Connect 配置中配置过滤器转换。在配置中,您可以通过定义基于业务规则的过滤条件来指定您感兴趣的事件。当过滤器 SMT 处理事件流时,它会根据配置的过滤器条件评估每个事件。只有满足过滤器条件条件的事件才会传递给代理。

要将 Debezium 连接器配置为过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 Filter SMT。过滤器 SMT 配置要求您指定一个定义过滤条件的正则表达式。

例如,您可以在连接器配置中添加以下配置。

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...
Copy to Clipboard Toggle word wrap

前面的示例指定了使用 Groovy 表达式语言。正则表达式 value.op == 'u' && value.before.id == 2 会删除所有消息,除了代表与 2 相等的 id 值的 update (u)记录。

自定义配置

前面的示例演示了一个简单的 SMT 配置,它旨在仅处理 DML 事件,其中包含 op 字段。连接器可能会发出的其他类型的消息(heartbeat 消息、tombstone 消息或架构更改和事务的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT predicate 语句,该语句仅有选择地将转换 应用到特定的事件。

7.5.3. 在过滤表达式中使用的变量

Debezium 将特定的变量绑定到过滤器 SMT 的评估上下文中。当您创建表达式来指定过滤器条件时,您可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debebe 可让 SMT 查找并解释它们的值,因为它在表达式中评估条件。

下表列出了 Debezium 绑定到过滤器 SMT 的评估上下文的变量:

Expand
表 7.4. 过滤表达式变量
名称描述类型

key

消息的密钥。

org.apache.kafka.connect​.data​.Struct

value

消息值。

org.apache.kafka.connect.data​.Struct

keySchema

消息键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

字符串

header

消息标头的 Java 映射。key 字段是标头名称。标头 变量公开以下属性:

  • (类型为 Object
  • schema (of type org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

表达式可以对其变量调用任意方法。表达式应解析为布尔值,用于决定 SMT 如何组成消息。当表达式中的过滤器条件评估为 true 时,消息会被保留。当过滤器条件评估为 false 时,消息会被删除。

表达式不应产生任何副作用。也就是说,它们不应该修改它们通过的任何变量。

7.5.4. 用于有选择地应用过滤器转换的选项

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

7.5.5. 过滤其他脚本语言的条件配置

表达过滤条件的方式取决于您使用的脚本语言。

例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会删除所有消息,除了 id 值设置为 2 的更新记录外:

value.op == 'u' && value.before.id == 2
Copy to Clipboard Toggle word wrap

其他语言使用不同的方法来表达相同的条件。

提示

Debezium MongoDB 连接器会将 afterpatch 字段作为序列化 JSON 文档而不是结构发送。
要将过滤器 SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段解压缩到单独的文档中。
您可以使用表达式中的 JSON 解析器为每个数组项目生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到 classpath 中,然后添加一个表达式,如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'.

JavaScript

如果使用 JavaScript 作为表达式语言,您可以调用 Struct"get () 方法来指定过滤条件,如下例所示:

value.get('op') == 'u' && value.get('before').get('id') == 2
Copy to Clipboard Toggle word wrap

JavaScript with Graal.js

如果您使用 JavaScript 和 Graal.js 定义过滤条件,您可以使用与您与 Groovy 搭配使用的方法类似。例如:

value.op == 'u' && value.before.id == 2
Copy to Clipboard Toggle word wrap

7.5.6. 配置过滤器转换的选项

下表列出了您可以在过滤器 SMT 中使用的配置选项。

Expand
表 7.5. 过滤 SMT 配置选项

属性

默认

描述

topic.regex

 

可选正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 topic.regex 中的值匹配,则转换会在将事件传递给主题前应用过滤器逻辑。如果主题的名称与 topic.regex 中的值不匹配,则 SMT 会将事件传递给未修改的主题。

language

 

编写表达式的语言。必须以 jsr223. 开始,例如 jsr223.groovyjsr223.graal.js。Debezium 仅支持通过 JSR 223 API ("Java ™ 平台") 进行引导。

condition

 

要为每个消息评估的表达式。必须评估为 true 结果保留消息的布尔值,结果为 false 会删除它。

null.handling.mode

keep

指定转换如何处理 null (tombstone)消息。您可以指定以下选项之一:

keep
(默认)通过传递信息。
drop
完全删除消息。
评估
将过滤器条件应用到消息。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat