13.5. 过滤 Debezium 更改事件记录
默认情况下,Debezium 会发送它收到的每个数据更改事件。但是,在很多情况下,您可能只对制作者发出的事件的子集感兴趣。要只处理与您相关的记录,Debezium 提供了 过滤 单一消息转换 (SMT)。
虽然可以使用 Java 创建自定义 SMT 来对逻辑进行编码,但使用自定义代码的 SMT 具有它的缺陷。例如:
- 需要预先编译转换并将其部署到 Kafka Connect。
- 每次更改都需要代码重新编译和重新部署,从而导致不灵活的操作。
过滤器 SMT 支持脚本语言与 JSR 223 集成(用于 Java™ 平台)。
Debezium 不附带 JSR 223 API 的任何实施。要将表达式语言与 Debezium 搭配使用,您必须下载用于语言的 JSR 223 脚本引擎实施。根据您用于部署 Debezium 的方法,您可以从 Maven Central 自动下载所需的工件,也可以手动下载工件,然后将它们添加到 Debezium 连接器插件目录中,以及语言实施使用的任何其他 JAR 文件。
13.5.1. 设置 Debezium 过滤器 SMT
为安全起见,Debezium 连接器存档中没有包括过滤 SMT。相反,它会在单独的工件 debezium-scripting-2.3.4.Final.tar.gz
中提供。
如果您通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium 连接器,以使用过滤器 SMT,您必须明确下载 SMT 归档,并将文件与连接器插件一起部署。当使用 AMQ Streams 部署连接器时,它可以根据您在 Kafka Connect 自定义资源中指定的配置参数自动下载所需的工件。重要信息:在 Kafka Connect 实例中存在过滤器 SMT 后,允许向实例添加连接器的任何用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请务必在添加过滤器 SMT 前保护 Kafka Connect 实例及其配置接口。
如果您从 Dockerfile 构建 Kafka Connect 容器镜像,则应用以下步骤。如果使用 AMQ Streams 创建 Kafka Connect 镜像,请按照您的连接器部署主题中的说明进行操作。
流程
-
在浏览器中,打开 Red Hat Integration 下载站点,并下载 Debezium 脚本 SMT 归档(
debezium-scripting-2.3.4.Final.tar.gz
)。 - 将存档的内容提取到 Kafka Connect 环境的 Debezium 插件目录中。
- 获取 JSR-223 脚本引擎实施,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。
- 重启 Kafka Connect 进程以获取新的 JAR 文件。
Groovy 语言在 classpath 中需要以下库:
-
groovy
-
groovy-json
(可选) -
groovy-jsr223
JavaScript 语言在 classpath 中需要以下库:
-
graalvm.js
-
graalvm.js.scriptengine
13.5.2. 示例: Debezium 基本过滤器 SMT 配置
您可以在 Debezium 连接器的 Kafka Connect 配置中配置过滤器转换。在配置中,您可以通过定义基于业务规则的过滤器条件来指定您感兴趣的事件。当过滤 SMT 处理事件流时,它会根据配置的过滤器条件评估每个事件。只有满足过滤器条件条件的事件才会传递给代理。
要配置 Debezium 连接器来过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 Filter
SMT。配置过滤器 SMT 要求您指定一个定义过滤条件的正则表达式。
例如,您可以在连接器配置中添加以下配置。
... transforms=filter transforms.filter.type=io.debezium.transforms.Filter transforms.filter.language=jsr223.groovy transforms.filter.condition=value.op == 'u' && value.before.id == 2 ...
前面的示例指定了 Groovy
表达式语言的使用。正则表达式 value.op == 'u' && value.before.id == 2
会删除所有消息,但代表更新(u
)的 id
值除外,其 id 值等于 2
。
自定义配置
前面的例子显示一个简单的 SMT 配置,它仅用于处理 DML 事件,其中包含一个 op
字段。连接器可能会发出的其他类型的信息(heartbeat 消息、tombstone 消息或有关模式更改和事务的元数据信息)不包含此字段。为了避免处理失败,您可以定义一个 SMT predicate 语句,该语句仅有选择地将转换 应用到特定的事件。
13.5.3. 过滤器表达式中使用的变量
Debezium 将某些变量绑定到过滤器 SMT 的评估上下文中。在创建表达式来指定过滤器条件时,您可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debebe 可让 SMT 查找和解释它们的值,因为它评估表达式中的条件。
下表列出了 Debezium 绑定到过滤器 SMT 的评估上下文的变量:
名称 | 描述 | 类型 |
---|---|---|
| 消息的一个关键信息。 |
|
| 消息值。 |
|
| message 键的 schema。 |
|
| message 值的 schema。 |
|
| 目标主题的名称。 | 字符串 |
|
消息标头的 Java 映射。key 字段是标头名称。
|
|
表达式可以在其变量上调用任意方法。表达式应该解析为布尔值,它决定了 SMT 分布消息的方式。当表达式中的过滤器条件评估为 true
时,消息会被保留。当过滤器条件评估为 false
时,消息将被删除。
表达式不应产生任何副作用。也就是说,它们不应修改它们通过的任何变量。
13.5.4. 有选择地应用过滤器转换的选项
除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括心跳消息,以及有关 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下任一方法配置连接器来有选择地应用 SMT:
- 为转换配置 SMT predicate。
- 对 SMT 使用 topic.regex 配置选项。
13.5.5. 过滤其他脚本语言的条件配置
表达过滤条件的方式取决于您使用的脚本语言。
例如,如 基本配置示例 所示,当您使用 Groovy
作为表达式语言时,以下表达式会删除所有消息,但更新记录除外,其 id
值设置为
2:
value.op == 'u' && value.before.id == 2
其他语言使用不同的方法来表达相同的条件。
Debezium MongoDB 连接器会将 after
和 patch
字段作为序列化 JSON 文档而不是结构发送。
要将过滤器 SMT 与 MongoDB 连接器搭配使用,您必须首先将 JSON 中的数组字段解压缩到单独的文档中。
您可以在表达式中使用 JSON 解析器为每个数组项生成单独的输出文档。例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json
构件添加到类路径,然后添加一个如 (new groovy.json.JsonSlurper ()).parseText (value.after).last_name == 'Kretchmar'
。
Javascript
如果使用 JavaScript 作为表达式语言,您可以调用 Struct#get ()
方法来指定过滤条件,如下例所示:
value.get('op') == 'u' && value.get('before').get('id') == 2
JavaScript with Graal.js
如果您使用带有 Graal.js 的 JavaScript 定义过滤条件,您可以使用一个与您与 Groovy 一起使用的方法类似。例如:
value.op == 'u' && value.before.id == 2
13.5.6. 配置过滤器转换的选项
下表列出了可与过滤器 SMT 一起使用的配置选项。
属性 | 默认 | 描述 |
为事件评估目标主题名称的可选正则表达式,以确定是否应用过滤逻辑。如果目标主题的名称与 | ||
编写表达式的语言。必须以 | ||
针对每个消息评估的表达式。必须评估一个布尔值,其中 | ||
|
指定转换如何处理
|