2.5. 用于 Oracle 的 Debezium Connector
Debezium 的 Oracle 连接器捕获并记录 Oracle 服务器上的数据库中发生的行级更改,包括在连接器运行时添加的表。您可以将连接器配置为为特定模式和表的子集发出更改事件,或者在特定列中忽略、掩码或截断值。
有关与此连接器兼容的 Oracle 数据库版本的详情,请参考 Debezium 支持的配置 页面。
Debezium 可以使用原生 LogMiner 数据库软件包或 XStream API 从 Oracle 中获取最新的更改事件。
将 Debezium Oracle 连接器与 XStream 搭配使用是一个开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。
有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围。
使用 Debezium Oracle 连接器的信息和流程组织如下:
2.5.1. Debezium Oracle 连接器如何工作
要优化地配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称、使用元数据并实现事件缓冲会很有帮助。
如需更多信息,请参阅以下主题:
2.5.1.1. Debezium Oracle 连接器如何执行数据库快照
通常,Oracle 服务器上的红色日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志中检索数据库的完整历史记录。要让连接器为数据库的当前状态建立基线,连接器首次启动时,它会执行数据库的初始 一致快照。
如果完成初始快照所需的时间超过为数据库设置的 UNDO_RETENTION
时间(默认为十分钟),则可能会出现 ORA-01555 异常。有关错误的更多信息,以及您可以从该错误中恢复的步骤,请参阅 常见问题解答。
在表的快照期间,Oracle 可能会引发 ORA-01466 异常。当用户修改表的模式或添加、更改或丢弃与快照关联的索引或相关对象时,会出现这种情况。如果发生这种情况,连接器将停止,需要从开始获取初始快照。
要解决这个问题,您可以使用大于 0
的值配置 snapshot.database.errors.max.retries
属性,以便特定表的快照将重启。虽然整个快照不会在重试时从开始开始,但问题中的特定表将从开始重新读取,而表的主题将包含重复的快照事件。
您可以在以下部分中找到有关快照的更多信息:
2.5.1.1.1. Oracle 连接器用于执行初始快照的默认工作流
以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode
配置属性设置为默认值时快照的进程,这是 初始
。您可以通过更改 snapshot.mode
属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用此工作流的修改版本完成快照。
当快照模式设置为默认值时,连接器完成以下任务来创建快照:
- 建立与数据库的连接。
-
确定要捕获的表。默认情况下,连接器捕获所有表,除了那些与 捕获中排除的 schema 之外的表。快照完成后,连接器将继续流传输指定表的数据。如果您希望连接器只从特定表捕获数据,您可以通过设置
table.include.list
或table.exclude.list
等属性来仅捕获表或表元素的子集的数据。 -
在每个捕获的表上获取
ROW SHARE MODE
锁定,以防止在创建快照期间发生结构更改。Debezium 只保存一个短时间锁定。 - 从服务器的 redo 日志中读取当前系统更改号(SCN)的位置。
捕获所有数据库表的结构或指定用于捕获的所有表。连接器在其内部数据库模式历史记录主题中保留 schema 信息。架构历史记录提供有关发生更改事件时所生效的结构的信息。
注意默认情况下,连接器捕获数据库中处于捕获模式的每个表的模式,包括没有为捕获配置的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。有关您在初始快照中没有包括快照持久模式信息的更多信息,请参阅 了解为什么初始快照捕获所有表的 schema。
- 释放在第 3 步中获取的锁定。其他数据库客户端现在可以写入任何之前锁定的表。
在第 4 步中读取的 SCN 位置,连接器扫描为捕获指定的表(
SELECT * FROM … AS OF SCN 123
)。在扫描过程中,连接器完成以下任务:- 确认表已在快照开始之前创建。如果在快照开始后创建了表,连接器会跳过该表。快照完成后,连接器过渡到 streaming,它会为快照开始后创建的任何表发出更改事件。
-
为从表中捕获的每行生成一个
读取
事件。所有读取
事件都包含相同的 SCN 位置,这是在第 4 步中获取的 SCN 位置。 -
向源表的 Kafka 主题发出每个
读取
事件。 - 释放数据表锁定(如果适用)。
- 在连接器偏移中记录快照成功完成。
生成的初始快照会捕获捕获表中每行的当前状态。在这个基准状态中,连接器会捕获后续更改。
在快照过程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,进程会在连接器重启后重启。连接器完成初始快照后,它会从第 3 步中读取的位置继续流传输,以便它不会丢失任何更新。如果连接器因任何原因再次停止,它会在重启后恢复流更改,从之前离开的位置恢复流更改。
设置 | 描述 |
---|---|
| 在每个连接器启动时执行快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。 |
| 连接器执行数据库快照,如用于创建初始快照的默认工作流 中所述。快照完成后,连接器将开始流传输后续数据库更改的事件记录。 |
| 连接器执行数据库快照并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。 |
|
弃用,请参阅 |
|
连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但不创建 |
|
弃用, |
|
设置这个选项以恢复丢失或损坏的数据库 schema 历史记录主题。重启后,连接器运行一个从源表中重建主题的快照。您还可以设置属性,以定期修剪遇到意外增长的数据库 schema 历史记录主题。 |
| 连接器启动后,只有在检测到以下情况之一时才执行快照:
|
如需更多信息,请参阅连接器配置属性表中的 snapshot.mode
。
2.5.1.1.2. 有关初始快照捕获所有表的 schema 历史记录的描述
连接器运行的初始快照捕获两种类型的信息:
- 表数据
-
有关
INSERT
、UPDATE
和DELETE
操作的信息,它们在连接器的table.include.list
属性中命名。 - 模式数据
- 描述应用到表的结构更改的 DDL 语句。如果配置了 schema 数据,则 schema 数据会被保留到内部模式历史记录主题,以及连接器的 schema 更改主题。
运行初始快照后,您可能会注意到快照捕获了未指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅限于为捕获指定的表。连接器要求表的 schema 在捕获表之前存在于 schema 历史记录主题中。通过启用初始快照来捕获不属于原始捕获集的表,Debezium 准备连接器,以便稍后需要从这些表中读取事件数据。如果初始快照没有捕获表的模式,您必须将 schema 添加到历史记录主题,然后才能从表中捕获数据。
在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过有权访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。
其他信息
- 从初始快照未捕获的表中捕获数据(无模式更改)
- 从没有由初始快照捕获的表捕获数据(schema 更改)
-
设置
schema.history.internal.store.only.captured.tables.ddl
属性,以指定要从中捕获模式信息的表。 -
设置
schema.history.internal.store.only.captured.databases.ddl
属性,以指定捕获模式更改的逻辑数据库。
2.5.1.1.3. 从初始快照未捕获的表中捕获数据(无模式更改)
在某些情况下,您可能希望连接器从最初快照未捕获其 schema 的表中捕获数据。根据连接器配置,初始快照可能会只捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器无法捕获表,并报告缺少的 schema 错误。
您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。
先决条件
- 您需要使用一个模式来捕获数据,连接器在初始快照过程中不会捕获。
- 事务日志中表的所有条目都使用相同的模式。有关从具有结构化更改的新表捕获数据的详情,请参考 第 2.5.1.1.4 节 “从没有由初始快照捕获的表捕获数据(schema 更改)”。
流程
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 在连接器配置中:
-
将
snapshot.mode
设置为recovery
。 -
(可选)将
schema.history.internal.store.only.captured.tables.ddl
的值设置为false
,以确保在以后没有为捕获的表捕获数据。只有在历史记录主题中存在表的 schema 历史记录时,连接器才能从表中捕获数据。 -
添加您希望连接器捕获到
table.include.list
的表。
-
将
- 重启连接器。快照恢复过程会根据表的当前结构重建模式历史记录。
- (可选)快照完成后,在新添加的表中启动 增量快照。增量快照首先会流传输新添加的表的历史数据,然后恢复从之前配置的表读取更改和存档日志,包括该连接器在连接器离线时发生的更改。
-
(可选)将
snapshot.mode
重新重置为no_data
,以防止连接器在以后的重启后启动恢复。
2.5.1.1.4. 从没有由初始快照捕获的表捕获数据(schema 更改)
如果将架构更改应用到表,则在架构更改前提交的记录与更改后提交的结构不同。当 Debezium 捕获表中的数据时,它会读取 schema 历史记录,以确保它将正确的模式应用到每个事件。如果 schema 历史记录主题中没有 schema,连接器将无法捕获表,并会产生错误结果。
如果要捕获初始快照未捕获的表中的数据,并修改了表的 schema,您必须将 schema 添加到历史记录主题(如果还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加架构。
先决条件
- 您需要使用一个模式来捕获数据,连接器在初始快照过程中不会捕获。
- 对表应用了一个架构更改,因此要捕获的记录没有统一的结构。
流程
- 初始快照捕获了所有表的模式(
storage.only.captured.tables.ddl
设置为false
) -
编辑
table.include.list
属性,以指定您要捕获的表。 - 重启连接器。
- 如果要从新添加的表中捕获现有数据,则启动 增量快照。
-
编辑
- 初始快照没有捕获所有表的模式(storage
.only.captured.tables.ddl
设置为true
) 如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:
- 流程 1:Schema 快照,后跟增量快照
在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。
为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为no_data
。 -
编辑
table.include.list
以添加您要捕获的表。
-
将
- 重启连接器。
- 等待 Debezium 捕获新表和现有表的模式。连接器停止后发生的数据更改不会被捕获。
- 为确保没有数据丢失,可启动 增量快照。
- 流程 2:初始快照,后跟可选的增量快照
在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一个耗时的操作。快照完成后,您可以选择性地触发增量快照,以捕获连接器离线期间发生的任何更改。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。
-
编辑
table.include.list
以添加您要捕获的表。 为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为initial
。 -
(可选)将
schema.history.internal.store.only.captured.tables.ddl
设置为false
。
-
将
- 重启连接器。连接器使用完整的数据库快照。快照完成后,连接器会过渡到 streaming。
- (可选)要捕获连接器脱机时更改的任何数据,请启动 增量快照。
2.5.1.2. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。按照这个初始快照,在正常情况下,连接器不会重复快照过程。任何以后更改连接器捕获的事件数据都会通过流处理。
然而,在某些情况下,在初始快照中获取的连接器可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或者仅捕获数据库中表的子集。此外,快照也可捕获数据库中表的内容的子集。
您可以通过向信号表发送 execute-snapshot
消息来指定要捕获的表。将 execute-snapshot
信号的类型设置为 incremental
或 blocking
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| N/A |
包含与快照中包含的表的完全限定域名匹配的正则表达式的数组。 |
| N/A |
可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
|
| N/A | 可选字符串,用于指定连接器在快照过程中用作表的主键的列名称。 |
触发临时增量快照
您可以通过向信号表添加带有 execute-snapshot
信号类型的条目,或者向 Kafka 信号发送信号消息 来启动临时增量快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的开始和端点。根据表中的条目数量以及配置的块大小,Debezium 会将表分成块,并持续持续对每个块进行快照。
如需更多信息,请参阅 增加快照。
触发临时阻塞快照
您可以通过在信号表或信号主题中添加带有 execute-snapshot
信号类型的条目来启动临时阻塞快照。连接器处理消息后,它会开始快照操作。连接器会临时停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,连接器会恢复流。
如需更多信息,请参阅 块快照。
2.5.1.3. 增量快照
为了在管理快照方面提供灵活性,Debezium 包含一个补充快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是像初始快照一样捕获数据库的完整状态,而是在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小是 1024 行。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护其捕获的每个表行的记录。与标准初始快照过程相比,这个分阶段方法捕获数据具有以下优点:
- 您可以并行运行带有流数据捕获的增量快照,而不是延迟流,直到快照完成为止。连接器将继续在整个快照过程中从更改日志捕获接近实时事件,且操作都会阻止另一个事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从它停止的点开始,而不是从开始重新捕获表。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对每个表进行排序,然后根据 配置的块大小 将表分成块。通过块工作的块,然后捕获块中的每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时所在行的值。
当快照进行时,其他进程可能会继续访问数据库,可能会修改表记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会按预期提交到事务日志。同样,持续 Debezium 流过程会继续检测到这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何处理具有相同主键的记录冲突
在某些情况下,streaming 进程发出的 UPDATE
或 DELETE
事件会按顺序接收。也就是说,流处理可能会发出一个事件,在快照捕获了包含该行的 READ
事件前修改表行。当快照最终为行发出对应的 READ
事件时,其值已经被取代。为确保以正确的逻辑顺序处理出序列的增量快照事件,Debezium 采用缓冲方案来解决冲突。只有在快照事件和流事件之间冲突后,才会解析 Debezium 向 Kafka 发出事件记录。
快照窗口
为了帮助解决 late-arriving READ
事件和修改同一表行之间的冲突,Debezium 会使用一个所谓的 快照窗口。快照窗口分离间隔,在此期间会捕获指定表块的数据。在块的快照窗口打开前,Debezium 会遵循其通常的行为,并将事件从下游直接发送到目标 Kafka 主题。但是,从为特定块的快照打开,直到它关闭为止,Deduplication 会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。
对于每个数据收集,Debebe 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表捕获的快照记录被发送为 READ
操作。同时,当用户继续更新数据收集中的记录,并且更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debebe 开始处理快照块,它会向内存缓冲提供快照记录。在快照窗口中,缓冲区中 READ
事件的主键与传入流事件的主键进行比较。如果没有找到匹配项,则流的事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃 buffered READ
事件,并将流记录写入目标主题,因为以逻辑方式取代静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器会为每个快照块重复这个过程。
目前,您可以使用以下任一方法启动增量快照:
Oracle 的 Debezium 连接器不支持增量快照运行时的 schema 更改。
2.5.1.3.1. 触发增量快照
要启动增量快照,您可以发送 临时快照信号 到源数据库上的信号表。您可以提交快照信号,作为 SQL INSERT
查询。
在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,并可选择性地指定快照操作的类型。Debezium 目前支持 增量
和 阻塞
快照类型。
要指定要包含在快照中的表,提供一个列出表的 data-collections
数组,或用于匹配表的正则表达式数组,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debebe 会解释空数组,意味着不需要任何操作,且不会执行快照。
如果要包含在快照中的表的名称包含一个点(.
)、空格或其它非字母数字字符,则必须使用双引号转义表名称。
例如,若要将 公共
模式中存在的表包含在 db1
数据库中,并且名称为 My.Table
,请使用以下格式 :"db1.public.\"My.Table\"
"。
先决条件
启用信号。
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道触发增量快照
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如,
INSERT INTO db1.myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4 "type":"incremental", 5 "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6
命令中的
id
、type
和data
参数的值 与信号表的字段 相对应。
下表描述了示例中的参数:表 2.108. SQL 命令中的字段描述,用于将增量快照信号发送到信号表 项 值 描述 1
database.schema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。相反,在快照过程中,Debebe 会生成自己的id
字符串作为水位线信号。3
execute-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的必需组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了使用格式database.schema.table
的正则表达式,以匹配表的完全限定域名。此格式与您用来指定连接器 信号表的名称相同。5
incremental
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
有效值为incremental
和blocking
值。
如果没有指定值,连接器默认为执行增量快照。6
additional-conditions
可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外条件都是带有data-collection
和filter
属性的对象。您可以为每个数据收集指定不同的过滤器。
请参阅data-collection
属性是过滤器应用到的数据收集的完全限定域名。有关additional-conditions
参数的详情,请参考 第 2.5.1.3.2 节 “使用附加条件
运行临时增量快照”。
2.5.1.3.2. 使用附加 条件
运行临时增量快照
如果您希望快照只在表中包括内容子集,您可以通过将 additional-conditions
参数附加到快照信号来修改信号请求。
对典型快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-conditions
参数,您可以在 SQL 查询中附加 WHERE
条件,如下例所示:
SELECT * FROM <data-collection> WHERE <filter> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如,假设您有一个包含以下列的 products
表:
-
ID
(主密钥) -
color
-
quantity
如果您需要 product
表的增量快照,其中只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
additional-conditions
参数还允许您传递基于多个列的条件。例如,使用上例中的 product
表,您可以提交查询来触发增量快照,该快照仅包含 color=blue
和 quantity>10
的项数据:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
例 2.32. 增量快照事件消息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "ts_us":"1620393591654547", "ts_ns":"1620393591654547920", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
2.5.1.3.3. 使用 Kafka 信号频道触发增量快照
您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。
Kafka 消息的密钥必须与 topic.prefix
连接器配置选项的值匹配。
消息的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 execute-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前 Debezium 支持 |
| N/A |
以逗号分隔的正则表达式,与快照中包含的表的完全限定域名匹配。 |
| N/A |
可选的附加条件数组,用于指定连接器评估以指定快照中包含的记录子集的条件。 |
例 2.33. execute-snapshot
Kafka 信息
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
带有额外条件的临时增量快照
Debezium 使用 additional-conditions
字段来选择表内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM < ;tableName> ….
当快照请求包含 additional-conditions
属性时,属性的 data-collection
和 filter
参数会附加到 SQL 查询中,例如:
SELECT * FROM < ;data-collection> WHERE & lt;filter> ….
例如,如果一个带有列 ID (主键)、颜色
和品牌
的 products
表,如果您希望快照只包含 color='blue'
的内容,当请求快照时,您可以添加 additional-conditions
属性来过滤内容:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
您还可以使用 additional-conditions
属性来根据多个列传递条件。例如,使用与上例中的相同 product 表,如果您希望快照只包含 color='blue'
和 brand='MyBrand'
的
表中的内容,您可以发送以下请求:
products
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.5.1.3.4. 停止增量快照
在某些情况下,可能需要停止增量快照。例如,您可能意识到快照没有被正确配置,或者您可能要确保资源可用于其他数据库操作。您可以通过向源数据库上的信号发送信号来停止已经运行的快照。
您可以通过在 SQL INSERT
查询中发送停止快照信号,向信号表提交停止快照信号。stop-snapshot 信号将快照操作的类型指定为 增量
,并选择性地指定要从当前运行的快照中省略的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行中时停止增量快照操作。
其他资源
您还可以通过向 Kafka 信号发送 JSON 消息来停止增量快照。
先决条件
启用信号。
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道停止增量快照
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
例如,
INSERT INTO db1.myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值与 信号表的字段相对应。
下表描述了示例中的参数:表 2.111. SQL 命令中的字段描述,用于将停止增量快照信号发送到信号表 项 值 描述 1
database.schema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。3
stop-snapshot
指定
type
参数,指定信号要触发的操作。
4
data-collections
信号的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了正则表达式,该表达式通过其完全限定名称匹配表,格式为database.schema.table
如果您从
data
字段省略这个组件,信号将停止正在进行的整个增量快照。5
incremental
信号的必需组件,用于指定要停止的快照操作类型。
目前,唯一有效的选项为增量
。
如果没有指定类型
值,信号将无法停止增量快照。
2.5.1.3.5. 使用 Kafka 信号频道停止增量快照
您可以向 配置的 Kafka 信号主题 发送信号消息,以停止临时增量快照。
Kafka 消息的密钥必须与 topic.prefix
连接器配置选项的值匹配。
消息的值是带有 type
和 data
字段的 JSON 对象。
signal 类型是 stop-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前 Debezium 只支持 |
| N/A |
可选的、以逗号分隔的正则表达式,与表的完全限定名称匹配,表名称或正则表达式,以匹配要从快照中删除的表名称。 |
以下示例显示了典型的 stop-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
2.5.1.4. 阻塞快照
为了在管理快照方面提供更多灵活性,Debezium 包含一个额外的临时快照机制,称为 阻塞快照。阻塞快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
阻塞快照的行为与 初始快照 相似,但您可以在运行时触发快照。
您可能想要在以下情况下运行阻塞快照,而不是使用标准初始快照过程:
- 您可以添加新表,并在连接器运行时完成快照。
- 您可以添加大表,并且您希望快照在短时间内完成,而不是通过增量快照完成。
阻塞快照过程
当您运行阻塞快照时,Debebe 会停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,会恢复流。
配置快照
您可以在信号 的数据
组件中设置以下属性:
- data-collections:指定哪个表必须是快照。
-
data-collections :指定您要包括快照的表。
此属性接受与完全限定表名称匹配的正则表达式列表。属性的行为与table.include.list
属性的行为类似,它指定要捕获在阻塞快照中的表。 additional-conditions :您可以为不同的表指定不同的过滤器。
-
data-collection
属性是要应用过滤器的表的完全限定名称,并可区分大小写或区分大小写,具体取决于数据库。 -
filter
属性将具有与snapshot.select.statement.overrides
时使用的相同值,即条件应当匹配的表的完全限定域名。
-
例如:
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
可能的副本
您发送信号触发快照的时间之间可能会有延迟,以及流停止和快照启动时的时间。因此,在快照完成后,连接器可能会发出一些由快照捕获的重复记录的事件记录。
2.5.1.5. 接收 Debezium Oracle 更改事件记录的 Kafka 主题的默认名称
默认情况下,Oracle 连接器将所有 INSERT
、UPDATE
和 DELETE
操作的更改事件写入一个特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:
topicPrefix.schemaName.tableName
以下列表提供了默认名称组件的定义:
- topicPrefix
-
由
topic.prefix
连接器配置属性指定的主题前缀。 - schemaName
- 发生操作的模式的名称。
- tableName
- 操作发生的表的名称。
例如,如果 fulfillment
是服务器名称,inventory
是 schema 名称,数据库包括名为 orders
, customers
, 和 products
的表,Debezium Oracle 连接器会向以下 Kafka 主题发送事件,数据库中的每个表有一个。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
2.5.1.6. Debezium Oracle 连接器如何处理数据库架构更改
当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,可以随时更改数据库架构,这意味着连接器必须能够识别每次插入、更新或删除操作时的 schema。另外,连接器不一定将当前模式应用到每个事件。如果事件相对旧,则在应用当前模式之前记录这些事件。
为确保在架构更改后处理发生的正确事件,Oracle 包含在 redo 日志中包括影响数据的行级更改,以及应用到数据库的 DDL 语句。当连接器在 redo 日志中遇到这些 DDL 语句时,它会解析它们并更新每个表的 schema 的内存中表示。连接器使用此架构表示在每次插入、更新或删除操作时标识表的结构,并生成适当的更改事件。在单独的数据库架构历史记录 Kafka 主题中,连接器记录了所有 DDL 语句,以及出现每个 DDL 语句的红色日志中的位置。
当连接器在崩溃或安全停止后重启时,它开始从特定位置读取红色日志,即从特定时间点读取红色日志。连接器通过读取数据库模式历史记录 Kafka 主题来重建在此时存在的表结构,并在连接器启动的红色日志中解析所有 DDL 语句。
此数据库架构历史记录主题仅供内部使用。另外,连接器也可以将 schema 更改事件发送到面向消费者应用程序的不同主题。
其他资源
- 接收 Debezium 事件记录 的主题的默认名称。
2.5.1.7. Debezium Oracle 连接器如何公开数据库 schema 的变化
您可以配置 Debezium Oracle 连接器来生成模式更改事件,这些事件描述了应用到数据库中表的结构更改。连接器将模式更改事件写入名为 < serverName>
; 的 Kafka 主题,其中 serverName
是 topic.prefix
配置属性中指定的命名空间。
当 Debezium 从新表流传输数据时,或更改表的结构时,Debebe 会向 schema 更改主题发送一条新消息。
连接器发送到 schema 更改主题的消息包含一个有效负载,并可以选择包含更改事件消息的 schema。
模式更改事件的 schema 具有以下元素:
name
- 模式更改事件消息的名称。
type
- 更改事件消息的类型。
version
- 架构的版本。version 是一个整数,每次更改 schema 时都会递增。
fields
- 更改事件消息中包含的字段。
示例:Oracle 连接器模式更改主题的 Schema
以下示例显示了 JSON 格式的典型模式。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.oracle.SchemaChangeKey", "version": 1 }, "payload": { "databaseName": "inventory" } }
模式更改事件消息的有效负载包括以下元素:
ddl
-
提供导致 schema 的 SQL
CREATE
、ALTER
或DROP
语句。 databaseName
-
将语句应用到的数据库的名称。
databaseName
的值充当 message 键。 tableChanges
-
架构更改后整个表模式的结构化表示。
tableChanges
字段包含一个数组,其中包含表的每列条目。由于结构化表示以 JSON 或 Avro 格式显示数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析器处理它们。
默认情况下,连接器使用 ALL_TABLES
数据库视图来识别要存储在 schema 历史记录主题中的表名称。在该视图中,连接器只能从可连接到数据库的用户帐户的表中访问数据。
您可以修改设置,以便 schema 历史记录主题存储了不同的表子集。使用以下方法之一更改主题存储的表集合:
-
更改 Debezium 用于访问数据库的帐户的权限,以便在
ALL_TABLES
视图中看到不同的表集合。 -
将连接器属性
schema.history.internal.store.only.captured.tables.ddl
设置为true
。
当连接器被配置为捕获表时,它会存储表的模式更改历史记录,不仅存储在 schema 更改主题中,还存储在内部数据库架构历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要针对 schema 更改通知的应用程序只消耗了来自 schema 更改主题的信息。
永不对数据库 schema 历史记录主题进行分区。要使数据库架构历史记录主题正常工作,它必须保持一致的全局顺序,连接器向其发送的事件记录。
要确保主题不在分区中分割,请使用以下方法之一为主题设置分区计数:
-
如果您手动创建数据库架构历史记录主题,请指定分区计数
1
。 -
如果您使用 Apache Kafka 代理自动创建数据库模式历史记录主题,则会创建主题,将 Kafka
num.partitions
配置选项 的值设置为1
。
示例:向 Oracle 连接器 schema 更改主题发送的消息
以下示例显示了 JSON 格式的典型的模式更改消息。消息包含表 schema 的逻辑表示。
{ "schema": { ... }, "payload": { "source": { "version": "3.0.8.Final", "connector": "oracle", "name": "server1", "ts_ms": 1588252618953, "ts_us": 1588252618953000, "ts_ns": 1588252618953000000, "snapshot": "true", "db": "ORCLPDB1", "schema": "DEBEZIUM", "table": "CUSTOMERS", "txId" : null, "scn" : "1513734", "commit_scn": "1513754", "lcr_position" : null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "row_id": "AAASgjAAMAAAACnAAA" }, "ts_ms": 1588252618953, 1 "ts_us": 1588252618953987, 2 "ts_ns": 1588252618953987512, 3 "databaseName": "ORCLPDB1", 4 "schemaName": "DEBEZIUM", // "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n ( \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n \"FIRST_NAME\" VARCHAR2(255), \n \"LAST_NAME" VARCHAR2(255), \n \"EMAIL\" VARCHAR2(255), \n PRIMARY KEY (\"ID\") ENABLE, \n SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) SEGMENT CREATION IMMEDIATE \n PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n TABLESPACE \"USERS\" ", 5 "tableChanges": [ 6 { "type": "CREATE", 7 "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 8 "table": { 9 "defaultCharsetName": null, "primaryKeyColumnNames": [ 10 "ID" ], "columns": [ 11 { "name": "ID", "jdbcType": 2, "nativeType": null, "typeName": "NUMBER", "typeExpression": "NUMBER", "charsetName": null, "length": 9, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 12 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
2 |
| 标识包含更改的数据库和模式。 |
3 |
| 此字段包含负责 schema 更改的 DDL。 |
4 |
| 包含 DDL 命令生成的架构更改的一个或多个项目的数组。 |
5 |
|
描述更改类型。
|
6 |
|
创建、更改或丢弃的表的完整标识符。对于表重命名,此标识符是 < |
7 |
| 代表应用的更改后的表元数据。 |
8 |
| 编写表主键的列列表。 |
9 |
| changed 表中每个列的元数据。 |
10 |
| 每个表更改的自定义属性元数据。 |
在连接器发送到 schema 更改主题的消息中,message 键是包含 schema 更改的数据库的名称。在以下示例中,payload
字段包含 databaseName
键:
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.oracle.SchemaChangeKey", "version": 1 }, "payload": { "databaseName": "ORCLPDB1" } }
2.5.1.8. Debezium Oracle 连接器生成的事件代表事务边界
Debezium 可以生成代表事务元数据边界的事件,以及丰富的数据更改事件消息。
Debezium 注册并接收部署连接器后发生的事务的元数据。部署连接器前发生的事务元数据不可用。
数据库事务由一个声明块表示,该块包含在 BEGIN
和 END
关键字之间。Debezium 为每个事务中的 BEGIN
和 END
分隔符生成事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
. id
- 唯一事务标识符的字符串表示。
ts_ms
-
数据源的事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向 Debezium 提供事件时间,则字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务处理的事件总数。
data_collections
(用于END
事件)-
一组
data_collection
和event_count
元素,用于指示连接器为来自数据收集的更改发出的事件数。
以下示例显示了典型的事务边界消息:
示例:Oracle 连接器事务边界事件
{ "status": "BEGIN", "id": "5.6.641", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "5.6.641", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER", "event_count": 1 }, { "data_collection": "ORCLPDB1.DEBEZIUM.ORDER", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>
.transaction
主题。
2.5.1.8.1. Debezium Oracle 连接器如何使用事务元数据增强更改事件信息
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示。
total_order
- 事件在事务生成的所有事件间的绝对位置。
data_collection_order
- 事件在事务发送的所有事件中的每个数据收集位置。
以下示例显示了典型的事务事件信息:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "ts_us": "1580390884335741", "ts_ns": "1580390884335741963", "transaction": { "id": "5.6.641", "total_order": "1", "data_collection_order": "1" } }
LogMiner Mining 策略
Oracle redo 日志中的条目不会存储用户提交的原始 SQL 语句,以便进行 DML 更改。相反,红色条目包含一组更改向量,以及代表与这些向量相关的表、表和列的对象标识符。换句话说,红色日志条目不包括由 DML 更改影响的模式、表或列的名称。
Debezium Oracle 连接器使用 log.mining.strategy
配置属性来控制 Oracle LogMiner 如何处理更改向量中对象标识符的查找。在某些情况下,一个日志 mining 策略可能会比架构更改更可靠。但是,在选择日志 mining 策略前,务必要考虑其在性能和开销方面可能具有的影响。
编写数据字典以恢复日志
redo_log_catalog
mining 策略指示数据库在每个红色日志切换后立即将数据字典的副本刷新到红色日志。这是跟踪与数据更改交互的模式更改的最可靠策略,因为 Oracle LogMiner 在一系列更改向量的开始和结束数据字典状态之间进行干预。
但是,redo_log_catalog
模式也是最昂贵,因为它需要几个关键步骤才能正常工作。首先,此模式需要在每次日志切换后将数据字典刷新到红色日志。在每个交换机后清除日志可以快速消耗存档日志中重要的空间,而归档日志的高卷可能会超过数据库管理员为准备的数量。如果要使用此模式,请与您的数据库管理员协调以确保正确配置了数据库。
如果您将连接器配置为使用 redo_log_catalog
模式,请不要使用多个 Debezium Oracle 连接器来捕获来自同一逻辑数据库的更改。
直接使用在线目录
默认策略模式 online_catalog
的工作方式与 redo_log_catalog
模式不同。当策略设置为 online_catalog
时,数据库永远不会将数据字典刷新到红色日志。相反,Oracle LogMiner 始终使用最新的数据字典状态来执行比较。通过始终使用当前的字典,并消除对红色日志的清空,此策略需要较少的开销,并更有效地运行。但是,这些好处是无法解析内部模式更改和数据更改的偏移量。因此,这个策略有时可能会导致事件失败。
如果 LogMiner 在模式更改后无法重建 SQL 可靠性,请检查红色日志以了解证据。查找名称为 OBJ# 123456
(其中数字是表的对象标识符)或带有名称(如 COL1
或 COL2)
的表的引用。当您将连接器配置为使用 online_catalog
策略时,采取措施来确保表 schema 及其索引保持静态状态,并避免更改。如果 Debezium 连接器配置为使用 online_catalog
模式,且您必须应用 schema 更改,请执行以下步骤:
- 等待连接器捕获所有现有的数据更改(DML)。
- 执行架构(DDL)更改,然后等待连接器捕获更改。
- 在表上恢复数据更改(DML)。
按照以下步骤,确保 Oracle LogMiner 可以安全地重建 SQL 以了解所有数据更改。
混合方法
您可以通过将 log.mining.strategy
配置属性的值设置为 hybrid
来启用此策略。此策略的目标是为 redo_log_catalog
策略提供可靠性,并提供 online_catalog
策略的性能和低开销,而无需考虑任一策略的缺点。
混合
策略主要在 online_catalog
模式下运行,这意味着 Debezium Oracle 连接器首先将事件重建委派给 Oracle LogMiner。如果 Oracle LogMiner 成功重建 SQL,Debezium 会正常处理事件,就像它被配置为使用 online_catalog
策略一样。如果连接器检测到 Oracle LogMiner 无法重建 SQL,连接器会尝试通过使用该表对象的 schema 历史记录直接重建 SQL。只有在 Oracle LogMiner 和连接器无法重建 SQL 时,连接器才会报告失败。
如果 lob.enabled
属性设为 true
,则无法使用 hybrid
mining 策略。如果您需要流传输 CLOB
、BLOB
或 XML
数据,则只能使用 online_catalog
或 redo_log_catalog
策略。
查询模式
Debezium Oracle 连接器默认与 Oracle LogMiner 集成。此集成需要一组专门的步骤,其中包括生成复杂的 JDBC SQL 查询,以便尽可能在事务日志中记录的更改作为更改事件。JDBC SQL 查询使用的 V$LOGMNR_CONTENTS
视图没有任何索引来改进查询的性能,因此可以使用不同的查询模式来控制如何以改进查询的方式生成 SQL 查询。
log.mining.query.filter.mode
连接器属性可以配置以下内容之一,以便影响生成 JDBC SQL 查询的方式:
none
-
(默认)此模式会创建一个 JDBC 查询,它根据不同的操作类型(如插入、更新或删除)在数据库级别上过滤。当根据 schema, table, 或 username include/exclude 列表过滤数据时,这是在连接器中的处理循环过程中完成的。
当从数据库中捕获没有大量更改的表时,这个模式通常很有用。生成的查询非常简单,主要侧重于通过低数据库开销尽快读取。 in
-
这个模式会创建一个 JDBC 查询,它不仅过滤在数据库级别上的操作类型,还过滤模式、表和用户名包含/排除列表。查询的 predicates 使用 SQL in-clause 生成,具体取决于 include/exclude 列表配置属性中指定的值。
当从数据库捕获大量表时,这个模式通常很有用,而这些表在更改时非常饱和。生成的查询比none
模式复杂,并侧重于减少网络开销,并尽可能在数据库级别执行尽可能多的过滤。
最后,不要将 正则表达式指定为 schema 和 table include/exclude 配置属性的一部分。使用正则表达式将导致连接器与基于这些配置属性的更改不匹配,从而导致更改丢失。 regex
-
这个模式会创建一个 JDBC 查询,它不仅过滤在数据库级别上的操作类型,还过滤模式、表和用户名包含/排除列表。但是,与
in
模式不同,此模式会根据是否指定了 include 或 exclude 的值,使用 OracleREGEXP_LIKE
操作器生成 SQL 查询。
当捕获使用少量正则表达式的表号时,此模式通常很有用。生成的查询比任何其他模式复杂得多,并侧重于减少网络开销,并尽可能在数据库级别执行尽可能多的过滤。
2.5.1.9. Debezium Oracle 连接器如何使用事件缓冲
Oracle 将所有更改按其发生的顺序写入红色日志,包括稍后由回滚丢弃的更改。因此,来自独立事务的并发更改会被干扰。当连接器首先读取更改流时,因为它无法立即确定提交或回滚哪些更改,它会临时将更改事件存储在内部缓冲区中。提交更改后,连接器会将更改事件从缓冲区写入 Kafka。连接器丢弃了回滚丢弃的更改事件。
您可以通过设置属性 log.mining.buffer.type
来配置连接器使用的缓冲机制。
heap
默认缓冲区类型使用 memory
进行配置。在默认 内存设置
下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 内存
缓冲区设置,请确定分配给 Java 进程的内存量可以适合您的环境中的长时间运行和大型事务。
2.5.1.10. Debezium Oracle 连接器如何检测 SCN 值中的差距
当 Debezium Oracle 连接器配置为使用 LogMiner 时,它会使用一个基于系统更改号(SCN)的开始和结束范围从 Oracle 收集更改事件。连接器会自动管理此范围,根据连接器是否可以实时流更改,或者由于数据库中的大量或批量事务的卷而处理更改的积压。
在某些情况下,Oracle 数据库会使 SCN 变得非常高,而不是以恒定的速度增加 SCN 值。这种 SCN 值的 jump 可能会发生,因为特定集成与数据库交互,或者作为热备份等事件。
Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整 mining 范围。
log.mining.scn.gap.detection.gap.size.min
- 指定最小差距大小。
log.mining.scn.gap.detection.time.interval.max.ms
- 指定最大时间间隔。
连接器首先比较当前 SCN 和当前 mining 范围内的 SCN 之间的变化数量。如果当前 SCN 值和最高 SCN 值之间的区别大于最小差距,则连接器可能会检测到 SCN 差距。要确认是否存在差距,下一个连接器会比较当前 SCN 和前一个 mining 范围末尾的 SCN 的时间戳。如果时间戳之间的区别小于最大时间间隔,则会确认是否存在 SCN 差距。
当 SCN 差距发生时,Debezium 连接器会自动将当前的 SCN 用作当前 mining 会话范围的端点。这可让连接器快速捕获实时事件,而无需在返回任何更改之间减少较小的范围,因为 SCN 值被意外数量增加。当连接器执行上述响应 SCN 差距的步骤时,它会忽略 log.mining.batch.size.max 属性指定的值。连接器完成 mining 会话并捕获到实时事件后,它会恢复最大日志的强制批处理大小。
只有在连接器运行和处理接近实时事件时,SCN 差距检测才可用。
2.5.1.11. Debezium 如何管理数据库中不经常更改的偏移量
Debezium Oracle 连接器跟踪连接器偏移中的系统更改号,以便在连接器重启时,它可以从其离开的位置开始。这些偏移是每个发出的更改事件的一部分;但是,当数据库频率较低(每数小时或天)时,偏移可能会变得过时,并防止连接器在事务日志中不再提供,防止连接器成功重启。
对于使用非CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms
来强制连接器定期发出 heartbeat 事件,以便偏移保持同步。
对于使用 CDB 模式连接到 Oracle 的连接器,维护同步更为复杂。不仅必须设置 heartbeat.interval.ms
,还需要设置 heartbeat.action.query
。需要指定这两个属性,因为在 CDB 模式中,连接器专门用于跟踪 PDB 中的更改。需要在可插拔数据库中触发更改事件所需的补充机制。定期,心跳操作查询会导致连接器插入新表行,或更新可插拔数据库中的现有行。Debezium 检测到表更改并为它们发出更改事件,确保偏移保持同步,即使在进程不经常更改的可插拔数据库中也是如此。
要使连接器使用 heartbeat.action.query
以及不是 连接器用户帐户 的表,您必须授予连接器用户权限才能在这些表上运行必要的 INSERT
或 UPDATE
查询。
2.5.2. Debezium Oracle 连接器数据更改事件的描述
Oracle 连接器发出的每个数据更改事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 构造主题名称的详情,请参考 主题名称。
Debezium Oracle 连接器确保所有 Kafka Connect 模式名称都 是有效的 Avro 模式名称。这意味着,逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])开头,而逻辑服务器名称和模式名称和表名称中的所有字符必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器会自动将无效的字符替换为下划线字符。
当多个逻辑服务器名称、模式名称或表名称不是有效的字符,且这些字符被替换为下划线时,意外命名冲突可能会导致。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间变化,主题消费者可能很难处理。为便于处理可变事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值有两个部分:schema 和 payload。模式描述了有效负载的结构,而有效负载包含实际数据。
连接器不会捕获由 SYS
或 SYSTEM
用户帐户执行的更改。
以下主题包含有关数据更改事件的更多详细信息:
2.5.2.1. 关于 Debezium Oracle 连接器更改事件中的键
对于每个更改的表,更改事件键的结构,以便在创建事件时,表的主键(或唯一键约束)中存在每个列的字段。
例如,在 inventory
数据库 schema 中定义的 customers
表可能有以下更改事件键:
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
如果 < topic.prefix
>.transaction
配置属性的值被设置为 server1
,则数据库中 customers
表中发生的每个更改事件的 JSON 表示具有以下关键结构:
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, "name": "server1.INVENTORY.CUSTOMERS.Key" }, "payload": { "ID": 1004 } }
密钥的 schema
部分包含一个 Kafka Connect 模式,用于描述密钥部分的内容。在前面的示例中,有效负载
值不是可选的,结构由名为 server1.DEBEZIUM.CUSTOMERS.Key
的 schema 定义,它有一个类型为 int32
的必需字段 id
。键的 payload
字段的值表示它实际上是一个带有 id
字段的结构(在 JSON 中,是一个对象),其值为 1004
。
因此,您可以将这个键解释为 inventory.customers
表中的行(来自名为 server1
的连接器),其 id
主键列的值为 1004
。
2.5.2.2. 关于 Debezium Oracle 连接器更改事件中的值
更改事件消息中的值结构反映了消息中 change 事件中消息键 的结构,并且包含 schema 部分和 payload 部分。
更改事件值的有效负载
更改事件值的有效数据部分中有一个 envelope 数据结构,它包含以下字段:
op
-
包含描述操作类型的字符串值的必填字段。Oracle 连接器更改事件值有效负载中的
op
字段包含以下值之一:c
(创建或插入)、u
(update)、d
(删除)或r
(读取,表示快照)。 before
-
存在的可选字段(如果存在)描述了事件 发生前 行的状态。该结构由
server1.INVENTORY.CUSTOMERS.Value
Kafka Connect 模式描述,server1
连接器用于inventory.customers
表中的所有行。
after
-
存在的可选字段(如果存在)包含 更改后 行的状态。该结构由用于
before
字段的同一server1.INVENTORY.CUSTOMERS.Value
Kafka Connect schema 描述。 source
包含描述事件源元数据的结构的必填字段。对于 Oracle 连接器,结构包括以下字段:
- Debezium 版本。
- 连接器名称。
- 事件是否是持续快照的一部分。
- 事务 ID (未包含快照)。
- 与 SCN (系统更改号)相关的以下值,数据库在提交更改时分配:
| 数据库用来跟踪事务的唯一标识符。 |
| 当事务启动时,SCN。 |
| 事务启动时的时间。 |
| 事务提交的时间。
|
| 以毫秒为单位提供时间戳。 |
| 以微秒为单位提供时间戳。 |
| 提供以纳秒为单位的时间戳。
|
| 可选字段,如果存在,包含运行 Kafka Connect 任务的 JVM 中的时间(基于系统时钟),该字段处理事件。 |
更改事件值的 schema
事件消息值的 schema 部分包含一个 schema,它描述了有效负载的 envelope 结构及其中的嵌套字段。
有关更改事件值的更多信息,请参阅以下主题:
创建 事件
以下示例显示了 customers
表中 create 事件值的值,如 更改事件键 示例所述:
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" }, { "type": "string", "optional": true, "field": "txId" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "string", "optional": true, "field": "rs_id" }, { "type": "int64", "optional": true, "field": "ssn" }, { "type": "int32", "optional": true, "field": "redo_thread" }, { "type": "string", "optional": true, "field": "user_name" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "string", "optional": true, "field": "row_id" } ], "optional": false, "name": "io.debezium.connector.oracle.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" } ], "optional": false, "name": "server1.DEBEZIUM.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "source": { "version": "3.0.8.Final", "name": "server1", "ts_ms": 1520085154000, "ts_us": 1520085154000000, "ts_ns": 1520085154000000000, "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false, "row_id": "AAASgjAAMAAAACnAAA" }, "op": "c", "ts_ms": 1532592105975, "ts_us": 1532592105975741, "ts_ns": 1532592105975741582 } }
在前面的示例中,注意事件如何定义以下模式:
-
信封 (
server1.DEBEZIUM.CUSTOMERS.Envelope
)。 -
源
结构(io.debezium.connector.oracle.Source
,它特定于 Oracle 连接器并在所有事件间重复使用)。 -
before
和after
字段的特定于表的模式。
before
和 after
字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value
, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中的表的 Avro 模式都有自己的演变和历史记录。
此事件的 value 的 payload
部分提供有关事件的信息。它描述了创建了行(op=c
),并显示 after
字段值包含插入到 ID
、FIRST_NAME
、LAST_NAME
和 EMAIL
列中的值。
默认情况下,事件的 JSON 表示大于描述的行。较大的大小是因为 JSON 表示,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 减少连接器写入 Kafka 主题的消息大小。
更新 事件
以下示例显示了一个 update 更改事件,连接器从与以前的 create 事件相同的表中捕获。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "source": { "version": "3.0.8.Final", "name": "server1", "ts_ms": 1520085811000, "ts_us": 1520085811000000, "ts_ns": 1520085811000000000, "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false, "row_id": "AAASgjAAMAAAACnAAA" }, "op": "u", "ts_ms": 1532592713485, "ts_us": 1532592713485152, "ts_ns": 1532592713485152954, } }
有效负载的结构与 create (insert)事件有效负载相同,但以下值有所不同:
-
op
字段的值为u
,表示此行因为更新而改变。 -
before
字段显示行的前一状态,以及更新
数据库提交前存在的值。 -
after
字段显示行的更新状态,EMAIL
值现在设置为anne@example.com
。 -
source
字段的结构包含与之前相同的字段,但值不同,因为连接器从红色日志的不同位置捕获事件。 -
ts_ms
字段显示显示 Debezium 处理事件的时间戳。
payload
部分显示一些其他有用的信息。例如,通过比较结构的 before
和 after
结构,我们可以确定在提交后如何更改行。源
结构提供有关 Oracle 记录的信息,提供可追溯性。它还让我们了解此事件何时发生此主题中的其他事件和其他主题。它是否发生在与另一个事件相同的提交之前、之后还是作为其他事件的一部分?
当更新行 primary/unique 键的列时,行的键值会改变。因此,Debebe 在更新后发出三个 事件:
-
DELETE
事件。 - 一个 tombstone 事件,带有行的旧键。
-
为行提供新密钥的
INSERT
事件。
删除 事件
以下示例显示了上一次 create 和 update 事件示例中显示的表的 delete 事件。delete 事件的 schema
部分与这些事件的 schema
部分相同。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "after": null, "source": { "version": "3.0.8.Final", "name": "server1", "ts_ms": 1520085153000, "ts_us": 1520085153000000, "ts_ns": 1520085153000000000, "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false, "row_id": "AAASgjAAMAAAACnAAA" }, "op": "d", "ts_ms": 1532592105960, "ts_us": 1532592105960854, "ts_ns": 1532592105960854693 } }
与 create 或 update 事件相比,事件的 payload
部分显示了几个不同之处:
-
op
字段的值为d
,表示行已被删除。 -
before
字段显示与数据库提交删除的行前状态。 -
after
字段的值为null
,表示行不再存在。 -
source
字段的结构中包括了多个在 create 或 update 事件中存在的键, 但ts_ms
,scn
, 和txId
中的值不同。 -
ts_ms
显示一个时间戳,指示 Debezium 处理此事件的时间。
delete 事件为用户提供了处理删除此行所需的信息。
Oracle 连接器的事件设计为与 Kafka 日志压缩 一起使用,这允许删除一些旧的信息,只要保留每个密钥的最新消息。这允许 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除使用同一键的所有之前信息。message 值必须设置为 null
,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,默认情况下 Debezium 的 Oracle 连接器总是遵循一个 delete 事件,它有一个特殊的 tombstone 事件,它具有相同的键但 null
值。您可以通过设置连接器属性 tombstones.on.delete
来改变默认的行为。
截断 事件
truncate 更改事件信号,提示表已被截断。message 键在本例中是 null
,消息值类似如下:
{ "schema": { ... }, "payload": { "before": null, "after": null, "source": { 1 "version": "3.0.8.Final", "connector": "oracle", "name": "oracle_server", "ts_ms": 1638974535000, "ts_us": 1638974535000000, "ts_ns": 1638974535000000000, "snapshot": "false", "db": "ORCLPDB1", "sequence": null, "schema": "DEBEZIUM", "table": "TEST_TABLE", "txId": "02000a0037030000", "scn": "13234397", "commit_scn": "13271102", "lcr_position": null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user" }, "op": "t", 2 "ts_ms": 1638974558961, 3 "ts_us": 1638974558961987, 4 "ts_ns": 1638974558961987251, 5 "transaction": null } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述事件源元数据的强制字段。在 truncate 事件值中,
|
2 |
|
描述操作类型的强制字符串。 |
3 |
|
显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。
|
如果单个 TRUNCATE
操作影响多个表,连接器会为每个 截断 表发出一个更改事件记录。
truncate 事件表示对整个表所做的更改,没有消息键。因此,对于具有多个分区的主题,对更改事件没有排序保证(创建、更新 等),或 截断 与表相关的事件。例如,如果消费者从多个分区读取表的事件,它可能会在收到 截断 事件后从一个分区接收一个表 的更新 事件,该事件会从另一个分区中删除表中的所有数据。排序只针对使用单个分区的主题保证。
如果您不希望连接器捕获 截断 事件,请使用 skipped.operations
选项过滤它们。
2.5.3. Debezium Oracle 连接器如何映射数据类型
当 Debezium Oracle 连接器检测到表行中的值更改时,它会发出一个代表更改的事件。每个更改事件记录的结构与原始表相同,事件记录包含每个列值的字段。表列的数据类型决定了连接器如何代表更改事件字段中的列值,如以下部分所示。
对于表中的每个列,Debebe 将源数据类型映射到 字面类型,在某些情况下,一个 语义类型,在对应的 event 字段中。
- 字面类型
-
描述如何按字面表示值,使用以下 Kafka Connect 模式类型之一:
INT8
,INT16
,INT32
,INT64
, INT64 ,FLOAT32
,FLOAT64
,BOOLEAN
,STRING
,BYTES
,ARRAY
,MAP
, 和STRUCT
. - 语义类型
- 描述 Kafka Connect 模式如何使用字段的名称捕获字段 的含义。
如果默认数据类型转换不满足您的需要,您可以为连接器 创建自定义转换器。
对于某些 Oracle 大对象(CLOB、NCLOB 和 BLOB)和数字数据类型,您可以通过更改默认配置属性设置来操作连接器执行类型映射的方式。有关 Debezium 属性控制这些数据类型的映射的更多信息,请参阅 Binary 和 Character LOB 类型和 Numeric 类型。
如需有关 Debezium 连接器如何映射 Oracle 数据类型的更多信息,请参阅以下主题:
字符类型
下表描述了连接器如何映射基本字符类型。
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
二进制和 Character LOB 类型
下表描述了连接器如何映射二进制和字符大对象(LOB)数据类型。
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
| 不适用 | 不支持这个数据类型 |
|
|
根据连接器配置中
|
|
| 不适用 |
| 不适用 | 不支持这个数据类型。 |
| 不适用 | 不支持这个数据类型。 |
|
| 不适用 |
| 不适用 |
根据连接器配置中
|
如果 Oracle 在 SQL 语句中明确设置了或更改,Oracle 仅为 CLOB
、NCLOB
和 BLOB
数据类型提供列值。因此,更改事件永远不会包含未更改的 CLOB
、NCLOB
或 BLOB
列的值。相反,它们包含连接器属性 unavailable.value.placeholder
定义的占位符。
如果 CLOB
、NCLOB
或 BLOB
列的值已更新,则新值将放置在相应更新更改事件的 after
项中。before
元素包含不可用的值占位符。
数字类型
下表描述了 Debezium Oracle 连接器如何映射数字类型。
您可以改变连接器映射 Oracle DECIMAL
, NUMBER
, NUMERIC
, 和 REAL
数据类型的方式,方法是修改连接器的 decimal.handling.mode
配置属性的值。当将属性设置为 precise
的默认值时,连接器会将这些 Oracle 数据类型映射到 Kafka Connect org.apache.kafka.connect.data.Decimal
逻辑类型,如表中所示。当将 属性的值设置为 双引号
或 字符串
时,连接器会对某些 Oracle 数据类型使用备用映射。如需更多信息,请参阅 下表中的 Semantic 类型和备注 列。
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
| 不适用 |
|
| 不适用 |
|
|
当
当将 |
|
|
|
|
|
|
|
|
|
|
|
当将
当将 |
|
|
当将
当将 |
|
|
|
|
|
当
当将 |
|
|
|
|
|
当将
当将 |
如前文所述,Oracle 允许 NUMBER
类型中的负扩展。当数字表示为 Decimal
时,这可能会导致转换为 Avro 格式的问题。十进制
类型包括缩放信息,但 Avro 规格 只允许规模的正数值。根据使用的 schema registry,可能会导致 Avro serialization 失败。要避免这个问题,您可以使用 NumberToZeroScaleConverter
,它将带有负精度(小数点左面)的高的数字 (P - S >= 19) 转换为小数点右面零位的 Decimal
类型。它可以配置如下:
converters=zero_scale zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter zero_scale.decimal.mode=precise
默认情况下,数字会被转换为 Decimal
类型(zero_scale.decimal.mode=precise
),但为了保证完全支持两种类型(双
和 字符串
)也被支持。
布尔值类型
Oracle 不提供对 BOOLEAN
数据类型的原生支持。但是,通常使用带有特定语义的其他数据类型来模拟逻辑 BOOLEAN
数据类型的概念。
为了允许您将源列转换为布尔值数据类型,Debebe 提供了一个 NumberOneTo Boolean Converter
自定义转换器,您可使用以下方法之一使用:
-
将所有
NUMBER (1)
列映射到BOOLEAN
类型。 使用以逗号分隔的正则表达式列表枚举列的子集。
要使用这种类型的转换,您必须使用selector
参数设置转换器
配置属性,如下例所示:converters=boolean boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVED
临时类型
除了 Oracle INTERVAL
,TIMESTAMP WITH TIME ZONE
, 和 TIMESTAMP WITH LOCAL TIME ZONE
数据类型外,连接器转换时序类型的方式取决于 time.precision.mode
配置属性的值。
当 time.precision.mode
配置属性设置为 adaptive
(默认值),那么连接器会根据列的数据类型确定 temporal 类型的字面和语义类型,以便事件 准确 表示数据库中的值:
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当 time.precision.mode
配置属性设置为 connect
时,连接器会使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置 Kafka Connect 逻辑类型且无法处理变量调整时间值时,这很有用。由于 Oracle 支持的精度级别超过 Kafka Connect 支持中的逻辑类型,如果将 time.precision.mode
设置为 connect
,当数据库列的 fractional second precision 值大于 2 时,会出现丢失精度的结果:
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ROWID 类型
下表描述了连接器如何映射 ROWID (托管地址)数据类型。
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
| 使用 Oracle XStream 时不支持此数据类型。 |
| 不适用 | 不支持此数据类型。 |
将 XMLTYPE
与 Debezium Oracle 连接器一起使用只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅 https://access.redhat.com/support/offerings/techpreview。
下表描述了连接器如何映射 XMLTYPE 数据类型。
Oracle 数据类型 | 字面类型(schema 类型) | 语义类型(schema 名称)和备注 |
---|---|---|
|
|
|
用户定义的类型
Oracle 允许您定义自定义数据类型,以便在内置数据类型不符合您的要求时提供灵活性。有几个用户定义的类型,如对象类型、REF 数据类型、Varrays 和 Nested Tables。目前,您不能将 Debezium Oracle 连接器用于任何用户定义的类型。
Oracle 提供的类型
Oracle 提供基于 SQL 的接口,可用于在内置或 ANSI 支持的类型不足时定义新的类型。Oracle 提供多种常用的数据类型来满足各种目的,如 Any 或 Spatial 类型。目前,您不能将 Debezium Oracle 连接器用于任何这些数据类型。
默认值
如果为数据库模式中的列指定默认值,Oracle 连接器会尝试将此值传播到对应的 Kafka 记录字段的 schema。最常见的数据类型包括:
-
字符类型(
CHAR
、NCHAR
、
2、VARCHAR
、VARCHARNVARCHAR
2、NVARCHAR2
) -
数字类型(
inTEGER
、NUMERIC
等) -
时序类型(
DATE
、TIMESTAMP
、INTERVAL
等等)
如果 temporal 类型使用 TO_TIMESTAMP
或 TO_DATE
等函数调用来代表默认值,则连接器将通过生成额外的数据库调用来评估该函数来解析默认值。例如,如果使用默认值 TO_
定义 DATE 列,则列的默认值将是该日期的 UNIX epoch 或 DATE
('2021-01-02', 'YYYY-MM-DD')18629
开始的天数。
如果 temporal 类型使用 SYSDATE
常代表默认值,则连接器会根据列为 NOT NULL
或 NULL
来解决此问题。如果列可为空,则不会设置默认值;但是,如果列不可为空,则默认值将解析为 0
(用于 DATE
或 TIMESTAMP(n)
数据类型)或 1970-01-01T00:00:00Z
(用于 TIMESTAMP WITH TIME ZONE
或 TIMESTAMP WITH LOCAL TIME ZONE
数据类型)。默认值为数字,除非列是 TIMESTAMP WITH TIME ZONE
或 TIMESTAMP WITH LOCAL TIME ZONE
,在这种情况下,其作为字符串发出。
自定义转换器
默认情况下,Debezium Oracle 连接器提供多个特定于 Oracle 数据类型的 CustomConverter
实现。这些自定义转换器根据连接器配置为特定数据类型提供替代映射。要在连接器中添加 CustomConverter
,请按照 Custom Converters 文档中的 说明进行操作。
Debezium Oracle 连接器提供以下自定义转换器:
NUMBER (1)
到布尔值
从版本 23 开始,Oracle 数据库提供 BOOLEAN
逻辑数据类型。在早期版本中,数据库使用 NUMBER (1)
数据类型来模拟 BOOLEAN
类型,使用值 0
代表 false,或值 1
代表 true。
默认情况下,当 Debezium 为使用 NUMBER (1)
数据类型的源列发出更改事件时,它会将数据转换为 INT8
字面类型。如果 NUMBER (1)
数据类型的默认映射不满足您的需要,您可以将连接器配置为在通过配置 NumberOneToBooleanConverter
来发送这些列时使用逻辑 BOOL
类型,如下例所示:
示例: NumberOneToBooleanConverter
配置
converters=number-to-boolean number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter number-to-boolean.selector=.*.MY_TABLE.DATA
在前面的示例中,selector
属性是可选的。selector
属性指定转换器应用到的表或列的正则表达式。如果省略 selector
属性,当 Debezium 发出一个事件时,带有 NUMBER (1)
数据类型的每个列都会转换为使用逻辑 BOOL
类型的字段。
NUMBER
To Zero Scale
Oracle 支持 创建基于
带有负缩放的 NUMBER 列,即 NUMBER (-2)。
并非所有系统都可以处理负缩放值,因此这些值可能会导致管道中处理问题。例如,因为 Apache Avro 不支持这些值,因此如果 Debezium 将事件转换为 Avro 格式,则可能会出现问题。同样,不支持这些值的下游用户也会遇到错误。
配置示例
converters=number-zero-scale number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter number-zero-scale.decimal.mode=precise
在前面的示例中,decimal.mode
属性指定连接器如何发送十进制值。此属性是可选的。如果省略 decimal.mode
属性,则转换器默认使用 PRECISE
十进制处理模式。
RAW
到字符串
虽然 Oracle 建议使用某些数据类型,如 RAW
,但旧系统可能会继续使用此类类型。默认情况下,Debebe 将 RAW
列类型作为逻辑 BYTES
发送,这是一种类型,用于启用二进制或基于文本的数据存储。
在某些情况下,RAW
列可能会将字符数据存储为一系列字节。要协助消费者使用,您可以将 Debezium 配置为使用 RawToStringConverter
。RawToStringConverter
提供了一种方式,可以轻松地以此类 RAW
列为目标,并以字符串的形式发送值,而不是字节。以下示例演示了如何将 RawToStringConverter
添加到连接器配置中:
示例: RawToStringConverter
配置
converters=raw-to-string raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter raw-to-string.selector=.*.MY_TABLE.DATA
在前面的示例中,选择器
属性允许您定义一个正则表达式,以指定转换器进程的表或列。如果省略 selector
属性,则转换器将所有 RAW
列类型映射到逻辑 STRING
字段类型。
2.5.4. 设置 Oracle 以使用 Debezium
设置 Oracle 以用于 Debezium Oracle 连接器需要执行下列步骤。这些步骤假定将多租户配置与容器数据库一起使用,以及至少一个可插拔数据库。如果您不打算使用多租户配置,可能需要调整以下步骤。
有关设置用于 Debezium 连接器的 Oracle 的详情,请参考以下部分:
- 第 2.5.4.1 节 “Debezium Oracle 连接器与 Oracle 安装类型的兼容性”
- 第 2.5.4.2 节 “在捕获更改事件时 Debezium Oracle 连接器不包括的 schema”
- 第 2.5.4.4 节 “准备 Oracle 数据库以用于 Debezium”
- 第 2.5.4.5 节 “调整 Oracle redo 日志的大小以容纳数据字典”
- 第 2.5.4.7 节 “为 Debezium Oracle 连接器创建 Oracle 用户”
- 第 2.5.4.8 节 “使用 Oracle 待机数据库运行连接器”
- 第 2.5.8 节 “在 Debezium 中使用 Oracle XStream 数据库(开发者预览)”
2.5.4.1. Debezium Oracle 连接器与 Oracle 安装类型的兼容性
Oracle 数据库可以作为独立实例安装,也可以使用 Oracle Real Application Cluster (RAC)。Debezium Oracle 连接器与两种类型的安装兼容。
2.5.4.2. 在捕获更改事件时 Debezium Oracle 连接器不包括的 schema
当 Debezium Oracle 连接器捕获表时,它会自动从以下模式中排除表:
-
appqossys
-
audsys
-
ctxsys
-
dvsys
-
dbsfwuser
-
dbsnmp
-
qsmadmin_internal
-
lbacsys
-
mdsys
-
ojvmsys
-
olapsys
-
orddata
-
ordsys
-
outln
-
sys
-
system
-
vecsys
(Oracle 23+) -
wmsys
-
xdb
要启用连接器从表中捕获更改,该表必须使用在前面的列表中未命名的模式。
2.5.4.3. 在捕获更改事件时 Debezium Oracle 连接器排除的表
当 Debezium Oracle 连接器捕获表时,它会自动排除与以下规则匹配的表:
-
与
CMP[3|4hmac[0-9]+
匹配的压缩顾问表。 -
与
SYS_IOT_OVER_%
模式匹配的 index-organized 表。 -
与模式
MDRT_%
、MDRS_%
或MDXT_%
匹配的 spatial 表。 - 嵌套表
要让连接器捕获名称与上述规则匹配的表,您必须重命名该表。
2.5.4.4. 准备 Oracle 数据库以用于 Debezium
Oracle LogMiner 所需的配置
ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog CONNECT sys/top_secret AS SYSDBA alter system set db_recovery_file_dest_size = 10G; alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile; shutdown immediate startup mount alter database archivelog; alter database open; -- Should now "Database log mode: Archive Mode" archive log list exit;
Oracle AWS RDS 不允许执行上述命令,也允许您以 sysdba 身份登录。AWS 提供了这些替代命令来配置 LogMiner。在执行这些命令前,请确保为备份启用了 Oracle AWS RDS 实例。
要确认 Oracle 启用了备份,请首先执行以下命令。LOG_MODE 应使用 ARCHIVELOG。如果没有,您可能需要重启 Oracle AWS RDS 实例。
Oracle AWS RDS LogMiner 所需的配置
SQL> SELECT LOG_MODE FROM V$DATABASE; LOG_MODE ------------ ARCHIVELOG
当 LOG_MODE 设置为 ARCHIVELOG 后,执行命令以完成 LogMiner 配置。第一个命令将数据库设置为 archivelogs,第二个命令添加了 supplemental 日志记录。
Oracle AWS RDS LogMiner 所需的配置
exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24); exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');
要让 Debezium 捕获更改数据库行之前的状态,还必须为捕获的表或整个数据库启用附件日志记录。以下示例演示了如何为单个 inventory.customers
表中的所有列配置补充日志记录。
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
为所有表列启用附加日志记录会增加 Oracle redo 日志的卷。要防止日志大小过度增长,请有选择地应用前面的配置。
在数据库级别上必须启用最少的附件日志记录,并可以配置如下:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
2.5.4.5. 调整 Oracle redo 日志的大小以容纳数据字典
根据数据库配置,恢复日志的大小和数量可能不足以达到可接受的性能。在设置 Debezium Oracle 连接器前,请确保 redo 日志的容量足以支持数据库。
数据库恢复日志的容量必须足以存储其数据字典。通常,数据字典的大小会随着数据库中表和列的数量而增加。如果红色日志缺少足够容量,数据库和 Debezium 连接器都可能会遇到性能问题。
请咨询您的数据库管理员,以评估数据库可能需要增加日志容量。
2.5.4.6. 指定 Debezium Oracle 连接器使用的归档日志目的地
Oracle 数据库管理员可以为存档日志配置最多 31 个不同的目的地。管理员可以为每个目的地设置参数,以为特定用途指定它,例如,记录物理待机日志或外部存储以允许扩展日志保留。Oracle 在 V$ARCHIVE_DEST_STATUS
视图中报告有关归档日志目的地的详细信息。
Debezium Oracle 连接器只使用状态为 VALID
和 LOCAL
的目的地。如果您的 Oracle 环境包含多个满足该条件的目的地,请咨询您的 Oracle 管理员以确定应该使用哪个归档日志目的地。
流程
-
要指定要使用的存档日志目的地,请在连接器配置中设置
archive.destination.name
属性。
例如,假设数据库配置了两个归档目标路径 /path/one 和
,并且/path
/twoV$ARCHIVE_DEST_STATUS
表将这些路径与列DEST_NAME
中指定的目标名称相关联。如果两个目的地都满足 Debezium swig-two 的标准,则其状态为
VALID
,并且其类型是
LOCAL
HEKETI-busybox,将连接器配置为使用数据库写入/path/two
的归档日志,将archive.destination.name
的值设置为DEST_NAME
列中的值,该列中的/path/two
与V$ARCHIVE_DEST_STATUS
表中关联的值。例如,如果DEST_NAME
为LOG_ARCHIVE_DEST_3
用于/path/two
,您可以将 Debezium 配置为:
{ "archive.destination.name": "LOG_ARCHIVE_DEST_3" }
不要将 archive.destination.name
的值设置为数据库用于归档日志的路径。将 属性设置为满足归档日志保留策略的 V$ARCHIVE_DEST_STAT_STATUS 表中一行的
列中的归档日志目的地名称。
DEST_
NAME
如果您的 Oracle 环境包含多个满足该条件的目的地,并且您无法指定首选目的地,Debezium Oracle 连接器会随机选择目的地路径。因为为每个目的地配置的保留策略可能会有所不同,因此如果连接器选择从中删除请求的日志数据的路径,这可能会导致错误。
2.5.4.7. 为 Debezium Oracle 连接器创建 Oracle 用户
要使 Debezium Oracle 连接器捕获更改事件,它必须以具有特定权限的 Oracle LogMiner 用户身份运行。以下示例显示了用于在多租户数据库模型中为连接器创建 Oracle 用户帐户的 SQL。
连接器捕获其自身 Oracle 用户帐户所做的数据库更改。但是,它不会捕获 SYS
或 SYSTEM
用户帐户所做的更改。
创建连接器的 LogMiner 用户
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL; GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; 1 GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; 2 GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; 3 GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; 4 GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; 5 GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 6 GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 7 GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; 8 GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; 9 GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; 10 GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; 11 GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; 12 GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; 13 GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; 14 GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; 15 GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; 16 GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; 17 GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; 18 GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; 19 GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; 20 GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; 21 GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; 22 GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; 23 GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL; 24 GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL; 25 exit;
项 | 角色名称 | 描述 |
---|---|---|
1 | 创建会话 | 启用连接器连接到 Oracle。 |
2 | 设置容器 | 启用连接器在可插拔数据库间切换。只有在 Oracle 安装启用了容器数据库支持(CDB)时才需要这样做。 |
3 | SELECT ON V_$DATABASE |
启用连接器读取 |
4 | FLASHBACK 任何表 |
启用连接器执行 Flashback 查询,这是连接器如何执行数据的初始快照。另外,您还可以为所有表授予 |
5 | 选择任何表 |
启用连接器读取任何表。另外,您还可以为所有表授予 |
6 | SELECT_CATALOG_ROLE | 启用连接器读取数据字典,这是 Oracle LogMiner 会话所需的。 |
7 | EXECUTE_CATALOG_ROLE | 启用连接器将数据字典写入 Oracle redo 日志中,这是跟踪 schema 更改所必需的。 |
8 | 选择任何事务 |
启用快照进程,以针对任何事务执行 Flashback 快照查询。当授予 |
9 | LOGMINING | 在较新版本的 Oracle 中添加了此角色,作为授予 Oracle LogMiner 及其软件包的完整访问权限的方法。在没有此角色的较早版本的 Oracle 上,您可以忽略此授权。 |
10 | 创建表 | 启用连接器在其默认表空间中创建其 flush 表。flush 表允许连接器明确控制 LGWR 内部缓冲区刷新到磁盘。 |
11 | 锁定任何表 | 启用连接器在模式快照期间锁定表。如果通过配置明确禁用了快照锁定,则可以安全地忽略这个授权。 |
12 | 创建序列 | 启用连接器在默认表空间中创建序列。 |
13 | EXECUTE ON DBMS_LOGMNR |
启用连接器在 |
14 | EXECUTE ON DBMS_LOGMNR_D |
启用连接器在 |
15 到 25 | SELECT ON V_$…. | 启用连接器来读取这些表。连接器必须能够读取 Oracle redo 和 archive 日志的信息,以及当前的事务状态,才能准备 Oracle LogMiner 会话。如果没有这些授权,连接器无法操作。 |
2.5.4.8. 使用 Oracle 待机数据库运行连接器
备用数据库提供主实例的同步副本。如果出现主数据库故障,备用数据库提供连续可用性和灾难恢复。Oracle 同时使用物理和逻辑备用数据库。
物理待机
物理待机是主生产数据库的确切块副本,其系统更改号(SCN)值与主数据库的值相同。Debezium Oracle 连接器无法直接从物理待机数据库捕获更改事件,因为物理待机不接受外部连接。连接器只能在将待机转换为主数据库后从物理待机中捕获事件。然后,连接器会连接到以前的待机,就像是任何主数据库一样。
逻辑待机
逻辑待机包含与主数据相同的逻辑数据,但数据可能会以不同的物理方式存储。逻辑待机中的 SCN 偏移与主数据库中的偏移量不同。您可以将 Debezium Oracle 连接器配置为从逻辑待机数据库捕获更改。
2.5.4.8.1. 从 Oracle 故障转移数据库捕获数据
当您设置故障转移数据库时,通常最好使用物理备用数据库而不是逻辑待机数据库。物理待机与主数据库保持一致状态,而不是逻辑待机。物理待机包含主数据的确切副本,并且待机系统的更改号(SCN)值与主数据的不同。在 Debezium 环境中,数据库故障转移到物理待机后,存在一致的 SCN 值可确保连接器可以找到最后处理的 SCN 值。
物理待机以只读模式锁定,运行受管恢复以维护同步。当数据库处于待机模式时,它不接受来自客户端的外部 JDBC 连接,并且外部应用无法访问它。
在失败事件后,要允许 Debezium 连接到以前的物理待机,DBA 必须执行几个操作来启用故障转移到待机,并将它提升为主数据库。以下列表标识了一些关键操作:
- 在待机上取消受管恢复。
- 完成活动的恢复过程。
- 将待机转换为主要角色。
- 打开客户端读写操作的新主要内容。
在以前的物理待机可用后,您可以将 Debezium Oracle 连接器配置为连接它。要让连接器从新的主设备捕获,请在连接器配置中编辑数据库主机名,将原始主的主机名替换为新主主机名。
2.5.4.8.2. 配置 Debezium Oracle 连接器以从逻辑待机中捕获事件
当 Oracle 的 Debezium 连接器连接到主数据库时,它使用内部清除表来管理 Oracle Log Writer Buffer (LGWR)进程的清除周期。flush 进程要求用户帐户访问数据库具有创建和写入此清除表的权限。但是,逻辑独立数据库通常允许只读访问,从而导致连接器写入数据库。您可以修改连接器配置,使连接器可以从逻辑待机捕获事件,或者 DBA 可以创建一个新的可写表空间,连接器可以在其中存储 flush 表。
Debezium Oracle 连接器从只读逻辑待机数据库中尽力更改是开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。
有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围。
流程
要启用 Debezium 从 Oracle 只读逻辑待机数据库中捕获事件,请在连接器配置中添加以下属性,以禁用清除表的创建和管理:
internal.log.mining.read.only=true
上述设置可防止数据库创建和更新
LOG_MINING_FLUSH
表。您可以将internal.log.mining.read.only
属性与 Oracle Standalone 数据库一起使用,或者与 Oracle RAC 安装一起使用。
扩展最大字符串大小
数据库参数 max_string_size
控制 Oracle 数据库以及扩展 Debezium 如何解释 VARCHAR2
、NVARCHAR2
和 RAW
字段的值。默认 STANDARD
意味着这些数据类型的长度与 Oracle 12c 之前的发行版本(对于 VARCHAR2
和 NVARCHAR2
和 2000 字节)具有相同的限制。当配置为
EXTENDED
时,这些列现在允许存储最多 32767 字节的数据。
虽然数据库管理员可以将 max_string_size
从 STANDARD
更改为 EXTENDED
,但不允许相反。数据库更新至 EXTENDED
字符串支持后,该字符串无法撤消。
对于 Debezium Oracle 连接器,当数据库参数 max_string_size
为 EXTENDED
时,应该将 lob.enabled
连接器配置选项设为 true
,以捕获对 VARCHAR2
和 NVARCHAR2
字段的更改,其长度超过 4000 字节或 RAW
字段,且超过 2000 字节。
当设置为 EXTENDED
时,当字符串数据的字节长度超过旧最大值时,Oracle 会执行字符数据的隐式转换。这种隐式转换意味着 Oracle 内部将字符串数据视为 CLOB
,因此您可以获得将字段视为外部世界的常规字符串的好处,但所有缺陷以及有关数据库层级别的存储有疑问。
由于 Oracle 在内部将这些字符串视为 CLOB
,因此红色日志也会反映 Debezium Oracle 连接器需要注意的一些唯一操作类型,它应该 mine。由于这些操作类型与 CLOB
操作非常相似,因此无论字符串数据的字节长度如何,必须以与其它 LOB 类型相同的方式从红色日志捕获更改。
当 Oracle 配置为使用 EXTENDED 字符串大小时,当 LogMiner 为扩展字符串字段重新构建 SQL 时,LogMiner 有时无法转义单引号字符('
)。如果扩展字符串字段的字节长度不超过旧的最大长度,则可能会出现此问题。因此,这些字段中的值可以被截断,从而导致 Oracle 连接器无法解析无效 SQL 语句。
为了帮助解决某些问题实例,您可以通过将以下属性设置为 true
将连接器配置为 relax 单引号检测:
internal.log.mining.sql.relaxed.quote.detection
如需更多信息,请参阅 Red Hat Integration 3.0.8 的发行注记。
2.5.5. 部署 Debezium Oracle 连接器
您可以使用以下任一方法部署 Debezium Oracle 连接器:
使用 Streams for Apache Kafka 自动创建一个包含连接器插件的镜像。
这是首选的方法。
-
从 Dockerfile 构建自定义 Kafka Connect 容器镜像。
这个 Containerfile 部署方法已弃用。计划在以后的文档中删除这个方法的说明。
由于许可证要求,Debezium Oracle 连接器存档不包括连接器连接到 Oracle 数据库所需的 Oracle JDBC 驱动程序。要启用连接器访问数据库,您必须在连接器环境中添加驱动程序。如需更多信息,请参阅 获取 Oracle JDBC 驱动程序。
其他资源
2.5.5.1. 获取 Oracle JDBC 驱动程序
由于许可证要求,Debezium 连接到 Oracle 数据库所需的 Oracle JDBC 驱动程序文件不包括在 Debezium Oracle 连接器存档中。该驱动程序可从 Maven Central 下载。根据您使用的部署方法,您可以通过将命令添加到 Kafka Connect 自定义资源或用于构建连接器镜像的 Dockerfile 来检索驱动程序。
-
如果您使用 Streams for Apache Kafka 将连接器添加到 Kafka Connect 镜像,请将驱动程序的 Maven Central 位置添加到
KafkaConnect
自定义资源中的builds.plugins.artifact.url
中,如 第 2.5.5.3 节 “使用 Streams for Apache Kafka 部署 Debezium Oracle 连接器” 所示。 -
如果您使用 Dockerfile 为连接器构建容器镜像,请在 Dockerfile 中插入
curl
命令,以指定从 Maven Central 下载所需驱动程序文件的 URL。如需更多信息,请参阅 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium Oracle 连接器。
2.5.5.2. 使用 Streams for Apache Kafka 的 Debezium Oracle 连接器部署
部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。
在部署过程中,您要创建和使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关镜像中包含的连接器工件的信息。 -
提供包括连接器用来访问源数据库的信息的
KafkaConnector
CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。
KafkaConnect
CR 中的 spec.build.output
参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。
如果使用 KafkaConnect
资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。
其他资源
- 在 OpenShift 中部署和管理 Apache Kafka Streams 中的配置 Kafka 连接。
- 在 OpenShift 中部署和管理 Apache Kafka 的 Streams 中自动构建新容器镜像。
2.5.5.3. 使用 Streams for Apache Kafka 部署 Debezium Oracle 连接器
对于 Apache Kafka 的早期版本,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 Apache Kafka 的 Streams 中的构建配置,来自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。
在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect
自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。
新创建的容器被推送到 .spec.build.output
中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector
自定义资源来启动构建中包含的连接器。
先决条件
- 您可以访问安装了集群 Operator 的 OpenShift 集群。
- Apache Kafka Operator 的 Streams 正在运行。
- 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
- Kafka Connect 部署在 Apache Kafka 的 Streams 中
- 您有一个红帽构建的 Debezium 许可证。
-
OpenShift
oc
CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:
- 将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源部署到集群中,以存储新的容器镜像。您必须为集群显式创建 ImageStream。默认情况下,镜像流不可用。如需有关 ImageStreams 的更多信息,请参阅在 OpenShift Container Platform 中管理镜像流。
流程
- 登录 OpenShift 集群。
为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。例如,使用名称dbz-connect.yaml
创建KafkaConnect
CR,用于指定metadata.annotations
和spec.build
属性。以下示例显示了描述KafkaConnect
自定义资源的dbz-connect.yaml
文件摘录。
例 2.34. 定义包含 Debezium 连接器的
KafkaConnect
自定义资源的dbz-connect.yaml
文件在以下示例中,自定义资源被配置为下载以下工件:
- Debezium Oracle 连接器存档。
- 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算将 Avro serialization 与连接器一起使用时,才添加 Apicurio Registry 组件。
- Debezium 脚本 SMT 归档以及您要用于 Debezium 连接器的相关语言依赖项。SMT 归档和语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
- Oracle JDBC 驱动程序需要连接到 Oracle 数据库,但不包含在连接器存档中。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.9.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-oracle artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-oracle/3.0.8.Final-redhat-00004/debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar - type: jar 11 url: https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/21.6.0.0/ojdbc11-21.6.0.0.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
表 2.121. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,以便 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值为docker
,可推送到容器 registry (如 Docker Hub 或 Quay)或镜像流
(用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output
的更多信息,请参阅 Streams for Apache Kafka API 参考中的 Build schema 参考。5
插件配置
列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定artifacts.url
中指定的工件的文件类型。有效类型是zip
、tgz
、或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。type
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Red Hat Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。8
(可选)指定下载 Apicurio Registry 组件的工件
类型和
url
。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化事件键和值,使用红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换。9
(可选)指定 Debezium 脚本 SMT 归档的工件
类型和
url
,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您还必须部署 JSR 223 兼容脚本实施,如 groovy。10
(可选)指定与 JSR 223 脚本实施的 JAR 文件的工件
类型和
url
,这是 Debezium 脚本 SMT 所需的。重要如果您使用 Streams for Apache Kafka 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需的脚本语言组件
artifacts.url
必须指定 JAR 文件的位置,并且artifacts.type
的值也必须设置为jar
。无效的值会导致连接器在运行时失败。要启用将 Apache Groovy 语言与脚本 SMT 搭配使用,示例中的自定义资源会检索以下库的 JAR 文件:
-
groovy
-
groovy-jsr223
(脚本代理) -
groovy-json
(用于解析 JSON 字符串的模块)
Debezium 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实施。
11
指定 Maven Central 中 Oracle JDBC 驱动程序的位置。Debezium Oracle 连接器存档中不包含所需的驱动程序。
输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。创建一个
KafkaConnector
资源来定义您要部署的每个连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为oracle-inventory-connector.yaml
例 2.35. 为 Debezium 连接器定义
KafkaConnector
自定义资源的 Oracle-inventory-connector.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-oracle 1 spec: class: io.debezium.connector.oracle.OracleConnector 2 tasksMax: 1 3 config: 4 schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092 schema.history.internal.kafka.topic: schema-changes.inventory database.hostname: oracle.debezium-oracle.svc.cluster.local 5 database.port: 1521 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 topic.prefix: inventory-connector-oracle 10 table.include.list: PUBLIC.INVENTORY 11 ...
表 2.122. 连接器配置设置的描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
可同时操作的任务数量。
4
连接器的配置。
5
主机数据库实例的地址。
6
数据库实例的端口号。
7
Debezium 用来连接到数据库的帐户名称。
8
Debezium 用来连接到数据库用户帐户的密码。
9
要从中捕获更改的数据库的名称。
10
数据库实例或集群的主题前缀。
指定的名称只能从字母数字字符或下划线构成。
因为主题前缀用作从此连接器接收更改事件的 Kafka 主题的前缀,因此该名称在集群中的连接器中必须是唯一的。
如果您将连接器与 Avro 连接器集成,则此命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。11
连接器捕获更改事件的表列表。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f oracle-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。
您现在已准备好 验证 Debezium Oracle 部署。
2.5.5.4. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium Oracle 连接器
要部署 Debezium Oracle 连接器,您必须构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您需要创建以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR。CR 中的image
属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。 -
定义 Debezium Oracle 连接器的
KafkaConnector
CR。将此 CR 应用到应用KafkaConnect
CR 的同一 OpenShift 实例。
先决条件
- Oracle 数据库正在运行,您完成了 设置 Oracle 以使用 Debezium 连接器 的步骤。
- Apache Kafka 的流部署在 OpenShift 中,它正在运行 Apache Kafka 和 Kafka Connect。如需更多信息,请参阅在 OpenShift 中部署和管理 Apache Kafka的流
- podman 或 Docker 已安装。
-
您有在容器 registry (如
quay.io
或docker.io
)中创建和管理容器的帐户和权限,您要添加将运行 Debezium 连接器的容器。 Kafka Connect 服务器有权访问 Maven Central,以下载 Oracle 所需的 JDBC 驱动程序。您还可以使用驱动程序的本地副本,或使用本地 Maven 存储库或其他 HTTP 服务器可用的副本。
如需更多信息,请参阅 获取 Oracle JDBC 驱动程序。
流程
为 Kafka Connect 创建 Debezium Oracle 容器:
创建一个 Dockerfile,它使用
registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0
作为基础镜像。例如,在终端窗口中输入以下命令:cat <<EOF >debezium-container-for-oracle.yaml 1 FROM registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0 USER root:root RUN mkdir -p /opt/kafka/plugins/debezium 2 RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-oracle/3.0.8.Final-redhat-00004/debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip \ && unzip debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip \ && rm debezium-connector-oracle-3.0.8.Final-redhat-00004-plugin.zip RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc11/21.6.0.0/ojdbc11-21.6.0.0.jar USER 1001 EOF
项 描述 1
您可以指定您想要的任何文件名。
2
指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。
该命令在当前目录中创建一个名为
debezium-container-for-oracle.yaml
的 Dockerfile。从您在上一步中创建的
debezium-container-for-oracle.yaml
Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:podman build -t debezium-container-for-oracle:latest .
docker build -t debezium-container-for-oracle:latest .
前面的命令使用名称
debezium-container-for-oracle
构建容器镜像。将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:
podman push <myregistry.io>/debezium-container-for-oracle:latest
docker push <myregistry.io>/debezium-container-for-oracle:latest
创建新的 Debezium Oracle KafkaConnect 自定义资源(CR)。例如,使用名称
dbz-connect.yaml
创建KafkaConnect
CR,用于指定注解和
镜像
属性。以下示例显示了描述KafkaConnect
自定义资源的dbz-connect.yaml
文件摘录。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: image: debezium-container-for-oracle 2 ...
项 描述 1
metadata.annotations
表示KafkaConnector
资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。2
spec.image
指定为运行 Debezium 连接器而创建的镜像的名称。此属性覆盖 Cluster Operator 中的STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量。输入以下命令将
KafkaConnect
CR 应用到 OpenShift Kafka Connect 环境:oc create -f dbz-connect.yaml
该命令添加一个 Kafka Connect 实例,用于指定为运行 Debezium 连接器而创建的镜像的名称。
创建一个
KafkaConnector
自定义资源,用于配置 Debezium Oracle 连接器实例。您可以在指定连接器的配置属性的
.yaml
文件中配置 Debezium Oracle 连接器。连接器配置可能会指示 Debezium 为模式和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断指定列中的值,这些值是敏感、太大或不需要的。以下示例配置了在端口
1521
上连接到 Oracle 主机 IP 地址的 Debezium 连接器。此主机有一个名为ORCLCDB
的数据库,server1
是服务器的逻辑名称。Oracle
inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector-oracle 1 labels: strimzi.io/cluster: my-connect-cluster annotations: strimzi.io/use-connector-resources: 'true' spec: class: io.debezium.connector.oracle.OracleConnector 2 config: database.hostname: <oracle_ip_address> 3 database.port: 1521 4 database.user: c##dbzuser 5 database.password: dbz 6 database.dbname: ORCLCDB 7 database.pdb.name : ORCLPDB1, 8 topic.prefix: inventory-connector-oracle 9 schema.history.internal.kafka.bootstrap.servers: kafka:9092 10 schema.history.internal.kafka.topic: schema-changes.inventory 11
表 2.123. 连接器配置设置的描述 项 描述 1
当使用 Kafka Connect 服务注册时,连接器的名称。
2
此 Oracle 连接器类的名称。
3
Oracle 实例的地址。
4
Oracle 实例的端口号。
5
Oracle 用户的名称,如 为 连接器创建用户 中所述。
6
Oracle 用户的密码,如为 连接器创建用户 中所述。
7
要从中捕获更改的数据库的名称。
8
连接器从中捕获更改的 Oracle 可插拔数据库的名称。仅在容器数据库(CDB)安装中使用。
9
主题前缀标识,并为 Oracle 数据库服务器提供命名空间,连接器从中捕获更改。
10
此连接器用来将 DDL 语句写入并恢复到数据库 schema 历史记录主题的 Kafka 代理列表。
11
连接器写入和恢复 DDL 语句的数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。
使用 Kafka Connect 创建连接器实例。例如,如果您在
inventory-connector.yaml
文件中保存KafkaConnector
资源,您将运行以下命令:oc apply -f inventory-connector.yaml
前面的命令注册
inventory-connector
,连接器开始针对KafkaConnector
CR 中定义的server1
数据库运行。
有关您可以为 Debezium Oracle 连接器设置的配置属性的完整列表,请参阅 Oracle 连接器属性。
结果
连接器启动后,它会执行为连接器配置的 Oracle 数据库的一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流传输到 Kafka 主题。
2.5.5.5. 配置容器数据库和非容器数据库
Oracle 数据库支持以下部署类型:
- 容器数据库(CDB)
- 可以包含多个可插拔数据库(PDB)的数据库。数据库客户端连接到每个 PDB,就像它是标准的非CDB 数据库一样。
- 非容器数据库(非CDB)
- 标准 Oracle 数据库,不支持创建可插拔数据库。
使用 mTLS 安全连接
当使用 mutual TLS 身份验证(mTLS)连接到 Oracle 时,这涉及连接器和数据库服务提供身份。Oracle 连接器的 Debezium 依赖于 Oracle JDBC 驱动程序的内置功能来支持 mTLS 身份验证。
您可以使用以下任一方法在 Debezium 和 Oracle 之间建立 mTLS 连接:
2.5.5.6. 使用 Java 密钥和信任存储将 Oracle 连接器配置为使用 mTLS
先决条件
- 验证连接器可以访问配置的密钥和信任存储文件。
- 验证数据库 TNS (Transparent Network Substrate)侦听器是否支持 TCPS 安全连接。
流程
配置 Debezium Oracle 连接器,如下例所示。
示例:Debezium Oracle 连接器 TLS 配置
{ "database.url": "jdbc:oracle:thin@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcps)(HOST=<host>)(PORT=<port>))(CONNECT_DATA(SERVICE_NAME=<service>)))", "driver.javax.net.ssl.keyStore": "<path-to-jks-keystore>", "driver.javax.net.ssl.keyStorePassword": "<keystore-password>", "driver.javax.net.ssl.keyStoreType": "JKS", "driver.javax.net.ssl.trustStore": "<path-to-jks-truststore>", "driver.javax.net.ssl.trustStorePassword": "<truststore-password>", "driver.javax.net.ssl.trustStoreType": "JKS" }
注意对于使用 TLS 加密与 Oracle 通信的 Debezium,需要与服务器的 TCPS 连接。要建立 TCPS 连接,您必须将连接器配置为使用
database.url
属性,而不是database.host
属性。与database.host
属性不同,database.url
属性允许您定义明确需要使用TCPS
协议的 Oracle TNS (Transparent Network Substrate)连接字符串。
2.5.5.7. 使用 Oracle wallet 将连接器配置为使用 mTLS
先决条件
- 使用数据库管理员验证 Oracle 数据库服务器上是否已配置 Oracle Wallet
-
在 Oracle JDBC 驱动程序存档中找到
oraclepki.jar
流程
-
在同一位置安装
oraclepki.jar
,其中 $Debezium Debezium Oracle 连接器 jars 存在。如果您下载并安装 Oracle JDBC 驱动程序,则同一位置是您也会放置oraclepki.jar
。 -
使用
database.url
配置属性而不是database.hostname
来配置连接器。通过使用database.url
,提供了一个基于 TNS 的配置,以便与 Oracle Wallet 交互。示例配置如下所示: -
将 Oracle JDBC 驱动程序属性
oracle.net.wallet_location
设置为明确设置 Oracle Wallet 配置供 Oracle JDBC 驱动程序使用。
mTLS 配置示例
{ "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))", "driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))" }
务必在 database.url
中设置正确的主机、端口和服务标识符(sid)。此外,请确保 driver.oracle.net.wallet_location
中的目录可读。
2.5.5.8. 验证 Debezium Oracle 连接器是否正在运行
如果连接器正确启动且没有错误,它会为每个表创建一个主题,这些表配置为捕获连接器。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否填充了用于读取操作的事件("op":"r"),连接器在每个表的初始快照过程中生成的。
先决条件
- 在 OpenShift 中,Debezium 连接器部署到 Streams for Apache Kafka。
-
已安装 OpenShift
oc
CLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector
资源的状态:在 OpenShift Container Platform Web 控制台中:
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaConnector
。 - 在 KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-oracle。
- 在 Conditions 部分中,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
使用以下命令:
oc describe KafkaConnector <connector-name> -n <project>
例如,
oc describe KafkaConnector inventory-connector-oracle -n debezium
该命令返回与以下输出类似的状态信息:
例 2.36.
KafkaConnector
资源状态Name: inventory-connector-oracle Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-oracle Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-oracle.inventory inventory-connector-oracle.inventory.addresses inventory-connector-oracle.inventory.customers inventory-connector-oracle.inventory.geom inventory-connector-oracle.inventory.orders inventory-connector-oracle.inventory.products inventory-connector-oracle.inventory.products_on_hand Events: <none>
验证连接器是否已创建 Kafka 主题:
通过 OpenShift Container Platform Web 控制台。
-
导航到 Home
Search。 -
在 Search 页面上,单击 Resources 以打开 Select Resource 框,然后键入
KafkaTopic
。 -
从 KafkaTopics 列表中,单击要检查的主题的名称,例如
inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
。 - 在 Conditions 部分中,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
使用以下命令:
oc get kafkatopics
该命令返回与以下输出类似的状态信息:
例 2.37.
KafkaTopic
资源状态NAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-oracle--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-oracle.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
例如,
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-oracle.inventory.products_on_hand
指定主题名称的格式与步骤 1 中返回的
oc describe
命令相同,例如inventory-connector-oracle.inventory.addresses
。对于主题中的每个事件,命令会返回类似以下输出的信息:
例 2.38. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
在前面的示例中,
有效负载
值显示连接器快照从表inventory.products_on_hand
生成一个读取("op" ="r"
)事件。product_id
记录的"before"
状态为null
,表示记录没有之前的值。"after"
状态对于product_id
为101
的项目的quantity
显示为3
。
2.5.6. Debezium Oracle 连接器配置属性的描述
Debezium Oracle 连接器有许多配置属性,可用于为应用程序获得正确的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:
- 所需的 Debezium Oracle 连接器配置属性
- 数据库架构历史记录连接器配置属性,用于控制 Debezium 如何处理从数据库架构历史记录主题读取的事件。
所需的 Debezium Oracle 连接器配置属性
除非默认值可用 , 否则需要以下配置属性。
属性 | 默认 | 描述 |
没有默认值 | 连接器的唯一名称。尝试使用相同名称再次注册将失败。(所有 Kafka 连接连接器都需要此属性。) | |
没有默认值 |
连接器的 Java 类的名称。对于 | |
没有默认值 |
枚举连接器可以使用的 自定义转换器 实例的符号链接名称列表。
对于您为连接器配置的每个转换器,还必须添加一个
例如, boolean.type: io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数来将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称添加转换器符号名称前缀。 boolean.selector: .*MYTABLE.FLAG,.*.IS_ARCHIVED | |
| 为此连接器创建的最大任务数量。Oracle 连接器始终使用单个任务,因此不使用这个值,因此始终可以接受默认值。 | |
没有默认值 | Oracle 数据库服务器的 IP 地址或主机名。 | |
没有默认值 | Oracle 数据库服务器的整数端口号。 | |
没有默认值 | 连接器用来连接到 Oracle 数据库服务器的 Oracle 用户帐户的名称。 | |
没有默认值 | 连接到 Oracle 数据库服务器时要使用的密码。 | |
没有默认值 | 要连接的数据库的名称。在容器数据库环境中,指定根容器数据库(CDB)的名称,而不是包含的可插拔数据库(PDB)的名称。 | |
没有默认值 | 指定原始数据库 JDBC URL。使用此属性提供定义该数据库连接的灵活性。有效值包括原始 TNS 名称和 RAC 连接字符串。 | |
没有默认值 | 要连接的 Oracle 可插拔数据库的名称。仅将此属性与容器数据库(CDB)安装一起使用。 | |
没有默认值 |
为 Oracle 数据库服务器提供命名空间的主题前缀,连接器会捕获更改。您设置的值用作连接器发出的所有 Kafka 主题名称的前缀。指定在 Debezium 环境中所有连接器之间唯一的主题前缀。以下字符有效:字母数字字符、连字符、点和下划线。 警告 不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发送事件,连接器会将后续事件发送到名称基于新值的主题。连接器也无法恢复其数据库架构历史记录主题。 | |
| 连接器在流数据库更改时使用的适配器实现。您可以设置以下值:
| |
Initial | 指定连接器用来获取捕获表快照的模式。您可以设置以下值:
快照完成后,连接器将继续从数据库的红色日志中读取更改事件,除非
如需更多信息,请参阅 | |
shared | 控制连接器保存表锁定的时长。表锁定可防止在连接器执行快照时发生某些类型的更改表操作。您可以设置以下值:
| |
|
指定连接器在执行快照时如何查询数据。
与使用 | |
在连接器 |
可选的、以逗号分隔的正则表达式列表,与表的完全限定名称(<
在多租户容器数据库(CDB)环境中,正则表达式必须包含 可插拔数据库(PDB)名称,格式为 <
要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
快照只能包含在连接器的
只有在连接器的 | |
没有默认值 | 指定要包含在快照中的表行。如果您希望快照只包括表中的行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志读取的事件。
属性包含以逗号分隔的完全限定表名称列表,格式为 <
从包括软删除列 "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
在生成的快照中,连接器只包含 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。没有包含在 schema.
要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。 | |
| 布尔值指定连接器是否应该在元数据对象上解析和发布表和列注释。启用此选项将对内存用量产生影响。逻辑模式对象的数量和大小非常大会影响 Debezium 连接器消耗的内存量,并在每个对象中添加潜在的大型字符串数据可能非常昂贵。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。
要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与要捕获的表的完全限定表标识符匹配。如果您使用 LogMiner 实现,则只使用带有此属性的 POSIX 正则表达式。当设置此属性时,连接器只从指定的表中捕获更改。每个表标识符都使用以下格式:
要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。 | |
没有默认值 |
可选的正则表达式列表,该表达式与要从监控中排除的表的完全限定表标识符匹配。如果您使用 LogMiner 实现,则只使用带有此属性的 POSIX 正则表达式。连接器从 exclude 列表中没有指定的任何表捕获更改事件。使用以下格式指定每个表的标识符:
要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与 change 事件消息值中包含的列的完全限定域名匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。列的完全限定域名使用以下格式:
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要从更改事件消息值中排除的列的完全限定域名匹配。在使用 LogMiner 实现的环境中,您必须只使用 POSIX 正则表达式。完全限定列名称使用以下格式:
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。 | |
|
指定在包含的列中没有更改时跳过发布消息。如果列没有包括每个 | |
| 不适用 |
可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。列的完全限定域名格式为
一个 pseudonym,它包括了通过应用指定的 hashAlgorithm 和 salt 的结果的哈希值。根据使用的 hash 功能,引用完整性会被维护,而列值则替换为 pseudonyms。Java 加密架构标准算法 文档的 MessageDigest 部分中 描述了支持的哈希功能。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
如有必要,pseudonym 会自动缩短到列的长度。连接器配置可以包含多个属性,以指定不同的哈希算法和 salt。 |
bytes |
指定在更改事件中二进制( | |
none |
指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。可能的设置:
| |
none |
指定应如何调整字段名称,以便与连接器使用的消息转换器兼容。可能的设置:
如需了解更多详细信息,请参阅 Avro 命名。 | |
|
指定连接器应如何处理
| |
|
指定连接器如何处理 | |
| 指定连接器在处理事件时应如何响应异常。您可以设置以下选项之一:
| |
| 一个正整数值,用于指定要在此连接器的每个批处理事件的最大大小。 | |
|
正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会在将事件写入 Kafka 前将事件放在阻塞队列中。当连接器比将信息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可能会提供从数据库读取更改事件的情况。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 | |
|
指定阻塞队列的最大卷的长整数值,以字节为单位。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。 | |
| 正整数值指定连接器在每次迭代过程中应等待的毫秒数,以便出现新的更改事件。 | |
| 布尔值指定连接器是否将数据库 schema 中的更改发布到 Kafka 主题,其名称与主题前缀相同。连接器使用包含数据库名称的键记录每个架构更改,以及一个描述 schema 更新的 JSON 结构的值。这个记录模式更改的机制独立于连接器的内部记录数据库架构历史记录。 | |
| 控制 delete 事件后跟一个 tombstone 事件。可能会有以下值:
删除源记录后,tombstone 事件(默认行为)可让 Kafka 完全删除在启用了 日志压缩的主题中已删除行的密钥的所有事件。 | |
没有默认值 | 一个表达式列表,用于指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题。
默认情况下,Debebe 使用表的主键列作为发出的记录的消息键。使用默认键,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将
一个列的完全限定名称会观察以下格式:< 您可以在单个配置中指定多个长度不同的属性。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,用于对更改事件中的列名称进行掩码处理,将字符替换为星号 ( | |
没有默认值 | 可选的、以逗号分隔的正则表达式列表,与您希望连接器发送代表列元数据的额外参数匹配。当设置此属性时,连接器将以下字段添加到事件记录的 schema 中:
这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
一个列的完全限定名称会观察以下格式之一: < | |
没有默认值 | 可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于带有匹配数据类型的列,连接器会发出事件记录,该记录在其 schema 中包含以下额外字段:
这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
一个列的完全限定名称会观察以下格式之一: < 有关特定于 Oracle 数据类型名称的列表,请参阅 Oracle 数据类型映射。 | |
|
指定(以毫秒为单位)连接器将信息发送到 heartbeat 主题的频率。 | |
没有默认值 |
指定当连接器发送心跳消息时连接器在源数据库上执行的查询。 设置此属性并创建一个心跳表,以接收心跳信息,以解决 Debezium 无法同步与高流量数据库位于同一主机上的低流量数据库的偏移 的情况。在连接器将记录插入到配置的表中后,它能够接收来自 low-traffic 数据库的更改,并确认数据库中的 SCN 更改,以便偏移可以与代理同步。 | |
没有默认值 |
指定连接器在拍摄快照前等待的时间间隔(毫秒)。 | |
0 |
指定连接器在完成快照后延迟流过程启动的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后马上重启快照,但在流传输过程开始前。设置一个延迟值,它高于为 Kafka Connect worker 设置的 | |
| 指定在拍摄快照时应在每个表中读取的最大行数。连接器以指定大小的多个批处理读取表内容。 | |
|
指定给定查询的每个数据库往返获取的行数。使用 | |
|
如果您希望 Debezium 生成带有事务边界的事件,并使用事务元数据丰富数据事件,将属性设置为 如需了解更多详细信息,请参阅 Transaction Metadata。 | |
|
指定控制 Oracle LogMiner 构建并使用给定数据字典解析表和列 ID 的 mining 策略。
| |
|
指定控制如何构建 Oracle LogMiner 查询的 mining 查询模式。
| |
|
缓冲区类型控制连接器管理缓冲区事务数据的方式。 | |
| 事务在事务缓冲区中可以具有的最大事件数。超过这个阈值的事件计数的事务不会发出,并会被取消。默认行为是没有事务事件阈值。 | |
|
在使用新会话前,LogMiner 会话可以处于活跃状态的最大毫秒数。 | |
|
指定 JDBC 连接是否关闭并在日志交换机上重新打开,还是在 mining 会话达到最长生命周期阈值时。 | |
| 此连接器尝试从 redo/archive 日志读取的最小 SCN 间隔大小。 | |
| 从 redo/archive 日志读取时此连接器使用的最大 SCN 间隔大小。 | |
| 增加/减少连接器用于从 redo/archive 日志读取的时间间隔。 | |
| 连接器用于从 redo/archive 日志读取数据的起始 SCN 间隔大小。这也服务器作为调整批处理大小的一种措施 - 当当前 SCN 和批处理开始/端 SCN 之间的区别大于这个值时,批处理大小会增加/减少。 | |
| 从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取后,以及再次读取数据前的最短时间。值以毫秒为单位。 | |
| 在从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取数据后,以及再次开始读取数据前,连接器会处于睡眠状态的最长时间。值以毫秒为单位。 | |
| 从 redo/archive 日志读取数据后,连接器在从 redo/archive 日志读取后处于睡眠状态的开始时间,然后再重新开始读取数据。值以毫秒为单位。 | |
| 连接器用于在从 logminer 读取数据时调整最佳睡眠时间的最大时间量。值以毫秒为单位。 | |
|
过去从 SYSDATE 到 mine 归档日志的小时数。当使用默认设置 | |
|
控制连接器是否从归档日志或在线恢复日志和存档日志(默认)的组合中减去更改。 | |
|
连接器在轮询之间休眠的毫秒数,以确定启动系统更改号是否在存档日志中。如果没有启用 | |
|
正整数值,用于指定在红色日志交换机之间保持长时间运行的事务的毫秒数。当设置为 默认情况下,LogMiner 适配器维护所有正在运行的事务的内存缓冲区。因为所有作为事务一部分的 DML 操作都会被缓冲,直到检测到提交或回滚前,应该避免长时间运行的事务,以便不会溢出该缓冲区。任何超过此配置值的事务都会被完全丢弃,连接器不会为作为事务一部分的操作发出任何消息。 | |
没有默认值 |
使用 LogMiner 指定一个归档日志时要使用的 Oracle 存档目的地。 | |
没有默认值 | 要从 LogMiner 查询中包含的数据库用户列表。如果您希望捕获过程包含来自指定用户的更改,则设置此属性会很有用。 | |
没有默认值 | 要从 LogMiner 查询中排除的数据库用户列表。如果您希望捕获过程始终排除特定用户所做的更改,则设置此属性会很有用。 | |
|
指定一个值,连接器与当前和之前的 SCN 值之间的差别进行比较,以确定 SCN 差距是否存在。如果 SCN 值之间的区别大于指定的值,且时间差异小于 | |
|
指定一个值,以毫秒为单位,连接器与当前和之前的 SCN 时间戳之间的差别进行比较,以确定 SCN 差距是否存在。如果时间戳之间的区别小于指定的值,并且 SCN delta 大于 | |
|
指定协调将 Oracle LogWriter Buffer (LGWR)刷新表到红色日志的 flush 表的名称。可以使用 < | |
|
指定在 | |
|
控制在更改事件中是否发出大型对象(CLOB 或 BLOB)列值。 | |
| 指定连接器提供的常量,表示原始值没有变化,不是由数据库提供。 | |
没有默认值 | 以逗号分隔的 Oracle Real Application Clusters (RAC)节点主机名或地址列表。需要此字段才能启用与 Oracle RAC 部署的兼容性。 使用以下方法之一指定 RAC 节点列表:
如果您使用 | |
| 以逗号分隔的操作类型列表,您希望连接器在流传输过程中跳过。您可以将连接器配置为跳过以下类型的操作:
默认情况下,只跳过 truncate 操作。 | |
没有默认值 |
用于向连接器发送信号的数据收集的完全限定名称。https://docs.redhat.com/documentation/en/red_hat_build_of_debezium/3.0.8/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel当您将此属性与 Oracle 可插拔数据库(PDB)搭配使用时,请将其值设为根数据库的名称。 | |
source | 为连接器启用的信号通道名称列表。默认情况下,以下频道可用:
| |
没有默认值 | 为连接器启用的通知频道名称列表。默认情况下,以下频道可用:
| |
| 连接器在增量快照块期间获取并读取的最大行数。增加块大小可提供更高的效率,因为快照会减少对更大大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。 | |
|
指定连接器在增量快照中使用的水位线机制,以重复数据删除事件,这些事件可能会被增量快照捕获,然后在流恢复后重新捕获。
| |
|
应用于确定数据更改的主题名称、模式更改、事务、心跳事件等主题名称,默认为 | |
|
指定主题名称的分隔符,默认为 | |
| 在绑定的并发散列映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。 | |
|
控制连接器发送心跳消息的主题名称。主题名称具有此模式: | |
|
控制连接器发送事务元数据消息的主题名称。主题名称具有此模式: | |
|
指定连接器在执行初始快照时使用的线程数量。要启用并行初始快照,请将 属性设置为大于 1 的值。在并行初始快照中,连接器同时处理多个表。 注意
当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成它们的工作。如果一个表的快照需要比其他表的快照完成的时间要长得多,则线程已完成其工作闲置。在某些环境中,网络设备(如负载均衡器或防火墙)会终止闲置以延长的时间间隔的连接。快照完成后,连接器无法关闭连接,从而导致异常和不完整的快照,即使连接器成功传输所有快照数据。 | |
|
指定发生数据库错误时重试尝试快照表的数量。此配置属性目前只重试与 | |
|
通过添加提供上下文信息的元数据来定义自定义 MBean 对象名称的标签。指定以逗号分隔的键值对列表。每个键代表 MBean 对象名称的标签,对应的值代表键的值,例如 连接器将指定的标签附加到基础 MBean 对象名称。标签可帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。如需更多信息,请参阅自定义 MBean 名称。 | |
|
指定连接器如何在生成 Retriable 错误的操作后响应,如连接错误。
| |
| 等待查询执行的时间(以毫秒为单位)。默认为 600 秒(600,000 ms);零表示没有限制。 |
Debezium Oracle 连接器数据库模式历史记录配置属性
Debezium 提供了一组 schema.history.internal.*
属性,用于控制连接器如何与 schema 历史记录主题进行交互。
下表描述了用于配置 Debezium 连接器的 schema.history.internal
属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器存储数据库 schema 历史记录的 Kafka 主题的完整名称。 | |
没有默认值 | 连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索之前由连接器存储的数据库架构历史记录,以及写入从源数据库读取的每个 DDL 语句。每个对都应该指向 Kafka Connect 进程使用的相同 Kafka 集群。 | |
| 整数值,指定连接器在启动/恢复期间应等待的最大毫秒数,同时轮询持久数据。默认值为 100ms。 | |
| 指定连接器在使用 Kafka admin 客户端获取集群信息时应等待的最大毫秒数。 | |
| 指定连接器在使用 Kafka admin 客户端创建 kafka 历史记录主题时应等待的最大毫秒数。 | |
|
连接器在连接器恢复失败前尝试读取持久性历史记录数据的次数上限,并显示错误。没有数据后等待的最长时间是 | |
|
指定连接器是否应该忽略不正确的或未知数据库语句的布尔值,或停止处理,以便用户可以解决这个问题。安全默认为 | |
|
指定连接器是否记录模式或数据库中所有表的布尔值,或者仅从指定为捕获的表记录。
| |
|
指定连接器是否从数据库实例中的所有逻辑数据库记录模式结构的布尔值。
|
直通 Oracle 连接器配置属性
连接器支持 通过传递 属性,使 Debezium 指定自定义配置选项来微调 Apache Kafka producer 和消费者的行为。有关 Kafka 生成者和消费者的完整配置属性范围的详情,请参考 Kafka 文档。
直通属性,用于配置生成者和消费者客户端如何与 schema 历史记录主题交互
Debezium 依赖于 Apache Kafka producer 将 schema 更改写入数据库 schema 历史记录主题。同样,它依赖于 Kafka 使用者在连接器启动时从数据库 schema 历史记录主题中读取。您可以通过将值分配给一组以 schema.history.internal.producer和 schema.history.internal.consumerPromQL 前缀开头的 pass-through 配置属性来定义 Kafka producer
和 消费者
客户端的配置。pass-through producer 和 consumer 数据库模式历史记录属性控制一系列行为,如这些客户端如何与 Kafka 代理安全连接,如下例所示:
schema.history.internal.producer.security.protocol=SSL schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.producer.ssl.keystore.password=test1234 schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.producer.ssl.truststore.password=test1234 schema.history.internal.producer.ssl.key.password=test1234 schema.history.internal.consumer.security.protocol=SSL schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.consumer.ssl.keystore.password=test1234 schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.consumer.ssl.truststore.password=test1234 schema.history.internal.consumer.ssl.key.password=test1234
Debezium 在将属性传递给 Kafka 客户端前从属性名称中剥离前缀。
有关 Kafka producer 配置属性和 Kafka 使用者配置属性的更多信息,请参阅 Apache Kafka 文档。
用于通过属性配置 Oracle 连接器如何与 Kafka 信号主题交互
Debezium 提供了一组 signal.*
属性,用于控制连接器如何与 Kafka 信号主题进行交互。
下表描述了 Kafka 信号
属性。
属性 | 默认 | 描述 |
---|---|---|
<topic.prefix>-signal | 连接器监控用于临时信号的 Kafka 主题的名称。 注意 如果禁用 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号顺序。信号主题必须具有单个分区。 | |
kafka-signal | Kafka 用户使用的组 ID 的名称。 | |
没有默认值 | 连接器用来建立到 Kafka 集群的初始连接的主机和端口对列表。每个对引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 | |
| 整数值,用于指定连接器在轮询信号时等待的最大毫秒数。 |
为信号频道配置 Kafka 消费者客户端的属性
Debezium 连接器提供信号 Kafka 使用者的直通配置。透传信号属性以 signals.consumer.*
前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL
等属性传递给 Kafka 使用者。
Debezium 在将属性传递给 Kafka 信号消费者前从属性中剥离前缀。
用于配置 Oracle 连接器接收器通知频道的直通属性
下表描述了可用于配置 Debezium sink 通知
频道的属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 |
从 Debezium 接收通知的主题名称。当您将 |
Debezium 连接器传递数据库驱动程序配置属性
Debezium 连接器提供数据库驱动程序的直通配置。透传数据库属性以前缀 driverPROFILE 开头
。例如,连接器将 driver.foobar=false
等属性传递给 JDBC URL。
Debezium 在将属性传递给数据库驱动程序前从属性中剥离前缀。
2.5.7. 监控 Debezium Oracle 连接器性能
Debezium Oracle 连接器除了 Apache Zookeeper、Apache Kafka 和 Kafka Connect 的内置支持外,还提供三种指标类型。
有关如何通过 JMX 公开这些指标的详情,请参考 监控文档。
2.5.7.1. 用于 Oracle 连接器快照和流 MBean 对象的自定义名称
Debezium 连接器通过 MBean 名称为连接器公开指标。这些指标(特定于每个连接器实例)提供有关连接器快照、流和架构历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为每个不同的连接器指标生成一个唯一的 MBean 名称。要查看连接器进程的指标,您可以将可观察性堆栈配置为监控其 MBean。但是,这些默认 MBean 名称取决于连接器配置,但配置更改可能会导致对 MBean 名称的更改。对 MBean 名称的更改会破坏连接器实例和 MBean 之间的链接,并破坏监控活动。在这种情况下,如果要恢复监控,您必须重新配置 observability 堆栈以使用新的 MBean 名称。
要防止监控 MBean 名称更改结果的中断,您可以配置自定义指标标签。您可以通过在连接器配置中添加 custom.metric.tags
属性来配置自定义指标。属性接受键值对,其中每个键代表 MBean 对象名称的标签,对应的值代表该标签的值。例如: k1=v1,k2=v2
。Debezium 将指定的标签附加到连接器的 MBean 名称中。
为连接器配置 custom.metric.tags
属性后,您可以配置 observability 堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签,而不是可变 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了它如何构建 MBean 名称,或者连接器配置更改中的 topic.prefix
,则指标集合不会中断,因为指标提取任务使用指定的标签模式来识别连接器。
使用自定义标签的更多优点是,您可以使用反映数据管道架构的标签,以便以适合您操作需求的方式组织指标。例如,您可以使用声明连接器活动类型、应用程序上下文或数据源的值来指定标签,如 db1-streaming-for-application-abc
。如果您指定多个键值对,则所有指定的对都会附加到连接器的 MBean 名称中。
以下示例演示了标签如何修改默认的 MBean 名称。
例 2.39. 自定义标签如何修改连接器 MBean 名称
默认情况下,Oracle 连接器使用以下 MBean 名称进行流传输指标:
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>
如果将 custom.metric.tags
的值设置为 database=salesdb-streaming,table=inventory
,Debezium 会生成以下自定义 MBean 名称:
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.5.7.2. Debezium Oracle 连接器快照指标
MBean 为 debezium.oracle:type=connector-metrics,context=snapshot,server= <topic.prefix>
。
快照指标不会被公开,除非快照操作活跃,或者快照自上次连接器开始以来发生了某种情况。
下表列出了可用的快照指标。
属性 | 类型 | 描述 |
---|---|---|
| 连接器读取的最后一个快照事件。 | |
| 因为连接器已读取和处理最新事件,因此毫秒数。 | |
| 此连接器自上一次启动或重置以来看到的事件总数。 | |
| 已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。 | |
| 连接器捕获的表列表。 | |
| 在快照和主 Kafka Connect 循环之间传递事件的队列长度。 | |
| 用于在快照和主 Kafka Connect 循环之间传递事件的队列的可用容量。 | |
| 包括在快照中的表的总数。 | |
| 快照必须复制的表数。 | |
| 快照是否已启动。 | |
| 快照是否已暂停。 | |
| 快照是中止的。 | |
| 快照是否完成。 | |
| 快照到目前为止需要的秒数,即使未完成也是如此。也包括快照暂停的时间。 | |
| 快照暂停的秒数。如果快照暂停了多次,暂停的时间会增加。 | |
| 包含快照中每个表扫描的行数的映射。在处理过程中,表会以递增方式添加到映射中。更新每 10,000 行扫描并完成表后。 | |
|
队列的最大数量(以字节为单位)。如果将 | |
| 队列中记录的当前卷(以字节为单位)。 |
连接器还会在执行增量快照时提供以下附加快照指标:
属性 | 类型 | 描述 |
---|---|---|
| 当前快照块的标识符。 | |
| 定义当前块的主密钥集的低限。 | |
| 定义当前块的主密钥集的上限。 | |
| 当前快照表的主密钥集合的低限。 | |
| 当前快照表的主键集合的上限。 |
2.5.7.3. Debezium Oracle 连接器流指标
MBean 为 debezium.oracle:type=connector-metrics,context=streaming,server= <topic.prefix>
。
下表列出了可用的流指标。
属性 | 类型 | 描述 |
---|---|---|
| 连接器读取的最后一个流事件。 | |
| 因为连接器已读取和处理最新事件,因此毫秒数。 | |
| 源数据库自上次连接器启动后报告的数据更改事件总数,或者因为指标重置后。代表 Debezium 处理的数据更改工作负载。 | |
| 自上次启动或指标重置以来,连接器处理的创建事件总数。 | |
| 自上次启动或指标重置以来,连接器处理的更新事件总数。 | |
| 自上次启动或指标重置后,连接器处理的删除事件总数。 | |
| 已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。 | |
| 连接器捕获的表列表。 | |
| 在流器和主 Kafka Connect 循环之间传递事件的队列长度。 | |
| 用于在流程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。 | |
| 表示连接器当前是否已连接到数据库服务器的标记。 | |
| 最后一次更改事件的时间戳和连接器处理之间的毫秒数。这些值将纳入运行数据库服务器和连接器的机器上时钟之间的差别。 | |
| 已提交的已处理事务的数量。 | |
| 最后一次接收的事件的协调。 | |
| 最后一次处理的事务的事务标识符。 | |
|
队列的最大数量(以字节为单位)。如果将 | |
| 队列中记录的当前卷(以字节为单位)。 |
Debezium Oracle 连接器还提供以下额外的流指标:
属性 | 类型 | 描述 |
---|---|---|
| 已处理的最新系统更改号。 | |
| 事务缓冲区中最旧的系统更改号。 | |
|
最旧的系统更改号(以毫秒为单位)。如果缓冲区为空,则该值将是 | |
| 最后提交的系统更改事务缓冲区中的数字。 | |
| 系统更改号当前写入连接器的偏移量。 | |
| 当前减去的日志文件数组。 | |
| 为任何 LogMiner 会话指定的最小日志数。 | |
| 为任何 LogMiner 会话指定的最大日志数。 | |
|
每个 mined logfile 的当前状态数组,格式为 file | |
| 数据库执行最近一天的日志切换的次数。 | |
| 最后的 LogMiner 会话查询中观察到的 DML 操作数量。 | |
| 处理单个 LogMiner 会话查询时观察到的最大 DML 操作数。 | |
| 观察到的 DML 操作总数。 | |
| 执行的 LogMiner 会话查询(也称为批处理)的总数。 | |
| 最后一次 LogMiner 会话查询的持续时间(以毫秒为单位)。 | |
| 任何 LogMiner 会话查询的最长持续时间(以毫秒为单位)。 | |
| 处理最后一个 LogMiner 查询批处理的持续时间会导致毫秒。 | |
| 解析 DML 事件 SQL 语句的时间(毫秒)。 | |
| 启动最后一个 LogMiner 会话的持续时间(毫秒)。 | |
| 启动 LogMiner 会话的最长时间,以毫秒为单位。 | |
| 连接器启动 LogMiner 会话的总持续时间(毫秒)。 | |
| 单一 LogMiner 会话处理结果的最短持续时间(以毫秒为单位)。 | |
| 单一 LogMiner 会话处理结果的最大持续时间(以毫秒为单位)。 | |
| LogMiner 会话处理结果的总持续时间(毫秒)。 | |
| JDBC 驱动程序所花费的总持续时间(毫秒)从 log mining 视图获取下一行。 | |
| 从日志 mining 视图中处理的所有会话的行总数。 | |
| log mining query per database round-trip 获取的条目数。 | |
| 从日志 mining 视图获取另一批结果前,连接器睡眠的毫秒数。 | |
| 从 log mining 视图处理的最大行/秒数。 | |
| 从日志 mining 处理的平均行/秒数。 | |
| 从上一次批处理的 log mining 视图中处理的平均行/秒数。 | |
| 检测到的连接问题数量。 | |
|
在被丢弃前,事务被连接器的内存中缓冲区保留的小时数,而不提交或回滚。如需更多信息,请参阅 | |
| 事务缓冲区中当前活动事务的数量。 | |
| 事务缓冲区中提交事务的数量。 | |
|
丢弃的事务数量,因为其大小超过 | |
| 事务缓冲区中回滚事务的数量。 | |
| 在提交的事务中回滚的事件数量,这意味着大多数用例中的约束违反情况。 | |
| 事务缓冲区中每秒提交事务的平均数量。 | |
| 事务缓冲区中注册的 DML 操作数量。 | |
| 事务日志中变化和添加到事务缓冲区时的时间差(毫秒)。 | |
| 事务日志中变化和添加到事务缓冲区时的最大时间差(毫秒)。 | |
| 事务日志中变化和添加到事务缓冲区时的最小时间差异(毫秒)。 | |
|
由于其年龄,从事务缓冲区中删除的最新带外事务标识符的数组。详情请查看 | |
| 带外 事务 列表中当前条目数。 | |
| 最近一次事务标识符的数组,已在事务缓冲区中减和回滚。 | |
| 最后一次事务缓冲区提交操作的持续时间(以毫秒为单位)。 | |
| 最长事务缓冲区提交操作的持续时间(以毫秒为单位)。 | |
| 检测到的错误数量。 | |
| 检测到的警告数量。 | |
|
检查系统更改号以进行改进并保持不变的次数。高的值可能会表示持续运行较长的事务,并阻止连接器清除最近处理的系统更改号到连接器的偏移量。当条件为最佳时,该值应接近或等于 | |
|
检测到的 DDL 记录数量,但无法被 DDL 解析器解析。这应该始终为 | |
| 当前最小会话的用户全局区域(UGA)内存消耗(以字节为单位)。 | |
| 所有 mining 会话的用户全局区域(UGA)内存消耗上限(以字节为单位)。 | |
| 当前 mining 会话的进程全局区域(PGA)内存消耗(以字节为单位)。 | |
| 所有 mining 会话的进程全局区域(PGA)内存消耗上限(以字节为单位)。 |
2.5.7.4. Debezium Oracle 连接器模式历史记录指标
MBean 为 debezium.oracle:type=connector-metrics,context=schema-history,server= <topic.prefix>
。
下表列出了可用的模式历史记录指标。
属性 | 类型 | 描述 |
---|---|---|
|
| |
| 恢复开始的时间(以 epoch 秒为单位)。 | |
| 在恢复阶段读取的更改数量。 | |
| 恢复和运行时应用的模式更改总数。 | |
| 自上次更改从历史记录存储中恢复后经过的毫秒数。 | |
| 从上次更改被应用后经过的毫秒数。 | |
| 从历史记录存储中恢复最后一次更改的字符串表示。 | |
| 最后一次应用更改的字符串表示。 |
2.5.8. 在 Debezium 中使用 Oracle XStream 数据库(开发者预览)
Debezium Oracle 连接器默认使用原生 Oracle LogMiner 更改。连接器可以被切换为使用 Oracle XStream。要将连接器配置为使用 Oracle XStream,您必须应用与 LogMiner 搭配使用的特定数据库和连接器配置。
将 Debezium Oracle 连接器与 XStream 搭配使用是一个开发者预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。此软件可能没有任何文档,可以随时更改或删除,并且已获得有限的测试。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。
有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围。
先决条件
- 要使用 XStream API,您必须有黄金产品许可证。不需要安装 GoldenGate。
2.5.8.1. 准备用于 Debezium 的 Oracle XStream 数据库
Oracle XStream 所需的配置
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog CONNECT sys/top_secret AS SYSDBA alter system set db_recovery_file_dest_size = 5G; alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile; alter system set enable_goldengate_replication=true; shutdown immediate startup mount alter database archivelog; alter database open; -- Should show "Database log mode: Archive Mode" archive log list exit;
此外,必须为捕获的表或数据库启用补充日志记录,以便数据更改捕获更改数据库行之前的状态。下面演示如何在特定表中配置此功能,这是尽可能减少 Oracle 红色日志中捕获的信息量的理想选择。
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
2.5.8.2. 为 Debezium Oracle 连接器创建 XStream 用户
Debezium Oracle 连接器要求使用特定权限设置用户帐户,以便连接器可以捕获更改事件。以下示例提供有关如何在多租户数据库模型中创建用户配置的信息。
2.5.8.2.1. 为 Debezium Oracle 连接器创建 XStream 管理员
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE USER c##dbzadmin IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_adm_tbs QUOTA UNLIMITED ON xstream_adm_tbs CONTAINER=ALL; GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL; BEGIN DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'c##dbzadmin', privilege_type => 'CAPTURE', grant_select_privileges => TRUE, container => 'ALL' ); END; / exit;
2.5.8.2.2. 为 Debezium Oracle 连接器创建 XStream 用户
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs CONTAINER=ALL; GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; exit;
2.5.8.3. 创建 XStream 出站服务器
创建 XStream Outbound Server
sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB DECLARE tables DBMS_UTILITY.UNCL_ARRAY; schemas DBMS_UTILITY.UNCL_ARRAY; BEGIN tables(1) := NULL; schemas(1) := 'debezium'; DBMS_XSTREAM_ADM.CREATE_OUTBOUND( server_name => 'dbzxout', table_names => tables, schema_names => schemas); END; / exit;
当您将 XStream Outbound Server 设置为捕获来自可插拔数据库的更改时,请将可插拔数据库名称指定为 source_container_name
参数的值。
配置 XStream 用户帐户以连接到 XStream Outbound Server
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba BEGIN DBMS_XSTREAM_ADM.ALTER_OUTBOUND( server_name => 'dbzxout', connect_user => 'c##dbzuser'); END; / exit;
单个 XStream Outbound 服务器无法由多个 Debezium Oracle 连接器共享。每个连接器都需要配置唯一的 XStream Outbound 连接器。
2.5.8.4. 将 Debezium 配置为使用 XStream 适配器
默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 中获取最新的更改事件。您可以调整连接器配置,以启用连接器以使用 Oracle XStream 适配器。
以下配置示例添加了属性 database.connection.adapter
和 database.out.server.name
,以启用连接器以使用 XStream API 实现。
{ "name": "inventory-connector", "config": { "connector.class" : "io.debezium.connector.oracle.OracleConnector", "tasks.max" : "1", "topic.prefix" : "server1", "database.hostname" : "<oracle ip>", "database.port" : "1521", "database.user" : "c##dbzuser", "database.password" : "dbz", "database.dbname" : "ORCLCDB", "database.pdb.name" : "ORCLPDB1", "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory", "database.connection.adapter": "xstream", "database.out.server.name" : "dbzxout" } }
2.5.8.5. 获取 Oracle JDBC 驱动程序和 XStream API 文件
Debezium Oracle 连接器需要 Oracle JDBC 驱动程序(ojdbc11.jar
)来连接到 Oracle 数据库。如果连接器使用 XStream 来访问数据库,还必须有 XStream API (xstreams.jar
)。许可要求禁止 Debezium 在 Oracle 连接器存档中包含这些文件。但是,所需文件可作为 Oracle Instant Client 的一部分提供。以下步骤描述了如何下载 Oracle Instant Client 并提取所需的文件。
流程
- 在浏览器中为您的操作系统下载 Oracle Instant Client 软件包。
提取存档,然后打开
instantclient_<version>
目录。例如:
instantclient_21_1/ ├── adrci ├── BASIC_LITE_LICENSE ├── BASIC_LITE_README ├── genezi ├── libclntshcore.so -> libclntshcore.so.21.1 ├── libclntshcore.so.12.1 -> libclntshcore.so.21.1 ... ├── ojdbc11.jar ├── ucp.jar ├── uidrvci └── xstreams.jar
-
复制
ojdbc11.jar
和xstreams.jar
文件,并将它们添加到 <kafka_home> /libs
目录中,如kafka/libs
。 创建环境变量
LD_LIBRARY_PATH
,并将其值设为 Instant Client 目录的路径,例如:LD_LIBRARY_PATH=/path/to/instant_client/
2.5.8.6. xstream 连接器属性
使用 XStream 时 需要以下配置属性,除非默认值可用。
属性 | 默认 | 描述 |
没有默认值 | 在数据库中配置的 XStream 出站服务器的名称。 |
2.5.8.7. Oracle Xstream 和 DBMS_LOB
软件包
Oracle 提供名为 DBMS_LOB
的数据库软件包,它由一系列程序组成,用于操作 BLOB、CLOB 和 NCLOB 列。大多数程序都以总计性操作 LOB 列,但一个程序 WRITEAPPEND
能够操作 LOB 数据缓冲区的子集。
使用 XStream 时,WRITEAPPEND
会为每个程序调用发送逻辑更改记录(LCR)事件。这些 LCR 事件不会合并到一个更改事件中,因为它们在使用 Oracle LogMiner 适配器时。因此,主题的用户可能会接收带有部分列值的事件。
2.5.9. Oracle 连接器常见问题
- 是否支持 Oracle 11g?
- 不支持 Oracle 11g,但我们的目标是以尽力为基础与 Oracle 11g 向后兼容。我们依赖社区与 Oracle 11g 沟通兼容性问题,并在识别回归时提供 bug 修复。
- Oracle LogMiner 弃用了吗?
- 否,Oracle 只弃用了 Oracle LogMiner 中的 Oracle LogMiner 选项,并从 Oracle 19c 开始删除该选项。Debezium Oracle 连接器不依赖于这个选项来正常工作,因此可以安全地与较新的 Oracle 版本一起使用,而不影响。
- 如何更改偏移中的位置?
Debezium Oracle 连接器在偏移中维护两个关键值,一个名为
scn
的字段,另一个名为commit_scn
。scn
字段是一个字符串,代表捕获更改时连接器的低水位线开始位置。-
查找包含连接器偏移的主题名称。这根据设为
offset.storage.topic
配置属性的值进行配置。 查找连接器的最后偏移量,其下存储了密钥,并确定用于存储偏移的分区。这可以通过 Kafka 代理安装提供的
kafkacat
工具脚本完成。一个示例可能类似如下:kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n' Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}
inventory-connector
的键是["inventory-connector",{"server":"server1"}]
,分区是11
,最后一个偏移是键跟随的内容。要移回以前的偏移偏移,应停止连接器,必须发出以下命令:
echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \ kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11
这会写入 my_connect_offsets 主题的
my_connect_offsets
主题的分区11
,即给定的键和偏移值。在本例中,我们将连接器重新定向到 SCN3245675000
,而不是324567897
。
-
查找包含连接器偏移的主题名称。这根据设为
- 如果连接器无法使用给定偏移 SCN 查找日志,会发生什么情况?
Debezium 连接器在连接器偏移中维护低和高水位线 SCN 值。low-watermark SCN 代表起始位置,必须存在于可用的在线红色或存档日志中,以便连接器成功启动。当连接器报告无法找到这个偏移 SCN 时,这表示仍可用的日志不包含 SCN,因此连接器无法从其离开的地方清除更改。
发生这种情况时,有两个选项。第一个是删除连接器的历史记录主题和偏移量,并建议重启连接器。这将保证任何主题消费者都不会发生数据丢失。第二个是手动操作偏移,将 SCN 传播到红色或存档日志中可用的位置。这将导致在旧的 SCN 值和新提供的 SCN 值之间发生的更改丢失,且不会写入主题。不建议这样做。
- 各种 mining 策略之间的区别是什么?
Debezium Oracle 连接器为
log.mining.strategy
提供了三个选项。默认为
online_catalog
,这指示连接器不会将数据字典写入红色日志。相反,Oracle LogMiner 将始终使用包含表结构当前状态的在线数据字典。这也意味着,如果表的结构更改,并且不再匹配在线数据字典,如果表的结构发生更改,Oracle LogMiner 将无法解析表或列名称。如果捕获的表可能会有频繁的模式更改,则不应使用此 mining 策略选项。务必要确保所有数据更改都用 schema 更改进行锁定,以便所有更改都已从表的日志中捕获,停止连接器,应用 schema 更改,然后重新启动连接器并恢复表上的数据更改。这个选项需要较少的 Oracle 数据库内存和 Oracle LogMiner 会话,通常启动非常快,因为数据字典不需要被 LogMiner 进程加载或主要。第二个选项
redo_log_catalog
每次检测到日志交换机时,将 Oracle 数据字典写入红色日志。Oracle LogMiner 在解析 redo 和 archive 日志时,这个数据字典需要有效跟踪模式更改。这个选项将生成比归档日志的通常数多,但允许捕获的表实时操作,而不影响捕获数据更改。这个选项通常需要更多 Oracle 数据库内存,并且将导致 Oracle LogMiner 会话和进程在每次日志切换后启动稍有更长的时间。最终选项
混合
结合了上述两个策略的优点和其弱点。此策略利用了online_catalog
的性能,在对redo_log_catalog
的模式跟踪方面具有弹性,同时避免了比普通存档日志生成更高的开销和性能成本。这个模式使用回退模式,如果 LogMiner 无法重建数据库更改的 SQL,Debezium 连接器将依赖于连接器维护的内存中模式模型来重建 SQL in-flight。其目的是,此模式最终将转换到默认值,并可能在以后只有操作模式。- 使用 LogMiner 对 Hybrid mining 策略存在任何限制?
-
是的,
log.mining.strategy
的 Hybrid 模式仍然是 work-in-progress 策略,因此还不支持所有数据类型。目前,这个模式无法重建 SQL 语句,其中包括针对CLOB
、NCLOB
、BLOB
、XML
或JSON
数据类型的操作。因此,如果您启用了值为true
的lob.enabled
,则无法使用混合策略,连接器将无法启动,因为不支持此组合。 - 为什么连接器会出现停止捕获 AWS 上的更改?
由于 AWS 网关负载平衡器上 350 秒的固定空闲超时,需要超过 350 秒的 JDBC 调用可以无限期地挂起。
如果调用 Oracle LogMiner API 完成超过 350 秒时,可能会触发超时,从而导致 AWS 网关负载平衡器挂起。例如,当 LogMiner 会话处理大量数据与 Oracle 的定期检查点任务同时运行时,可能会发生这样的超时。
要防止在 AWS Gateway Load Balancer 上发生超时的情况,请通过以 root 或 super-user 用户身份执行以下步骤来启用来自 Kafka Connect 环境的 keep-alive 数据包:
在终端中运行以下命令:
sysctl -w net.ipv4.tcp_keepalive_time=60
编辑
/etc/sysctl.conf
并设置以下变量的值,如下所示:net.ipv4.tcp_keepalive_time=60
重新配置用于 Oracle 连接器的 Debezium 以使用
database.url
属性而不是database.hostname
,并添加(ENABLE=broken)
Oracle 连接字符串描述符,如下例所示:database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))
前面的步骤将 TCP 网络堆栈配置为每 60 秒发送 keep-alive 数据包。因此,当 JDBC 调用 LogMiner API 完成 350 秒时,AWS Gateway Load Balancer 不会超时,使连接器能够继续从数据库事务日志中读取更改。
- ORA-01555 的原因是什么?如何处理它?
Debezium Oracle 连接器在初始快照阶段执行时使用闪存查询。闪存查询是一种特殊的查询类型,它依赖于闪存区域,由数据库的
UNDO_RETENTION
数据库参数维护,以根据表的内容在给定时间或给定 SCN 中返回查询的结果。默认情况下,Oracle 通常只维护一个撤消或闪存区域大约 15 分钟,除非数据库管理员已增加或减少。对于捕获大型表的配置,可能需要超过 15 分钟,或者配置了UNDO_RETENTION
来执行初始快照,这最终会导致这个例外:ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too small
处理这个例外的第一个方法是与您的数据库管理员合作,并查看它们是否可以临时增加
UNDO_RETENTION
数据库参数。这不需要重新启动 Oracle 数据库,因此这可在不影响数据库可用性的情况下在线完成。但是,如果表空间不足以存储所需的撤消数据,则更改可能会导致上述异常或"快照过旧"异常。处理这个异常的第二个方法是根本不依赖于初始快照,将
snapshot.mode
设置为no_data
,然后依赖于增量快照。增量快照不依赖于闪存查询,因此不受 ORA-01555 例外的约束。- ORA-04036 的原因是什么?如何进行处理?
当数据库发生经常时,Debebe Oracle 连接器可能会报告 ORA-04036 异常。在检测到日志交换机前,启动并重复使用 Oracle LogMiner 会话。会话被重新使用,因为它提供与 Oracle LogMiner 的最佳性能利用,但如果发生长时间运行的 mining 会话,这可能会导致过量 PGA 内存用量,最终导致例外:
ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMIT
通过指定 Oracle 交换机红色日志的频率或 Debezium Oracle 连接器可以重新使用 mining 会话来避免这个例外。Debezium Oracle 连接器提供了一个配置选项
log.mining.session.max.ms
,它控制当前 Oracle LogMiner 会话在关闭和启动新会话的时长。这允许在不超过数据库允许的 PGA 内存的情况下保留数据库资源。- ORA-01882 的原因是什么,以及如何处理它?
Debezium Oracle 连接器在连接到 Oracle 数据库时可能会报告以下异常:
ORA-01882: timezone region not found
当时区信息无法被 JDBC 驱动程序正确解析时,会出现这种情况。为了解决这个问题,需要告知驱动程序无法使用区域解析时区详情。这可以通过使用 driver.
oracle.jdbc.timezoneAsRegion=false 指定驱动程序
pass through 属性来实现。- ORA-25191 和如何处理它的原因是什么?
Debezium Oracle 连接器会自动忽略 index-organized 表(IOT),因为 Oracle LogMiner 不支持它们。但是,如果抛出 ORA-25191 异常,这可能是由于映射的唯一情况,并且可能需要额外规则来自动排除这些。ORA-25191 异常示例可能类似如下:
ORA-25191: cannot reference overflow table of an index-organized table
如果抛出 ORA-25191 异常,请引发 JIRA 问题,以及表及其与其他父表相关的映射等。作为临时解决方案,可以调整 include/exclude 配置选项,以防止连接器访问这些表。
- 如何解决 SAX 功能外部识别 - 不支持
使用 Debezium Oracle 连接器的
XMLTYPE
作为技术预览提供。要使用这个功能,需要 Oraclexdb
和xmlparserv2
依赖项。
Oracle 的xmlparserv2
依赖项实现了基于 SAX 的解析器,如果运行时发现使用了这个实现的解析程序,则在 classpath 上会发生这个错误。为了影响一般使用 SAX 的实施,需要以特定参数启动 JVM。
提供以下 JVM 参数后,Oracle 连接器将成功启动,而不会出现这个错误。-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl