第 14 章 使用 post 处理器修改事件消息
后处理器执行轻量级、按消息修改,类似于由单一消息转换(SMT)执行的修改。但是,Debebe 会在事件链中提前调用 post 处理器,启用 post 处理器在消息移移到消息传递运行时前对消息进行操作。由于后处理器可以在 Debezium 上下文内处理消息,因此在修改事件有效负载而非转换时,它们更高效。
要使转换来修改消息,它必须重新创建消息的不可变 ConnectRecord
,或者更安全地使用其 SourceRecord
。相反,因为处理器在 Debezium 范围内操作,它可以对消息的事件有效负载中的 mutable Struct
类型进行操作,在 SourceRecord
构造前修改有效负载。与 Debezium 紧密集成为处理器提供对 Debezium 内部的访问,如 Debezium 元数据有关数据库连接、关系架构模型等。反过来,此访问提高了执行依赖此类内部信息的任务时的效率。例如,在处理器后,Reselect 列 可以自动重新查询数据库,以重新选择从原始更改事件中排除的记录和检索列。
Debezium 提供以下后处理器:
- Reselect 列:: Re-selects specific 列可能未由更改事件提供,如 TOASTed 列或未由当前更改修改的 Oracle LOB 列。
14.1. 使用 reselect 列后处理器添加源字段以更改事件记录
为提高性能并减少存储开销,数据库可以为某些列使用外部存储。这种类型的存储用于存储大量数据的列,如 PostgreSQL TOAST ( Oversized-Attribute Storage Technique)、Oracle Large Object (LOB)或 Oracle Exadata Extended String 数据类型。为减少 I/O 开销并提高查询速度,当表行中数据更改时,数据库只检索包含新值的列,忽略外部存储列中的数据保持不变。因此,externally 存储的列的值不会记录在数据库日志中,Debezium 会在发出事件记录时省略列。下游用户接收省略所需值的事件记录可能会遇到处理错误。
当 Debezium 为事件发出记录时,事件的数据库日志条目中没有 IF 值,它会将缺少的值替换为 unavailable.value.placeholder
sentinel 值。这些 sentinel 值插入到适当键入的字段中,例如,一个字节数组(以字节为单位)、字符串(字符串)或映射的键值映射。
要检索初始查询中没有的列的数据,您可以应用 Debezium reselect column post 处理器(ReselectColumnsPostProcessor
)。您可以将后处理器配置为从表中重新选择一个或多个列。配置后,它会监控连接器为 reselection 指定的列名称发出的事件。当它检测到带有指定列的事件时,后处理器会重新排序源表以检索指定列的数据,并获取其当前状态。
您可以将 post 处理器配置为重新选择以下列类型:
-
null
列. -
包含
unavailable.value.placeholder
sentinel 值的列。
您只能在 Debezium SQL 数据库连接器中使用 ReselectColumnsPostProcessor
post 处理器。
有关使用 ReselectColumnsPostProcessor
post 处理器的详情,请查看以下主题:
14.1.1. 使用带有无键表的 Debezium ReselectColumnsPostProcessor
在处理器后,reselect 列会生成一个 reselect 查询,该查询返回要修改的行。为查询构建 WHERE
子句,默认情况下,后处理器使用关系表模型,该模型基于表的主键列或为表定义的唯一索引。
对于无键表,ReselectColumnsPostProcessor
提交可能会返回多行,在这种情况下,De Debezium 总是只使用第一行。您不能对返回的行的顺序进行优先排序。要启用 post 处理器为无键表返回一致的可用结果,最好指定一个可以识别唯一行的自定义键。自定义密钥必须能够根据列的组合在源表中唯一标识记录。
要定义此类自定义消息键,请在连接器配置中使用 message.key.columns
属性。在定义了自定义密钥后,将 reselect.use.event.key
配置属性设为 true
。设置此选项可让后处理器使用指定的事件键字段作为选择条件,而不是主键列。确保测试配置以确保 reselection 查询提供了预期的结果。
14.1.2. 示例: Debezium ReselectColumnsPostProcessor
配置
配置后处理器类似于配置 自定义转换器或 单个消息 转换(SMT)。要启用连接器使用 ReselectColumnsPostProcessor
,请在连接器配置中添加以下条目:
"post.processors": "reselector", 1 "reselector.type": "io.debezium.processors.reselect.ReselectColumnsPostProcessor", 2 "reselector.reselect.columns.include.list": "<schema>.<table>:<column>,<schema>.<table>:<column>", 3 "reselector.reselect.unavailable.values": "true", 4 "reselector.reselect.null.values": "true" 5 "reselector.reselect.use.event.key": "false" 6
项 | 描述 |
---|---|
1 | 以逗号分隔的后处理器前缀列表。 |
2 | 后处理器的完全限定类类型名称。 |
3 |
以逗号分隔的列名称列表,使用以下格式指定:< |
4 |
启用或禁用包含 |
5 |
启用或禁用 Reselection of column,这些列为 |
6 | 启用或禁用基于 reselection 的事件键字段名称。 |
14.1.3. Debezium 重新选择处理器配置属性的描述
下表列出了您可以为 Reselect Columns post-processor 设置的配置选项。
属性 | 默认 | 描述 |
没有默认值 |
从源数据库重新选择的列名称的逗号分隔列表。如果设置 | |
没有默认值 |
源数据库中以逗号分隔的列名称列表,以便从 reselection 中排除。如果设置 | |
|
指定 post 处理器是否重新选择与 | |
|
指定 post 处理器是否重新选择与 | |
|
指定 post 处理器是否根据事件的密钥字段名称重新选择,或使用关系表的主键列名称。 |
更新于 2024-07-19