第 9 章 使用后处理器修改事件信息


后处理器执行轻量级、每个消息变异,类似于由单一消息转换(SMT)执行的修改。但是,Debezium 在事件链早期调用 post 处理器,而不是转换,使后处理器在信息被移交到消息传递运行时前对其进行响应。因为后处理器可以在 Debezium 上下文内对消息做出反应,因此在修改事件有效负载而不是转换时效率更高。

若要进行转换以修改消息,它必须重新创建消息的不可变 ConnectRecord,或者更便于其 SourceRecord。相反,由于 Debezium 范围内的一个后处理器的行为,它可以在消息的事件有效负载中操作 mutable Struct 类型,在 SourceRecord 构建前修改有效负载。关闭与 Debezium 集成后处理器提供对 Debezium 内部的访问,如有关数据库连接、关系模式模型等的 Debezium 元数据。反过来,这种访问在执行依赖于此类内部信息的任务时会提高效率。例如,在处理器后 Reselect 列 可以自动重新查询数据库来重新选择记录和检索原始更改事件中排除的列。

Debezium 提供以下后处理器:

Reselect column
重新选择可能不会由更改事件提供的特定列,如 TOASTed 列或 Oracle LOB 列没有被当前更改修改。

9.1. 使用后重新选择列来添加源字段来更改事件记录

为了提高性能并减少存储开销,数据库可以将外部存储用于某些列。这种类型的存储用于存储大量数据的列,如 PostgreSQL TOAST ( Oversized-Attribute Storage Technique)、Oracle Large Object (LOB)或 Oracle Exadata Extended String 数据类型。要减少 I/O 开销并增加查询速度,当表行中数据发生变化时,数据库仅检索包含新值的列,忽略外部存储列中的数据,而在外部存储列中保持不变。因此,外部存储列的值不会记录在数据库日志中,Debezium 随后会在发送事件记录时省略列。下游用户接收忽略所需值的事件记录可能会遇到处理错误。

当 Debezium 为事件发出记录时,外部存储列的值不会出现在数据库日志条目中,当 Debezium 为事件发出记录时,它会将缺少的值替换为 unavailable.value.placeholder sentinel 值。这些发送的值会插入到正确输入的字段中,例如,一个字节数组(以字节为单位)、字符串(字符串)或映射的键值映射。

要检索初始查询中没有的列的数据,您可以应用 Debezium 重新选择处理器后列(ReselectColumnsPostProcessor)。您可以将后处理器配置为从表中重新选择一个或多个列。配置后,它会监控连接器为 reselection 指定的列名称发出的事件。当它检测到带有指定列的事件时,处理器会重新排队源表以检索指定列的数据,并获取其当前状态。

您可以将后处理器配置为重新选择以下列类型:

  • null 列.
  • 包含 unavailable.value.placeholder sendinel 值的列。
注意

您只能在 Debezium 源连接器中使用 ReselectColumnsPostProcessor 后处理器。
后处理器不适用于 Debezium JDBC 接收器连接器。

有关使用 ReselectColumnsPostProcessor 后处理器的详情,请查看以下主题:

9.1.1. 使用带有无键表的 Debezium ReselectColumnsPostProcessor

处理器后再选一列会生成一个 reselect 查询,用于返回要修改的行。要构建查询的 WHERE 子句,默认情况下,后处理器使用关系表模型,该模型基于表的主键列或为表定义的唯一索引。

对于无键表,ReselectColumnsPostProcessor 提交的 SELECT 查询可能会返回多行,在这种情况下,Debezium 始终使用第一行。您无法对返回的行的顺序进行优先排序。要让后处理器为无键表返回持续可用的结果,最好指定可识别唯一行的自定义键。自定义键必须能够根据列的组合在源表中唯一识别记录。

要定义这样的自定义消息键,请在连接器配置中使用 message.key.columns 属性。在定义了自定义密钥后,将 reselect.use.event.key 配置属性设为 true。设置此选项可让后处理器使用指定的事件密钥字段作为选择条件,以代替主键列。确保测试配置以确保 reselection 查询提供预期的结果。

9.1.2. 示例:Debezium ReselectColumnsPostProcessor 配置

配置后处理器与配置 自定义转换器单一消息转换(SMT) 类似。要启用连接器使用 ReselectColumnsPostProcessor,请在连接器配置中添加以下条目:

  "post.processors": "reselector", 1
  "reselector.type": "io.debezium.processors.reselect.ReselectColumnsPostProcessor", 2
  "reselector.reselect.columns.include.list": "<schema>.<table>:<column>,<schema>.<table>:<column>", 3
  "reselector.reselect.unavailable.values": "true", 4
  "reselector.reselect.null.values": "true" 5
  "reselector.reselect.use.event.key": "false" 6
描述

1

以逗号分隔的后处理器前缀列表。

2

post 处理器的完全限定域名。

3

以逗号分隔的列名称列表,使用以下格式指定: < schema> . <table>: < column>

4

启用或禁用包含 unavailable.value.placeholder sentinel 值的列的重新选择。

5

启用或禁用是 null 的列的重新选择。

6

启用或禁用基于 reselection 的事件键字段名称。

9.1.3. Debezium 在处理器配置属性后重新选择列的描述

下表列出了您可以为 Reselect Columns post-processor 设置的配置选项。

表 9.1. Reselect column post processor configuration options

属性

默认

描述

reselect.columns.include.list

没有默认值

以逗号分隔的列名称列表,以便从源数据库重新选择。使用以下格式指定列名称:

<schema>. &lt;table>: &lt;column>

如果设置了 reselect.columns.exclude.list 属性,则不要设置此属性。

reselect.columns.exclude.list

没有默认值

源数据库中以逗号分隔的列名称列表,以便从 reselection 中排除。使用以下格式指定列名称:

<schema>. &lt;table>: &lt;column>

如果设置了 reselect.columns.include.list 属性,则不要设置此属性。

reselect.unavailable.values

true

如果列值是由连接器的 unavailable.value.placeholder 属性提供的,则指定 post 处理器是否取消选择与 reselect.columns.include.list 过滤器匹配的列。

reselect.null.values

true

指定后处理器是否选择与 reselect.columns.include.list 过滤器匹配的列(如果列值为 null )。

reselect.use.event.key

false

根据事件的 key 字段名称,指定后处理器是否选择,或使用关系表的主键列名称。

默认情况下,reselection 查询基于关系表的主键列或唯一键索引。对于没有主键的表,将此属性设置为 true,并在连接器配置中配置 message.key.columns 属性,以指定连接器在创建事件时使用的自定义密钥。然后,后处理器使用指定的 key 字段名称作为 SQL 重新选择查询中的主键。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.