2.7. SQL Server 的 Debezium 连接器


Debezium SQL Server 连接器捕获 SQL Server 数据库 schema 中发生的行级更改。

有关与此连接器兼容的 SQL Server 版本的详情,请参考 Debezium 支持的配置 页面

有关 Debezium SQL Server 连接器及其使用的详情,请查看以下主题:

当 Debezium SQL Server 连接器连接到 SQL Server 数据库或集群时,它会获取数据库中 schema 的一致性快照。初始快照完成后,连接器会持续捕获 INSERTUPDATEDELETE 操作的行级更改,它们提交到为 CDC 启用的 SQL Server 数据库。连接器为每个数据更改操作生成事件,并将其流传输到 Kafka 主题。连接器将表的所有事件流传输到专用 Kafka 主题。然后,应用程序和服务可以使用来自该主题的数据更改事件记录。

2.7.1. Debezium SQL Server 连接器概述

Debezium SQL Server 连接器基于 SQL Server 2016 Service Pack 1 (SP1)以及更新的 标准版本或企业版本中提供的 更改数据捕获 功能。SQL Server 捕获进程监控指定的数据库和表,并将更改存储在特定创建的 change tables 中。

要启用 Debezium SQL Server 连接器来捕获数据库操作的更改事件记录,您必须首先在 SQL Server 数据库上启用更改数据捕获。在数据库和您要捕获的每个表中都必须启用 CDC。在源数据库上设置 CDC 后,连接器可以捕获数据库中发生的行级 INSERTUPDATEDELETE 操作。连接器将每个源表的事件记录写入 Kafka 主题,特别是该表。每个捕获的表都有一个主题。客户端应用程序读取其后面的数据库表的 Kafka 主题,并可以响应这些主题中消耗的行级事件。

当连接器连接到 SQL Server 数据库或集群时,它会为它配置为捕获更改的所有表生成一致的模式快照,并将这个状态流传输到 Kafka。快照完成后,连接器会持续捕获发生的后续行级更改。通过首先建立所有数据的一致视图,连接器可以继续读取,而不会丢失快照发生期间所做的任何更改。

Debezium SQL Server 连接器接受故障。当连接器读取更改并生成事件时,它会定期记录数据库日志中事件的位置(LSN / Log Sequence Number)。如果连接器因任何原因(包括通信失败、网络问题或崩溃)停止,重启连接器会从它读取的最后点恢复 SQL Server CDC 表。

注意

偏移会定期提交。它们不会在发生更改事件时提交。因此,在中断后,可能会生成重复的事件。

容错也适用于快照。也就是说,如果连接器在快照过程中停止,连接器会在重启时启动新的快照。

2.7.2. Debezium SQL Server 连接器如何工作

要优化地配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流更改事件、决定 Kafka 主题名称,并使用元数据。

有关连接器如何工作的详情,请查看以下部分:

2.7.2.1. Debezium SQL Server 连接器如何执行数据库快照

SQL Server CDC 不设计为存储数据库更改的完整历史记录。对于 Debezium SQL Server 连接器,为数据库的当前状态建立基准,它会使用名为 snapshotting 的进程。初始快照捕获数据库中表的结构和数据。

您可以在以下部分中找到有关快照的更多信息:

2.7.2.1.1. Debezium SQL Server 连接器用于执行初始快照的默认工作流

以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为默认值时快照的进程,这是 初始。您可以通过更改 snapshot.mode 属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用此工作流的修改版本完成快照。

  1. 建立与数据库的连接。
  2. 确定要捕获的表。默认情况下,连接器会捕获所有非系统表。要让连接器捕获表或表元素的子集,您可以设置多个 includeexclude 属性来过滤数据,如 table.include.listtable.exclude.list
  3. 在启用了 CDC 的 SQL Server 表上获取锁定,以防止在创建快照期间发生结构更改。锁定的级别由 snapshot.isolation.mode 配置属性决定。
  4. 阅读服务器事务日志中的最大日志序列号(LSN)位置。
  5. 捕获所有非系统的结构或指定用于捕获的所有表。连接器在其内部数据库模式历史记录主题中保留此信息。架构历史记录提供有关发生更改事件时所生效的结构的信息。

    注意

    默认情况下,连接器捕获数据库中处于捕获模式的每个表的模式,包括没有为捕获配置的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。有关您在初始快照中没有包括快照持久模式信息的更多信息,请参阅 了解为什么初始快照捕获所有表的 schema

  6. 释放在第 3 步中获取的锁定(如果需要)。其他数据库客户端现在可以写入任何之前锁定的表。
  7. 在第 4 步读取的 LSN 位置,连接器会扫描要捕获的表。在扫描过程中,连接器完成以下任务:

    1. 确认表已在快照开始之前创建。如果在快照开始后创建了表,连接器会跳过该表。快照完成后,连接器过渡到 streaming,它会为快照开始后创建的任何表发出更改事件。
    2. 为从表中捕获的每行生成一个 读取 事件。所有 读取 事件都包含相同的 LSN 位置,这是在第 4 步中获取的 LSN 位置。
    3. 向表的 Kafka 主题发出每个 读取 事件。
  8. 在连接器偏移中记录成功完成快照。

生成的初始快照捕获为 CDC 启用的表中的每行的当前状态。在这个基准状态中,连接器会捕获后续更改。

在快照过程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,进程会在连接器重启后重启。

连接器完成初始快照后,它会从第 4 步中读取的位置继续流传输,以便它不会丢失任何更新。

如果连接器因任何原因再次停止,它会在重启后恢复流更改,从之前离开的位置恢复流更改。

表 2.162. snapshot.mode 连接器配置属性的设置
设置描述

always

在每个连接器启动时执行快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。

Initial

连接器执行数据库快照,如用于创建初始快照的默认工作流 中所述。快照完成后,连接器将开始流传输后续数据库更改的事件记录。

initial_only

连接器执行数据库快照并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。

schema_only

弃用,请参阅 no_data

no_data

连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它没有创建 READ 事件来代表连接器启动时设置的数据集(Step 7.b)。

recovery

设置这个选项以恢复丢失或损坏的数据库 schema 历史记录主题。重启后,连接器运行一个从源表中重建主题的快照。您还可以设置属性,以定期修剪遇到意外增长的数据库 schema 历史记录主题。

+ 警告:如果在最后一个连接器关闭后将架构更改提交到数据库,请不要使用此模式执行快照。

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

如需更多信息,请参阅连接器配置属性表中的 snapshot.mode

2.7.2.1.2. 有关初始快照捕获所有表的 schema 历史记录的描述

连接器运行的初始快照捕获两种类型的信息:

表数据
有关 INSERTUPDATEDELETE 操作的信息,它们在连接器的 table.include.list 属性中命名。
模式数据
描述应用到表的结构更改的 DDL 语句。如果配置了 schema 数据,则 schema 数据会被保留到内部模式历史记录主题,以及连接器的 schema 更改主题。

运行初始快照后,您可能会注意到快照捕获了未指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅限于为捕获指定的表。连接器要求表的 schema 在捕获表之前存在于 schema 历史记录主题中。通过启用初始快照来捕获不属于原始捕获集的表,Debezium 准备连接器,以便稍后需要从这些表中读取事件数据。如果初始快照没有捕获表的模式,您必须将 schema 添加到历史记录主题,然后才能从表中捕获数据。

在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过有权访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。

2.7.2.1.3. 从初始快照未捕获的表中捕获数据(无模式更改)

在某些情况下,您可能希望连接器从最初快照未捕获其 schema 的表中捕获数据。根据连接器配置,初始快照可能会只捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器无法捕获表,并报告缺少的 schema 错误。

您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。

先决条件

流程

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。

  4. 将以下更改应用到连接器配置:

    1. (可选)将 schema.history.internal.store.only.captured.tables.ddl 的值设置为 false。此设置会导致快照捕获所有表的模式,并保证将来,连接器可以重建所有表的 schema 历史记录。

      注意

      捕获所有表的模式的快照需要更多时间完成。

    2. 添加您希望连接器捕获到 table.include.list 的表。
    3. snapshot.mode 设置为以下值之一:

      Initial
      重启连接器时,它会获取获取表数据和表结构的完整数据库快照。
      如果您选择这个选项,请考虑将 schema.history.internal.store.only.captured.tables.ddl 属性的值设置为 false,以便连接器捕获所有表的 schema。
      schema_only
      重启连接器时,它会使用一个只捕获表模式的快照。与完整数据快照不同,此选项不会捕获任何表数据。如果要比使用完整快照更快重启连接器,请使用这个选项。
  5. 重启连接器。连接器完成 snapshot.mode 指定的快照类型。
  6. (可选)如果连接器在快照完成后执行 schema_only 快照,启动 增量快照 以从您添加的表中捕获数据。连接器在继续从表中实时更改时运行快照。运行增量快照会捕获以下数据更改:

    • 对于之前捕获的连接器的表,增量 snapsot 捕获连接器停机期间发生的更改,即连接器停止的时间和当前重启之间的间隔。
    • 对于新添加的表,增量快照会捕获所有现有表行。
2.7.2.1.4. 从没有由初始快照捕获的表捕获数据(schema 更改)

如果将架构更改应用到表,则在架构更改前提交的记录与更改后提交的结构不同。当 Debezium 捕获表中的数据时,它会读取 schema 历史记录,以确保它将正确的模式应用到每个事件。如果 schema 历史记录主题中没有 schema,连接器将无法捕获表,并会产生错误结果。

如果要捕获初始快照未捕获的表中的数据,并修改了表的 schema,您必须将 schema 添加到历史记录主题(如果还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加架构。

先决条件

  • 您需要使用一个模式来捕获数据,连接器在初始快照过程中不会捕获。
  • 对表应用了一个架构更改,因此要捕获的记录没有统一的结构。

流程

初始快照捕获了所有表的模式(storage.only.captured.tables.ddl 设置为 false)
  1. 编辑 table.include.list 属性,以指定您要捕获的表。
  2. 重启连接器。
  3. 如果要从新添加的表中捕获现有数据,则启动 增量快照
初始快照没有捕获所有表的模式(storage.only.captured.tables.ddl 设置为 true)

如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:

流程 1:Schema 快照,后跟增量快照

在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。

  4. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 schema_only
    2. 编辑 table.include.list 以添加您要捕获的表。
  5. 重启连接器。
  6. 等待 Debezium 捕获新表和现有表的模式。连接器停止后发生的数据更改不会被捕获。
  7. 为确保没有数据丢失,可启动 增量快照
流程 2:初始快照,后跟可选的增量快照

在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一个耗时的操作。快照完成后,您可以选择性地触发增量快照,以捕获连接器离线期间发生的任何更改。

  1. 停止连接器。
  2. 删除由 schema.history.internal. kafka.topic 属性指定的内部数据库架构历史记录 主题。
  3. 清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答

    警告

    删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,并且仅应作为最后的手段来执行。

  4. 编辑 table.include.list 以添加您要捕获的表。
  5. 为连接器配置中的属性设置值,如以下步骤所述:

    1. snapshot.mode 属性的值设置为 initial
    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 设置为 false
  6. 重启连接器。连接器使用完整的数据库快照。快照完成后,连接器会过渡到 streaming。
  7. (可选)要捕获连接器脱机时更改的任何数据,请启动 增量快照

2.7.2.2. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。按照这个初始快照,在正常情况下,连接器不会重复快照过程。任何以后更改连接器捕获的事件数据都会通过流处理。

然而,在某些情况下,在初始快照中获取的连接器可能会变得过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:

  • 连接器配置会被修改为捕获不同的表集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或者仅捕获数据库中表的子集。此外,快照也可捕获数据库中表的内容的子集。

您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供快照中包含的表名称,如下表所述:

表 2.163. 临时 execute-snapshot 信号记录示例
字段默认

type

incremental

指定您要运行的快照类型。
目前,您可以请求 增量阻塞 快照。

data-collections

N/A

包含与快照中包含的表的完全限定域名匹配的正则表达式的数组。
对于 SQL Server 连接器,请使用以下格式来指定表的完全限定名称: database.schema.table

additional-conditions

N/A

可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:

data-collection
过滤器应用到的表的完全限定域名。您可以对每个表应用不同的过滤器。
filter
指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

surrogate-key

N/A

可选字符串,用于指定连接器在快照过程中用作表的主键的列名称。

触发临时增量快照

您可以通过向信号表添加带有 execute-snapshot 信号类型的条目,或者向 Kafka 信号发送信号消息 来启动临时增量快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的开始和端点。根据表中的条目数量以及配置的块大小,Debezium 会将表分成块,并持续持续对每个块进行快照。

如需更多信息,请参阅 增加快照

触发临时阻塞快照

您可以通过在信号表或信号主题中添加带有 execute-snapshot 信号类型的条目来启动临时阻塞快照。连接器处理消息后,它会开始快照操作。连接器会临时停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,连接器会恢复流。

如需更多信息,请参阅 块快照

2.7.2.3. 增量快照

SQL Server collations

每个 SQL Server 服务器或数据库都配置为使用特定的 collation,它决定了字符数据的存储方式、排序、比较和显示。某些 collation 组的排序规则,如 SQL Server collations (SQLvdsm) 与 Unicode 排序算法不兼容。在某些情况下,不兼容的排序规则可能会导致在连接器运行临时快照时丢失数据。例如,如果 SQL Server 配置为发送字符串为 Unicode (即,连接属性 sendStringParametersAsUnicode 设置为 true),则连接器可在快照期间跳过记录。要在临时快照期间防止丢失的数据,请将 driver.sendStringParametersAsUnicode connection string 属性的值设置为 false

有关使用 sendStringParametersAsUnicode 属性的更多信息,请参阅 SQL Server 连接属性文档

为了在管理快照方面提供灵活性,Debezium 包含一个补充快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是像初始快照一样捕获数据库的完整状态,而是在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小是 1024 行。

当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护其捕获的每个表行的记录。与标准初始快照过程相比,这个分阶段方法捕获数据具有以下优点:

  • 您可以并行运行带有流数据捕获的增量快照,而不是延迟流,直到快照完成为止。连接器将继续在整个快照过程中从更改日志捕获接近实时事件,且操作都会阻止另一个事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从它停止的点开始,而不是从开始重新捕获表。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其 table.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 按主密钥对每个表进行排序,然后根据 配置的块大小 将表分成块。通过块工作的块,然后捕获块中的每个表行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时所在行的值。

当快照进行时,其他进程可能会继续访问数据库,可能会修改表记录。要反映此类更改,INSERTUPDATEDELETE 操作会按预期提交到事务日志。同样,持续 Debezium 流过程会继续检测到这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何处理具有相同主键的记录冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会按顺序接收。也就是说,流处理可能会发出一个事件,在快照捕获了包含该行的 READ 事件前修改表行。当快照最终为行发出对应的 READ 事件时,其值已经被取代。为确保以正确的逻辑顺序处理出序列的增量快照事件,Debezium 采用缓冲方案来解决冲突。只有在快照事件和流事件之间冲突后,才会解析 Debezium 向 Kafka 发出事件记录。

快照窗口

为了帮助解决 late-arriving READ 事件和修改同一表行之间的冲突,Debezium 会使用一个所谓的 快照窗口。快照窗口分离间隔,在此期间会捕获指定表块的数据。在块的快照窗口打开前,Debezium 会遵循其通常的行为,并将事件从下游直接发送到目标 Kafka 主题。但是,从为特定块的快照打开,直到它关闭为止,Deduplication 会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。

对于每个数据收集,Debebe 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表捕获的快照记录被发送为 READ 操作。同时,当用户继续更新数据收集中的记录,并且更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debebe 开始处理快照块,它会向内存缓冲提供快照记录。在快照窗口中,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果没有找到匹配项,则流的事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃 buffered READ 事件,并将流记录写入目标主题,因为以逻辑方式取代静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器会为每个快照块重复这个过程。

目前,您可以使用以下任一方法启动增量快照:

警告

SQL Server 的 Debezium 连接器不支持增量快照运行时的 schema 更改。

2.7.2.3.1. 触发增量快照

要启动增量快照,您可以发送 临时快照信号 到源数据库上的信号表。您可以提交快照信号,作为 SQL INSERT 查询。

在 Debezium 检测到信号表中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并可选择性地指定快照操作的类型。Debezium 目前支持 增量阻塞 快照类型。

要指定要包含在快照中的表,提供一个列出表的 data-collections 数组,或用于匹配表的正则表达式数组,例如:

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debebe 会解释空数组,意味着不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的表的名称包含一个点(.)、空格或其它非字母数字字符,则必须使用双引号转义表名称。
例如,若要将 公共 模式中存在的表包含在 db1 数据库中,并且名称为 My.Table,请使用以下格式 :"db1.public.\"My.Table\" "。

先决条件

使用源信号频道触发增量快照

  1. 发送 SQL 查询,将临时增量快照请求添加到信号表中:

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    例如,

    INSERT INTO db1.myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4
        "type":"incremental", 5
        "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6

    命令中的 idtypedata 参数的值 与信号表的字段 相对应。
    下表描述了示例中的参数:

    表 2.164. SQL 命令中的字段描述,用于将增量快照信号发送到信号表
    描述

    1

    database.schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。相反,在快照过程中,Debebe 会生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的必需组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
    数组列出了使用格式 database.schema.table 的正则表达式,以匹配表的完全限定域名。此格式与您用来指定连接器 信号表的名称相同

    5

    incremental

    data 字段的可选类型 组件,用于指定要运行的快照操作类型。
    有效值为 incrementalblocking 值。
    如果没有指定值,连接器默认为执行增量快照。

    6

    additional-conditions

    可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
    每个额外条件都是带有 data-collectionfilter 属性的对象。您可以为每个数据收集指定不同的过滤器。
    请参阅 data-collection 属性是过滤器应用到的数据收集的完全限定域名。有关 additional-conditions 参数的详情,请参考 第 2.7.2.3.2 节 “使用附加 条件运行临时增量快照”

2.7.2.3.2. 使用附加 条件运行临时增量快照

如果您希望快照只在表中包括内容子集,您可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。

对典型快照的 SQL 查询采用以下格式:

SELECT * FROM <tableName> ....

通过添加 additional-conditions 参数,您可以在 SQL 查询中附加 WHERE 条件,如下例所示:

SELECT * FROM <data-collection> WHERE <filter> ....

以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

例如,假设您有一个包含以下列的 products 表:

  • ID (主密钥)
  • color
  • quantity

如果您需要 product 表的增量快照,其中只包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');

additional-conditions 参数还允许您传递基于多个列的条件。例如,使用上例中的 product 表,您可以提交查询来触发增量快照,该快照仅包含 color=bluequantity>10 的项数据:

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');

以下示例显示了连接器捕获的增量快照事件的 JSON。

例 2.48. 增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 2.165. 增量快照事件消息中的字段描述
字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是 阻塞增量
在提交至信号表的 SQL 查询中指定 type 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

2.7.2.3.3. 使用 Kafka 信号频道触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 2.166. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 支持 incrementalblocking 类型。
详情请查看下一部分。

data-collections

N/A

以逗号分隔的正则表达式,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式来指定名称。

additional-conditions

N/A

可选的附加条件数组,用于指定连接器评估以指定快照中包含的记录子集的条件。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection:: 过滤器适用的表的完全限定域名。您可以对每个表应用不同的过滤器。过滤:: specifys 列值必须存在于数据库记录中才能包括快照,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

例 2.49. execute-snapshot Kafka 信息

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-conditions 字段来选择表内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM &lt ;tableName&gt; …​.

当快照请求包含 additional-conditions 属性时,属性的 data-collectionfilter 参数会附加到 SQL 查询中,例如:

SELECT * FROM &lt ;data-collection> WHERE & lt;filter&gt; …​.

例如,如果一个带有列 ID (主键)、颜色 和品牌products 表,如果您希望快照只包含 color='blue' 的内容,当请求快照时,您可以添加 additional-conditions 属性来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`

您还可以使用 additional-conditions 属性来根据多个列传递条件。例如,使用与上例中的相同 product 表,如果您希望快照只包含 color='blue'brand='MyBrand' products 表中的内容,您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.7.2.3.4. 停止增量快照

在某些情况下,可能需要停止增量快照。例如,您可能意识到快照没有被正确配置,或者您可能要确保资源可用于其他数据库操作。您可以通过向源数据库上的信号发送信号来停止已经运行的快照。

您可以通过在 SQL INSERT 查询中发送停止快照信号,向信号表提交停止快照信号。stop-snapshot 信号将快照操作的类型指定为 增量,并选择性地指定要从当前运行的快照中省略的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在进行中时停止增量快照操作。

其他资源

您还可以通过向 Kafka 信号发送 JSON 消息来停止增量快照

先决条件

使用源信号频道停止增量快照

  1. 发送 SQL 查询以停止临时增量快照到信号表:

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    例如,

    INSERT INTO db1.myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], 4
        "type":"incremental"}'); 5

    signal 命令中的 idtypedata 参数的值与 信号表的字段相对应
    下表描述了示例中的参数:

    表 2.167. SQL 命令中的字段描述,用于将停止增量快照信号发送到信号表
    描述

    1

    database.schema.debezium_signal

    指定源数据库上信号表的完全限定名称。

    2

    ad-hoc-1

    id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号表中的条目。Debezium 不使用这个字符串。

    3

    stop-snapshot

    指定 type 参数,指定信号要触发的操作。

    4

    data-collections

    信号的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
    数组列出了正则表达式,该表达式通过其完全限定名称匹配表,格式为 database.schema.table

    如果您从 data 字段省略这个组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的必需组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项为 增量
    如果没有指定 类型 值,信号将无法停止增量快照。

2.7.2.3.5. 使用 Kafka 信号频道停止增量快照

您可以向 配置的 Kafka 信号主题 发送信号消息,以停止临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

signal 类型是 stop-snapshotdata 字段必须具有以下字段:

表 2.168. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 只支持 incremental 类型。
详情请查看下一部分。

data-collections

N/A

可选的、以逗号分隔的正则表达式,与表的完全限定名称匹配,表名称或正则表达式,以匹配要从快照中删除的表名称。
使用格式 database.schema.table 指定表名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`

2.7.2.4. 阻塞快照

为了在管理快照方面提供更多灵活性,Debezium 包含一个额外的临时快照机制,称为 阻塞快照。阻塞快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

阻塞快照的行为与 初始快照 相似,但您可以在运行时触发快照。

您可能想要在以下情况下运行阻塞快照,而不是使用标准初始快照过程:

  • 您可以添加新表,并在连接器运行时完成快照。
  • 您可以添加大表,并且您希望快照在短时间内完成,而不是通过增量快照完成。

阻塞快照过程

当您运行阻塞快照时,Debebe 会停止流,然后启动指定表的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,会恢复流。

配置快照

您可以在信号 的数据 组件中设置以下属性:

  • data-collections:指定哪个表必须是快照。
  • data-collections :指定您要包括快照的表。
    此属性接受与完全限定表名称匹配的正则表达式列表。属性的行为与 table.include.list 属性的行为类似,它指定要捕获在阻塞快照中的表。
  • additional-conditions :您可以为不同的表指定不同的过滤器。

    • data-collection 属性是要应用过滤器的表的完全限定名称,并可区分大小写或区分大小写,具体取决于数据库。
    • filter 属性将具有与 snapshot.select.statement.overrides 时使用的相同值,即条件应当匹配的表的完全限定域名。

例如:

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

可能的副本

您发送信号触发快照的时间之间可能会有延迟,以及流停止和快照启动时的时间。因此,在快照完成后,连接器可能会发出一些由快照捕获的重复记录的事件记录。

2.7.2.5. Debezium SQL Server 连接器如何读取更改数据表

当连接器首次启动时,它会获取捕获表的结构快照,并将此信息保留至其内部数据库架构历史记录主题。然后,连接器为每个源表标识一个更改表,并完成以下步骤。

  1. 对于每个更改表,连接器读取最后一次存储的最大 LSN 和当前最大 LSN 之间创建的所有更改。
  2. 连接器根据其提交 LSN 的值以升序对读取的更改进行排序,并更改 LSN。这种排序顺序确保 Debezium 按照与数据库中发生的相同顺序重新执行更改。
  3. 连接器传递提交,并将 LSNs 作为偏移改为 Kafka Connect。
  4. 连接器存储最大 LSN 并从第 1 步中重启进程。

重启后,连接器会从其读取的最后偏移(提交并更改 LSN)中恢复处理。

连接器可以检测是否已为包含的源表启用或禁用 CDC,并调整其行为。

2.7.2.6. 数据库中没有记录的最大 LSN

在某些情况下,没有最大 LSN 在数据库中记录,因为:

  1. SQL Server Agent 没有运行
  2. change 表中还没有记录任何更改
  3. 数据库具有较少的活动,cdc 清理作业会定期清除 cdc 表中的条目

因此,由于运行的 SQL Server Agent 是前提条件,因此不会遇到真正问题(否 2 和 3. 是正常的情况)。

为了缓解此问题并区分 No 1. 和其他用户,SQL Server Agent 的状态通过以下查询 "SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [:db].dm_server_services dss WHERE dss WHERE dss.[servicename] LIKE N'SQL Server (%'SQL'SQL Server Agent) (%".如果 SQL Server Agent 未运行,则会在日志中写入一个 ERROR:"No maximum LSN record in the database; SQL Server Agent not running"。

重要

运行状态查询的 SQL Server Agent 需要 VIEW SERVER STATE 服务器权限。如果您不想向配置的用户授予这个权限,您可以选择通过 database.sqlserver.agent.status.query 属性配置您自己的查询。您可以定义一个函数,它返回 true 或 1,如果 SQL Server Agent 在运行(其他情况返回 false 或 0)并安全地使用高级别权限而无需对它们进行授权,如 What minimum permissions do I need to provide to a user so that it can check the status of SQL Server Agent Service?Safely and Easily Use High-Level Permissions Without Granting Them to Anyone: Server-level 所述。query 属性的配置应类似:database.sqlserver.agent.status.query=SELECT [#db].func_is_sql_server_agent_running() - 您需要使用 [#db] a作为数据库名称的占位符。

2.7.2.7. Debezium SQL Server 连接器的限制

SQL Server 特别要求基本对象是表,以便创建更改捕获实例。因此,SQL Server 不支持从索引视图(aka. materialized 视图)捕获更改,因此 Debezium SQL Server 连接器。

2.7.2.8. 接收 Debezium SQL Server 更改事件记录的 Kafka 主题的默认名称

默认情况下,SQL Server 连接器将所有 INSERTUPDATEDELETE 操作的事件写入一个特定于该表的单个 Apache Kafka 主题。连接器使用以下惯例命名更改事件主题: < topicPrefix> . <schemaName& gt; . &lt;tableName>

以下列表提供了默认名称组件的定义:

topicPrefix
服务器的逻辑名称,如 topic.prefix 配置属性所指定。
schemaName
发生更改事件的数据库架构的名称。
tableName
发生更改事件的数据库表的名称。

例如,如果 fulfillment 是逻辑服务器名称,dbo 是 schema 名称,数据库包括名为 products, products_on_hand, customers, 和 orders 的表,连接器将更改事件发送到以下 Kafka 主题:

  • fulfillment.testDB.dbo.products
  • fulfillment.testDB.dbo.products_on_hand
  • fulfillment.testDB.dbo.customers
  • fulfillment.testDB.dbo.orders

连接器应用类似的命名约定来标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题

如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

2.7.2.9. Debezium SQL Server 连接器如何处理数据库架构更改

当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,可以随时更改数据库架构,这意味着连接器必须能够识别每次插入、更新或删除操作时的 schema。另外,连接器不一定将当前模式应用到每个事件。如果事件相对旧,则在应用当前模式之前记录这些事件。

为确保在模式更改后正确处理更改事件,Debezium SQL Server 连接器根据 SQL Server 更改表中的结构存储新模式的快照,这会镜像其关联的数据表的结构。连接器在数据库 schema 历史记录 Kafka 主题中存储表 schema 信息,以及操作的 LSN。连接器使用存储的模式表示来生成更改事件,每次插入、更新或删除操作时都会正确镜像表结构。

当连接器在崩溃或安全停止后重启时,它会从它读取的最后位置恢复 SQL Server CDC 表中的条目。根据连接器从数据库模式历史记录主题读取的架构信息,连接器应用在连接器重启的位置中存在的表结构。

如果您更新处于捕获模式的 Db2 表的 schema,您也可以更新相应更改表的 schema。您必须是一个具有升级权限的 SQL Server 数据库管理员,才能更新数据库架构。有关在 Debezium environmenbts 中更新 SQL Server 数据库模式的更多信息,请参阅 数据库 schema evolution

数据库架构历史记录主题仅用于内部连接器。另外,连接器也可以将 schema 更改事件发送到面向消费者应用程序的不同主题

其他资源

2.7.2.10. Debezium SQL Server 连接器如何使用 schema 更改主题

对于启用了 CDC 的每个表,Debezium SQL Server 连接器存储应用到数据库中表的 schema 更改事件的历史记录。连接器将模式更改事件写入名为 < topicPrefix&gt; 的 Kafka 主题,其中 topicPrefixtopic.prefix 配置属性中指定的逻辑服务器名称。

连接器发送到 schema 更改主题的消息包含一个有效负载,并可以选择包含更改事件消息的 schema。

模式更改事件的 schema 具有以下元素:

name
模式更改事件消息的名称。
type
更改事件消息的类型。
version
架构的版本。version 是一个整数,每次更改 schema 时都会递增。
fields
更改事件消息中包含的字段。

示例:SQL Server 连接器模式更改主题的 Schema

以下示例显示了 JSON 格式的典型模式。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.sqlserver.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

模式更改事件消息的有效负载包括以下元素:

databaseName
将语句应用到的数据库的名称。databaseName 的值充当 message 键。
tableChanges
架构更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表的每列条目。由于结构化表示以 JSON 或 Avro 格式显示数据,因此用户可以轻松地读取消息,而无需首先通过 DDL 解析器处理它们。
重要

当连接器被配置为捕获表时,它会存储表的模式更改历史记录,不仅存储在 schema 更改主题中,还存储在内部数据库架构历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不用于直接使用应用程序。确保需要针对 schema 更改通知的应用程序只消耗了来自 schema 更改主题的信息。

警告

连接器发送到其 schema 更改主题的消息格式处于 outcubating 状态,并可在不通知的情况下更改。

当发生以下事件时,Debebe Debezium 会向 schema 更改主题发送一条消息:

示例:向 SQL Server 连接器模式更改主题发送的消息

以下示例显示了 schema 更改主题中的消息。消息包含表 schema 的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "3.0.8.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "testDB", 2
    "schemaName": "dbo",
    "ddl": null, 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"testDB\".\"dbo\".\"customers\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "id"
          ],
          "columns": [ 9
            {
              "name": "id",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "first_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "last_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "email",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 2.169. 发送到 schema 更改主题的消息中的字段描述
字段名称描述

1

ts_ms

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

2

databaseName
schemaName

标识包含更改的数据库和模式。

3

ddl

对于 SQL Server 连接器,始终为 null。对于其他连接器,此字段包含负责架构更改的 DDL。此 DDL 不适用于 SQL Server 连接器。

4

tableChanges

包含 DDL 命令生成的架构更改的一个或多个项目的数组。

5

type

描述更改类型。该值是以下之一:

  • 已创建 CREATE - table
  • ALTER - 表被修改
  • DROP - 表被删除

6

id

创建、更改或丢弃的表的完整标识符。

7

table

代表应用的更改后的表元数据。

8

primaryKeyColumnNames

编写表主键的列列表。

9

columns

changed 表中每个列的元数据。

10

属性

每个表更改的自定义属性元数据。

在连接器发送到 schema 更改主题的消息中,键是包含 schema 更改的数据库的名称。在以下示例中,payload 字段包含键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.sqlserver.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "testDB"
  }
}

2.7.2.11. Debezium SQL Server 连接器数据更改事件的描述

Debezium SQL Server 连接器为每个行级 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件包含一个键和值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间变化,用户很难处理。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,消费者可以用来从 registry 获取 schema 的模式 ID。这使得每个事件自包含。

以下框架 JSON 显示了更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器决定了更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于 change 事件中。同样,只有在将转换器配置为生成它时,事件键和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 2.170. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,它描述了事件键的 payload 部分中的内容。换句话说,第一个 模式 字段描述了主键的结构,如果表没有主键,则描述主键的结构。

可以通过设置 message.key.columns 连接器配置属性 来覆盖表的主键。在这种情况下,第一个 schema 字段描述了该属性标识的密钥的结构。

2

payload

第一个 payload 字段是事件键的一部分。它有上一个 schema 字段描述的结构,其中包含更改的行的密钥。

3

schema

第二个 schema 字段是事件值的一部分。它指定 Kafka Connect 模式,它描述了事件值的 payload 部分中的内容。换句话说,第二个 模式 描述了更改的行的结构。通常,此模式包含嵌套的模式。

4

payload

第二个 payload 字段是事件值的一部分。它有上一个 schema 字段描述的结构,其中包含更改的行的实际数据。

默认情况下,连接器流将事件记录改为带有与事件原始表相同的名称的主题。如需更多信息,请参阅 主题名称

警告

SQL Server 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将被一个下划线字符替代。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,则这可能会导致意外冲突,并且区分名称的唯一字符无效,因此用下划线替换。

有关更改事件的详情,请查看以下主题:

2.7.2.11.1. 关于 Debezium SQL Server 更改事件中的键

更改事件的密钥包含更改表键和更改行的实际键的 schema。在连接器创建事件时,schema 及其对应有效负载都包含更改表的主键(或唯一键约束)中每个列的字段。

请考虑以下 customers 表,后面是此表的更改事件关键示例。

表示例

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

更改事件键示例

捕获 customer 表更改的每个更改事件都有相同的事件关键模式。只要 customers 表有以前的定义,可以捕获 customer 表更改的事件都有以下关键结构(JSON),它类似于:

{
    "schema": { 1
        "type": "struct",
        "fields": [ 2
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false, 3
        "name": "server1.testDB.dbo.customers.Key" 4
    },
    "payload": { 5
        "id": 1004
    }
}
表 2.171. 更改事件键的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 payload 部分中的内容。

2

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要。在本例中,有一个名为 id 的必填字段,类型为 int32

3

optional

指明事件键是否在其 payload 字段中包含一个值。在本例中,需要键有效负载中的值。当表没有主键时,键有效负载字段中的值是可选的。

4

server1.dbo.testDB.customers.Key

定义键有效负载结构的 schema 名称。这个模式描述了更改的表的主密钥的结构。键架构名称的格式是 connector-name.database-schema-name.table-name.Key。在本例中:

  • server1 是生成此事件的连接器的名称。
  • dbo 是表的数据库模式,它已改变。
  • 客户 是更新的表。

5

payload

包含生成此更改事件的行的密钥。在本例中,键包含一个 id 字段,其值为 1004

2.7.2.11.2. 关于 Debezium SQL Server 更改事件中的值

更改事件中的值比键复杂一些。与键一样,值也有一个 schema 部分和一个 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的操作的事件,它们都有带有 envelope 结构的值 payload。

考虑用于显示更改事件键示例相同的示例表:

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

每个事件类型都会描述更改此表的值部分。

创建 事件

以下示例显示了一个更改事件的值部分,连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.testDB.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.testDB.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ns"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
      }
    ],
    "optional": false,
    "name": "server1.dbo.testDB.customers.Envelope" 4
  },
  "payload": { 5
    "before": null, 6
    "after": { 7
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": { 8
      "version": "3.0.8.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "ts_us": 1559729468470000,
      "ts_ns": 1559729468470000000,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c", 9
    "ts_ms": 1559729471739, 10
    "ts_ms": 1559729471739876, 11
    "ts_ms": 1559729471739876149 12
  }
}
表 2.172. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,它描述了值有效负载的结构。在连接器为特定表生成的更改事件时,更改事件的值 schema 都相同。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

server1.dbo.testDB.customers.Value 是有效负载 beforeafter 字段的 schema。这个模式特定于 customers 表。

beforeafter 字段的模式名称格式为 logicalName.database-schemaName.tableName.Value,这样可确保 schema 名称在数据库中是唯一的。这意味着,在使用 Avro converter 时,每个逻辑源中的每个表生成的 Avro 模式都有自己的演进和历史记录。

3

name

io.debezium.connector.sqlserver.Source 是有效负载 source 字段的 schema。这个模式特定于 SQL Server 连接器。连接器用于它生成的所有事件。

4

name

server1.dbo.testDB.customers.Envelope 是载荷总体结构的模式,其中 server1 是连接器名称,dbo 是数据库架构名称,customers 是表。

5

payload

值的实际数据。这是更改事件提供的信息。

可能会出现事件的 JSON 表示大于它们描述的行。这是因为 JSON 表示必须包含 schema 和 message 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的消息大小。

6

before

指定事件发生前行状态的可选字段。当 op 字段是 c 用于创建(如本例所示),before 字段为 null,因为此更改事件用于新内容。

7

after

指定事件发生后行状态的可选字段。在本例中,after 字段包含新行的 idfirst_namelast_nameemail 列的值。

8

source

描述事件源元数据的强制字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、发生事件的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改时的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表的名称
  • 服务器日志偏移

9

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,c 表示操作创建了行。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = 读取(仅适用于快照)

10

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 表示更改在数据库中提交的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更新 事件

示例 customers 表中一个更新的改变事件的值有与那个表的 create 事件相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是当连接器在 customers 表中为更新生成的更改事件值的示例:

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "after": { 2
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "source": { 3
      "version": "3.0.8.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729995937,
      "ts_us": 1559729995937000,
      "ts_ns": 1559729995937000000,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
      "event_serial_no": "2"
    },
    "op": "u", 4
    "ts_ms": 1559729998706,  5
    "ts_us": 1559729998706318,  6
    "ts_ns": 1559729998706318547  7
  }
}
表 2.173. 更新 事件值字段的描述
字段名称描述

1

before

指定事件发生前行状态的可选字段。在 update 事件值中,before 字段包含每个表列中的一个字段,并在数据库提交前在该列中有一个值。在本例中,电子邮件 值为 john.doe@example.org。

2

after

指定事件发生后行状态的可选字段。您可以比较 之前和之后 结构,以确定此行的更新是什么。在这个示例中,电子邮件 值现在是 noreply@example.org

3

source

描述事件源元数据的强制字段。source 字段结构与 create 事件中的字段相同,但某些值有所不同,例如,示例 update 事件具有不同的偏移量。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改时的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表的名称
  • 服务器日志偏移

event_serial_no 字段区分具有相同提交和更改 LSN 的事件。当此字段具有除 1 以外的值时的典型情况:

  • 更新 事件的值被设置为 2,因为更新在 SQL Server 的 CDC 更改表中生成两个事件(详情请参阅 源文档)。第一个事件包含旧值,第二个事件包含新值。连接器使用第一个事件中的值来创建第二个事件。连接器丢弃第一个事件。
  • 当主键被更新 SQL Server 时,会发出两个事件。对于删除带有旧主键的记录,有一个 delete 事件;对于添加带有新主键的记录,有一个 create 事件。两个操作共享相同的提交并更改 LSN 及其事件号分别为 12

4

op

描述操作类型的强制字符串。在 update 事件值中,op 字段值为 u,表示此行因为更新而改变。

5

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改提交到数据库的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

注意

更新行主/唯一键的列会更改行键的值。当密钥更改时,Debe be 会输出三个 事件:一个 delete 事件和一个 tombstone 事件,其中包含行的旧键,后跟一个带有行的新键的 create 事件。

删除 事件

delete 更改事件中的值与为同一表的 createupdate 事件相同的 schema 部分。示例 customer 表的 delete 事件中 payload 部分类似如下:

{
  "schema": { ... },
  },
  "payload": {
    "before": { <>
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "after": null, 1
    "source": { 2
      "version": "3.0.8.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559730445243,
      "ts_us": 1559730445243000,
      "ts_ns": 1559730445243000000,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007",
      "event_serial_no": "1"
    },
    "op": "d", 3
    "ts_ms": 1559730450205, 4
    "ts_us": 1559730450205387, 5
    "ts_ns": 1559730450205387492  6
  }
}
表 2.174. 删除 事件值字段的描述
字段名称描述

1

before

指定事件发生前行的状态的可选字段。在一个 delete 事件值中,before 字段包含在使用数据库提交删除行前的值。

2

after

可选字段,用于指定事件发生后行的状态。在一个 delete 事件值中,after 字段为 null,表示行不再存在。

3

source

描述事件源元数据的强制字段。在一个 delete 事件值中,source 字段结构与同一表的 createupdate 事件相同。许多 source 字段值也相同。在 delete 事件值中,ts_mspos 字段值以及其他值可能已更改。但是 delete 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 数据库和模式名称
  • 在数据库中进行更改时的时间戳
  • 如果事件是快照的一部分
  • 包含新行的表的名称
  • 服务器日志偏移

4

op

描述操作类型的强制字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。在事件消息 envelope 中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

SQL Server 连接器事件设计为与 Kafka 日志压缩 一起使用。日志压缩支持删除一些旧的信息,只要至少保留每个密钥的最新消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,为了使 Kafka 删除具有相同键的所有信息,消息值必须是 null。为了实现此目的,在 Debezium 的 SQL Server 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但一个 null 值。

2.7.2.12. Debezium SQL Server 连接器生成的事件代表事务边界

Debezium 可以生成代表事务边界的事件,以及丰富的数据更改事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并接收部署连接器后发生的事务的元数据。部署连接器前发生的事务元数据不可用。

数据库事务由一个声明块表示,该块包含在 BEGINEND 关键字之间。Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status
BEGINEND.
id
唯一事务标识符的字符串表示。
ts_ms
数据源的事务边界事件(BEGINEND 事件)的时间。如果数据源没有向 Debezium 提供事件时间,则字段代表 Debezium 处理事件的时间。
event_count (用于 END 事件)
事务处理的事件总数。
data_collections (用于 END 事件)
一组 data_collectionevent_count 元素,用于指示连接器为来自数据收集的更改发出的事件数。
警告

Debezium 无法可靠地识别事务何时终止。因此,只有在另一个事务到达的第一个事件后,才会发出事务 结束 标记。在低流量系统中,这可能会导致延迟发布 END 标记。

以下示例显示了典型的事务边界消息:

示例:SQL Server 连接器事务边界事件

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.testDB.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.testDB.tableb",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

2.7.2.12.1. 更改数据事件增强

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示
total_order
事件在事务生成的所有事件中的绝对位置
data_collection_order
事件在事务发送的所有事件中的 per-data 集合位置

以下示例显示了典型信息是什么:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335172",
  "ts_ns": "1580390884335172574",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

2.7.2.13. Debezium SQL Server 连接器如何映射数据类型

Debezium SQL Server 连接器通过生成类似行表的事件来代表表行数据的更改。每个事件都包含字段来代表行的列值。事件表示操作的列值的方式取决于列的 SQL 数据类型。在事件中,连接器将每个 SQL Server 数据类型的字段映射到 字面类型和 语义类型

连接器可以将 SQL Server 数据类型映射到 字面语义 类型。

字面类型
描述如何使用 Kafka Connect 模式类型(即 INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, 和 STRUCT)来代表值。
语义类型
描述 Kafka Connect 模式如何使用字段的 Kafka Connect 模式名称捕获字段 的含义

如果默认数据类型转换不满足您的需要,您可以为连接器 创建自定义转换器

有关数据类型映射的更多信息,请参阅以下部分:

基本类型

下表显示了连接器如何映射基本 SQL Server 数据类型。

表 2.175. SQL Server 连接器使用的数据类型映射
SQL Server 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

BIT

布尔值

不适用

TINYINT

INT16

不适用

SMALLINT

INT16

不适用

INT

INT32

不适用

BIGINT

INT64

不适用

REAL

FLOAT32

不适用

FLOAT[(N)]

FLOAT64

不适用

CHAR[(N)]

字符串

不适用

VARCHAR[(N)]

字符串

不适用

文本

字符串

不适用

NCHAR[(N)]

字符串

不适用

NVARCHAR[(N)]

字符串

不适用

NTEXT

字符串

不适用

XML

字符串

io.debezium.data.Xml

包含 XML 文档的字符串表示

DATETIMEOFFSET[(P)]

字符串

io.debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串,其中时区是 GMT

在以下部分中描述了其他数据类型映射。

如果存在,则列的默认值将传播到对应的字段 Kafka Connect 模式。更改消息将包含字段的默认值(除非给出了一个明确的列值),因此应该很少需要从 schema 获取默认值。

临时值

SQL Server 的 DATETIMEOFFSET 数据类型(包含时区信息)以外的其他类型取决于 time.precision.mode 配置属性的值。当 time.precision.mode 配置属性设置为 adaptive (默认值),那么连接器将根据列的数据类型确定 temporal 类型的字面类型和语义类型,以便事件 准确 表示数据库中的值:

SQL Server 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

io.debezium.time.Date

代表时期起的天数。

TIME(0), TIME(1), TIME(2), TIME(3)

INT32

io.debezium.time.Time

代表过去午夜的毫秒数,不包括时区信息。

TIME(4), TIME(5), TIME(6)

INT64

io.debezium.time.MicroTime

代表过去午夜的微秒数,不包括时区信息。

TIME(7)

INT64

io.debezium.time.NanoTime

代表过去午夜的纳秒数,不包括时区信息。

日期时间

INT64

io.debezium.time.Timestamp

代表时期过去的毫秒数,不包括时区信息。

SMALLDATETIME

INT64

io.debezium.time.Timestamp

代表时期过去的毫秒数,不包括时区信息。

DATETIME2 (0), DATETIME2 (1), DATETIME2 (2), DATETIME2 (3)

INT64

io.debezium.time.Timestamp

代表时期过去的毫秒数,不包括时区信息。

DATETIME2 (4), DATETIME2 (5), DATETIME2 (6)

INT64

io.debezium.time.MicroTimestamp

代表时期过去的微秒数,不包括时区信息。

DATETIME2 (7)

INT64

io.debezium.time.NanoTimestamp

代表过去 epoch 的纳秒数,且不包含时区信息。

time.precision.mode 配置属性设置为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者只了解内置 Kafka Connect 逻辑类型且无法处理变量精度时间值时,这很有用。另一方面,因为 SQL 服务器支持十分之一的微秒精度,带有 connect 时间精度模式的连接器将在有一个大于 3 的 fractional second precision 数据库列时丢失一些精度

SQL Server 数据类型字面类型(schema 类型)语义类型(schema 名称)和备注

DATE

INT32

org.apache.kafka.connect.data.Date

代表 epoch 后的天数。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

代表午夜起的毫秒数,但不包含时区信息。SQL Server 允许 P 范围 0-7 存储在微秒的精度中,但这个模式会在 P > 3 时丢失精度。

日期时间

INT64

org.apache.kafka.connect.data.Timestamp

代表自 epoch 起的毫秒数,但不包含时区信息。

SMALLDATETIME

INT64

org.apache.kafka.connect.data.Timestamp

代表 epoch 以上的毫秒数,不包括时区信息。

DATETIME2

INT64

org.apache.kafka.connect.data.Timestamp

代表自 epoch 起的毫秒数,但不包含时区信息。SQL Server 允许 P 范围 0-7 存储在微秒的精度中,但这个模式会在 P > 3 时丢失精度。

时间戳值

DATETIME,SMALLDATETIMEDATETIME2 类型代表一个没有时区信息的时间戳。这些列根据 UTC 转换为等同的 Kafka Connect 值。对于实例,DATETIME2 值 "2018-06-20 15:13:16.945104" 由值 "1529507596945104" 为 io.debezium.time.MicroTimestamp 代表。

请注意,运行 Kafka Connect 和 Debezium 的 JVM 时区不会影响此转换。

十进制值

Debezium 连接器根据 decimal.handling.mode 连接器配置属性的 设置来处理十进制。

decimal.handling.mode=precise
表 2.176. 当 decimal.handling.mode=precise时映射
SQL Server 类型字面类型(schema 类型)语义类型(架构名称)

NUMERIC[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

DECIMAL[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

SMALLMONEY

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

金钱

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包括一个整数,它代表了十进制小数点移动了多少位。

decimal.handling.mode=double
表 2.177. 当 decimal.handling.mode=double时的映射
SQL Server 类型字面类型语义类型

NUMERIC[(M[,D])]

FLOAT64

不适用

DECIMAL[(M[,D])]

FLOAT64

不适用

SMALLMONEY[(M[,D])]

FLOAT64

不适用

金钱[(M[,D])]

FLOAT64

不适用

decimal.handling.mode=string
表 2.178. 当 decimal.handling.mode=string时映射
SQL Server 类型字面类型语义类型

NUMERIC[(M[,D])]

字符串

不适用

DECIMAL[(M[,D])]

字符串

不适用

SMALLMONEY[(M[,D])]

字符串

不适用

金钱[(M[,D])]

字符串

不适用

2.7.3. 设置 SQL Server 以运行 Debezium 连接器

要使 Debezium 从 SQL Server 表捕获更改事件,具有所需权限的 SQL Server 管理员必须首先运行查询以便在数据库上启用 CDC。然后,管理员必须为您希望 Debezium 捕获的每个表启用 CDC。

注意

默认情况下,与 Microsoft SQL Server 的 JDBC 连接受 SSL 加密保护。如果没有为 SQL Server 数据库启用 SSL,或者在不使用 SSL 的情况下连接到数据库,您可以通过将连接器配置中的 database.encrypt 属性的值设置为 false 来禁用 SSL。

有关设置用于 Debezium 连接器的 SQL Server 的详情,请参考以下部分:

应用 CDC 后,它会捕获所有 INSERTUPDATEDELETE 操作,它们提交到启用了 CDC 的表。然后 Debezium 连接器可以捕获这些事件并将其发送到 Kafka 主题。

2.7.3.1. 在 SQL Server 数据库中启用 CDC

在为表启用 CDC 前,您必须为 SQL Server 数据库启用它。SQL Server 管理员通过运行系统存储的步骤启用 CDC。可使用 SQL Server Management Studio 或使用 Transact-SQL 运行系统存储的步骤。

先决条件

  • 您是 SQL Server 系统管理员 固定服务器角色的成员。
  • 您是数据库的 db_owner。
  • SQL Server Agent 正在运行。
注意

SQL Server CDC 功能只处理在用户创建的表中发生的更改。您不能在 SQL Server master 数据库上启用 CDC。

流程

  1. 在 SQL Server Management Studio 中的 View 菜单中,单击 Template Explorer
  2. Template Browser 中,展开 SQL Server Templates
  3. 展开 Change Data Capture > Configuration,然后点 Enable Database for CDC
  4. 在模板中,将 USE 语句中的数据库名称替换为您要为 CDC 启用的数据库的名称。
  5. 运行存储的步骤 sys.sp_cdc_enable_db 以启用 CDC 的数据库。

    为 CDC 启用数据库后,会创建一个带有名称 cdc 的模式,以及 CDC 用户、元数据表和其他系统对象。

    以下示例演示了如何为数据库 MyDB 启用 CDC:

    示例:为 CDC 模板启用 SQL Server 数据库

    USE MyDB
    GO
    EXEC sys.sp_cdc_enable_db
    GO

2.7.3.2. 在 SQL Server 表中启用 CDC

SQL Server 管理员必须在您要捕获的源表上启用更改数据捕获。必须为 CDC 启用数据库。要在表上启用 CDC,SQL Server 管理员为表运行存储的步骤 sys.sp_cdc_enable_table。存储的步骤可以使用 SQL Server Management Studio 运行,或者使用 Transact-SQL 来运行。必须为每个要捕获的表启用 SQL Server CDC。

先决条件

  • 在 SQL Server 数据库上启用了 CDC。
  • SQL Server Agent 正在运行。
  • 您是 db_owner 固定数据库角色的成员。

流程

  1. 在 SQL Server Management Studio 中的 View 菜单中,单击 Template Explorer
  2. Template Browser 中,展开 SQL Server Templates
  3. 展开 Change Data Capture > Configuration,然后点 Enable Table Specifying Filegroup Option
  4. 在模板中,将 USE 语句中的表名称替换为您要捕获的表的名称。
  5. 运行存储的步骤 sys.sp_cdc_enable_table

    以下示例演示了如何为表 MyTable 启用 CDC:

    示例:为 SQL Server 表启用 CDC

    USE MyDB
    GO
    
    EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'MyTable', //<.>
    @role_name     = N'MyRole',  //<.>
    @filegroup_name = N'MyDB_CT',//<.>
    @supports_net_changes = 0
    GO

    <.> 指定要捕获的表的名称。<.> 指定角色 MyRole,您可以向其中添加您要为源表捕获列授予 SELECT 权限的用户。sysadmindb_owner 角色的用户还可以访问指定的更改表。如果 @role_name 的值明确设置为 NULL,则不使用任何角色来限制对捕获的信息的访问。<.> 指定 SQL 服务器在其中放置捕获表的更改表。named filegroup 必须已经存在。最好不要在用于源表的同一 filegroup 中找到更改表。

2.7.3.3. 验证用户是否可以访问 CDC 表

SQL Server 管理员可以运行系统存储的步骤来查询数据库或表来检索其 CDC 配置信息。存储的步骤可以使用 SQL Server Management Studio 运行,或者使用 Transact-SQL 来运行。

先决条件

  • 对捕获实例的所有列都有 SELECT 权限。db_owner 数据库角色的成员可以查看所有定义的捕获实例的信息。
  • 您拥有为查询包含的表信息定义的任何 gating 角色的成员。

流程

  1. 在 SQL Server Management Studio 中的 View 菜单中,单击 Object Explorer
  2. 从 Object Explorer,展开 Databases,然后扩展您的数据库对象,如 MyDB
  3. 展开 Programmability > Stored procedures > System Stored procedures
  4. 运行 sys.sp_cdc_help_change_data_capture 存储的步骤来查询表。

    查询应该不会返回空结果。

    以下示例在数据库 MyDB 上运行存储的步骤 sys.sp_cdc_help_change_data_capture

    示例:查询 CDC 配置信息表

    USE MyDB;
    GO
    EXEC sys.sp_cdc_help_change_data_capture
    GO

    查询返回为 CDC 启用的数据库中每个表的配置信息,其中包含调用者有权访问的更改数据。如果结果为空,请验证用户是否有访问捕获实例和 CDC 表的权限。

2.7.3.4. Azure 上的 SQL Server

Debezium SQL Server 连接器可用于 Azure 上的 SQL Server。有关在 Azure 上为 SQL Server 配置 CDC 并在 Debezium 中使用它的信息,请参阅此示例。https://learn.microsoft.com/en-us/samples/azure-samples/azure-sql-db-change-stream-debezium/azure-sql%2D%2Dsql-server-change-stream-with-debezium/

2.7.3.5. SQL Server 捕获作业代理配置对服务器负载和延迟的影响

当数据库管理员为源表启用更改数据捕获时,捕获作业代理开始运行。代理从事务日志读取新的更改事件记录,并将事件记录复制到更改数据表中。在源表中提交更改的时间,以及更改出现在相应更改表中的时间,始终有一个小的延迟间隔。这个延迟间隔代表源表中出现更改时的间隔,以及 Debezium 到 Apache Kafka 的流提供时的差距。

理想情况下,对于必须快速响应数据更改的应用程序,您希望在源和更改表之间保持关闭同步。您可能会认为,运行捕获代理以尽可能迅速地处理更改事件,从而导致吞吐量增加,并减少带有新事件记录的更改表(接近实时发生)。然而,这不一定如此。对于更直接的同步,需要支付性能损失。每次捕获作业代理查询数据库以获取新事件记录时,它会增加数据库主机上的 CPU 负载。服务器上的额外负载可能会对整体数据库性能产生负面影响,并可能会降低交易效率,特别是在数据库使用高峰时。

监控数据库指标非常重要,以便您知道数据库是否达到服务器不再支持捕获代理的活动级别。如果您注意到性能问题,有 SQL Server 捕获代理设置,您可以修改它们来帮助平衡数据库主机上的总体 CPU 负载,并具有可容忍的延迟。

2.7.3.6. SQL Server 捕获作业代理配置参数

在 SQL Server 上,控制捕获作业代理行为的参数在 SQL Server 表 msdb.dbo.cdc_jobs 中定义。如果您在运行捕获作业代理时遇到性能问题,请通过运行 sys.sp_cdc_change_job 存储的步骤并提供新值来调整捕获作业设置以降低 CPU 负载。

注意

有关如何配置 SQL Server 捕获作业代理参数的具体指导超出了本文档的范围。

以下参数是修改用于 Debezium SQL Server 连接器的捕获代理行为的最显著:

pollinginterval
  • 指定捕获代理在日志扫描周期之间等待的秒数。
  • 数值越高,可以减少数据库主机上的负载并增加延迟。
  • 0 指定扫描之间没有等待。
  • 默认值为 5
maxtrans
  • 指定每次日志扫描循环期间要处理的最大事务数。在捕获作业处理指定数量的事务后,它会在下一次扫描开始前暂停 pollinginterval 指定的时间长度。
  • 较低值可减少数据库主机上的负载并增加延迟。
  • 默认值为 500
maxscans
  • 指定捕获作业可以尝试捕获数据库事务日志的所有内容的扫描周期数量的限制。如果将 continuous 参数设置为 1,则作业会在恢复扫描前暂停 pollinginterval 指定的时间长度。
  • 较低值可减少数据库主机上的负载并增加延迟。
  • 默认值为 10

其他资源

  • 有关捕获代理参数的更多信息,请参阅 SQL Server 文档。

2.7.4. 部署 Debezium SQL Server 连接器

您可以使用以下任一方法部署 Debezium SQL Server 连接器:

2.7.4.1. 使用 Streams for Apache Kafka 的 SQL Server 连接器部署

部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您要创建和使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中包含的连接器工件的信息。
  • 提供包括连接器用来访问源数据库的信息的 KafkaConnector CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

2.7.4.2. 使用 Streams for Apache Kafka 部署 Debezium SQL Server 连接器

对于 Apache Kafka 的早期版本,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 Apache Kafka 的 Streams 中的构建配置,来自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。

在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到 .spec.build.output 中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • Apache Kafka Operator 的 Streams 正在运行。
  • 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
  • Kafka Connect 部署在 Apache Kafka 的 Streams 中
  • 您有一个红帽构建的 Debezium 许可证。
  • OpenShift oc CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream
    • ImageStream 资源部署到集群中,以存储新的容器镜像。您必须为集群显式创建 ImageStream。默认情况下,镜像流不可用。如需有关 ImageStreams 的更多信息,请参阅 OpenShift Container Platform 文档中的管理镜像流

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

    例 2.50. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium SQL Server 连接器存档。
    • 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算将 Avro serialization 与连接器一起使用时,才添加 Apicurio Registry 组件。
    • Debezium 脚本 SMT 归档以及您要用于 Debezium 连接器的相关脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.9.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-sqlserver
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-sqlserver/3.0.8.Final-redhat-00004/debezium-connector-sqlserver-3.0.8.Final-redhat-00004-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表 2.179. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",以便 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,可推送到容器 registry (如 Docker Hub 或 Quay)或 镜像流 (用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 Streams for Apache Kafka API 参考中的 Build schema 参考

    5

    插件配置 列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型是 ziptgz、或 jar。Debezium 连接器存档以 .zip 文件格式提供。type 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Red Hat Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定下载 Apicurio Registry 组件的工件 类型和 url。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化事件键和值,使用红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您还必须部署 JSR 223 兼容脚本实施,如 groovy。

    10

    (可选)指定与 JSR 223 脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果您使用 Streams for Apache Kafka 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需的脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用将 Apache Groovy 语言与脚本 SMT 搭配使用,示例中的自定义资源会检索以下库的 JAR 文件:

    • groovy
    • groovy-jsr223 (脚本代理)
    • groovy-json (用于解析 JSON 字符串的模块)

    作为替代方案,Debezium 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实施。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 sqlserver-inventory-connector.yaml

    例 2.51. sqlserver-inventory-connector.yaml 文件,为 Debezium 连接器定义 KafkaConnector 自定义资源

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnector
        metadata:
          labels:
            strimzi.io/cluster: debezium-kafka-connect-cluster
          name: inventory-connector-sqlserver 1
        spec:
          class: io.debezium.connector.sqlserver.SqlServerConnector 2
          tasksMax: 1  3
          config:  4
            schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
            schema.history.internal.kafka.topic: schema-changes.inventory
            database.hostname: sqlserver.debezium-sqlserver.svc.cluster.local 5
            database.port: 1433   6
            database.user: debezium  7
            database.password: dbz  8
            topic.prefix: inventory-connector-sqlserver 9
            table.include.list: dbo.customers  10
    
            ...
    表 2.180. 连接器配置设置的描述
    描述

    1

    要注册到 Kafka Connect 集群的连接器名称。

    2

    连接器类的名称。

    3

    可同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址。

    6

    数据库实例的端口号。

    7

    Debezium 用来连接到数据库的帐户名称。

    8

    Debezium 用来连接到数据库用户帐户的密码。

    9

    数据库实例或集群的主题前缀。
    指定的名称只能从字母数字字符或下划线构成。
    因为主题前缀用作从此连接器接收更改事件的 Kafka 主题的前缀,因此该名称在集群中的连接器中必须是唯一的。
    如果您将连接器与 Avro 连接器集成,则此命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。

    10

    连接器捕获更改事件的表列表。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f sqlserver-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。

现在,您可以验证 Debezium SQL Server 部署

2.7.4.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium SQL Server 连接器

要部署 Debezium SQL Server 连接器,您必须构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您需要创建以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。
  • 定义 Debezium SQL Server 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

流程

  1. 为 Kafka Connect 创建 Debezium SQL Server 容器:

    1. 创建一个 Dockerfile,它使用 registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-sqlserver.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-sqlserver/3.0.8.Final-redhat-00004/debezium-connector-sqlserver-3.0.8.Final-redhat-00004-plugin.zip \
      && unzip debezium-connector-sqlserver-3.0.8.Final-redhat-00004-plugin.zip \
      && rm debezium-connector-sqlserver-3.0.8.Final-redhat-00004-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-sqlserver.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-sqlserver.yaml Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-sqlserver:latest .
      docker build -t debezium-container-for-sqlserver:latest .

      前面的命令使用名称 debezium-container-for-sqlserver 构建容器镜像。

    3. 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-sqlserver:latest
      docker push <myregistry.io>/debezium-container-for-sqlserver:latest
    4. 创建新的 Debezium SQL Server KafkaConnect 自定义资源(CR)。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-sqlserver  2
      
        ...
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定为运行 Debezium 连接器而创建的镜像的名称。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 输入以下命令将 KafkaConnect CR 应用到 OpenShift Kafka Connect 环境:

      oc create -f dbz-connect.yaml

      该命令添加一个 Kafka Connect 实例,用于指定为运行 Debezium 连接器而创建的镜像的名称。

  2. 创建一个 KafkaConnector 自定义资源,用于配置 Debezium SQL Server 连接器实例。

    您可以在 .yaml 文件中配置 Debezium SQL Server 连接器,该文件指定连接器的配置属性。连接器配置可能会指示 Debezium 为模式和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断指定列中的值,这些值是敏感、太大或不需要的。

    以下示例配置了在端口 1433 上连接到 SQL 服务器主机 192.168.99.100 的 Debezium 连接器。此主机有一个名为 testDB 的数据库,名为 customer 的表,inventory-connector-sqlserver 是服务器的逻辑名称。

    SQL Server inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: inventory-connector-sqlserver 1
      labels:
        strimzi.io/cluster: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: 'true'
    spec:
      class: io.debezium.connector.sqlserver.SqlServerConnector 2
      config:
        database.hostname: 192.168.99.100 3
        database.port: 1433 4
        database.user: debezium 5
        database.password: dbz 6
        topic.prefix: inventory-connector-sqlserver 7
        table.include.list: dbo.customers 8
        schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 9
        schema.history.internal.kafka.topic: schemahistory.fullfillment 10
        database.ssl.truststore: path/to/trust-store 11
        database.ssl.truststore.password: password-for-trust-store 12

    表 2.181. 连接器配置设置的描述
    描述

    1

    当使用 Kafka Connect 服务注册时,连接器的名称。

    2

    此 SQL Server 连接器类的名称。

    3

    SQL Server 实例的地址。

    4

    SQL Server 实例的端口号。

    5

    SQL Server 用户的名称。

    6

    SQL Server 用户的密码。

    7

    SQL Server instance/cluster 的主题前缀,它组成一个命名空间,并在使用 Avro converter 时,用于连接器写入的 Kafka 主题、Kafka Connect 模式名称和对应 Avro 模式的命名空间的名称。

    8

    连接器只捕获 dbo.customers 表中的更改。

    9

    此连接器将用于写入和恢复 DDL 语句的 Kafka 代理列表到数据库 schema 历史记录主题。

    10

    连接器将写入和恢复 DDL 语句的数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。

    11

    存储服务器的 signer 证书的 SSL 信任存储路径。此属性是必需的,除非禁用数据库加密(database.encrypt=false)。

    12

    SSL truststore 密码。此属性是必需的,除非禁用数据库加密(database.encrypt=false)。

  3. 使用 Kafka Connect 创建连接器实例。例如,如果您在 inventory-connector.yaml 文件中保存 KafkaConnector 资源,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    前面的命令注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 testDB 数据库运行。

验证 Debezium SQL Server 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个表创建一个主题,这些表配置为捕获连接器。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了用于读取操作的事件("op":"r"),连接器在每个表的初始快照过程中生成的。

先决条件

  • 在 OpenShift 中,Debezium 连接器部署到 Streams for Apache Kafka。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-sqlserver
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>

        例如,

        oc describe KafkaConnector inventory-connector-sqlserver -n debezium

        该命令返回与以下输出类似的状态信息:

        例 2.52. KafkaConnector 资源状态

        Name:         inventory-connector-sqlserver
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-sqlserver
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-sqlserver.inventory
            inventory-connector-sqlserver.inventory.addresses
            inventory-connector-sqlserver.inventory.customers
            inventory-connector-sqlserver.inventory.geom
            inventory-connector-sqlserver.inventory.orders
            inventory-connector-sqlserver.inventory.products
            inventory-connector-sqlserver.inventory.products_on_hand
        Events:  <none>
  2. 验证连接器是否已创建 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面上,单击 Resources 以打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,点您要检查的主题名称,如 inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics

        该命令返回与以下输出类似的状态信息:

        例 2.53. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-sqlserver--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-sqlserver.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-sqlserver.inventory.products_on_hand

    指定主题名称的格式与步骤 1 中返回 的格式相同,例如 inventory-connector-sqlserver.inventory.addresses

    对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 2.54. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"sqlserver","name":"inventory-connector-sqlserver","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"sqlserver-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成一个读取("op" ="r")事件。product_id 记录的 "before" 状态为 null,表示记录没有之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

有关您可以为 Debezium SQL Server 连接器设置的配置属性的完整列表,请参阅 SQL Server 连接器属性

结果

当连接器启动时,它会执行为连接器配置的 SQL Server 数据库的一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流传输到 Kafka 主题。

2.7.4.4. Debezium SQL Server 连接器配置属性的描述

Debezium SQL Server 连接器有许多配置属性,您可以使用它们来实现应用程序的正确连接器行为。许多属性具有默认值。

有关属性的信息按如下方式进行组织:

所需的 Debezium SQL Server 连接器配置属性

除非默认值可用 否则需要以下配置属性。

属性默认描述

name

没有默认值

连接器的唯一名称。尝试使用相同名称再次注册将失败。(所有 Kafka 连接连接器都需要此属性。)

connector.class

没有默认值

连接器的 Java 类的名称。对于 SQL Server 连接器,始终使用 io.debezium.connector.sqlserver.SqlServerConnector 的值。

tasks.max

1

指定连接器用于从数据库实例捕获数据的最大任务数。

database.hostname

没有默认值

SQL Server 数据库服务器的 IP 地址或主机名。

database.port

1433

SQL Server 数据库服务器的整数端口号。如果同时指定了 database.portdatabase.instance,则忽略 database.instance。如需了解更多详细信息,请参阅 SQL 服务器文档的 JDBC 驱动程序

database.user

没有默认值

连接到 SQL Server 数据库服务器时使用的用户名。在使用 Kerberos 身份验证时可以省略,可以使用 直通属性进行配置

database.password

没有默认值

连接到 SQL Server 数据库服务器时要使用的密码。

database.instance

没有默认值

指定 SQL 服务器名称实例的实例名称。如果同时指定了 database.portdatabase.instance,则忽略 database.instance。如需了解更多详细信息,请参阅 SQL 服务器文档的 JDBC 驱动程序

topic.prefix

没有默认值

为您要捕获的 SQL Server 数据库服务器提供命名空间的主题前缀。前缀应该在所有其他连接器之间唯一,因为它用作从这个连接器接收记录的所有 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。

警告

不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发送事件,连接器会将后续事件发送到名称基于新值的主题。连接器也无法恢复其数据库架构历史记录主题。

schema.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。没有包含在 schema. include.list 中的架构 名称都会被捕获。默认情况下,连接器捕获所有非系统模式的更改。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 schema.exclude.list 属性。

schema.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。任何名称不包含在 schema. exclude.list 中的模式 都会捕获其更改,但系统模式除外。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,请不要设置 schema.include.list 属性。

table.include.list

没有默认值

可选的正则表达式列表,它匹配您希望 Debezium 捕获的表的完全限定表标识符。默认情况下,连接器捕获指定模式的所有非系统表。当设置此属性时,连接器只从指定的表中捕获更改。每个标识符都是 schemaName.tableName 的形式。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 table.exclude.list 属性。

table.exclude.list

没有默认值

可选的正则表达式列表,其与您要从捕获中排除的表的完全限定表标识符匹配。Debezium 捕获 table.exclude.list 中没有包括的所有表。每个标识符都是 schemaName.tableName 的形式。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。
如果您在配置中包含此属性,不要设置 table.include.list 属性。

column.include.list

空字符串

可选的正则表达式列表,与 change 事件消息值中包含的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName

注意

Debezium 为表发出的每个更改事件记录都包含一个事件键,其中包含表的主键或唯一键中每个列的字段。为确保正确生成事件密钥,如果您设置了此属性,请务必明确列出任何捕获的表的主键列。

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 column.exclude.list 属性。

column.exclude.list

空字符串

可选的正则表达式列表,与应排除在更改事件消息值中排除的列的完全限定名称匹配。列的完全限定域名格式为 schemaName.tableName.columnName。请注意,主键列始终包含在事件的键中,也会从值中排除。

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列中的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 column.include.list 属性。

skip.messages.without.change

false

指定在包含的列中没有更改时跳过发布消息。如果列没有包括每个 column.include.listcolumn.exclude.list 属性的更改,这将过滤消息。

column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。列的完全限定域名格式为 `<schemaName>.<tableName>.<columnName>`.
要匹配 Debezium 的名称,请应用您指定为 _anchored
正则表达式的正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。在生成的更改事件记录中,指定列的值替换为 pseudonyms。

一个 pseudonym,它包括了通过应用指定的 hashAlgorithmsalt 的结果的哈希值。根据使用的 hash 功能,引用完整性会被维护,而列值则替换为 pseudonyms。Java 加密架构标准算法 文档的 MessageDigest 部分中 描述了支持的哈希功能。

在以下示例中,CzQMA0cB5K 是一个随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如有必要,pseudonym 会自动缩短到列的长度。连接器配置可以包含多个属性,以指定不同的哈希算法和 salt。

根据使用的 hashAlgorithm,所选的 salt 和实际数据集,可能无法完全屏蔽。

应该使用哈希策略版本 2 来确保在不同位置或系统中对值进行哈希处理。

time.precision.mode

自适应性

时间、日期和时间戳可以以不同的精度类型代表:adaptive(默认)基于数据库栏的类型,使用 millisecond, microsecond, 或 nanosecond 精度值,捕获数据库中的时间和时间戳;或 connect 始终使用 Kafka Connect 的内置的 Time, Date, 和 Timestamp 的代表(无论数据库栏的精度,始终使用 millisecond 精度)来表示时间和时间戳的值。如需更多信息,请参阅 临时值

decimal.handling.mode

precise

指定连接器如何处理 DECIMALNUMERIC 列:

precise(默认值)代表它们准确使用二进制格式更改事件中的 java.math.BigDecimal 值。

double 表示它们使用 double 值,这可能会丢失一些精度但更容易使用。

string 将值编码为格式化的字符串,它容易使用,但提供有关实际类型的语义信息会丢失。

include.schema.changes

true

布尔值指定连接器是否将数据库 schema 中的更改发布到 Kafka 主题,其名称与主题前缀相同。连接器使用包含数据库名称的键记录每个架构更改,以及一个描述 schema 更新的 JSON 结构的值。这个记录模式更改的机制独立于连接器的内部记录数据库架构历史记录。

tombstones.on.delete

true

控制 删除 事件是否后跟 tombstone 事件。

true - 一个 delete 事件表示一个 delete 事件和后续的 tombstone 事件。

false - 仅有一个 delete 事件被抛出。

删除源记录后,发出 tombstone 事件(默认行为)可让 Kafka 完全删除与删除行的密钥相关的所有事件,以防为主题启用了 日志压缩

column.truncate.to.length.chars

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果在列中的数据超过了在属性名中的 length 指定的字符长度时删节数据,设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

一个列的完全限定名称会观察以下格式:< schemaName> . <tableName &gt; . <columnName&gt;。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

您可以在单个配置中指定多个长度不同的属性。

column.mask.with.length.chars

n/a Fully-qualified name 的格式为 schemaName.tableName.columnName

可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定域名匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 length 设置为一个正整数,替换在属性名称中的 length 指定的星号(*)的数量列中的数据。将 length 设置为 0 (zero),将指定列中的数据替换为空字符串。

列的完全限定域名观察以下格式: schemaName.tableName.columnName。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

您可以在单个配置中指定多个长度不同的属性。

column.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,与您希望连接器发送代表列元数据的额外参数匹配。当设置此属性时,连接器将以下字段添加到事件记录的 schema 中:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

列的完全限定域名观察以下格式: schemaName.tableName.columnName
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。

datatype.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于带有匹配数据类型的列,连接器会发出事件记录,该记录在其 schema 中包含以下额外字段:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发送此额外数据有助于在 sink 数据库中正确调整特定数字或基于字符的列。

列的完全限定域名观察以下格式: schemaName.tableName.typeName
要匹配数据类型的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式不匹配类型名称中可能存在的子字符串。

有关 SQL Server 特定数据类型名称列表,请参阅 SQL Server 数据类型映射

message.key.columns

不适用

一个表达式列表,用于指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题。

默认情况下,Debebe 使用表的主键列作为发出的记录的消息键。使用默认键,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。

要为表建立自定义消息密钥,请列出表,后跟要用作消息键的列。每个列表条目都采用以下格式:

<fully-qualified_tableName> : & lt;keyColumn&gt;, <keyColumn>

To base a table key on multiple column name, insert commas between the column name.

每个完全限定表名称都是以下格式的正则表达式:

<schemaName> . & lt;tableName>

属性可以包含多个表的条目。使用分号分隔列表中的表条目。

以下示例为表 inventory.customerspurchase.orders 设置了消息键:

inventory.customers:pk1,pk2; (configured).purchaseorders:pk3,pk4

用于表 inventory.customer,列 pk1pk2 指定为消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 服务器作为消息键。

对用来创建自定义消息键的列数量没有限制。但是,最好使用指定唯一密钥所需的最小数量。

binary.handling.mode

bytes

指定在更改事件中二进制(二进制 ,varbinary )列如何表示更改事件,包括: bytes 代表二进制数据作为字节数组(默认),base64 代表二进制数据作为 base64 编码的字符串,base64-url-safe 代表二进制数据作为十六进制编码(base16)字符串

schema.name.adjustment.mode

none

指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

field.name.adjustment.mode

none

指定应如何调整字段名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

如需更多信息,请参阅 Avro 命名

高级 SQL Server 连接器配置属性

以下 高级配置 属性具有在大多数情况下可以正常工作的良好默认值,因此很少需要在连接器配置中指定。

属性默认描述

converters

没有默认值

枚举连接器可以使用的 自定义转换器 实例的符号链接名称列表。例如,

isbn

您必须设置 converters 属性,以便连接器可以使用自定义转换器。

对于您为连接器配置的每个转换器,还必须添加一个 .type 属性,它指定实现转换器接口的类的完全限定域名。.type 属性使用以下格式:

<converterSymbolicName>.type

例如,

isbn.type: io.debezium.test.IsbnConverter

如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数来将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称添加转换器符号名称作为前缀。例如,

isbn.schema.name: io.debezium.sqlserver.type.Isbn

snapshot.mode

Initial

对结构的初始快照以及捕获的表(可选)数据的模式。快照完成后,连接器将继续从数据库的红色日志中读取更改事件。支持以下值:

always
在每个连接器启动时执行快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。
Initial
连接器执行数据库快照,如用于创建初始快照的默认工作流 中所述。快照完成后,连接器将开始流传输后续数据库更改的事件记录。
initial_only
连接器执行数据库快照并在流传输任何更改事件记录前停止,不允许捕获任何后续更改事件。
schema_only
弃用,请参阅 no_data
no_data
连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它没有创建 READ 事件来代表连接器启动时设置的数据集(Step 7.b)。
recovery

设置这个选项以恢复丢失或损坏的数据库 schema 历史记录主题。重启后,连接器运行一个从源表中重建主题的快照。您还可以设置属性,以定期修剪遇到意外增长的数据库 schema 历史记录主题。

警告

如果在上次连接器关闭后将 schema 更改提交到数据库,则不要使用此模式执行快照。

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

snapshot.locking.mode

exclusive

控制连接器保存表锁定的时长。表锁定可防止在连接器执行快照时发生某些类型的更改表操作。您可以设置以下值:

exclusive
控制当 snapshot.isolation.modeREPEATABLE_READEXCLUSIVE 时,连接器如何在表中保存锁定。
连接器会保存一个表锁定,以便只对快照的初始部分进行专用表访问,同时读取数据库模式和其他元数据。快照中的其余工作涉及从每个表中选择所有行,这使用不需要锁定的闪存查询来完成。然而,在某些情况下,可能需要避免锁定,这可以通过指定 none 来完全完成。只有在创建快照时没有模式更改时,这个模式才安全使用。
none
防止连接器在快照过程中获取任何表锁定。只有在创建快照时,仅在没有模式更改时才使用此设置。

snapshot.query.mode

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all
连接器默认执行 选择所有查询,可以选择根据列包含和排除列表配置调整所选列。

与使用 snapshot.select.statement.overrides 属性相比,此设置可让您以更灵活的方式管理快照内容。

snapshot.include.collection.list

table.include.list中指定的所有表

一个可选的、以逗号分隔的正则表达式列表,匹配表的完全限定名 (<dbName>.<schemaName>.<tableName>) 以包括在快照中。指定的项目必须在连接器的 table.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。
此属性不会影响增量快照的行为。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名称中可能存在的子字符串。

snapshot.isolation.mode

repeatable_read

模式,控制使用哪个事务隔离级别,以及连接器锁定用于捕获的时长。支持以下值:

  • read_uncommitted
  • read_committed
  • repeatable_read
  • snapshot
  • exclusive (exclusive 模式使用可重复的读取隔离级别,但对要读取的所有表采用独占锁定)。

快照read_committedread_uncommitted 模式不会阻止其他事务在初始快照期间更新表行。exclusiverepeatable_read 模式阻止并发更新。

模式选择也会影响数据一致性。只有 独占 和快照 模式可以保证完整的一致性,即初始快照和流日志组成线性历史记录。如果是 repeatable_readread_committed 模式,则可能会出现,例如,添加的记录会在初始快照中出现两次 - 一次在流传输阶段。但是,对于数据镜像,该一致性级别应该这样做。对于 read_uncommitted,根本没有数据一致性保证(某些数据可能丢失或损坏)。

event.processing.failure.handling.mode

fail

指定连接器在处理事件时应如何响应异常。失败 将传播异常(代表有问题的事件的偏移),从而导致连接器停止。
警告 会导致跳过有问题的事件,并记录有问题的事件的偏移。
跳过 将导致跳过有问题的事件。

poll.interval.ms

500 (0.5 秒)

正整数值,用于指定连接器在检查数据库是否有新更改事件前等待的时间。

您指定的值会影响 heartbeat.interval.ms 的行为。连接器只能在指定的轮询周期内发出心跳消息。

要防止此设置延迟心跳性,请将其设为小于或等于 heartbeat.interval.ms 的值的值。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会在将事件写入 Kafka 前将事件放在阻塞队列中。当连接器比将信息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可能会提供从数据库读取更改事件的情况。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0

指定阻塞队列的最大卷的长整数值,以字节为单位。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。
如果也设置了 max.queue.size,当队列的大小达到由任一属性指定的限制时,写入队列会被阻断。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 个记录后写入队列会被阻断,或者在队列中的记录卷达到 5000 字节。

max.batch.size

2048

正整数值,用于指定应在每个迭代此连接器期间处理的每个批处理事件的最大值。

heartbeat.interval.ms

0

指定决定连接器向 Kafka heartbeat 主题发送消息的频率(以毫秒为单位),无论数据库中是否发生更改。
默认情况下,连接器不会发送 heartbeat 信息。

设置此属性可帮助确认连接器是否仍然从数据库接收更改事件。在数据库中,在长时间中,捕获的表保持不变。当数据库遇到长时间的间隔时,在捕获的表中不会发生任何更改,虽然连接器照常从事务日志读取,但它只需要很少将偏移值提交到 Kafka。因此,在连接器重启后,因为偏移值是 stale,连接器必须发送大量更改事件。

相反,当您将连接器配置为发送常规心跳信息时,它可以更频繁地更新 Kafka 中的偏移量。因为 Kafka 中的偏移值保持最新,所以必须在连接器重启后重新显示较少的更改事件。

注意

心跳仅在轮询周期期间发出。也就是说,在 Debezium 环境中,发送心跳消息之间的实际间隔由 heartbeat.interval.mspoll.interval.ms 属性的设置共同控制。发送心跳消息的实际频率基于这两个值中的较低。要防止发送心跳消息的延迟,降低其效率,请将此属性设置为大于或等于 poll.interval.ms 的值的值。例如,如果您将 poll.interval.ms 设置为 100,请将 heartbeat.interval.ms 设置为 5000

heartbeat.action.query

没有默认值

指定当连接器发送心跳消息时连接器在源数据库上执行的查询。

这在从低流量数据库捕获更改时保持误差变得过时。在 low-traffic 数据库中创建一个心跳表,将此属性设置为将记录插入到该表的声明中,例如:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

允许从低流量数据库接收更改并确认其 LSNs,这可防止偏移变得过时。

snapshot.delay.ms

没有默认值

连接器在启动后执行快照前应等待的时间间隔;
可以用来避免在集群中启动多个连接器时进行快照中断,这可能会导致连接器的重新平衡。

streaming.delay.ms

0

指定连接器在完成快照后延迟流过程启动的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后马上重启快照,但在流传输过程开始前。设置一个延迟值,它高于为 Kafka Connect worker 设置的 offset.flush.interval.ms 属性的值。

snapshot.fetch.size

2000

指定在拍摄快照时应在每个表中读取的最大行数。连接器将在这个大小的多个批处理中读取表内容。默认值为 2000。

query.fetch.size

没有默认值

指定给定查询的每个数据库往返获取的行数。默认为 JDBC 驱动程序的默认获取大小。

snapshot.lock.timeout.ms

10000

整数值,用于指定在执行快照时等待获取表锁定的最大时间(以毫秒为单位)。如果无法在这个时间段内获取表锁定,则快照将失败(也查看 快照)。
当设置为 0 时,当无法获取锁定时连接器会立即失败。value -1 表示无限等待。

snapshot.select.statement.overrides

没有默认值

指定要包含在快照中的表行。如果您希望快照只包括表中的行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志读取的事件。

属性包含以逗号分隔的完全限定表名称列表,格式为 < schemaName>.<tableName&gt;。例如,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

用于列表中的每个表,添加一个进一步的配置属性,指定连接器在获取快照时在表上运行的 SELECT 语句。指定的 SELECT 语句决定了快照中包含的表行子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides. <schemaName> . < tableName>.例如,snapshot.select.statement.overrides.customers.orders

Example:

从包括软删除列 delete_flagcustomers.orders 表中,如果您希望快照只包含没有软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器只包含 delete_flag = 0 的记录。

provide.transaction.metadata

false

当设置为 true Debezium 时,生成带有事务边界的事件,并使用事务元数据增强数据事件。

retriable.restart.connector.wait.ms

10000 (10 秒)

在发生可分配错误后重启连接器前要等待的 milli-seconds 数量。

skipped.operations

t

以逗号分隔的操作类型列表,您希望连接器在流传输过程中跳过。您可以将连接器配置为跳过以下类型的操作:

  • c (插入/创建)
  • u (update)
  • D (删除)
  • T (截断)

如果您不希望连接器跳过任何操作,请将值设为 none。因为 Debezium SQL Server 连接器不支持 truncate 更改事件,所以设置默认的 t 值与将值设置为 none 的作用相同。

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。https://docs.redhat.com/documentation/en/red_hat_build_of_debezium/3.0.8/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel
使用以下格式指定集合名称:
<databaseName> . <schemaName& gt; . &lt;tableName>

signal.enabled.channels

source

为连接器启用的信号通道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.allow.schema.changes

false

允许增量快照期间进行 schema 更改。启用连接器后,连接器将在增量快照期间检测到模式变化,并重新选择当前块以避免锁定 DDLs。

请注意,不支持对主键的更改,如果在增量快照中执行,可能会导致不正确的结果。另一个限制是,如果模式更改仅影响列的默认值,则不会检测到更改,直到从事务日志流处理 DDL。这不会影响快照事件的值,但快照事件的 schema 可能具有过时的默认值。

incremental.snapshot.chunk.size

1024

连接器在增量快照块期间获取并读取的最大行数。增加块大小可提供更高的效率,因为快照会减少对更大大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。

incremental.snapshot.watermarking.strategy

insert_insert

指定连接器在增量快照中使用的水位线机制,以重复数据删除事件,这些事件可能会被增量快照捕获,然后在流恢复后重新捕获。
您可以指定以下选项之一:

insert_insert
当您发送一个信号来启动增量快照时,对于 Debezium 在快照期间读取的每个块,它会将条目写入信号数据收集来记录信号,以打开快照窗口。快照完成后,Debezium 会插入第二个条目,记录信号以关闭窗口。
insert_delete
当您发送一个信号来启动增量快照时,对于 Debezium 读取的每个块,它会将单个条目写入信号数据收集,以记录信号来打开快照窗口。快照完成后,会删除此条目。不会为关闭快照窗口的信号创建条目。设置这个选项以防止快速增长信号数据收集。

max.iteration.transactions

500

指定每个迭代的最大事务数,以便在从数据库中的多个表流传输更改时减少内存占用量。当设置为 0 时,连接器使用当前的最大 LSN 作为从中获取更改的范围。当设置为大于零的值时,连接器使用此设置指定的第 n-th LSN 作为从中获取更改的范围。默认值为 500。

incremental.snapshot.option.recompile

false

将 OPTION (RECOMPILE)查询选项用于增量快照期间使用的所有 SELECT 语句。这有助于解决可能出现的参数嗅探问题,但可能会导致源数据库上的 CPU 负载增加,具体取决于查询执行的频率。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

应用于确定数据更改的主题名称、模式更改、事务、心跳事件等主题名称,默认为 SchemaTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发散列映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器发送心跳消息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

transaction

控制连接器发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认的主题名称是 fulfillment.transaction

如需更多信息,请参阅 事务元数据

snapshot.max.threads

1

指定连接器在执行初始快照时使用的线程数量。要启用并行初始快照,请将 属性设置为大于 1 的值。在并行初始快照中,连接器同时处理多个表。

注意

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成它们的工作。如果一个表的快照需要比其他表的快照完成的时间要长得多,则线程已完成其工作闲置。在某些环境中,网络设备(如负载均衡器或防火墙)会终止闲置以延长的时间间隔的连接。快照完成后,连接器无法关闭连接,从而导致异常和不完整的快照,即使连接器成功传输所有快照数据。

如果您遇到这个问题,请将 snapshot.max.threads 的值恢复到 1,然后重试快照。

custom.metric.tags

没有默认值

通过添加提供上下文信息的元数据来定义自定义 MBean 对象名称的标签。指定以逗号分隔的键值对列表。每个键代表 MBean 对象名称的标签,对应的值代表键的值,例如
k1=v1,k2=v2

连接器将指定的标签附加到基础 MBean 对象名称。标签可帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。如需更多信息,请参阅自定义 MBean 名称

errors.max.retries

-1

指定连接器如何在生成 Retriable 错误的操作后响应,如连接错误。
设置以下选项之一:

-1
无限制。无论之前失败的数量如何,连接器总是自动重启,并重试操作。
0
disabled。连接器会立即失败,永远不会重试操作。重启连接器需要用户干预。
> 0
连接器会自动重启,直到达到指定的最大重试次数。下一次失败后,连接器会停止,用户需要干预才能重启它。

data.query.mode

function

控制连接器查询 CDC 数据的方式。支持以下模式:

  • function: 数据通过调用 cdc.[fn_cdc_get_all_changes_#] 函数来查询。这是默认的模式。
  • 直接 :使连接器直接查询更改表。切换到 直接模式 并在 (__$start_lsn ASC, __$seqval ASC, __$operation ASC) 列上创建一个索引,每个更改表都会显著加快查询 CDC 数据。

database.query.timeout.ms

600000 (10 分钟)

指定连接器等待查询完成的时间(以毫秒为单位)。将值设为 0 (零)以删除超时限制。

Debezium SQL Server 连接器数据库模式历史记录配置属性

Debezium 提供了一组 schema.history.internal.* 属性,用于控制连接器如何与 schema 历史记录主题进行交互。

下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

表 2.182. 连接器数据库架构历史记录配置属性
属性默认描述

schema.history.internal.kafka.topic

没有默认值

连接器存储数据库 schema 历史记录的 Kafka 主题的完整名称。

schema.history.internal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索之前由连接器存储的数据库架构历史记录,以及写入从源数据库读取的每个 DDL 语句。每个对都应该指向 Kafka Connect 进程使用的相同 Kafka 集群。

schema.history.internal.kafka.recovery.poll.interval.ms

100

整数值,指定连接器在启动/恢复期间应等待的最大毫秒数,同时轮询持久数据。默认值为 100ms。

schema.history.internal.kafka.query.timeout.ms

3000

指定连接器在使用 Kafka admin 客户端获取集群信息时应等待的最大毫秒数。

schema.history.internal.kafka.create.timeout.ms

30000

指定连接器在使用 Kafka admin 客户端创建 kafka 历史记录主题时应等待的最大毫秒数。

schema.history.internal.kafka.recovery.attempts

100

连接器在连接器恢复失败前尝试读取持久性历史记录数据的次数上限,并显示错误。没有数据后等待的最长时间是 restore.attempts recovery. poll. poll.interval.ms

schema.history.internal.skip.unparseable.ddl

false

指定连接器是否应该忽略不正确的或未知数据库语句的布尔值,或停止处理,以便用户可以解决这个问题。安全默认为 false。跳过应仅谨慎使用,因为它可以在处理 binlog 时导致数据丢失或手动忽略。

schema.history.internal.store.only.captured.tables.ddl

false

指定连接器是否记录模式或数据库中所有表的布尔值,或者仅从指定为捕获的表记录。
指定以下值之一:

false (默认)
在数据库快照中,连接器记录了数据库中所有非系统表的 schema 数据,包括没有指定用于捕获的表。最好保留默认设置。如果您稍后决定从您最初为捕获的表捕获更改,则连接器可以轻松地从这些表中捕获数据,因为其模式结构已存储在 schema 历史记录主题中。Debezium 需要表的 schema 历史记录,以便它可识别在更改事件发生时存在的结构。
true
在数据库快照中,连接器只记录 Debezium 捕获更改事件的表模式。如果您更改了默认值,且稍后将连接器配置为从数据库中的其他表捕获数据,连接器缺少从表中捕获更改事件所需的模式信息。

schema.history.internal.store.only.captured.databases.ddl

false

指定连接器是否从数据库实例中的所有逻辑数据库记录模式结构的布尔值。
指定以下值之一:

true
连接器只记录逻辑数据库中表的架构结构,以及 Debezium 捕获更改事件的 schema。
false
连接器记录所有逻辑数据库的模式结构。

透传 SQL Server 连接器配置属性

连接器支持 通过传递 属性,使 Debezium 指定自定义配置选项来微调 Apache Kafka producer 和消费者的行为。有关 Kafka 生成者和消费者的完整配置属性范围的详情,请参考 Kafka 文档

直通属性,用于配置生成者和消费者客户端如何与 schema 历史记录主题交互

Debezium 依赖于 Apache Kafka producer 将 schema 更改写入数据库 schema 历史记录主题。同样,它依赖于 Kafka 使用者在连接器启动时从数据库 schema 历史记录主题中读取。您可以通过将值分配给一组以 schema.history.internal.producer和 schema.history.internal.consumerPromQL 前缀开头的 pass-through 配置属性来定义 Kafka producer消费者 客户端的配置。pass-through producer 和 consumer 数据库模式历史记录属性控制一系列行为,如这些客户端如何与 Kafka 代理安全连接,如下例所示:

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium 在将属性传递给 Kafka 客户端前从属性名称中剥离前缀。

有关 Kafka producer 配置属性和 Kafka 使用者配置属性的更多信息,请参阅 Apache Kafka 文档。

用于配置 SQL Server 连接器如何与 Kafka 信号交互的直通属性

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 2.183. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号顺序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机和端口对列表。每个对引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

为信号频道配置 Kafka 消费者客户端的属性

Debezium 连接器提供信号 Kafka 使用者的直通配置。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。

Debezium 在将属性传递给 Kafka 信号消费者前从属性中剥离前缀。

用于配置 SQL Server 连接器接收器通知频道的直通属性

下表描述了可用于配置 Debezium sink 通知 频道的属性。

表 2.184. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知频道之一时,需要此属性。

Debezium 连接器传递数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。透传数据库属性以前缀 driverPROFILE 开头。例如,连接器将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将属性传递给数据库驱动程序前从属性中剥离前缀。

2.7.5. 在 schema 更改后刷新捕获表

当为 SQL Server 表启用数据捕获时,如表中的更改,事件记录会被保留到服务器上的捕获表中。如果您引入源表更改的结构的变化,例如,通过添加新列,则该更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器无法正确为表发送数据更改事件。您必须干预来刷新捕获表,以便连接器恢复处理更改事件。

由于在 SQL Server 中实现 CDC 的方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,必须是具有升级权限的 SQL Server 数据库 operator。作为 Debezium 用户,您必须使用 SQL Server 数据库 operator 协调任务,以完成 schema 刷新并恢复到 Kafka 主题。

您可以使用以下任一方法在模式更改后更新捕获表:

使用每种流程都有优点和缺点。

警告

无论您使用在线还是离线更新方法,在在同一源表中应用后续模式更新前,您必须完成整个架构更新过程。最佳实践是在单一批处理中执行所有 DDL,以便只能运行一次。

注意

在启用了 CDC 的源表上不支持一些模式更改。例如,如果在表上启用了 CDC,如果您重命名了其中一个列或更改列类型,则 SQL Server 不允许您更改表的模式。

注意

在将源表中的列从 NULL 改为 NOT NULL 或 versa 后,SQL Server 连接器无法正确捕获更改的信息,直到您创建新的捕获实例为止。如果您在列设计后没有创建新的捕获表,请更改连接器发出的事件记录没有正确地指示列是否是可选的。也就是说,之前定义为可选(或 NULL)的列仍然是,尽管现在定义为 NOT NULL。同样,已根据需要定义的列(不是 NULL),保留这种设计,尽管它们现在定义为 NULL

注意

使用 sp_rename 功能重命名表后,它将继续在旧源表名称下发出更改,直到连接器重启为止。在连接器重启后,它将在新源表名称下发出更改。

2.7.5.1. 在 schema 更改后运行离线更新

离线 schema 更新提供了更新捕获表的安全方法。但是,离线更新可能不适用于需要高可用性的应用程序。

先决条件

  • 更新已提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 暂停更新数据库的应用程序。
  2. 等待 Debezium 连接器流所有未流的更改事件记录。
  3. 停止 Debezium 连接器。
  4. 将所有更改应用到源表模式。
  5. 使用 sys.sp_cdc_enable_table 程序为 update 源表创建一个新的捕获表,其参数为 @capture_instance
  6. 恢复您在第 1 步中暂停的应用程序。
  7. 启动 Debezium 连接器。
  8. 在 Debezium 连接器从新捕获表开始流后,通过运行存储的步骤 sys.sp_cdc_disable_table 来丢弃旧的捕获表,并将参数 @capture_instance 设置为旧的捕获实例名称。

2.7.5.2. 在 schema 更改后运行在线更新

完成在线模式更新的过程比运行离线模式更新的步骤简单,您可以完成它,而无需应用程序和数据处理中的停机。但是,随着在线架构更新,在更新源数据库中的 schema 后可能会发生潜在的处理差距,但在创建新的捕获实例之前。在这个间隔中,更改事件将继续被更改表的旧实例捕获,保存至旧表的更改数据会保留之前模式的结构。因此,例如,如果您向源表添加了新列,请在新捕获表前生成的更改事件就绪,请不要包含新列的字段。如果您的应用程序不容许这样的过渡周期,最好使用离线 schema 更新过程。

先决条件

  • 更新已提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 将所有更改应用到源表模式。
  2. 通过运行 sys.sp_cdc_enable_table 存储并具有参数 @capture_instance 的唯一值,为更新源表创建一个新的捕获表。
  3. 当 Debezium 从新的捕获表中开始流时,您可以通过运行 sys.sp_cdc_disable_table 存储的步骤来丢弃旧的捕获表,并将参数 @capture_instance 设置为旧的捕获实例名称。

示例:在数据库架构更改后运行在线模式更新

以下示例演示了如何在将列 phone_number 添加到 customer 源表后,在更改表中完成在线模式更新。

  1. 运行以下查询来修改 customers 源表的模式,以添加 phone_number 字段:

    ALTER TABLE customers ADD phone_number VARCHAR(32);
  2. 通过运行 sys.sp_cdc_enable_table 存储的步骤创建新捕获实例。

    EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';
    GO
  3. 运行以下查询将新数据插入到 customers 表中:

    INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456');
    GO

    Kafka Connect 日志通过类似以下消息的条目报告配置更新:

    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    ...
    connect_1    | 2019-01-17 10:11:33,719 INFO   ||  Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]

    最后,phone_number 字段添加到 schema 中,其值会出现在写入 Kafka 主题的消息中。

    ...
         {
            "type": "string",
            "optional": true,
            "field": "phone_number"
         }
    ...
        "after": {
          "id": 1005,
          "first_name": "John",
          "last_name": "Doe",
          "email": "john.doe@example.com",
          "phone_number": "+1-555-123456"
        },
  4. 通过运行 sys.sp_cdc_disable_table 存储的步骤丢弃旧的捕获实例。

    EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers';
    GO

2.7.6. 监控 Debezium SQL Server 连接器性能

Debezium SQL Server 连接器提供三种类型的指标,除了对 Zookeeper、Kafka 和 Kafka Connect 提供的 JMX 指标的支持外。连接器提供以下指标:

有关如何通过 JMX 公开上述指标的详情,请参考 Debezium 监控文档

2.7.6.1. SQL Server 连接器快照和流 MBean 对象的自定义名称

Debezium 连接器通过 MBean 名称为连接器公开指标。这些指标(特定于每个连接器实例)提供有关连接器快照、流和架构历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为每个不同的连接器指标生成一个唯一的 MBean 名称。要查看连接器进程的指标,您可以将可观察性堆栈配置为监控其 MBean。但是,这些默认 MBean 名称取决于连接器配置,但配置更改可能会导致对 MBean 名称的更改。对 MBean 名称的更改会破坏连接器实例和 MBean 之间的链接,并破坏监控活动。在这种情况下,如果要恢复监控,您必须重新配置 observability 堆栈以使用新的 MBean 名称。

要防止监控 MBean 名称更改结果的中断,您可以配置自定义指标标签。您可以通过在连接器配置中添加 custom.metric.tags 属性来配置自定义指标。属性接受键值对,其中每个键代表 MBean 对象名称的标签,对应的值代表该标签的值。例如: k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称中。

为连接器配置 custom.metric.tags 属性后,您可以配置 observability 堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签,而不是可变 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了它如何构建 MBean 名称,或者连接器配置更改中的 topic.prefix,则指标集合不会中断,因为指标提取任务使用指定的标签模式来识别连接器。

使用自定义标签的更多优点是,您可以使用反映数据管道架构的标签,以便以适合您操作需求的方式组织指标。例如,您可以使用声明连接器活动类型、应用程序上下文或数据源的值来指定标签,如 db1-streaming-for-application-abc。如果您指定多个键值对,则所有指定的对都会附加到连接器的 MBean 名称中。

以下示例演示了标签如何修改默认的 MBean 名称。

例 2.55. 自定义标签如何修改连接器 MBean 名称

默认情况下,SQL Server 连接器使用以下 MBean 名称进行流传输指标:

debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>

如果将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 会生成以下自定义 MBean 名称:

debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

2.7.6.2. Debezium SQL Server 连接器快照指标

MBeandebezium.sql_server:type=connector-metrics,server= <topic.prefix& gt; ,task= <task.id>,context=snapshot

快照指标不会被公开,除非快照操作活跃,或者快照自上次连接器开始以来发生了某种情况。

下表列出了可用的快照指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

此连接器自上一次启动或重置以来看到的事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在快照和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在快照和主 Kafka Connect 循环之间传递事件的队列的可用容量。

TotalTableCount

int

包括在快照中的表的总数。

RemainingTableCount

int

快照必须复制的表数。

SnapshotRunning

布尔值

快照是否已启动。

SnapshotPaused

布尔值

快照是否已暂停。

SnapshotAborted

布尔值

快照是中止的。

SnapshotCompleted

布尔值

快照是否完成。

SnapshotDurationInSeconds

long

快照到目前为止需要的秒数,即使未完成也是如此。也包括快照暂停的时间。

SnapshotPausedDurationInSeconds

long

快照暂停的秒数。如果快照暂停了多次,暂停的时间会增加。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。在处理过程中,表会以递增方式添加到映射中。更新每 10,000 行扫描并完成表后。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

连接器还会在执行增量快照时提供以下附加快照指标:

属性类型描述

ChunkId

字符串

当前快照块的标识符。

ChunkFrom

字符串

定义当前块的主密钥集的低限。

ChunkTo

字符串

定义当前块的主密钥集的上限。

TableFrom

字符串

当前快照表的主密钥集合的低限。

TableTo

字符串

当前快照表的主键集合的上限。

2.7.6.3. Debezium SQL Server 连接器流指标

MBeandebezium.sql_server:type=connector-metrics,server= <topic.prefix& gt; ,task= <task.id>,context=streaming

下表列出了可用的流指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个流事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

源数据库自上次连接器启动后报告的数据更改事件总数,或者因为指标重置后。代表 Debezium 处理的数据更改工作负载。

TotalNumberOfCreateEventsSeen

long

自上次启动或指标重置以来,连接器处理的创建事件总数。

TotalNumberOfUpdateEventsSeen

long

自上次启动或指标重置以来,连接器处理的更新事件总数。

TotalNumberOfDeleteEventsSeen

long

自上次启动或指标重置后,连接器处理的删除事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在流器和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在流程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

Connected

布尔值

表示连接器当前是否已连接到数据库服务器的标记。

MilliSecondsBehindSource

long

最后一次更改事件的时间戳和连接器处理之间的毫秒数。这些值将纳入运行数据库服务器和连接器的机器上时钟之间的差别。

NumberOfCommittedTransactions

long

已提交的已处理事务的数量。

SourceEventPosition

Map<String, String>

最后一次接收的事件的协调。

LastTransactionId

字符串

最后一次处理的事务的事务标识符。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

2.7.6.4. Debezium SQL Server 连接器模式历史记录指标

MBeandebezium.sql_server:type=connector-metrics,context=schema-history,server= <topic.prefix& gt; ,task= <task.id>

下表列出了可用的模式历史记录指标。

属性类型描述

Status

字符串

STOPPED 之一RECOVERING (从存储恢复历史记录)、RUNNING 描述数据库架构历史记录的状态。

RecoveryStartTime

long

恢复开始的时间(以 epoch 秒为单位)。

ChangesRecovered

long

在恢复阶段读取的更改数量。

ChangesApplied

long

恢复和运行时应用的模式更改总数。

MilliSecondsSinceLast​RecoveredChange

long

自上次更改从历史记录存储中恢复后经过的毫秒数。

MilliSecondsSinceLast​AppliedChange

long

从上次更改被应用后经过的毫秒数。

LastRecoveredChange

字符串

从历史记录存储中恢复最后一次更改的字符串表示。

LastAppliedChange

字符串

最后一次应用更改的字符串表示。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.