9.2. Debezium SQL Server 连接器如何工作
为了优化配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称并使用元数据非常有用。
有关连接器如何工作的详情,请查看以下部分:
- 第 9.2.1 节 “Debezium SQL Server 连接器如何执行数据库快照”
- 第 9.2.2 节 “临时快照”
- 第 9.2.3 节 “增量快照”
- 第 9.2.4 节 “Debezium SQL Server 连接器如何读取更改数据表”
- 第 9.2.7 节 “接收 Debezium SQL Server 更改事件的 Kafka 主题的默认名称”
- 第 9.2.9 节 “Debezium SQL Server 连接器如何使用 schema 更改主题”
- 第 9.2.10 节 “Debezium SQL Server 连接器数据更改事件的描述”
- 第 9.2.11 节 “Debezium SQL Server 连接器生成的事件代表事务边界”
9.2.1. Debezium SQL Server 连接器如何执行数据库快照
SQL Server CDC 的设计不是存储数据库更改的完整历史记录。对于 Debezium SQL Server 连接器,为数据库的当前状态建立基准,它使用名为 snapshotting 的进程。初始快照捕获数据库中表的结构和数据。
您可以在以下部分找到有关快照的更多信息:
Debezium SQL Server 连接器用来执行初始快照的默认工作流
以下工作流列出了 Debezium 创建快照所采取的步骤。这些步骤描述了当 snapshot.mode
配置属性设置为其默认值时(即 的初始
)时快照的流程。您可以通过更改 snapshot.mode
属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用这个工作流的修改版本完成快照。
- 建立与数据库的连接。
-
确定要捕获的表。默认情况下,连接器捕获所有非系统表。要让连接器捕获表或表元素的子集,您可以设置多个
include
和exclude
属性来过滤数据,如table.include.list
或table.exclude.list
。 -
在启用了 CDC 的 SQL Server 表上获得锁定,以防止在创建快照过程中发生结构更改。锁定的级别由
snapshot.isolation.mode
配置属性决定。 - 在服务器的事务日志中读取最大日志序列号(LSN)位置。
捕获所有非系统的结构,或者为捕获指定的所有表。连接器在其内部数据库模式历史记录主题中保留此信息。架构历史记录提供有关发生更改事件时生效的结构的信息。
注意默认情况下,连接器捕获数据库中每个表的模式,这些模式处于捕获模式,包括没有配置为捕获的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。有关为什么没有包括在初始快照中的表的快照保留模式信息,请参阅 了解为什么初始快照捕获所有表的 schema。
- 如有必要,释放在第 3 步中获得的锁定。其他数据库客户端现在可以写入任何之前锁定的表。
在 LSN 分步读取时,连接器会扫描要捕获的表。在扫描过程中,连接器完成以下任务:
- 确认表已在快照开始前创建。如果表是在快照启动后创建的,连接器会跳过表。快照完成后,连接器过渡到 streaming,它会发出快照开始后创建的任何表的更改事件。
-
为从表获取的每行生成
读取
事件。所有读取
事件都包含相同的 LSN 位置,这是在第 4 步中获取的 LSN 位置。 -
将每个
读取
事件发送到表的 Kafka 主题。
- 在连接器偏移中记录快照成功完成。
生成的初始快照捕获了为 CDC 启用的表中每行的当前状态。在这个基准状态中,连接器会捕获后续更改。
在快照进程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,则进程会在连接器重启后重启。
连接器完成初始快照后,它会继续从在第 4 步中读取的位置进行流,使其不会错过任何更新。
如果连接器因为任何原因而再次停止,它会在重启后从之前关闭的位置恢复流更改。
9.2.1.1. 初始快照捕获所有表的 schema 历史记录的描述
连接器运行的初始快照捕获两种类型的信息:
- 表数据
-
在连接器的
table.include.list
属性中命名的表中的INSERT
、UPDATE
和DELETE
操作的信息。 - 模式数据
- 描述应用到表的结构更改的 DDL 语句。模式数据会保留给内部模式历史记录主题,以及连接器的 schema 更改主题(如果配置了)。
运行初始快照后,您可能会注意到快照捕获没有指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅仅是从指定为捕获的表的表。连接器要求表的模式存在于架构历史记录主题中,然后才能捕获表。通过启用初始快照来捕获不是原始捕获集一部分的表的 schema 数据,Debebe 准备好连接器,以便稍后需要捕获这些表中的事件数据。如果初始快照没有捕获表的 schema,您必须将模式添加到历史记录主题,然后才能从表中捕获数据。
在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过可访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。
附加信息
- 从不是由初始快照捕获的表捕获数据(没有模式更改)
- 从不是由初始快照捕获的表捕获数据(应用程序更改)
-
设置
schema.history.internal.store.only.captured.tables.ddl
属性,以指定从中捕获模式信息的表。 -
设置
schema.history.internal.store.only.captured.databases.ddl
属性,以指定从中捕获模式更改的逻辑数据库。
9.2.1.2. 从不是由初始快照捕获的表捕获数据(没有模式更改)
在某些情况下,您可能希望连接器从其模式未被初始快照捕获的表中捕获数据。根据连接器配置,初始快照只能捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器将无法捕获表,并报告缺少的 schema 错误。
您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。
前提条件
- 您希望从带有连接器在初始快照期间没有捕获的 schema 捕获数据。
- 没有模式更改应用于连接器读取的 LSN 和最新更改表条目之间的表。有关从具有结构性更改的新表中捕获数据的详情,请参考 第 3.2.1.3 节 “从不是由初始快照捕获的表捕获数据(应用程序更改)”。
流程
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。
对连接器配置应用以下更改:
(可选)将
schema.history.internal.captured.tables.ddl
的值设置为false
。此设置会导致快照捕获所有表的 schema,并保证以后可以重建所有表的 schema 历史记录。
注意捕获所有表的架构的快照需要更多时间来完成。
-
添加您希望连接器捕获至
table.include.list
的表。 将
snapshot.mode
设置为以下值之一:初始
-
重启连接器时,它会获取捕获表数据和表结构的数据库的完整快照。
如果您选择这个选项,请考虑将schema.history.internal.captured.tables.ddl
属性的值设置为false
,以便连接器捕获所有表的 schema。 schema_only
- 重启连接器时,它会获取仅捕获表模式的快照。与完整数据快照不同,这个选项不会捕获任何表数据。如果您要比使用完整快照更快地重启连接器,请使用这个选项。
-
重启连接器。连接器完成
snapshot.mode
指定的快照类型。 (可选)如果连接器执行了
schema_only
快照,在快照完成后,启动一个增量快照来 从您添加的表中捕获数据。连接器在继续从表中实时更改时运行快照。运行增量快照可捕获以下数据更改:- 对于之前捕获的连接器的表,增量 snapsot 捕获连接器停机时所发生的变化,即在连接器停止和当前重启之间的时间间隔。
- 对于新添加的表,增量快照会捕获所有现有表行。
9.2.1.3. 从不是由初始快照捕获的表捕获数据(应用程序更改)
如果架构更改应用到表,则在架构更改前提交的记录与更改后提交的不同结构不同。当 Debezium 从表中捕获数据时,它会读取 schema 历史记录,以确保它为每个事件应用正确的模式。如果 schema 历史记录主题中没有 schema,则连接器无法捕获表,并出现错误结果。
如果要从初始快照捕获的表中捕获数据,并且修改了表的 schema,则必须将模式添加到历史记录主题中(如果它还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加模式。
前提条件
- 您希望从带有连接器在初始快照期间没有捕获的 schema 捕获数据。
- 架构更改应用于表,以便捕获的记录没有统一结构。
流程
- 初始快照捕获了所有表的模式(
storage.only.captured.tables.ddl
设置为false
) -
编辑
table.include.list
属性,以指定您要捕获的表。 - 重启连接器。
- 如果要从新添加的表中捕获现有数据,则启动 增量快照。
-
编辑
- 初始快照没有捕获所有表的模式(storage
.only.captured.tables.ddl
设置为true
) 如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:
- 流程 1:架构快照,后跟增量快照
在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。
为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为schema_only
。 -
编辑
table.include.list
以添加您要捕获的表。
-
将
- 重启连接器。
- 等待 Debezium 捕获新表和现有表的模式。在连接器停止后发生任何表的数据更改不会被捕获。
- 为确保没有丢失数据,请启动 增量快照。
- 步骤 2:初始快照,后跟可选的增量快照
在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有多个大型表的数据库中,运行初始快照可能会非常耗时。快照完成后,您可以选择触发增量快照来捕获连接器离线时发生的任何更改。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。
-
编辑
table.include.list
以添加您要捕获的表。 为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为initial
。 -
(可选)将
schema.history.internal.store.only.captured.tables.ddl
设置为false
。
-
将
- 重启连接器。连接器获取完整的数据库快照。快照完成后,连接器会过渡到 streaming。
- (可选)要捕获连接器离线时更改的任何数据,请启动 增量快照。
9.2.2. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。
然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时快照来为之前捕获的表重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来发起临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的表。快照可以捕获整个数据库的内容,或者仅捕获数据库中表的子集。另外,快照也可以捕获数据库中表的内容子集。
您可以通过将 execute-snapshot
消息发送到信号表来指定要捕获的表。将 execute-snapshot
信号类型设置为 增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| N/A |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| N/A | 可选字符串,根据表的列指定条件,用于捕获表的内容的子集。 |
| N/A | 可选字符串,指定连接器在快照过程中用作表的主键的列名称。 |
触发临时快照
您可以通过向信号表中添加 execute-snapshot
信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和结束点。根据表中的条目数量以及配置的块大小,Debezium 会将表划分为块,并一次性执行每个块的快照。
9.2.3. 增量快照
为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的点开始,而不是从开始计算表。
-
您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据 配置的块大小 将表分成块。然后,按块的工作块会捕获块中的每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表块的快照开始时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,可能会修改表记录。为了反映此类更改,INSERT
、UPDATE
或 DELETE
操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥在记录间解决冲突
在某些情况下,streaming 进程发出的 UPDATE
或 DELETE
事件会停止序列。也就是说,流流过程可能会发出一个修改表行的事件,该事件捕获包含该行的 READ
事件的块。当快照最终为行发出对应的 READ
事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。
快照窗口
为了帮助解决修改同一表行的后期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口。快照窗口分解了增量快照捕获指定表块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ
操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ
事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ
事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器为每个快照块重复这个过程。
SQL Server 的 Debezium 连接器不支持增量快照运行时的模式更改。
9.2.3.1. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号表发送临时快照 信号。
作为 SQL INSERT
查询,您将向信号提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental
。
要指定快照中包含的表,请提供列出表或用于匹配表的正则表达式数组的 数据集合
,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。
如果要包含在快照中的表的名称在数据库、模式或表的名称中包含句点(.
),以将表添加到 data-collections
数组中,您必须使用双引号转义名称的每个部分。
例如,要包含一个存在于 公共
模式的表,其名称为 My.Table
,请使用以下格式 :"public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道来触发增量快照
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表 的字段。下表描述了示例中的参数:
表 9.2. SQL 命令中字段的描述,用于将增量快照信号发送到信号表 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。相反,Debebe 会在快照期间生成自己的id
字符串作为水位线信号。3
execute-snapshot
type
参数指定信号旨在触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号表的名称相同。5
incremental
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效的选项是默认值incremental
。
如果没有指定值,连接器将运行增量快照。6
additional-condition
可选字符串,根据表的列指定条件,用于捕获表的内容的子集。有关
additional-condition
参数的更多信息,请参阅带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照只包含表中的内容子集,您可以通过向快照信号附加 additional-condition
参数来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的 products
表:
-
ID
(主键) -
color
-
quantity
如果您需要 product
表的增量快照,其中只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product
表,您可以提交查询来触发增量快照,该快照仅包含 color=blue
和 quantity>10
的项数据:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件消息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
9.2.3.2. 使用 Kafka 信号频道来触发增量快照
您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 execute-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。 |
| N/A | 可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。 |
execute-snapshot Kafka 消息示例:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
带有额外条件的临时增量快照
Debezium 使用 additional-condition
字段来选择表内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含 additional-condition
时,extra-condition
会附加到 SQL 查询中,例如:
SELECT * FROM <tableName> WHERE <additional-condition> ….
例如,如果一个 product
table with the column id
(主键)、color
和 brand
,如果您希望快照只包含 color='blue'
的内容,当您请求快照时,您可以附加一个 additional-condition
语句来过滤内容:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
您可以使用 additional-condition
语句根据多个列传递条件。例如,如果您希望快照只包含 color='blue'
的
表中,以及 products
brand='MyBrand'
,则您可以发送以下请求:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
9.2.3.3. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在正在进行时停止增量快照操作。
您提交的查询指定 增量
的快照操作,以及要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道停止增量快照
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表 的字段。下表描述了示例中的参数:
表 9.4. SQL 命令中字段的描述,用于将停止增量快照信号发送到信号表 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号表的名称相同。如果省略了data
字段的这一组件,信号将停止正在进行的整个增量快照。5
incremental
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效的选项是增量的
。
如果没有指定类型
值,信号将无法停止增量快照。
9.2.3.4. 使用 Kafka 信号频道停止增量快照
您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 stop-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。 |
以下示例显示了典型的 stop-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
9.2.4. Debezium SQL Server 连接器如何读取更改数据表
当连接器首次启动时,它会获取捕获表的结构快照,并将此信息保留在其内部数据库 schema 历史记录主题中。然后,连接器会识别每个源表的更改表,并完成以下步骤。
- 对于每个更改表,连接器会读取在上次存储的最大 LSN 和当前的最大 LSN 之间创建的所有更改。
- 连接器会根据提交 LSN 的值对读取的更改进行排序,并更改 LSN。这种排序顺序可确保,Debezium 会按照数据库中的相同顺序重新执行更改。
- 连接器传递提交并将 LSNs 作为偏移改为 Kafka Connect。
- 连接器存储最大 LSN,并从第 1 步中重启该过程。
重启后,连接器会从它读取的最后一个偏移(提交并更改 LSN)中恢复处理。
连接器可以检测为包含的源表启用或禁用 CDC,并调整其行为。
9.2.5. 没有数据库中记录的最大 LSN
有些情况下,在数据库中没有记录最大 LSN,因为:
- SQL Server Agent 没有运行
- 更改表中尚未记录任何更改
- 数据库具有低活动,cdc 清理作业会定期清除 cdc 表中的条目
由于运行 SQL Server Agent 是这些可能性,因此没有问题,因此没有问题(没有 2. 和 3. 正常)。
为了缓解这个问题,并区分 No 1. 和其他不同,SQL 服务器代理的状态是通过以下查询 "SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE NSQL'SQL Server Agent;%'
.如果 SQL Server Agent 没有运行,日志中会编写 ERROR: "No maximum LSN recorded in the database; SQL Server Agent is not running"。
运行状态查询的 SQL Server Agent 需要 VIEW SERVER STATE
服务器权限。如果您不想为配置的用户授予此权限,您可以选择通过 database.sqlserver.agent.status.query
属性配置自己的查询。您可以定义一个函数,它返回 true 或 1,如果 SQL Server Agent 在运行(其他情况返回 false 或 0)并安全地使用高级别权限而无需对它们进行授权,如 What minimum permissions do I need to provide to a user so that it can check the status of SQL Server Agent Service? 或 Safely and Easily Use High-Level Permissions Without Granting Them to Anyone: Server-level 所述。query 属性的配置应类似:database.sqlserver.agent.status.query=SELECT [#db].func_is_sql_server_agent_running()
- 您需要使用 [#db]
a作为数据库名称的占位符。
9.2.6. Debezium SQL Server 连接器的限制
SQL Server 专门要求基础对象成为表,以便创建更改捕获实例。因此,SQL Server 不支持从索引视图(也称为材料化视图)捕获更改,因此 Debezium SQL Server 连接器。
9.2.7. 接收 Debezium SQL Server 更改事件的 Kafka 主题的默认名称
默认情况下,SQL Server 连接器会将表中的所有 INSERT
、UPDATE
和 DELETE
操作的事件写入特定于该表的单一 Apache Kafka 主题。连接器使用以下惯例来命名更改事件主题: < topicPrefix> . <schemaName& gt; . <tableName>
以下列表为默认名称的组件提供定义:
- topicPrefix
-
由
topic.prefix
配置属性指定的服务器的逻辑名称。 - schemaName
- 发生更改事件的数据库模式的名称。
- tableName
- 发生更改事件的数据库表的名称。
例如,如果 fulfillment
是逻辑服务器名称,dbo
是 schema 名称,数据库包括名为 products
, products_on_hand
, customers
, 和 orders
的表,连接器将更改事件发送到以下 Kafka 主题:
-
fulfillment.testDB.dbo.products
-
fulfillment.testDB.dbo.products_on_hand
-
fulfillment.testDB.dbo.customers
-
fulfillment.testDB.dbo.orders
连接器应用类似的命名约定,以标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 来自定义主题命名的更多信息,请参阅 主题路由。
9.2.8. Debezium SQL Server 连接器如何处理数据库架构更改
当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,数据库模式可以随时更改,这意味着连接器必须能够识别每个插入、更新或删除操作被记录的时间。另外,连接器不一定将当前的模式应用到每个事件。如果事件相对旧,则应用当前模式之前可能会记录该事件。
为确保在 schema 更改后正确处理更改事件,Debezium SQL Server 连接器会根据 SQL Server 更改表中的结构存储新模式的快照,该表反映了其相关数据表的结构。连接器在数据库 schema 历史记录 Kafka 主题中存储表 schema 信息,以及结果更改 LSN。连接器使用存储的 schema 表示来生成更改事件,这些事件在每次插入、更新或删除操作时正确镜像表结构。
当连接器在崩溃或安全停止后重启时,它会在它读取的最后一个位置中恢复 SQL Server CDC 表中的条目。根据连接器从数据库架构历史记录主题读取的 schema 信息,连接器应用存在于连接器重启的位置上的表结构。
如果您更新处于捕获模式的 Db2 表的 schema,您也务必要更新对应更改表的模式。您必须是一个具有升级权限的 SQL Server 数据库管理员,才能更新数据库架构。有关在 Debezium 环境中更新 SQL Server 数据库模式的更多信息,请参阅 数据库模式演进。
数据库架构历史记录主题仅用于内部连接器。另外,连接器也可以将 模式更改事件发送到用于消费者应用程序的不同主题。
其他资源
- 接收 Debezium 事件记录 的主题的默认名称。
9.2.9. Debezium SQL Server 连接器如何使用 schema 更改主题
对于启用了 CDC 的每个表,Debezium SQL Server 连接器会存储应用于数据库中表的模式更改事件的历史记录。连接器将模式更改事件写入名为 < topicPrefix>
; 的 Kafka 主题,其中 topicPrefix
是 topic.prefix
配置属性中指定的逻辑服务器名称。
连接器发送到 schema 更改主题的消息包含一个有效负载,以及可选的包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:
databaseName
-
将语句应用到的数据库的名称。
databaseName
的值充当 message 键。 tableChanges
-
架构更改后整个表模式的结构化表示。
tableChanges
字段包含一个数组,其中包含表的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可轻松读取消息,而不必先通过 DDL 解析器处理它们。
当连接器配置为捕获表时,它只会在 schema 更改主题中存储表的历史记录,也存储在内部数据库 schema 历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不适用于消耗应用程序直接使用。确保需要通知架构更改的应用程序只消耗来自 schema 更改主题的信息。
连接器发出到其 schema 更改主题的消息格式处于 incubating 状态,且可以在不通知的情况下改变。
当发生以下事件时,Debezium 会向 schema 更改主题发出一条消息:
- 您可以为表启用 CDC。
- 您可以为表禁用 CDC。
- 您可以按照 架构演变流程 更改启用了 CDC 的表结构。
示例:消息发送到 SQL Server 连接器模式更改主题
以下示例显示了 schema 更改主题中的消息。该消息包含表模式的逻辑表示。
{ "schema": { ... }, "payload": { "source": { "version": "2.3.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 0, "snapshot": "true", "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, 1 "databaseName": "testDB", 2 "schemaName": "dbo", "ddl": null, 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"testDB\".\"dbo\".\"customers\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "id" ], "columns": [ 9 { "name": "id", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "last_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "email", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
2 |
| 标识包含更改的数据库和架构。 |
3 |
|
对于 SQL Server 连接器,始终为 |
4 |
| 包含 DDL 命令生成的模式更改的一个或多个项目的数组。 |
5 |
| 描述更改的类型。该值如下之一:
|
6 |
| 创建、更改或丢弃的表的完整标识符。 |
7 |
| 代表应用更改后的表元数据。 |
8 |
| 组成表主密钥的列的列表。 |
9 |
| 更改表中每个列的元数据。 |
10 |
| 每个表更改的自定义属性元数据。 |
在连接器发送到 schema 更改主题的消息中,键是包含 schema 更改的数据库的名称。在以下示例中,payload
字段包含键:
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.sqlserver.SchemaChangeKey" }, "payload": { "databaseName": "testDB" } }
9.2.10. Debezium SQL Server 连接器数据更改事件的描述
Debezium SQL Server 连接器为每个行级 INSERT
、UPDATE
和 DELETE
操作生成数据更改事件。每个事件包含一个键和值。键的结构和值取决于已更改的表。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您正在使用 schema registry,用户可以使用该模式 ID 从 registry 获取 schema。这使得每个事件都自包含。
以下框架 JSON 显示更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect converter,决定更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema
字段才会处于更改事件中。同样,只有在您配置转换器来生成它时,事件密钥和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,更改事件具有此结构:
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录改为与事件原始表相同的主题。如需更多信息,请参阅 主题名称。
SQL Server 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,它将使用下划线字符替换。
如果逻辑服务器名称、数据库名称或表名称包含无效字符,且唯一与另一个名称区分名称的字符无效,这可能会导致意外冲突冲突,从而被下划线替换。
有关更改事件的详情,请查看以下主题:
9.2.10.1. 关于 Debezium SQL Server 中的键更改事件
更改事件的密钥包含更改表的密钥和更改行的实际键的 schema。当连接器创建事件时,schema 及其对应有效负载都会包含更改表的主键(或唯一键约束)中每个列的字段。
考虑以下 客户
表,后跟此表的更改事件键的示例。
表示例
CREATE TABLE customers ( id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE );
更改事件键示例
每次捕获 customer 表的更改事件都有相同的事件关键模式。只要
customers
表有以前的定义,可以捕获 customer
表更改的事件都有以下关键结构(JSON),它类似于:
{ "schema": { 1 "type": "struct", "fields": [ 2 { "type": "int32", "optional": false, "field": "id" } ], "optional": false, 3 "name": "server1.testDB.dbo.customers.Key" 4 }, "payload": { 5 "id": 1004 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 |
2 |
|
指定 |
3 |
|
指明 event 键是否必须在其 |
4 |
|
定义密钥有效负载结构的模式名称。这个 schema 描述了已更改的表的主键的结构。键模式名称的格式是 connector-name.database-schema-name.table-name.
|
5 |
|
包含生成此更改事件的行的密钥。在本例中,键 包含一个 |
9.2.10.2. 关于 Debezium SQL Server 中的值更改事件
更改事件中的值比键复杂一些。与键一样,该值有一个 schema
部分和 payload
部分。schema
部分包含描述 payload
部分的 Envelope
结构的 schema,包括其嵌套字段。为创建、更新或删除数据的操作更改事件,它们都有一个带有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例表:
CREATE TABLE customers ( id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE );
每个事件类型都描述了更改此表的更改事件的值部分。
创建 事件
以下示例显示了一个更改事件的值部分,连接器为在 customer
表中创建数据的操作生成的更改事件的值部分:
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "server1.dbo.testDB.customers.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "server1.dbo.testDB.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, { "type": "int64", "optional": true, "field": "event_serial_no" } ], "optional": false, "name": "io.debezium.connector.sqlserver.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.dbo.testDB.customers.Envelope" 4 }, "payload": { 5 "before": null, 6 "after": { 7 "id": 1005, "first_name": "john", "last_name": "doe", "email": "john.doe@example.org" }, "source": { 8 "version": "2.3.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559729468470, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", "event_serial_no": "1" }, "op": "c", 9 "ts_ms": 1559729471739 10 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 值的 schema,用于描述值有效负载的结构。当连接器为特定表生成的每次更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值的实际数据。这是更改事件提供的信息。 |
6 |
|
指定事件发生前行状态的可选字段。当 |
7 |
|
指定事件发生后行状态的可选字段。在本例中, |
8 |
| 描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序以及事件是否为同一事务的一部分。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的强制字符串。在本例中,
|
10 |
|
可选字段,显示连接器处理事件的时间。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新 事件
示例 customers
表中一个更新的改变事件的值有与那个表的 create 事件相同的模式。同样,事件值有效负载具有相同的结构。但是,事件值有效负载在 update 事件中包含不同的值。以下是连接器为 customer 表中更新生成的更改事件值 的示例
:
{ "schema": { ... }, "payload": { "before": { 1 "id": 1005, "first_name": "john", "last_name": "doe", "email": "john.doe@example.org" }, "after": { 2 "id": 1005, "first_name": "john", "last_name": "doe", "email": "noreply@example.org" }, "source": { 3 "version": "2.3.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559729995937, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007", "event_serial_no": "2" }, "op": "u", 4 "ts_ms": 1559729998706 5 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定事件发生前行状态的可选字段。在 update 事件值中, |
2 |
|
指定事件发生后行状态的可选字段。您可以比较 |
3 |
|
描述事件源元数据的必需字段。
|
4 |
|
描述操作类型的强制字符串。在 update 事件值中, |
5 |
|
可选字段,显示连接器处理事件的时间。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新行 primary/unique 键的列会更改行的键值。当键更改时,Debezium 会输出 三个 事件:一个 delete 事件和带有行的旧键的 tombstone 事件,后跟一个带有行的新键的 create 事件。
删除 事件
delete 更改事件中的值与为同一表的 create 和 update 事件相同的 schema
部分。示例 customer
表的 delete 事件中 payload
部分类似如下:
{ "schema": { ... }, }, "payload": { "before": { <> "id": 1005, "first_name": "john", "last_name": "doe", "email": "noreply@example.org" }, "after": null, 1 "source": { 2 "version": "2.3.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559730445243, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007", "event_serial_no": "1" }, "op": "d", 3 "ts_ms": 1559730450205 4 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定事件发生前行状态的可选字段。在一个 delete 事件值中, |
2 |
|
指定事件发生后行状态的可选字段。在 delete 事件值中, |
3 |
|
描述事件源元数据的必需字段。在一个 delete 事件值中,
|
4 |
|
描述操作类型的强制字符串。 |
5 |
|
可选字段,显示连接器处理事件的时间。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
SQL Server 连接器事件旨在使用 Kafka 日志压缩。只要保留每个密钥的最新消息,日志压缩就会启用删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
tombstone 事件
删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须为 null
。为了实现此目的,在 Debezium 的 SQL Server 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但有一个 null
值。
9.2.11. Debezium SQL Server 连接器生成的事件代表事务边界
Debezium 可以生成代表事务边界的事件,以及丰富的数据更改事件消息。
Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。
数据库事务由语句块表示,该块包含在 BEGIN
和 END
关键字之间。Debezium 为每个事务中的 BEGIN
和 END
分隔符生成事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
. id
- 唯一事务标识符的字符串。
ts_ms
-
数据源的事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向事件时间提供 Debezium,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务提供的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的数组,用于指示连接器发出来自数据收集的更改的事件数量。
Debezium 无法可靠地识别事务何时结束。因此,事务的 END
标记仅在另一个事务的第一个事件到达后发出。在低流量系统中,这可能会导致延迟发布 END
标记。
以下示例显示了典型的事务边界消息:
示例:SQL Server 连接器事务边界事件
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.testDB.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.testDB.tableb", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
9.2.11.1. 更改数据事件增强
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串
total_order
- 事件在事务生成的所有事件中绝对位置
data_collection_order
- 在事务发送的所有事件间事件的每个数据收集位置
以下示例显示了典型的信息是什么:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }
9.2.12. Debezium SQL Server 连接器如何映射数据类型
Debezium SQL Server 连接器通过生成与行存在表类似的事件来代表表行数据的更改。每个事件都包含用于代表行列值的字段。事件代表操作的列值的方式取决于列的 SQL 数据类型。在事件中,连接器将每个 SQL Server 数据类型的字段映射到 字面类型和 语义类型。
连接器可以将 SQL Server 数据类型映射到 literal 和 semantic 类型。
- 字面类型
-
描述如何使用 Kafka Connect 模式类型(即
INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,BOOLEAN
,STRING
,BYTES
,ARRAY
,MAP
, 和STRUCT
)来代表值。 - 语义类型
- 描述 Kafka Connect 模式如何使用字段的名称捕获字段 的含义。
如果默认数据类型转换不满足您的需要,您可以为连接器 创建自定义转换器。
有关数据类型映射的更多信息,请参阅以下部分:
基本类型
下表显示了连接器如何映射基本 SQL Server 数据类型。
SQL Server 数据类型 | 字面类型(schema 类型) | 语义类型(模式名称)和备注 |
---|---|---|
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
|
|
|
|
|
以下部分描述了其他数据类型映射。
如果存在,则列的默认值会被传播到对应字段的 Kafka Connect 模式。更改消息将包含字段的默认值(除非给出了显式列值),因此很少需要从 schema 获取默认值。
时序值
除 SQL Server 的 DATETIMEOFFSET
数据类型(包含时区信息)外,其他 temporal 类型取决于 time.precision.mode
配置属性值。当将 time.precision.mode
配置属性设置为 adaptive
(默认值)时,连接器将根据列的数据类型确定 temporal 类型的字面类型和语义类型,以便事件 准确 代表数据库中的值:
SQL Server 数据类型 | 字面类型(schema 类型) | 语义类型(模式名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当将 time.precision.mode
配置属性设为 connect
时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置的 Kafka Connect 逻辑类型,且无法处理变量-precision 时间值时,这很有用。另一方面,因为 SQL 服务器支持十分之一的微秒精度,带有 connect
时间精度模式的连接器将在有一个大于 3 的 fractional second precision 数据库列时丢失一些精度:
SQL Server 数据类型 | 字面类型(schema 类型) | 语义类型(模式名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
时间戳值
DATETIME
,SMALLDATETIME
和 DATETIME2
类型代表一个没有时区信息的时间戳。此类列根据 UTC 转换为对等的 Kafka Connect 值。对于实例,DATETIME2
值 "2018-06-20 15:13:16.945104" 由值 "1529507596945104" 为 io.debezium.time.MicroTimestamp
代表。
请注意,运行 Kafka Connect 和 Debezium 的 JVM 时区不会影响这个转换。
十进制值
Debezium 连接器根据 decimal.handling.mode
连接器配置属性的 设置处理十进制。
- decimal.handling.mode=precise
表 9.13. 当 decimal.handling.mode=precise时映射 SQL Server 类型 字面类型(schema 类型) 语义类型(schema name) NUMERIC[(P[,S])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。DECIMAL[(P[,S])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。SMALLMONEY
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。金钱
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。- decimal.handling.mode=double
表 9.14. 当 decimal.handling.mode=double时映射 SQL Server 类型 字面类型 语义类型 NUMERIC[(M[,D])]
FLOAT64
不适用
DECIMAL[(M[,D])]
FLOAT64
不适用
SMALLMONEY[(M[,D])]
FLOAT64
不适用
MONEY[(M[,D])]
FLOAT64
不适用
- decimal.handling.mode=string
表 9.15. 当 decimal.handling.mode=string时映射 SQL Server 类型 字面类型 语义类型 NUMERIC[(M[,D])]
字符串
不适用
DECIMAL[(M[,D])]
字符串
不适用
SMALLMONEY[(M[,D])]
字符串
不适用
MONEY[(M[,D])]
字符串
不适用