8.2. Debezium SQL Server 连接器如何工作


为了最佳配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称以及使用元数据会很有帮助。

有关连接器如何工作的详情,请查看以下部分:

8.2.1. Debezium SQL Server 连接器如何执行数据库快照

SQL Server CDC 不设计存储数据库更改的完整历史记录。对于 Debezium SQL Server 连接器,为数据库当前状态建立基准,它使用名为 snapshotting 的进程。

您可以配置连接器如何创建快照。默认情况下,连接器的快照模式被设置为 initial。根据这个 初始 快照模式,连接器首次启动时,它会执行数据库的初始 一致性快照。此初始快照捕获了与为连接器配置的 includeexclude 属性定义的任何表的结构和数据(例如 table.include.listcolumn.include.listtable.exclude.list 等)。

当连接器创建快照时,它会完成以下任务:

  1. 决定要捕获的表。
  2. 获取启用 CDC 的 SQL Server 表上的锁定,以防止在创建快照时发生结构更改。锁定的级别由 snapshot.isolation.mode 配置选项决定。
  3. 在服务器的事务日志中读取最大日志序列号(LSN)位置。
  4. 捕获所有相关表的结构。
  5. 释放在第 2 步中获取的锁定(如果需要)。在大多数情况下,锁定只会保留短时间内。
  6. 根据在第 3 步中读取的 LSN 位置扫描 SQL Server 源表和模式,为表中的每一行生成一个 READ 事件,并将事件写入表的 Kafka 主题。
  7. 在连接器偏移中记录成功完成快照。

生成的初始快照捕获为 CDC 启用的表中的每行的当前状态。在这个基准状态中,连接器会捕获后续更改。

8.2.1.1. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。

然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的整个内容,或者只捕获数据库中表的子集。另外,快照也可以捕获数据库中表内容的子集。

您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为增量,并提供快照中包含的表名称,如下表所述:

表 8.1. 临时 执行快照 信号记录的示例
字段默认

type

增量

指定您要运行的快照类型。
设置类型是可选的。目前,您只能请求 增量 快照。

data-collections

不适用

包含与要快照的表的完全限定域名匹配的正则表达式的数组。
名称的格式与 signal.data.collection 配置选项的格式相同。

additional-condition

不适用

可选字符串,它根据表的列指定条件,用于捕获表内容的子集。

触发临时快照

您可以通过在信号表中添加带有 execute-snapshot 信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和端点。根据表中的条目数量以及配置的块大小,Debezium 将表分成块,并一次为每个块进行快照。

目前,exec-snapshot 操作类型仅触发 增量快照。如需更多信息,请参阅 增加快照

8.2.1.2. 增量快照

为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 以一系列可配置的块的形式捕获每个表。您可以指定您希望快照捕获的表,以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。

当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每一个表行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:

  • 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始重新定义表。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 按主密钥对表进行排序,然后根据 配置的块大小 将表分成块。然后,按块使用块,然后在块中捕获每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时的行值。

当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改表记录。要反映此类更改,INSERTUPDATEDELETE 操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何使用相同的主密钥解决记录间的冲突

在某些情况下,流传输进程发出的 UPDATEDELETE 事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ 事件之前修改表行。当快照最终会为行发出对应的 READ 事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。

快照窗口

为了帮助解决后续 READ 事件和修改同一表行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定表块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ 事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器重复每个快照块的进程。

警告

SQL Server 的 Debezium 连接器不支持增量快照运行时的 schema 更改。

8.2.1.3. 触发增量快照

目前,启动增量快照的唯一方法是向源数据库上的 信号发送临时快照 信号。

您可以以 SQL INSERT 查询形式向信号表提交信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量

要指定快照中包含的表,请提供一个 data-collections 数组,该数组列出了用于匹配表的表或一组正则表达式,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作,且不执行快照。

注意

如果要包含在快照中的表名称包含数据库、模式或表名称的句点(.),要将表添加到 data-collections 数组中,您必须用双引号转义名称的每个部分。

例如,若要包含 公共 模式中存在的表,并且名为 My.Table,请使用以下格式:" public"."My.Table "。

先决条件

流程

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    命令中的 idtypedata 参数的值对应于 信号表的字段

    下表描述了示例中的参数:

    表 8.2. 向信号表发送增量快照信号的 SQL 命令中的字段描述
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定分配给信号请求的 id 标识符的任意字符串。
    使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。相反,在快照中,Debebe 会生成自己的 id 字符串作为水位信号。

    3

    execute-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定 signal.data.collection 配置属性中的连接器信号表的名称相同。

    5

    增量

    信号的 data 字段的可选 类型 组件,用于指定要运行的快照操作类型。
    目前,唯一有效选项是默认值 增量
    如果没有指定值,连接器将运行一个增量快照。

    6

    additional-condition

    可选字符串,它根据表的列指定条件,用于捕获表内容的子集。有关 additional-condition 参数的详情请参考 带有额外条件的临时增量快照

带有额外条件的临时增量快照

如果您希望快照在表中仅包含内容子集,您可以通过将 additional-condition 参数附加到快照信号来修改信号请求。

典型的快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-condition 参数,您可以将 WHERE 条件附加到 SQL 查询中,如下例所示:

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

例如,假设您有一个包含以下列的产品表:

  • id (主密钥)
  • color
  • quantity

如果您希望 product 表的增量快照只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含那些项目的 data =bluequantity>10

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件信息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效选项是默认值 增量
提交给信号表的 SQL 查询中指定类型 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,代表 READ 操作。

8.2.1.4. 停止增量快照

您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT 查询,向表提交停止快照信号。

在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行时停止增量快照操作。

您提交的查询指定了 增量 的快照操作,以及可选的、要删除的当前运行快照的表。

先决条件

流程

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值对应于 信号表的字段

    下表描述了示例中的参数:

    表 8.3. 向信号表发送停止增量快照信号的 SQL 命令中的字段描述
    描述

    1

    myschema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定分配给信号请求的 id 标识符的任意字符串。
    使用此字符串来标识到信号表中条目的日志消息。Debezium 不使用此字符串。

    3

    stop-snapshot

    指定 type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了根据完全限定名称匹配表的正则表达式,其格式与您用来指定 signal.data.collection 配置属性中的连接器信号表的名称相同。如果省略了 data 字段的此组件,则信号将停止正在进行的整个增量快照。

    5

    增量

    信号的 data 字段所需的组件,用于指定要停止的快照操作类型。
    目前,唯一有效选项是 增量的
    如果没有指定 类型 值,则信号无法停止增量快照。

8.2.2. Debezium SQL Server 连接器如何读取更改数据表

当连接器首次启动时,它会对捕获表的结构进行结构快照,并将这些信息保留给其内部数据库架构历史记录主题。然后连接器会识别每个源表的更改表,并完成以下步骤。

  1. 对于每个更改表,连接器会读取最后一次存储的最大 LSN 和当前最大 LSN 之间创建的所有更改。
  2. 连接器根据提交 LSN 的值排序它以升序读取的更改,并更改 LSN。这种排序顺序可确保 Debezium 按照数据库中发生的相同顺序重新执行更改。
  3. 连接器通过提交,并将 LSN 作为偏移改为 Kafka Connect。
  4. 连接器存储最大 LSN,并从第 1 步中重启进程。

重启后,连接器会从它读取的最后一个偏移(提交并更改 LSN)中恢复处理。

连接器可以检测是否为包含的源表启用或禁用 CDC,并调整其行为。

8.2.3. 数据库中没有记录的最大 LSN

当数据库中没有记录最大 LSN 时,可能会出现以下情况,因为:

  1. SQL Server Agent 没有运行
  2. 更改表中未记录任何更改
  3. 数据库的活动较低,cdc 清理作业会定期清除 cdc 表中的条目

从这些可能之外,由于正在运行的 SQL Server Agent 是先决条件,因此没有 1。是一个实际问题(虽然无 2 和 3)。

为了缓解这个问题并区分 No 1. 和不同,对 SQL Server Agent 的检查是通过以下查询 "SELECT CASE WHEN dss.[status]=4\":\"N 1 ELSE 0 END AS 进行的,则以下查询 "SELECT CASE WRE dss.[servicename] LIKE N'SQL Server Agent (%';".如果 SQL Server Agent 没有运行,则会在日志中写入 ERROR:"No maximum LSN in the database; SQL Server Agent is not running"。

重要

运行状态查询的 SQL Server Agent 需要 7000 SERVER STATE 服务器权限。如果您不想向配置的用户授予这个权限,您可以选择通过 database.sqlserver.agent.status.query 属性配置您自己的查询。您可以定义一个功能,如果 SQL Server Agent 正在运行(false 或 0),并且安全地使用高级别权限,而无需根据此处所述,我需要为用户提供什么最小权限,以便它可以检查 SQL Server Agent 服务的状态? 或者这里使用高级别权限,而无需授予任何人:服务器级别。query 属性的配置应类似: database.sqlserver.agent.status.query=SELECT [ funcdb].func_is_sql_server_agent_running () - 您需要使用 [114db] 作为数据库名称的占位符。

8.2.4. Debezium SQL Server 连接器的限制

SQL Server 专门要求基本对象作为表,才能创建更改捕获实例。因此,SQL Server 不支持从索引视图(也称为 规范化视图)中捕获更改,因此 Debezium SQL Server 连接器。

8.2.5. 接收 Debezium SQL Server 更改事件记录的 Kafka 主题默认名称

默认情况下,SQL Server 连接器将所有 INSERTUPDATEDELETE 操作的事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题:< topicPrefix> . <schemaName& gt; . &lt;tableName>

以下列表提供默认名称组件的定义:

topicPrefix
服务器的逻辑名称,如 topic.prefix 配置属性指定。
schemaName
发生更改事件的数据库模式的名称。
tableName
发生更改事件的数据库表的名称。

例如,如果 fulfillment 是逻辑服务器名称,而 dbo 是 schema 名称,数据库包含名称 product, products _on_hand,customers, 和 orders 的表,连接器会将事件记录流传输为以下 Kafka 主题:

  • fulfillment.testDB.dbo.products
  • fulfillment.testDB.dbo.products_on_hand
  • fulfillment.testDB.dbo.customers
  • fulfillment.testDB.dbo.orders

连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 以及 事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

8.2.6. Debezium SQL Server 连接器如何使用 schema 更改主题

对于启用了 CDC 的每个表,Debezium SQL Server 连接器会存储应用于数据库中捕获的表的模式更改事件的历史记录。连接器将模式更改事件写入名为 < topicPrefix&gt; 的 Kafka 主题,其中 topicPrefixtopic.prefix 配置属性中指定的逻辑服务器名称。

连接器发送到 schema 更改主题的消息包含一个有效负载,也可以包含更改事件消息的 schema。模式更改事件消息的有效负载包括以下元素:

databaseName
将语句应用到的数据库的名称。databaseName 的值充当 message 键。
tableChanges
模式更改后整个表架构的结构化表示。tableChanges 字段包含一个数组,其中包含表中的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析程序处理它们。
重要

当连接器配置为捕获表时,它会保存表架构的历史记录,不仅在架构更改主题中,也存储在内部数据库模式历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要有关架构的通知的应用程序只消耗了该模式的更改主题的信息。

警告

连接器发送到其架构更改主题的消息格式处于 incubating 状态,可以在没有通知的情况下更改。

当发生以下事件时,Debebe 会将信息发送到 schema 更改主题:

示例:发送到 SQL Server 连接器模式更改主题的消息

以下示例显示了 schema 更改主题中的消息。消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.1.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "testDB", 2
    "schemaName": "dbo",
    "ddl": null, 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"testDB\".\"dbo\".\"customers\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "id"
          ],
          "columns": [ 9
            {
              "name": "id",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "first_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "last_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "email",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 8.4. 向 schema 更改主题发送的消息中的字段描述
字段名称描述

1

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

2

databaseName
schemaName

标识包含更改的数据库和 schema。

3

ddl

对于 SQL Server 连接器,始终为 null。对于其他连接器,此字段包含负责架构更改的 DDL。此 DDL 不适用于 SQL Server 连接器。

4

tableChanges

包含 DDL 命令生成的模式更改的一个或多个项目的数组。

5

type

描述更改的类型。该值是以下之一:

  • CREATE - 表已创建
  • ALTER - 表被修改
  • DROP - 表被删除

6

id

创建、更改或丢弃的表的完整标识符。

7

table

应用更改后代表表元数据。

8

primaryKeyColumnNames

编写表主密钥的列列表。

9

columns

更改表中的每个列的元数据。

10

属性

每个表更改的自定义属性元数据。

在连接器发送到 schema 更改主题的消息中,键是包含架构更改的数据库名称。在以下示例中,payload 字段包含键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.sqlserver.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "testDB"
  }
}

8.2.7. Debezium SQL Server 连接器数据更改事件的描述

Debezium SQL Server 连接器会为每行级别 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。

以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema 字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 8.5. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是 event 键的一部分。它指定一个 Kafka Connect 模式,它描述了事件密钥 有效负载部分中的内容。换句话说,第一个 schema 字段描述了主密钥的结构,或者唯一键(如果表没有主密钥)用于更改的表。

可以通过设置 message.key.columns 连接器配置属性 来覆盖表的主键。在本例中,第一个 schema 字段描述了由该属性标识的密钥的结构。

2

payload

第一个 payload 字段是 event 键的一部分。它具有上一个 schema 字段描述的结构,其中包含更改的行的密钥。

3

schema

第二个 schema 字段是 event 值的一部分。它指定了 Kafka Connect 模式,它描述了事件值 有效负载部分中的内容。换句话说,第二个 模式 描述了更改的行的结构。通常,此架构包含嵌套模式。

4

payload

第二个 payload 字段是 event 值的一部分。它具有上一个 schema 字段描述的结构,其中包含更改的行的实际数据。

默认情况下,连接器流将事件记录更改为名称与事件原始表相同的名称的主题。如需更多信息,请参阅 主题名称

警告

SQL Server 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及数据库和表名称中的每个字符都必须是字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。

有关更改事件的详情,请查看以下主题:

8.2.7.1. 关于 Debezium SQL Server 中的键更改事件

更改事件的密钥包含更改表的密钥的 schema,以及更改的行的实际键。在连接器创建事件时,模式及其对应有效负载都包含更改表的主键(或唯一键约束)中的每个列的字段。

请考虑以下 客户 表,然后是此表的更改事件键示例。

表示例

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

更改事件键示例

捕获对 customers 表的更改的每个更改事件都有相同的事件键模式。只要 customers 表有以前的定义,捕获 customer 表更改的事件都有以下键结构,在 JSON 中类似如下:

{
    "schema": { 1
        "type": "struct",
        "fields": [ 2
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false, 3
        "name": "server1.testDB.dbo.customers.Key" 4
    },
    "payload": { 5
        "id": 1004
    }
}
表 8.6. 更改事件密钥的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 有效负载部分中的内容

2

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要它。在本例中,有一个名为 id 的必填字段,类型为 int32

3

optional

指明 event 键必须在其 payload 字段中包含一个值。在本例中,密钥有效负载中的值是必需的。当表没有主密钥时,键的 payload 字段中的值是可选的。

4

server1.dbo.testDB.customers.Key

定义密钥有效负载结构的 schema 名称。这个模式描述了更改的表的主键的结构。键架构名称的格式为 connector-name.database-schema-name.table-name.Key。在这个示例中:

  • server1 是生成此事件的连接器的名称。
  • dbo 是更改的表的数据库模式。
  • 客户 是已更新的表。

5

payload

包含生成此更改事件的行的键。在本例中,键包含一个 id 字段,其值是 1004

8.2.7.2. 关于 Debezium SQL Server 更改事件中的值

更改事件中的值比键稍微复杂。与键一样,该值具有 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。

考虑用于显示更改事件键示例的相同示例:

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

为每个事件类型描述了更改事件的值部分。

创建 事件

以下示例显示了连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.testDB.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.testDB.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.testDB.customers.Envelope" 4
  },
  "payload": { 5
    "before": null, 6
    "after": { 7
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": { 8
      "version": "2.1.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c", 9
    "ts_ms": 1559729471739 10
  }
}
表 8.7. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,用于描述值有效负载的结构。每次连接器为特定表生成的更改时,更改事件的值模式都是相同的。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

server1.dbo.testDB.customers.Value 是有效负载 beforeafter 字段的 schema。这个模式特定于 customers 表。

beforeafter 字段的 schema 名称的格式是 logicalName.database-schemaName.tableName.Value,这样可确保架构名称在数据库中是唯一的。这意味着,在使用 Avro converter 时,每个逻辑源中的每个表生成的 Avro 模式都有自己的演进和历史记录。

3

name

io.debezium.connector.sqlserver.Source 是有效负载的 source 字段的 schema。这个模式特定于 SQL Server 连接器。连接器将其用于它生成的所有事件。

4

name

server1.dbo.testDB.customers.Envelope 是有效负载总体结构的模式,其中 server1 是连接器名称,dbo 是数据库架构名称,customers 是表。

5

payload

值的实际数据。这是更改事件提供的信息。

似乎,事件的 JSON 表示要大于它们描述的行。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的信息大小。

6

之前

指定事件发生前行状态的可选字段。当 op 字段是 c 用于创建(如本例所示),before 字段为 null,因为此更改事件用于新内容。

7

after

指定事件发生后行状态的可选字段。在本例中,after 字段包含新行的 idfirst_namelast_nameemail 列的值。

8

source

描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序,以及事件是否为同一事务的一部分。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表名称
  • 服务器日志偏移

9

op

描述导致连接器生成事件的操作类型的必要字符串。在本例中,c 表示操作创建了行。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = read (仅适用于快照)

10

ts_ms

显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中ts_ms 表示更改在数据库中提交的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更新 事件

示例 customer 表中更新的更改事件值与该表的 create 事件相同。同样,事件的 payload 具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是连接器在 customers 表中为更新生成的更改事件值示例:

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "after": { 2
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "source": { 3
      "version": "2.1.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729995937,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
      "event_serial_no": "2"
    },
    "op": "u", 4
    "ts_ms": 1559729998706  5
  }
}
表 8.8. 更新 事件值字段的描述
字段名称描述

1

之前

指定事件发生前行状态的可选字段。在 update 事件值中,before 字段包含每个表列的一个字段,以及在数据库提交前在该列中的值。在这个示例中,电子邮件 值为 john.doe@example.org。

2

after

指定事件发生后行状态的可选字段。您可以比较 beforeafter 结构,以确定此行的更新是什么。在这个示例中,电子邮件 值现在是 noreply@example.org

3

source

描述事件源元数据的必需字段。source 字段结构与 create 事件中的字段相同,但一些值有所不同,例如,示例 更新 事件具有不同的偏移量。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表名称
  • 服务器日志偏移

event_serial_no 字段区分具有相同提交并更改 LSN 的事件。当此字段具有 1 以外的值时,的典型情况:

  • 更新 事件将值设为 2,因为更新会在 SQL Server 的 CDC 更改表中生成两个事件(详情请参阅源文档)。第一个事件包含旧值,第二个包含新值。连接器使用第一个事件中的值来创建第二个事件。连接器丢弃第一个事件。
  • 更新主密钥时,SQL Server 会发出两个 evemts。对于删除带有旧主键的记录,有一个 delete 事件;对于添加带有新主键的记录,有一个 create 事件。两个操作共享相同的提交,并且分别更改 LSN 及其事件号为 12

4

op

描述操作类型的强制字符串。在 update 事件值中,op 字段值为 u,表示此行因为更新而改变。

5

ts_ms

显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示更改提交到数据库的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

注意

更新行的主/唯一键的列会更改行键的值。当键更改时,Debebe 会输出 三个 事件:一个 delete 事件和带有行的旧键的 tombstone 事件,后跟带有行新键的 create 事件。

删除 事件

delete 更改事件中的值与为同一表的 createupdate 事件相同的 schema 部分。示例 customer 表的 delete 事件中 payload 部分类似如下:

{
  "schema": { ... },
  },
  "payload": {
    "before": { <>
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "after": null, 1
    "source": { 2
      "version": "2.1.4.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559730445243,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007",
      "event_serial_no": "1"
    },
    "op": "d", 3
    "ts_ms": 1559730450205 4
  }
}
表 8.9. 删除 事件值字段的描述
字段名称描述

1

之前

指定事件发生前行状态的可选字段。在一个 delete 事件值中,before 字段包含在使用数据库提交删除行前的值。

2

after

指定事件发生后行状态的可选字段。在 delete 事件值中,after 字段为 null,表示行不再存在。

3

source

描述事件源元数据的必需字段。在一个 delete 事件值中,source 字段结构与同一表的 createupdate 事件相同。许多 source 字段值也相同。在 delete 事件值中,ts_mspos 字段值以及其他值可能已更改。但是,delete 事件值中的 source 字段提供了相同的元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表名称
  • 服务器日志偏移

4

op

描述操作类型的强制字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms

显示连接器处理事件的时间的可选字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

SQL Server 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,如果 Kafka 删除具有相同键的所有消息,消息值必须是 null。为了实现此目的,在 Debezium 的 SQL Server 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但为 null 值。

8.2.8. Debezium SQL Server 连接器生成的事件代表事务边界

Debezium 可以生成代表事务边界的事件,并增强数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。

数据库事务由语句块表示,该块在 BEGINEND 关键字之间括起。Debezium 为每个事务生成 BEGINEND 分隔符的事务边界事件。事务边界事件包含以下字段:

status
BEGINEND
id
唯一事务标识符的字符串表示。
ts_ms
数据源上事务边界事件(BEGINEND 事件)的时间。如果数据源没有向 Debezium 提供事件时间,则该字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务所设计的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 元素的一组对,指示连接器为来自数据收集的更改发出的事件数。
警告

Debezium 无法可靠地识别事务何时结束。因此,只有在第一个事务到达另一个事务后,才会发出事务的 END 标记。在出现低流量系统时,这可能会导致 END 标记的延迟发送。

以下示例显示了典型的事务边界信息:

示例:SQL Server 连接器事务边界事件

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.testDB.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.testDB.tableb",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

8.2.8.1. 更改数据事件增强

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示
total_order
事件在事务生成的所有事件的绝对路径
data_collection_order
事件在事务发送的所有事件中每个数据收集位置

以下示例显示了典型的信息是什么:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

8.2.9. Debezium SQL Server 连接器如何映射数据类型

Debezium SQL Server 连接器通过生成类似于行存在的表的事件来代表表行数据的更改。每个事件都包含表示行列值的字段。事件表示操作的列值的方式取决于该列的 SQL 数据类型。在事件中,连接器将每个 SQL Server 数据类型的字段映射到 字面类型和 语义类型

连接器可将 SQL Server 数据类型映射到 字面语义 类型。

字面类型
描述如何使用 Kafka Connect 模式类型(即 INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, 和 STRUCT)来代表值。
语义类型
描述 Kafka Connect 模式如何使用字段的名称捕获字段 的含义

如果默认数据类型转换无法满足您的需要,您可以为连接器 创建自定义转换器

有关数据类型映射的更多信息,请参阅以下部分:

基本类型

下表显示了连接器如何映射基本的 SQL Server 数据类型。

表 8.10. SQL Server 连接器使用的数据类型映射
SQL Server 数据类型字面类型(架构类型)语义类型(架构名称)和备注

布尔值

不适用

TINYINT

INT16

不适用

SMALLINT

INT16

不适用

INT

INT32

不适用

BIGINT

INT64

不适用

REAL

FLOAT32

不适用

FLOAT[(N)]

FLOAT64

不适用

CHAR[(N)]

字符串

不适用

VARCHAR[(N)]

字符串

不适用

文本

字符串

不适用

NCHAR[(N)]

字符串

不适用

NVARCHAR[(N)]

字符串

不适用

NTEXT

字符串

不适用

XML

字符串

io.debezium.data.Xml

包含 XML 文档的字符串表示

DATETIMEOFFSET[(P)]

字符串

io.debezium.time.ZonedTimestamp

使用时区信息的时间戳的字符串表示,其中时区为 GMT

以下部分介绍了其他数据类型映射。

如果存在,列的默认值会被传播到对应的字段的 Kafka Connect 模式。更改消息将包含字段的默认值(除非给出了显式列值),因此应该很少需要从 schema 获取默认值。

临时值

除 SQL Server 的 DATETIMEOFFSET 数据类型(包含时区信息)外,其他时间类型取决于 time.precision.mode 配置属性的值。当 time.precision.mode 配置属性被设置为 adaptive (默认值),那么连接器将根据列的数据类型定义决定 temporal 类型的字面类型和语义类型,以便事件 完全 表示数据库中的值:

SQL Server 数据类型字面类型(架构类型)语义类型(架构名称)和备注

DATE

INT32

io.debezium.time.Date

代表自 epoch 以来的天数。

TIME (0), TIME (1 ), TIME (2), TIME (3)

INT32

io.debezium.time.Time

代表过去几周的毫秒数,不包括时区信息。

TIME (4) , TIME (5), TIME (6)

INT64

io.debezium.time.MicroTime

代表过去几周的微秒数,不包括时区信息。

TIME (7)

INT64

io.debezium.time.NanoTime

代表过去几夜的纳秒数量,不包括时区信息。

日期

INT64

io.debezium.time.Timestamp

代表超过 epoch 的毫秒数,不包括时区信息。

SMALLDATETIME

INT64

io.debezium.time.Timestamp

代表超过 epoch 的毫秒数,不包括时区信息。

DATETIME2 (0), DATETIME2 (1), DATETIME2 (2) , DATETIME2 (3)

INT64

io.debezium.time.Timestamp

代表超过 epoch 的毫秒数,不包括时区信息。

DATETIME2 (4), DATETIME2 (5), DATETIME2 (6)

INT64

io.debezium.time.MicroTimestamp

代表超过 epoch 的微秒数,不包括时区信息。

DATETIME2 (7)

INT64

io.debezium.time.NanoTimestamp

代表超过 epoch 的纳秒数量,不包括时区信息。

time.precision.mode 配置属性设为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置的 Kafka Connect 逻辑类型且无法处理变量精度时间值时,这非常有用。另一方面,因为 SQL 服务器支持十分之一的微秒精度,带有 connect 时间精度模式的连接器将在有一个大于 3 的 fractional second precision 数据库列时丢失一些精度

SQL Server 数据类型字面类型(架构类型)语义类型(架构名称)和备注

DATE

INT32

org.apache.kafka.connect.data.Date

代表自 epoch 后的天数。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

从中夜起代表毫秒的数量,不包括时区信息。SQL Server 允许 P 在 0-7 范围内存储最多十分之一的微秒精度,但此模式会在 P > 3 时丢失精度。

日期

INT64

org.apache.kafka.connect.data.Timestamp

从 epoch 开始代表毫秒的数量,不包括时区信息。

SMALLDATETIME

INT64

org.apache.kafka.connect.data.Timestamp

代表超过 epoch 的毫秒数,不包括时区信息。

DATETIME2

INT64

org.apache.kafka.connect.data.Timestamp

从 epoch 开始代表毫秒的数量,不包括时区信息。SQL Server 允许 P 在 0-7 范围内存储最多十分之一的微秒精度,但此模式会在 P > 3 时丢失精度。

时间戳值

DATETIMESMALLDATETIMEDATETIME2 类型代表没有时区信息的时间戳。这些列根据 UTC 转换为等同的 Kafka Connect 值。例如,DATETIME2 值 "2018-06-20 15:13:16.945104" 由 io.debezium.time.MicroTimestamp 代表,值为 "1539)07596945104"。

请注意,运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这个转换。

十进制值

Debezium 连接器根据 decimal.handling.mode 连接器配置属性 的设置处理十进制。

decimal.handling.mode=precise
表 8.11. 当 decimal.handling.mode=precise时进行映射
SQL Server 类型字面类型(架构类型)语义类型(架构名称)

NUMERIC[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

DECIMAL[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

SMALLMONEY

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

领导

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

decimal.handling.mode=double
表 8.12. Mappings when decimal.handling.mode=double
SQL Server 类型字面类型语义类型

NUMERIC[(M[,D])]

FLOAT64

不适用

DECIMAL[(M[,D])]

FLOAT64

不适用

SMALLMONEY[(M[,D])]

FLOAT64

不适用

MONEY[(M[,D])]

FLOAT64

不适用

decimal.handling.mode=string
表 8.13. Mappings when decimal.handling.mode=string
SQL Server 类型字面类型语义类型

NUMERIC[(M[,D])]

字符串

不适用

DECIMAL[(M[,D])]

字符串

不适用

SMALLMONEY[(M[,D])]

字符串

不适用

MONEY[(M[,D])]

字符串

不适用

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.