第 7 章 Oracle 的 Debezium Connector


Debezium 的 Oracle 连接器捕获并记录在 Oracle 服务器上的数据库中发生的行级更改,包括在连接器运行时添加的表。您可以将连接器配置为为特定模式和表的子集发出更改事件,或者在特定列中忽略、掩码或截断值。

有关与此连接器兼容的 Oracle 数据库版本的详情,请查看 Debezium 支持的配置页面

Debezium 使用原生 LogMiner 数据库软件包更改来自 Oracle 的事件。

使用 Debezium Oracle 连接器的信息和步骤进行组织,如下所示:

7.1. Debezium Oracle 连接器如何工作

为了优化配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流更改事件、确定 Kafka 主题名称、使用元数据并实现事件缓冲。

如需更多信息,请参阅以下主题:

7.1.1. Debezium Oracle 连接器如何执行数据库快照

通常,Oracle 服务器上的 redo 日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志检索数据库的完整历史记录。要让连接器为数据库的当前状态建立基准,连接器首次启动时,它会执行数据库的初始 一致快照

注意

如果完成初始快照所需的时间超过为数据库设置的 UNDO_RETENTION 时间(默认为十分钟),则可能会出现 ORA-01555 异常。有关错误以及您可以从其中恢复的步骤的更多信息,请参阅 常见问题

您可以在以下部分找到有关快照的更多信息:

Oracle 连接器用来执行初始快照的默认工作流

以下工作流列出了 Debezium 创建快照所采取的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为其默认值时(即 的初始 )时快照的流程。您可以通过 changig snapshot.mode 属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用这个工作流的修改版本完成快照。

当快照模式被设置为默认模式时,连接器会完成以下任务来创建快照:

  1. 建立与数据库的连接。
  2. 确定要捕获的表。默认情况下,连接器捕获除 从捕获中排除的模式 以外的所有表。快照完成后,连接器将继续流传输指定表的数据。如果您希望连接器只从特定表捕获数据,您可以通过设置 table.include.listtable.exclude.list 等属性来只捕获表或表元素子集的数据。
  3. 在每个捕获的表中获取 ROW SHARE MODE 锁定,以防止在创建快照过程中发生结构更改。Debezium 只保存一个短时间的锁定。
  4. 从服务器的 redo 日志中读取当前系统更改号(SCN)位置。
  5. 捕获所有数据库表的结构,或指定用于捕获的所有表。连接器在其内部数据库模式历史记录主题中保留模式信息。架构历史记录提供有关发生更改事件时生效的结构的信息。

    注意

    默认情况下,连接器捕获数据库中每个表的模式,这些模式处于捕获模式,包括没有配置为捕获的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。有关为什么没有包括在初始快照中的表的快照保留模式信息,请参阅 了解为什么初始快照捕获所有表的 schema

  6. 释放在第 3 步中获取的锁定。其他数据库客户端现在可以写入任何之前锁定的表。
  7. 在步骤 4 中读取的 SCN 位置,连接器会扫描为捕获指定的表(SELECT * FROM …​ AS OF SCN 123)。在扫描过程中,连接器完成以下任务:

    1. 确认表已在快照开始前创建。如果表是在快照启动后创建的,连接器会跳过表。快照完成后,连接器过渡到 streaming,它会发出快照开始后创建的任何表的更改事件。
    2. 为从表获取的每行生成 读取 事件。所有 读取 事件都包含相同的 SCN 位置,这是在第 4 步中获得的 SCN 位置。
    3. 将每个 读取 事件发送到源表的 Kafka 主题。
    4. 释放数据表锁定(如果适用)。
  8. 在连接器偏移中记录快照成功完成。

生成的初始快照捕获捕获捕获的表中每行的当前状态。在这个基准状态中,连接器会捕获后续更改。

在快照进程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,则进程会在连接器重启后重启。连接器完成初始快照后,它会继续从在第 3 步中读取的位置进行流,使其不会错过任何更新。如果连接器因为任何原因而再次停止,它会在重启后从之前关闭的位置恢复流更改。

表 7.1. snapshot.mode 连接器配置属性的设置
设置描述

always

在每个连接器启动时执行快照。快照完成后,连接器开始流传输后续数据库更改的事件记录。

初始

连接器执行数据库快照,如 创建初始快照的默认工作流 中所述。快照完成后,连接器开始流传输后续数据库更改的事件记录。

initial_only

连接器在流传输任何更改事件记录前执行数据库快照并停止,不允许捕获任何后续更改事件。

schema_only

连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它不会创建 READ 事件来代表连接器启动时(Step 6)时所设置的数据集。

schema_only_recovery

设置这个选项来恢复丢失或损坏的数据库架构历史记录主题。重启后,连接器会运行一个快照,它从源表中重建主题。您还可以设置该属性来定期修剪出现意外增长的数据库架构历史记录主题。

警告:如果在最后一个连接器关闭后,如果模式更改提交到数据库,则不要使用此模式来执行快照。

如需更多信息,请参阅连接器配置属性表中的 snapshot.mode

7.1.1.1. 初始快照捕获所有表的 schema 历史记录的描述

连接器运行的初始快照捕获两种类型的信息:

表数据
在连接器的 table.include.list 属性中命名的表中的 INSERTUPDATEDELETE 操作的信息。
模式数据
描述应用到表的结构更改的 DDL 语句。模式数据会保留给内部模式历史记录主题,以及连接器的 schema 更改主题(如果配置了)。

运行初始快照后,您可能会注意到快照捕获没有指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅仅是从指定为捕获的表的表。连接器要求表的模式存在于架构历史记录主题中,然后才能捕获表。通过启用初始快照来捕获不是原始捕获集一部分的表的 schema 数据,Debebe 准备好连接器,以便稍后需要捕获这些表中的事件数据。如果初始快照没有捕获表的 schema,您必须将模式添加到历史记录主题,然后才能从表中捕获数据。

在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过可访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。

7.1.1.2. 从不是由初始快照捕获的表捕获数据(没有模式更改)

在某些情况下,您可能希望连接器从其模式未被初始快照捕获的表中捕获数据。根据连接器配置,初始快照只能捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器将无法捕获表,并报告缺少的 schema 错误。

您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。

前提条件

流程

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 在连接器配置中:

    1. snapshot.mode 设置为 schema_only_recovery
    2. schema.history.internal.store.only.captured.tables.ddl 的值设置为 false
    3. 添加您希望连接器捕获至 table.include.list 的表。这样可保证将来,连接器可以重建所有表的 schema 历史记录。
  4. 重启连接器。快照恢复过程根据表的当前结构重建模式历史记录。
  5. (可选)在快照完成后,启动一个 增量快照 来捕获新添加的表的现有数据,以及该连接器关闭时发生的其他表的更改。
  6. (可选)将 snapshot.mode 重置为 schema_only,以防止连接器在以后的重启后启动恢复。

7.1.1.3. 从不是由初始快照捕获的表捕获数据(应用程序更改)

如果架构更改应用到表,则在架构更改前提交的记录与更改后提交的不同结构不同。当 Debezium 从表中捕获数据时,它会读取 schema 历史记录,以确保它为每个事件应用正确的模式。如果 schema 历史记录主题中没有 schema,则连接器无法捕获表,并出现错误结果。

如果要从初始快照捕获的表中捕获数据,并且修改了表的 schema,则必须将模式添加到历史记录主题中(如果它还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加模式。

前提条件

  • 您希望从带有连接器在初始快照期间没有捕获的 schema 捕获数据。
  • 架构更改应用于表,以便捕获的记录没有统一结构。

流程

初始快照捕获了所有表的模式(storage.only.captured.tables.ddl 设置为 false)
  1. 编辑 table.include.list 属性,以指定您要捕获的表。
  2. 重启连接器。
  3. 如果要从新添加的表中捕获现有数据,则启动 增量快照
初始快照没有捕获所有表的模式(storage.only.captured.tables.ddl 设置为 true)

如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:

流程 1:架构快照,后跟增量快照

在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。

  4. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 schema_only
    2. 编辑 table.include.list 以添加您要捕获的表。
  5. 重启连接器。
  6. 等待 Debezium 捕获新表和现有表的模式。在连接器停止后发生任何表的数据更改不会被捕获。
  7. 为确保没有丢失数据,请启动 增量快照
步骤 2:初始快照,后跟可选的增量快照

在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有多个大型表的数据库中,运行初始快照可能会非常耗时。快照完成后,您可以选择触发增量快照来捕获连接器离线时发生的任何更改。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。

  4. 编辑 table.include.list 以添加您要捕获的表。
  5. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 initial
    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 设置为 false
  6. 重启连接器。连接器获取完整的数据库快照。快照完成后,连接器会过渡到 streaming。
  7. (可选)要捕获连接器离线时更改的任何数据,请启动 增量快照

7.1.2. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。

然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照来为之前捕获的表重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来发起临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的表。快照可以捕获整个数据库的内容,或者仅捕获数据库中表的子集。另外,快照也可以捕获数据库中表的内容子集。

您可以通过将 execute-snapshot 消息发送到信号表来指定要捕获的表。将 execute-snapshot 信号类型设置为 增量,并提供快照中包含的表名称,如下表所述:

表 7.2. 临时 execute-snapshot 信号记录的示例
字段默认

type

incremental

指定您要运行的快照类型。
设置类型是可选的。目前,您只能请求 增量 快照。

data-collections

N/A

包含与要快照的表的完全限定域名匹配的正则表达式的数组。
名称的格式与 signal.data.collection 配置选项的格式相同。

additional-condition

N/A

可选字符串,根据表的列指定条件,用于捕获表的内容的子集。

surrogate-key

N/A

可选字符串,指定连接器在快照过程中用作表的主键的列名称。

触发临时快照

您可以通过向信号表中添加 execute-snapshot 信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和结束点。根据表中的条目数量以及配置的块大小,Debezium 会将表划分为块,并一次性执行每个块的快照。

目前,execute-snapshot 操作类型仅触发 增量快照。如需更多信息,请参阅 增加快照

7.1.3. 增量快照

为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。

当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:

  • 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的点开始,而不是从开始计算表。
  • 您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据 配置的块大小 将表分成块。然后,按块的工作块会捕获块中的每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表块的快照开始时的行值。

当快照继续进行时,其他进程可能会继续访问数据库,可能会修改表记录。为了反映此类更改,INSERTUPDATEDELETE 操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。

Debezium 如何使用相同的主密钥在记录间解决冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会停止序列。也就是说,流流过程可能会发出一个修改表行的事件,该事件捕获包含该行的 READ 事件的块。当快照最终为行发出对应的 READ 事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。

快照窗口

为了帮助解决修改同一表行的后期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口快照窗口分解了增量快照捕获指定表块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ 操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ 事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ 事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器为每个快照块重复这个过程。

警告

Oracle 的 Debezium 连接器不支持增量快照运行时的模式更改。

7.1.3.1. 触发增量快照

目前,启动增量快照的唯一方法是向源数据库上的 信号表发送临时快照 信号。

作为 SQL INSERT 查询,您将向信号提交信号。

在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental

要指定快照中包含的表,请提供列出表或用于匹配表的正则表达式数组的 数据集合,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的表的名称在数据库、模式或表的名称中包含句点(.),以将表添加到 data-collections 数组中,您必须使用双引号转义名称的每个部分。

例如,要包含一个存在于 公共 模式的表,其名称为 My.Table,请使用以下格式 :"public"."My.Table "。

先决条件

使用源信号频道来触发增量快照

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    命令中的 idtypedata 参数的值对应于 信号表 的字段

    下表描述了示例中的参数:

    表 7.3. SQL 命令中字段的描述,用于将增量快照信号发送到信号表
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。相反,Debebe 会在快照期间生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    type 参数指定信号旨在触发的操作。

    4

    data-collections

    信号的 data 字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号表的名称相同。

    5

    incremental

    信号的 data 字段的可选 类型 组件,用于指定要运行的快照操作类型。
    目前,唯一有效的选项是默认值 incremental
    如果没有指定值,连接器将运行增量快照。

    6

    additional-condition

    可选字符串,根据表的列指定条件,用于捕获表的内容的子集。有关 additional-condition 参数的更多信息,请参阅 带有额外条件的临时增量快照

带有额外条件的临时增量快照

如果您希望快照只包含表中的内容子集,您可以通过向快照信号附加 additional-condition 参数来修改信号请求。

典型的快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-condition 参数,您可以将 WHERE 条件附加到 SQL 查询中,如下例所示:

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

例如,假设您有一个包含以下列的 products 表:

  • ID (主键)
  • color
  • quantity

如果您需要 product 表的增量快照,其中只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含 color=bluequantity>10 的项数据:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是默认值 incremental
在 SQL 查询中指定 type 值,您提交到信号表是可选的。
如果没有指定值,连接器将运行增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

7.1.3.2. 使用 Kafka 信号频道来触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 7.4. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

additional-condition

N/A

可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。

execute-snapshot Kafka 消息示例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-condition 字段来选择表内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM <tableName> …​.

当快照请求包含 additional-condition 时,extra-condition 会附加到 SQL 查询中,例如:

SELECT * FROM <tableName> WHERE <additional-condition> …​.

例如,如果一个 product table with the column id (主键)、colorbrand,如果您希望快照只包含 color='blue' 的内容,当您请求快照时,您可以附加一个 additional-condition 语句来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

您可以使用 additional-condition 语句根据多个列传递条件。例如,如果您希望快照只包含 color='blue' products 表中,以及 brand='MyBrand',则您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

7.1.3.3. 停止增量快照

您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT 查询向表提交停止快照信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并在正在进行时停止增量快照操作。

您提交的查询指定 增量 的快照操作,以及要删除的当前运行快照的表。

先决条件

使用源信号频道停止增量快照

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值对应于 信号表 的字段

    下表描述了示例中的参数:

    表 7.5. SQL 命令中字段的描述,用于将停止增量快照信号发送到信号表
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。

    3

    stop-snapshot

    指定 type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号表的名称相同。如果省略了 data 字段的这一组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的 data 字段所需的组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项是 增量的
    如果没有指定 类型 值,信号将无法停止增量快照。

7.1.3.4. 使用 Kafka 信号频道停止增量快照

您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 stop-snapshotdata 字段必须具有以下字段:

表 7.6. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

7.1.4. 接收 Debezium Oracle 更改事件记录的默认 Kafka 主题名称

默认情况下,Oracle 连接器将所有 INSERTUPDATEDELETE 操作的更改事件写入特定于该表的单一 Apache Kafka 主题。连接器使用以下惯例来命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表为默认名称的组件提供定义:

topicPrefix
topic.prefix 连接器配置属性指定的主题前缀。
schemaName
操作所在的模式的名称。
tableName
操作所在的表的名称。

例如,如果 fulfillment 是服务器名称,inventory 是 schema 名称,数据库包括名为 orders, customers, 和 products 的表,Debezium Oracle 连接器会向以下 Kafka 主题发送事件,数据库中的每个表有一个。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

连接器应用类似的命名约定,以标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 来自定义主题命名的更多信息,请参阅 主题路由

7.1.5. Debezium Oracle 连接器如何处理数据库架构更改

当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,数据库模式可以随时更改,这意味着连接器必须能够识别每个插入、更新或删除操作被记录的时间。另外,连接器不一定将当前的模式应用到每个事件。如果事件相对旧,则应用当前模式之前可能会记录该事件。

为确保在架构更改后正确处理事件,Oracle 包含在红色日志中,不仅影响数据的行级更改,还应用于数据库的 DDL 语句。当连接器在红色日志中遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存表示。连接器使用此模式表示来识别每个插入、更新或删除操作时表的结构,并生成适当的更改事件。在单独的数据库架构历史记录 Kafka 主题中,连接器记录所有 DDL 语句,以及在红色日志中记录每个 DDL 语句的位置。

当连接器在崩溃或安全停止后重启时,它从特定位置(即时间点)开始读取 redo 日志。连接器通过读取数据库模式历史记录 Kafka 主题,并将所有 DDL 语句解析为连接器启动的红色日志点,以此重建此时存在的表结构。

此数据库架构历史记录主题为内部连接器,仅用于内部连接器。另外,连接器也可以将 模式更改事件发送到用于消费者应用程序的不同主题

其他资源

7.1.6. Debezium Oracle 连接器如何公开数据库 schema 的变化

您可以配置 Debezium Oracle 连接器来生成模式更改事件,该事件描述了应用到数据库中表的结构更改。连接器将模式更改事件写入名为 < serverName&gt; 的 Kafka 主题,其中 topicNametopic.prefix 配置属性中指定的命名空间。

当 Debezium 从新表中流数据或更改表结构时,Debezium 会向 schema 更改主题发送一个新消息。

连接器发送到 schema 更改主题的消息包含一个有效负载,以及可选的包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:

ddl
提供会导致架构更改的 SQL CREATEALTERDROP 语句。
databaseName
将语句应用到的数据库的名称。databaseName 的值充当 message 键。
tableChanges
架构更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可轻松读取消息,而不必先通过 DDL 解析器处理它们。
重要

默认情况下,连接器使用 ALL_TABLES 数据库视图来识别要存储在 schema 历史记录主题中的表名称。在该视图中,连接器只能访问可通过它连接到数据库的用户帐户的表中的数据。

您可以修改设置,以便 schema 历史记录主题存储不同的表子集。使用以下方法之一更改主题存储的表集合:

重要

当连接器配置为捕获表时,它只会在 schema 更改主题中存储表的历史记录,也存储在内部数据库 schema 历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不适用于消耗应用程序直接使用。确保需要通知架构更改的应用程序只消耗来自 schema 更改主题的信息。

重要

切勿对数据库架构历史记录主题进行分区。要使数据库架构历史记录主题正常工作,它必须维护连接器发出的事件记录的全局顺序。

要确保主题没有在分区间分割,请使用以下方法之一为主题设置分区计数:

  • 如果您手动创建数据库架构历史记录主题,请指定分区计数 1
  • 如果您使用 Apache Kafka 代理自动创建数据库 schema 历史记录主题,则会创建该主题,将 Kafka num.partitions配置选项 的值设置为 1

示例:消息发送到 Oracle 连接器模式更改主题

以下示例显示了 JSON 格式的典型的模式更改消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.3.4.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user"
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "ORCLPDB1", 2
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "ID"
          ],
          "columns": [ 9
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 7.7. 向 schema 更改主题发送的消息中字段的描述
字段名称描述

1

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

2

databaseName
schemaName

标识包含更改的数据库和架构。

3

ddl

此字段包含负责架构更改的 DDL。

4

tableChanges

包含 DDL 命令生成的模式更改的一个或多个项目的数组。

5

type

描述更改的类型。type 可以设置为以下值之一:

创建
已创建的表。
更改
修改表。
DROP
表已删除。

6

id

创建、更改或丢弃的表的完整标识符。如果是表重命名,这个标识符是 < old>、&lt;new&gt; 表名称的串联。

7

table

代表应用更改后的表元数据。

8

primaryKeyColumnNames

组成表主密钥的列的列表。

9

columns

更改表中每个列的元数据。

10

属性

每个表更改的自定义属性元数据。

在连接器发送到 schema 更改主题的消息中,message 键是包含 schema 更改的数据库的名称。在以下示例中,payload 字段包含 databaseName 键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

7.1.7. Debezium Oracle 连接器生成的事件代表事务边界

Debezium 可以生成代表事务元数据边界的事件,并增强数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。

数据库事务由语句块表示,该块包含在 BEGINEND 关键字之间。Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status
BEGINEND.
id
唯一事务标识符的字符串。
ts_ms
数据源的事务边界事件(BEGINEND 事件)的时间。如果数据源没有向事件时间提供 Debezium,则该字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务提供的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 元素的数组,用于指示连接器发出来自数据收集的更改的事件数量。

以下示例显示了典型的事务边界消息:

示例:Oracle 连接器事务边界事件

{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>.transaction 主题。

7.1.7.1. Debezium Oracle 连接器如何使用事务元数据增强更改事件信息

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串。
total_order
事件在事务生成的所有事件中绝对位置。
data_collection_order
在事务发出的所有事件间,按数据收集位置。

以下示例显示了典型的事务事件信息:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

查询模式

Debezium Oracle 连接器默认与 Oracle LogMiner 集成。此集成需要一组专用的步骤,其中包括生成复杂的 JDBC SQL 查询,以评估事务日志中记录的更改作为更改事件。JDBC SQL 查询使用的 V$LOGMNR_CONTENTS 视图没有任何索引来改进查询的性能,因此有不同的查询模式来控制 SQL 查询的生成方式,以改进查询的执行方式。

log.mining.query.filter.mode 连接器属性可使用以下任一方式配置 JDBC SQL 查询:

none
(默认)此模式会创建一个 JDBC 查询,该查询仅根据不同操作类型(如插入、更新或删除)过滤到数据库级别。当根据 schema, table, 或 username include/exclude 列表过滤数据时,会在连接器中的处理循环中完成此操作。

当从数据库捕获少量表时,这个模式通常很有用,这些表没有大量更改饱和。生成的查询非常简单,主要以低数据库开销尽快阅读。
in
此模式创建 JDBC 查询,该查询不仅过滤在数据库级别上的操作类型,还创建 schema、表和用户名 include/exclude 列表。查询的 predicates 会根据 include/exclude 列表配置属性中指定的值使用 SQL in-clause 生成。

从数据库中捕获大量表时,这个模式通常很有用。生成的查询比 none 模式要复杂得多,它侧重于减少网络开销,并尽可能在数据库级别上执行一次过滤。

最后,不要将 正则表达式指定为 schema 和 table include/exclude 配置属性的一部分。使用正则表达式将导致连接器与基于这些配置属性的更改不匹配,从而导致更改丢失。
regex
此模式创建 JDBC 查询,该查询不仅过滤在数据库级别上的操作类型,还创建 schema、表和用户名 include/exclude 列表。但是,与 in 模式不同,这个模式使用 Oracle REGEXP_LIKE 运算符生成 SQL 查询,具体取决于是否指定了 include 或 exclude 值。

当捕获可以使用少量正则表达式来标识的表数时,此模式通常很有用。生成的查询比任何其他模式更复杂,它侧重于减少网络开销,并尽可能在数据库级别执行一次过滤。

7.1.8. Debezium Oracle 连接器如何使用事件缓冲

Oracle 按照它们发生的顺序将所有更改写入 redo 日志,包括回滚后丢弃的更改。因此,来自独立事务的并发更改会被干预。当连接器首次读取更改流时,因为它无法立即决定提交或回滚哪些更改,它会临时将更改事件存储在内部缓冲区中。提交更改后,连接器会将更改事件从缓冲区写入 Kafka。连接器丢弃回滚丢弃的更改事件。

您可以通过设置属性 log.mining.buffer.type 来配置连接器使用的缓冲机制。

Heap

默认缓冲区类型使用 memory 进行配置。在默认 内存设置 下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果使用 内存 缓冲区设置,请确保分配给 Java 进程的内存量可以容纳环境中的长时间运行和大型事务。

7.1.9. Debezium Oracle 连接器如何检测 SCN 值中的差距

当 Debezium Oracle 连接器被配置为使用 LogMiner 时,它会使用基于系统更改号(SCN)的开始和结束范围从 Oracle 收集更改事件。连接器会自动管理这个范围,根据连接器是否能够流向实时流更改,或者因为数据库中大或批量事务的卷而处理更改,以自动增加或减少范围。

在某些情况下,Oracle 数据库会以一个非常高的数量来提升 SCN,而不是以恒定率增加 SCN 值。由于特定集成与数据库交互方式或出现热备份等事件,所以可能会出现 SCN 值的跳过。

Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整 mining 范围。

log.mining.scn.gap.detection.gap.size.min
指定最小空白大小。
log.mining.scn.gap.detection.time.interval.max.ms
指定最大时间间隔。

连接器首先比较当前 SCN 和当前 mining 范围内的最高 SCN 之间的变化数量。如果当前 SCN 值和最高 SCN 值之间的区别大于最小空白大小,则连接器可能会检测到 SCN 差距。要确认是否存在差距,连接器会随后比较当前 SCN 和 SCN 在前一个 mining 范围末尾的时间戳。如果时间戳之间的区别小于最大时间间隔,则确认存在 SCN 差距。

当发生 SCN 差距时,Debezium 连接器会自动使用当前的 SCN 作为当前 mining 会话范围的端点。这允许连接器在没有返回任何更改之间快速捕获实时事件,因为 SCN 值意外增加。当连接器执行前面的步骤以响应 SCN 差距时,它会忽略 log.mining.batch.size.max 属性指定的值。在连接器完成 mining 会话并捕获到实时事件后,它会恢复最大日志最小批处理大小的强制。

警告

只有在连接器运行和处理接近实时事件时,才会提供 SCN 差距检测。

7.1.10. Debezium 如何管理数据库中不经常更改的偏移量

Debezium Oracle 连接器跟踪连接器偏移中的系统更改号,以便在连接器重启时,它可以从离开的位置开始。这些偏移是每个发出的更改事件的一部分;但是,当数据库更改的频率较低(几小时或天)时,偏移可能会过时,并阻止连接器在事务日志中不再可用时成功重启。

对于使用非CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms 来强制连接器定期发出心跳事件,以便偏移保持同步。

对于使用 CDB 模式连接到 Oracle 的连接器,维护同步更为复杂。不仅必须设置 heartbeat.interval.ms,还需要设置 heartbeat.action.query。需要指定这两个属性,因为在 CDB 模式中,连接器仅跟踪 PDB 中的更改。需要补充机制来从可插拔数据库内触发更改事件。定期,心跳操作查询会导致连接器插入一个新的表行,或者在可插拔数据库中更新现有行。Debezium 检测到表更改,并为它们发出更改事件,确保偏移保持同步,即使在可插拔数据库中,进程不经常更改。

注意

要使连接器使用 heartbeat.action.query连接器用户帐户 所有的表,您必须授予连接器用户权限来对这些表运行必要的 INSERTUPDATE 查询。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.