第 6 章 Oracle 的 Debezium Connector


Debezium 的 Oracle 连接器捕获并记录在 Oracle 服务器上的数据库中进行的行级更改,包括连接器运行时添加的表。您可以将连接器配置为为特定 schema 和表的特定子集发出更改事件,或者在特定列中忽略、屏蔽或截断值。

有关与此连接器兼容的 Oracle 数据库版本的详情,请参考 Debezium 支持的配置页面

Debezium 使用原生 LogMiner 数据库软件包从 Oracle 更改事件。

使用 Debezium Oracle 连接器的信息和流程进行组织,如下所示:

6.1. Debezium Oracle 连接器如何工作

为了最佳配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称、使用元数据以及实施事件缓冲会很有帮助。

如需更多信息,请参阅以下主题:

6.1.1. Debezium Oracle 连接器如何执行数据库快照

通常,Oracle 服务器上的红色日志被配置为不保留数据库的完整历史记录。因此,Debebe Oracle 连接器无法从日志检索数据库的整个历史记录。要启用连接器为数据库当前状态建立基准,连接器首次启动时,它会执行数据库的初始 一致性快照

您可以通过设置 snapshot.mode 连接器配置属性的值来自定义连接器创建快照的方法。默认情况下,连接器的快照模式被设置为 initial

创建初始快照的默认连接器工作流

当快照模式被设置为默认模式时,连接器会完成以下任务来创建快照:

  1. 决定要捕获的表。
  2. 获取每个捕获表上的 ROW SHARE MODE 锁定,以防止在创建快照过程中发生结构更改。Debezium 只包含短时间的锁定。
  3. 从服务器的 redo 日志读取当前系统更改号(SCN)位置。
  4. 捕获所有相关表的结构。
  5. 释放在第 2 步中获取的锁定。
  6. 扫描所有相关数据库表和模式,使其在 SCN 位置处有效,该位置在第 3 步读取的 SCN 位置(选择OF SCN 123),为每行生成一个 READ 事件,然后将事件记录写入特定于表的 Kafka 主题。
  7. 在连接器偏移中记录成功完成快照。

在快照过程开始后,如果因为连接器失败、重新平衡或其他原因导致进程中断,则进程会在连接器重启后重启。连接器完成初始快照后,它会继续从第 3 步中读取的位置进行流传输,使其不会丢失任何更新。如果连接器因为某种原因再次停止,则重启后,它会恢复之前从之前停止的流传输更改。

表 6.1. snapshot.mode 连接器配置属性的设置
设置描述

always

在每个连接器启动时执行快照。快照完成后,连接器开始流传输事件记录以进行后续数据库更改。

Initial

连接器执行数据库快照,如 创建初始快照的默认工作流 中所述。快照完成后,连接器开始流传输事件记录以进行后续数据库更改。

initial_only

连接器执行数据库快照,并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。

schema_only

连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它不会创建 READ 事件来代表连接器启动时的数据集(Step 6)。

schema_only_recovery

设置这个选项来恢复丢失或损坏的数据库模式历史记录主题。重启后,连接器运行一个快照,它会从源表中重建该主题。您还可以设置属性来定期修剪遇到意外增长的数据库架构历史记录主题。

警告:如果在最后一次连接器关闭后将模式更改提交到数据库,则不要使用此模式来执行快照。

如需更多信息,请参阅连接器配置属性表中的 snapshot.mode

6.1.1.1. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。

然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。

您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为增量,并提供快照中包含的表名称,如下表所述:

表 6.2. 临时 执行快照 信号记录的示例
字段默认

type

增量

指定您要运行的快照类型。
设置类型是可选的。目前,您只能请求 增量 快照。

data-collections

不适用

包含与要快照的表的完全限定域名匹配的正则表达式的数组。
名称的格式与 signal.data.collection 配置选项的格式相同。

additional-condition

不适用

可选字符串,它根据表的列指定条件,用于捕获表内容的子集。

触发临时快照

您可以通过在信号表中添加带有 execute-snapshot 信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。

目前,exec-snapshot 操作类型仅触发 增量快照。如需更多信息,请参阅 增加快照

6.1.1.2. 增量快照

为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。

当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:

  • 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时的行值。

当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERTUPDATEDELETE 操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何使用相同的主密钥解决记录间的冲突

在某些情况下,流传输进程发出的 UPDATEDELETE 事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ 事件之前修改表行。当快照最终会为行发出对应的 READ 事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。

快照窗口

为了帮助解决后续 READ 事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ 事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器重复每个快照块的进程。

警告

Oracle 的 Debezium 连接器不支持增量快照运行时的模式更改。

6.1.1.3. 触发增量快照

目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。

您可以以 SQL INSERT 查询形式向信号表提交信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量

要指定快照中包含的表,请提供一个 data-collections 数组,该数组列出了用于匹配表的表或一组正则表达式,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作,且不执行快照。

注意

如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.),要将表添加到 data-collections 数组中,您必须用双引号转义名称的每个部分。

例如,若要包含 公共 模式中存在的表,并且名为 My.Table,请使用以下格式:" public"."My.Table "。

先决条件

流程

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    命令中的 idtypedata 参数的值对应于 信号表的字段

    下表描述了示例中的参数:

    表 6.3. 向信号表发送增量快照信号的 SQL 命令中的字段描述
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定分配给信号请求的 id 标识符的任意字符串。
    使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的 id 字符串作为水位信号。

    3

    execute-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定 signal.data.collection 配置属性中的连接器信号表的名称相同。

    5

    增量

    信号的 data 字段的可选 类型 组件,用于指定要运行的快照操作类型。
    目前,唯一有效选项是默认值 增量
    如果没有指定值,连接器将运行一个增量快照。

    6

    additional-condition

    可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关 additional-condition 参数的详情请参考 带有额外条件的临时增量快照

带有额外条件的临时增量快照

如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition 参数附加到快照信号来修改信号请求。

典型的快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-condition 参数,您可以将 WHERE 条件附加到 SQL 查询中,如下例所示:

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

例如,假设您有一个包含以下列的产品表:

  • id (主密钥)
  • color
  • quantity

如果您希望 product 表的增量快照只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =bluequantity>10

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件信息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效选项是默认值 增量
提交给信号表的 SQL 查询中指定类型 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,代表 READ 操作。

6.1.1.4. 停止增量快照

您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT 查询,向表提交停止快照信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。

您提交的查询指定了 增量 的快照操作,以及可选的、要删除的当前运行快照的表。

先决条件

流程

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值对应于 信号表的字段

    下表描述了示例中的参数:

    表 6.4. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定分配给信号请求的 id 标识符的任意字符串。
    使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。

    3

    stop-snapshot

    指定 type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定 signal.data.collection 配置属性中的连接器信号表的名称相同。如果省略了 data 字段的此组件,则信号将停止正在进行的整个增量快照。

    5

    增量

    信号的 data 字段所需的组件,用于指定要停止的快照操作类型。
    目前,唯一有效选项是 增量的
    如果没有指定 类型 值,则信号无法停止增量快照。

6.1.2. 接收 Debezium Oracle 更改事件记录的 Kafka 主题默认名称

默认情况下,Oracle 连接器将所有 INSERTUPDATEDELETE 操作的更改事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供默认名称组件的定义:

topicPrefix
topic.prefix 连接器配置属性指定的主题前缀。
schemaName
在其中操作的 schema 的名称。
tableName
操作发生的表的名称。

例如,如果 fulfillment 是服务器名称,inventory 是 schema 名称,数据库包括名为 orders, customers, 和 products 的表,Debezium Oracle 连接器会向以下 Kafka 主题发送事件,数据库中的每个表有一个。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 以及 事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

6.1.3. Debezium Oracle 连接器如何公开数据库 schema 的变化

您可以配置 Debezium Oracle 连接器来生成模式更改事件,这些事件描述了应用于数据库中捕获的表的结构更改。连接器将模式更改事件写入名为 < serverName&gt; 的 Kafka 主题,其中 topicNametopic.prefix 配置属性中指定的命名空间。

每当从新表流传输数据时,Debezium 会将新消息发送到此主题。

连接器发送到 schema 更改主题的消息包含一个有效负载,也可以包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:

ddl
提供导致架构更改的 SQL CREATEALTERDROP 语句。
databaseName
将语句应用到的数据库的名称。databaseName 的值充当 message 键。
tableChanges
模式更改后整个表架构的结构化表示。tableChanges 字段包含一个数组,其中包含表中的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析程序处理它们。
重要

默认情况下,连接器使用 ALL_TABLES 数据库视图来识别存储在 schema 历史记录主题中的表名称。在这个视图中,连接器只能从连接到数据库的用户帐户可用的表访问数据。

您可以修改设置,以便 schema 历史记录主题存储不同的表子集。使用以下方法之一更改主题存储的表集合:

重要

当连接器配置为捕获表时,它会保存表架构的历史记录,不仅在架构更改主题中,也存储在内部数据库模式历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要有关架构的通知的应用程序只消耗了该模式的更改主题的信息。

重要

永不对数据库架构历史记录进行分区。要使数据库架构历史记录主题正常工作,它必须维护连接器向其发出的事件记录的一致性全局顺序。

要确保主题不在分区中分割,请使用以下方法之一为主题设置分区计数:

  • 如果您手动创建数据库模式历史记录主题,请指定分区计数 1
  • 如果您使用 Apache Kafka 代理自动创建数据库架构历史记录主题,则创建主题,将 Kafka num.partitions配置选项 的值设置为 1

示例:向 Oracle 连接器 schema 更改主题发送的消息

以下示例显示了 JSON 格式的典型架构更改消息。消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.1.4.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user"
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "ORCLPDB1", 2
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "ID"
          ],
          "columns": [ 9
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 6.5. 向 schema 更改主题发送的消息中的字段描述
字段名称描述

1

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

2

databaseName
schemaName

标识包含更改的数据库和 schema。

3

ddl

此字段包含负责架构更改的 DDL。

4

tableChanges

包含 DDL 命令生成的模式更改的一个或多个项目的数组。

5

type

描述更改的类型。类型设置为 以下值之一:

创建
已创建表。
更改
表已修改。
DROP
表已删除。

6

id

创建、更改或丢弃的表的完整标识符。对于表重命名,这个标识符是 < old> , < new&gt; 表名称的串联。

7

table

应用更改后代表表元数据。

8

primaryKeyColumnNames

编写表主密钥的列列表。

9

columns

更改表中的每个列的元数据。

10

属性

每个表更改的自定义属性元数据。

在连接器发送到 schema 更改主题的消息中,message 键是包含架构更改的数据库的名称。在以下示例中,payload 字段包含 databaseName 键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

6.1.4. Debezium Oracle 连接器生成的事件代表事务边界

Debezium 可以生成代表事务元数据边界的事件,并增强数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。

数据库事务由语句块表示,该块在 BEGINEND 关键字之间括起。Debezium 为每个事务生成 BEGINEND 分隔符的事务边界事件。事务边界事件包含以下字段:

status
BEGINEND
id
唯一事务标识符的字符串表示。
ts_ms
数据源上事务边界事件(BEGINEND 事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务所设计的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 元素的一组对,指示连接器为来自数据收集的更改发出的事件数。

以下示例显示了典型的事务边界信息:

示例:Oracle 连接器事务边界事件

{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>.transaction 主题。

6.1.4.1. Debezium Oracle 连接器如何通过事务元数据增强更改事件信息

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示。
total_order
事件在事务生成的所有事件的绝对路径。
data_collection_order
事件在事务发送的所有事件中每个数据收集位置。

以下示例显示了典型的事务事件信息:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

6.1.5. Debezium Oracle 连接器如何使用事件缓冲

Oracle 按它们发生的顺序将所有更改写入 redo 日志,包括以后由回滚丢弃的更改。因此,来自不同事务的并发更改会被交集。当连接器首次读取更改流时,因为它无法立即决定提交或回滚哪些更改,它会临时将更改事件存储在内部缓冲区中。提交更改后,连接器会将更改事件从缓冲区写入 Kafka。连接器丢弃回滚所丢弃的更改事件。

您可以通过设置属性 log.mining.buffer.type 来配置连接器使用的缓冲机制。

heap

默认缓冲区类型使用 memory 进行配置。在默认 内存设置 下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 内存 缓冲区设置,请确保分配给 Java 进程的内存量可以容纳您环境中的长时间运行和大型事务。

6.1.6. Debezium Oracle 连接器如何检测 SCN 值中的差距

当 Debezium Oracle 连接器配置为使用 LogMiner 时,它会使用基于系统更改号(SCN)的开始和结束范围从 Oracle 收集更改事件。连接器自动管理此范围,根据连接器是否可以在接近实时更改的情况下增加或减少范围,或者必须处理因为数据库中大型或批量事务卷导致更改的积压。

在某些情况下,Oracle 数据库以不高的数量为 SCN 加快 SCN,而不是以恒定率增加 SCN 值。SCN 值中的这个跳过可能会因为特定集成与数据库交互的方式发生,或者作为热备份等事件的结果。

Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整最小范围。

log.mining.scn.gap.detection.gap.size.min
指定最小差距大小。
log.mining.scn.gap.detection.time.interval.max.ms
指定最大时间间隔。

连接器首先比较当前 SCN 和当前最小范围内的最高 SCN 之间的更改数的不同。如果当前 SCN 值和最高 SCN 值之间的差别大于最小差距大小,那么连接器可能会检测到 SCN 差距。要确认是否存在差距,连接器 接下来比较当前 SCN 和之前最小范围末尾的 SCN 的时间戳。如果时间戳之间的区别小于最大时间间隔,则确认存在 SCN 差距。

当发生 SCN 差距时,Debezium 连接器会自动使用当前的 SCN 作为当前最小会话范围的端点。这允许连接器快速捕获实时事件,而不会在返回不小的范围的情况下快速捕获实时事件,因为 SCN 值被意外数字增加。当连接器执行前面的步骤以响应 SCN 差距时,它会忽略 log.mining.batch.size.max 属性指定的值。连接器完成 mining 会话并捕获到实时事件后,它会恢复最大日志减批处理大小的强制。

警告

只有在连接器运行时发生大型 SCN 递增并处理接近实时事件时,SCN 差距检测才可用。

6.1.7. Debezium 如何管理数据库中不经常更改的偏移量

Debezium Oracle 连接器跟踪连接器偏移中的系统更改号,以便在连接器重启时,它可以开始其停止位置。这些偏移是每个发出的更改事件的一部分;但是,当数据库更改的频率较低(几小时或天数)时,偏移可能会变得过时,并防止连接器在事务日志中不再提供系统更改数。

对于使用非CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms 来强制连接器定期发出心跳事件,以便偏移保持同步。

对于使用 CDB 模式连接到 Oracle 的连接器,维护同步更为复杂。不仅必须设置 heartbeat.interval.ms,而且还需要设置 heartbeat.action.query。需要指定这两个属性,因为在 CDB 模式中,连接器仅会专门跟踪 PDB 中的更改。需要补充机制才能从可插拔数据库内触发更改事件。定期间隔,心跳操作查询会导致连接器插入新表行,或更新可插拔数据库中的现有行。Debezium 会检测到表更改,并为它们发出更改事件,确保偏移保持同步,即使在处理经常更改的可插拔数据库中也是如此。

注意

要使连接器使用 heartbeat.action.query 带有不是 连接器用户帐户 拥有的表,您必须授予连接器用户权限,以便在这些表上运行必要的 INSERTUPDATE 查询。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.