第 6 章 Oracle 的 Debezium Connector
Debezium 的 Oracle 连接器捕获并记录在 Oracle 服务器上的数据库中进行的行级更改,包括连接器运行时添加的表。您可以将连接器配置为为特定 schema 和表的特定子集发出更改事件,或者在特定列中忽略、屏蔽或截断值。
有关与此连接器兼容的 Oracle 数据库版本的详情,请参考 Debezium 支持的配置页面。
Debezium 使用原生 LogMiner 数据库软件包从 Oracle 更改事件。
使用 Debezium Oracle 连接器的信息和流程进行组织,如下所示:
6.1. Debezium Oracle 连接器如何工作
为了最佳配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称、使用元数据以及实施事件缓冲会很有帮助。
如需更多信息,请参阅以下主题:
6.1.1. Debezium Oracle 连接器如何执行数据库快照
通常,Oracle 服务器上的红色日志被配置为不保留数据库的完整历史记录。因此,Debebe Oracle 连接器无法从日志检索数据库的整个历史记录。要启用连接器为数据库当前状态建立基准,连接器首次启动时,它会执行数据库的初始 一致性快照。
您可以通过设置 snapshot.mode
连接器配置属性的值来自定义连接器创建快照的方法。默认情况下,连接器的快照模式被设置为 initial
。
创建初始快照的默认连接器工作流
当快照模式被设置为默认模式时,连接器会完成以下任务来创建快照:
- 决定要捕获的表。
-
获取每个捕获表上的
ROW SHARE MODE
锁定,以防止在创建快照过程中发生结构更改。Debezium 只包含短时间的锁定。 - 从服务器的 redo 日志读取当前系统更改号(SCN)位置。
- 捕获所有相关表的结构。
- 释放在第 2 步中获取的锁定。
-
扫描所有相关数据库表和模式,使其在 SCN 位置处有效,该位置在第 3 步读取的 SCN 位置(选择
OF SCN 123
),为每行生成一个READ
事件,然后将事件记录写入特定于表的 Kafka 主题。 - 在连接器偏移中记录成功完成快照。
在快照过程开始后,如果因为连接器失败、重新平衡或其他原因导致进程中断,则进程会在连接器重启后重启。连接器完成初始快照后,它会继续从第 3 步中读取的位置进行流传输,使其不会丢失任何更新。如果连接器因为某种原因再次停止,则重启后,它会恢复之前从之前停止的流传输更改。
设置 | 描述 |
---|---|
| 在每个连接器启动时执行快照。快照完成后,连接器开始流传输事件记录以进行后续数据库更改。 |
| 连接器执行数据库快照,如 创建初始快照的默认工作流 中所述。快照完成后,连接器开始流传输事件记录以进行后续数据库更改。 |
| 连接器执行数据库快照,并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。 |
|
连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它不会创建 |
|
设置这个选项来恢复丢失或损坏的数据库模式历史记录主题。重启后,连接器运行一个快照,它会从源表中重建该主题。您还可以设置属性来定期修剪遇到意外增长的数据库架构历史记录主题。 |
如需更多信息,请参阅连接器配置属性表中的 snapshot.mode
。
6.1.1.1. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。
然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。
您可以通过向信号表发送 execute-snapshot
消息来指定要捕获的表。将 execute-snapshot
信号的类型设置为增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| 不适用 |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| 不适用 | 可选字符串,它根据表的列指定条件,用于捕获表内容的子集。 |
触发临时快照
您可以通过在信号表中添加带有 execute-snapshot
信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。
6.1.1.2. 增量快照
为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥解决记录间的冲突
在某些情况下,流传输进程发出的 UPDATE
或 DELETE
事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ
事件之前修改表行。当快照最终会为行发出对应的 READ
事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。
快照窗口
为了帮助解决后续 READ
事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ
操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ
事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器重复每个快照块的进程。
Oracle 的 Debezium 连接器不支持增量快照运行时的模式更改。
6.1.1.3. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。
您可以以 SQL INSERT
查询形式向信号表提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量
。
要指定快照中包含的表,请提供一个 data-collections
数组,该数组列出了用于匹配表的表或一组正则表达式,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不执行快照。
如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.
),要将表添加到 data-collections
数组中,您必须用双引号转义名称的每个部分。
例如,若要包含 公共
模式中存在的表,并且名为 My.Table
,请使用以下格式:" public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 6.3. 向信号表发送增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的id
字符串作为水位信号。3
execute-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。5
增量
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效选项是默认值增量
。
如果没有指定值,连接器将运行一个增量快照。6
additional-condition
可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关
additional-condition
参数的详情请参考带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition
参数附加到快照信号来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的产品表:
-
id
(主密钥) -
color
-
quantity
如果您希望 product 表的增量快照只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =blue
和 quantity>10
:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件信息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
6.1.1.4. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询,向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。
您提交的查询指定了 增量
的快照操作,以及可选的、要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 6.4. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。如果省略了data
字段的此组件,则信号将停止正在进行的整个增量快照。5
增量
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效选项是增量的
。
如果没有指定类型
值,则信号无法停止增量快照。
6.1.2. 接收 Debezium Oracle 更改事件记录的 Kafka 主题默认名称
默认情况下,Oracle 连接器将所有 INSERT
、UPDATE
和 DELETE
操作的更改事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:
topicPrefix.schemaName.tableName
以下列表提供默认名称组件的定义:
- topicPrefix
-
由
topic.prefix
连接器配置属性指定的主题前缀。 - schemaName
- 在其中操作的 schema 的名称。
- tableName
- 操作发生的表的名称。
例如,如果 fulfillment
是服务器名称,inventory
是 schema 名称,数据库包括名为 orders
, customers
, 和 products
的表,Debezium Oracle 连接器会向以下 Kafka 主题发送事件,数据库中的每个表有一个。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 以及 事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
6.1.3. Debezium Oracle 连接器如何公开数据库 schema 的变化
您可以配置 Debezium Oracle 连接器来生成模式更改事件,这些事件描述了应用于数据库中捕获的表的结构更改。连接器将模式更改事件写入名为 < serverName>
; 的 Kafka 主题,其中 topicName
是 topic.prefix
配置属性中指定的命名空间。
每当从新表流传输数据时,Debezium 会将新消息发送到此主题。
连接器发送到 schema 更改主题的消息包含一个有效负载,也可以包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:
ddl
-
提供导致架构更改的 SQL
CREATE
、ALTER
或DROP
语句。 databaseName
-
将语句应用到的数据库的名称。
databaseName
的值充当 message 键。 tableChanges
-
模式更改后整个表架构的结构化表示。
tableChanges
字段包含一个数组,其中包含表中的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析程序处理它们。
默认情况下,连接器使用 ALL_TABLES
数据库视图来识别存储在 schema 历史记录主题中的表名称。在这个视图中,连接器只能从连接到数据库的用户帐户可用的表访问数据。
您可以修改设置,以便 schema 历史记录主题存储不同的表子集。使用以下方法之一更改主题存储的表集合:
-
更改 Debezium 用于访问数据库的帐户的权限,以便在
ALL_TABLES
视图中看到不同的表集合。 -
将连接器属性
schema.history.internal.store.only.captured.tables.ddl
设置为true
。
当连接器配置为捕获表时,它会保存表架构的历史记录,不仅在架构更改主题中,也存储在内部数据库模式历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要有关架构的通知的应用程序只消耗了该模式的更改主题的信息。
永不对数据库架构历史记录进行分区。要使数据库架构历史记录主题正常工作,它必须维护连接器向其发出的事件记录的一致性全局顺序。
要确保主题不在分区中分割,请使用以下方法之一为主题设置分区计数:
-
如果您手动创建数据库模式历史记录主题,请指定分区计数
1
。 -
如果您使用 Apache Kafka 代理自动创建数据库架构历史记录主题,则创建主题,将 Kafka
num.partitions
配置选项 的值设置为1
。
示例:向 Oracle 连接器 schema 更改主题发送的消息
以下示例显示了 JSON 格式的典型架构更改消息。消息包含表模式的逻辑表示。
{ "schema": { ... }, "payload": { "source": { "version": "2.1.4.Final", "connector": "oracle", "name": "server1", "ts_ms": 1588252618953, "snapshot": "true", "db": "ORCLPDB1", "schema": "DEBEZIUM", "table": "CUSTOMERS", "txId" : null, "scn" : "1513734", "commit_scn": "1513754", "lcr_position" : null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user" }, "ts_ms": 1588252618953, 1 "databaseName": "ORCLPDB1", 2 "schemaName": "DEBEZIUM", // "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n ( \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n \"FIRST_NAME\" VARCHAR2(255), \n \"LAST_NAME" VARCHAR2(255), \n \"EMAIL\" VARCHAR2(255), \n PRIMARY KEY (\"ID\") ENABLE, \n SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) SEGMENT CREATION IMMEDIATE \n PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n TABLESPACE \"USERS\" ", 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "ID" ], "columns": [ 9 { "name": "ID", "jdbcType": 2, "nativeType": null, "typeName": "NUMBER", "typeExpression": "NUMBER", "charsetName": null, "length": 9, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
2 |
| 标识包含更改的数据库和 schema。 |
3 |
| 此字段包含负责架构更改的 DDL。 |
4 |
| 包含 DDL 命令生成的模式更改的一个或多个项目的数组。 |
5 |
|
描述更改的类型。
|
6 |
|
创建、更改或丢弃的表的完整标识符。对于表重命名,这个标识符是 < |
7 |
| 应用更改后代表表元数据。 |
8 |
| 编写表主密钥的列列表。 |
9 |
| 更改表中的每个列的元数据。 |
10 |
| 每个表更改的自定义属性元数据。 |
在连接器发送到 schema 更改主题的消息中,message 键是包含架构更改的数据库的名称。在以下示例中,payload
字段包含 databaseName
键:
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.oracle.SchemaChangeKey" }, "payload": { "databaseName": "ORCLPDB1" } }
6.1.4. Debezium Oracle 连接器生成的事件代表事务边界
Debezium 可以生成代表事务元数据边界的事件,并增强数据更改事件消息。
Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。
数据库事务由语句块表示,该块在 BEGIN
和 END
关键字之间括起。Debezium 为每个事务生成 BEGIN
和 END
分隔符的事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
。 id
- 唯一事务标识符的字符串表示。
ts_ms
-
数据源上事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务所设计的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的一组对,指示连接器为来自数据收集的更改发出的事件数。
以下示例显示了典型的事务边界信息:
示例:Oracle 连接器事务边界事件
{ "status": "BEGIN", "id": "5.6.641", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "5.6.641", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER", "event_count": 1 }, { "data_collection": "ORCLPDB1.DEBEZIUM.ORDER", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>
.transaction
主题。
6.1.4.1. Debezium Oracle 连接器如何通过事务元数据增强更改事件信息
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示。
total_order
- 事件在事务生成的所有事件的绝对路径。
data_collection_order
- 事件在事务发送的所有事件中每个数据收集位置。
以下示例显示了典型的事务事件信息:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "5.6.641", "total_order": "1", "data_collection_order": "1" } }
6.1.5. Debezium Oracle 连接器如何使用事件缓冲
Oracle 按它们发生的顺序将所有更改写入 redo 日志,包括以后由回滚丢弃的更改。因此,来自不同事务的并发更改会被交集。当连接器首次读取更改流时,因为它无法立即决定提交或回滚哪些更改,它会临时将更改事件存储在内部缓冲区中。提交更改后,连接器会将更改事件从缓冲区写入 Kafka。连接器丢弃回滚所丢弃的更改事件。
您可以通过设置属性 log.mining.buffer.type
来配置连接器使用的缓冲机制。
heap
默认缓冲区类型使用 memory
进行配置。在默认 内存设置
下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 内存
缓冲区设置,请确保分配给 Java 进程的内存量可以容纳您环境中的长时间运行和大型事务。
6.1.6. Debezium Oracle 连接器如何检测 SCN 值中的差距
当 Debezium Oracle 连接器配置为使用 LogMiner 时,它会使用基于系统更改号(SCN)的开始和结束范围从 Oracle 收集更改事件。连接器自动管理此范围,根据连接器是否可以在接近实时更改的情况下增加或减少范围,或者必须处理因为数据库中大型或批量事务卷导致更改的积压。
在某些情况下,Oracle 数据库以不高的数量为 SCN 加快 SCN,而不是以恒定率增加 SCN 值。SCN 值中的这个跳过可能会因为特定集成与数据库交互的方式发生,或者作为热备份等事件的结果。
Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整最小范围。
log.mining.scn.gap.detection.gap.size.min
- 指定最小差距大小。
log.mining.scn.gap.detection.time.interval.max.ms
- 指定最大时间间隔。
连接器首先比较当前 SCN 和当前最小范围内的最高 SCN 之间的更改数的不同。如果当前 SCN 值和最高 SCN 值之间的差别大于最小差距大小,那么连接器可能会检测到 SCN 差距。要确认是否存在差距,连接器 接下来比较当前 SCN 和之前最小范围末尾的 SCN 的时间戳。如果时间戳之间的区别小于最大时间间隔,则确认存在 SCN 差距。
当发生 SCN 差距时,Debezium 连接器会自动使用当前的 SCN 作为当前最小会话范围的端点。这允许连接器快速捕获实时事件,而不会在返回不小的范围的情况下快速捕获实时事件,因为 SCN 值被意外数字增加。当连接器执行前面的步骤以响应 SCN 差距时,它会忽略 log.mining.batch.size.max 属性指定的值。连接器完成 mining 会话并捕获到实时事件后,它会恢复最大日志减批处理大小的强制。
只有在连接器运行时发生大型 SCN 递增并处理接近实时事件时,SCN 差距检测才可用。
6.1.7. Debezium 如何管理数据库中不经常更改的偏移量
Debezium Oracle 连接器跟踪连接器偏移中的系统更改号,以便在连接器重启时,它可以开始其停止位置。这些偏移是每个发出的更改事件的一部分;但是,当数据库更改的频率较低(几小时或天数)时,偏移可能会变得过时,并防止连接器在事务日志中不再提供系统更改数。
对于使用非CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms
来强制连接器定期发出心跳事件,以便偏移保持同步。
对于使用 CDB 模式连接到 Oracle 的连接器,维护同步更为复杂。不仅必须设置 heartbeat.interval.ms
,而且还需要设置 heartbeat.action.query
。需要指定这两个属性,因为在 CDB 模式中,连接器仅会专门跟踪 PDB 中的更改。需要补充机制才能从可插拔数据库内触发更改事件。定期间隔,心跳操作查询会导致连接器插入新表行,或更新可插拔数据库中的现有行。Debezium 会检测到表更改,并为它们发出更改事件,确保偏移保持同步,即使在处理经常更改的可插拔数据库中也是如此。
要使连接器使用 heartbeat.action.query
带有不是 连接器用户帐户 拥有的表,您必须授予连接器用户权限,以便在这些表上运行必要的 INSERT
或 UPDATE
查询。