2.5. 用于 Oracle 的 Debezium Connector


Debezium 的 Oracle 连接器捕获并记录 Oracle 服务器上的数据库中发生的行级更改,包括在连接器运行时添加的表。您可以将连接器配置为为特定模式和表的子集发出更改事件,或者在特定列中忽略、掩码或截断值。

有关与此连接器兼容的 Oracle 数据库版本的详情,请参考 Debezium 支持的配置 页面

Debezium 可以使用原生 LogMiner 数据库软件包或 XStream API 从 Oracle 中获取最新的更改事件。

重要

将 Debezium Oracle 连接器与 XStream 搭配使用是一个开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

使用 Debezium Oracle 连接器的信息和流程组织如下:

2.5.1. Debezium Oracle 连接器如何工作

要优化地配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称、使用元数据并实现事件缓冲会很有帮助。

如需更多信息,请参阅以下主题:

2.5.1.1. Debezium Oracle 连接器如何执行数据库快照

通常,Oracle 服务器上的红色日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志中检索数据库的完整历史记录。要让连接器为数据库的当前状态建立基线,连接器首次启动时,它会执行数据库的初始 一致快照

注意

如果完成初始快照所需的时间超过为数据库设置的 UNDO_RETENTION 时间(默认为十分钟),则可能会出现 ORA-01555 异常。有关错误的更多信息,以及您可以从该错误中恢复的步骤,请参阅 常见问题解答

重要

在表的快照期间,Oracle 可能会引发 ORA-01466 异常。当用户修改表的模式或添加、更改或丢弃与快照关联的索引或相关对象时,会出现这种情况。如果发生这种情况,连接器将停止,需要从开始获取初始快照。

要解决这个问题,您可以使用大于 0 的值配置 snapshot.database.errors.max.retries 属性,以便特定表的快照将重启。虽然整个快照不会在重试时从开始开始,但问题中的特定表将从开始重新读取,而表的主题将包含重复的快照事件。

您可以在以下部分中找到有关快照的更多信息:

2.5.1.1.1. Oracle 连接器用于执行初始快照的默认工作流

以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为默认值时快照的进程,这是 初始。您可以通过更改 snapshot.mode 属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用此工作流的修改版本完成快照。

当快照模式设置为默认值时,连接器完成以下任务来创建快照:

  1. 建立与数据库的连接。
  2. 确定要捕获的表。默认情况下,连接器捕获所有表,除了那些与 捕获中排除的 schema 之外的表。快照完成后,连接器将继续流传输指定表的数据。如果您希望连接器只从特定表捕获数据,您可以通过设置 table.include.listtable.exclude.list 等属性来仅捕获表或表元素的子集的数据。
  3. 在每个捕获的表上获取 ROW SHARE MODE 锁定,以防止在创建快照期间发生结构更改。Debezium 只保存一个短时间锁定。
  4. 从服务器的 redo 日志中读取当前系统更改号(SCN)的位置。
  5. 捕获所有数据库表的结构或指定用于捕获的所有表。连接器在其内部数据库模式历史记录主题中保留 schema 信息。架构历史记录提供有关发生更改事件时所生效的结构的信息。

    注意

    默认情况下,连接器捕获数据库中处于捕获模式的每个表的模式,包括没有为捕获配置的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。有关您在初始快照中没有包括快照持久模式信息的更多信息,请参阅 了解为什么初始快照捕获所有表的 schema

  6. 释放在第 3 步中获取的锁定。其他数据库客户端现在可以写入任何之前锁定的表。
  7. 在第 4 步中读取的 SCN 位置,连接器扫描为捕获指定的表(SELECT * FROM …​ AS OF SCN 123)。在扫描过程中,连接器完成以下任务:

    1. 确认表已在快照开始之前创建。如果在快照开始后创建了表,连接器会跳过该表。快照完成后,连接器过渡到 streaming,它会为快照开始后创建的任何表发出更改事件。
    2. 为从表中捕获的每行生成一个 读取 事件。所有 读取 事件都包含相同的 SCN 位置,这是在第 4 步中获取的 SCN 位置。
    3. 向源表的 Kafka 主题发出每个 读取 事件。
    4. 释放数据表锁定(如果适用)。
  8. 在连接器偏移中记录快照成功完成。

生成的初始快照会捕获捕获表中每行的当前状态。在这个基准状态中,连接器会捕获后续更改。

在快照过程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,进程会在连接器重启后重启。连接器完成初始快照后,它会从第 3 步中读取的位置继续流传输,以便它不会丢失任何更新。如果连接器因任何原因再次停止,它会在重启后恢复流更改,从之前离开的位置恢复流更改。

表 2.106. snapshot.mode 连接器配置属性的设置
设置描述

always

在每个连接器启动时执行快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。

Initial

连接器执行数据库快照,如用于创建初始快照的默认工作流 中所述。快照完成后,连接器将开始流传输后续数据库更改的事件记录。

initial_only

连接器执行数据库快照并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。

schema_only

弃用,请参阅 no_data

no_data

连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但不创建 READ 事件来代表连接器启动时(Step 6)的数据集。

schema_only_recovery

弃用,请参阅恢复

recovery

设置这个选项以恢复丢失或损坏的数据库 schema 历史记录主题。重启后,连接器运行一个从源表中重建主题的快照。您还可以设置属性,以定期修剪遇到意外增长的数据库 schema 历史记录主题。

WARNING :在最后连接器关闭后将模式提交到数据库,请不要使用此模式执行快照。

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

如需更多信息,请参阅连接器配置属性表中的 snapshot.mode

2.5.1.1.2. 有关初始快照捕获所有表的 schema 历史记录的描述

连接器运行的初始快照捕获两种类型的信息:

表数据
有关 INSERTUPDATEDELETE 操作的信息,它们在连接器的 table.include.list 属性中命名。
模式数据
描述应用到表的结构更改的 DDL 语句。如果配置了 schema 数据,则 schema 数据会被保留到内部模式历史记录主题,以及连接器的 schema 更改主题。

运行初始快照后,您可能会注意到快照捕获了未指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅限于为捕获指定的表。连接器要求表的 schema 在捕获表之前存在于 schema 历史记录主题中。通过启用初始快照来捕获不属于原始捕获集的表,Debezium 准备连接器,以便稍后需要从这些表中读取事件数据。如果初始快照没有捕获表的模式,您必须将 schema 添加到历史记录主题,然后才能从表中捕获数据。

在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过有权访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。

2.5.1.1.3. 从初始快照未捕获的表中捕获数据(无模式更改)

在某些情况下,您可能希望连接器从最初快照未捕获其 schema 的表中捕获数据。根据连接器配置,初始快照可能会只捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器无法捕获表,并报告缺少的 schema 错误。

您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。

先决条件

流程

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 在连接器配置中:

    1. snapshot.mode 设置为 recovery
    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 的值设置为 false,以确保在以后没有为捕获的表捕获数据。只有在历史记录主题中存在表的 schema 历史记录时,连接器才能从表中捕获数据。
    3. 添加您希望连接器捕获到 table.include.list 的表。
  4. 重启连接器。快照恢复过程会根据表的当前结构重建模式历史记录。
  5. (可选)快照完成后,在新添加的表中启动 增量快照。增量快照首先会流传输新添加的表的历史数据,然后恢复从之前配置的表读取更改和存档日志,包括该连接器在连接器离线时发生的更改。
  6. (可选)将 snapshot.mode 重新重置为 no_data,以防止连接器在以后的重启后启动恢复。
2.5.1.1.4. 从没有由初始快照捕获的表捕获数据(schema 更改)

如果将架构更改应用到表,则在架构更改前提交的记录与更改后提交的结构不同。当 Debezium 捕获表中的数据时,它会读取 schema 历史记录,以确保它将正确的模式应用到每个事件。如果 schema 历史记录主题中没有 schema,连接器将无法捕获表,并会产生错误结果。

如果要捕获初始快照未捕获的表中的数据,并修改了表的 schema,您必须将 schema 添加到历史记录主题(如果还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加架构。

先决条件

  • 您需要使用一个模式来捕获数据,连接器在初始快照过程中不会捕获。
  • 对表应用了一个架构更改,因此要捕获的记录没有统一的结构。

流程

初始快照捕获了所有表的模式(storage.only.captured.tables.ddl 设置为 false)
  1. 编辑 table.include.list 属性,以指定您要捕获的表。
  2. 重启连接器。
  3. 如果要从新添加的表中捕获现有数据,则启动 增量快照
初始快照没有捕获所有表的模式(storage.only.captured.tables.ddl 设置为 true)

如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:

流程 1:Schema 快照,后跟增量快照

在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。

  4. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 no_data
    2. 编辑 table.include.list 以添加您要捕获的表。
  5. 重启连接器。
  6. 等待 Debezium 捕获新表和现有表的模式。连接器停止后发生的数据更改不会被捕获。
  7. 为确保没有数据丢失,可启动 增量快照
流程 2:初始快照,后跟可选的增量快照

在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一个耗时的操作。快照完成后,您可以选择性地触发增量快照,以捕获连接器离线期间发生的任何更改。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。

  4. 编辑 table.include.list 以添加您要捕获的表。
  5. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 initial
    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 设置为 false
  6. 重启连接器。连接器使用完整的数据库快照。快照完成后,连接器会过渡到 streaming。
  7. (可选)要捕获连接器脱机时更改的任何数据,请启动 增量快照

2.5.1.2. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。按照这个初始快照,在正常情况下,连接器不会重复快照过程。任何以后更改连接器捕获的事件数据都会通过流处理。

然而,在某些情况下,在初始快照中获取的连接器可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或者仅捕获数据库中表的子集。此外,快照也可捕获数据库中表的内容的子集。

您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供快照中包含的表名称,如下表所述:

表 2.107. 临时 execute-snapshot 信号记录示例
字段默认

type

incremental

指定您要运行的快照类型。
目前,您可以请求 增量阻塞 快照。

data-collections

N/A

包含与快照中包含的表的完全限定域名匹配的正则表达式的数组。
对于 Oracle 连接器,使用以下格式来指定表的完全限定名称: database.schema.table

additional-conditions

N/A

可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:

data-collection
过滤器应用到的表的完全限定域名。您可以对每个表应用不同的过滤器。
filter
指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

surrogate-key

N/A

可选字符串,用于指定连接器在快照过程中用作表的主键的列名称。

触发临时增量快照

您可以通过向信号表添加带有 execute-snapshot 信号类型的条目,或者向 Kafka 信号发送信号消息 来启动临时增量快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的开始和端点。根据表中的条目数量以及配置的块大小,Debezium 会将表分成块,并持续持续对每个块进行快照。

如需更多信息,请参阅 增加快照

触发临时阻塞快照

您可以通过在信号表或信号主题中添加带有 execute-snapshot 信号类型的条目来启动临时阻塞快照。连接器处理消息后,它会开始快照操作。连接器会临时停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,连接器会恢复流。

如需更多信息,请参阅 块快照

2.5.1.3. 增量快照

为了在管理快照方面提供灵活性,Debezium 包含一个补充快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是像初始快照一样捕获数据库的完整状态,而是在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小是 1024 行。

当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护其捕获的每个表行的记录。与标准初始快照过程相比,这个分阶段方法捕获数据具有以下优点:

  • 您可以并行运行带有流数据捕获的增量快照,而不是延迟流,直到快照完成为止。连接器将继续在整个快照过程中从更改日志捕获接近实时事件,且操作都会阻止另一个事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从它停止的点开始,而不是从开始重新捕获表。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 按主密钥对每个表进行排序,然后根据 配置的块大小 将表分成块。通过块工作的块,然后捕获块中的每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时所在行的值。

当快照进行时,其他进程可能会继续访问数据库,可能会修改表记录。要反映此类更改,INSERTUPDATEDELETE 操作会按预期提交到事务日志。同样,持续 Debezium 流过程会继续检测到这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何处理具有相同主键的记录冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会按顺序接收。也就是说,流处理可能会发出一个事件,在快照捕获了包含该行的 READ 事件前修改表行。当快照最终为行发出对应的 READ 事件时,其值已经被取代。为确保以正确的逻辑顺序处理出序列的增量快照事件,Debezium 采用缓冲方案来解决冲突。只有在快照事件和流事件之间冲突后,才会解析 Debezium 向 Kafka 发出事件记录。

快照窗口

为了帮助解决 late-arriving READ 事件和修改同一表行之间的冲突,Debezium 会使用一个所谓的 快照窗口。快照窗口分离间隔,在此期间会捕获指定表块的数据。在块的快照窗口打开前,Debezium 会遵循其通常的行为,并将事件从下游直接发送到目标 Kafka 主题。但是,从为特定块的快照打开,直到它关闭为止,Deduplication 会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。

对于每个数据收集,Debebe 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表捕获的快照记录被发送为 READ 操作。同时,当用户继续更新数据收集中的记录,并且更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debebe 开始处理快照块,它会向内存缓冲提供快照记录。在快照窗口中,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果没有找到匹配项,则流的事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃 buffered READ 事件,并将流记录写入目标主题,因为以逻辑方式取代静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器会为每个快照块重复这个过程。

目前,您可以使用以下任一方法启动增量快照:

警告

Oracle 的 Debezium 连接器不支持增量快照运行时的 schema 更改。

2.5.1.3.1. 触发增量快照

要启动增量快照,您可以发送 临时快照信号 到源数据库上的信号表。您可以提交快照信号,作为 SQL INSERT 查询。

在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并可选择性地指定快照操作的类型。Debezium 目前支持 增量阻塞 快照类型。

要指定要包含在快照中的表,提供一个列出表的 data-collections 数组,或用于匹配表的正则表达式数组,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debebe 会解释空数组,意味着不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的表的名称包含一个点(.)、空格或其它非字母数字字符,则必须使用双引号转义表名称。
例如,若要将 公共 模式中存在的表包含在 db1 数据库中,并且名称为 My.Table,请使用以下格式 :"db1.public.\"My.Table\" "。

先决条件

使用源信号频道触发增量快照

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    例如,

    INSERT INTO db1.myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4
        "type":"incremental", 5
        "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6

    命令中的 idtypedata 参数的值 与信号表的字段 相对应。
    下表描述了示例中的参数:

    表 2.108. SQL 命令中的字段描述,用于将增量快照信号发送到信号表
    描述

    1

    database.schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。相反,在快照过程中,Debebe 会生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的必需组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了使用格式 database.schema.table 的正则表达式,以匹配表的完全限定域名。此格式与您用来指定连接器 信号表的名称相同

    5

    incremental

    data 字段的可选类型 组件,用于指定要运行的快照操作类型。
    有效值为 incrementalblocking 值。
    如果没有指定值,连接器默认为执行增量快照。

    6

    additional-conditions

    可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
    每个额外条件都是带有 data-collectionfilter 属性的对象。您可以为每个数据收集指定不同的过滤器。
    请参阅 data-collection 属性是过滤器应用到的数据收集的完全限定域名。有关 additional-conditions 参数的详情,请参考 第 2.5.1.3.2 节 “使用附加 条件运行临时增量快照”

2.5.1.3.2. 使用附加 条件运行临时增量快照

如果您希望快照只在表中包括内容子集,您可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。

对典型快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-conditions 参数,您可以在 SQL 查询中附加 WHERE 条件,如下例所示:

SELECT * FROM <data-collection> WHERE <filter> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

例如,假设您有一个包含以下列的 products 表:

  • ID (主密钥)
  • color
  • quantity

如果您需要 product 表的增量快照,其中只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');

additional-conditions 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含 color=bluequantity>10 的项数据:

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

例 2.32. 增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 2.109. 增量快照事件消息中的字段描述
字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是 阻塞增量
在提交至信号表的 SQL 查询中指定 type 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

2.5.1.3.3. 使用 Kafka 信号频道触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 2.110. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 支持 incrementalblocking 类型。
详情请查看下一部分。

data-collections

N/A

以逗号分隔的正则表达式,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式来指定名称。

additional-conditions

N/A

可选的附加条件数组,用于指定连接器评估以指定快照中包含的记录子集的条件。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection:: 过滤器适用的表的完全限定域名。您可以对每个表应用不同的过滤器。过滤:: specifys 列值必须存在于数据库记录中才能包括快照,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

例 2.33. execute-snapshot Kafka 信息

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-conditions 字段来选择表内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM &lt ;tableName&gt; …​.

当快照请求包含 additional-conditions 属性时,属性的 data-collectionfilter 参数会附加到 SQL 查询中,例如:

SELECT * FROM &lt ;data-collection> WHERE & lt;filter&gt; …​.

例如,如果一个带有列 ID (主键)、颜色 和品牌products 表,如果您希望快照只包含 color='blue' 的内容,当请求快照时,您可以添加 additional-conditions 属性来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`

您还可以使用 additional-conditions 属性来根据多个列传递条件。例如,使用与上例中的相同 product 表,如果您希望快照只包含 color='blue'brand='MyBrand' products 表中的内容,您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.5.1.3.4. 停止增量快照

在某些情况下,可能需要停止增量快照。例如,您可能意识到快照没有被正确配置,或者您可能要确保资源可用于其他数据库操作。您可以通过向源数据库上的信号发送信号来停止已经运行的快照。

您可以通过在 SQL INSERT 查询中发送停止快照信号,向信号表提交停止快照信号。stop-snapshot 信号将快照操作的类型指定为 增量,并选择性地指定要从当前运行的快照中省略的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行中时停止增量快照操作。

其他资源

您还可以通过向 Kafka 信号发送 JSON 消息来停止增量快照

先决条件

使用源信号频道停止增量快照

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    例如,

    INSERT INTO db1.myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值与 信号表的字段相对应
    下表描述了示例中的参数:

    表 2.111. SQL 命令中的字段描述,用于将停止增量快照信号发送到信号表
    描述

    1

    database.schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。

    3

    stop-snapshot

    指定 type 参数,指定信号要触发的操作。

    4

    data-collections

    信号的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了正则表达式,该表达式通过其完全限定名称匹配表,格式为 database.schema.table

    如果您从 data 字段省略这个组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的必需组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项为 增量
    如果没有指定 类型 值,信号将无法停止增量快照。

2.5.1.3.5. 使用 Kafka 信号频道停止增量快照

您可以向 配置的 Kafka 信号主题 发送信号消息,以停止临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

signal 类型是 stop-snapshotdata 字段必须具有以下字段:

表 2.112. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 只支持 incremental 类型。
详情请查看下一部分。

data-collections

N/A

可选的、以逗号分隔的正则表达式,与表的完全限定名称匹配,表名称或正则表达式,以匹配要从快照中删除的表名称。
使用格式 database.schema.table 指定表名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`

2.5.1.4. 阻塞快照

为了在管理快照方面提供更多灵活性,Debezium 包含一个额外的临时快照机制,称为 阻塞快照。阻塞快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

阻塞快照的行为与 初始快照 相似,但您可以在运行时触发快照。

您可能想要在以下情况下运行阻塞快照,而不是使用标准初始快照过程:

  • 您可以添加新表,并在连接器运行时完成快照。
  • 您可以添加大表,并且您希望快照在短时间内完成,而不是通过增量快照完成。

阻塞快照过程

当您运行阻塞快照时,Debebe 会停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,会恢复流。

配置快照

您可以在信号 的数据 组件中设置以下属性:

  • data-collections:指定哪个表必须是快照。
  • data-collections :指定您要包括快照的表。
    此属性接受与完全限定表名称匹配的正则表达式列表。属性的行为与 table.include.list 属性的行为类似,它指定要捕获在阻塞快照中的表。
  • additional-conditions :您可以为不同的表指定不同的过滤器。

    • data-collection 属性是要应用过滤器的表的完全限定名称,并可区分大小写或区分大小写,具体取决于数据库。
    • filter 属性将具有与 snapshot.select.statement.overrides 时使用的相同值,即条件应当匹配的表的完全限定域名。

例如:

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

可能的副本

您发送信号触发快照的时间之间可能会有延迟,以及流停止和快照启动时的时间。因此,在快照完成后,连接器可能会发出一些由快照捕获的重复记录的事件记录。

2.5.1.5. 接收 Debezium Oracle 更改事件记录的 Kafka 主题的默认名称

默认情况下,Oracle 连接器将所有 INSERTUPDATEDELETE 操作的更改事件写入一个特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供了默认名称组件的定义:

topicPrefix
topic.prefix 连接器配置属性指定的主题前缀。
schemaName
发生操作的模式的名称。
tableName
操作发生的表的名称。

例如,如果 fulfillment 是服务器名称,inventory 是 schema 名称,数据库包括名为 orders, customers, 和 products 的表,Debezium Oracle 连接器会向以下 Kafka 主题发送事件,数据库中的每个表有一个。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

2.5.1.6. Debezium Oracle 连接器如何处理数据库架构更改

当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,可以随时更改数据库架构,这意味着连接器必须能够识别每次插入、更新或删除操作时的 schema。另外,连接器不一定将当前模式应用到每个事件。如果事件相对旧,则在应用当前模式之前记录这些事件。

为确保在架构更改后处理发生的正确事件,Oracle 包含在 redo 日志中包括影响数据的行级更改,以及应用到数据库的 DDL 语句。当连接器在 redo 日志中遇到这些 DDL 语句时,它会解析它们并更新每个表的 schema 的内存中表示。连接器使用此架构表示在每次插入、更新或删除操作时标识表的结构,并生成适当的更改事件。在单独的数据库架构历史记录 Kafka 主题中,连接器记录了所有 DDL 语句,以及出现每个 DDL 语句的红色日志中的位置。

当连接器在崩溃或安全停止后重启时,它开始从特定位置读取红色日志,即从特定时间点读取红色日志。连接器通过读取数据库模式历史记录 Kafka 主题来重建在此时存在的表结构,并在连接器启动的红色日志中解析所有 DDL 语句。

此数据库架构历史记录主题仅供内部使用。另外,连接器也可以将 schema 更改事件发送到面向消费者应用程序的不同主题

其他资源

2.5.1.7. Debezium Oracle 连接器如何公开数据库 schema 的变化

您可以配置 Debezium Oracle 连接器来生成模式更改事件,这些事件描述了应用到数据库中表的结构更改。连接器将模式更改事件写入名为 < serverName&gt; 的 Kafka 主题,其中 serverNametopic.prefix 配置属性中指定的命名空间。

当 Debezium 从新表流传输数据时,或更改表的结构时,Debebe 会向 schema 更改主题发送一条新消息。

连接器发送到 schema 更改主题的消息包含一个有效负载,并可以选择包含更改事件消息的 schema。

模式更改事件的 schema 具有以下元素:

name
模式更改事件消息的名称。
type
更改事件消息的类型。
version
架构的版本。version 是一个整数,每次更改 schema 时都会递增。
fields
更改事件消息中包含的字段。

示例:Oracle 连接器模式更改主题的 Schema

以下示例显示了 JSON 格式的典型模式。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

模式更改事件消息的有效负载包括以下元素:

ddl
提供导致 schema 的 SQL CREATEALTERDROP 语句。
databaseName
将语句应用到的数据库的名称。databaseName 的值充当 message 键。
tableChanges
架构更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表的每列条目。由于结构化表示以 JSON 或 Avro 格式显示数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析器处理它们。
重要

默认情况下,连接器使用 ALL_TABLES 数据库视图来识别要存储在 schema 历史记录主题中的表名称。在该视图中,连接器只能从可连接到数据库的用户帐户的表中访问数据。

您可以修改设置,以便 schema 历史记录主题存储了不同的表子集。使用以下方法之一更改主题存储的表集合:

重要

当连接器被配置为捕获表时,它会存储表的模式更改历史记录,不仅存储在 schema 更改主题中,还存储在内部数据库架构历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要针对 schema 更改通知的应用程序只消耗了来自 schema 更改主题的信息。

重要

永不对数据库 schema 历史记录主题进行分区。要使数据库架构历史记录主题正常工作,它必须保持一致的全局顺序,连接器向其发送的事件记录。

要确保主题不在分区中分割,请使用以下方法之一为主题设置分区计数:

  • 如果您手动创建数据库架构历史记录主题,请指定分区计数 1
  • 如果您使用 Apache Kafka 代理自动创建数据库模式历史记录主题,则会创建主题,将 Kafka num.partitions配置选项 的值设置为 1

示例:向 Oracle 连接器 schema 更改主题发送的消息

以下示例显示了 JSON 格式的典型的模式更改消息。消息包含表 schema 的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "3.0.8.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "ts_us": 1588252618953000,
      "ts_ns": 1588252618953000000,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user",
      "row_id": "AAASgjAAMAAAACnAAA"
    },
    "ts_ms": 1588252618953, 1
    "ts_us": 1588252618953987, 2
    "ts_ns": 1588252618953987512, 3
    "databaseName": "ORCLPDB1", 4
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", 5
    "tableChanges": [ 6
      {
        "type": "CREATE", 7
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 8
        "table": { 9
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 10
            "ID"
          ],
          "columns": [ 11
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 12
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 2.113. 发送到 schema 更改主题的消息中的字段描述
字段名称描述

1

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

2

databaseName
schemaName

标识包含更改的数据库和模式。

3

ddl

此字段包含负责 schema 更改的 DDL。

4

tableChanges

包含 DDL 命令生成的架构更改的一个或多个项目的数组。

5

type

描述更改类型。type 可以设置为以下值之一:

创建
已创建表。
改变
已修改表。
DROP
已删除表。

6

id

创建、更改或丢弃的表的完整标识符。对于表重命名,此标识符是 < old> , < new&gt; 表名称的串联。

7

table

代表应用的更改后的表元数据。

8

primaryKeyColumnNames

编写表主键的列列表。

9

columns

changed 表中每个列的元数据。

10

属性

每个表更改的自定义属性元数据。

在连接器发送到 schema 更改主题的消息中,message 键是包含 schema 更改的数据库的名称。在以下示例中,payload 字段包含 databaseName 键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

2.5.1.8. Debezium Oracle 连接器生成的事件代表事务边界

Debezium 可以生成代表事务元数据边界的事件,以及丰富的数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并接收部署连接器后发生的事务的元数据。部署连接器前发生的事务元数据不可用。

数据库事务由一个声明块表示,该块包含在 BEGINEND 关键字之间。Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status
BEGINEND.
id
唯一事务标识符的字符串表示。
ts_ms
数据源的事务边界事件(BEGINEND 事件)的时间。如果数据源没有向 Debezium 提供事件时间,则字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务处理的事件总数。
data_collections (用于 END 事件)
一组 data_collectionevent_count 元素,用于指示连接器为来自数据收集的更改发出的事件数。

以下示例显示了典型的事务边界消息:

示例:Oracle 连接器事务边界事件

{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>.transaction 主题。

2.5.1.8.1. Debezium Oracle 连接器如何使用事务元数据增强更改事件信息

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示。
total_order
事件在事务生成的所有事件间的绝对位置。
data_collection_order
事件在事务发送的所有事件中的每个数据收集位置。

以下示例显示了典型的事务事件信息:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335741",
  "ts_ns": "1580390884335741963",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

LogMiner Mining 策略

Oracle redo 日志中的条目不会存储用户提交的原始 SQL 语句,以便进行 DML 更改。相反,红色条目包含一组更改向量,以及代表与这些向量相关的表、表和列的对象标识符。换句话说,红色日志条目不包括由 DML 更改影响的模式、表或列的名称。

Debezium Oracle 连接器使用 log.mining.strategy 配置属性来控制 Oracle LogMiner 如何处理更改向量中对象标识符的查找。在某些情况下,一个日志 mining 策略可能会比架构更改更可靠。但是,在选择日志 mining 策略前,务必要考虑其在性能和开销方面可能具有的影响。

编写数据字典以恢复日志

redo_log_catalog mining 策略指示数据库在每个红色日志切换后立即将数据字典的副本刷新到红色日志。这是跟踪与数据更改交互的模式更改的最可靠策略,因为 Oracle LogMiner 在一系列更改向量的开始和结束数据字典状态之间进行干预。

但是,redo_log_catalog 模式也是最昂贵,因为它需要几个关键步骤才能正常工作。首先,此模式需要在每次日志切换后将数据字典刷新到红色日志。在每个交换机后清除日志可以快速消耗存档日志中重要的空间,而归档日志的高卷可能会超过数据库管理员为准备的数量。如果要使用此模式,请与您的数据库管理员协调以确保正确配置了数据库。

重要

如果您将连接器配置为使用 redo_log_catalog 模式,请不要使用多个 Debezium Oracle 连接器来捕获来自同一逻辑数据库的更改。

直接使用在线目录

默认策略模式 online_catalog 的工作方式与 redo_log_catalog 模式不同。当策略设置为 online_catalog 时,数据库永远不会将数据字典刷新到红色日志。相反,Oracle LogMiner 始终使用最新的数据字典状态来执行比较。通过始终使用当前的字典,并消除对红色日志的清空,此策略需要较少的开销,并更有效地运行。但是,这些好处是无法解析内部模式更改和数据更改的偏移量。因此,这个策略有时可能会导致事件失败。

如果 LogMiner 在模式更改后无法重建 SQL 可靠性,请检查红色日志以了解证据。查找名称为 OBJ# 123456 (其中数字是表的对象标识符)或带有名称(如 COL1COL2) 的表的引用。当您将连接器配置为使用 online_catalog 策略时,采取措施来确保表 schema 及其索引保持静态状态,并避免更改。如果 Debezium 连接器配置为使用 online_catalog 模式,且您必须应用 schema 更改,请执行以下步骤:

  1. 等待连接器捕获所有现有的数据更改(DML)。
  2. 执行架构(DDL)更改,然后等待连接器捕获更改。
  3. 在表上恢复数据更改(DML)。

按照以下步骤,确保 Oracle LogMiner 可以安全地重建 SQL 以了解所有数据更改。

混合方法

您可以通过将 log.mining.strategy 配置属性的值设置为 hybrid 来启用此策略。此策略的目标是为 redo_log_catalog 策略提供可靠性,并提供 online_catalog 策略的性能和低开销,而无需考虑任一策略的缺点。

混合 策略主要在 online_catalog 模式下运行,这意味着 Debezium Oracle 连接器首先将事件重建委派给 Oracle LogMiner。如果 Oracle LogMiner 成功重建 SQL,Debezium 会正常处理事件,就像它被配置为使用 online_catalog 策略一样。如果连接器检测到 Oracle LogMiner 无法重建 SQL,连接器会尝试通过使用该表对象的 schema 历史记录直接重建 SQL。只有在 Oracle LogMiner 和连接器无法重建 SQL 时,连接器才会报告失败。

重要

如果 lob.enabled 属性设为 true,则无法使用 hybrid mining 策略。如果您需要流传输 CLOBBLOBXML 数据,则只能使用 online_catalogredo_log_catalog 策略。

查询模式

Debezium Oracle 连接器默认与 Oracle LogMiner 集成。此集成需要一组专门的步骤,其中包括生成复杂的 JDBC SQL 查询,以便尽可能在事务日志中记录的更改作为更改事件。JDBC SQL 查询使用的 V$LOGMNR_CONTENTS 视图没有任何索引来改进查询的性能,因此可以使用不同的查询模式来控制如何以改进查询的方式生成 SQL 查询。

log.mining.query.filter.mode 连接器属性可以配置以下内容之一,以便影响生成 JDBC SQL 查询的方式:

none
(默认)此模式会创建一个 JDBC 查询,它根据不同的操作类型(如插入、更新或删除)在数据库级别上过滤。当根据 schema, table, 或 username include/exclude 列表过滤数据时,这是在连接器中的处理循环过程中完成的。

当从数据库中捕获没有大量更改的表时,这个模式通常很有用。生成的查询非常简单,主要侧重于通过低数据库开销尽快读取。
in
这个模式会创建一个 JDBC 查询,它不仅过滤在数据库级别上的操作类型,还过滤模式、表和用户名包含/排除列表。查询的 predicates 使用 SQL in-clause 生成,具体取决于 include/exclude 列表配置属性中指定的值。

当从数据库捕获大量表时,这个模式通常很有用,而这些表在更改时非常饱和。生成的查询比 none 模式复杂,并侧重于减少网络开销,并尽可能在数据库级别执行尽可能多的过滤。

最后,不要将 正则表达式指定为 schema 和 table include/exclude 配置属性的一部分。使用正则表达式将导致连接器与基于这些配置属性的更改不匹配,从而导致更改丢失。
regex
这个模式会创建一个 JDBC 查询,它不仅过滤在数据库级别上的操作类型,还过滤模式、表和用户名包含/排除列表。但是,与 in 模式不同,此模式会根据是否指定了 include 或 exclude 的值,使用 Oracle REGEXP_LIKE 操作器生成 SQL 查询。

当捕获使用少量正则表达式的表号时,此模式通常很有用。生成的查询比任何其他模式复杂得多,并侧重于减少网络开销,并尽可能在数据库级别执行尽可能多的过滤。

2.5.1.9. Debezium Oracle 连接器如何使用事件缓冲

Oracle 将所有更改按其发生的顺序写入红色日志,包括稍后由回滚丢弃的更改。因此,来自独立事务的并发更改会被干扰。当连接器首先读取更改流时,因为它无法立即确定提交或回滚哪些更改,它会临时将更改事件存储在内部缓冲区中。提交更改后,连接器会将更改事件从缓冲区写入 Kafka。连接器丢弃了回滚丢弃的更改事件。

您可以通过设置属性 log.mining.buffer.type 来配置连接器使用的缓冲机制。

heap

默认缓冲区类型使用 memory 进行配置。在默认 内存设置 下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 内存 缓冲区设置,请确定分配给 Java 进程的内存量可以适合您的环境中的长时间运行和大型事务。

2.5.1.10. Debezium Oracle 连接器如何检测 SCN 值中的差距

当 Debezium Oracle 连接器配置为使用 LogMiner 时,它会使用一个基于系统更改号(SCN)的开始和结束范围从 Oracle 收集更改事件。连接器会自动管理此范围,根据连接器是否可以实时流更改,或者由于数据库中的大量或批量事务的卷而处理更改的积压。

在某些情况下,Oracle 数据库会使 SCN 变得非常高,而不是以恒定的速度增加 SCN 值。这种 SCN 值的 jump 可能会发生,因为特定集成与数据库交互,或者作为热备份等事件。

Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整 mining 范围。

log.mining.scn.gap.detection.gap.size.min
指定最小差距大小。
log.mining.scn.gap.detection.time.interval.max.ms
指定最大时间间隔。

连接器首先比较当前 SCN 和当前 mining 范围内的 SCN 之间的变化数量。如果当前 SCN 值和最高 SCN 值之间的区别大于最小差距,则连接器可能会检测到 SCN 差距。要确认是否存在差距,下一个连接器会比较当前 SCN 和前一个 mining 范围末尾的 SCN 的时间戳。如果时间戳之间的区别小于最大时间间隔,则会确认是否存在 SCN 差距。

当 SCN 差距发生时,Debezium 连接器会自动将当前的 SCN 用作当前 mining 会话范围的端点。这可让连接器快速捕获实时事件,而无需在返回任何更改之间减少较小的范围,因为 SCN 值被意外数量增加。当连接器执行上述响应 SCN 差距的步骤时,它会忽略 log.mining.batch.size.max 属性指定的值。连接器完成 mining 会话并捕获到实时事件后,它会恢复最大日志的强制批处理大小。

警告

只有在连接器运行和处理接近实时事件时,SCN 差距检测才可用。

2.5.1.11. Debezium 如何管理数据库中不经常更改的偏移量

Debezium Oracle 连接器跟踪连接器偏移中的系统更改号,以便在连接器重启时,它可以从其离开的位置开始。这些偏移是每个发出的更改事件的一部分;但是,当数据库频率较低(每数小时或天)时,偏移可能会变得过时,并防止连接器在事务日志中不再提供,防止连接器成功重启。

对于使用非CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms 来强制连接器定期发出 heartbeat 事件,以便偏移保持同步。

对于使用 CDB 模式连接到 Oracle 的连接器,维护同步更为复杂。不仅必须设置 heartbeat.interval.ms,还需要设置 heartbeat.action.query。需要指定这两个属性,因为在 CDB 模式中,连接器专门用于跟踪 PDB 中的更改。需要在可插拔数据库中触发更改事件所需的补充机制。定期,心跳操作查询会导致连接器插入新表行,或更新可插拔数据库中的现有行。Debezium 检测到表更改并为它们发出更改事件,确保偏移保持同步,即使在进程不经常更改的可插拔数据库中也是如此。

注意

要使连接器使用 heartbeat.action.query 以及不是 连接器用户帐户 的表,您必须授予连接器用户权限才能在这些表上运行必要的 INSERTUPDATE 查询。

2.5.2. Debezium Oracle 连接器数据更改事件的描述

Oracle 连接器发出的每个数据更改事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 构造主题名称的详情,请参考 主题名称

警告

Debezium Oracle 连接器确保所有 Kafka Connect 模式名称都 是有效的 Avro 模式名称。这意味着,逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])开头,而逻辑服务器名称和模式名称和表名称中的所有字符必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器会自动将无效的字符替换为下划线字符。

当多个逻辑服务器名称、模式名称或表名称不是有效的字符,且这些字符被替换为下划线时,意外命名冲突可能会导致。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间变化,主题消费者可能很难处理。为便于处理可变事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值有两个部分:schemapayload。模式描述了有效负载的结构,而有效负载包含实际数据。

警告

连接器不会捕获由 SYSSYSTEM 用户帐户执行的更改。

以下主题包含有关数据更改事件的更多详细信息:

2.5.2.1. 关于 Debezium Oracle 连接器更改事件中的键

对于每个更改的表,更改事件键的结构,以便在创建事件时,表的主键(或唯一键约束)中存在每个列的字段。

例如,在 inventory 数据库 schema 中定义的 customers 表可能有以下更改事件键:

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

如果 < topic.prefix>.transaction 配置属性的值被设置为 server1,则数据库中 customers 表中发生的每个更改事件的 JSON 表示具有以下关键结构:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

密钥的 schema 部分包含一个 Kafka Connect 模式,用于描述密钥部分的内容。在前面的示例中,有效负载 值不是可选的,结构由名为 server1.DEBEZIUM.CUSTOMERS.Key 的 schema 定义,它有一个类型为 int32 的必需字段 id。键的 payload 字段的值表示它实际上是一个带有 id 字段的结构(在 JSON 中,是一个对象),其值为 1004

因此,您可以将这个键解释为 inventory.customers 表中的行(来自名为 server1 的连接器),其 id 主键列的值为 1004

2.5.2.2. 关于 Debezium Oracle 连接器更改事件中的值

更改事件消息中的值结构反映了消息中 change 事件中消息键 的结构,并且包含 schema 部分和 payload 部分。

更改事件值的有效负载

更改事件值的有效数据部分中有一个 envelope 数据结构,它包含以下字段:

op
包含描述操作类型的字符串值的必填字段。Oracle 连接器更改事件值有效负载中的 op 字段包含以下值之一: c (创建或插入)、u (update)、d (删除)或 r (读取,表示快照)。
before
存在的可选字段(如果存在)描述了事件 发生前 行的状态。该结构由 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述,server1 连接器用于 inventory.customers 表中的所有行。
after
存在的可选字段(如果存在)包含 更改后 行的状态。该结构由用于 before 字段的同一 server1.INVENTORY.CUSTOMERS.Value Kafka Connect schema 描述。
source

包含描述事件源元数据的结构的必填字段。对于 Oracle 连接器,结构包括以下字段:

  • Debezium 版本。
  • 连接器名称。
  • 事件是否是持续快照的一部分。
  • 事务 ID (未包含快照)。
  • 与 SCN (系统更改号)相关的以下值,数据库在提交更改时分配:

scn

数据库用来跟踪事务的唯一标识符。

start_scn

当事务启动时,SCN。

start_ts_ms

事务启动时的时间。

commit_ts_ms

事务提交的时间。

  • 根据运行 Kafka Connect 任务的 JVM 的系统时钟,代表连接器处理事件的时间戳。对于快照,时间戳表示快照何时发生。
    连接器在以下字段报告带有不同精度的时间戳值:

ts_ms

以毫秒为单位提供时间戳。

ts_us

以微秒为单位提供时间戳。

ts_ns

提供以纳秒为单位的时间戳。

  • 进行更改的用户名
  • 与行关联的 ROWID

    提示

    commit_scn 字段是可选的,描述了更改事件参与的事务提交的 SCN。只有在使用 LogMiner 连接适配器时,才会显示此字段。

    提示

    user_name 字段仅在使用 LogMiner 连接适配器时填充。

ts_ms

可选字段,如果存在,包含运行 Kafka Connect 任务的 JVM 中的时间(基于系统时钟),该字段处理事件。

更改事件值的 schema

事件消息值的 schema 部分包含一个 schema,它描述了有效负载的 envelope 结构及其中的嵌套字段。

有关更改事件值的更多信息,请参阅以下主题:

创建 事件

以下示例显示了 customers 表中 create 事件值的值,如 更改事件键 示例所述:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "row_id"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "3.0.8.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "ts_us": 1520085154000000,
            "ts_ns": 1520085154000000000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA"
        },
        "op": "c",
        "ts_ms": 1532592105975,
        "ts_us": 1532592105975741,
        "ts_ns": 1532592105975741582
    }
}

在前面的示例中,注意事件如何定义以下模式:

  • 信封 (server1.DEBEZIUM.CUSTOMERS.Envelope)。
  • 结构(io.debezium.connector.oracle.Source,它特定于 Oracle 连接器并在所有事件间重复使用)。
  • beforeafter 字段的特定于表的模式。
提示

beforeafter 字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中的表的 Avro 模式都有自己的演变和历史记录。

此事件的 valuepayload 部分提供有关事件的信息。它描述了创建了行(op=c),并显示 after 字段值包含插入到 IDFIRST_NAMELAST_NAMEEMAIL 列中的值。

提示

默认情况下,事件的 JSON 表示大于描述的行。较大的大小是因为 JSON 表示,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 减少连接器写入 Kafka 主题的消息大小。

更新 事件

以下示例显示了一个 update 更改事件,连接器从与以前的 create 事件相同的表中捕获。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "3.0.8.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "ts_us": 1520085811000000,
            "ts_ns": 1520085811000000000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA"
        },
        "op": "u",
        "ts_ms": 1532592713485,
        "ts_us": 1532592713485152,
        "ts_ns": 1532592713485152954,
    }
}

有效负载的结构与 create (insert)事件有效负载相同,但以下值有所不同:

  • op 字段的值为 u,表示此行因为更新而改变。
  • before 字段显示行的前一状态,以及更新 数据库提交前存在的值。
  • after 字段显示行的更新状态,EMAIL 值现在设置为 anne@example.com
  • source 字段的结构包含与之前相同的字段,但值不同,因为连接器从红色日志的不同位置捕获事件。
  • ts_ms 字段显示显示 Debezium 处理事件的时间戳。

payload 部分显示一些其他有用的信息。例如,通过比较结构的 beforeafter 结构,我们可以确定在提交后如何更改行。 结构提供有关 Oracle 记录的信息,提供可追溯性。它还让我们了解此事件何时发生此主题中的其他事件和其他主题。它是否发生在与另一个事件相同的提交之前、之后还是作为其他事件的一部分?

注意

当更新行 primary/unique 键的列时,行的键值会改变。因此,Debebe 在更新后发出三个 事件:

  • DELETE 事件。
  • 一个 tombstone 事件,带有行的旧键。
  • 为行提供新密钥的 INSERT 事件。

删除 事件

以下示例显示了上一次 createupdate 事件示例中显示的表的 delete 事件。delete 事件的 schema 部分与这些事件的 schema 部分相同。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "3.0.8.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "ts_us": 1520085153000000,
            "ts_ns": 1520085153000000000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA"
        },
        "op": "d",
        "ts_ms": 1532592105960,
        "ts_us": 1532592105960854,
        "ts_ns": 1532592105960854693
    }
}

createupdate 事件相比,事件的 payload 部分显示了几个不同之处:

  • op 字段的值为 d,表示行已被删除。
  • before 字段显示与数据库提交删除的行前状态。
  • after 字段的值为 null,表示行不再存在。
  • source 字段的结构中包括了多个在 createupdate 事件中存在的键, 但 ts_ms, scn, 和 txId 中的值不同。
  • ts_ms 显示一个时间戳,指示 Debezium 处理此事件的时间。

delete 事件为用户提供了处理删除此行所需的信息。

Oracle 连接器的事件设计为与 Kafka 日志压缩 一起使用,这允许删除一些旧的信息,只要保留每个密钥的最新消息。这允许 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除使用同一键的所有之前信息。message 值必须设置为 null,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,默认情况下 Debezium 的 Oracle 连接器总是遵循一个 delete 事件,它有一个特殊的 tombstone 事件,它具有相同的键但 null 值。您可以通过设置连接器属性 tombstones.on.delete 来改变默认的行为。

截断 事件

truncate 更改事件信号,提示表已被截断。message 键在本例中是 null,消息值类似如下:

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { 1
            "version": "3.0.8.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "ts_us": 1638974535000000,
            "ts_ns": 1638974535000000000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user"
        },
        "op": "t", 2
        "ts_ms": 1638974558961, 3
        "ts_us": 1638974558961987, 4
        "ts_ns": 1638974558961987251, 5
        "transaction": null
    }
}
表 2.114. truncate 事件值字段的描述
字段名称描述

1

source

描述事件源元数据的强制字段。在 truncate 事件值中,source 字段结构与为同一表的 create, update, 和 delete 事件相同,提供此元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分(为 truncate 事件始终为 false
  • 执行操作的事务的 ID
  • 操作的 SCN
  • 在数据库中进行更改时的时间戳
  • 执行更改的用户名

2

op

描述操作类型的强制字符串。op 字段值为 t,表示此表已被截断。

3

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

如果单个 TRUNCATE 操作影响多个表,连接器会为每个 截断 表发出一个更改事件记录。

注意

truncate 事件表示对整个表所做的更改,没有消息键。因此,对于具有多个分区的主题,对更改事件没有排序保证(创建更新 等),或 截断 与表相关的事件。例如,如果消费者从多个分区读取表的事件,它可能会在收到 截断 事件后从一个分区接收一个表 的更新 事件,该事件会从另一个分区中删除表中的所有数据。排序只针对使用单个分区的主题保证。

如果您不希望连接器捕获 截断 事件,请使用 skipped.operations 选项过滤它们。

2.5.3. Debezium Oracle 连接器如何映射数据类型

当 Debezium Oracle 连接器检测到表行中的值更改时,它会发出一个代表更改的事件。每个更改事件记录的结构与原始表相同,事件记录包含每个列值的字段。表列的数据类型决定了连接器如何代表更改事件字段中的列值,如以下部分所示。

对于表中的每个列,Debebe 将源数据类型映射到 字面类型,在某些情况下,一个 语义类型,在对应的 event 字段中。

字面类型
描述如何按字面表示值,使用以下 Kafka Connect 模式类型之一: INT8,INT16,INT32,INT64, INT64 ,FLOAT32,FLOAT64,BOOLEAN,STRING,BYTES,ARRAY,MAP, 和 STRUCT.
语义类型
描述 Kafka Connect 模式如何使用字段的名称捕获字段 的含义

如果默认数据类型转换不满足您的需要,您可以为连接器 创建自定义转换器

对于某些 Oracle 大对象(CLOB、NCLOB 和 BLOB)和数字数据类型,您可以通过更改默认配置属性设置来操作连接器执行类型映射的方式。有关 Debezium 属性控制这些数据类型的映射的更多信息,请参阅 Binary 和 Character LOB 类型和 Numeric 类型

如需有关 Debezium 连接器如何映射 Oracle 数据类型的更多信息,请参阅以下主题:

字符类型

下表描述了连接器如何映射基本字符类型。

表 2.115. Oracle 基本字符类型的映射
Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

CHAR[(M)]

字符串

不适用

NCHAR[(M)]

字符串

不适用

NVARCHAR2[(M)]

字符串

不适用

VARCHAR[(M)]

字符串

不适用

VARCHAR2[(M)]

字符串

不适用

二进制和 Character LOB 类型

下表描述了连接器如何映射二进制和字符大对象(LOB)数据类型。

表 2.116. Oracle 二进制和字符 LOB 类型的映射
Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

BFILE

不适用

不支持这个数据类型

BLOB

BYTES

根据连接器配置中 binary.handling.mode 属性的设置,连接器会将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)
  • 一个 base64 编码的字符串
  • base64-url-safe-encoded 字符串
  • 一个十六进制编码的字符串

CLOB

字符串

不适用

LONG

不适用

不支持这个数据类型。

长原始

不适用

不支持这个数据类型。

NCLOB

字符串

不适用

RAW

不适用

根据连接器配置中 binary.handling.mode 属性的设置,连接器会将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)
  • 一个 base64 编码的字符串
  • base64-url-safe-encoded 字符串
  • 一个十六进制编码的字符串
注意

如果 Oracle 在 SQL 语句中明确设置了或更改,Oracle 仅为 CLOBNCLOBBLOB 数据类型提供列值。因此,更改事件永远不会包含未更改的 CLOBNCLOBBLOB 列的值。相反,它们包含连接器属性 unavailable.value.placeholder 定义的占位符。

如果 CLOBNCLOBBLOB 列的值已更新,则新值将放置在相应更新更改事件的 after 项中。before 元素包含不可用的值占位符。

数字类型

下表描述了 Debezium Oracle 连接器如何映射数字类型。

注意

您可以改变连接器映射 Oracle DECIMAL, NUMBER, NUMERIC, 和 REAL 数据类型的方式,方法是修改连接器的 decimal.handling.mode 配置属性的值。当将属性设置为 precise 的默认值时,连接器会将这些 Oracle 数据类型映射到 Kafka Connect org.apache.kafka.connect.data.Decimal 逻辑类型,如表中所示。当将 属性的值设置为 双引号字符串 时,连接器会对某些 Oracle 数据类型使用备用映射。如需更多信息,请参阅 下表中的 Semantic 类型和备注 列。

表 2.117. Oracle 数字数据类型的映射
Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

BINARY_FLOAT

FLOAT32

不适用

BINARY_DOUBLE

FLOAT64

不适用

DECIMAL[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

org.apache.kafka.connect.data.Decimal (如果使用 BYTES

处理等效于 NUMBER )(请注意,S 默认为 DECIMAL)。

decimal.handling.mode 属性设为 double 时,连接器代表 DECIMAL 值,作为类型为 FLOAT64 的 Java double 值。

当将 decimal.handling.mode 属性设置为 string 时,连接器代表 DECIMAL 值作为使用 schema 类型 STRING 格式的字符串表示。

双精度

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

FLOAT[(P)]

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

整数,INT

BYTES

org.apache.kafka.connect.data.Decimal

INTEGER 映射到 Oracle 到 NUMBER (38,0),因此可以保存大于任何 INT 类型的值。

NUMBER[(P[, *])]

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

当将 decimal.handling.mode 属性设置为 double 时,连接器将 NUMBER 值表示为带有 schema 类型 FLOAT64 的 Java double 值。

当将 decimal.handling.mode 属性设置为 string 时,连接器显示 NUMBER 值作为一个有特定格式的字符串(如 schema 类型 STRING)。

NUMBER (P, S <= 0)

INT8 / INT16 / INT32 / INT64

NUMBER 列,分值 0 代表整数。负规模表示 Oracle 中的循环,例如 -2 的扩展会导致舍入到数百个。

根据精度和缩放,选择以下匹配 Kafka Connect 整数类型之一:

  • P - S < 3, INT8
  • P - S < 5, INT16
  • P - S < 10, INT32
  • P - S < 19, INT64
  • P - S >= 19, BYTES (org.apache.kafka.connect.data.Decimal)

当将 decimal.handling.mode 属性设置为 double 时,连接器将 NUMBER 值表示为带有 schema 类型 FLOAT64 的 Java double 值。

当将 decimal.handling.mode 属性设置为 string 时,连接器显示 NUMBER 值作为一个有特定格式的字符串(如 schema 类型 STRING)。

NUMBER (P, S > 0)

BYTES

org.apache.kafka.connect.data.Decimal

NUMERIC[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

org.apache.kafka.connect.data.Decimal (如果使用 BYTES

处理等效于 NUMBER )(请注意,S 默认为 0 用于 NUMERIC)。

decimal.handling.mode 属性设为 double 时,连接器代表 NUMERIC 值作为类型为 FLOAT64 的 Java double 值。

当将 decimal.handling.mode 属性设置为 string 时,连接器代表 NUMERIC 值作为其格式字符串,其格式为 STRING

SMALLINT

BYTES

org.apache.kafka.connect.data.Decimal

SMALLINT 映射到 Oracle 到 NUMBER (38,0),因此可以保存大于任何 INT 类型的值。

REAL

STRUCT

io.debezium.data.VariableScaleDecimal

包含两个字段的结构:type INT32,其中包含传输的值的 BYTES 的扩展,以未 扩展 的形式包含原始值。

当将 decimal.handling.mode 属性设置为 double 时,连接器将 REAL 值表示为带有 schema 类型为 FLOAT64 的 Java double 值。

当将 decimal.handling.mode 属性设置为 string 时,连接器将 REAL 值表示为使用 schema 类型 STRING 格式的字符串。

如前文所述,Oracle 允许 NUMBER 类型中的负扩展。当数字表示为 Decimal 时,这可能会导致转换为 Avro 格式的问题。十进制 类型包括缩放信息,但 Avro 规格 只允许规模的正数值。根据使用的 schema registry,可能会导致 Avro serialization 失败。要避免这个问题,您可以使用 NumberToZeroScaleConverter,它将带有负精度(小数点左面)的高的数字 (P - S >= 19) 转换为小数点右面零位的 Decimal 类型。它可以配置如下:

converters=zero_scale
zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
zero_scale.decimal.mode=precise

默认情况下,数字会被转换为 Decimal 类型(zero_scale.decimal.mode=precise),但为了保证完全支持两种类型(字符串)也被支持。

布尔值类型

Oracle 不提供对 BOOLEAN 数据类型的原生支持。但是,通常使用带有特定语义的其他数据类型来模拟逻辑 BOOLEAN 数据类型的概念。

为了允许您将源列转换为布尔值数据类型,Debebe 提供了一个 NumberOneTo Boolean Converter 自定义转换器,您可使用以下方法之一使用:

  • 将所有 NUMBER (1) 列映射到 BOOLEAN 类型。
  • 使用以逗号分隔的正则表达式列表枚举列的子集。
    要使用这种类型的转换,您必须使用 selector 参数设置 转换器 配置属性,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
    boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVED

临时类型

除了 Oracle INTERVAL,TIMESTAMP WITH TIME ZONE, 和 TIMESTAMP WITH LOCAL TIME ZONE 数据类型外,连接器转换时序类型的方式取决于 time.precision.mode 配置属性的值。

time.precision.mode 配置属性设置为 adaptive (默认值),那么连接器会根据列的数据类型确定 temporal 类型的字面和语义类型,以便事件 准确 表示数据库中的值:

Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT64

io.debezium.time.Timestamp

代表从 UNIX epoch 开始的毫秒数,且不包含时区信息。

间隔日[(M)] 到秒

FLOAT64

io.debezium.time.MicroDuration

每个月使用 365.25 / 12.0 公式表示的时间间隔的微秒数量。

io.debezium.time.Interval (when interval.handling.mode is set to string)

是遵循模式 P<years>Y<months>M<days>DT<hours>M<hours>DT<hours>DT<hours> <minutes<M>秒 的间隔值。例如 : P1Y2M3DT4H5M6.78 S。

间隔年[(M)] 到月

FLOAT64

io.debezium.time.MicroDuration

每个月使用 365.25 / 12.0 公式表示的时间间隔的微秒数量。

io.debezium.time.Interval (when interval.handling.mode is set to string)

是遵循模式 P<years>Y<months>M<days>DT<hours>M<hours>DT<hours>DT<hours> <minutes<M>秒 的间隔值。例如 : P1Y2M3DT4H5M6.78 S。

TIMESTAMP(0 - 3)

INT64

io.debezium.time.Timestamp

代表从 UNIX epoch 开始的毫秒数,且不包含时区信息。

时间戳、时间戳(4 - 6)

INT64

io.debezium.time.MicroTimestamp

代表从 UNIX epoch 开始的微秒数,且不包含时区信息。

TIMESTAMP (7 - 9)

INT64

io.debezium.time.NanoTimestamp

从 UNIX epoch 开始代表纳秒的数量,且不包含时区信息。

使用时区的时间戳

字符串

io.debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串。

带有本地时区的时间戳

字符串

io.debezium.time.ZonedTimestamp

UTC 中时间戳的字符串表示。

time.precision.mode 配置属性设置为 connect 时,连接器会使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置 Kafka Connect 逻辑类型且无法处理变量调整时间值时,这很有用。由于 Oracle 支持的精度级别超过 Kafka Connect 支持中的逻辑类型,如果将 time.precision.mode 设置为 connect,当数据库列的 fractional second precision 值大于 2 时,会出现丢失精度的结果:

Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

org.apache.kafka.connect.data.Date

代表从 UNIX epoch 开始的天数。

间隔日[(M)] 到秒

FLOAT64

io.debezium.time.MicroDuration

每个月使用 365.25 / 12.0 公式表示的时间间隔的微秒数量。

io.debezium.time.Interval (when interval.handling.mode is set to string)

是遵循模式 P<years>Y<months>M<days>DT<hours>M<hours>DT<hours>DT<hours> <minutes<M>秒 的间隔值。例如 : P1Y2M3DT4H5M6.78 S。

间隔年[(M)] 到月

FLOAT64

io.debezium.time.MicroDuration

每个月使用 365.25 / 12.0 公式表示的时间间隔的微秒数量。

io.debezium.time.Interval (when interval.handling.mode is set to string)

是遵循模式 P<years>Y<months>M<days>DT<hours>M<hours>DT<hours>DT<hours> <minutes<M>秒 的间隔值。例如 : P1Y2M3DT4H5M6.78 S。

TIMESTAMP(0 - 3)

INT64

org.apache.kafka.connect.data.Timestamp

代表从 UNIX epoch 开始的毫秒数,且不包含时区信息。

TIMESTAMP (4 - 6)

INT64

org.apache.kafka.connect.data.Timestamp

代表从 UNIX epoch 开始的毫秒数,且不包含时区信息。

TIMESTAMP (7 - 9)

INT64

org.apache.kafka.connect.data.Timestamp

代表从 UNIX epoch 开始的毫秒数,且不包含时区信息。

使用时区的时间戳

字符串

io.debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串。

带有本地时区的时间戳

字符串

io.debezium.time.ZonedTimestamp

UTC 中时间戳的字符串表示。

ROWID 类型

下表描述了连接器如何映射 ROWID (托管地址)数据类型。

表 2.118. Oracle ROWID 数据类型的映射
Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

ROWID

字符串

使用 Oracle XStream 时不支持此数据类型。

UROWID

不适用

不支持此数据类型

XML 类型

XMLTYPE 与 Debezium Oracle 连接器一起使用只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview

下表描述了连接器如何映射 XMLTYPE 数据类型。

表 2.119. Oracle XMLTYPE 数据类型的映射
Oracle 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

XMLTYPE

字符串

io.debezium.data.Xml

用户定义的类型

Oracle 允许您定义自定义数据类型,以便在内置数据类型不符合您的要求时提供灵活性。有几个用户定义的类型,如对象类型、REF 数据类型、Varrays 和 Nested Tables。目前,您不能将 Debezium Oracle 连接器用于任何用户定义的类型。

Oracle 提供的类型

Oracle 提供基于 SQL 的接口,可用于在内置或 ANSI 支持的类型不足时定义新的类型。Oracle 提供多种常用的数据类型来满足各种目的,如 AnySpatial 类型。目前,您不能将 Debezium Oracle 连接器用于任何这些数据类型。

默认值

如果为数据库模式中的列指定默认值,Oracle 连接器会尝试将此值传播到对应的 Kafka 记录字段的 schema。最常见的数据类型包括:

  • 字符类型(CHARNCHARVARCHAR、VARCHAR 2、NVARCHAR 2、NVARCHAR2)
  • 数字类型(inTEGERNUMERIC 等)
  • 时序类型(DATETIMESTAMPINTERVAL 等等)

如果 temporal 类型使用 TO_TIMESTAMPTO_DATE 等函数调用来代表默认值,则连接器将通过生成额外的数据库调用来评估该函数来解析默认值。例如,如果使用默认值 TO_ DATE ('2021-01-02', 'YYYY-MM-DD') 定义 DATE 列,则列的默认值将是该日期的 UNIX epoch 或 18629 开始的天数。

如果 temporal 类型使用 SYSDATE 常代表默认值,则连接器会根据列为 NOT NULLNULL 来解决此问题。如果列可为空,则不会设置默认值;但是,如果列不可为空,则默认值将解析为 0(用于 DATETIMESTAMP(n) 数据类型)或 1970-01-01T00:00:00Z (用于 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE 数据类型)。默认值为数字,除非列是 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE,在这种情况下,其作为字符串发出。

自定义转换器

默认情况下,Debezium Oracle 连接器提供多个特定于 Oracle 数据类型的 CustomConverter 实现。这些自定义转换器根据连接器配置为特定数据类型提供替代映射。要在连接器中添加 CustomConverter,请按照 Custom Converters 文档中的 说明进行操作。

Debezium Oracle 连接器提供以下自定义转换器:

NUMBER (1) 到布尔值

从版本 23 开始,Oracle 数据库提供 BOOLEAN 逻辑数据类型。在早期版本中,数据库使用 NUMBER (1) 数据类型来模拟 BOOLEAN 类型,使用值 0 代表 false,或值 1 代表 true。

默认情况下,当 Debezium 为使用 NUMBER (1) 数据类型的源列发出更改事件时,它会将数据转换为 INT8 字面类型。如果 NUMBER (1) 数据类型的默认映射不满足您的需要,您可以将连接器配置为在通过配置 NumberOneToBooleanConverter 来发送这些列时使用逻辑 BOOL 类型,如下例所示:

示例: NumberOneToBooleanConverter 配置

converters=number-to-boolean
number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
number-to-boolean.selector=.*.MY_TABLE.DATA

在前面的示例中,selector 属性是可选的。selector 属性指定转换器应用到的表或列的正则表达式。如果省略 selector 属性,当 Debezium 发出一个事件时,带有 NUMBER (1) 数据类型的每个列都会转换为使用逻辑 BOOL 类型的字段。

NUMBER To Zero Scale

Oracle 支持 创建基于 带有负缩放的 NUMBER 列,即 NUMBER (-2)。并非所有系统都可以处理负缩放值,因此这些值可能会导致管道中处理问题。例如,因为 Apache Avro 不支持这些值,因此如果 Debezium 将事件转换为 Avro 格式,则可能会出现问题。同样,不支持这些值的下游用户也会遇到错误。

配置示例

converters=number-zero-scale
number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
number-zero-scale.decimal.mode=precise

在前面的示例中,decimal.mode 属性指定连接器如何发送十进制值。此属性是可选的。如果省略 decimal.mode 属性,则转换器默认使用 PRECISE 十进制处理模式。

RAW 到字符串

虽然 Oracle 建议使用某些数据类型,如 RAW,但旧系统可能会继续使用此类类型。默认情况下,Debebe 将 RAW 列类型作为逻辑 BYTES 发送,这是一种类型,用于启用二进制或基于文本的数据存储。

在某些情况下,RAW 列可能会将字符数据存储为一系列字节。要协助消费者使用,您可以将 Debezium 配置为使用 RawToStringConverterRawToStringConverter 提供了一种方式,可以轻松地以此类 RAW 列为目标,并以字符串的形式发送值,而不是字节。以下示例演示了如何将 RawToStringConverter 添加到连接器配置中:

示例: RawToStringConverter 配置

converters=raw-to-string
raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter
raw-to-string.selector=.*.MY_TABLE.DATA

在前面的示例中,选择器 属性允许您定义一个正则表达式,以指定转换器进程的表或列。如果省略 selector 属性,则转换器将所有 RAW 列类型映射到逻辑 STRING 字段类型。

2.5.4. 设置 Oracle 以使用 Debezium

设置 Oracle 以用于 Debezium Oracle 连接器需要执行下列步骤。这些步骤假定将多租户配置与容器数据库一起使用,以及至少一个可插拔数据库。如果您不打算使用多租户配置,可能需要调整以下步骤。

有关设置用于 Debezium 连接器的 Oracle 的详情,请参考以下部分:

2.5.4.1. Debezium Oracle 连接器与 Oracle 安装类型的兼容性

Oracle 数据库可以作为独立实例安装,也可以使用 Oracle Real Application Cluster (RAC)。Debezium Oracle 连接器与两种类型的安装兼容。

2.5.4.2. 在捕获更改事件时 Debezium Oracle 连接器不包括的 schema

当 Debezium Oracle 连接器捕获表时,它会自动从以下模式中排除表:

  • appqossys
  • audsys
  • ctxsys
  • dvsys
  • dbsfwuser
  • dbsnmp
  • qsmadmin_internal
  • lbacsys
  • mdsys
  • ojvmsys
  • olapsys
  • orddata
  • ordsys
  • outln
  • sys
  • system
  • vecsys (Oracle 23+)
  • wmsys
  • xdb

要启用连接器从表中捕获更改,该表必须使用在前面的列表中未命名的模式。

2.5.4.3. 在捕获更改事件时 Debezium Oracle 连接器排除的表

当 Debezium Oracle 连接器捕获表时,它会自动排除与以下规则匹配的表:

  • CMP[3|4hmac[0-9]+ 匹配的压缩顾问表。
  • SYS_IOT_OVER_% 模式匹配的 index-organized 表。
  • 与模式 MDRT_%MDRS_%MDXT_% 匹配的 spatial 表。
  • 嵌套表

要让连接器捕获名称与上述规则匹配的表,您必须重命名该表。

2.5.4.4. 准备 Oracle 数据库以用于 Debezium

Oracle LogMiner 所需的配置

ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list

exit;

Oracle AWS RDS 不允许执行上述命令,也允许您以 sysdba 身份登录。AWS 提供了这些替代命令来配置 LogMiner。在执行这些命令前,请确保为备份启用了 Oracle AWS RDS 实例。

要确认 Oracle 启用了备份,请首先执行以下命令。LOG_MODE 应使用 ARCHIVELOG。如果没有,您可能需要重启 Oracle AWS RDS 实例。

Oracle AWS RDS LogMiner 所需的配置

SQL> SELECT LOG_MODE FROM V$DATABASE;

LOG_MODE
------------
ARCHIVELOG

当 LOG_MODE 设置为 ARCHIVELOG 后,执行命令以完成 LogMiner 配置。第一个命令将数据库设置为 archivelogs,第二个命令添加了 supplemental 日志记录。

Oracle AWS RDS LogMiner 所需的配置

exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);

exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');

要让 Debezium 捕获更改数据库行之前的状态,还必须为捕获的表或整个数据库启用附件日志记录。以下示例演示了如何为单个 inventory.customers 表中的所有列配置补充日志记录。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

为所有表列启用附加日志记录会增加 Oracle redo 日志的卷。要防止日志大小过度增长,请有选择地应用前面的配置。

在数据库级别上必须启用最少的附件日志记录,并可以配置如下:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

2.5.4.5. 调整 Oracle redo 日志的大小以容纳数据字典

根据数据库配置,恢复日志的大小和数量可能不足以达到可接受的性能。在设置 Debezium Oracle 连接器前,请确保 redo 日志的容量足以支持数据库。

数据库恢复日志的容量必须足以存储其数据字典。通常,数据字典的大小会随着数据库中表和列的数量而增加。如果红色日志缺少足够容量,数据库和 Debezium 连接器都可能会遇到性能问题。

请咨询您的数据库管理员,以评估数据库可能需要增加日志容量。

2.5.4.6. 指定 Debezium Oracle 连接器使用的归档日志目的地

Oracle 数据库管理员可以为存档日志配置最多 31 个不同的目的地。管理员可以为每个目的地设置参数,以为特定用途指定它,例如,记录物理待机日志或外部存储以允许扩展日志保留。Oracle 在 V$ARCHIVE_DEST_STATUS 视图中报告有关归档日志目的地的详细信息。

Debezium Oracle 连接器只使用状态为 VALIDLOCAL 的目的地。如果您的 Oracle 环境包含多个满足该条件的目的地,请咨询您的 Oracle 管理员以确定应该使用哪个归档日志目的地。

流程

  • 要指定要使用的存档日志目的地,请在连接器配置中设置 archive.destination.name 属性。

    例如,假设数据库配置了两个归档目标路径 /path/ one 和 /path /two,并且 V$ARCHIVE_DEST_STATUS 表将这些路径与列 DEST_NAME 中指定的目标名称相关联。如果两个目的地都满足 Debezium swig-two 的标准,则 其状态为 VALID,并且其 类型是 LOCAL HEKETI-busybox,将连接器配置为使用数据库写入 /path/two 的归档日志,将 archive.destination.name 的值设置为 DEST_NAME 列中的值,该列中的 /path/twoV$ARCHIVE_DEST_STATUS 表中关联的值。例如,如果 DEST_NAMELOG_ARCHIVE_DEST_3 用于 /path/two,您可以将 Debezium 配置为:
{
  "archive.destination.name": "LOG_ARCHIVE_DEST_3"
}
注意

不要将 archive.destination.name 的值设置为数据库用于归档日志的路径。将 属性设置为满足归档日志保留策略的 V$ARCHIVE_DEST_STAT_STATUS 表中一行的 DEST_ NAME 列中的归档日志目的地名称。

警告

如果您的 Oracle 环境包含多个满足该条件的目的地,并且您无法指定首选目的地,Debezium Oracle 连接器会随机选择目的地路径。因为为每个目的地配置的保留策略可能会有所不同,因此如果连接器选择从中删除请求的日志数据的路径,这可能会导致错误。

2.5.4.7. 为 Debezium Oracle 连接器创建 Oracle 用户

要使 Debezium Oracle 连接器捕获更改事件,它必须以具有特定权限的 Oracle LogMiner 用户身份运行。以下示例显示了用于在多租户数据库模型中为连接器创建 Oracle 用户帐户的 SQL。

警告

连接器捕获其自身 Oracle 用户帐户所做的数据库更改。但是,它不会捕获 SYSSYSTEM 用户帐户所做的更改。

创建连接器的 LogMiner 用户

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; 1
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; 2
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; 3
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; 4
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; 5
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 6
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 7
  GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; 8
  GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; 9

  GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; 10
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; 11
  GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; 12

  GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; 13
  GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; 14

  GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; 15
  GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; 16
  GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; 17
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; 18
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; 19
  GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; 20
  GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; 21
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; 22
  GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; 23

  GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL; 24
  GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL; 25

  exit;

表 2.120. 权限/授予的描述
角色名称描述

1

创建会话

启用连接器连接到 Oracle。

2

设置容器

启用连接器在可插拔数据库间切换。只有在 Oracle 安装启用了容器数据库支持(CDB)时才需要这样做。

3

SELECT ON V_$DATABASE

启用连接器读取 V$DATABASE 表。

4

FLASHBACK 任何表

启用连接器执行 Flashback 查询,这是连接器如何执行数据的初始快照。另外,您还可以为所有表授予 FLASHBACK 权限,而是只为特定表授予 FLASHBACK 权限。

5

选择任何表

启用连接器读取任何表。另外,您还可以为所有表授予 SELECT 权限,而是为特定表授予 SELECT 特权。

6

SELECT_CATALOG_ROLE

启用连接器读取数据字典,这是 Oracle LogMiner 会话所需的。

7

EXECUTE_CATALOG_ROLE

启用连接器将数据字典写入 Oracle redo 日志中,这是跟踪 schema 更改所必需的。

8

选择任何事务

启用快照进程,以针对任何事务执行 Flashback 快照查询。当授予 FLASHBACK ANY TABLE 时,也应授予它。

9

LOGMINING

在较新版本的 Oracle 中添加了此角色,作为授予 Oracle LogMiner 及其软件包的完整访问权限的方法。在没有此角色的较早版本的 Oracle 上,您可以忽略此授权。

10

创建表

启用连接器在其默认表空间中创建其 flush 表。flush 表允许连接器明确控制 LGWR 内部缓冲区刷新到磁盘。

11

锁定任何表

启用连接器在模式快照期间锁定表。如果通过配置明确禁用了快照锁定,则可以安全地忽略这个授权。

12

创建序列

启用连接器在默认表空间中创建序列。

13

EXECUTE ON DBMS_LOGMNR

启用连接器在 DBMS_LOGMNR 软件包中运行方法。这需要与 Oracle LogMiner 交互。在较新的 Oracle 版本中,这通过 LOGMINING 角色授予,但在旧版本中,必须明确授予它。

14

EXECUTE ON DBMS_LOGMNR_D

启用连接器在 DBMS_LOGMNR_D 软件包中运行方法。这需要与 Oracle LogMiner 交互。在较新的 Oracle 版本中,这通过 LOGMINING 角色授予,但在旧版本中,必须明确授予它。

15 到 25

SELECT ON V_$…​.

启用连接器来读取这些表。连接器必须能够读取 Oracle redo 和 archive 日志的信息,以及当前的事务状态,才能准备 Oracle LogMiner 会话。如果没有这些授权,连接器无法操作。

2.5.4.8. 使用 Oracle 待机数据库运行连接器

备用数据库提供主实例的同步副本。如果出现主数据库故障,备用数据库提供连续可用性和灾难恢复。Oracle 同时使用物理和逻辑备用数据库。

物理待机

物理待机是主生产数据库的确切块副本,其系统更改号(SCN)值与主数据库的值相同。Debezium Oracle 连接器无法直接从物理待机数据库捕获更改事件,因为物理待机不接受外部连接。连接器只能在将待机转换为主数据库后从物理待机中捕获事件。然后,连接器会连接到以前的待机,就像是任何主数据库一样。

逻辑待机

逻辑待机包含与主数据相同的逻辑数据,但数据可能会以不同的物理方式存储。逻辑待机中的 SCN 偏移与主数据库中的偏移量不同。您可以将 Debezium Oracle 连接器配置为从逻辑待机数据库捕获更改

2.5.4.8.1. 从 Oracle 故障转移数据库捕获数据

当您设置故障转移数据库时,通常最好使用物理备用数据库而不是逻辑待机数据库。物理待机与主数据库保持一致状态,而不是逻辑待机。物理待机包含主数据的确切副本,并且待机系统的更改号(SCN)值与主数据的不同。在 Debezium 环境中,数据库故障转移到物理待机后,存在一致的 SCN 值可确保连接器可以找到最后处理的 SCN 值。

物理待机以只读模式锁定,运行受管恢复以维护同步。当数据库处于待机模式时,它不接受来自客户端的外部 JDBC 连接,并且外部应用无法访问它。

在失败事件后,要允许 Debezium 连接到以前的物理待机,DBA 必须执行几个操作来启用故障转移到待机,并将它提升为主数据库。以下列表标识了一些关键操作:

  • 在待机上取消受管恢复。
  • 完成活动的恢复过程。
  • 将待机转换为主要角色。
  • 打开客户端读写操作的新主要内容。

在以前的物理待机可用后,您可以将 Debezium Oracle 连接器配置为连接它。要让连接器从新的主设备捕获,请在连接器配置中编辑数据库主机名,将原始主的主机名替换为新主主机名。

2.5.4.8.2. 配置 Debezium Oracle 连接器以从逻辑待机中捕获事件

当 Oracle 的 Debezium 连接器连接到主数据库时,它使用内部清除表来管理 Oracle Log Writer Buffer (LGWR)进程的清除周期。flush 进程要求用户帐户访问数据库具有创建和写入此清除表的权限。但是,逻辑独立数据库通常允许只读访问,从而导致连接器写入数据库。您可以修改连接器配置,使连接器可以从逻辑待机捕获事件,或者 DBA 可以创建一个新的可写表空间,连接器可以在其中存储 flush 表。

重要

Debezium Oracle 连接器从只读逻辑待机数据库中尽力更改是开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

流程

  • 要启用 Debezium 从 Oracle 只读逻辑待机数据库中捕获事件,请在连接器配置中添加以下属性,以禁用清除表的创建和管理:

    internal.log.mining.read.only=true

    上述设置可防止数据库创建和更新 LOG_MINING_FLUSH 表。您可以将 internal.log.mining.read.only 属性与 Oracle Standalone 数据库一起使用,或者与 Oracle RAC 安装一起使用。

扩展最大字符串大小

数据库参数 max_string_size 控制 Oracle 数据库以及扩展 Debezium 如何解释 VARCHAR2NVARCHAR2RAW 字段的值。默认 STANDARD 意味着这些数据类型的长度与 Oracle 12c 之前的发行版本(对于 VARCHAR2NVARCHAR2 和 2000 字节)具有相同的限制。当配置为 EXTENDED 时,这些列现在允许存储最多 32767 字节的数据。

警告

虽然数据库管理员可以将 max_string_sizeSTANDARD 更改为 EXTENDED,但不允许相反。数据库更新至 EXTENDED 字符串支持后,该字符串无法撤消。

对于 Debezium Oracle 连接器,当数据库参数 max_string_sizeEXTENDED 时,应该将 lob.enabled 连接器配置选项设为 true,以捕获对 VARCHAR2NVARCHAR2 字段的更改,其长度超过 4000 字节或 RAW 字段,且超过 2000 字节。

当设置为 EXTENDED 时,当字符串数据的字节长度超过旧最大值时,Oracle 会执行字符数据的隐式转换。这种隐式转换意味着 Oracle 内部将字符串数据视为 CLOB,因此您可以获得将字段视为外部世界的常规字符串的好处,但所有缺陷以及有关数据库层级别的存储有疑问。

由于 Oracle 在内部将这些字符串视为 CLOB,因此红色日志也会反映 Debezium Oracle 连接器需要注意的一些唯一操作类型,它应该 mine。由于这些操作类型与 CLOB 操作非常相似,因此无论字符串数据的字节长度如何,必须以与其它 LOB 类型相同的方式从红色日志捕获更改。

警告

当 Oracle 配置为使用 EXTENDED 字符串大小时,当 LogMiner 为扩展字符串字段重新构建 SQL 时,LogMiner 有时无法转义单引号字符(')。如果扩展字符串字段的字节长度不超过旧的最大长度,则可能会出现此问题。因此,这些字段中的值可以被截断,从而导致 Oracle 连接器无法解析无效 SQL 语句。

为了帮助解决某些问题实例,您可以通过将以下属性设置为 true 将连接器配置为 relax 单引号检测:

internal.log.mining.sql.relaxed.quote.detection

如需更多信息,请参阅 Red Hat Integration 3.0.8 的发行注记

2.5.5. 部署 Debezium Oracle 连接器

您可以使用以下任一方法部署 Debezium Oracle 连接器:

重要

由于许可证要求,Debezium Oracle 连接器存档不包括连接器连接到 Oracle 数据库所需的 Oracle JDBC 驱动程序。要启用连接器访问数据库,您必须在连接器环境中添加驱动程序。如需更多信息,请参阅 获取 Oracle JDBC 驱动程序

2.5.5.1. 获取 Oracle JDBC 驱动程序

由于许可证要求,Debezium 连接到 Oracle 数据库所需的 Oracle JDBC 驱动程序文件不包括在 Debezium Oracle 连接器存档中。该驱动程序可从 Maven Central 下载。根据您使用的部署方法,您可以通过将命令添加到 Kafka Connect 自定义资源或用于构建连接器镜像的 Dockerfile 来检索驱动程序。

2.5.5.2. 使用 Streams for Apache Kafka 的 Debezium Oracle 连接器部署

部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您要创建和使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中包含的连接器工件的信息。
  • 提供包括连接器用来访问源数据库的信息的 KafkaConnector CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

2.5.5.3. 使用 Streams for Apache Kafka 部署 Debezium Oracle 连接器

对于 Apache Kafka 的早期版本,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 Apache Kafka 的 Streams 中的构建配置,来自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。

在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到 .spec.build.output 中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • Apache Kafka Operator 的 Streams 正在运行。
  • 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
  • Kafka Connect 部署在 Apache Kafka 的 Streams 中
  • 您有一个红帽构建的 Debezium 许可证。
  • OpenShift oc CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

    例 2.34. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium Oracle 连接器存档。
    • 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算将 Avro serialization 与连接器一起使用时,才添加 Apicurio Registry 组件。
    • Debezium 脚本 SMT 归档以及您要用于 Debezium 连接器的相关语言依赖项。SMT 归档和语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
    • Oracle JDBC 驱动程序需要连接到 Oracle 数据库,但不包含在连接器存档中。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.9.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-oracle
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-oracle/3.0.8.Final-redhat-00004/debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
              - type: jar          11
                url: https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/21.6.0.0/ojdbc11-21.6.0.0.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表 2.121. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",以便 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,可推送到容器 registry (如 Docker Hub 或 Quay)或 镜像流 (用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 Streams for Apache Kafka API 参考中的 Build schema 参考

    5

    插件配置 列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型是 ziptgz、或 jar。Debezium 连接器存档以 .zip 文件格式提供。JDBC 驱动程序文件采用 .jar 格式。type 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Red Hat Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定下载 Apicurio Registry 组件的工件 类型和 url。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化事件键和值,使用红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您还必须部署 JSR 223 兼容脚本实施,如 groovy。

    10

    (可选)指定与 JSR 223 脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果您使用 Streams for Apache Kafka 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需的脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用将 Apache Groovy 语言与脚本 SMT 搭配使用,示例中的自定义资源会检索以下库的 JAR 文件:

    • groovy
    • groovy-jsr223 (脚本代理)
    • groovy-json (用于解析 JSON 字符串的模块)

    Debezium 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实施。

    11

    指定 Maven Central 中 Oracle JDBC 驱动程序的位置。Debezium Oracle 连接器存档中不包含所需的驱动程序。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 oracle-inventory-connector.yaml

    例 2.35. 为 Debezium 连接器定义 KafkaConnector 自定义资源的 Oracle -inventory-connector.yaml 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-oracle 1
    spec:
      class: io.debezium.connector.oracle.OracleConnector 2
      tasksMax: 1  3
      config:  4
        schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
        schema.history.internal.kafka.topic: schema-changes.inventory
        database.hostname: oracle.debezium-oracle.svc.cluster.local 5
        database.port: 1521   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        topic.prefix: inventory-connector-oracle 10
        table.include.list: PUBLIC.INVENTORY  11
    
        ...
    表 2.122. 连接器配置设置的描述
    描述

    1

    要注册到 Kafka Connect 集群的连接器名称。

    2

    连接器类的名称。

    3

    可同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址。

    6

    数据库实例的端口号。

    7

    Debezium 用来连接到数据库的帐户名称。

    8

    Debezium 用来连接到数据库用户帐户的密码。

    9

    要从中捕获更改的数据库的名称。

    10

    数据库实例或集群的主题前缀。
    指定的名称只能从字母数字字符或下划线构成。
    因为主题前缀用作从此连接器接收更改事件的 Kafka 主题的前缀,因此该名称在集群中的连接器中必须是唯一的。
    如果您将连接器与 Avro 连接器集成,则此命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。

    11

    连接器捕获更改事件的表列表。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f oracle-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。

您现在已准备好 验证 Debezium Oracle 部署

2.5.5.4. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium Oracle 连接器

要部署 Debezium Oracle 连接器,您必须构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您需要创建以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。
  • 定义 Debezium Oracle 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

  • Oracle 数据库正在运行,您完成了 设置 Oracle 以使用 Debezium 连接器 的步骤。
  • Apache Kafka 的流部署在 OpenShift 中,它正在运行 Apache Kafka 和 Kafka Connect。如需更多信息,请参阅在 OpenShift 中部署和管理 Apache Kafka的流
  • podman 或 Docker 已安装。
  • 您有在容器 registry (如 quay.iodocker.io)中创建和管理容器的帐户和权限,您要添加将运行 Debezium 连接器的容器。
  • Kafka Connect 服务器有权访问 Maven Central,以下载 Oracle 所需的 JDBC 驱动程序。您还可以使用驱动程序的本地副本,或使用本地 Maven 存储库或其他 HTTP 服务器可用的副本。

    如需更多信息,请参阅 获取 Oracle JDBC 驱动程序

流程

  1. 为 Kafka Connect 创建 Debezium Oracle 容器:

    1. 创建一个 Dockerfile,它使用 registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-oracle.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-oracle/3.0.8.Final-redhat-00004/debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip \
      && unzip debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip \
      && rm debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc11/21.6.0.0/ojdbc11-21.6.0.0.jar
      USER 1001
      EOF
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-oracle.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-oracle.yaml Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-oracle:latest .
      docker build -t debezium-container-for-oracle:latest .

      前面的命令使用名称 debezium-container-for-oracle 构建容器镜像。

    3. 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-oracle:latest
      docker push <myregistry.io>/debezium-container-for-oracle:latest
    4. 创建新的 Debezium Oracle KafkaConnect 自定义资源(CR)。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        image: debezium-container-for-oracle 2
      
        ...
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定为运行 Debezium 连接器而创建的镜像的名称。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 输入以下命令将 KafkaConnect CR 应用到 OpenShift Kafka Connect 环境:

      oc create -f dbz-connect.yaml

      该命令添加一个 Kafka Connect 实例,用于指定为运行 Debezium 连接器而创建的镜像的名称。

  2. 创建一个 KafkaConnector 自定义资源,用于配置 Debezium Oracle 连接器实例。

    您可以在指定连接器的配置属性的 .yaml 文件中配置 Debezium Oracle 连接器。连接器配置可能会指示 Debezium 为模式和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断指定列中的值,这些值是敏感、太大或不需要的。

    以下示例配置了在端口 1521 上连接到 Oracle 主机 IP 地址的 Debezium 连接器。此主机有一个名为 ORCLCDB 的数据库,server1 是服务器的逻辑名称。

    Oracle inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: inventory-connector-oracle 1
      labels:
        strimzi.io/cluster: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: 'true'
    spec:
      class: io.debezium.connector.oracle.OracleConnector 2
      config:
        database.hostname: <oracle_ip_address> 3
        database.port: 1521 4
        database.user: c##dbzuser 5
        database.password: dbz 6
        database.dbname: ORCLCDB 7
        database.pdb.name : ORCLPDB1, 8
        topic.prefix: inventory-connector-oracle 9
        schema.history.internal.kafka.bootstrap.servers: kafka:9092 10
        schema.history.internal.kafka.topic: schema-changes.inventory 11

    表 2.123. 连接器配置设置的描述
    描述

    1

    当使用 Kafka Connect 服务注册时,连接器的名称。

    2

    此 Oracle 连接器类的名称。

    3

    Oracle 实例的地址。

    4

    Oracle 实例的端口号。

    5

    Oracle 用户的名称,如 为 连接器创建用户 中所述。

    6

    Oracle 用户的密码,如为 连接器创建用户 中所述。

    7

    要从中捕获更改的数据库的名称。

    8

    连接器从中捕获更改的 Oracle 可插拔数据库的名称。仅在容器数据库(CDB)安装中使用。

    9

    主题前缀标识,并为 Oracle 数据库服务器提供命名空间,连接器从中捕获更改。

    10

    此连接器用来将 DDL 语句写入并恢复到数据库 schema 历史记录主题的 Kafka 代理列表。

    11

    连接器写入和恢复 DDL 语句的数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。

  3. 使用 Kafka Connect 创建连接器实例。例如,如果您在 inventory-connector.yaml 文件中保存 KafkaConnector 资源,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    前面的命令注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 server1 数据库运行。

有关您可以为 Debezium Oracle 连接器设置的配置属性的完整列表,请参阅 Oracle 连接器属性

结果

连接器启动后,它会执行为连接器配置的 Oracle 数据库的一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流传输到 Kafka 主题。

2.5.5.5. 配置容器数据库和非容器数据库

Oracle 数据库支持以下部署类型:

容器数据库(CDB)
可以包含多个可插拔数据库(PDB)的数据库。数据库客户端连接到每个 PDB,就像它是标准的非CDB 数据库一样。
非容器数据库(非CDB)
标准 Oracle 数据库,不支持创建可插拔数据库。

使用 mTLS 安全连接

当使用 mutual TLS 身份验证(mTLS)连接到 Oracle 时,这涉及连接器和数据库服务提供身份。Oracle 连接器的 Debezium 依赖于 Oracle JDBC 驱动程序的内置功能来支持 mTLS 身份验证。

您可以使用以下任一方法在 Debezium 和 Oracle 之间建立 mTLS 连接:

2.5.5.6. 使用 Java 密钥和信任存储将 Oracle 连接器配置为使用 mTLS

先决条件

  • 验证连接器可以访问配置的密钥和信任存储文件。
  • 验证数据库 TNS (Transparent Network Substrate)侦听器是否支持 TCPS 安全连接。

流程

  • 配置 Debezium Oracle 连接器,如下例所示。

    示例:Debezium Oracle 连接器 TLS 配置

    {
      "database.url": "jdbc:oracle:thin@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcps)(HOST=<host>)(PORT=<port>))(CONNECT_DATA(SERVICE_NAME=<service>)))",
      "driver.javax.net.ssl.keyStore": "<path-to-jks-keystore>",
      "driver.javax.net.ssl.keyStorePassword": "<keystore-password>",
      "driver.javax.net.ssl.keyStoreType": "JKS",
      "driver.javax.net.ssl.trustStore": "<path-to-jks-truststore>",
      "driver.javax.net.ssl.trustStorePassword": "<truststore-password>",
      "driver.javax.net.ssl.trustStoreType": "JKS"
    }

    注意

    对于使用 TLS 加密与 Oracle 通信的 Debezium,需要与服务器的 TCPS 连接。要建立 TCPS 连接,您必须将连接器配置为使用 database.url 属性,而不是 database.host 属性。与 database.host 属性不同,database.url 属性允许您定义明确需要使用 TCPS 协议的 Oracle TNS (Transparent Network Substrate)连接字符串。

2.5.5.7. 使用 Oracle wallet 将连接器配置为使用 mTLS

先决条件

  • 使用数据库管理员验证 Oracle 数据库服务器上是否已配置 Oracle Wallet
  • 在 Oracle JDBC 驱动程序存档中找到 oraclepki.jar

流程

  • 在同一位置安装 oraclepki.jar,其中 $Debezium Debezium Oracle 连接器 jars 存在。如果您下载并安装 Oracle JDBC 驱动程序,则同一位置是您也会放置 oraclepki.jar
  • 使用 database.url 配置属性而不是 database.hostname 来配置连接器。通过使用 database.url,提供了一个基于 TNS 的配置,以便与 Oracle Wallet 交互。示例配置如下所示:
  • 将 Oracle JDBC 驱动程序属性 oracle.net.wallet_location 设置为明确设置 Oracle Wallet 配置供 Oracle JDBC 驱动程序使用。

mTLS 配置示例

{
  "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))",
  "driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))"
}

警告

务必在 database.url 中设置正确的主机、端口和服务标识符(sid)。此外,请确保 driver.oracle.net.wallet_location 中的目录可读。

2.5.5.8. 验证 Debezium Oracle 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个表创建一个主题,这些表配置为捕获连接器。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了用于读取操作的事件("op":"r"),连接器在每个表的初始快照过程中生成的。

先决条件

  • 在 OpenShift 中,Debezium 连接器部署到 Streams for Apache Kafka。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-oracle
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>

        例如,

        oc describe KafkaConnector inventory-connector-oracle -n debezium

        该命令返回与以下输出类似的状态信息:

        例 2.36. KafkaConnector 资源状态

        Name:         inventory-connector-oracle
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-oracle
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-oracle.inventory
            inventory-connector-oracle.inventory.addresses
            inventory-connector-oracle.inventory.customers
            inventory-connector-oracle.inventory.geom
            inventory-connector-oracle.inventory.orders
            inventory-connector-oracle.inventory.products
            inventory-connector-oracle.inventory.products_on_hand
        Events:  <none>
  2. 验证连接器是否已创建 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面上,单击 Resources 以打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,单击要检查的主题的名称,例如 inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics

        该命令返回与以下输出类似的状态信息:

        例 2.37. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-oracle--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-oracle.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-oracle.inventory.products_on_hand

    指定主题名称的格式与步骤 1 中返回的 oc describe 命令相同,例如 inventory-connector-oracle.inventory.addresses

    对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 2.38. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成一个读取("op" ="r")事件。product_id 记录的 "before" 状态为 null,表示记录没有之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

2.5.6. Debezium Oracle 连接器配置属性的描述

Debezium Oracle 连接器有许多配置属性,可用于为应用程序获得正确的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:

所需的 Debezium Oracle 连接器配置属性

除非默认值可用 否则需要以下配置属性。

属性

默认

描述

name

没有默认值

连接器的唯一名称。尝试使用相同名称再次注册将失败。(所有 Kafka 连接连接器都需要此属性。)

connector.class

没有默认值

连接器的 Java 类的名称。对于 Oracle 连接器,始终使用 io.debezium.connector.oracle.oracleConnector

converters

没有默认值

枚举连接器可以使用的 自定义转换器 实例的符号链接名称列表。
例如,布尔值
需要此属性才能使连接器使用自定义转换器。

对于您为连接器配置的每个转换器,还必须添加一个 .type 属性,它指定实现转换器接口的类的完全限定域名。.type 属性使用以下格式:

<converterSymbolicName>.type

例如,

boolean.type: io.debezium.connector.oracle.converters.NumberOneToBooleanConverter

如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数来将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称添加转换器符号名称前缀。

例如,要定义一个 selector 参数,用于指定 布尔值 转换器进程的列子集,请添加以下属性:

boolean.selector: .*MYTABLE.FLAG,.*.IS_ARCHIVED

tasks.max

1

为此连接器创建的最大任务数量。Oracle 连接器始终使用单个任务,因此不使用这个值,因此始终可以接受默认值。

database.hostname

没有默认值

Oracle 数据库服务器的 IP 地址或主机名。

database.port

没有默认值

Oracle 数据库服务器的整数端口号。

database.user

没有默认值

连接器用来连接到 Oracle 数据库服务器的 Oracle 用户帐户的名称。

database.password

没有默认值

连接到 Oracle 数据库服务器时要使用的密码。

database.dbname

没有默认值

要连接的数据库的名称。在容器数据库环境中,指定根容器数据库(CDB)的名称,而不是包含的可插拔数据库(PDB)的名称。

database.url

没有默认值

指定原始数据库 JDBC URL。使用此属性提供定义该数据库连接的灵活性。有效值包括原始 TNS 名称和 RAC 连接字符串。

database.pdb.name

没有默认值

要连接的 Oracle 可插拔数据库的名称。仅将此属性与容器数据库(CDB)安装一起使用。

topic.prefix

没有默认值

为 Oracle 数据库服务器提供命名空间的主题前缀,连接器会捕获更改。您设置的值用作连接器发出的所有 Kafka 主题名称的前缀。指定在 Debezium 环境中所有连接器之间唯一的主题前缀。以下字符有效:字母数字字符、连字符、点和下划线。

警告

不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发送事件,连接器会将后续事件发送到名称基于新值的主题。连接器也无法恢复其数据库架构历史记录主题。

database.connection.adapter

LogMiner

连接器在流数据库更改时使用的适配器实现。您可以设置以下值:

LogMiner (默认)
连接器使用原生 Oracle LogMiner API。
xstream
连接器使用 Oracle XStream API。

snapshot.mode

Initial

指定连接器用来获取捕获表快照的模式。您可以设置以下值:

always
快照包括捕获的表的结构和数据。指定此值,为每个连接器启动时从捕获的表中填充主题。
Initial
快照包括捕获的表的结构和数据。使用从捕获的表中数据的完整表示,指定这个值来填充主题。如果快照成功完成,则不会再次执行下一个连接器启动快照。
initial_only
快照包括捕获的表的结构和数据。连接器执行初始快照,然后停止,而无需处理任何后续更改。
schema_only
弃用,请参阅 no_data
no_data
快照仅包含捕获的表的结构。如果您希望连接器只捕获快照后发生的更改,请指定这个值。
schema_only_recovery
弃用,请参阅恢复
recovery
这是已经捕获更改的连接器的恢复设置。当您重启连接器时,此设置启用恢复已损坏或丢失的数据库 schema 历史记录主题。您可以定期将其设置为 "clean up" 是一个意外增长的数据库 schema 历史记录主题。数据库架构历史记录主题需要无限保留。请注意,只有保证不会发生架构更改时,这个模式才安全使用,因为连接器在之前关闭的时间以及生成快照的时间点时,这个模式才安全。

快照完成后,连接器将继续从数据库的红色日志中读取更改事件,除非 snapshot.mode 配置为 initial_only

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

如需更多信息,请参阅 snapshot.mode 选项表

snapshot.locking.mode

shared

控制连接器保存表锁定的时长。表锁定可防止在连接器执行快照时发生某些类型的更改表操作。您可以设置以下值:

shared
启用对表的并发访问,但会阻止任何会话获取专用表锁定。连接器在捕获表 schema 时获取 ROW SHARE 级别锁定。
none
防止连接器在快照过程中获取任何表锁定。只有在创建快照时,仅在没有模式更改时才使用此设置。

snapshot.query.mode

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all
连接器默认执行 选择所有查询,可以选择根据列包含和排除列表配置调整所选列。

与使用 snapshot.select.statement.overrides 属性相比,此设置可让您以更灵活的方式管理快照内容。

snapshot.include.collection.list

在连接器 table.include.list 属性中指定的所有表。

可选的、以逗号分隔的正则表达式列表,与表的完全限定名称(<databaseName>. <schemaName&gt; . &lt;tableName&gt;)匹配,包括在快照中。

在多租户容器数据库(CDB)环境中,正则表达式必须包含 可插拔数据库(PDB)名称,格式为 < pdbName> . <schemaName&gt; . < tableName>

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。

快照只能包含在连接器的 table.include.list 属性中命名的表。

只有在连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。
此属性不会影响增量快照的行为。

snapshot.select.statement.overrides

没有默认值

指定要包含在快照中的表行。如果您希望快照只包括表中的行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志读取的事件。

属性包含以逗号分隔的完全限定表名称列表,格式为 < schemaName>.<tableName&gt;。例如,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

用于列表中的每个表,添加一个进一步的配置属性,指定连接器在获取快照时在表上运行的 SELECT 语句。指定的 SELECT 语句决定了快照中包含的表行子集。使用以下格式指定这个 SELECT 语句属性的名称:

snapshot.select.statement.overrides. <schemaName> . &lt;tableName&gt;

例如, snapshot.select.statement.overrides.customers.orders

示例:

从包括软删除列 delete_flagcustomers.orders 表中,如果您希望快照只包含没有软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器只包含 delete_flag = 0 的记录。

schema.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。没有包含在 schema. include.list 中的架构 名称都会被捕获。默认情况下,所有非系统模式都会捕获其更改。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 schema.exclude.list 属性。

include.schema.comments

false

布尔值指定连接器是否应该在元数据对象上解析和发布表和列注释。启用此选项将对内存用量产生影响。逻辑模式对象的数量和大小非常大会影响 Debezium 连接器消耗的内存量,并在每个对象中添加潜在的大型字符串数据可能非常昂贵。

schema.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。
任何名称不包含在 schema. exclude.list 中的模式 都会捕获其更改,但系统模式除外。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,请不要设置'schema.include.list' 属性。

table.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与要捕获的表的完全限定表标识符匹配。如果您使用 LogMiner 实现,则只使用带有此属性的 POSIX 正则表达式。当设置此属性时,连接器只从指定的表中捕获更改。每个表标识符都使用以下格式:

<schema_name>.<table_name>

默认情况下,连接器会监控每个捕获的数据库中的每个非系统表。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 table.exclude.list 属性。

table.exclude.list

没有默认值

可选的正则表达式列表,该表达式与要从监控中排除的表的完全限定表标识符匹配。如果您使用 LogMiner 实现,则只使用带有此属性的 POSIX 正则表达式。连接器从 exclude 列表中没有指定的任何表捕获更改事件。使用以下格式指定每个表的标识符:

<schemaName>.<tableName&gt;。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 table.include.list 属性。

column.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与 change 事件消息值中包含的列的完全限定域名匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。列的完全限定域名使用以下格式:

<Schema_name>.<table_name>.<column_name>

The primary key 列始终包含在事件键中,即使您没有使用此属性来显式包含其值。

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 column.exclude.list 属性。

column.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要从更改事件消息值中排除的列的完全限定域名匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。完全限定列名称使用以下格式:

<schema_name>.<table_name>.<column_name>

主键列始终包含在事件键中,即使您使用此属性显式排除其值。

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,请不要设置 column.include.list 属性。

skip.messages.without.change

false

指定在包含的列中没有更改时跳过发布消息。如果列没有包括每个 column.include.listcolumn.exclude.list 属性的更改,这将过滤消息。

column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。列的完全限定域名格式为 <schemaName>.<tableName>.<columnName>.
要匹配 Debezium 的名称,请使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。
在生成的更改事件记录中,指定列的值替换为 pseudonyms。

一个 pseudonym,它包括了通过应用指定的 hashAlgorithmsalt 的结果的哈希值。根据使用的 hash 功能,引用完整性会被维护,而列值则替换为 pseudonyms。Java 加密架构标准算法 文档的 MessageDigest 部分中 描述了支持的哈希功能。

在以下示例中,CzQMA0cB5K 是一个随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如有必要,pseudonym 会自动缩短到列的长度。连接器配置可以包含多个属性,以指定不同的哈希算法和 salt。

根据使用的 hashAlgorithm,所选的 salt 和实际数据集,可能无法完全屏蔽。

应该使用哈希策略版本 2 来确保在不同位置或系统中对值进行哈希处理。

binary.handling.mode

bytes

指定在更改事件中二进制(blob)列如何表示更改事件,包括: bytes 代表二进制数据作为 字节数组(默认),base64 代表二进制数据作为 base64 编码的字符串,base64 编码字符串,base64-url-safe -encoded String 表示二进制数据。

schema.name.adjustment.mode

none

指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

field.name.adjustment.mode

none

指定应如何调整字段名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

如需了解更多详细信息,请参阅 Avro 命名

decimal.handling.mode

precise

指定连接器应如何处理 NUMBERDECIMALNUMERIC 列的浮点值。您可以设置以下选项之一:

precise (默认)
使用二进制格式更改事件中的 java.math.BigDecimal 值来精确表示的值。
double
使用 值表示值。使用 值比较简单,但可能会导致精度丢失。
字符串
将值编码为格式化字符串。使用 字符串 选项更易于使用,但会导致语义信息丢失实际类型。更多信息请参阅 数字类型

interval.handling.mode

numeric

指定连接器如何处理 interval 列的值:

numeric 代表使用大约微秒数的间隔。

string 代表间隔,使用 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 代表。例如: P1Y2M3DT4H5M6.78S

event.processing.failure.handling.mode

fail

指定连接器在处理事件时应如何响应异常。您可以设置以下选项之一:

fail
传播异常(代表有问题的事件的偏移),从而导致连接器停止。
warn
导致跳过有问题的事件。然后会记录有问题的事件的偏移。
skip
导致跳过有问题的事件。

max.batch.size

2048

一个正整数值,用于指定要在此连接器的每个批处理事件的最大大小。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会在将事件写入 Kafka 前将事件放在阻塞队列中。当连接器比将信息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可能会提供从数据库读取更改事件的情况。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0 (禁用)

指定阻塞队列的最大卷的长整数值,以字节为单位。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。
如果也设置了 max.queue.size,当队列的大小达到由任一属性指定的限制时,写入队列会被阻断。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 个记录后写入队列会被阻断,或者在队列中的记录卷达到 5000 字节。

poll.interval.ms

500 (0.5 second)

正整数值指定连接器在每次迭代过程中应等待的毫秒数,以便出现新的更改事件。

include.schema.changes

true

布尔值指定连接器是否将数据库 schema 中的更改发布到 Kafka 主题,其名称与主题前缀相同。连接器使用包含数据库名称的键记录每个架构更改,以及一个描述 schema 更新的 JSON 结构的值。这个记录模式更改的机制独立于连接器的内部记录数据库架构历史记录。

tombstones.on.delete

true

控制 delete 事件后跟一个 tombstone 事件。可能会有以下值:

true
对于每个删除操作,连接器会发出 delete 事件和后续的 tombstone 事件。
false
对于每个删除操作,连接器只发出 delete 事件。

删除源记录后,tombstone 事件(默认行为)可让 Kafka 完全删除在启用了 日志压缩的主题中已删除行的密钥的所有事件

message.key.columns

没有默认值

一个表达式列表,用于指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题。

默认情况下,Debebe 使用表的主键列作为发出的记录的消息键。使用默认键,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。
要为表建立自定义消息密钥,请列出表,后跟要用作消息键的列。每个列表条目都采用以下格式:

< fullyQualifiedTableName> : & lt;keyColumn&gt;, <keyColumn>

To base a table key on multiple column name, insert commas between the column name.
每个完全限定表名称都是以下格式的正则表达式:

<schemaName> . & lt;tableName>

属性可以包含多个表的条目。使用分号分隔列表中的表条目。
以下示例为表 inventory.customerspurchase.orders 设置了消息键:

inventory.customers:pk1,pk2; (configured).purchaseorders:pk3,pk4

用于表 inventory.customer,列 pk1pk2 指定为消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 服务器作为消息键。
对用来创建自定义消息键的列数量没有限制。但是,最好使用指定唯一密钥所需的最小数量。

column.truncate.to.length.chars

没有默认值

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 length 设置为一个正整数,替换在属性名称中的 length 指定的星号(*)的数量列中的数据。将 length 设置为 0 (zero),将指定列中的数据替换为空字符串。

一个列的完全限定名称会观察以下格式:< schemaName> . <tableName &gt; . <columnName&gt;。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

您可以在单个配置中指定多个长度不同的属性。

column.mask.with.length.chars

没有默认值

可选的、以逗号分隔的正则表达式列表,用于对更改事件中的列名称进行掩码处理,将字符替换为星号 (*)。
指定要在属性名称中替换的字符数,如 column.mask.with.8.chars
将 length 指定为正整数或零。然后,在要应用掩码的每个基于字符的列名称中添加正则表达式。
使用以下格式指定完全限定列名称:< schemaName> . <tableName&gt; . & lt;columnName&gt;。

连接器配置可以包含多个属性,以指定不同的长度。

column.propagate.source.type

没有默认值

可选的、以逗号分隔的正则表达式列表,与您希望连接器发送代表列元数据的额外参数匹配。当设置此属性时,连接器将以下字段添加到事件记录的 schema 中:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

一个列的完全限定名称会观察以下格式之一: < tableName> . <columnName> , 或 &lt; schemaName&gt ; . <tableName& gt; . &lt;columnName&gt;。
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

datatype.propagate.source.type

没有默认值

可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于带有匹配数据类型的列,连接器会发出事件记录,该记录在其 schema 中包含以下额外字段:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

一个列的完全限定名称会观察以下格式之一: < tableName> . <typeName> , 或 &lt; schemaName&gt ; . <tableName& gt; . &lt;typeName&gt;。
要匹配数据类型的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式不匹配类型名称中可能存在的子字符串。

有关特定于 Oracle 数据类型名称的列表,请参阅 Oracle 数据类型映射

heartbeat.interval.ms

0

指定(以毫秒为单位)连接器将信息发送到 heartbeat 主题的频率。
使用此属性确定连接器是否继续从源数据库接收更改事件。
当捕获的表中没有发生更改事件时,它也可用于设置属性。
在这种情况下,虽然连接器继续读取 redo 日志,但它会发出任何更改事件信息,以便 Kafka 主题中的偏移保持不变。因为连接器不会清除从数据库读取的最新系统更改号(SCN),数据库可能会保留大于必要的红色日志文件。如果连接器重启,扩展保留周期可能会导致连接器冗余地发送一些更改事件。
默认值 0 可防止连接器发送任何 heartbeat 信息。

heartbeat.action.query

没有默认值

指定当连接器发送心跳消息时连接器在源数据库上执行的查询。

例如:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

连接器会在发出 heartbeat 消息 后运行查询。

设置此属性并创建一个心跳表,以接收心跳信息,以解决 Debezium 无法同步与高流量数据库位于同一主机上的低流量数据库的偏移 的情况。在连接器将记录插入到配置的表中后,它能够接收来自 low-traffic 数据库的更改,并确认数据库中的 SCN 更改,以便偏移可以与代理同步。

snapshot.delay.ms

没有默认值

指定连接器在拍摄快照前等待的时间间隔(毫秒)。
使用此属性来防止集群中启动多个连接器时快照中断,这可能会导致连接器重新平衡。

streaming.delay.ms

0

指定连接器在完成快照后延迟流过程启动的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后马上重启快照,但在流传输过程开始前。设置一个延迟值,它高于为 Kafka Connect worker 设置的 offset.flush.interval.ms 属性的值。

snapshot.fetch.size

10000

指定在拍摄快照时应在每个表中读取的最大行数。连接器以指定大小的多个批处理读取表内容。

query.fetch.size

10000

指定给定查询的每个数据库往返获取的行数。使用 0 值将使用 JDBC 驱动程序的默认获取大小。

provide.transaction.metadata

false

如果您希望 Debezium 生成带有事务边界的事件,并使用事务元数据丰富数据事件,将属性设置为 true

如需了解更多详细信息,请参阅 Transaction Metadata

log.mining.strategy

online_catalog

指定控制 Oracle LogMiner 构建并使用给定数据字典解析表和列 ID 的 mining 策略。
设置以下选项之一:

redo_log_catalog
将数据字典写入在线红色日志,从而导致随着时间的推移生成更多归档日志。这也启用了针对所捕获的表跟踪 DDL 更改,因此,如果架构更改频繁是理想的选择。
online_catalog
使用数据库的当前数据字典来解析对象 ID,且不会将任何额外信息写入在线恢复日志中。这允许 LogMiner 大幅减去,但代价是无法跟踪 DDL 更改。如果捕获的表不经常或永远不会变化,这是理想的选择。
hybrid
使用数据库当前数据字典和 Debezium 内存中模式模型的组合来无缝解析表和列名称。此模式在 online_catalog LogMiner 策略的级别执行,具有 redo_log_catalog 策略的模式跟踪弹性,而不产生了 redo_log_catalog 策略的存档日志生成和性能成本的开销。

log.mining.query.filter.mode

none

指定控制如何构建 Oracle LogMiner 查询的 mining 查询模式。
设置以下选项之一:

none
在查询中不执行任何模式、表或用户名过滤的情况下会生成查询。
in
查询是使用标准 SQL in-clause 生成的,用于过滤数据库端的模式、表和用户名。模式、表和用户名配置包括/排除列表不应指定任何正则表达式,因为查询是直接使用这些值构建的。
regex
该查询使用 Oracle 的 REGEXP_LIKE 操作器生成,以过滤数据库端的模式和表名称,以及使用 SQL in-clause 的用户名。schema 和 table 配置 include/exclude 列表可以安全地指定正则表达式。

log.mining.buffer.type

内存

缓冲区类型控制连接器管理缓冲区事务数据的方式。

内存 - 使用 JVM 进程的堆来缓冲所有事务数据。如果您不预期连接器处理大量长时间运行或大型事务,请选择这个选项。当这个选项处于活跃状态时,缓冲区状态不会在重启后保留。重启后,从当前偏移的 SCN 值重新创建缓冲区。

log.mining.buffer.transaction.events.threshold

0

事务在事务缓冲区中可以具有的最大事件数。超过这个阈值的事件计数的事务不会发出,并会被取消。默认行为是没有事务事件阈值。

log.mining.session.max.ms

0

在使用新会话前,LogMiner 会话可以处于活跃状态的最大毫秒数。

对于低卷系统,当同一会话用于长时间时,LogMiner 会话可能会消耗太多 PGA 内存。默认行为是仅在检测到日志切换时使用新的 LogMiner 会话。通过将此值设置为大于 0 的内容,这指定了 LogMiner 会话在停止并启动来取消分配和重新分配 PGA 内存的最大毫秒数。

log.mining.restart.connection

false

指定 JDBC 连接是否关闭并在日志交换机上重新打开,还是在 mining 会话达到最长生命周期阈值时。

默认情况下,在日志交换机或最大会话生命周期之间不会关闭 JDBC 连接。
如果您使用 LogMiner 遇到过量 Oracle SGA 增长,则应该启用它。

log.mining.batch.size.min

1000

此连接器尝试从 redo/archive 日志读取的最小 SCN 间隔大小。

log.mining.batch.size.max

100000

从 redo/archive 日志读取时此连接器使用的最大 SCN 间隔大小。

log.mining.batch.size.increment

20000

增加/减少连接器用于从 redo/archive 日志读取的时间间隔。

log.mining.batch.size.default

20000

连接器用于从 redo/archive 日志读取数据的起始 SCN 间隔大小。这也服务器作为调整批处理大小的一种措施 - 当当前 SCN 和批处理开始/端 SCN 之间的区别大于这个值时,批处理大小会增加/减少。

log.mining.sleep.time.min.ms

0

从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取后,以及再次读取数据前的最短时间。值以毫秒为单位。

log.mining.sleep.time.max.ms

3000

在从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取数据后,以及再次开始读取数据前,连接器会处于睡眠状态的最长时间。值以毫秒为单位。

log.mining.sleep.time.default.ms

1000

从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取后处于睡眠状态的开始时间,然后再重新开始读取数据。值以毫秒为单位。

log.mining.sleep.time.increment.ms

200

连接器用于在从 logminer 读取数据时调整最佳睡眠时间的最大时间量。值以毫秒为单位。

archive.log.hours

0

过去从 SYSDATE 到 mine 归档日志的小时数。当使用默认设置,连接器会减去所有归档日志。

log.mining.archive.log.only.mode

false

控制连接器是否从归档日志或在线恢复日志和存档日志(默认)的组合中减去更改。

redo 日志使用可在任何时间点上归档的环形缓冲。在频繁归档在线恢复日志的环境中,这可能会导致 LogMiner 会话失败。与重做日志相反,归档日志可以保证可靠。将这个选项设置为 true 以强制连接器只进行归档日志。在将连接器设置为最小归档日志后,正在提交的操作和连接器发送相关更改事件之间的延迟可能会增加。延迟程度取决于数据库配置为归档在线恢复日志的频率。

log.mining.archive.log.only.scn.poll.interval.ms

10000

连接器在轮询之间休眠的毫秒数,以确定启动系统更改号是否在存档日志中。如果没有启用 log.mining.archive.log.only.mode,则不会使用此设置。

log.mining.transaction.retention.ms

0

正整数值,用于指定在红色日志交换机之间保持长时间运行的事务的毫秒数。当设置为 0 时,事务会被保留,直到检测到提交或回滚为止。

默认情况下,LogMiner 适配器维护所有正在运行的事务的内存缓冲区。因为所有作为事务一部分的 DML 操作都会被缓冲,直到检测到提交或回滚前,应该避免长时间运行的事务,以便不会溢出该缓冲区。任何超过此配置值的事务都会被完全丢弃,连接器不会为作为事务一部分的操作发出任何消息。

archive.destination.name

没有默认值

使用 LogMiner 指定一个归档日志时要使用的 Oracle 存档目的地。

默认行为会自动选择第一个有效本地配置的目的地。但是,您可以通过提供目标名称来使用特定的目的地,例如 LOG_ARCHIVE_DEST_5

log.mining.username.include.list

没有默认值

要从 LogMiner 查询中包含的数据库用户列表。如果您希望捕获过程包含来自指定用户的更改,则设置此属性会很有用。

log.mining.username.exclude.list

没有默认值

要从 LogMiner 查询中排除的数据库用户列表。如果您希望捕获过程始终排除特定用户所做的更改,则设置此属性会很有用。

log.mining.scn.gap.detection.gap.size.min

1000000

指定一个值,连接器与当前和之前的 SCN 值之间的差别进行比较,以确定 SCN 差距是否存在。如果 SCN 值之间的区别大于指定的值,且时间差异小于 log.mining.scn.gap.detection.time.max.ms,则会检测到 SCN 差距,连接器使用大于配置的最大批处理的 mining 窗口。

log.mining.scn.gap.detection.time.interval.max.ms

20000

指定一个值,以毫秒为单位,连接器与当前和之前的 SCN 时间戳之间的差别进行比较,以确定 SCN 差距是否存在。如果时间戳之间的区别小于指定的值,并且 SCN delta 大于 log.mining.scn.gap.detection.gap.size.min,则检测到 SCN 差距,连接器使用大于配置的最大批处理的 mining 窗口。

log.mining.flush.table.name

LOG_MINING_FLUSH

指定协调将 Oracle LogWriter Buffer (LGWR)刷新表到红色日志的 flush 表的名称。可以使用 < schemaName> . <tableName&gt; 或<tableName &gt ; 格式来指定 此名称。通常,多个连接器可以使用相同的 flush 表。但是,如果连接器遇到表锁定竞争错误,请使用此属性为每个连接器部署指定一个专用表。

log.mining.include.redo.sql

false

指定在 source.redo_sql 字段中是否包含红色日志构建的 SQL 语句。使用 XStream 适配器时会忽略此配置。

lob.enabled

false

控制在更改事件中是否发出大型对象(CLOB 或 BLOB)列值。

默认情况下,更改事件具有大对象列,但列不包含任何值。处理和管理大型对象列类型和有效负载需要一定的开销。要捕获大型对象值并在更改事件中序列化它们,请将此选项设置为 true

unavailable.value.placeholder

__debezium_unavailable_value

指定连接器提供的常量,表示原始值没有变化,不是由数据库提供。

rac.nodes

没有默认值

以逗号分隔的 Oracle Real Application Clusters (RAC)节点主机名或地址列表。需要此字段才能启用与 Oracle RAC 部署的兼容性。

使用以下方法之一指定 RAC 节点列表:

  • database.port 指定一个值,并使用 rac.nodes 列表中每个地址的指定端口值。例如:

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101
  • database.port 指定一个值,并覆盖列表中一个或多个条目的默认端口。列表中可以包含使用默认 database.port 值的条目,以及定义其自身唯一端口值的条目。例如:

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101:1522

如果您使用 database.url 属性为数据库提供原始 JDBC URL,而不是为 database.port 定义值,则每个 RAC 节点标签必须明确指定端口值。

skipped.operations

t

以逗号分隔的操作类型列表,您希望连接器在流传输过程中跳过。您可以将连接器配置为跳过以下类型的操作:

  • c (插入/创建)
  • u (update)
  • D (删除)
  • T (截断)

默认情况下,只跳过 truncate 操作。

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。https://docs.redhat.com/documentation/en/red_hat_build_of_debezium/3.0.8/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel当您将此属性与 Oracle 可插拔数据库(PDB)搭配使用时,请将其值设为根数据库的名称。
使用以下格式指定集合名称:
<databaseName> . <schemaName& gt; . &lt;tableName>

signal.enabled.channels

source

为连接器启用的信号通道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

连接器在增量快照块期间获取并读取的最大行数。增加块大小可提供更高的效率,因为快照会减少对更大大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。

incremental.snapshot.watermarking.strategy

insert_insert

指定连接器在增量快照中使用的水位线机制,以重复数据删除事件,这些事件可能会被增量快照捕获,然后在流恢复后重新捕获。
您可以指定以下选项之一:

insert_insert
当您发送一个信号来启动增量快照时,对于 Debezium 在快照期间读取的每个块,它会将条目写入信号数据收集来记录信号,以打开快照窗口。快照完成后,Debezium 会插入第二个条目,记录信号以关闭窗口。
insert_delete
当您发送一个信号来启动增量快照时,对于 Debezium 读取的每个块,它会将单个条目写入信号数据收集,以记录信号来打开快照窗口。快照完成后,会删除此条目。不会为关闭快照窗口的信号创建条目。设置这个选项以防止快速增长信号数据收集。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

应用于确定数据更改的主题名称、模式更改、事务、心跳事件等主题名称,默认为 SchemaTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发散列映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器发送心跳消息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

transaction

控制连接器发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认的主题名称是 fulfillment.transaction

snapshot.max.threads

1

指定连接器在执行初始快照时使用的线程数量。要启用并行初始快照,请将 属性设置为大于 1 的值。在并行初始快照中,连接器同时处理多个表。

注意

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成它们的工作。如果一个表的快照需要比其他表的快照完成的时间要长得多,则线程已完成其工作闲置。在某些环境中,网络设备(如负载均衡器或防火墙)会终止闲置以延长的时间间隔的连接。快照完成后,连接器无法关闭连接,从而导致异常和不完整的快照,即使连接器成功传输所有快照数据。

如果您遇到这个问题,请将 snapshot.max.threads 的值恢复到 1,然后重试快照。

snapshot.database.errors.max.retries

0

指定发生数据库错误时重试尝试快照表的数量。此配置属性目前只重试与 ORA-01466 例外相关的失败。默认情况下,不会执行额外的重试。

custom.metric.tags

没有默认值

通过添加提供上下文信息的元数据来定义自定义 MBean 对象名称的标签。指定以逗号分隔的键值对列表。每个键代表 MBean 对象名称的标签,对应的值代表键的值,例如
k1=v1,k2=v2

连接器将指定的标签附加到基础 MBean 对象名称。标签可帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。如需更多信息,请参阅自定义 MBean 名称

errors.max.retries

-1

指定连接器如何在生成 Retriable 错误的操作后响应,如连接错误。
设置以下选项之一:

-1
无限制。无论之前失败的数量如何,连接器总是自动重启,并重试操作。
0
disabled。连接器会立即失败,永远不会重试操作。重启连接器需要用户干预。
> 0
连接器会自动重启,直到达到指定的最大重试次数。下一次失败后,连接器会停止,用户需要干预才能重启它。

database.query.timeout.ms

600000

等待查询执行的时间(以毫秒为单位)。默认为 600 秒(600,000 ms);零表示没有限制。

Debezium Oracle 连接器数据库模式历史记录配置属性

Debezium 提供了一组 schema.history.internal.* 属性,用于控制连接器如何与 schema 历史记录主题进行交互。

下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

表 2.124. 连接器数据库架构历史记录配置属性
属性默认描述

schema.history.internal.kafka.topic

没有默认值

连接器存储数据库 schema 历史记录的 Kafka 主题的完整名称。

schema.history.internal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索之前由连接器存储的数据库架构历史记录,以及写入从源数据库读取的每个 DDL 语句。每个对都应该指向 Kafka Connect 进程使用的相同 Kafka 集群。

schema.history.internal.kafka.recovery.poll.interval.ms

100

整数值,指定连接器在启动/恢复期间应等待的最大毫秒数,同时轮询持久数据。默认值为 100ms。

schema.history.internal.kafka.query.timeout.ms

3000

指定连接器在使用 Kafka admin 客户端获取集群信息时应等待的最大毫秒数。

schema.history.internal.kafka.create.timeout.ms

30000

指定连接器在使用 Kafka admin 客户端创建 kafka 历史记录主题时应等待的最大毫秒数。

schema.history.internal.kafka.recovery.attempts

100

连接器在连接器恢复失败前尝试读取持久性历史记录数据的次数上限,并显示错误。没有数据后等待的最长时间是 restore.attempts recovery. poll. poll.interval.ms

schema.history.internal.skip.unparseable.ddl

false

指定连接器是否应该忽略不正确的或未知数据库语句的布尔值,或停止处理,以便用户可以解决这个问题。安全默认为 false。跳过应仅谨慎使用,因为它可以在处理 binlog 时导致数据丢失或手动忽略。

schema.history.internal.store.only.captured.tables.ddl

false

指定连接器是否记录模式或数据库中所有表的布尔值,或者仅从指定为捕获的表记录。
指定以下值之一:

false (默认)
在数据库快照中,连接器记录了数据库中所有非系统表的 schema 数据,包括没有指定用于捕获的表。最好保留默认设置。如果您稍后决定从您最初为捕获的表捕获更改,则连接器可以轻松地从这些表中捕获数据,因为其模式结构已存储在 schema 历史记录主题中。Debezium 需要表的 schema 历史记录,以便它可识别在更改事件发生时存在的结构。
true
在数据库快照中,连接器只记录 Debezium 捕获更改事件的表模式。如果您更改了默认值,且稍后将连接器配置为从数据库中的其他表捕获数据,连接器缺少从表中捕获更改事件所需的模式信息。

schema.history.internal.store.only.captured.databases.ddl

false

指定连接器是否从数据库实例中的所有逻辑数据库记录模式结构的布尔值。
指定以下值之一:

true
连接器只记录逻辑数据库中表的架构结构,以及 Debezium 捕获更改事件的 schema。
false
连接器记录所有逻辑数据库的模式结构。

直通 Oracle 连接器配置属性

连接器支持 通过传递 属性,使 Debezium 指定自定义配置选项来微调 Apache Kafka producer 和消费者的行为。有关 Kafka 生成者和消费者的完整配置属性范围的详情,请参考 Kafka 文档

直通属性,用于配置生成者和消费者客户端如何与 schema 历史记录主题交互

Debezium 依赖于 Apache Kafka producer 将 schema 更改写入数据库 schema 历史记录主题。同样,它依赖于 Kafka 使用者在连接器启动时从数据库 schema 历史记录主题中读取。您可以通过将值分配给一组以 schema.history.internal.producer和 schema.history.internal.consumerPromQL 前缀开头的 pass-through 配置属性来定义 Kafka producer消费者 客户端的配置。pass-through producer 和 consumer 数据库模式历史记录属性控制一系列行为,如这些客户端如何与 Kafka 代理安全连接,如下例所示:

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium 在将属性传递给 Kafka 客户端前从属性名称中剥离前缀。

有关 Kafka producer 配置属性和 Kafka 使用者配置属性的更多信息,请参阅 Apache Kafka 文档。

用于通过属性配置 Oracle 连接器如何与 Kafka 信号主题交互

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 2.125. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号顺序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机和端口对列表。每个对引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

为信号频道配置 Kafka 消费者客户端的属性

Debezium 连接器提供信号 Kafka 使用者的直通配置。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。

Debezium 在将属性传递给 Kafka 信号消费者前从属性中剥离前缀。

用于配置 Oracle 连接器接收器通知频道的直通属性

下表描述了可用于配置 Debezium sink 通知 频道的属性。

表 2.126. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知频道之一时,需要此属性。

Debezium 连接器传递数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。透传数据库属性以前缀 driverPROFILE 开头。例如,连接器将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将属性传递给数据库驱动程序前从属性中剥离前缀。

2.5.7. 监控 Debezium Oracle 连接器性能

Debezium Oracle 连接器除了 Apache Zookeeper、Apache Kafka 和 Kafka Connect 的内置支持外,还提供三种指标类型。

有关如何通过 JMX 公开这些指标的详情,请参考 监控文档

2.5.7.1. 用于 Oracle 连接器快照和流 MBean 对象的自定义名称

Debezium 连接器通过 MBean 名称为连接器公开指标。这些指标(特定于每个连接器实例)提供有关连接器快照、流和架构历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为每个不同的连接器指标生成一个唯一的 MBean 名称。要查看连接器进程的指标,您可以将可观察性堆栈配置为监控其 MBean。但是,这些默认 MBean 名称取决于连接器配置,但配置更改可能会导致对 MBean 名称的更改。对 MBean 名称的更改会破坏连接器实例和 MBean 之间的链接,并破坏监控活动。在这种情况下,如果要恢复监控,您必须重新配置 observability 堆栈以使用新的 MBean 名称。

要防止监控 MBean 名称更改结果的中断,您可以配置自定义指标标签。您可以通过在连接器配置中添加 custom.metric.tags 属性来配置自定义指标。属性接受键值对,其中每个键代表 MBean 对象名称的标签,对应的值代表该标签的值。例如: k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称中。

为连接器配置 custom.metric.tags 属性后,您可以配置 observability 堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签,而不是可变 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了它如何构建 MBean 名称,或者连接器配置更改中的 topic.prefix,则指标集合不会中断,因为指标提取任务使用指定的标签模式来识别连接器。

使用自定义标签的更多优点是,您可以使用反映数据管道架构的标签,以便以适合您操作需求的方式组织指标。例如,您可以使用声明连接器活动类型、应用程序上下文或数据源的值来指定标签,如 db1-streaming-for-application-abc。如果您指定多个键值对,则所有指定的对都会附加到连接器的 MBean 名称中。

以下示例演示了标签如何修改默认的 MBean 名称。

例 2.39. 自定义标签如何修改连接器 MBean 名称

默认情况下,Oracle 连接器使用以下 MBean 名称进行流传输指标:

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>

如果将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 会生成以下自定义 MBean 名称:

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

2.5.7.2. Debezium Oracle 连接器快照指标

MBeandebezium.oracle:type=connector-metrics,context=snapshot,server= <topic.prefix>

快照指标不会被公开,除非快照操作活跃,或者快照自上次连接器开始以来发生了某种情况。

下表列出了可用的快照指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

此连接器自上一次启动或重置以来看到的事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在快照和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在快照和主 Kafka Connect 循环之间传递事件的队列的可用容量。

TotalTableCount

int

包括在快照中的表的总数。

RemainingTableCount

int

快照必须复制的表数。

SnapshotRunning

布尔值

快照是否已启动。

SnapshotPaused

布尔值

快照是否已暂停。

SnapshotAborted

布尔值

快照是中止的。

SnapshotCompleted

布尔值

快照是否完成。

SnapshotDurationInSeconds

long

快照到目前为止需要的秒数,即使未完成也是如此。也包括快照暂停的时间。

SnapshotPausedDurationInSeconds

long

快照暂停的秒数。如果快照暂停了多次,暂停的时间会增加。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。在处理过程中,表会以递增方式添加到映射中。更新每 10,000 行扫描并完成表后。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

连接器还会在执行增量快照时提供以下附加快照指标:

属性类型描述

ChunkId

字符串

当前快照块的标识符。

ChunkFrom

字符串

定义当前块的主密钥集的低限。

ChunkTo

字符串

定义当前块的主密钥集的上限。

TableFrom

字符串

当前快照表的主密钥集合的低限。

TableTo

字符串

当前快照表的主键集合的上限。

2.5.7.3. Debezium Oracle 连接器流指标

MBeandebezium.oracle:type=connector-metrics,context=streaming,server= <topic.prefix>

下表列出了可用的流指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个流事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

源数据库自上次连接器启动后报告的数据更改事件总数,或者因为指标重置后。代表 Debezium 处理的数据更改工作负载。

TotalNumberOfCreateEventsSeen

long

自上次启动或指标重置以来,连接器处理的创建事件总数。

TotalNumberOfUpdateEventsSeen

long

自上次启动或指标重置以来,连接器处理的更新事件总数。

TotalNumberOfDeleteEventsSeen

long

自上次启动或指标重置后,连接器处理的删除事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在流器和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在流程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

Connected

布尔值

表示连接器当前是否已连接到数据库服务器的标记。

MilliSecondsBehindSource

long

最后一次更改事件的时间戳和连接器处理之间的毫秒数。这些值将纳入运行数据库服务器和连接器的机器上时钟之间的差别。

NumberOfCommittedTransactions

long

已提交的已处理事务的数量。

SourceEventPosition

Map<String, String>

最后一次接收的事件的协调。

LastTransactionId

字符串

最后一次处理的事务的事务标识符。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

Debezium Oracle 连接器还提供以下额外的流指标:

表 2.127. 其他流指标的描述
属性类型描述

CurrentScn

BigInteger

已处理的最新系统更改号。

OldestScn

BigInteger

事务缓冲区中最旧的系统更改号。

OldestScnAgeInMilliseconds

long

最旧的系统更改号(以毫秒为单位)。如果缓冲区为空,则该值将是 0。

CommittedScn

BigInteger

最后提交的系统更改事务缓冲区中的数字。

OffsetScn

BigInteger

系统更改号当前写入连接器的偏移量。

CurrentRedoLogFileName

string[]

当前减去的日志文件数组。

MinimumMinedLogCount

long

为任何 LogMiner 会话指定的最小日志数。

MaximumMinedLogCount

long

为任何 LogMiner 会话指定的最大日志数。

RedoLogStatus

string[]

每个 mined logfile 的当前状态数组,格式为 file |status

SwitchCounter

int

数据库执行最近一天的日志切换的次数。

LastCapturedDmlCount

long

最后的 LogMiner 会话查询中观察到的 DML 操作数量。

MaxCapturedDmlInBatch

long

处理单个 LogMiner 会话查询时观察到的最大 DML 操作数。

TotalCapturedDmlCount

long

观察到的 DML 操作总数。

FetchingQueryCount

long

执行的 LogMiner 会话查询(也称为批处理)的总数。

LastDurationOfFetchQueryInMilliseconds

long

最后一次 LogMiner 会话查询的持续时间(以毫秒为单位)。

MaxDurationOfFetchQueryInMilliseconds

long

任何 LogMiner 会话查询的最长持续时间(以毫秒为单位)。

LastBatchProcessingTimeInMilliseconds

long

处理最后一个 LogMiner 查询批处理的持续时间会导致毫秒。

TotalParseTimeInMilliseconds

long

解析 DML 事件 SQL 语句的时间(毫秒)。

LastMiningSessionStartTimeInMilliseconds

long

启动最后一个 LogMiner 会话的持续时间(毫秒)。

MaxMiningSessionStartTimeInMilliseconds

long

启动 LogMiner 会话的最长时间,以毫秒为单位。

TotalMiningSessionStartTimeInMilliseconds

long

连接器启动 LogMiner 会话的总持续时间(毫秒)。

MinBatchProcessingTimeInMilliseconds

long

单一 LogMiner 会话处理结果的最短持续时间(以毫秒为单位)。

MaxBatchProcessingTimeInMilliseconds

long

单一 LogMiner 会话处理结果的最大持续时间(以毫秒为单位)。

TotalProcessingTimeInMilliseconds

long

LogMiner 会话处理结果的总持续时间(毫秒)。

TotalResultSetNextTimeInMilliseconds

long

JDBC 驱动程序所花费的总持续时间(毫秒)从 log mining 视图获取下一行。

TotalProcessedRows

long

从日志 mining 视图中处理的所有会话的行总数。

BatchSize

int

log mining query per database round-trip 获取的条目数。

MillisecondToSleep betweenMiningQuery

long

从日志 mining 视图获取另一批结果前,连接器睡眠的毫秒数。

MaxBatchProcessingThroughput

long

从 log mining 视图处理的最大行/秒数。

AverageBatchProcessingThroughput

long

从日志 mining 处理的平均行/秒数。

LastBatchProcessingThroughput

long

从上一次批处理的 log mining 视图中处理的平均行/秒数。

NetworkConnectionProblemsCounter

long

检测到的连接问题数量。

HoursToKeepTransactionInBuffer

int

在被丢弃前,事务被连接器的内存中缓冲区保留的小时数,而不提交或回滚。如需更多信息,请参阅 log.mining.transaction.retention.ms

NumberOfActiveTransactions

long

事务缓冲区中当前活动事务的数量。

NumberOfCommittedTransactions

long

事务缓冲区中提交事务的数量。

NumberOfOversizedTransactions

long

丢弃的事务数量,因为其大小超过 log.mining.buffer.transaction.events.threshold

NumberOfRolledBackTransactions

long

事务缓冲区中回滚事务的数量。

NumberOfPartialRollbackCount

long

在提交的事务中回滚的事件数量,这意味着大多数用例中的约束违反情况。

CommitThroughput

long

事务缓冲区中每秒提交事务的平均数量。

RegisteredDmlCount

long

事务缓冲区中注册的 DML 操作数量。

LagFromSourceInMilliseconds

long

事务日志中变化和添加到事务缓冲区时的时间差(毫秒)。

MaxLagFromSourceInMilliseconds

long

事务日志中变化和添加到事务缓冲区时的最大时间差(毫秒)。

MinLagFromSourceInMilliseconds

long

事务日志中变化和添加到事务缓冲区时的最小时间差异(毫秒)。

AbandonedTransactionIds

string[]

由于其年龄,从事务缓冲区中删除的最新带外事务标识符的数组。详情请查看 log.mining.transaction.retention.ms

AbandonedTransactionCount

long

带外 事务 列表中当前条目数。

RolledBackTransactionIds

string[]

最近一次事务标识符的数组,已在事务缓冲区中减和回滚。

LastCommitDurationInMilliseconds

long

最后一次事务缓冲区提交操作的持续时间(以毫秒为单位)。

MaxCommitDurationInMilliseconds

long

最长事务缓冲区提交操作的持续时间(以毫秒为单位)。

ErrorCount

int

检测到的错误数量。

WarningCount

int

检测到的警告数量。

ScnFreezeCount

int

检查系统更改号以进行改进并保持不变的次数。高的值可能会表示持续运行较长的事务,并阻止连接器清除最近处理的系统更改号到连接器的偏移量。当条件为最佳时,该值应接近或等于 0。

UnparsableDdlCount

int

检测到的 DDL 记录数量,但无法被 DDL 解析器解析。这应该始终为 0 ;但是,当允许不可解析的 DDL 跳过时,可以使用此指标来确定任何警告是否已写入连接器日志。

MiningSessionUserGlobalAreaMemoryInBytes

long

当前最小会话的用户全局区域(UGA)内存消耗(以字节为单位)。

MiningSessionUserGlobalAreaMaxMemoryInBytes

long

所有 mining 会话的用户全局区域(UGA)内存消耗上限(以字节为单位)。

MiningSessionProcessGlobalAreaMemoryInBytes

long

当前 mining 会话的进程全局区域(PGA)内存消耗(以字节为单位)。

MiningSessionProcessGlobalAreaMaxMemoryInBytes

long

所有 mining 会话的进程全局区域(PGA)内存消耗上限(以字节为单位)。

2.5.7.4. Debezium Oracle 连接器模式历史记录指标

MBeandebezium.oracle:type=connector-metrics,context=schema-history,server= <topic.prefix>

下表列出了可用的模式历史记录指标。

属性类型描述

Status

字符串

STOPPED 之一RECOVERING (从存储恢复历史记录)、RUNNING 描述数据库架构历史记录的状态。

RecoveryStartTime

long

恢复开始的时间(以 epoch 秒为单位)。

ChangesRecovered

long

在恢复阶段读取的更改数量。

ChangesApplied

long

恢复和运行时应用的模式更改总数。

MilliSecondsSinceLast​RecoveredChange

long

自上次更改从历史记录存储中恢复后经过的毫秒数。

MilliSecondsSinceLast​AppliedChange

long

从上次更改被应用后经过的毫秒数。

LastRecoveredChange

字符串

从历史记录存储中恢复最后一次更改的字符串表示。

LastAppliedChange

字符串

最后一次应用更改的字符串表示。

2.5.8. 在 Debezium 中使用 Oracle XStream 数据库(开发者预览)

Debezium Oracle 连接器默认使用原生 Oracle LogMiner 更改。连接器可以被切换为使用 Oracle XStream。要将连接器配置为使用 Oracle XStream,您必须应用与 LogMiner 搭配使用的特定数据库和连接器配置。

重要

将 Debezium Oracle 连接器与 XStream 搭配使用是一个开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

先决条件

  • 要使用 XStream API,您必须有黄金产品许可证。不需要安装 GoldenGate。

2.5.8.1. 准备用于 Debezium 的 Oracle XStream 数据库

Oracle XStream 所需的配置

ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

此外,必须为捕获的表或数据库启用补充日志记录,以便数据更改捕获更改数据库行之前的状态。下面演示如何在特定表中配置此功能,这是尽可能减少 Oracle 红色日志中捕获的信息量的理想选择。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

2.5.8.2. 为 Debezium Oracle 连接器创建 XStream 用户

Debezium Oracle 连接器要求使用特定权限设置用户帐户,以便连接器可以捕获更改事件。以下示例提供有关如何在多租户数据库模型中创建用户配置的信息。

2.5.8.2.1. 为 Debezium Oracle 连接器创建 XStream 管理员
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE USER c##dbzadmin IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_adm_tbs
    QUOTA UNLIMITED ON xstream_adm_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;

  BEGIN
     DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
        grantee                 => 'c##dbzadmin',
        privilege_type          => 'CAPTURE',
        grant_select_privileges => TRUE,
        container               => 'ALL'
     );
  END;
  /

  exit;
2.5.8.2.2. 为 Debezium Oracle 连接器创建 XStream 用户
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_tbs
    QUOTA UNLIMITED ON xstream_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  exit;

2.5.8.3. 创建 XStream 出站服务器

创建 XStream Outbound 服务器

创建 XStream Outbound Server

sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB
DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
    tables(1)  := NULL;
    schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/
exit;

注意

当您将 XStream Outbound Server 设置为捕获来自可插拔数据库的更改时,请将可插拔数据库名称指定为 source_container_name 参数的值。

配置 XStream 用户帐户以连接到 XStream Outbound Server

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##dbzuser');
END;
/
exit;

注意

单个 XStream Outbound 服务器无法由多个 Debezium Oracle 连接器共享。每个连接器都需要配置唯一的 XStream Outbound 连接器。

2.5.8.4. 将 Debezium 配置为使用 XStream 适配器

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 中获取最新的更改事件。您可以调整连接器配置,以启用连接器以使用 Oracle XStream 适配器。

以下配置示例添加了属性 database.connection.adapterdatabase.out.server.name,以启用连接器以使用 XStream API 实现。

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.connection.adapter": "xstream",
        "database.out.server.name" : "dbzxout"
    }
}

2.5.8.5. 获取 Oracle JDBC 驱动程序和 XStream API 文件

Debezium Oracle 连接器需要 Oracle JDBC 驱动程序(ojdbc11.jar)来连接到 Oracle 数据库。如果连接器使用 XStream 来访问数据库,还必须有 XStream API (xstreams.jar)。许可要求禁止 Debezium 在 Oracle 连接器存档中包含这些文件。但是,所需文件可作为 Oracle Instant Client 的一部分提供。以下步骤描述了如何下载 Oracle Instant Client 并提取所需的文件。

流程

  1. 在浏览器中为您的操作系统下载 Oracle Instant Client 软件包
  2. 提取存档,然后打开 instantclient_<version> 目录。

    例如:

    instantclient_21_1/
    ├── adrci
    ├── BASIC_LITE_LICENSE
    ├── BASIC_LITE_README
    ├── genezi
    ├── libclntshcore.so -> libclntshcore.so.21.1
    ├── libclntshcore.so.12.1 -> libclntshcore.so.21.1
    
    ...
    
    ├── ojdbc11.jar
    ├── ucp.jar
    ├── uidrvci
    └── xstreams.jar
  3. 复制 ojdbc11.jarxstreams.jar 文件,并将它们添加到 < kafka_home&gt; /libs 目录中,如 kafka/libs
  4. 创建环境变量 LD_LIBRARY_PATH,并将其值设为 Instant Client 目录的路径,例如:

    LD_LIBRARY_PATH=/path/to/instant_client/

2.5.8.6. xstream 连接器属性

使用 XStream 需要以下配置属性,除非默认值可用。

属性

默认

描述

database.out.server.name

没有默认值

在数据库中配置的 XStream 出站服务器的名称。

2.5.8.7. Oracle Xstream 和 DBMS_LOB 软件包

Oracle 提供名为 DBMS_LOB 的数据库软件包,它由一系列程序组成,用于操作 BLOB、CLOB 和 NCLOB 列。大多数程序都以总计性操作 LOB 列,但一个程序 WRITEAPPEND 能够操作 LOB 数据缓冲区的子集。

使用 XStream 时,WRITEAPPEND 会为每个程序调用发送逻辑更改记录(LCR)事件。这些 LCR 事件不会合并到一个更改事件中,因为它们在使用 Oracle LogMiner 适配器时。因此,主题的用户可能会接收带有部分列值的事件。

2.5.9. Oracle 连接器常见问题

是否支持 Oracle 11g?
不支持 Oracle 11g,但我们的目标是以尽力为基础与 Oracle 11g 向后兼容。我们依赖社区与 Oracle 11g 沟通兼容性问题,并在识别回归时提供 bug 修复。
Oracle LogMiner 弃用了吗?
否,Oracle 只弃用了 Oracle LogMiner 中的 Oracle LogMiner 选项,并从 Oracle 19c 开始删除该选项。Debezium Oracle 连接器不依赖于这个选项来正常工作,因此可以安全地与较新的 Oracle 版本一起使用,而不影响。
如何更改偏移中的位置?

Debezium Oracle 连接器在偏移中维护两个关键值,一个名为 scn 的字段,另一个名为 commit_scnscn 字段是一个字符串,代表捕获更改时连接器的低水位线开始位置。

  1. 查找包含连接器偏移的主题名称。这根据设为 offset.storage.topic 配置属性的值进行配置。
  2. 查找连接器的最后偏移量,其下存储了密钥,并确定用于存储偏移的分区。这可以通过 Kafka 代理安装提供的 kafkacat 工具脚本完成。一个示例可能类似如下:

    kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
    Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}

    inventory-connector 的键是 ["inventory-connector",{"server":"server1"}],分区是 11,最后一个偏移是键跟随的内容。

  3. 要移回以前的偏移偏移,应停止连接器,必须发出以下命令:

    echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \
    kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11

    这会写入 my_connect_offsets 主题的 my_connect_offsets 主题的分区 11,即给定的键和偏移值。在本例中,我们将连接器重新定向到 SCN 3245675000,而不是 324567897

如果连接器无法使用给定偏移 SCN 查找日志,会发生什么情况?

Debezium 连接器在连接器偏移中维护低和高水位线 SCN 值。low-watermark SCN 代表起始位置,必须存在于可用的在线红色或存档日志中,以便连接器成功启动。当连接器报告无法找到这个偏移 SCN 时,这表示仍可用的日志不包含 SCN,因此连接器无法从其离开的地方清除更改。

发生这种情况时,有两个选项。第一个是删除连接器的历史记录主题和偏移量,并建议重启连接器。这将保证任何主题消费者都不会发生数据丢失。第二个是手动操作偏移,将 SCN 传播到红色或存档日志中可用的位置。这将导致在旧的 SCN 值和新提供的 SCN 值之间发生的更改丢失,且不会写入主题。不建议这样做。

各种 mining 策略之间的区别是什么?

Debezium Oracle 连接器为 log.mining.strategy 提供了三个选项。

默认为 online_catalog,这指示连接器不会将数据字典写入红色日志。相反,Oracle LogMiner 将始终使用包含表结构当前状态的在线数据字典。这也意味着,如果表的结构更改,并且不再匹配在线数据字典,如果表的结构发生更改,Oracle LogMiner 将无法解析表或列名称。如果捕获的表可能会有频繁的模式更改,则不应使用此 mining 策略选项。务必要确保所有数据更改都用 schema 更改进行锁定,以便所有更改都已从表的日志中捕获,停止连接器,应用 schema 更改,然后重新启动连接器并恢复表上的数据更改。这个选项需要较少的 Oracle 数据库内存和 Oracle LogMiner 会话,通常启动非常快,因为数据字典不需要被 LogMiner 进程加载或主要。

第二个选项 redo_log_catalog 每次检测到日志交换机时,将 Oracle 数据字典写入红色日志。Oracle LogMiner 在解析 redo 和 archive 日志时,这个数据字典需要有效跟踪模式更改。这个选项将生成比归档日志的通常数多,但允许捕获的表实时操作,而不影响捕获数据更改。这个选项通常需要更多 Oracle 数据库内存,并且将导致 Oracle LogMiner 会话和进程在每次日志切换后启动稍有更长的时间。

最终选项 混合 结合了上述两个策略的优点和其弱点。此策略利用了 online_catalog 的性能,在对 redo_log_catalog 的模式跟踪方面具有弹性,同时避免了比普通存档日志生成更高的开销和性能成本。这个模式使用回退模式,如果 LogMiner 无法重建数据库更改的 SQL,Debezium 连接器将依赖于连接器维护的内存中模式模型来重建 SQL in-flight。其目的是,此模式最终将转换到默认值,并可能在以后只有操作模式。

使用 LogMiner 对 Hybrid mining 策略存在任何限制?
是的,log.mining.strategy 的 Hybrid 模式仍然是 work-in-progress 策略,因此还不支持所有数据类型。目前,这个模式无法重建 SQL 语句,其中包括针对 CLOBNCLOBBLOBXMLJSON 数据类型的操作。因此,如果您启用了值为 truelob.enabled,则无法使用混合策略,连接器将无法启动,因为不支持此组合。
为什么连接器会出现停止捕获 AWS 上的更改?

由于 AWS 网关负载平衡器上 350 秒的固定空闲超时,需要超过 350 秒的 JDBC 调用可以无限期地挂起。

如果调用 Oracle LogMiner API 完成超过 350 秒时,可能会触发超时,从而导致 AWS 网关负载平衡器挂起。例如,当 LogMiner 会话处理大量数据与 Oracle 的定期检查点任务同时运行时,可能会发生这样的超时。

要防止在 AWS Gateway Load Balancer 上发生超时的情况,请通过以 root 或 super-user 用户身份执行以下步骤来启用来自 Kafka Connect 环境的 keep-alive 数据包:

  1. 在终端中运行以下命令:

    sysctl -w net.ipv4.tcp_keepalive_time=60
  2. 编辑 /etc/sysctl.conf 并设置以下变量的值,如下所示:

    net.ipv4.tcp_keepalive_time=60
  3. 重新配置用于 Oracle 连接器的 Debezium 以使用 database.url 属性而不是 database.hostname,并添加 (ENABLE=broken) Oracle 连接字符串描述符,如下例所示:

    database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))

前面的步骤将 TCP 网络堆栈配置为每 60 秒发送 keep-alive 数据包。因此,当 JDBC 调用 LogMiner API 完成 350 秒时,AWS Gateway Load Balancer 不会超时,使连接器能够继续从数据库事务日志中读取更改。

ORA-01555 的原因是什么?如何处理它?

Debezium Oracle 连接器在初始快照阶段执行时使用闪存查询。闪存查询是一种特殊的查询类型,它依赖于闪存区域,由数据库的 UNDO_RETENTION 数据库参数维护,以根据表的内容在给定时间或给定 SCN 中返回查询的结果。默认情况下,Oracle 通常只维护一个撤消或闪存区域大约 15 分钟,除非数据库管理员已增加或减少。对于捕获大型表的配置,可能需要超过 15 分钟,或者配置了 UNDO_RETENTION 来执行初始快照,这最终会导致这个例外:

ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too small

处理这个例外的第一个方法是与您的数据库管理员合作,并查看它们是否可以临时增加 UNDO_RETENTION 数据库参数。这不需要重新启动 Oracle 数据库,因此这可在不影响数据库可用性的情况下在线完成。但是,如果表空间不足以存储所需的撤消数据,则更改可能会导致上述异常或"快照过旧"异常。

处理这个异常的第二个方法是根本不依赖于初始快照,将 snapshot.mode 设置为 no_data,然后依赖于增量快照。增量快照不依赖于闪存查询,因此不受 ORA-01555 例外的约束。

ORA-04036 的原因是什么?如何进行处理?

当数据库发生经常时,Debebe Oracle 连接器可能会报告 ORA-04036 异常。在检测到日志交换机前,启动并重复使用 Oracle LogMiner 会话。会话被重新使用,因为它提供与 Oracle LogMiner 的最佳性能利用,但如果发生长时间运行的 mining 会话,这可能会导致过量 PGA 内存用量,最终导致例外:

ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMIT

通过指定 Oracle 交换机红色日志的频率或 Debezium Oracle 连接器可以重新使用 mining 会话来避免这个例外。Debezium Oracle 连接器提供了一个配置选项 log.mining.session.max.ms,它控制当前 Oracle LogMiner 会话在关闭和启动新会话的时长。这允许在不超过数据库允许的 PGA 内存的情况下保留数据库资源。

ORA-01882 的原因是什么,以及如何处理它?

Debezium Oracle 连接器在连接到 Oracle 数据库时可能会报告以下异常:

ORA-01882: timezone region not found

当时区信息无法被 JDBC 驱动程序正确解析时,会出现这种情况。为了解决这个问题,需要告知驱动程序无法使用区域解析时区详情。这可以通过使用 driver. oracle.jdbc.timezoneAsRegion=false 指定驱动程序 pass through 属性来实现。

ORA-25191 和如何处理它的原因是什么?

Debezium Oracle 连接器会自动忽略 index-organized 表(IOT),因为 Oracle LogMiner 不支持它们。但是,如果抛出 ORA-25191 异常,这可能是由于映射的唯一情况,并且可能需要额外规则来自动排除这些。ORA-25191 异常示例可能类似如下:

ORA-25191: cannot reference overflow table of an index-organized table

如果抛出 ORA-25191 异常,请引发 JIRA 问题,以及表及其与其他父表相关的映射等。作为临时解决方案,可以调整 include/exclude 配置选项,以防止连接器访问这些表。

如何解决 SAX 功能外部识别 - 不支持

使用 Debezium Oracle 连接器的 XMLTYPE 作为技术预览提供。要使用这个功能,需要 Oracle xdbxmlparserv2 依赖项。

Oracle 的 xmlparserv2 依赖项实现了基于 SAX 的解析器,如果运行时发现使用了这个实现的解析程序,则在 classpath 上会发生这个错误。为了影响一般使用 SAX 的实施,需要以特定参数启动 JVM。

提供以下 JVM 参数后,Oracle 连接器将成功启动,而不会出现这个错误。

-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.