8.2. Debezium SQL Server 连接器如何工作
为了最佳配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称以及使用元数据会很有帮助。
有关连接器如何工作的详情,请查看以下部分:
8.2.1. Debezium SQL Server 连接器如何执行数据库快照
SQL Server CDC 不设计存储数据库更改的完整历史记录。对于 Debezium SQL Server 连接器,为数据库当前状态建立基准,它使用名为 snapshotting 的进程。
您可以配置连接器如何创建快照。默认情况下,连接器的快照模式被设置为 initial
。根据这个 初始
快照模式,连接器首次启动时,它会执行数据库的初始 一致性快照。此初始快照捕获了与为连接器配置的 include
和 exclude
属性定义的任何表的结构和数据(例如 table.include.list
、column.include.list
、table.exclude.list
等)。
当连接器创建快照时,它会完成以下任务:
- 决定要捕获的表。
-
获取启用 CDC 的 SQL Server 表上的锁定,以防止在创建快照时发生结构更改。锁定的级别由
snapshot.isolation.mode
配置选项决定。 - 在服务器的事务日志中读取最大日志序列号(LSN)位置。
- 捕获所有相关表的结构。
- 释放在第 2 步中获取的锁定(如果需要)。在大多数情况下,锁定只会保留短时间内。
-
根据在第 3 步中读取的 LSN 位置扫描 SQL Server 源表和模式,为表中的每一行生成一个
READ
事件,并将事件写入表的 Kafka 主题。 - 在连接器偏移中记录成功完成快照。
生成的初始快照捕获为 CDC 启用的表中的每行的当前状态。在这个基准状态中,连接器会捕获后续更改。
8.2.1.1. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。
然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。
您可以通过向信号表发送 execute-snapshot
消息来指定要捕获的表。将 execute-snapshot
信号的类型设置为增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| 不适用 |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| 不适用 | 可选字符串,它根据表的列指定条件,用于捕获表内容的子集。 |
触发临时快照
您可以通过在信号表中添加带有 execute-snapshot
信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。
8.2.1.2. 增量快照
为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥解决记录间的冲突
在某些情况下,流传输进程发出的 UPDATE
或 DELETE
事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ
事件之前修改表行。当快照最终会为行发出对应的 READ
事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。
快照窗口
为了帮助解决后续 READ
事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ
操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ
事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器重复每个快照块的进程。
SQL Server 的 Debezium 连接器不支持增量快照运行时的 schema 更改。
8.2.1.3. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。
您可以以 SQL INSERT
查询形式向信号表提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量
。
要指定快照中包含的表,请提供一个 data-collections
数组,该数组列出了用于匹配表的表或一组正则表达式,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不执行快照。
如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.
),要将表添加到 data-collections
数组中,您必须用双引号转义名称的每个部分。
例如,若要包含 公共
模式中存在的表,并且名为 My.Table
,请使用以下格式:" public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 8.2. 向信号表发送增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的id
字符串作为水位信号。3
execute-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。5
增量
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效选项是默认值增量
。
如果没有指定值,连接器将运行一个增量快照。6
additional-condition
可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关
additional-condition
参数的详情请参考带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition
参数附加到快照信号来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的产品表:
-
id
(主密钥) -
color
-
quantity
如果您希望 product 表的增量快照只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =blue
和 quantity>10
:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件信息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
8.2.1.4. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询,向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。
您提交的查询指定了 增量
的快照操作,以及可选的、要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表的字段。下表描述了示例中的参数:
表 8.3. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定分配给信号请求的id
标识符的任意字符串。
使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定signal.data.collection
配置属性中的连接器信号表的名称相同。如果省略了data
字段的此组件,则信号将停止正在进行的整个增量快照。5
增量
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效选项是增量的
。
如果没有指定类型
值,则信号无法停止增量快照。
8.2.2. Debezium SQL Server 连接器如何读取更改数据表
当连接器首次启动时,它会对捕获表的结构进行结构快照,并将这些信息保留给其内部数据库架构历史记录主题。然后连接器会识别每个源表的更改表,并完成以下步骤。
- 对于每个更改表,连接器会读取最后一次存储的最大 LSN 和当前最大 LSN 之间创建的所有更改。
- 连接器根据提交 LSN 的值排序它以升序读取的更改,并更改 LSN。这种排序顺序可确保 Debezium 按照数据库中发生的相同顺序重新执行更改。
- 连接器通过提交,并将 LSN 作为偏移改为 Kafka Connect。
- 连接器存储最大 LSN,并从第 1 步中重启进程。
重启后,连接器会从它读取的最后一个偏移(提交并更改 LSN)中恢复处理。
连接器可以检测是否为包含的源表启用或禁用 CDC,并调整其行为。
8.2.3. 数据库中没有记录的最大 LSN
当数据库中没有记录最大 LSN 时,可能会出现以下情况,因为:
- SQL Server Agent 没有运行
- 更改表中未记录任何更改
- 数据库的活动较低,cdc 清理作业会定期清除 cdc 表中的条目
从这些可能之外,由于正在运行的 SQL Server Agent 是先决条件,因此没有 1。是一个实际问题(虽然无 2 和 3)。
为了缓解这个问题并区分 No 1. 和不同,对 SQL Server Agent 的检查是通过以下查询 "SELECT CASE WHEN dss.[status]=4\":\"N 1 ELSE 0 END AS 进行的,则以下查询 "SELECT CASE WRE dss.[servicename] LIKE N'SQL Server Agent (%';"
.如果 SQL Server Agent 没有运行,则会在日志中写入 ERROR:"No maximum LSN in the database; SQL Server Agent is not running"。
运行状态查询的 SQL Server Agent 需要 7000 SERVER STATE
服务器权限。如果您不想向配置的用户授予这个权限,您可以选择通过 database.sqlserver.agent.status.query
属性配置您自己的查询。您可以定义一个功能,如果 SQL Server Agent 正在运行(false 或 0),并且安全地使用高级别权限,而无需根据此处所述,我需要为用户提供什么最小权限,以便它可以检查 SQL Server Agent 服务的状态? 或者这里使用高级别权限,而无需授予任何人:服务器级别。query 属性的配置应类似: database.sqlserver.agent.status.query=SELECT [ funcdb].func_is_sql_server_agent_running ()
- 您需要使用 [114db]
作为数据库名称的占位符。
8.2.4. Debezium SQL Server 连接器的限制
SQL Server 专门要求基本对象作为表,才能创建更改捕获实例。因此,SQL Server 不支持从索引视图(也称为 规范化视图)中捕获更改,因此 Debezium SQL Server 连接器。
8.2.5. 接收 Debezium SQL Server 更改事件记录的 Kafka 主题默认名称
默认情况下,SQL Server 连接器将所有 INSERT
、UPDATE
和 DELETE
操作的事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:< topicPrefix> . <schemaName& gt; . <tableName>
以下列表提供默认名称组件的定义:
- topicPrefix
-
服务器的逻辑名称,如
topic.prefix
配置属性指定。 - schemaName
- 发生更改事件的数据库模式的名称。
- tableName
- 发生更改事件的数据库表的名称。
例如,如果 fulfillment
是逻辑服务器名称,而 dbo
是 schema 名称,数据库包含名称 product,
,products
_on_handcustomers
, 和 orders
的表,连接器会将事件记录流传输为以下 Kafka 主题:
-
fulfillment.testDB.dbo.products
-
fulfillment.testDB.dbo.products_on_hand
-
fulfillment.testDB.dbo.customers
-
fulfillment.testDB.dbo.orders
连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 以及 事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
8.2.6. Debezium SQL Server 连接器如何使用 schema 更改主题
对于启用了 CDC 的每个表,Debezium SQL Server 连接器会存储应用于数据库中捕获的表的模式更改事件的历史记录。连接器将模式更改事件写入名为 < topicPrefix>
; 的 Kafka 主题,其中 topicPrefix
是 topic.prefix
配置属性中指定的逻辑服务器名称。
连接器发送到 schema 更改主题的消息包含一个有效负载,也可以包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:
databaseName
-
将语句应用到的数据库的名称。
databaseName
的值充当 message 键。 tableChanges
-
模式更改后整个表架构的结构化表示。
tableChanges
字段包含一个数组,其中包含表中的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析程序处理它们。
当连接器配置为捕获表时,它会保存表架构的历史记录,不仅在架构更改主题中,也存储在内部数据库模式历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要有关架构的通知的应用程序只消耗了该模式的更改主题的信息。
连接器发送到其架构更改主题的消息格式处于 incubating 状态,可以在没有通知的情况下更改。
当发生以下事件时,Debebe 会将信息发送到 schema 更改主题:
- 您可以为表启用 CDC。
- 您可以为表禁用 CDC。
- 您可以按照 架构演进过程来更改启用 CDC 的表的结构。
示例:发送到 SQL Server 连接器模式更改主题的消息
以下示例显示了 schema 更改主题中的消息。消息包含表模式的逻辑表示。
{ "schema": { ... }, "payload": { "source": { "version": "2.1.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 0, "snapshot": "true", "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, 1 "databaseName": "testDB", 2 "schemaName": "dbo", "ddl": null, 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"testDB\".\"dbo\".\"customers\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "id" ], "columns": [ 9 { "name": "id", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "last_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "email", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
2 |
| 标识包含更改的数据库和 schema。 |
3 |
|
对于 SQL Server 连接器,始终为 |
4 |
| 包含 DDL 命令生成的模式更改的一个或多个项目的数组。 |
5 |
| 描述更改的类型。该值是以下之一:
|
6 |
| 创建、更改或丢弃的表的完整标识符。 |
7 |
| 应用更改后代表表元数据。 |
8 |
| 编写表主密钥的列列表。 |
9 |
| 更改表中的每个列的元数据。 |
10 |
| 每个表更改的自定义属性元数据。 |
在连接器发送到 schema 更改主题的消息中,键是包含架构更改的数据库名称。在以下示例中,payload
字段包含键:
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.sqlserver.SchemaChangeKey" }, "payload": { "databaseName": "testDB" } }
8.2.7. Debezium SQL Server 连接器数据更改事件的描述
Debezium SQL Server 连接器会为每行级别 INSERT
、UPDATE
和 DELETE
操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。
以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema
字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为名称与事件原始表相同的名称的主题。如需更多信息,请参阅 主题名称。
SQL Server 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及数据库和表名称中的每个字符都必须是字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。
如果逻辑服务器名称、数据库名称或表名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。
有关更改事件的详情,请查看以下主题:
8.2.7.1. 关于 Debezium SQL Server 中的键更改事件
更改事件的密钥包含更改表的密钥的 schema,以及更改的行的实际键。在连接器创建事件时,模式及其对应有效负载都包含更改表的主键(或唯一键约束)中的每个列的字段。
请考虑以下 客户
表,然后是此表的更改事件键示例。
表示例
CREATE TABLE customers ( id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE );
更改事件键示例
捕获对 customers
表的更改的每个更改事件都有相同的事件键模式。只要
表有以前的定义,捕获 customer 表更改的事件都有以下键结构,在 JSON 中类似如下:
customers
{ "schema": { 1 "type": "struct", "fields": [ 2 { "type": "int32", "optional": false, "field": "id" } ], "optional": false, 3 "name": "server1.testDB.dbo.customers.Key" 4 }, "payload": { 5 "id": 1004 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 |
2 |
|
指定 |
3 |
|
指明 event 键必须在其 |
4 |
|
定义密钥有效负载结构的 schema 名称。这个模式描述了更改的表的主键的结构。键架构名称的格式为 connector-name.database-schema-name.table-name.
|
5 |
|
包含生成此更改事件的行的键。在本例中,键包含一个 |
8.2.7.2. 关于 Debezium SQL Server 更改事件中的值
更改事件中的值比键稍微复杂。与键一样,该值具有 schema
部分和 payload
部分。schema
部分包含描述 payload
部分的 Envelope
结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例:
CREATE TABLE customers ( id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE );
为每个事件类型描述了更改事件的值部分。
创建 事件
以下示例显示了连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "server1.dbo.testDB.customers.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "server1.dbo.testDB.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, { "type": "int64", "optional": true, "field": "event_serial_no" } ], "optional": false, "name": "io.debezium.connector.sqlserver.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.dbo.testDB.customers.Envelope" 4 }, "payload": { 5 "before": null, 6 "after": { 7 "id": 1005, "first_name": "john", "last_name": "doe", "email": "john.doe@example.org" }, "source": { 8 "version": "2.1.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559729468470, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", "event_serial_no": "1" }, "op": "c", 9 "ts_ms": 1559729471739 10 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 值的 schema,用于描述值有效负载的结构。每次连接器为特定表生成的更改时,更改事件的值模式都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值的实际数据。这是更改事件提供的信息。 |
6 |
|
指定事件发生前行状态的可选字段。当 |
7 |
|
指定事件发生后行状态的可选字段。在本例中, |
8 |
| 描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序,以及事件是否为同一事务的一部分。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的必要字符串。在本例中,
|
10 |
|
显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新 事件
示例 customer 表中更新的更改事件值与该表的 create 事件相同。同样,事件的 payload 具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是连接器在
customers
表中为更新生成的更改事件值示例:
{ "schema": { ... }, "payload": { "before": { 1 "id": 1005, "first_name": "john", "last_name": "doe", "email": "john.doe@example.org" }, "after": { 2 "id": 1005, "first_name": "john", "last_name": "doe", "email": "noreply@example.org" }, "source": { 3 "version": "2.1.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559729995937, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007", "event_serial_no": "2" }, "op": "u", 4 "ts_ms": 1559729998706 5 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定事件发生前行状态的可选字段。在 update 事件值中, |
2 |
|
指定事件发生后行状态的可选字段。您可以比较 |
3 |
|
描述事件源元数据的必需字段。
|
4 |
|
描述操作类型的强制字符串。在 update 事件值中, |
5 |
|
显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新行的主/唯一键的列会更改行键的值。当键更改时,Debebe 会输出 三个 事件:一个 delete 事件和带有行的旧键的 tombstone 事件,后跟带有行新键的 create 事件。
删除 事件
delete 更改事件中的值与为同一表的 create 和 update 事件相同的 schema
部分。示例 customer
表的 delete 事件中 payload
部分类似如下:
{ "schema": { ... }, }, "payload": { "before": { <> "id": 1005, "first_name": "john", "last_name": "doe", "email": "noreply@example.org" }, "after": null, 1 "source": { 2 "version": "2.1.4.Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1559730445243, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007", "event_serial_no": "1" }, "op": "d", 3 "ts_ms": 1559730450205 4 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定事件发生前行状态的可选字段。在一个 delete 事件值中, |
2 |
|
指定事件发生后行状态的可选字段。在 delete 事件值中, |
3 |
|
描述事件源元数据的必需字段。在一个 delete 事件值中,
|
4 |
|
描述操作类型的强制字符串。 |
5 |
|
显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
SQL Server 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
tombstone 事件
删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,如果 Kafka 删除具有相同键的所有消息,消息值必须是 null
。为了实现此目的,在 Debezium 的 SQL Server 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但为 null
值。
8.2.8. Debezium SQL Server 连接器生成的事件代表事务边界
Debezium 可以生成代表事务边界的事件,并增强数据更改事件消息。
Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。
数据库事务由语句块表示,该块在 BEGIN
和 END
关键字之间括起。Debezium 为每个事务生成 BEGIN
和 END
分隔符的事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
。 id
- 唯一事务标识符的字符串表示。
ts_ms
-
数据源上事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务所设计的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的一组对,指示连接器为来自数据收集的更改发出的事件数。
Debezium 无法可靠地识别事务何时结束。因此,只有在第一个事务到达另一个事务后,才会发出事务的 END
标记。在出现低流量系统时,这可能会导致 END
标记的延迟发送。
以下示例显示了典型的事务边界信息:
示例:SQL Server 连接器事务边界事件
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.testDB.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.testDB.tableb", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
8.2.8.1. 更改数据事件增强
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示
total_order
- 事件在事务生成的所有事件的绝对路径
data_collection_order
- 事件在事务发送的所有事件中每个数据收集位置
以下示例显示了典型的信息是什么:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }
8.2.9. Debezium SQL Server 连接器如何映射数据类型
Debezium SQL Server 连接器通过生成类似于行存在的表的事件来代表表行数据的更改。每个事件都包含表示行列值的字段。事件表示操作的列值的方式取决于该列的 SQL 数据类型。在事件中,连接器将每个 SQL Server 数据类型的字段映射到 字面类型和 语义类型。
连接器可将 SQL Server 数据类型映射到 字面 和 语义 类型。
- 字面类型
-
描述如何使用 Kafka Connect 模式类型(即
INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,BOOLEAN
,STRING
,BYTES
,ARRAY
,MAP
, 和STRUCT
)来代表值。 - 语义类型
- 描述 Kafka Connect 模式如何使用字段的名称捕获字段 的含义。
如果默认数据类型转换无法满足您的需要,您可以为连接器 创建自定义转换器。
有关数据类型映射的更多信息,请参阅以下部分:
基本类型
下表显示了连接器如何映射基本的 SQL Server 数据类型。
SQL Server 数据类型 | 字面类型(架构类型) | 语义类型(架构名称)和备注 |
---|---|---|
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
| 不适用 |
|
|
|
|
|
|
以下部分介绍了其他数据类型映射。
如果存在,列的默认值会被传播到对应的字段的 Kafka Connect 模式。更改消息将包含字段的默认值(除非给出了显式列值),因此应该很少需要从 schema 获取默认值。
临时值
除 SQL Server 的 DATETIMEOFFSET
数据类型(包含时区信息)外,其他时间类型取决于 time.precision.mode
配置属性的值。当 time.precision.mode
配置属性被设置为 adaptive
(默认值),那么连接器将根据列的数据类型定义决定 temporal 类型的字面类型和语义类型,以便事件 完全 表示数据库中的值:
SQL Server 数据类型 | 字面类型(架构类型) | 语义类型(架构名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当 time.precision.mode
配置属性设为 connect
时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置的 Kafka Connect 逻辑类型且无法处理变量精度时间值时,这非常有用。另一方面,因为 SQL 服务器支持十分之一的微秒精度,带有 connect
时间精度模式的连接器将在有一个大于 3 的 fractional second precision 数据库列时丢失一些精度:
SQL Server 数据类型 | 字面类型(架构类型) | 语义类型(架构名称)和备注 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
时间戳值
DATETIME
、SMALLDATETIME
和 DATETIME2
类型代表没有时区信息的时间戳。这些列根据 UTC 转换为等同的 Kafka Connect 值。例如,DATETIME2
值 "2018-06-20 15:13:16.945104" 由 io.debezium.time.MicroTimestamp
代表,值为 "1539)07596945104"。
请注意,运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这个转换。
十进制值
Debezium 连接器根据 decimal.handling.mode
连接器配置属性 的设置处理十进制。
- decimal.handling.mode=precise
表 8.11. 当 decimal.handling.mode=precise时进行映射 SQL Server 类型 字面类型(架构类型) 语义类型(架构名称) NUMERIC[(P[,S])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。DECIMAL[(P[,S])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。SMALLMONEY
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。领导
BYTES
org.apache.kafka.connect.data.Decimal
scale
模式参数包括一个整数,它代表了十进制小数点移动了多少位。- decimal.handling.mode=double
表 8.12. Mappings when decimal.handling.mode=double SQL Server 类型 字面类型 语义类型 NUMERIC[(M[,D])]
FLOAT64
不适用
DECIMAL[(M[,D])]
FLOAT64
不适用
SMALLMONEY[(M[,D])]
FLOAT64
不适用
MONEY[(M[,D])]
FLOAT64
不适用
- decimal.handling.mode=string
表 8.13. Mappings when decimal.handling.mode=string SQL Server 类型 字面类型 语义类型 NUMERIC[(M[,D])]
字符串
不适用
DECIMAL[(M[,D])]
字符串
不适用
SMALLMONEY[(M[,D])]
字符串
不适用
MONEY[(M[,D])]
字符串
不适用