3.2. Debezium Db2 连接器如何工作
为了最佳配置和运行 Debezium Db2 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称以及处理架构更改会很有帮助。
详情包括在以下主题中:
3.2.1. Debezium Db2 连接器如何执行数据库快照
Db2 的复制功能不是存储数据库更改的完整历史记录。因此,当 Debezium Db2 连接器第一次连接到数据库时,它会生成处于捕获模式的表快照,并将这个状态流传输到 Kafka。这会为表内容建立基准。
默认情况下,当 Db2 连接器执行快照时,它会执行以下操作:
-
确定哪些表处于捕获模式,因此必须包含在快照中。默认情况下,所有非系统表都处于捕获模式。连接器配置属性,如
table.exclude.list
和table.include.list
,允许您指定哪些表应处于捕获模式。 -
以捕获模式获取每个表的锁定。这样可确保在快照期间不会在这些表中发生架构更改。锁定的级别由
snapshot.isolation.mode
连接器配置属性决定。 - 在服务器的事务日志中读取最高(最新)LSN 位置。
- 捕获处于捕获模式的所有表的模式。连接器在其内部数据库模式历史记录主题中保留此信息。
- 可选,释放在第 2 步中获取的锁定。通常,这些锁定只会在短时间内保留。
在第 3 步中读取的 LSN 位置中,连接器会扫描捕获模式表及其模式。在扫描过程中,连接器:
- 确认在快照开始前创建了表。如果没有,则快照将跳过该表。快照完成后,连接器开始发出更改事件,连接器会为快照期间创建的任何表生成更改事件。
- 为处于捕获模式的每个行生成 读取 事件。所有 读取 事件都包含相同的 LSN 位置,这是在第 3 步中获取的 LSN 位置。
- 将每个 读取 事件发送到与表同名的 Kafka 主题。
- 在连接器偏移中记录成功完成快照。
3.2.1.1. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。
然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。
您可以通过向信号表发送 execute-snapshot
消息来指定要捕获的表。将 execute-snapshot
信号的类型设置为增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| 不适用 |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| 不适用 | 可选字符串,它根据表的列指定条件,用于捕获表内容的子集。 |
触发临时快照
您可以通过在信号表中添加带有 execute-snapshot
信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。
3.2.1.2. 增量快照
为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥解决记录间的冲突
在某些情况下,流传输进程发出的 UPDATE
或 DELETE
事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ
事件之前修改表行。当快照最终会为行发出对应的 READ
事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。
快照窗口
为了帮助解决后续 READ
事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ
操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ
事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器重复每个快照块的进程。
Db2 的 Debezium 连接器不支持增量快照运行时的 schema 更改。
3.2.1.3. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。
您可以以 SQL INSERT
查询形式向信号表提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量
。
要指定快照中包含的表,请提供一个 data-collections
数组,该数组列出了用于匹配表的表或一组正则表达式,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不执行快照。
如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.
),要将表添加到 data-collections
数组中,您必须用双引号转义名称的每个部分。
例如,若要包含 公共
模式中存在的表,并且名为 My.Table
,请使用以下格式:" public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 3.2. 向信号表发送增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的id
字符串作为水位信号。3
execute-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。5
增量
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效选项是默认值增量
。
如果没有指定值,连接器将运行一个增量快照。6
additional-condition
可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关
additional-condition
参数的详情请参考带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition
参数附加到快照信号来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的产品表:
-
id
(主密钥) -
color
-
quantity
如果您希望 product 表的增量快照只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =blue
和 quantity>10
:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件信息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
3.2.1.4. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询,向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。
您提交的查询指定了 增量
的快照操作,以及可选的、要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 3.3. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。如果省略了data
字段的此组件,则信号将停止正在进行的整个增量快照。5
增量
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效选项是增量的
。
如果没有指定类型
值,则信号无法停止增量快照。
3.2.2. Debezium Db2 连接器如何读取更改数据表
完成快照后,当 Debezium Db2 连接器首次启动后,连接器会标识处于捕获模式的每个源表的 change-data 表。连接器为每个 change-data 表执行以下操作:
- 读取上次存储、最高 LSN 和当前最高 LSN 之间创建的更改事件。
- 根据提交 LSN 和每个事件的更改 LSN 排序更改事件。这样可确保连接器按表更改的顺序发出更改事件。
- 将提交并更改 LSNs 作为偏移到 Kafka Connect。
- 存储连接器传递给 Kafka Connect 的最高 LSN。
重启后,连接器会从关闭的偏移(提交并更改 LSN)中恢复发出更改事件。当连接器正在运行并发出更改事件时,如果您从捕获模式中删除表,或者添加表来捕获模式,连接器会检测到更改并相应地修改其行为。
3.2.3. 接收 Debezium Db2 更改事件记录的 Kafka 主题默认名称
默认情况下,Db2 连接器将表中发生的所有 INSERT
、UPDATE
和 DELETE
操作的更改事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:
topicPrefix.schemaName.tableName
以下列表提供默认名称组件的定义:
- topicPrefix
-
由
topic.prefix
连接器配置属性指定的主题前缀。 - schemaName
- 在其中操作的 schema 的名称。
- tableName
- 操作发生的表的名称。
例如,一个使用 mydatabase
数据库的 Db2 安装,其中包含四个表:PRODUCTS
, PRODUCTS_ON_HAND
, CUSTOMERS
, 和 ORDERS
,它们包括在 MYSCHEMA
schema 中。连接器会将事件发送到这四个 Kafka 主题:
-
mydatabase.MYSCHEMA.PRODUCTS
-
mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
-
mydatabase.MYSCHEMA.CUSTOMERS
-
mydatabase.MYSCHEMA.ORDERS
连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 以及 事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
3.2.4. 关于 Debezium Db2 连接器模式更改主题
您可以配置 Debezium Db2 连接器来生成 schema 更改事件,该事件描述了应用于数据库中捕获的表的 schema 更改。
Debezium 在以下情况下向 schema 更改主题发送信息:
- 新表进入捕获模式。
- 表已从捕获模式中删除。
- 在 数据库 schema 更新过程中,表在捕获模式下有变化。
连接器将模式更改事件改为 Kafka 模式更改主题,其名称为 < topicPrefix&
gt;,其中 < ;topicPrefix
> 是 topic.prefix
connector 配置属性中指定的主题前缀。连接器发送到 schema 更改主题的消息包含以下元素的有效负载:
databaseName
-
将语句应用到的数据库的名称。
databaseName
的值充当 message 键。 pos
- 出现语句的 binlog 中的位置。
tableChanges
-
模式更改后整个表架构的结构化表示。
tableChanges
字段包含一个数组,其中包含表中的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析程序处理它们。
对于处于捕获模式的表,连接器不仅将架构更改的历史记录存储在架构更改主题中,也存储在内部数据库架构历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要有关架构的通知的应用程序只消耗了该模式的更改主题的信息。
永不对数据库架构历史记录进行分区。要使数据库架构历史记录主题正常工作,它必须维护连接器向其发出的事件记录的一致性全局顺序。
要确保主题不在分区中分割,请使用以下方法之一为主题设置分区计数:
-
如果您手动创建数据库模式历史记录主题,请指定分区计数
1
。 -
如果您使用 Apache Kafka 代理自动创建数据库架构历史记录主题,则创建主题,将 Kafka
num.partitions
配置选项 的值设置为1
。
连接器发送到其架构更改主题的消息格式处于 incubating 状态,在没有通知的情况下可能会改变。
示例:发送到 Db2 连接器模式更改主题的消息
以下示例显示了 schema 更改主题中的消息。消息包含表模式的逻辑表示。
{ "schema": { ... }, "payload": { "source": { "version": "2.1.4.Final", "connector": "db2", "name": "db2", "ts_ms": 0, "snapshot": "true", "db": "testdb", "schema": "DB2INST1", "table": "CUSTOMERS", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, 1 "databaseName": "TESTDB", 2 "schemaName": "DB2INST1", "ddl": null, 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"DB2INST1\".\"CUSTOMERS\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "ID" ], "columns": [ 9 { "name": "ID", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
2 |
| 标识包含更改的数据库和 schema。 |
3 |
|
对于 Db2 连接器,始终为 |
4 |
| 包含 DDL 命令生成的模式更改的一个或多个项目的数组。 |
5 |
| 描述更改的类型。该值是以下之一:
|
6 |
| 创建、更改或丢弃的表的完整标识符。 |
7 |
| 应用更改后代表表元数据。 |
8 |
| 编写表主密钥的列列表。 |
9 |
| 更改表中的每个列的元数据。 |
10 |
| 每个表更改的自定义属性元数据。 |
在连接器发送到 schema 更改主题的消息中,message 键是包含架构更改的数据库的名称。在以下示例中,payload
字段包含键:
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.db2.SchemaChangeKey" }, "payload": { "databaseName": "TESTDB" } }
3.2.5. Debezium Db2 连接器生成的事件代表事务边界
Debezium 可以生成代表事务边界的事件,并增强更改数据事件消息。
Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。
Debezium 为每个事务生成 BEGIN
和 END
分隔符的事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
。 id
- 唯一事务标识符的字符串表示。
ts_ms
-
数据源上事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务所设计的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的一组对,指示连接器为来自数据收集的更改发出的事件数。
示例
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.tableb", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>
.transaction
主题。
数据更改事件增强
当启用事务元数据时,连接器会使用新的 transaction
字段增强更改事件 Envelope
。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示。
total_order
- 事件在事务生成的所有事件的绝对路径。
data_collection_order
- 事件在事务发送的所有事件中每个数据收集位置。
以下是消息的示例:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }