2.3. MongoDB 的 Debezium 连接器


Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB sharded 集群,用于记录数据库和集合的更改,将这些更改记录在 Kafka 主题中。连接器会自动处理分片集群中的分片的添加或删除,每个副本集的成员资格更改、每个副本集中的选举,以及等待通信问题的解析。

有关与此连接器兼容的 MongoDB 版本的详情,请参考 Debezium 支持的配置 页面

使用 Debezium MongoDB 连接器的信息和流程组织如下:

2.3.1. Debezium MongoDB 连接器概述

MongoDB 的复制机制提供冗余和高可用性,是在生产环境中运行 MongoDB 的首选方法。MongoDB 连接器捕获副本集或分片集群中的更改。

MongoDB 副本集 由一组服务器组成,这些服务器都有相同数据的副本,并且复制确保客户端在副本集 上记录的所有更改都会正确应用到其他副本集的服务器,称为 secondaries。MongoDB 复制的工作原理是记录其 oplog (或操作日志)中的更改,然后每个第二项都会读取主的 oplog,并应用所有操作到他们自己的文档。当新服务器添加到副本集时,该服务器首先对主上所有数据库和集合执行快照,然后读取主的 oplog,以应用自开始快照后可能所做的所有更改。https://docs.mongodb.com/manual/core/replica-set-sync/当服务器捕获到主 oplog 的尾部时,这个新服务器将变为次要(并能够处理查询)。

2.3.1.1. MongoDB 连接器使用更改流捕获事件记录的描述

虽然 Debezium MongoDB 连接器不会成为副本集的一部分,但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不会直接读取 oplog。相反,它会将 oplog 数据的捕获和解码到 MongoDB 更改流 功能。通过更改流,MongoDB 服务器会公开集合中更改作为事件流。Debezium 连接器监控流,然后提供下游更改。当连接器检测到副本集时,它会检查 oplog 以获取最后记录的事务,然后执行主数据库和集合的快照。连接器完成复制数据后,它会创建一个从之前读取的 oplog 位置开始的更改流。

当 MongoDB 连接器处理更改时,它会定期记录事件在 oplog 流中的位置。当连接器停止时,它会记录它处理的最后一个 oplog 流位置,以便重启后它可以从该位置恢复流。换句话说,连接器可以停止、升级或维护,并在以后重启,并始终在不丢失单个事件的情况下准确获取它的位置。当然,MongoDB oplogs 通常以最大大小上限,因此如果连接器长时间停止,则 oplog 中的操作可能会在连接器有机会读取它们前清除。在这种情况下,重启连接器检测到缺少的 oplog 操作,执行快照,然后继续流更改。

MongoDB 连接器也对副本集的成员资格和领导更改、在分片集群中添加或删除分片的改变,以及可能导致通信失败的网络问题。连接器始终使用副本集的主节点来流传输更改,因此当副本集进行选择时,不同的节点会立即停止流传输更改,连接到新的主节点,并使用新的主节点启动流更改。同样,如果连接器无法与副本集主通信,它会尝试重新连接(使用 exponential backoff,以便不破坏网络或副本集)。重新建立连接后,连接器将继续从它捕获的最后事件中更改。这样,连接器会动态调整副本集成员资格更改,并自动处理通信中断。

2.3.1.2. MongoDB 连接器如何使用 MongoDB 读取首选项的描述

您可以通过在 mongodb.connection.string 中设置 readPreference 参数来指定 MongoDB 连接的读取首选项。

2.3.2. Debezium MongoDB 连接器的工作方式

连接器支持的 MongoDB 拓扑概述对规划应用程序非常有用。

以下主题提供有关 Debezium MongoDB 连接器如何工作的详细信息:

2.3.2.1. Debezium 连接器支持的 MongoDB 拓扑

MongoDB 连接器支持以下 MongoDB 拓扑:

MongoDB 副本集

Debezium MongoDB 连接器可以捕获来自单个 MongoDB 副本集 的更改。生产副本集 至少需要三个成员

要将 MongoDB 连接器与副本集一起使用,您必须将连接器配置中的 mongodb.connection.string 属性的值设置为 副本集连接字符串。当连接器准备好开始捕获来自 MongoDB 更改流的更改时,它会启动一个连接任务。然后,connection 任务使用指定的连接字符串来建立连接。

MongoDB 分片集群

MongoDB 分片集群 包括:

  • 一个或多个 分片,每个分片都部署为副本集;
  • 充当 集群配置服务器的独立副本集
  • 客户端需要连接到的一个或多个 routers (也称为 mongos)。它们会将请求路由到相关的分片。

    要将 MongoDB 连接器与分片集群中一起使用,在连接器配置中,将 mongodb.connection.string 属性的值设置为 分片的集群连接字符串

MongoDB 独立服务器
MongoDB 连接器无法监控独立 MongoDB 服务器的更改,因为单机服务器没有 oplog。如果单机服务器转换为一个成员的副本集,则连接器将正常工作。
注意

MongoDB 不建议在生产环境中运行独立服务器。如需更多信息,请参阅 MongoDB 文档

2.3.2.2. Debezium 连接器所需的用户权限

要从 MongoDB 捕获数据,Debezium 以 MongoDB 用户身份附加到数据库。为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库读取。连接器用户需要以下权限:

  • 从数据库读取。
  • 运行 hello 命令。

连接器用户可能还需要以下权限:

  • config.shards 系统集合中读取。

数据库读取权限

连接器用户必须能够从所有数据库读取,或者是从特定数据库读取,具体取决于连接器的 capture.scope 属性的值。根据 capture.scope 设置,为用户分配以下权限之一:

capture.scope 设置为 deployment
授予用户读取任何数据库的权限。
capture.scope 设置为 database
授予用户权限来读取连接器的 capture.target 属性指定的数据库。
capture.scope 设置为 collection
授予用户权限来读取连接器的 capture.target 属性指定的集合。
重要

将 Debezium 集合 选项用于 capture.scope 属性是一个开发者技术预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

使用 MongoDB hello 命令的权限

无论 capture.scope 设置是什么,用户都需要权限来运行 MongoDB hello 命令。

读取 config.shards 集合的权限

根据您的 Debezium 环境,要启用连接器来执行偏移整合,您必须授予连接器用户明确的权限来读取 config.shards 集合。以下连接器环境需要读取 config.shards 集合的权限:

  • 从 Debezium 2.5 或更早版本升级的连接器。
  • 配置为从分片 MongoDB 集群中捕获更改的连接器。

2.3.2.3. Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称

连接器配置属性 topic.prefix 充当 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称: 作为所有主题名称的前缀,并在记录每个副本集的更改流位置时作为唯一标识符。

您应该为每个 MongoDB 连接器提供一个有意义的描述源 MongoDB 系统的唯一逻辑名称。我们建议逻辑名称以字母或下划线字符开头,剩余的字符为字母或下划线。

2.3.2.4. Debezium MongoDB 连接器如何执行偏移整合

Debezium MongoDB 连接器不再支持到分片 MongoDB 部署的 replica_set 连接。因此,使用 replica_set 连接模式的连接器版本记录的偏移量与当前版本不兼容。

为尽量减少连接模式更改的影响,并防止连接器在升级后运行不必要的快照,它会运行流程来整合偏移。在这个偏移合并过程中,连接器完成以下步骤协调早期连接器版本记录的偏移量:

  1. 由连接器版本超过 2.5 记录的偏移按原样使用。
  2. 在分片的 MongoDB 部署中捕获的分片连接模式或 MongoDB 副本集部署中的事件的偏移将原样使用。
  3. 如果满足以下任一条件,由连接器版本 2.5.x 及更早版本记录的特定于分片的偏移会被原样使用:

    • 所有当前数据库分片的偏移量都存在。
    • 启用 偏移无效
      如果禁用偏移无效,连接器无法启动。
  4. 在连接器处理前面步骤中的现有偏移后,它会恢复流更改,然后为它捕获的新事件提交偏移。
    如果偏移整合过程没有检测到任何现有偏移,则 连接器将执行初始快照

2.3.2.5. Debezium MongoDB 连接器如何执行快照

当 Debezium 任务开始使用副本集时,它会使用连接器的逻辑名称和副本集名称来查找用于描述之前停止读取更改的位置的偏移量。如果找到偏移,并且仍然存在于 oplog 中,则任务会立即进行 流传输更改,从记录的偏移位置开始。

但是,如果没有找到偏移量,或者 oplog 不再包含该位置,任务必须首先通过执行快照来获取副本集内容的当前状态。这个过程从记录 oplog 的当前位置并记录为偏移(以及表示快照已启动的标记)开始。然后,该任务会继续复制每个集合,尽可能生成尽可能多的线程(最多到 snapshot.max.threads 配置属性的值),以并行执行此工作。连接器为每个 文档记录一个单独的读取事件。每个读取事件都包含对象的标识符、对象的完整状态,以及查找对象的 MongoDB 副本集 的源 信息。源信息还包含一个标志,表示事件是在快照过程中生成的。

此快照将继续,直到复制了与连接器过滤器匹配的所有集合。如果在任务快照完成前停止连接器,重启连接器后会再次开始快照。

注意

当连接器执行任何副本集的快照时,尝试避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 集群。

您可以在以下部分中找到有关快照的更多信息:

表 2.55. snapshot.mode 连接器配置属性的设置
设置描述

always

连接器在每次启动时都执行快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。

Initial

连接器启动后,它会执行初始数据库快照。

initial_only

连接器执行数据库快照。快照完成后,连接器会停止,且不会为后续数据库更改流传输事件记录。

never

弃用,请参阅 no_data

no_data

连接器捕获所有相关表的结构,但不创建 READ 事件来代表连接器启动点上设置的数据。

when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

如需更多信息,请参阅连接器配置属性表中的 snapshot.mode

2.3.2.6. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。按照这个初始快照,在正常情况下,连接器不会重复快照过程。任何以后更改连接器捕获的事件数据都会通过流处理。

然而,在某些情况下,在初始快照中获取的连接器可能会变得过时、丢失或不完整。为了提供重新捕获集合数据的机制,Debezium 包含一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:

  • 连接器配置会被修改来捕获不同的集合集。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号集合。您可以通过向 Debezium 信号集合发送信号请求来启动临时快照。

当您启动现有集合的临时快照时,连接器会将内容附加到集合已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容,或者仅捕获数据库中集合的子集。另外,快照也可以捕获数据库中集合的子集。

您可以通过向信号集合发送 execute-snapshot 消息来指定要捕获的集合。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供快照中包含的集合名称,如下表所述:

表 2.56. 临时 execute-snapshot 信号记录示例
字段默认

type

incremental

指定您要运行的快照类型。
目前,您可以请求 增量阻塞 快照。

data-collections

N/A

包含与快照中包含的集合的完全限定域名匹配的正则表达式的数组。
对于 MongoDB 连接器,使用以下格式来指定集合的完全限定名称: database.collection

additional-conditions

N/A

可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:

data-collection
过滤器应用到的集合的完全限定名称。您可以为每个集合应用不同的过滤器。
filter
指定在数据库记录中必须存在的列值,以便快照包含它,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

surrogate-key

N/A

可选字符串,用于指定连接器在快照过程中用作集合的主键的列名称。

触发临时增量快照

您可以通过在信号收集中添加带有 execute-snapshot 信号类型的条目来发起临时增量快照,或者向 Kafka 信号发送信号消息。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个集合的开始和端点。根据集合中条目数量以及配置的块大小,Debezium 会将集合分成块,并持续持续对每个块进行快照。

如需更多信息,请参阅 增加快照

触发临时阻塞快照

您可以通过在信号收集或信号主题中添加带有 execute-snapshot 信号类型的条目来启动临时阻塞快照。连接器处理消息后,它会开始快照操作。连接器会临时停止流,然后启动指定集合的快照,遵循它在初始快照中使用的相同进程。快照完成后,连接器会恢复流。

如需更多信息,请参阅 块快照

2.3.2.7. 增量快照

为了在管理快照方面提供灵活性,Debezium 包含一个补充快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,而不是像初始快照一样捕获数据库的完整状态,而是在一系列可配置的块中捕获每个集合。您可以指定您希望快照捕获 的集合以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小是 1024 行。

当增量快照进行时,Debebe 使用水位线来跟踪其进度,维护它捕获的每个集合行的记录。与标准初始快照过程相比,这个分阶段方法捕获数据具有以下优点:

  • 您可以并行运行带有流数据捕获的增量快照,而不是延迟流,直到快照完成为止。连接器将继续在整个快照过程中从更改日志捕获接近实时事件,且操作都会阻止另一个事件。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从它停止的点开始,而不是从开始重新捕获集合。
  • 您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,将集合添加到其 collection.include.list 属性中。

增量快照过程

当您运行增量快照时,Debebe 会按主密钥对每个集合进行排序,然后根据 配置的块大小 将集合分成块。通过块工作的块,然后捕获块中的每个集合行。对于它捕获的每行,快照会发出 READ 事件。该事件代表了块开始快照时所在行的值。

当快照进行时,其他进程可能会继续访问数据库,可能会修改集合记录。要反映此类更改,INSERTUPDATEDELETE 操作会按预期提交到事务日志。同样,持续 Debezium 流过程会继续检测到这些更改事件,并将对应的更改事件记录发送到 Kafka。

Debezium 如何处理具有相同主键的记录冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会按顺序接收。也就是说,流处理可能会发出一个事件,在快照捕获包含该行的 READ 事件前修改集合行。当快照最终为行发出对应的 READ 事件时,其值已经被取代。为确保以正确的逻辑顺序处理出序列的增量快照事件,Debezium 采用缓冲方案来解决冲突。只有在快照事件和流事件之间冲突后,才会解析 Debezium 向 Kafka 发出事件记录。

快照窗口

为了帮助解决 late-arriving READ 事件和修改同一集合行之间的冲突,Debezium 会使用一个所谓的 快照窗口。快照窗口分离间隔,在此期间会捕获指定集合块的数据。在块的快照窗口打开前,Debezium 会遵循其通常的行为,并将事件从下游直接发送到目标 Kafka 主题。但是,从为特定块的快照打开,直到它关闭为止,Deduplication 会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。

对于每个数据收集,Debebe 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表捕获的快照记录被发送为 READ 操作。同时,当用户继续更新数据收集中的记录,并且更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debebe 开始处理快照块,它会向内存缓冲提供快照记录。在快照窗口中,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果没有找到匹配项,则流的事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃 buffered READ 事件,并将流记录写入目标主题,因为以逻辑方式取代静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。

连接器会为每个快照块重复这个过程。

目前,您可以使用以下任一方法启动增量快照:

警告

增量快照要求每个表的主键完全排序。因为 String 字段可以包含特殊字符,且受到不同编码的约束,所以基于字符串的主键无法以一致且可预测的顺序排序。在执行增量快照时,最好将主键设置为 String 以外的数据类型。

有关 MongoDB 中 BSON 字符串类型的更多信息,请参阅 MongoDB 文档

分片集群的增量快照

要将增量快照与分片的 MongoDB 集群一起使用,您必须将 incremental.snapshot.chunk.size 设置为足够高的值,以便 增加更改流管道的复杂性

2.3.2.7.1. 触发增量快照

要启动增量快照,您可以发送 临时快照信号 到源数据库上的信号集合。

您可以使用 MongoDB insert () 方法向信号集合提交信号。

在 Debezium 检测到信号集合中的更改后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的集合,并可选择性地指定快照操作类型。目前,快照操作的唯一有效选项是 增量阻止 的。

要指定要包含在快照中的集合,提供一个 data-collections 数组,列出用于匹配集合的集合或一组正则表达式,例如
{"data-collections": ["public.Collection1", "public.Collection2"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debebe 会检测不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的集合名称在数据库、模式或表名称中包含点(.),要将集合添加到 data-collections 数组中,您必须使用双引号转义名称的每个部分。

例如,要包含 公共 数据库中存在的数据收集,并且名称为 My.Collection,请使用以下格式:" public"."My.Collection"

先决条件

使用源信号频道触发增量快照

  1. 在信号集合中插入快照信号文档:

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>, "additional-conditions" : [{"data-collections" : "<collectionName>", "filter" : "<additional-condition>"}] }});
    Copy to clipboard

    例如,

    db.debeziumSignal.insert({ 1
    "type" : "execute-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6
    });
    Copy to clipboard

    命令中的 id键入data 参数的值 与信号集合的字段 相对应。

    下表描述了示例中的参数:

    表 2.57. MongoDB insert ()命令中的字段描述,用于将增量快照信号发送到信号集合
    描述

    1

    db.debeziumSignal

    指定源数据库上信号集合的完全限定名称。

    2

    null

    _id 参数指定一个任意字符串,它被分配为信号请求的 id 标识符。
    上例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有为参数明确分配值,因此 MongoDB 自动分配给文档的任意 ID 将变为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号集合中的条目。Debezium 不使用此标识符字符串。相反,在快照过程中,Debebe 会生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    指定 type 参数,指定信号要触发的操作。

    4

    data-collections

    信号的必需组件,用于指定集合名称或正则表达式数组,以匹配快照中包含的集合名称。
    数组列出了通过完全限定名称与集合匹配的正则表达式,其使用与您用来在 signal.data.collection 配置属性中指定连接器信号集合的名称相同。

    5

    incremental

    data 字段的可选类型 组件,用于指定要运行的快照操作类型。
    目前支持 incrementalblocking 类型。
    如果没有指定值,连接器将运行一个增量快照。

    6

    additional-conditions

    可选数组,指定连接器评估的一组额外条件,以确定要包含在快照中的记录子集。
    additional-conditions 数组中的每个元素都是包含以下键的对象:

    data-collection:: 将应用过滤器的数据收集的完全限定域名。过滤:: 指定必须存在于数据收集记录中的列值,以便快照包含它,例如 "color='blue' "。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件信息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654962",
    "ts_ns":"1620393591654962147",
    "transaction":null
}
Copy to clipboard

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是 阻塞增量
在提交信号集合的 SQL 查询中指定 type 值是可选的。
如果没有指定值,连接器将运行一个增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

2.3.2.7.2. 使用 Kafka 信号频道触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 2.58. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 支持 incrementalblocking 类型。
详情请查看下一部分。

data-collections

N/A

以逗号分隔的正则表达式,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式来指定名称。

additional-conditions

N/A

可选的附加条件数组,用于指定连接器评估以指定快照中包含的记录子集的条件。
每个额外的条件都是一个对象,用于指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection::过滤器应用到的集合的完全限定域名。您可以为每个集合应用不同的过滤器。过滤:: specifys 列值必须存在于数据库记录中才能包括快照,例如 "color='blue' "。

您分配给 filter 参数的值是您在为阻塞快照设置 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中指定的值。

例 2.17. execute-snapshot Kafka 信息

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Copy to clipboard

带有额外条件的临时增量快照

Debezium 使用 additional-conditions 字段来选择集合内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM &lt ;tableName&gt; …​.

当快照请求包含 additional-conditions 属性时,属性的 data-collectionfilter 参数会附加到 SQL 查询中,例如:

SELECT * FROM &lt ;data-collection> WHERE & lt;filter&gt; …​.

例如,如果某个 产品 集合带有列 ID (主键)、颜色 和品牌,如果您希望快照只包含 color='blue' 的内容,当您请求快照时,您可以添加 additional-conditions 属性来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
Copy to clipboard

您还可以使用 additional-conditions 属性来根据多个列传递条件。例如,使用与上例中的相同 产品 集合,如果您希望快照只包含 color='blue'brand='MyBrand' 的产品集合中的内容,您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Copy to clipboard
2.3.2.7.3. 停止增量快照

在某些情况下,可能需要停止增量快照。例如,您可能意识到快照没有被正确配置,或者您可能要确保资源可用于其他数据库操作。您可以通过向源数据库上的集合发送信号来停止已在运行的快照。

您可以通过将停止快照信号文档插入到信号集合中,向信号集合提交停止快照信号。您提交的 stop 快照信号将快照操作的类型指定为 增量,并且可选指定要从当前运行的快照中省略的集合。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在进行中时停止增量快照操作。

其他资源

您还可以通过向 Kafka 信号发送 JSON 消息来停止增量快照

先决条件

使用源信号频道停止增量快照

  1. 在信号集合中插入停止快照信号文档:

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
    Copy to clipboard

    例如,

    db.debeziumSignal.insert({ 1
    "type" : "stop-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    });
    Copy to clipboard

    signal 命令中的 idtypedata 参数的值与 信号集合的字段相对应

    下表描述了示例中的参数:

    表 2.59. insert 命令中的字段描述,用于将停止增量快照文档发送到信号集合
    描述

    1

    db.debeziumSignal

    指定源数据库上信号集合的完全限定名称。

    2

    null

    上例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有为参数明确分配值,因此 MongoDB 自动分配给文档的任意 ID 将变为信号请求的 id 标识符。
    使用此字符串来识别将日志消息记录到信号集合中的条目。Debezium 不使用此标识符字符串。

    3

    stop-snapshot

    type 参数指定信号要触发的操作。

    4

    data-collections

    信号的可选组件,用于指定集合名称或正则表达式数组,以匹配要从快照中删除的集合名称。
    数组列出了正则表达式,该表达式通过其完全限定名称匹配,格式为 database.collection

    如果您从 data 字段中省略 data-collections 数组,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的必需组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项为 增量
    如果没有指定 类型 值,信号将无法停止增量快照。

2.3.2.7.4. 使用 Kafka 信号频道停止增量快照

您可以向 配置的 Kafka 信号主题 发送信号消息,以停止临时增量快照。

Kafka 消息的密钥必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是带有 typedata 字段的 JSON 对象。

signal 类型是 stop-snapshotdata 字段必须具有以下字段:

表 2.60. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前 Debezium 只支持 incremental 类型。
详情请查看下一部分。

data-collections

N/A

可选的、以逗号分隔的正则表达式,与表的完全限定名称匹配,该表数组是集合名称或正则表达式,以匹配要从快照中删除的集合名称。
使用格式 database.collection 指定集合名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Copy to clipboard

2.3.2.8. 阻塞快照

为了在管理快照方面提供更多灵活性,Debezium 包含一个额外的临时快照机制,称为 阻塞快照。阻塞快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

阻塞快照的行为与 初始快照 相似,但您可以在运行时触发快照。

您可能想要在以下情况下运行阻塞快照,而不是使用标准初始快照过程:

  • 您可以添加新集合,并在连接器运行时完成快照。
  • 您可以添加大型集合,并且您希望快照在短时间内完成,而不是通过增量快照完成。

阻塞快照过程

当您运行阻塞快照时,Debebe 会停止流,然后启动指定集合的快照,遵循它在初始快照过程中使用的同一进程。快照完成后,会恢复流。

配置快照

您可以在信号 的数据 组件中设置以下属性:

  • data-collections :指定哪个集合必须是快照。
  • data-collections:指定您希望快照包含的集合。
    此属性接受与完全限定集合名称匹配的正则表达式列表。属性的行为与 table.include.list 属性的行为类似,它指定要捕获在阻塞快照中的表。
  • additional-conditions :您可以为不同的集合指定不同的过滤器。

    • data-collection 属性是要应用过滤器的集合的完全限定名称,并可区分大小写或区分大小写,具体取决于数据库。
    • filter 属性将具有与 snapshot.select.statement.overrides 时使用的相同值,即条件应当匹配的集合的完全限定域名。

例如:

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Copy to clipboard

可能的副本

您发送信号触发快照的时间之间可能会有延迟,以及流停止和快照启动时的时间。因此,在快照完成后,连接器可能会发出一些由快照捕获的重复记录的事件记录。

2.3.2.9. Debezium MongoDB 连接器流如何更改事件记录

在副本集的连接器任务记录偏移后,它会使用偏移来决定 oplog 中应启动流更改的位置。然后,任务(取决于配置)连接到副本集的主节点,或连接到副本集的更改流,并从那个位置开始流流。它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。每个更改事件都包含在找到操作的 oplog 中的位置,连接器会定期记录它作为其最新的偏移量。记录偏移的时间间隔由 offset.flush.interval.ms 管理,它是一个 Kafka Connect worker 配置属性。

当连接器安全停止时,会记录最后的偏移量,以便在重启时,连接器将继续完全关闭的位置。如果连接器的任务意外终止,则任务可能会在最后一次记录偏移后处理并生成的事件,但在记录最后一个偏移前;重启时,连接器从上次 记录 偏移开始,可能会生成之前在崩溃之前生成的相同事件。

注意

当 Kafka 管道中的所有组件始终始终正常运行时,Kafka 用户会完全接收每个消息 一次。但是,当出现问题时,Kafka 只能保证消费者 至少接收一次 每个消息。为避免意外的结果,消费者必须能够处理重复的消息。

如前文所述,连接器任务总是使用副本集的主节点从 oplog 中流更改,确保连接器会尽可能看到最新的操作,并可以捕获延迟低于第二个节点的更改。当副本集选择新的主时,连接器会立即停止流更改,连接到新的主节点,并在同一位置开始从新主节点流更改。同样,如果连接器遇到与副本集成员通信的问题,它会尝试使用 exponential backoff 来重新连接,以便不造成副本集,并在连接后继续流传输更改。这样,连接器就可以动态调整副本集成员资格的更改,并自动处理通信失败。

总而言之,MongoDB 连接器可在大多数情况下继续运行。通信问题可能会导致连接器等待问题解决为止。

2.3.2.10. MongoDB 支持在 Debezium 更改事件中填充 before 字段

在 MongoDB 6.0 及更高版本中,您可以配置更改流,以发出文档的预镜像状态,以填充 MongoDB 更改事件的 before 字段。要在 MongoDB 中启用预镜像,您必须使用 db.createCollection(), create, 或 collMod 为集合设置 changeStreamPreAndPostImages。要启用 Debezium MongoDB 在更改事件中包含预镜像,请将连接器的 capture.mode 设置为一个 *_with_pre_image 选项之一。

MongoDB 更改流事件的大小限制

MongoDB 更改流事件的大小限制为 16MB。因此,使用预镜像会增加超过这个阈值的可能性,这可能会导致失败。有关如何避免超过更改流限制的详情,请参考 MongoDB 文档

2.3.2.11. 接收 Debezium MongoDB 更改事件记录的 Kafka 主题的默认名称

MongoDB 连接器将所有插入、更新和删除操作的事件写入每个集合中文档到单个 Kafka 主题。Kafka 主题的名称始终使用 logicalName.databaseName.collectionName 格式,其中 logicalName 是连接器的逻辑名称(使用 topic.prefix 配置属性指定),databaseName 是创建发生在的数据库的名称,collectionName 是受影响的文档所在的 MongoDB 集合的名称。

例如,假设一个 MongoDB 副本集有一个 inventory 数据库,其中包含四个集合:products, products_on_hand, customers, 和 orders。如果监控这个数据库的连接器有一个逻辑名称 fulfillment,则这个连接器会在这四个 Kafka 主题上生成事件:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合的所有更改(每个分片包含集合文档的子集)都会进入同一 Kafka 主题。

您可以根据需要将 Kafka 设置为自动创建主题。如果没有,则必须在启动连接器前使用 Kafka 管理工具创建主题。

2.3.2.12. Debezium MongoDB 连接器的事件键控制主题分区

MongoDB 连接器不会明确决定如何对事件进行分区。相反,它允许 Kafka 根据事件键决定如何分区主题。您可以通过在 Kafka Connect worker 配置中定义 分区 实现来更改 Kafka 的分区逻辑。

Kafka 只为写入单个主题分区的事件维护总数。按键对事件进行分区意味着所有具有相同键的事件始终进入同一分区。这样可确保特定文档的所有事件始终完全排序。

2.3.2.13. Debezium MongoDB 连接器生成的事件代表事务边界

Debezium 可以生成代表事务元数据边界和丰富的更改数据事件消息的事件。

Debezium 接收事务元数据时的限制

Debezium 注册并接收部署连接器后发生的事务的元数据。部署连接器前发生的事务元数据不可用。

对于每个事务 BEGINEND,Debebe 会生成一个包含以下字段的事件:

status
BEGINEND
id
唯一事务标识符的字符串表示。
event_count (用于 END 事件)
事务发出的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 的数组,提供源自给定数据收集的更改发出的事件数。

以下示例显示了一个典型的信息:

{
  "status": "BEGIN",
  "id": "1462833718356672513",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "rs0.testDB.collectiona",
      "event_count": 1
    },
    {
      "data_collection": "rs0.testDB.collectionb",
      "event_count": 1
    }
  ]
}
Copy to clipboard

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

更改数据事件增强

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串表示。
total_order
事件在事务生成的所有事件间的绝对位置。
data_collection_order
事件在事务发送的所有事件中的每个数据收集位置。

以下是消息的一个示例:

{
  "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335486",
  "ts_ns": "1580390884335486281",
  "transaction": {
    "id": "1462833718356672513",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Copy to clipboard

2.3.3. Debezium MongoDB 连接器数据更改事件的描述

Debezium MongoDB 连接器为每个文档级操作生成一个数据更改事件,用于插入、更新或删除数据。每个事件包含一个键和值。键的结构和值取决于更改的集合。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间变化,用户很难处理。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,消费者可以用来从 registry 获取 schema 的模式 ID。这使得每个事件自包含。

以下框架 JSON 显示了更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器决定了更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于 change 事件中。同样,只有在将转换器配置为生成它时,事件键和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
Copy to clipboard
表 2.61. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,它描述了事件键的 payload 部分中的内容。换句话说,第一个 模式 字段描述了更改的文档的密钥结构。

2

payload

第一个 payload 字段是事件键的一部分。它有上一个 schema 字段描述的结构,其中包含已更改的文档的密钥。

3

schema

第二个 schema 字段是事件值的一部分。它指定 Kafka Connect 模式,它描述了事件值的 payload 部分中的内容。换句话说,第二个 模式 描述了更改的文档的结构。通常,此模式包含嵌套的模式。

4

payload

第二个 payload 字段是事件值的一部分。它有上一个 schema 字段描述的结构,其中包含更改的文档的实际数据。

默认情况下,连接器流将事件记录改为带有与事件原始集合相同的名称的主题。请参阅 主题名称

警告

MongoDB 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将被一个下划线字符替代。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,则这可能会导致意外冲突,并且区分名称的唯一字符无效,因此用下划线替换。

如需更多信息,请参阅以下主题:

2.3.3.1. 关于 Debezium MongoDB 更改事件中的键

更改事件的密钥包含更改文档的密钥和更改文档的实际键的 schema。对于给定集合,schema 及其对应有效负载都包含一个 id 字段。此字段的值是文档的标识符,表示为来自 MongoDB 扩展 JSON 序列化严格模式的字符串

考虑一个连接器,其逻辑名称为 fulfillment,包括一个 inventory 数据库的副本集,以及包含如下文档的 customers 集合。

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}
Copy to clipboard

更改事件键示例

捕获对 customers 集合的更改的每个更改事件都有相同的事件关键模式。只要 customers 集合有以前的定义,捕获 customers 集合更改的事件都有以下关键结构:在 JSON 中,类似如下:

{
  "schema": { 1
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 5
    "id": "1004"
  }
}
Copy to clipboard
表 2.62. 更改事件键的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 payload 部分中的内容。

2

fulfillment.inventory.customers.Key

定义键有效负载结构的 schema 名称。这个模式描述了更改的文档的密钥结构。Key 模式名称的格式是 connector-name.database-name.collection-name.Key。在本例中:

  • fulfillment 是生成此事件的连接器的名称。
  • Inventory 是包含已更改的集合的数据库。
  • 客户 是包含更新的文档的集合。

3

optional

指明事件键是否在其 payload 字段中包含一个值。在本例中,需要键有效负载中的值。当文档没有键时,键有效负载字段中的值是可选的。

4

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要。

5

payload

包含生成此更改事件的文档的密钥。在本例中,键包含类型为 字符串 的单个 id 字段,其值为 1004

这个示例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符的工作方式相同,包括文档标识符。对于文档标识符,事件键的 payload.id 值是一个字符串,代表更新的文档的原始 _id 字段作为使用严格的模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id 字段的示例。

表 2.63. 在事件键有效负载中表示文档 _id 字段的示例
类型MongoDB _id密钥的有效负载

整数

1234

{ "id" : "1234" }

浮点值

12.34

{ "id" : "12.34" }

字符串

"1234"

{ "id" : "\"1234\"" }

文档

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

二进制

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }

2.3.3.2. 关于 Debezium MongoDB 更改事件中的值

更改事件中的值比键复杂一些。与键一样,值也有一个 schema 部分和一个 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的操作的事件,它们都有带有 envelope 结构的值 payload。

考虑用于显示更改事件键示例相同的示例文档:

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}
Copy to clipboard

每个事件类型描述了更改本文档的更改值部分:

创建 事件

以下示例显示了一个更改事件的值部分,连接器为在 customer 集合中创建数据的操作生成的更改事件值部分:

{
    "schema": { 1
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_us"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ns"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_us"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ns"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": { 5
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6
      "source": { 7
        "version": "3.0.8.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "ts_ms": 1558965508000000,
        "ts_ms": 1558965508000000000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 8
      "ts_ms": 1558965515240, 9
      "ts_us": 1558965515240142, 10
      "ts_ns": 1558965515240142879, 11
    }
  }
Copy to clipboard
表 2.64. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,它描述了值有效负载的结构。在连接器为特定集合生成的更改事件时,更改事件的值 schema 都相同。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

io.debezium.data.Json 是有效负载的、补丁和 过滤器 字段的 schema。这个模式是针对 customers 集合的。create 事件是包含 after 字段的唯一事件类型。update 事件包含一个 filter 字段和 patch 字段。delete 事件包含一个 filter 字段,但不包括 after 字段或 patch 字段。

3

name

io.debezium.connector.mongo.Source 是有效负载 source 字段的 schema。这个模式特定于 MongoDB 连接器。连接器用于它生成的所有事件。

4

name

dbserver1.inventory.customers.Envelope 是负载总体结构的模式,其中 dbserver1 是连接器名称,inventory 是数据库,customers 是集合。这个模式特定于集合。

5

payload

值的实际数据。这是更改事件提供的信息。

似乎,事件的 JSON 表示比它们描述的文档大得多。这是因为 JSON 表示必须包含 schema 和 message 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的消息大小。

6

after

指定事件发生后文档状态的可选字段。在本例中,after 字段包含新文档的 _idfirst_namelast_nameemail 字段的值。after 值始终是一个字符串。按照惯例,它包含文档的 JSON 表示。MongoDB oplog 条目仅包含 _create_ 事件的完整文档,当 capture.mode 选项被设置为 change_streams_update_full 时还包括 update 事件的文档; 换句话说,无论 capture.mode 选项是什么,create 事件是唯一包含 after 字段的事件类型。

7

source

描述事件源元数据的强制字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、发生事件的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含新文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • 在事务中执行更改时,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符(仅更改流捕获模式)。

8

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,c 表示操作创建了文档。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = 读取(仅适用于快照)

9

ts_ms

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

10

ts_us

可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

9

ts_ns

可选字段,显示连接器处理事件的时间,以纳秒为单位。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

更改流捕获模式

示例 customers 集合中一个更新的改变事件的值有与那个集合的 create 事件相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。只有在 capture.mode 选项被设置为 change_streams_update_full 时,update 事件才会包括一个 after 值。如果 capture.mode 选择被设置为 *_with_pre_image 选项之一,会提供一个 before 值。在这种情况下,有一个新的结构化字段 updateDescription,它有几个额外的字段:

  • updatedFields 是一个字符串字段,其中包含更新的文档字段的 JSON 表示及其值
  • removedFields 是一个从文档中删除的字段名称列表
  • truncatedArrays 是已截断的文档中的数组列表

以下是当连接器在 customers 集合中为更新生成的更改事件值的示例:

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "ts_us": 1465491461815698, 3
      "ts_ns": 1465491461815698142, 4
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 5
      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 6
      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 7
        "truncatedArrays": null
      },
      "source": { 8
        "version": "3.0.8.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "ts_us": 1558965508000000,
        "ts_ns": 1558965508000000000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
Copy to clipboard
表 2.65. 更新 事件值字段的描述
字段名称描述

1

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,u 表示操作更新了文档。

2

ts_ms,ts_us,ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

before

包含更改前实际 MongoDB 文档的 JSON 字符串表示。

如果捕获模式没有设置为 *_with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

after

包含实际 MongoDB 文档的 JSON 字符串表示。
如果捕获模式没有设置为 change_streams_update_full,则 update 事件值不会包含 after 字段

5

updatedFields

包含文档更新字段值的 JSON 字符串表示。在本例中,更新会将 first_name 字段改为新值。

6

source

描述事件源元数据的强制字段。此字段包含与同一集合的 create 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含更新的文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • 如果在事务中执行更改,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符。
警告

事件中的 after 值应作为文档的时间点值进行处理。该值不会被动态计算,但是从集合中获取的。因此,如果多个更新一个紧随另一个发生,则所有 update 事件都会包含在文档中存储的代表最后的值相同的 after 值。

如果您的应用程序依赖于逐步更改演进,则应该只依赖 updateDescription

删除 事件

delete 更改事件中的值与为同一集合的 createupdate 事件相同的 schema 部分。delete 事件中的 payload 部分包含与为同一集合的 createupdate 事件不同的值。特别是,delete 事件既不包含 after 值,也不包含 updateDescription 值。以下是 customers 集合中文档的 delete 事件示例:

{
    "schema": { ... },
    "payload": {
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "ts_us": 1465495462115748, 3
      "ts_ns": 1465495462115748263, 4
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",5
      "source": { 6
        "version": "3.0.8.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "ts_us": 1558965508000000,
        "ts_ns": 1558965508000000000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
Copy to clipboard
表 2.66. 删除 事件值字段的描述
字段名称描述

1

op

描述操作类型的强制字符串。op 字段值为 d,表示此文档已被删除。

2

ts_ms,ts_us.ts_ns

显示连接器处理事件的时间字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示更改在数据库中的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

before

包含更改前实际 MongoDB 文档的 JSON 字符串表示。

如果捕获模式没有设置为 *_with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

source

描述事件源元数据的强制字段。此字段包含与同一集合的 createupdate 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含已删除文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • 在事务中执行更改时,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符(仅更改流捕获模式)。

MongoDB 连接器事件设计为与 Kafka 日志压缩 一起使用。日志压缩支持删除一些旧的信息,只要至少保留每个密钥的最新消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

唯一标识的文档的所有 MongoDB 连接器事件都完全相同。删除文档时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,为了使 Kafka 删除具有该键的所有信息,消息值必须是 null。为了实现此目的,在 Debezium 的 MongoDB 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但一个 null 值。tombstone 事件会通知 Kafka 删除所有具有相同键的消息。

2.3.4. 设置 MongoDB 以使用 Debezium 连接器

MongoDB 连接器使用 MongoDB 的更改流捕获更改,因此连接器只能用于 MongoDB 副本集,或用于每个分片是单独的副本集的分片集群。有关设置 副本集或 分片集群的信息,请参阅 MongoDB 文档。另外,请确保了解如何使用副本集启用 访问控制和身份验证

您还必须有一个 MongoDB 用户,它具有适当的角色才能读取 oplog 可以读取的 admin 数据库。此外,用户还必须能够读取分片集群的配置服务器中配置数据库,并且必须具有 listDatabases 特权操作。当使用更改流时(默认),用户还必须具有集群范围的特权操作 findchangeStream

当您打算使用 pre-image 并填充 before 字段时,您需要首先为一个集合启用 changeStreamPreAndPostImages,使用 db.createCollection(), create, 或 collMod

最佳 Oplog 配置

Debezium MongoDB 连接器读取 更改流,以获取副本集的 oplog 数据。由于 oplog 是一个固定大小,上限的集合,如果超过其最大配置的大小,它开始覆盖其最旧的条目。如果因为任何原因停止连接器,它会在重启时,它会尝试从最后的 oplog 流位置恢复流。但是,如果从 oplog 中删除了最后一个流位置,具体取决于连接器的 snapshot.mode 属性中指定的值,连接器可能无法启动,报告 无效的恢复令牌错误。如果失败,您必须创建新的连接器,以便 Debezium 继续从数据库中捕获记录。如需更多信息,请参阅 如果 snapshot.mode 设为 initial,则连接器在停止了较长的时间间隔后失败

为确保 oplog 保留 Debezium 恢复流所需的偏移值,您可以使用以下任一方法:

  • 增加 oplog 的大小。根据您的典型工作负载,将 oplog 大小设置为一个大于每小时峰值 oplog 条目的值。
  • 增加 oplog 条目保留的最短小时数 (MongoDB 4.4 及更高版本)。此设置基于时间,因此保证最后 n 小时中的条目可用,即使 oplog 达到其配置的最大值。虽然这通常是首选选项,但对于具有接近容量高负载的集群,请指定最大 oplog 大小。

为了帮助防止与缺少 oplog 条目相关的故障,跟踪报告复制行为的指标非常重要,并优化 oplog 大小以支持 Debezium。特别是,您应该监控 Oplog GB/Hour 和 Replication Oplog 窗口的值。如果 Debezium 在超过 replication oplog 窗口值的间隔离线,并且主 oplog 增长速度比 Debezium 可以消耗条目快,则连接器失败可能会导致。

有关如何监控这些指标的详情,请查看 MongoDB 文档

最好将最大 oplog 大小设置为基于 oplog 的每小时增长的值(Oplog GB/Hour),乘以可能需要解决 Debezium 失败的时间。

也就是说,

Debezium 失败的 oplog GB/Hour X average 反应时间

例如,如果 oplog 大小限制被设置为 1GB,并且 oplog 每小时增长 3GB,则 oplog 条目会每小时清除三次。如果 Debezium 在这段时间内失败,则其最后一个 oplog 位置可能会被删除。

如果 oplog 以 3GB/小时的速度增长,并且 Debezium 在两个小时内离线,您可以将 oplog 大小设置为 3GB/小时 X 2 小时或 6GB。

2.3.5. 部署 Debezium MongoDB 连接器

您可以使用以下任一方法部署 Debezium MongoDB 连接器:

2.3.5.1. 使用 Apache Kafka 的 Streams 部署 MongoDB 连接器

部署 Debezium 连接器的首选方法是使用 Streams for Apache Kafka 来构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您要创建和使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中包含的连接器工件的信息。
  • 提供包括连接器用来访问源数据库的信息的 KafkaConnector CR。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 Apache Kafka 的 Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定在存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在容器 registry 中,如 quay.io 或 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

2.3.5.2. 使用 Streams for Apache Kafka 部署 Debezium MongoDB 连接器

对于 Apache Kafka 的早期版本,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 Apache Kafka 的 Streams 中的构建配置,来自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。

在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到 .spec.build.output 中指定的容器 registry,并用于部署 Kafka Connect 集群。在 Apache Kafka 的 Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • Apache Kafka Operator 的 Streams 正在运行。
  • 部署了 Apache Kafka 集群,如 在 OpenShift 中部署和管理 Apache Kafka 的流 中所述。
  • Kafka Connect 部署在 Apache Kafka 的 Streams 中
  • 您有一个红帽构建的 Debezium 许可证。
  • OpenShift oc CLI 客户端已安装,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

    例 2.18. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium MongoDB 连接器存档。
    • 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算将 Avro serialization 与连接器一起使用时,才添加 Apicurio Registry 组件。
    • Debezium 脚本 SMT 归档以及您要用于 Debezium 连接器的相关脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.9.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mongodb
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mongodb/3.0.8.Final-redhat-00004/debezium-connector-mongodb-3.0.8.Final-redhat-00004-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    Copy to clipboard
    表 2.67. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",以便 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,可推送到容器 registry (如 Docker Hub 或 Quay)或 镜像流 (用于将镜像推送到内部 OpenShift ImageStream)。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 Streams for Apache Kafka API 参考中的 Build schema 参考

    5

    插件配置 列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型是 ziptgz、或 jar。Debezium 连接器存档以 .zip 文件格式提供。type 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Red Hat Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定下载 Apicurio Registry 组件的工件 类型和 url。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化事件键和值,使用红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您还必须部署 JSR 223 兼容脚本实施,如 groovy。

    10

    (可选)指定与 JSR 223 脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果您使用 Streams for Apache Kafka 将连接器插件合并到 Kafka Connect 镜像中,对于每个所需的脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用将 Apache Groovy 语言与脚本 SMT 搭配使用,示例中的自定义资源会检索以下库的 JAR 文件:

    • groovy
    • groovy-jsr223 (脚本代理)
    • groovy-json (用于解析 JSON 字符串的模块)

    作为替代方案,Debezium 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实施。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml
    Copy to clipboard

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 mongodb-inventory-connector.yaml

    例 2.19. mongodb-inventory-connector.yaml 文件,为 Debezium 连接器定义 KafkaConnector 自定义资源

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mongodb 1
    spec:
      class: io.debezium.connector.mongodb.MongoDbConnector 2
      tasksMax: 1  3
      config:  4
        mongodb.hosts: rs0/192.168.99.100:27017 5
        mongodb.user: debezium  6
        mongodb.password: dbz  7
        topic.prefix: inventory-connector-mongodb 8
        collection.include.list: inventory[.]*  9
    Copy to clipboard
    表 2.68. 连接器配置设置的描述
    描述

    1

    要注册到 Kafka Connect 集群的连接器名称。

    2

    连接器类的名称。

    3

    可同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址和端口号。

    7

    Debezium 用来连接到数据库的帐户名称。

    8

    Debezium 用来连接到数据库用户帐户的密码。

    8

    数据库实例或集群的主题前缀。
    指定的名称只能从字母数字字符或下划线构成。
    因为主题前缀用作从此连接器接收更改事件的 Kafka 主题的前缀,因此该名称在集群中的连接器中必须是唯一的。
    如果您将连接器与 Avro 连接器集成,则此命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。

    9

    连接器从中捕获更改的集合名称。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to clipboard

    例如,

    oc create -n debezium -f mongodb-inventory-connector.yaml
    Copy to clipboard

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。

您现在已准备好 验证 Debezium MongoDB 部署

2.3.5.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium MongoDB 连接器

要部署 Debezium MongoDB 连接器,您必须构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,创建两个自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat Streams for Apache Kafka 的 OpenShift 实例。Apache Kafka 的流提供将 Apache Kafka 到 OpenShift 的 operator 和镜像。
  • 定义 Debezium MongoDB 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

流程

  1. 为 Kafka Connect 创建 Debezium MongoDB 容器:

    1. 创建一个 Dockerfile,它使用 registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-mongodb.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-39-rhel9:2.9.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mongodb/3.0.8.Final-redhat-00004/debezium-connector-mongodb-3.0.8.Final-redhat-00004-plugin.zip \
      && unzip debezium-connector-mongodb-3.0.8.Final-redhat-00004-plugin.zip \
      && rm debezium-connector-mongodb-3.0.8.Final-redhat-00004-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      Copy to clipboard
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为您的目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-mongodb.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-mongodb.yaml Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-mongodb:latest .
      Copy to clipboard
      docker build -t debezium-container-for-mongodb:latest .
      Copy to clipboard

      前面的命令使用名称 debezium-container-for-mongodb 构建容器镜像。

    3. 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器镜像仓库必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-mongodb:latest
      Copy to clipboard
      docker push <myregistry.io>/debezium-container-for-mongodb:latest
      Copy to clipboard
    4. 创建新的 Debezium MongoDB KafkaConnect 自定义资源(CR)。例如,使用名称 dbz-connect.yaml 创建 KafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了描述 KafkaConnect 自定义资源的 dbz-connect.yaml 文件摘录。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-mongodb  2
      
        ...
      Copy to clipboard
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定为运行 Debezium 连接器而创建的镜像的名称。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 输入以下命令将 KafkaConnect CR 应用到 OpenShift Kafka Connect 环境:

      oc create -f dbz-connect.yaml
      Copy to clipboard

      该命令添加一个 Kafka Connect 实例,用于指定为运行 Debezium 连接器而创建的镜像的名称。

  2. 创建一个 KafkaConnector 自定义资源,用于配置 Debezium MongoDB 连接器实例。

    您可以在指定连接器的配置属性的 .yaml 文件中配置 Debezium MongoDB 连接器。连接器配置可能会指示 Debezium 为 MongoDB 副本集或分片集群的子集生成更改事件。另外,您可以设置过滤不需要的集合的属性。

    以下示例配置了 Debezium 连接器,该连接器通过 192.168.99.100 上的端口 27017 连接到 MongoDB 副本集 rs0,并捕获 清单 集合中发生的更改。inventory-connector-mongodb 是副本集的逻辑名称。

    MongoDB inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-mongodb 1
        labels: strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mongodb.MongoDbConnector 2
        config:
         mongodb.connection.string: mongodb://192.168.99.100:27017/?replicaSet=rs0 3
         topic.prefix: inventory-connector-mongodb 4
         collection.include.list: inventory[.]* 5
    Copy to clipboard

    表 2.69. MongoDB inventory-connector.yaml 示例中的设置描述
    描述

    1

    用于在 Kafka Connect 中注册连接器的名称。

    2

    MongoDB 连接器类的名称。

    3

    用于连接到 MongoDB 副本集的主机地址。

    4

    MongoDB 副本集的逻辑名称。逻辑名称形成了生成的事件的命名空间,在使用 Avro converter 时,用于连接器写入的 Kafka 主题的名称、Kafka Connect 模式名称和相应 Avro 模式的命名空间。

    5

    与要监控的所有集合匹配的正则表达式(如 < dbName>.<collectionName> )的可选列表。

  3. 使用 Kafka Connect 创建连接器实例。例如,如果您在 inventory-connector.yaml 文件中保存 KafkaConnector 资源,您将运行以下命令:

    oc apply -f inventory-connector.yaml
    Copy to clipboard

    前面的命令注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 清单 集合运行。

有关您可以为 Debezium MongoDB 连接器设置的配置属性的完整列表,请参阅 MongoDB 连接器配置属性

结果

连接器启动后,它会完成以下操作:

2.3.5.4. 验证 Debezium MongoDB 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个表创建一个主题,这些表配置为捕获连接器。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了用于读取操作的事件("op":"r"),连接器在每个表的初始快照过程中生成的。

先决条件

  • 在 OpenShift 中,Debezium 连接器部署到 Streams for Apache Kafka。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器名称,如 inventory-connector-mongodb
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>
        Copy to clipboard

        例如,

        oc describe KafkaConnector inventory-connector-mongodb -n debezium
        Copy to clipboard

        该命令返回与以下输出类似的状态信息:

        例 2.20. KafkaConnector 资源状态

        Name:         inventory-connector-mongodb
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-mongodb
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-mongodb.inventory
            inventory-connector-mongodb.inventory.addresses
            inventory-connector-mongodb.inventory.customers
            inventory-connector-mongodb.inventory.geom
            inventory-connector-mongodb.inventory.orders
            inventory-connector-mongodb.inventory.products
            inventory-connector-mongodb.inventory.products_on_hand
        Events:  <none>
        Copy to clipboard
  2. 验证连接器是否已创建 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面上,单击 Resources 以打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,单击要检查的主题的名称,例如 inventory-connector-mongodb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分中,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics
        Copy to clipboard

        该命令返回与以下输出类似的状态信息:

        例 2.21. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-mongodb--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
        Copy to clipboard
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>
    Copy to clipboard

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-mongodb.inventory.products_on_hand
    Copy to clipboard

    指定主题名称的格式与步骤 1 中返回 的格式相同,例如 inventory-connector-mongodb.inventory.addresses

    对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 2.22. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mongodb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mongodb","name":"inventory-connector-mongodb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mongodb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
    Copy to clipboard

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成一个读取("op" ="r")事件。product_id 记录的 "before" 状态为 null,表示记录没有之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

2.3.5.5. Debezium MongoDB 连接器配置属性的描述

Debezium MongoDB 连接器有许多配置属性,可用于为应用程序获得正确的连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:

除非默认值可用 否则需要以下配置属性。

表 2.70. 所需的 Debezium MongoDB 连接器配置属性
属性默认描述

internal.mongodb.allow.offset.invalidation

false

将此属性设置为 true,以便连接器无效 并整合 早期连接器版本记录的特定于分片的偏移量。

警告

此属性允许您修改当前的默认行为。如果默认行为更改为允许连接器自动无效,并整合早期连接器版本记录的偏移,则属性可能会在以后的发行版本中删除。

name

没有默认值

连接器的唯一名称。尝试使用相同名称再次注册将失败。(所有 Kafka 连接连接器都需要此属性。)

connector.class

没有默认值

连接器的 Java 类的名称。对于 MongoDB 连接器,始终使用 io.debezium.connector.mongodb.MongoDbConnector 的值。

mongodb.connection.string

没有默认值

指定连接器用来连接到 MongoDB 副本集 的连接字符串。此属性替换了 MongoDB 连接器之前版本中提供的 mongodb.hosts 属性。

topic.prefix

没有默认值

标识此连接器监控的连接器和/或 MongoDB 副本集或分片集群的唯一名称。每台服务器应该被最多一个 Debezium 连接器监控,因为此服务器名称前缀所有保留的 Kafka 主题都与 MongoDB 副本集或集群类似。仅使用字母数字字符、连字符、句点和下划线来组成名称。逻辑名称应该在所有其他连接器之间唯一,因为该名称用作命名从此连接器接收记录的 Kafka 主题的前缀。

警告

不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发送事件,连接器会将后续事件发送到名称基于新值的主题。

mongodb.authentication.class

DefaultMongoDbAuthProvider

完整的 Java 类名称,是 io.debezium.connector.mongodb.connection.MongoDbAuthProvider 接口的实现。此类处理在 MongoDB 连接上设置凭证(在每个应用程序引导时称为)。默认行为会根据每个文档使用 mongodb.usermongodb.passwordmongodb.authsource 属性,但其他实施可能会以不同的方式使用,或者忽略它们。请注意,mongodb.connection.string 中的任何设置都会覆盖此类设置的设置

mongodb.user

没有默认值

使用默认 mongodb.authentication.class: 在连接到 MongoDB 时使用的数据库用户的名称。这只有在 MongoDB 配置为使用身份验证时才需要。

mongodb.password

没有默认值

使用默认 mongodb.authentication.class: 连接到 MongoDB 时使用的密码。这只有在 MongoDB 配置为使用身份验证时才需要。

mongodb.authsource

admin

使用默认 mongodb.authentication.class: Database (身份验证源)时,包含 MongoDB 凭证。只有在 MongoDB 配置为在将身份验证配置为与 admin 以外的另一个身份验证数据库一起使用时,才需要这样做。

mongodb.ssl.enabled

false

连接器将使用 SSL 连接到 MongoDB 实例。

mongodb.ssl.invalid.hostname.allowed

false

启用 SSL 时,此设置控制是否在连接阶段禁用严格的主机名检查。如果为 true,则连接不会阻止中间人攻击。

filters.match.mode

regex

用于根据包括/excluded 数据库和集合名称匹配的事件的模式。将 属性设置为以下值之一:

regex
数据库和集合包括/excludes 作为用逗号分开的正则表达式列表进行评估。
literal
数据库和集合包括/excludes 作为以逗号分隔的字符串文字列表进行评估。与该文字相关的空格字符会被剥离。

database.include.list

空字符串

与要监控的数据库名称匹配的正则表达式或文字的逗号分隔列表。默认情况下,会监控所有数据库。
当设置了 database.include.list 时,连接器只监控属性指定的数据库。监控中排除其他数据库。

要匹配数据库的名称,Debebe 根据 filters.match.mode 属性的值执行以下操作之一

  • 应用您指定为 anchored 正则表达式的正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;它不匹配数据库名称中可能存在的子字符串。
  • 将您指定的文字与数据库的整个名称字符串进行比较

如果您在配置中包含此属性,不要设置 database.exclude.list 属性。

database.exclude.list

空字符串

与监控中排除的数据库名称匹配的正则表达式或文字的逗号分隔列表。当设置了 database.exclude.list 时,连接器会监控每个数据库,但属性指定除外。

要匹配数据库的名称,Debebe 根据 filters.match.mode 属性的值执行以下操作之一

  • 应用您指定为 anchored 正则表达式的正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;它不匹配数据库名称中可能存在的子字符串。
  • 将您指定的文字与数据库的整个名称字符串进行比较

如果您在配置中包含此属性,请不要设置 database.include.list 属性。

collection.include.list

空字符串

可选的正则表达式或文字列表,与要监控 MongoDB 集合的完全限定命名空间匹配。默认情况下,连接器会监控除 localadmin 数据库中以外的所有集合。当设置 collection.include.list 时,连接器只监控属性指定的集合。监控中排除其他集合。集合标识符是 databaseName.collectionName 的形式。

要匹配命名空间的名称,Debebe 根据 filters.match.mode 属性的值执行以下操作之一

  • 应用您指定为 anchored 正则表达式的正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串匹配,它不与名称中的子字符串匹配。
  • 将您指定的文字与命名空间的整个名称字符串进行比较

如果您在配置中包含此属性,不要设置 collection.exclude.list 属性。

collection.exclude.list

空字符串

可选的正则表达式或文字列表,与 MongoDB 集合的完全限定命名空间匹配,以便从监控中排除。当设置了 collection.exclude.list 时,连接器会监控每个集合,但属性指定除外。集合标识符是 databaseName.collectionName 的形式。

要匹配命名空间的名称,Debebe 根据 filters.match.mode 属性的值执行以下操作之一

  • 应用您指定为 anchored 正则表达式的正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串匹配,它不与数据库名称中可能存在的子字符串匹配。
  • 将您指定的文字与命名空间的整个名称字符串进行比较

如果您在配置中包含此属性,请不要设置 collection.include.list 属性。

capture.mode

change_streams_update_full

指定连接器用来捕获 MongoDB 服务器中的 更新 事件更改的方法。将此属性设置为以下值之一:

change_streams
update 事件消息不包括完整文档。消息不包含代表更改文档状态的字段。
change_streams_update_full

更新 事件消息包括完整文档。消息不包含代表更新前文档状态的 before 字段。事件消息返回 after 字段中文档的完整状态。设置 capture.mode.full.update.type,以指定连接器如何从数据库获取完整的文档。

注意

在某些情况下,当将 capture.mode 配置为返回完整文档时,update 事件消息的 updateDescriptionafter 字段可能会报告不一致的值。在更新多个更新后,此类差异可能会导致快速成功应用文档。连接器仅在收到事件的 updateDescription 字段中描述的更新后从 MongoDB 数据库请求完整的文档。如果后续更新在连接器可以从数据库检索它前修改源文档,连接器会接收此更新修改的文档。

change_streams_update_full_with_pre_image
更新 事件消息包括完整文档,并包含一个代表 更改前 文档状态的字段。设置 capture.mode.full.update.type,以指定连接器如何从数据库获取完整的文档。
change_streams_with_pre_image
更新 事件不包括完整的文档,而是包含一个代表 更改前 文档状态的字段。

capture.scope

部署

指定连接器 打开的更改流的范围。将此属性设置为以下值之一:

部署
为部署(副本集或分片集群)打开更改流光标,以监视所有数据库中所有非系统集合的更改,但 adminlocalconfig 除外。
database

为单个数据库打开一个更改流光标,以监视其所有非系统集合的更改。

警告
集合

为单个集合打开一个更改流光标,以监视对该集合的更改。

重要

将 Debezium 集合 选项用于 capture.scope 属性是一个开发者技术预览功能。红帽不支持开发人员预览功能,且功能完整或生产就绪。不要将开发人员预览软件用于生产环境或关键业务工作负载。开发人员预览软件提供早期对即将推出的产品软件的访问权限,以将其包括在红帽产品产品中。客户可以使用此软件来测试功能并在开发过程中提供反馈。红帽可能会提供在没有关联 SLA 的情况下对开发者预览软件提交反馈的方法。

有关 Red Hat Developer Preview 软件的支持范围的更多信息,请参阅 开发人员预览支持范围

警告

capture.scope 属性的值设置为 collection 可防止连接器使用默认的 信号 频道。由于 频道必须启用,以允许连接器处理增量快照信号,对于在 Kafka、JMX 或 File channels5-4- scoped 上发送信号,在 capture-scope 设为集合时无法执行增量快照。

capture.target

 

指定连接器监控的更改的数据库。只有在 capture.scope 设为 database 时,才会应用此属性。

field.exclude.list

空字符串

可选的、以逗号分隔的字段名称列表,它们应该不包括在更改事件消息值中。字段的完全限定域名格式为 databaseName.collectionName.fieldName.nestedFieldName,其中 databaseNamecollectionName 可能包含通配符 HEKETI,匹配任何字符。

field.renames

空字符串

可选的完全限定替换项列表,用于在更改事件消息值中重命名字段。字段的完全限定替换格式为 databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName,其中 databaseNamecollectionName 可能包含通配符 高可用性,其匹配任意字符,冒号(:)用于决定字段的重命名映射。下一个字段替换应用于列表中上一个字段替换的结果,因此在重命名同一路径的多个字段时请注意这一点。

tombstones.on.delete

true

控制 删除 事件是否后跟 tombstone 事件。

true - 一个 delete 事件表示一个 delete 事件和后续的 tombstone 事件。

false - 仅有一个 delete 事件被抛出。

删除源记录后,发出 tombstone 事件(默认行为)可让 Kafka 完全删除与删除行的密钥相关的所有事件,以防为主题启用了 日志压缩

schema.name.adjustment.mode

none

指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

field.name.adjustment.mode

none

指定应如何调整字段名称,以便与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是转义序列,如 Java 中的反斜杠

如需了解更多详细信息,请参阅 Avro 命名

以下 高级配置 属性具有在大多数情况下可以正常工作的良好默认值,因此很少需要在连接器配置中指定。

表 2.71. Debezium MongoDB 连接器高级配置属性
属性默认描述

capture.mode.full.update.type

lookup

指定当 capture.mode 设置为完整文档时,连接器如何查找更新的文档的完整值。当将 capture.mode 设置为以下选项之一时,连接器会检索完整的文档:

  • change_streams_update_full
  • change_streams_update_full_with_pre-image

要将这个选项与 MongoDB 更改流集合一起使用,您必须配置集合来 返回文档前和后镜像。只有在操作发生前需要的配置时才可用操作的预镜像和后镜像。

将此属性设置为以下值之一:

lookup
连接器使用单独的查找来获取更新的 MongoDB 文档。
警告

如果查找过程无法检索文档,则无法将完整的文档填充为事件有效负载中的 after 状态。在这种情况下,连接器会发出在 after 字段中包含 null 值的事件消息。

可能会出现失败的查找,因为删除操作会在创建后马上删除文档,或者因为对分片键的更改会导致将文档移到不同的位置。当您修改组成密钥的任何属性时,分片密钥更改可能会导致。

post_image
连接器使用 MongoDB post 镜像使用完整的 MongoDB 文档填充事件。数据库必须运行 MongoDB 6.0 或更高版本才能使用此选项。

max.batch.size

2048

正整数值,用于指定应在每个迭代此连接器期间处理的每个批处理事件的最大值。默认值为 2048。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会在将事件写入 Kafka 前将事件放在阻塞队列中。当连接器比将信息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可能会提供从数据库读取更改事件的情况。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0

指定阻塞队列的最大卷的长整数值,以字节为单位。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。
如果也设置了 max.queue.size,当队列的大小达到由任一属性指定的限制时,写入队列会被阻断。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 个记录后写入队列会被阻断,或者在队列中的记录卷达到 5000 字节。

poll.interval.ms

500

正整数值指定连接器在每次迭代过程中应等待的毫秒数,以便出现新的更改事件。默认值为 500 毫秒,或 0.5 秒。

connect.backoff.initial.delay.ms

1000

正整数值,在连接第一次尝试或没有主可用后尝试重新连接主时指定初始延迟。默认值为 1 秒(1000 ms)。

connect.backoff.max.delay.ms

1000

正整数值,指定在重复连接尝试或没有主可用后尝试重新连接主时的最大延迟。默认值为 120 秒(120,000 ms)。

connect.max.attempts

16

正整数值,用于指定在异常发生前尝试副本集主的失败连接数上限,并中止任务。默认值为 16,它的默认值是 connect.backoff.initial.delay.msconnect.backoff.max.delay.ms,这会导致在失败前尝试 20 分钟。

mongodb.ssl.keystore

无默认值

指定密钥存储文件的位置的可选设置。关键存储文件可用于客户端和 MongoDB 服务器之间的双向身份验证。

mongodb.ssl.keystore.password

无默认值

密钥存储文件的密码。只有在配置了 mongodb.ssl.keystore 时,才指定密码。

mongodb.ssl.keystore.type

无默认值

密钥存储文件的类型。仅在配置了 mongodb.ssl.keystore 时指定类型。

mongodb.ssl.truststore

无默认值

服务器证书验证的信任存储文件的位置。

mongodb.ssl.truststore.password

无默认值

信任存储文件的密码。用于检查信任存储的完整性,并解锁信任存储。只有在配置了 mongodb.ssl.truststore 时,才指定密码。

mongodb.ssl.truststore.type

无默认值

信任存储文件的类型。仅在配置了 mongodb.ssl.truststore 时才指定类型。

heartbeat.interval.ms

0

控制发送心跳消息的频率。
此属性包含间隔(毫秒),用于定义连接器将消息发送到心跳主题的频率。这可用于监控连接器是否仍然从数据库接收更改事件。您还应在较长时间内更改非捕获集合中记录的情况下,利用心跳消息。在这种情况下,连接器会从数据库读取 oplog/change 流,但永远不会将任何更改消息发送到 Kafka,这意味着没有偏移更新提交到 Kafka。这将导致 oplog 文件被轮转,但连接器不会注意到它在重启一些事件时不再可用,这会导致需要重新执行初始快照。

将此参数设置为 0, 以根本不发送心跳消息。
默认禁用此选项。

skipped.operations

t

以逗号分隔的操作类型列表,您希望连接器在流传输过程中跳过。您可以将连接器配置为跳过以下类型的操作:

  • c (插入/创建)
  • u (update)
  • D (删除)
  • T (截断)

如果您不希望连接器跳过任何操作,请将值设为 none。由于 MongoDB 不支持 truncate 更改事件,因此设置默认的 t 值与将值设置为 none 的作用相同。

snapshot.collection.filter.overrides

没有默认值

控制快照中包含哪些集合项目。此属性仅影响快照。以 databaseName.collectionName 格式指定以逗号分隔的集合名称列表。

对于您指定的每个集合,还要指定另一个配置属性: snapshot.collection.filter.overrides.databaseName.collectionName。例如,其他配置属性的名称可以是: snapshot.collection.filter.overrides.customers.orders。将此属性设置为有效的过滤器表达式,该表达式仅检索快照中您想要的项目。当连接器执行快照时,它只检索与过滤器表达式匹配的项目。

snapshot.delay.ms

没有默认值

连接器在启动后执行快照前应等待的时间(毫秒);
可以用来避免在集群中启动多个连接器时进行快照中断,这可能会导致连接器重新平衡。

streaming.delay.ms

0

指定连接器在完成快照后延迟流过程启动的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后马上重启快照,但在流传输过程开始前。设置一个延迟值,它高于为 Kafka Connect worker 设置的 offset.flush.interval.ms 属性的值。

snapshot.fetch.size

0

指定在拍摄快照时应从一个集合中读取的最大文档数。连接器将在这个大小的多个批处理中读取集合内容。
默认值为 0,这表示服务器选择适当的获取大小。

snapshot.include.collection.list

collection.include.list中指定的所有集合

可选的、以逗号分隔的正则表达式列表,与您要包含在快照中的模式的完全限定域名(<databaseName&gt; .<collectionName>)匹配。指定的项目必须在连接器的 collection.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。
此属性不会影响增量快照的行为。

要匹配模式的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配,它与 schema 名称中可能存在的子字符串不匹配。

snapshot.max.threads

1

正整数值,用于指定在副本集中执行集合同步的最大线程数。默认为 1。

snapshot.mode

Initial

指定连接器启动时执行快照的条件。将 属性设置为以下值之一:

always
连接器在每次启动时都执行快照。快照包括捕获的表的结构和数据。指定此值,每次连接器启动时,使用从捕获的表中数据的完整表示来填充主题。快照完成后,连接器将开始流传输后续数据库更改的事件记录。
Initial
当连接器启动时,它会执行初始数据库快照。快照完成后,连接器将开始流传输后续数据库更改的事件记录。
initial_only
连接器仅在没有为逻辑服务器名称记录偏移时才执行数据库。快照完成后,连接器会停止。它不会过渡到流传输事件记录,用于后续的数据库更改。
never
弃用,请参阅 no_data
no_data
连接器运行一个快照,它捕获所有相关表的结构,但不会创建 READ 事件来代表连接器启动时设置的数据。
when_needed

连接器启动后,只有在检测到以下情况之一时才执行快照:

  • 它无法检测任何主题偏移。
  • 之前记录的偏移量指定了服务器上不可用的日志位置。

provide.transaction.metadata

false

当设置为 true Debezium 时,生成带有事务边界的事件,并使用事务元数据增强数据事件。

如需了解更多详细信息,请参阅 Transaction Metadata

retriable.restart.connector.wait.ms

10000 (10 秒)

在发生 Retriable 错误后重启连接器前等待的毫秒数。

mongodb.poll.interval.ms

30000

连接器为新的、删除或更改副本集轮询的时间间隔。

mongodb.connect.timeout.ms

10000 (10 秒)

驱动程序在中止连接尝试前等待的毫秒数。

mongodb.heartbeat.frequency.ms

10000 (10 秒)

集群监控器试图到达每台服务器的频率。

mongodb.socket.timeout.ms

0

在发生超时前,套接字上的发送/接收前的毫秒数。0 代表禁用此行为。

mongodb.server.selection.timeout.ms

30000 (30 秒)

驱动程序在超时并抛出错误前等待选择服务器的毫秒。

cursor.pipeline

没有默认值

当流传输更改时,此设置将处理应用到更改流事件,作为标准 MongoDB 聚合流管道的一部分。管道(pipeline)是一个 MongoDB 聚合管道,由数据库的说明组成,用于过滤或转换数据。这可用于自定义连接器消耗的数据。此属性的值必须是 JSON 格式的允许 聚合管道阶段 的数组。请注意,这会在用于支持连接器的内部管道后附加(例如过滤操作类型、数据库名称、集合名称等)。

cursor.pipeline.order

internal_first

用于构建有效 MongoDB 聚合流管道的顺序。将 属性设置为以下值之一:

internal_first
首先应用连接器定义的内部阶段。这意味着,只有被连接器捕获的事件才会定向到用户定义的阶段(通过设置 cursor.pipeline进行配置)。
user_first
首先应用 'cursor.pipeline' 属性定义的阶段。在这个模式中,所有事件(包括连接器未捕获的事件)都定向到用户定义的管道阶段。如果 cursor.pipeline 的值包含复杂的操作,则此模式可能会对性能造成负面影响。
user_only
由 'cursor.pipeline' 属性定义的阶段将替换连接器定义的内部阶段。这个模式 只适用于专家用户,因为所有事件都仅由用户定义的管道阶段处理。这个模式可能会对连接器的性能和整体功能造成负面影响!

cursor.oversize.handling.mode

fail

处理超过指定 BSON 大小的文档的更改事件的策略。将 属性设置为以下值之一:

fail
如果更改事件的总大小超过最大 BSON 大小,则连接器会失败。
skip
文档超过最大更改事件(由 cursor.oversize.skip.threshold 属性指定)大小将被忽略
split
更改超过最大 BSON 大小的事件将使用 $changeStreamSplitLargeEvent 聚合来分割。这个选项需要 MongoDB 6.0.9 或更新版本

cursor.oversize.skip.threshold

0

处理更改事件的最大允许大小( 以字节为单位 )。这包括数据库操作前和之后的大小,这更具体地限制了 fullDocument 和 fullDocumentBeforeChange 文件,该文件的大小用于 MongoDB 更改事件。

cursor.max.await.time.ms

0

指定 oplog/change 流光标在导致执行超时异常前等待服务器生成结果的最大毫秒数。值 0 表示使用 server/driver 默认等待超时。

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。https://docs.redhat.com/documentation/en/red_hat_build_of_debezium/3.0.8/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel使用以下格式指定集合名称:
<databaseName> . < collectionName>

signal.enabled.channels

source

为连接器启用的信号通道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

连接器在增量快照块中获取并读取的最大文档数。增加块大小可提供更高的效率,因为快照会减少对更大大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。

incremental.snapshot.watermarking.strategy

insert_insert

指定连接器在增量快照中使用的水位线机制,以重复数据删除事件,这些事件可能会被增量快照捕获,然后在流恢复后重新捕获。
您可以指定以下选项之一:

insert_insert
当您发送一个信号来启动增量快照时,对于 Debezium 在快照期间读取的每个块,它会将条目写入信号数据收集来记录信号,以打开快照窗口。快照完成后,Debezium 会插入第二个条目,记录信号以关闭窗口。
insert_delete
当您发送一个信号来启动增量快照时,对于 Debezium 读取的每个块,它会将单个条目写入信号数据收集,以记录信号来打开快照窗口。快照完成后,会删除此条目。不会为关闭快照窗口的信号创建条目。设置这个选项以防止快速增长信号数据收集。

topic.naming.strategy

io.debezium.schema.DefaultTopicNamingStrategy

应用于决定数据更改的主题名称、模式更改、事务、心跳事件等主题名称,默认为 DefaultTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发散列映射中保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器发送心跳消息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

transaction

控制连接器发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认的主题名称是 fulfillment.transaction

custom.metric.tags

没有默认值

通过添加提供上下文信息的元数据来定义自定义 MBean 对象名称的标签。指定以逗号分隔的键值对列表。每个键代表 MBean 对象名称的标签,对应的值代表键的值,例如
k1=v1,k2=v2

连接器将指定的标签附加到基础 MBean 对象名称。标签可帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。如需更多信息,请参阅自定义 MBean 名称

errors.max.retries

-1

指定连接器如何在生成 Retriable 错误的操作后响应,如连接错误。
设置以下选项之一:

-1
无限制。无论之前失败的数量如何,连接器总是自动重启,并重试操作。
0
disabled。连接器会立即失败,永远不会重试操作。重启连接器需要用户干预。
> 0
连接器会自动重启,直到达到指定的最大重试次数。下一次失败后,连接器会停止,用户需要干预才能重启它。

用于配置 MongoDB 连接器如何与 Kafka 信号主题交互的属性

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 2.72. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号顺序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机和端口对列表。每个对引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

用于配置 MongoDB 连接器接收器通知频道的透传属性

下表描述了可用于配置 Debezium sink 通知 频道的属性。

表 2.73. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知频道之一时,需要此属性。

2.3.6. 监控 Debezium MongoDB 连接器性能

Debezium MongoDB 连接器除了对 Zookeeper、Kafka 和 Kafka Connect 具有 JMX 指标的内置支持外,还有两个指标类型。

  • 快照指标 提供在执行快照时有关连接器操作的信息。
  • 流指标 在连接器捕获更改和流更改事件记录时提供有关连接器操作的信息。

Debezium 监控文档 提供有关如何使用 JMX 公开这些指标的详细信息。

2.3.6.1. MongoDB 连接器快照和流 MBean 对象的自定义名称

Debezium 连接器通过 MBean 名称为连接器公开指标。这些指标(特定于每个连接器实例)提供有关连接器快照、流和架构历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为每个不同的连接器指标生成一个唯一的 MBean 名称。要查看连接器进程的指标,您可以将可观察性堆栈配置为监控其 MBean。但是,这些默认 MBean 名称取决于连接器配置,但配置更改可能会导致对 MBean 名称的更改。对 MBean 名称的更改会破坏连接器实例和 MBean 之间的链接,并破坏监控活动。在这种情况下,如果要恢复监控,您必须重新配置 observability 堆栈以使用新的 MBean 名称。

要防止监控 MBean 名称更改结果的中断,您可以配置自定义指标标签。您可以通过在连接器配置中添加 custom.metric.tags 属性来配置自定义指标。属性接受键值对,其中每个键代表 MBean 对象名称的标签,对应的值代表该标签的值。例如: k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称中。

为连接器配置 custom.metric.tags 属性后,您可以配置 observability 堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签,而不是可变 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了它如何构建 MBean 名称,或者连接器配置更改中的 topic.prefix,则指标集合不会中断,因为指标提取任务使用指定的标签模式来识别连接器。

使用自定义标签的更多优点是,您可以使用反映数据管道架构的标签,以便以适合您操作需求的方式组织指标。例如,您可以使用声明连接器活动类型、应用程序上下文或数据源的值来指定标签,如 db1-streaming-for-application-abc。如果您指定多个键值对,则所有指定的对都会附加到连接器的 MBean 名称中。

以下示例演示了标签如何修改默认的 MBean 名称。

例 2.23. 自定义标签如何修改连接器 MBean 名称

默认情况下,MongoDB 连接器使用以下 MBean 名称进行流传输指标:

debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>
Copy to clipboard

如果将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 会生成以下自定义 MBean 名称:

debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
Copy to clipboard

2.3.6.2. 在 MongoDB 快照过程中监控 Debezium

MBeandebezium.mongodb:type=connector-metrics,context=snapshot,server= <topic.prefix& gt; ,task= <task.id>

快照指标不会被公开,除非快照操作活跃,或者快照自上次连接器开始以来发生了某种情况。

下表列出了可用的快照指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

此连接器自上一次启动或重置以来看到的事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在快照和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在快照和主 Kafka Connect 循环之间传递事件的队列的可用容量。

TotalTableCount

int

包括在快照中的表的总数。

RemainingTableCount

int

快照必须复制的表数。

SnapshotRunning

布尔值

快照是否已启动。

SnapshotPaused

布尔值

快照是否已暂停。

SnapshotAborted

布尔值

快照是中止的。

SnapshotCompleted

布尔值

快照是否完成。

SnapshotDurationInSeconds

long

快照到目前为止需要的秒数,即使未完成也是如此。也包括快照暂停的时间。

SnapshotPausedDurationInSeconds

long

快照暂停的秒数。如果快照暂停了多次,暂停的时间会增加。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。在处理过程中,表会以递增方式添加到映射中。更新每 10,000 行扫描并完成表后。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

Debezium MongoDB 连接器还提供以下自定义快照指标:

属性类型描述

NumberOfDisconnects

long

数据库断开连接的数量。

2.3.6.3. 监控 Debezium MongoDB 连接器记录流

MBeandebezium.mongodb:type=connector-metrics,context=streaming,server= <topic.prefix& gt; ,task= <task.id>

下表列出了可用的流指标。

属性类型描述

LastEvent

字符串

连接器读取的最后一个流事件。

MilliSecondsSinceLastEvent

long

因为连接器已读取和处理最新事件,因此毫秒数。

TotalNumberOfEventsSeen

long

源数据库自上次连接器启动后报告的数据更改事件总数,或者因为指标重置后。代表 Debezium 处理的数据更改工作负载。

TotalNumberOfCreateEventsSeen

long

自上次启动或指标重置以来,连接器处理的创建事件总数。

TotalNumberOfUpdateEventsSeen

long

自上次启动或指标重置以来,连接器处理的更新事件总数。

TotalNumberOfDeleteEventsSeen

long

自上次启动或指标重置后,连接器处理的删除事件总数。

NumberOfEventsFiltered

long

已根据连接器上配置的 include/exclude 列表过滤规则过滤的事件数量。

CapturedTables

string[]

连接器捕获的表列表。

QueueTotalCapacity

int

在流器和主 Kafka Connect 循环之间传递事件的队列长度。

QueueRemainingCapacity

int

用于在流程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

Connected

布尔值

表示连接器当前是否已连接到数据库服务器的标记。

MilliSecondsBehindSource

long

最后一次更改事件的时间戳和连接器处理之间的毫秒数。这些值将纳入运行数据库服务器和连接器的机器上时钟之间的差别。

NumberOfCommittedTransactions

long

已提交的已处理事务的数量。

SourceEventPosition

Map<String, String>

最后一次接收的事件的协调。

LastTransactionId

字符串

最后一次处理的事务的事务标识符。

MaxQueueSizeInBytes

long

队列的最大数量(以字节为单位)。如果将 max.queue.size.in.bytes 设置为正长值,则此指标可用。

CurrentQueueSizeInBytes

long

队列中记录的当前卷(以字节为单位)。

Debezium MongoDB 连接器还提供以下自定义流指标:

属性类型描述

NumberOfDisconnects

long

数据库断开连接的数量。

NumberOfPrimaryElections

long

主要节点选举数量。

2.3.7. Debezium MongoDB 连接器如何处理错误和问题

Debezium 是一个分布式系统,它捕获多个上游数据库中的所有更改,永远不会丢失或丢失某个事件。当系统正常运行并谨慎管理时,Debezium 会在每次更改事件时发送一次

如果发生错误,系统不会丢失任何事件。但是,当它从错误中恢复时,可能会重复一些更改事件。在这种情况下,Debebe (如 Kafka )至少提供一次 更改事件交付。

以下主题详细介绍了 Debezium MongoDB 连接器如何处理各种错误和问题。

配置和启动错误

在以下情况下,连接器会在尝试启动时失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。
  • 连接器无法使用指定的连接参数成功连接到 MongoDB。

失败后,连接器会尝试使用 exponential backoff 重新连接。您可以配置重新连接尝试的最大数量。

在这些情况下,这个错误将了解更多有关此问题的详细信息,并可能会有推荐的临时解决方案。当配置已被修正或 MongoDB 问题已被解决时,连接器可以重启。

MongoDB 变得不可用

当连接器运行后,如果任何 MongoDB 副本集的主节点不可用或无法访问,连接器将重复尝试重新连接到主节点,使用 exponential backoff 来防止网络或服务器饱和。如果在可配置的连接尝试次数后主不可用,则连接器将失败。

尝试重新连接由三个属性控制:

  • connect.backoff.initial.delay.ms - 在第一次尝试重新连接前的延迟,默认值为 1 秒(1000 毫秒)。
  • connect.backoff.max.delay.ms - 尝试重新连接前的最大延迟,默认值为 120 秒(120,000 毫秒)。
  • connect.max.attempts - 生成错误前尝试的最大尝试次数,默认值为 16。

每个延迟都加倍之前的延迟,最多为最大延迟。根据默认值,下表显示每个失败的连接尝试的延迟,以及失败前的总时间。

重新连接尝试号延迟(以秒为单位)尝试前的总延迟,以分钟和秒为单位

1

1

00:01

2

2

00:03

3

4

00:07

4

8

00:15

5

16

00:31

6

32

01:03

7

64

02:07

8

120

04:07

9

120

06:07

10

120

08:07

11

120

10:07

12

120

12:07

13

120

14:07

14

120

16:07

15

120

18:07

16

120

20:07

连接器无法启动 - InvalidResumeToken 或 ChangeStreamHistoryLost

在较长时间内停止的连接器无法启动,并报告以下例外:

Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
Copy to clipboard

前面的例外表示 oplog 中不再存在与连接器恢复令牌对应的条目。因为 oplog 不再包含连接器处理的最后偏移量,所以连接器无法恢复流。

您可以使用以下选项之一从失败中恢复:

  • 删除失败的连接器,并使用相同的配置创建新连接器,但使用不同的连接器名称。
  • 暂停连接器,然后删除偏移量或更改偏移主题。

为了帮助防止与缺失恢复令牌相关的故障,请 优化 oplog 的配置

Kafka Connect 进程正常停止

如果 Kafka Connect 正在以分布式模式运行,并且 Kafka Connect 进程被安全停止,则在关闭 Kafka Connect 之前,将所有进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程,新的连接器任务将完全关闭之前的任务。处理过程中会有一个短暂的延迟,而连接器任务被安全停止并在新进程中重启。

如果组只包含一个进程,且该进程安全停止,则 Kafka Connect 将停止连接器,并记录每个副本集的最后一个偏移。重启后,副本集任务将继续完全关闭它们。

Kafka Connect 进程崩溃

如果 Kafka Connector 进程意外停止,则它运行的连接器任务将终止,而不会记录它们最近处理的偏移量。当 Kafka Connect 以分布式模式运行时,它将在其他进程中重启这些连接器任务。但是,MongoDB 连接器将从之前进程 记录 的最后偏移量中恢复,这意味着新的替换任务可能会生成在崩溃前处理的一些相同的更改事件。重复事件的数量取决于偏移清除周期以及崩溃前数据更改的卷。

注意

因为在从失败时恢复过程中可能会重复一些事件,所以消费者应始终预期一些事件可能重复。Debezium 更改是幂等的,因此一系列事件始终产生相同的状态。

Debezium 还包括每个更改事件的信息,其中包含有关事件来源的特定于源的信息,包括 MongoDB 事件的唯一事务标识符(h)和时间戳(secord)。消费者可以跟踪这些值的其他内容,以了解它是否已看到特定的事件。

Kafka 变得不可用

当连接器生成更改事件时,Kafka Connect 框架使用 Kafka producer API 在 Kafka 中记录这些事件。Kafka Connect 还会根据您在 Kafka Connect worker 配置中指定的频率定期记录这些更改事件中出现的最新偏移量。如果 Kafka 代理不可用,运行连接器的 Kafka Connect worker 进程只会重复尝试重新连接到 Kafka 代理。换句话说,连接器任务将简单暂停,直到可以重新建立连接,此时连接器将完全恢复其离开的位置。

如果 snapshot.mode 设为 initial,则在停止了较长的时间间隔后连接器会失败

如果连接器安全停止,用户可能会继续对副本设置成员执行操作。在连接器离线时发生的更改会在 MongoDB 的 oplog 中继续记录。在大多数情况下,连接器重启后,它会读取 oplog 中的偏移值,以确定每个副本集流的最后操作,然后从该点恢复流更改。重启后,当连接器停止到 Kafka 时以及一段时间后,连接器会随数据库捕获。连接器捕获所需的时间取决于 Kafka 的功能和性能,以及数据库中发生的更改卷。

但是,如果连接器为很长时间的间隔停止,则 MongoDB 可能会发生在连接器不活跃时清除 oplog,从而导致连接器最后一次位置的信息丢失。连接器重启后,它无法恢复流,因为 oplog 不再包含之前的偏移值,以标记连接器处理的最后一个操作。连接器无法执行快照,因为它通常当 snapshot.mode 属性设置为 initial,且没有偏移值时。在这种情况下,存在一个不匹配,因为 oplog 不包含之前偏移的值,但偏移值存在于连接器的内部 Kafka offsets 主题中。错误结果,连接器会失败。

要从失败中恢复,请删除失败的连接器,并使用相同的配置创建新连接器,但使用不同的连接器名称。当您启动新连接器时,它会执行快照来达到数据库状态,然后恢复流。

MongoDB 丢失写入

在某些情况下,MongoDB 可能会丢失提交,这会导致 MongoDB 连接器无法捕获丢失的更改。例如,如果在应用更改后的主要崩溃突然崩溃,并记录更改其 oplog,则 oplog 可能在辅助节点可以读取其内容之前不可用。因此,作为新主节点选择的二级节点可能会缺少 oplog 中最新的更改。

目前,无法防止 MongoDB 中的这种副作用。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat, Inc.