第 14 章 使用 post 处理器修改事件消息


后处理器执行轻量级、按消息修改,类似于由单一消息转换(SMT)执行的修改。但是,Debebe 会在事件链中提前调用 post 处理器,启用 post 处理器在消息移移到消息传递运行时前对消息进行操作。由于后处理器可以在 Debezium 上下文内处理消息,因此在修改事件有效负载而非转换时,它们更高效。

要使转换来修改消息,它必须重新创建消息的不可变 ConnectRecord,或者更安全地使用其 SourceRecord。相反,因为处理器在 Debezium 范围内操作,它可以对消息的事件有效负载中的 mutable Struct 类型进行操作,在 SourceRecord 构造前修改有效负载。与 Debezium 紧密集成为处理器提供对 Debezium 内部的访问,如 Debezium 元数据有关数据库连接、关系架构模型等。反过来,此访问提高了执行依赖此类内部信息的任务时的效率。例如,在处理器后,Reselect 列 可以自动重新查询数据库,以重新选择从原始更改事件中排除的记录和检索列。

Debezium 提供以下后处理器:

  • Reselect 列:: Re-selects specific 列可能未由更改事件提供,如 TOASTed 列或未由当前更改修改的 Oracle LOB 列。

14.1. 使用 reselect 列后处理器添加源字段以更改事件记录

为提高性能并减少存储开销,数据库可以为某些列使用外部存储。这种类型的存储用于存储大量数据的列,如 PostgreSQL TOAST ( Oversized-Attribute Storage Technique)、Oracle Large Object (LOB)或 Oracle Exadata Extended String 数据类型。为减少 I/O 开销并提高查询速度,当表行中数据更改时,数据库只检索包含新值的列,忽略外部存储列中的数据保持不变。因此,externally 存储的列的值不会记录在数据库日志中,Debezium 会在发出事件记录时省略列。下游用户接收省略所需值的事件记录可能会遇到处理错误。

当 Debezium 为事件发出记录时,事件的数据库日志条目中没有 IF 值,它会将缺少的值替换为 unavailable.value.placeholder sentinel 值。这些 sentinel 值插入到适当键入的字段中,例如,一个字节数组(以字节为单位)、字符串(字符串)或映射的键值映射。

要检索初始查询中没有的列的数据,您可以应用 Debezium reselect column post 处理器(ReselectColumnsPostProcessor)。您可以将后处理器配置为从表中重新选择一个或多个列。配置后,它会监控连接器为 reselection 指定的列名称发出的事件。当它检测到带有指定列的事件时,后处理器会重新排序源表以检索指定列的数据,并获取其当前状态。

您可以将 post 处理器配置为重新选择以下列类型:

  • null 列.
  • 包含 unavailable.value.placeholder sentinel 值的列。
注意

您只能在 Debezium SQL 数据库连接器中使用 ReselectColumnsPostProcessor post 处理器。

有关使用 ReselectColumnsPostProcessor post 处理器的详情,请查看以下主题:

14.1.1. 使用带有无键表的 Debezium ReselectColumnsPostProcessor

在处理器后,reselect 列会生成一个 reselect 查询,该查询返回要修改的行。为查询构建 WHERE 子句,默认情况下,后处理器使用关系表模型,该模型基于表的主键列或为表定义的唯一索引。

对于无键表,ReselectColumnsPostProcessor 提交可能会返回多行,在这种情况下,De Debezium 总是只使用第一行。您不能对返回的行的顺序进行优先排序。要启用 post 处理器为无键表返回一致的可用结果,最好指定一个可以识别唯一行的自定义键。自定义密钥必须能够根据列的组合在源表中唯一标识记录。

要定义此类自定义消息键,请在连接器配置中使用 message.key.columns 属性。在定义了自定义密钥后,将 reselect.use.event.key 配置属性设为 true。设置此选项可让后处理器使用指定的事件键字段作为选择条件,而不是主键列。确保测试配置以确保 reselection 查询提供了预期的结果。

14.1.2. 示例: Debezium ReselectColumnsPostProcessor 配置

配置后处理器类似于配置 自定义转换器或 单个消息 转换(SMT)。要启用连接器使用 ReselectColumnsPostProcessor,请在连接器配置中添加以下条目:

  "post.processors": "reselector", 1
  "reselector.type": "io.debezium.processors.reselect.ReselectColumnsPostProcessor", 2
  "reselector.reselect.columns.include.list": "<schema>.<table>:<column>,<schema>.<table>:<column>", 3
  "reselector.reselect.unavailable.values": "true", 4
  "reselector.reselect.null.values": "true" 5
  "reselector.reselect.use.event.key": "false" 6
描述

1

以逗号分隔的后处理器前缀列表。

2

后处理器的完全限定类类型名称。

3

以逗号分隔的列名称列表,使用以下格式指定:< schema> . <table>: < column>

4

启用或禁用包含 unavailable.value.placeholder sentinel 值的 reselection。

5

启用或禁用 Reselection of column,这些列为 null

6

启用或禁用基于 reselection 的事件键字段名称。

14.1.3. Debezium 重新选择处理器配置属性的描述

下表列出了您可以为 Reselect Columns post-processor 设置的配置选项。

表 14.1. 重新选择列后处理器配置选项

属性

默认

描述

reselect.columns.include.list

没有默认值

从源数据库重新选择的列名称的逗号分隔列表。如果设置 reselect.columns.exclude .list 属性,使用以下格式指定列名称:+<schema > . <table > :_<column>_

不会设置此属性。

reselect.columns.exclude.list

没有默认值

源数据库中以逗号分隔的列名称列表,以便从 reselection 中排除。如果设置 reselect.columns.include .list属性,使用以下格式指定列名称:+<schema > . <table > :_<column>_

不会设置此属性。

reselect.unavailable.values

true

指定 post 处理器是否重新选择与 reselect.columns.include.list 过滤器匹配的列,如果连接器的 unavailable.value.placeholder 属性提供了列值。

reselect.null.values

true

指定 post 处理器是否重新选择与 reselect.columns.include.list 过滤器匹配的列(如果列值为 null )。

reselect.use.event.key

false

指定 post 处理器是否根据事件的密钥字段名称重新选择,或使用关系表的主键列名称。

默认情况下,Reselection 查询基于关系表的主键列或唯一键索引。对于没有主键的表,将此属性设置为 true,并在连接器配置中配置 message.key.columns 属性,为连接器指定创建事件时要使用的自定义密钥。然后,后处理器使用指定的 key 字段名称作为 SQL reselection 查询中的主密钥。

更新于 2024-07-19

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.