5.2. Debezium MongoDB 连接器的工作方式
连接器支持的 MongoDB 拓扑概述可用于规划应用程序。
配置和部署 MongoDB 连接器时,它首先连接到 seed 地址中的 MongoDB 服务器,并确定每个可用副本集的详情。由于每个副本集都有自己的独立 oplog,因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制其使用的最大任务数量,如果没有足够的任务可用,则连接器会将多个副本集分配给每个任务,尽管该任务仍会为每个副本集使用单独的线程。
当针对分片集群运行连接器时,请使用大于副本集的 tasks.max
值。这将允许连接器为每个副本集创建一个任务,并可让 Kafka Connect 协调、分发和管理所有可用 worker 进程中的任务。
以下主题详细介绍了 Debezium MongoDB 连接器的工作方式:
- 第 5.2.1 节 “Debezium 连接器支持的 MongoDB 拓扑”
- 第 5.2.2 节 “Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称”
- 第 5.2.3 节 “Debezium MongoDB 连接器如何执行快照”
- 第 5.2.4 节 “临时快照”
- 第 5.2.5 节 “增量快照”
- 第 5.2.6 节 “Debezium MongoDB 连接器流更改事件记录”
- 第 5.2.8 节 “接收 Debezium MongoDB 更改事件记录的默认 Kafka 主题名称”
- 第 5.2.9 节 “事件密钥控制 Debezium MongoDB 连接器的主题分区”
- 第 5.2.10 节 “Debezium MongoDB 连接器生成的事件代表事务边界”
5.2.1. Debezium 连接器支持的 MongoDB 拓扑 复制链接链接已复制到粘贴板!
MongoDB 连接器支持以下 MongoDB 拓扑:
- MongoDB 副本集
Debezium MongoDB 连接器可以从单个 MongoDB 副本集 捕获更改。生产副本集 至少需要三个成员。
要将 MongoDB 连接器与副本集搭配使用,您必须将连接器配置中的
mongodb.connection.string
属性的值设置为 副本集连接字符串。当连接器准备好从 MongoDB 更改流开始捕获更改时,它会启动一个连接任务。然后,连接任务使用指定的连接字符串来建立与可用副本集成员的连接。
由于连接器管理数据库连接的方式的变化,此 Debezium 发行版本不再支持使用 mongodb.members.auto.discover
属性,以防止连接器执行成员资格发现。
- MongoDB 分片集群
MongoDB 分片的集群 包括:
- 一个或多个 分片,每个分片都部署为副本集;
- 充当 集群配置服务器的单独副本集
客户端需要连接到的一个或多个 routers (也称为
mongos
)。它们会将请求路由到相关的分片。要将 MongoDB 连接器与分片集群搭配使用,在连接器配置中,将
mongodb.connection.string
属性的值设置为 分片集群连接字符串。
mongodb.connection.string
属性替换了已弃用的 mongodb.hosts
属性,用于为连接器提供 配置服务器副本的主机地址。在当前发行版本中,使用 mongodb.connection.string
为连接器提供 MongoDB 路由器的地址,也称为 mongos
。
当连接器连接到分片集群时,它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当从集群中添加或删除分片时,连接器会动态调整任务数量,以补偿更改。
- MongoDB 独立服务器
- MongoDB 连接器无法监控独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果单机服务器转换为一个带有成员的副本集,则连接器可以正常工作。
MongoDB 不建议在生产环境中运行独立服务器。如需更多信息,请参阅 MongoDB 文档。
5.2.2. Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称 复制链接链接已复制到粘贴板!
连接器配置属性 topic.prefix
充当 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称: 作为所有主题名称的前缀,并在记录每个副本集的更改流位置时作为唯一标识符。
您应该为每个 MongoDB 连接器分配一个唯一的逻辑名称,以有意义的描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,以及字母数字字符或下划线的剩余字符。
5.2.3. Debezium MongoDB 连接器如何执行快照 复制链接链接已复制到粘贴板!
当 Debezium 任务开始使用副本集时,它使用连接器的逻辑名称和副本集名称来查找一个 偏移 信息,该偏移之前停止读取更改的位置。如果找到偏移并且仍然存在于 oplog 中,则任务会立即进行 流传输更改,从记录的偏移位置开始。
但是,如果没有找到偏移,或者 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。这个过程首先记录 oplog 的当前位置,并记录为偏移(以及表示快照已启动的标记)。然后,该任务会继续复制每个集合,尽可能生成多个线程(最多 snapshot.max.threads
配置属性值)来并行执行此功能。连接器为其看到的每个文档记录一个单独的 读取事件。每个读取事件都包含对象的标识符、对象的完整状态和有关找到对象的 MongoDB 副本集 的源 信息。源信息还包括一个标记,表示该事件是在快照期间生成的。
此快照将继续,直到它复制了与连接器的过滤器匹配的所有集合。如果在任务快照完成前停止连接器,重启连接器会在重启连接器再次开始快照。
在连接器执行任何副本集的快照时,尝试避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 集群。
您可以在以下部分找到有关快照的更多信息:
5.2.4. 临时快照 复制链接链接已复制到粘贴板!
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。
然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供重新捕获集合数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:
- 连接器配置会被修改来捕获不同的集合集。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时快照来为之前捕获的集合重新运行快照。临时快照需要使用 信号集合。您可以通过向 Debezium 信号集合发送信号请求来发起临时快照。
当您启动现有集合的临时快照时,连接器会将内容附加到已用于集合的主题。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的集合。快照可以捕获整个数据库的内容,或者仅捕获数据库中集合的子集。另外,快照也可以捕获数据库中集合内容的子集。
您可以通过将 execute-snapshot
消息发送到信号集合来指定要捕获的集合。将 execute-snapshot
信号类型设置为 增量
,并提供快照中包含的集合名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| N/A |
包含与要快照的集合的完全限定域名匹配的正则表达式的数组。 |
| N/A | 可选字符串,根据集合的列指定条件,用于捕获集合内容的子集。 |
| N/A | 可选字符串,指定连接器在快照过程中用作集合的主键的列名称。 |
触发临时快照
您可以通过向信号集合添加 execute-snapshot
信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个集合的开始和结束点。根据集合中的条目数量以及配置的块大小,Debezium 会将集合划分为块,并一次性执行每个块的快照。
5.2.5. 增量快照 复制链接链接已复制到粘贴板!
为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个集合。您可以指定您希望快照捕获 的集合以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个集合行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间点开始,而不是从开始获取集合。
-
您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将集合添加到其
collection.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 会按主密钥对每个集合进行排序,然后根据 配置的块大小 将集合分成块。然后,根据块工作块,它会捕获块中的每个集合行。对于它捕获的每行,快照会发出 READ
事件。该事件代表块的快照开始时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,可能会修改集合记录。为了反映此类更改,INSERT
、UPDATE
或 DELETE
操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥在记录间解决冲突
在某些情况下,streaming 进程发出的 UPDATE
或 DELETE
事件会停止序列。也就是说,流流过程可能会发出一个修改集合行的事件,该事件捕获包含该行的 READ
事件的块。当快照最终为行发出对应的 READ
事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。
快照窗口
为了帮助解决修改同一集合行的过期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口。快照窗口分解了增量快照捕获指定集合块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ
操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ
事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ
事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ
事件发送到集合的 Kafka 主题。
连接器为每个快照块重复这个过程。
增量快照需要预先排序主密钥。但是,字符串
不能保证稳定的排序作为编码,特殊字符可能会导致意外行为(Mongo sort String
)。在执行增量快照时,请考虑将其他类型的用于主键。
分片集群的增量快照是 Debezium MongoDB 连接器的技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
要将增量快照与分片 MongoDB 集群搭配使用,您必须为以下属性设置特定值:
-
将
mongodb.connection.mode
设置为sharded
。 -
将
incremental.snapshot.chunk.size
设置为一个足够大的值,以便满足更改流管道 的复杂性。
5.2.5.1. 触发增量快照 复制链接链接已复制到粘贴板!
目前,启动增量快照的唯一方法是向源数据库上的 信号集合发送临时快照 信号。
您可以使用 MongoDB insert ()
方法向信号提交信号。
在 Debezium 检测到信号集合中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental
。
要指定快照中包含的集合,提供一个 data-collections
数组,它列出用于匹配集合的集合或用于匹配集合的正则表达式数组,例如{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。
如果要包含在快照中的集合名称在数据库、模式或表的名称中包含句点(.
),以将集合添加到 data-collections
数组中,您必须使用双引号转义名称的每个部分。
例如,要包含存在于 公共
数据库中的数据收集,其名称为 My.Collection
,请使用以下格式:" public"."My.Collection"
。
前提条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道来触发增量快照
在信号集合中插入快照信号文档:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
Expand 表 5.2. MongoDB insert ()命令中的字段的描述,用于将增量快照信号发送到信号集合 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
_id
参数指定作为信号请求的id
标识符分配的任意字符串。
上例中的 insert 方法省略了可选的_id
参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的id
标识符。
使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。相反,Debebe 会在快照期间生成自己的id
字符串作为水位线信号。3
execute-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定集合名称或正则表达式数组,以匹配快照中包含的集合名称。
数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号集合的名称相同。5
incremental
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效的选项是默认值incremental
。
如果没有指定值,连接器将运行增量快照。
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件消息
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
5.2.5.2. 使用 Kafka 信号频道来触发增量快照 复制链接链接已复制到粘贴板!
您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 execute-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。 |
| N/A | 可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。 |
execute-snapshot Kafka 消息示例:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
带有额外条件的临时增量快照
Debezium 使用 additional-condition
字段来选择集合内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含 additional-condition
时,extra-condition
会附加到 SQL 查询中,例如:
SELECT * FROM <tableName> WHERE <additional-condition> ….
例如,如果一个带有列的 id
(主键)、颜色
和品牌
的产品
集合,如果您希望快照只包含 color='blue'
的内容,当您请求快照时,您可以附加一个 additional-condition
语句来过滤内容:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
您可以使用 additional-condition
语句根据多个列传递条件。例如,使用与上例中的相同 产品
集合,如果您希望快照只包含来自用于 color='blue'
、和 brand='MyBrand'
的产品集合中的内容,您可以发送以下请求:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
5.2.5.3. 停止增量快照 复制链接链接已复制到粘贴板!
您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入到信号集合中提交停止快照信号。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在正在进行时停止增量快照操作。
您提交的查询指定 增量
的快照操作,以及要删除的当前运行快照的集合。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道停止增量快照
在信号集合中插入停止快照信号文档:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal 命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
Expand 表 5.4. 插入命令中的字段描述,用于将停止增量快照文档发送到信号集合 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
上例中的 insert 方法省略了可选的
_id
参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的id
标识符。
使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。3
stop-snapshot
type
参数指定信号旨在触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定集合名称或正则表达式数组,以匹配要从快照中删除的集合名称。
数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号集合的名称相同。如果省略了data
字段的这一组件,信号将停止正在进行的整个增量快照。5
incremental
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效的选项是增量的
。
如果没有指定类型
值,信号将无法停止增量快照。
5.2.5.4. 使用 Kafka 信号频道停止增量快照 复制链接链接已复制到粘贴板!
您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 stop-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。 |
以下示例显示了典型的 stop-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
5.2.6. Debezium MongoDB 连接器流更改事件记录 复制链接链接已复制到粘贴板!
在副本集的连接器任务记录偏移后,它使用偏移来确定应该开始流更改的 oplog 中的位置。然后,任务(取决于配置)连接到副本集的主节点,或连接到副本集范围更改流,并开始从该位置流更改。它处理所有创建、插入和删除操作,并将其转换为 Debezium 更改事件。每个更改事件都包含找到操作的 oplog 中的位置,连接器会定期将其记录为最新的偏移。记录偏移的时间间隔由 offset.flush.interval.ms
进行管理,它是一个 Kafka Connect worker 配置属性。
当连接器被安全停止时,处理最后一个偏移会被记录,以便在重启后,连接器将继续保持关闭的位置。如果连接器的任务意外终止,则任务可能会在最后一次记录偏移后处理和生成事件,但在记录最后一个偏移前;重启时,连接器从最后 记录的 偏移开始,可能会生成之前在崩溃前生成的一些相同事件。
当 Kafka 管道中的所有组件都正常运行时,Kafka 用户会 准确接收每个消息一次。但是,当出现问题时,Kafka 只能保证消费者 至少接收每个消息一次。为避免意外结果,使用者必须能够处理重复的消息。
如前文所述,连接器任务始终使用副本集的主节点从 oplog 中流更改,确保连接器尽可能看到最新的操作,并可以捕获比使用第二个aries 更低的延迟的变化。当副本集选择新主节点时,连接器会立即停止流更改,连接到新主节点,并在同一位置开始从新主节点流更改。同样,如果连接器遇到与副本集成员通信的问题,它会尝试使用 exponential backoff 来重新连接,因此不会大量副本集,并在连接后继续从最后一个离开的位置继续流更改。这样,连接器可以动态地调整副本集成员资格中的更改,并自动处理通信失败。
总之,MongoDB 连接器在大多数情况下继续运行。通信问题可能会导致连接器等待问题解决。
5.2.7. MongoDB 支持填充 Debezium 更改事件中的 before 字段 复制链接链接已复制到粘贴板!
在 MongoDB 6.0 及更高版本中,您可以配置更改流来发出文档的预镜像状态,以填充 MongoDB 更改事件的 before
字段。要在 MongoDB 中启用预镜像,您必须使用 db.createCollection()
, create
, 或 collMod
为集合设置 changeStreamPreAndPostImages
。要启用 Debezium MongoDB 在更改事件中包含预镜像,请将连接器的 capture.mode
设置为其中一个 *_with_pre_image
选项。
MongoDB 更改流事件的大小限制为 16MB。因此,使用预镜像会增加超过这个阈值的可能性,这可能会导致失败。有关如何避免超过更改流限制的详情,请参考 MongoDB 文档。
5.2.8. 接收 Debezium MongoDB 更改事件记录的默认 Kafka 主题名称 复制链接链接已复制到粘贴板!
MongoDB 连接器将所有插入、更新和删除操作的事件写入每个集合中的文档到单个 Kafka 主题。Kafka 主题的名称始终使用 logicalName.databaseName.collectionName 格式,其中 logicalName 是连接器的逻辑名称(使用 topic.prefix
配置属性指定),databaseName 是创建发生在的数据库的名称,collectionName 是受影响的文档所在的 MongoDB 集合的名称。
例如,假设一个 MongoDB 副本集有一个 inventory
数据库,其中包含四个集合:products
, products_on_hand
, customers
, 和 orders
。如果监控这个数据库的连接器有一个逻辑名称 fulfillment
,则这个连接器会在这四个 Kafka 主题上生成事件:
-
fulfillment.inventory.products
-
fulfillment.inventory.products_on_hand
-
fulfillment.inventory.customers
-
fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合(每个分片都包含集合文档的子集)的所有更改都会进入相同的 Kafka 主题。
您可以根据需要将 Kafka 设置为自动创建主题。如果没有,则必须在启动连接器前使用 Kafka 管理工具创建主题。
5.2.9. 事件密钥控制 Debezium MongoDB 连接器的主题分区 复制链接链接已复制到粘贴板!
MongoDB 连接器不会明确确定如何为事件分区主题。相反,它允许 Kafka 如何根据事件密钥确定分区主题。您可以通过在 Kafka Connect worker 配置中定义分区 器
实现的名称来更改 Kafka 的分区逻辑。
Kafka 只为写入单个主题分区的事件维护总顺序。按键对事件进行分区意味着,具有相同键的所有事件始终都到达同一分区。这样可确保特定文档的所有事件始终被完全排序。
5.2.10. Debezium MongoDB 连接器生成的事件代表事务边界 复制链接链接已复制到粘贴板!
Debezium 可以生成代表事务元数据边界的事件,并增强更改数据事件消息。
Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。
对于每个事务 BEGIN
和 END
,Debezium 会生成一个包含以下字段的事件:
status
-
BEGIN
或END
id
- 唯一事务标识符的字符串。
event_count
(用于END
事件)- 事务发出的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
的数组,它通过源自给定数据收集的更改来提供事件数量。
以下示例显示了一个典型的信息:
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
更改数据事件增强
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串。
total_order
- 事件在事务生成的所有事件中绝对位置。
data_collection_order
- 在事务发出的所有事件间,按数据收集位置。
下面是一个信息示例: