4.2. Debezium MongoDB 连接器的工作方式
有关连接器支持的 MongoDB 拓扑概述,用于规划应用程序。
当配置和部署 MongoDB 连接器时,它会首先通过连接到 seed 地址的 MongoDB 服务器,并确定每个可用副本集的详情。由于每个副本集都有自己的独立的 oplog,因此连接器会尝试为每个副本集使用单独的任务。连接器可以限制将使用的最大任务数量,如果没有足够的任务可用,则连接器会将多个副本集分配给每个任务,但任务仍会为每个副本集使用单独的线程。
在针对分片集群运行连接器时,请使用大于副本集数的 tasks.max
值。这将允许连接器为每个副本集创建一个任务,并允许 Kafka Connect 协调、分发和管理所有可用 worker 进程中的任务。
以下主题提供有关 Debezium MongoDB 连接器如何工作的详细信息:
- 第 4.2.1 节 “Debezium 连接器支持的 MongoDB 拓扑”
- 第 4.2.2 节 “Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称”
- 第 4.2.3 节 “Debezium MongoDB 连接器如何执行快照”
- 第 4.2.4 节 “Debezium MongoDB 连接器流更改事件记录”
- 第 4.2.6 节 “接收 Debezium MongoDB 更改事件记录的 Kafka 主题默认名称”
- 第 4.2.7 节 “Debezium MongoDB 连接器的事件键控制主题分区的方式”
- 第 4.2.8 节 “Debezium MongoDB 连接器生成的事件代表事务边界”
4.2.1. Debezium 连接器支持的 MongoDB 拓扑
MongoDB 连接器支持以下 MongoDB 拓扑:
- MongoDB 副本集
Debezium MongoDB 连接器可以从单个 MongoDB 副本集 捕获更改。生产环境副本集 至少需要三个成员。
要将 MongoDB 连接器与副本集一起使用,请使用连接器的
mongodb.hosts
属性提供一个或多个副本集服务器的地址作为 seed addresses。连接器将使用这些 seeds 连接到副本集,然后在连接后从副本集获取完整的成员集合,以及哪些成员是主成员。连接器将启动一个任务来连接到主设备并从主 oplog 中捕获更改。当副本集选择新主主时,任务将自动切换到新的主设备。注意当 MongoDB 由代理(如 OS X 或 Windows 上的 Docker)开头时,当客户端连接到副本集并发现成员时,MongoDB 客户端将排除代理作为有效成员,并将尝试并无法直接连接到成员,而不是通过代理进行连接。
在这种情况下,将连接器的可选
mongodb.members.auto.discover
配置属性设置为false
,以指示连接器强制识别身份发现,而是使用第一个 seed 地址(通过mongodb.hosts
属性指定)作为主节点。这可能会正常工作,但仍在选举发生时导致问题。
- MongoDB 分片集群
MongoDB 分片集群 包括:
- 一个或多个 分片,各自部署为副本集;
- 用作 集群配置服务器的独立副本集
客户端需要连接到的一个或多个 routers (也称为
mongos
)。它们会将请求路由到相关的分片。要将 MongoDB 连接器与分片集群搭配使用,请使用 配置服务器副本集的主机地址配置 连接器。当连接器连接到此副本集时,它会发现它充当分片集群的配置服务器,发现有关集群中用作分片的每个副本集的信息,然后启动一个单独的任务来捕获每个副本集中的更改。如果向集群或现有分片添加了新的分片,连接器将相应地调整其任务。
- MongoDB 独立服务器
- MongoDB 连接器无法监控独立 MongoDB 服务器的更改,因为单机服务器没有 oplog。如果单机服务器转换为一个成员设置的副本集,则连接器将可以正常工作。
MongoDB 不建议在生产环境中运行独立服务器。如需更多信息,请参阅 MongoDB 文档。
4.2.2. Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称
连接器配置属性 topic.prefix
充当 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称: 作为所有主题名称的前缀,在记录每个副本集的更改流位置时作为唯一标识符。
您应该为每个 MongoDB 连接器指定一个有意义的逻辑名称,它有意义的描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,剩余字符为字母数字字符或下划线。
4.2.3. Debezium MongoDB 连接器如何执行快照
当任务使用副本集启动时,它会使用连接器的逻辑名称和副本集名称来查找偏移,该 偏移 描述了连接器之前停止读取更改的位置。如果可以找到偏移,并且它仍然存在于 oplog 中,则任务会立即进行 流更改,从记录的偏移位置开始。
但是,如果没有找到偏移,或者 oplog 不再包含该位置,则任务必须首先通过 执行快照 来获取副本集内容的当前状态。这个过程首先记录 oplog 的当前位置,并记录为偏移量(以及表示已启动快照的标记)。然后,该任务将继续复制每个集合,尽可能生成线程(最多为 snapshot.max.threads
配置属性的值)来并行执行这一工作。连接器将为每个文档记录一个单独的 读取事件,并且读取事件将包含对象的标识符、对象的完整状态,以及找到对象的 MongoDB 副本集 的源 信息。源信息还包括一个标记,表示事件是在快照期间生成的。
此快照将继续,直到复制了与连接器过滤器匹配的所有集合。如果在任务的快照完成前停止连接器,在重启连接器后再次开始快照。
尝试避免在连接器执行任何副本集的快照时避免任务重新分配和重新配置。连接器会生成日志消息来报告快照的进度。要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 集群。
4.2.3.1. 临时快照
临时快照是 Debezium MongoDB 连接器的技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,遵循这个初始快照,连接器不会重复快照过程。连接器捕获的任何将来更改事件数据都仅通过流传输过程。
然而,在某些情况下,连接器在初始快照期间获取的数据可能会变得过时、丢失或不完整。为了提供总结集合数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能是执行临时快照:
- 连接器配置会被修改来捕获不同的集合集。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时命令快照,为之前捕获的快照重新运行快照。临时快照需要使用 信号集合。您可以通过向 Debezium 信号集合发送信号请求来启动临时快照。
当您启动现有集合的临时快照时,连接器会将内容附加到集合已存在的主题中。如果删除了之前存在的主题,如果启用自动主题创建,Debezium 可以自动创建主题。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q2/html-single/debezium_user_guide/index#customization-of-kafka-connect-automatic-topic-creation
临时快照信号指定要包含在快照中的集合。快照可以捕获数据库的整个内容,或者只捕获数据库中的集合子集。另外,快照也可以捕获数据库中集合内容的子集。
您可以通过向信号集合发送 execute-snapshot
消息来指定要捕获的集合。将 execute-snapshot
信号的类型设置为增量
,并提供快照中包含的集合名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| 不适用 |
包含与要快照的集合的完全限定域名匹配的正则表达式的数组。 |
| 不适用 | 可选字符串,它根据集合的列指定条件,用于捕获集合内容的子集。 |
触发临时快照
您可以通过在信号集合中添加带有 execute-snapshot
信号类型的条目来启动临时快照。连接器处理消息后,它会开始快照操作。快照过程读取第一个和最后一个主密钥值,并使用这些值作为每个集合的开始和端点。根据集合中条目数量以及配置的块大小,Debezium 将集合分成块,并一次为每个块进行快照。
4.2.3.2. 增量快照
增量快照是 Debezium MongoDB 连接器的技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
为了提供管理快照的灵活性,Debezium 包括一个补充的快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,而不是一次性捕获数据库的完整状态,如初始快照,Debezium 会在一系列可配置的块中捕获每个集合。您可以指定您希望快照捕获 的集合以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1 KB。
当增量快照进行时,Debezium 使用水位线来跟踪其进度,维护它捕获的每个集合行的记录。这个阶段捕获数据的方法比标准初始快照过程提供以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是经过发布流,直到快照完成为止。连接器会继续在整个快照过程中从更改日志捕获接近实时事件,且操作都不会阻断其他事件。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间开始,而不是从开始回收集合。
-
您可以随时根据需要运行增量快照,并根据需要重复这个过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将集合添加到其
collection.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 按主密钥对集合进行排序,然后根据 配置的块大小 将集合分成块。然后,按块使用块,然后捕获块中的每个集合行。对于它捕获的每行,快照会发出 READ
事件。该事件代表了块开始快照时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,从而可能会修改集合记录。要反映此类更改,INSERT
、UPDATE
或 DELETE
操作会照常提交到事务日志中。同样,持续 Debezium 流过程还会继续检测这些更改事件,并将对应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥解决记录间的冲突
在某些情况下,流传输进程发出的 UPDATE
或 DELETE
事件会耗尽序列。也就是说,流过程可能会发出一个事件,它会在快照捕获包含该行的 READ
事件前修改集合行。当快照最终会为行发出对应的 READ
事件时,其值已经被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debezium 会使用一种缓冲区方案来解决冲突。只有在快照事件和流事件间冲突时才解决后,Debezium 会向 Kafka 发送事件记录。
快照窗口
为了帮助解决后续 READ
事件和修改同一集合行的流事件之间的冲突,Debezium 采用所谓的 快照窗口。快照窗口分离了增量快照捕获指定集合块的数据的时间间隔。在打开块的快照窗口前,Debebe 会遵循其常见的行为,并将事件直接从事务日志直接降级到目标 Kafka 主题。但是,从打开特定块的快照时,Debebe 会执行重复数据删除步骤,以解决具有相同主密钥的事件之间冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ
操作发出。同时,当用户继续更新数据收集中的记录,并更新事务日志以反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会将快照记录提供给内存缓冲区。在快照窗口中,缓冲区中 READ
事件的主键将与传入流事件的主键进行比较。如果没有找到匹配项,流的事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流的记录写入目标主题,因为流的事件逻辑地替换静态快照事件。在块的快照窗口关闭后,缓冲区仅包含没有相关事务日志事件的 READ
事件。Debezium 将这些剩余的 READ
事件发送到集合的 Kafka 主题。
连接器重复每个快照块的进程。
增量快照需要完全排序主密钥。但是,字符串
可能无法保证稳定的排序,因为编码和特殊字符可能会导致意外行为(Mongo 排序 字符串
)。请考虑在执行增量快照时将其他类型用于主密钥。
目前,单个副本集部署只支持增量快照。
4.2.3.3. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号集合发送临时快照 信号。
您可以使用 MongoDB insert ()
方法向信号集合提交信号。
在 Debezium 检测到信号集合中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合,以及可选的指定快照操作类型。目前,快照操作的唯一有效选项是默认值 增量
。
要指定快照中包含的集合,提供一个 data-collections
数组,该数组列出了用于匹配集合的集合或一组正则表达式,例如{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不执行快照。
如果要包含在快照中的集合名称包含数据库、模式或表名称的句点(.
),则需要将集合添加到 data-collections
数组中,您必须用双引号转义名称的每个部分。
例如,若要包含 公共
数据库中存在的数据收集,并且名为 MyCollection
,请使用以下格式:" public"."MyCollection
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
将快照信号文档插入到信号集合中:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});
例如,
db.debeziumSignal.insert({ 1 "type" : "execute-snapshot", 2 3 "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4 "type": "incremental"} 5 });
命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
表 4.2. MongoDB insert ()命令中的字段描述,用于将增量快照信号发送到信号集合 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
_id
参数指定分配给信号请求的id
标识符的任意字符串。
上例中的插入方法省略了可选_id
参数的使用。由于文档没有明确为该参数分配值,所以 MongoDB 会自动分配给文档的任意 id 成为信号请求的id
标识符。
使用此字符串来标识到信号集合中条目的日志记录信息。Debezium 不使用此标识符字符串。相反,在快照中,Debebe 会生成自己的id
字符串作为水位信号。3
execute-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定集合名称或正则表达式数组,以匹配快照中包含的集合名称。
数组根据其完全限定名称列出与集合匹配的正则表达式,其格式与您用来在signal.data.collection
配置属性中指定连接器的信号集合的名称相同。5
增量
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效选项是默认值增量
。
如果没有指定值,连接器将运行一个增量快照。
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件信息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
4.2.3.4. 停止增量快照
您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过在信号集合中插入文档来提交停止快照信号。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在进行时停止增量快照操作。
您提交的查询指定了 增量
的快照操作,以及可选的、要删除的当前运行快照的集合。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
流程
将停止快照信号文档插入到信号集合中:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
例如,
db.debeziumSignal.insert({ 1 "type" : "stop-snapshot", 2 3 "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4 "type": "incremental"} 5 });
signal 命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
表 4.3. 向信号集合发送停止增量快照文档的插入命令中字段的描述 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
上例中的插入方法省略了可选
_id
参数的使用。由于文档没有明确为该参数分配值,所以 MongoDB 会自动分配给文档的任意 id 成为信号请求的id
标识符。
使用此字符串来标识到信号集合中条目的日志记录信息。Debezium 不使用此标识符字符串。3
stop-snapshot
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定集合名称或正则表达式数组,以匹配要从快照中删除的集合名称。
数组根据其完全限定名称列出与集合匹配的正则表达式,其格式与您用来在signal.data.collection
配置属性中指定连接器的信号集合的名称相同。如果省略了data
字段的此组件,则信号将停止正在进行的整个增量快照。5
增量
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效选项是增量的
。
如果没有指定类型
值,则信号无法停止增量快照。
4.2.4. Debezium MongoDB 连接器流更改事件记录
在副本设置的连接器任务记录偏移后,它会使用偏移来确定它应该启动流更改的 oplog 中的位置。然后,任务(取决于配置)连接到副本集的主节点,或连接到副本集范围的更改流,并开始从该位置更改流。它处理所有创建、插入和删除操作,并将其转换为 Debezium 更改事件。每个更改事件都包含找到操作的 oplog 中的位置,连接器会定期将其记录为其最新的偏移量。记录偏移的时间间隔取决于 offset.flush.interval.ms
,它是一个 Kafka Connect worker 配置属性。
当连接器安全停止时,处理的最后一个偏移会被记录,以便在重启时,连接器将继续准确关闭位置。如果连接器的任务意外终止,则任务可能会在最后记录偏移后处理并生成事件,但在记录最后一个偏移前;重启后,连接器从上次 记录的 偏移开始,可能会生成之前在崩溃前生成的相同事件。
当 Kafka 管道中的所有组件无关时,Kafka 使用者 会完全 接收每个消息。但是,当出现错误时,Kafka 只能保证消费者 至少接收每个消息一次。为避免意外的结果,消费者必须能够处理重复的消息。
如前面所述,连接器任务总是使用副本集的主节点从 oplog 流更改,确保连接器尽可能查看最新的操作,并可以捕获比使用第二个延迟更低的更改。当副本集选择新主节点时,连接器会立即停止流更改,连接到新主节点,并从同一位置的新主节点启动流更改。同样,如果连接器遇到与副本集成员通信的问题,它会尝试使用 exponential backoff 来重新连接,以便不负担副本集,并连接后继续从最后离开的位置进行更改。这样,连接器可以动态调整副本设置成员资格中的更改,并自动处理通信失败。
为了总结,MongoDB 连接器在大多数情况下继续运行。通信问题可能会导致连接器等待直到问题解决为止。
4.2.5. MongoDB 支持在 Debezium 更改事件中填充 before
字段
在 MongoDB 6.0 及更高版本中,您可以配置更改流,以发出文档的预镜像状态,以填充 MongoDB 更改事件的 before
字段。要在 MongoDB 中启用预镜像,您必须使用 db.
、创建或 create
Collection ()collMod
为集合设置 changeStreamPreAndPostImages
。要使 Debezium MongoDB 在更改事件中包含 pre-images,请将连接器的 capture.mode
设置为 一 一 一 一,with_pre_image
选项。
MongoDB 更改流事件的大小限制为 16MB。因此,使用预镜像会增加这个阈值的可能性,这可能会导致失败。有关如何避免超过更改流限制的详情,请参考 MongoDB 文档。
4.2.6. 接收 Debezium MongoDB 更改事件记录的 Kafka 主题默认名称
MongoDB 连接器将所有插入、更新和删除操作写入事件,将每个集合中的操作写入单个 Kafka 主题。Kafka 主题的名称始终使用 logicalName.databaseName.collectionName 格式,其中 logical Name 是连接器的逻辑名称,如 topic.prefix
配置属性,databaseName 是发生操作的数据库的名称,collectionName 是受影响的文档存在的 MongoDB 集合的名称。
例如,假设一个 MongoDB 副本集有一个 inventory
数据库,其中包含四个集合:products
, products_on_hand
, customers
, 和 orders
。如果监控这个数据库的连接器有一个逻辑名称 fulfillment
,则这个连接器会在这四个 Kafka 主题上生成事件:
-
fulfillment.inventory.products
-
fulfillment.inventory.products_on_hand
-
fulfillment.inventory.customers
-
fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合的所有更改(每个分片都包含集合文档的子集)都属于相同的 Kafka 主题。
您可以根据需要将 Kafka 设置为自动创建主题。https://kafka.apache.org/documentation.html#basic_ops_add_topic如果没有,则必须使用 Kafka 管理工具在启动连接器前创建主题。
4.2.7. Debezium MongoDB 连接器的事件键控制主题分区的方式
MongoDB 连接器不会明确决定如何为事件分区主题。相反,它允许 Kafka 根据事件密钥决定如何对主题进行分区。您可以通过在 Kafka Connect worker 配置中定义 Partitioner
实现来更改 Kafka 的分区逻辑。
Kafka 仅在写入单个主题分区的事件维护总顺序。按键对事件进行分区意味着所有具有相同键的事件始终都进入同一分区。这样可确保特定文档的所有事件始终被完全排序。
4.2.8. Debezium MongoDB 连接器生成的事件代表事务边界
Debezium 可以生成代表事务元数据边界的事件,并增强更改数据事件消息。
Debezium 仅在部署连接器后为发生的事务注册并接收元数据。部署连接器前发生的事务元数据。
对于每个事务 BEGIN
和 END
,Debezium 生成一个包含以下字段的事件:
status
-
BEGIN
或END
id
- 唯一事务标识符的字符串表示。
event_count
(用于END
事件)- 事务发出的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
对,它提供了由来自给定数据收集的更改所发出的事件数量。
以下示例显示了典型的信息:
{ "status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null } { "status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [ { "data_collection": "rs0.testDB.collectiona", "event_count": 1 }, { "data_collection": "rs0.testDB.collectionb", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
更改数据事件增强
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串表示。
total_order
- 事件在事务生成的所有事件的绝对路径。
data_collection_order
- 事件在事务发送的所有事件中每个数据收集位置。
以下是信息类似如下的示例:
{ "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "1462833718356672513", "total_order": "1", "data_collection_order": "1" } }