5.2. Debezium MongoDB 连接器的工作方式
连接器支持的 MongoDB 拓扑概述可用于规划应用程序。
配置和部署 MongoDB 连接器时,它首先连接到 seed 地址中的 MongoDB 服务器,并确定每个可用副本集的详情。由于每个副本集都有自己的独立 oplog,因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制其使用的最大任务数量,如果没有足够的任务可用,则连接器会将多个副本集分配给每个任务,尽管该任务仍会为每个副本集使用单独的线程。
当针对分片集群运行连接器时,请使用大于副本集的 tasks.max
值。这将允许连接器为每个副本集创建一个任务,并可让 Kafka Connect 协调、分发和管理所有可用 worker 进程中的任务。
以下主题详细介绍了 Debezium MongoDB 连接器的工作方式:
- 第 5.2.1 节 “Debezium 连接器支持的 MongoDB 拓扑”
- 第 5.2.2 节 “Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称”
- 第 5.2.3 节 “Debezium MongoDB 连接器如何执行快照”
- 第 5.2.4 节 “临时快照”
- 第 5.2.5 节 “增量快照”
- 第 5.2.6 节 “Debezium MongoDB 连接器流更改事件记录”
- 第 5.2.8 节 “接收 Debezium MongoDB 更改事件记录的默认 Kafka 主题名称”
- 第 5.2.9 节 “事件密钥控制 Debezium MongoDB 连接器的主题分区”
- 第 5.2.10 节 “Debezium MongoDB 连接器生成的事件代表事务边界”
5.2.1. Debezium 连接器支持的 MongoDB 拓扑
MongoDB 连接器支持以下 MongoDB 拓扑:
- MongoDB 副本集
Debezium MongoDB 连接器可以从单个 MongoDB 副本集 捕获更改。生产副本集 至少需要三个成员。
要将 MongoDB 连接器与副本集搭配使用,您必须将连接器配置中的
mongodb.connection.string
属性的值设置为 副本集连接字符串。当连接器准备好从 MongoDB 更改流开始捕获更改时,它会启动一个连接任务。然后,连接任务使用指定的连接字符串来建立与可用副本集成员的连接。
由于连接器管理数据库连接的方式的变化,此 Debezium 发行版本不再支持使用 mongodb.members.auto.discover
属性,以防止连接器执行成员资格发现。
- MongoDB 分片集群
MongoDB 分片的集群 包括:
- 一个或多个 分片,每个分片都部署为副本集;
- 充当 集群配置服务器的单独副本集
客户端需要连接到的一个或多个 routers (也称为
mongos
)。它们会将请求路由到相关的分片。要将 MongoDB 连接器与分片集群搭配使用,在连接器配置中,将
mongodb.connection.string
属性的值设置为 分片集群连接字符串。
mongodb.connection.string
属性替换了已弃用的 mongodb.hosts
属性,用于为连接器提供 配置服务器副本的主机地址。在当前发行版本中,使用 mongodb.connection.string
为连接器提供 MongoDB 路由器的地址,也称为 mongos
。
当连接器连接到分片集群时,它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当从集群中添加或删除分片时,连接器会动态调整任务数量,以补偿更改。
- MongoDB 独立服务器
- MongoDB 连接器无法监控独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果单机服务器转换为一个带有成员的副本集,则连接器可以正常工作。
MongoDB 不建议在生产环境中运行独立服务器。如需更多信息,请参阅 MongoDB 文档。
5.2.2. Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称
连接器配置属性 topic.prefix
充当 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称: 作为所有主题名称的前缀,并在记录每个副本集的更改流位置时作为唯一标识符。
您应该为每个 MongoDB 连接器分配一个唯一的逻辑名称,以有意义的描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,以及字母数字字符或下划线的剩余字符。
5.2.3. Debezium MongoDB 连接器如何执行快照
当 Debezium 任务开始使用副本集时,它使用连接器的逻辑名称和副本集名称来查找一个 偏移 信息,该偏移之前停止读取更改的位置。如果找到偏移并且仍然存在于 oplog 中,则任务会立即进行 流传输更改,从记录的偏移位置开始。
但是,如果没有找到偏移,或者 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。这个过程首先记录 oplog 的当前位置,并记录为偏移(以及表示快照已启动的标记)。然后,该任务会继续复制每个集合,尽可能生成多个线程(最多 snapshot.max.threads
配置属性值)来并行执行此功能。连接器为其看到的每个文档记录一个单独的 读取事件。每个读取事件都包含对象的标识符、对象的完整状态和有关找到对象的 MongoDB 副本集 的源 信息。源信息还包括一个标记,表示该事件是在快照期间生成的。
此快照将继续,直到它复制了与连接器的过滤器匹配的所有集合。如果在任务快照完成前停止连接器,重启连接器会在重启连接器再次开始快照。
在连接器执行任何副本集的快照时,尝试避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 集群。
您可以在以下部分找到有关快照的更多信息:
5.2.4. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。
然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供重新捕获集合数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:
- 连接器配置会被修改来捕获不同的集合集。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时快照来为之前捕获的集合重新运行快照。临时快照需要使用 信号集合。您可以通过向 Debezium 信号集合发送信号请求来发起临时快照。
当您启动现有集合的临时快照时,连接器会将内容附加到已用于集合的主题。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的集合。快照可以捕获整个数据库的内容,或者仅捕获数据库中集合的子集。另外,快照也可以捕获数据库中集合内容的子集。
您可以通过将 execute-snapshot
消息发送到信号集合来指定要捕获的集合。将 execute-snapshot
信号类型设置为 增量
,并提供快照中包含的集合名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| N/A |
包含与要快照的集合的完全限定域名匹配的正则表达式的数组。 |
| N/A | 可选字符串,根据集合的列指定条件,用于捕获集合内容的子集。 |
| N/A | 可选字符串,指定连接器在快照过程中用作集合的主键的列名称。 |
触发临时快照
您可以通过向信号集合添加 execute-snapshot
信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个集合的开始和结束点。根据集合中的条目数量以及配置的块大小,Debezium 会将集合划分为块,并一次性执行每个块的快照。
5.2.5. 增量快照
为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个集合。您可以指定您希望快照捕获 的集合以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个集合行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间点开始,而不是从开始获取集合。
-
您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将集合添加到其
collection.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 会按主密钥对每个集合进行排序,然后根据 配置的块大小 将集合分成块。然后,根据块工作块,它会捕获块中的每个集合行。对于它捕获的每行,快照会发出 READ
事件。该事件代表块的快照开始时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,可能会修改集合记录。为了反映此类更改,INSERT
、UPDATE
或 DELETE
操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥在记录间解决冲突
在某些情况下,streaming 进程发出的 UPDATE
或 DELETE
事件会停止序列。也就是说,流流过程可能会发出一个修改集合行的事件,该事件捕获包含该行的 READ
事件的块。当快照最终为行发出对应的 READ
事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。
快照窗口
为了帮助解决修改同一集合行的过期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口。快照窗口分解了增量快照捕获指定集合块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ
操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ
事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ
事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ
事件发送到集合的 Kafka 主题。
连接器为每个快照块重复这个过程。
增量快照需要预先排序主密钥。但是,字符串
不能保证稳定的排序作为编码,特殊字符可能会导致意外行为(Mongo sort String
)。在执行增量快照时,请考虑将其他类型的用于主键。
分片集群的增量快照是 Debezium MongoDB 连接器的技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
要将增量快照与分片 MongoDB 集群搭配使用,您必须为以下属性设置特定值:
-
将
mongodb.connection.mode
设置为sharded
。 -
将
incremental.snapshot.chunk.size
设置为一个足够大的值,以便满足更改流管道 的复杂性。
5.2.5.1. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号集合发送临时快照 信号。
您可以使用 MongoDB insert ()
方法向信号提交信号。
在 Debezium 检测到信号集合中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的集合,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental
。
要指定快照中包含的集合,提供一个 data-collections
数组,它列出用于匹配集合的集合或用于匹配集合的正则表达式数组,例如{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。
如果要包含在快照中的集合名称在数据库、模式或表的名称中包含句点(.
),以将集合添加到 data-collections
数组中,您必须使用双引号转义名称的每个部分。
例如,要包含存在于 公共
数据库中的数据收集,其名称为 My.Collection
,请使用以下格式:" public"."My.Collection"
。
前提条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道来触发增量快照
在信号集合中插入快照信号文档:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});
例如,
db.debeziumSignal.insert({ 1 "type" : "execute-snapshot", 2 3 "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4 "type": "incremental"} 5 });
命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
表 5.2. MongoDB insert ()命令中的字段的描述,用于将增量快照信号发送到信号集合 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
_id
参数指定作为信号请求的id
标识符分配的任意字符串。
上例中的 insert 方法省略了可选的_id
参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的id
标识符。
使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。相反,Debebe 会在快照期间生成自己的id
字符串作为水位线信号。3
execute-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定集合名称或正则表达式数组,以匹配快照中包含的集合名称。
数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号集合的名称相同。5
incremental
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效的选项是默认值incremental
。
如果没有指定值,连接器将运行增量快照。
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件消息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
5.2.5.2. 使用 Kafka 信号频道来触发增量快照
您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 execute-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。 |
| N/A | 可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。 |
execute-snapshot Kafka 消息示例:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
带有额外条件的临时增量快照
Debezium 使用 additional-condition
字段来选择集合内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含 additional-condition
时,extra-condition
会附加到 SQL 查询中,例如:
SELECT * FROM <tableName> WHERE <additional-condition> ….
例如,如果一个带有列的 id
(主键)、颜色
和品牌
的产品
集合,如果您希望快照只包含 color='blue'
的内容,当您请求快照时,您可以附加一个 additional-condition
语句来过滤内容:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
您可以使用 additional-condition
语句根据多个列传递条件。例如,使用与上例中的相同 产品
集合,如果您希望快照只包含来自用于 color='blue'
、和 brand='MyBrand'
的产品集合中的内容,您可以发送以下请求:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
5.2.5.3. 停止增量快照
您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入到信号集合中提交停止快照信号。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在正在进行时停止增量快照操作。
您提交的查询指定 增量
的快照操作,以及要删除的当前运行快照的集合。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道停止增量快照
在信号集合中插入停止快照信号文档:
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});
例如,
db.debeziumSignal.insert({ 1 "type" : "stop-snapshot", 2 3 "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4 "type": "incremental"} 5 });
signal 命令中的
id
、type
和data
参数的值对应于 信号集合的字段。下表描述了示例中的参数:
表 5.4. 插入命令中的字段描述,用于将停止增量快照文档发送到信号集合 项 值 描述 1
db.debeziumSignal
指定源数据库上信号集合的完全限定名称。
2
null
上例中的 insert 方法省略了可选的
_id
参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的id
标识符。
使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。3
stop-snapshot
type
参数指定信号旨在触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定集合名称或正则表达式数组,以匹配要从快照中删除的集合名称。
数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号集合的名称相同。如果省略了data
字段的这一组件,信号将停止正在进行的整个增量快照。5
incremental
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效的选项是增量的
。
如果没有指定类型
值,信号将无法停止增量快照。
5.2.5.4. 使用 Kafka 信号频道停止增量快照
您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 stop-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。 |
以下示例显示了典型的 stop-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
5.2.6. Debezium MongoDB 连接器流更改事件记录
在副本集的连接器任务记录偏移后,它使用偏移来确定应该开始流更改的 oplog 中的位置。然后,任务(取决于配置)连接到副本集的主节点,或连接到副本集范围更改流,并开始从该位置流更改。它处理所有创建、插入和删除操作,并将其转换为 Debezium 更改事件。每个更改事件都包含找到操作的 oplog 中的位置,连接器会定期将其记录为最新的偏移。记录偏移的时间间隔由 offset.flush.interval.ms
进行管理,它是一个 Kafka Connect worker 配置属性。
当连接器被安全停止时,处理最后一个偏移会被记录,以便在重启后,连接器将继续保持关闭的位置。如果连接器的任务意外终止,则任务可能会在最后一次记录偏移后处理和生成事件,但在记录最后一个偏移前;重启时,连接器从最后 记录的 偏移开始,可能会生成之前在崩溃前生成的一些相同事件。
当 Kafka 管道中的所有组件都正常运行时,Kafka 用户会 准确接收每个消息一次。但是,当出现问题时,Kafka 只能保证消费者 至少接收每个消息一次。为避免意外结果,使用者必须能够处理重复的消息。
如前文所述,连接器任务始终使用副本集的主节点从 oplog 中流更改,确保连接器尽可能看到最新的操作,并可以捕获比使用第二个aries 更低的延迟的变化。当副本集选择新主节点时,连接器会立即停止流更改,连接到新主节点,并在同一位置开始从新主节点流更改。同样,如果连接器遇到与副本集成员通信的问题,它会尝试使用 exponential backoff 来重新连接,因此不会大量副本集,并在连接后继续从最后一个离开的位置继续流更改。这样,连接器可以动态地调整副本集成员资格中的更改,并自动处理通信失败。
总之,MongoDB 连接器在大多数情况下继续运行。通信问题可能会导致连接器等待问题解决。
5.2.7. MongoDB 支持填充 Debezium 更改事件中的 before
字段
在 MongoDB 6.0 及更高版本中,您可以配置更改流来发出文档的预镜像状态,以填充 MongoDB 更改事件的 before
字段。要在 MongoDB 中启用预镜像,您必须使用 db.createCollection()
, create
, 或 collMod
为集合设置 changeStreamPreAndPostImages
。要启用 Debezium MongoDB 在更改事件中包含预镜像,请将连接器的 capture.mode
设置为其中一个 *_with_pre_image
选项。
MongoDB 更改流事件的大小限制为 16MB。因此,使用预镜像会增加超过这个阈值的可能性,这可能会导致失败。有关如何避免超过更改流限制的详情,请参考 MongoDB 文档。
5.2.8. 接收 Debezium MongoDB 更改事件记录的默认 Kafka 主题名称
MongoDB 连接器将所有插入、更新和删除操作的事件写入每个集合中的文档到单个 Kafka 主题。Kafka 主题的名称始终使用 logicalName.databaseName.collectionName 格式,其中 logicalName 是连接器的逻辑名称(使用 topic.prefix
配置属性指定),databaseName 是创建发生在的数据库的名称,collectionName 是受影响的文档所在的 MongoDB 集合的名称。
例如,假设一个 MongoDB 副本集有一个 inventory
数据库,其中包含四个集合:products
, products_on_hand
, customers
, 和 orders
。如果监控这个数据库的连接器有一个逻辑名称 fulfillment
,则这个连接器会在这四个 Kafka 主题上生成事件:
-
fulfillment.inventory.products
-
fulfillment.inventory.products_on_hand
-
fulfillment.inventory.customers
-
fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合(每个分片都包含集合文档的子集)的所有更改都会进入相同的 Kafka 主题。
您可以根据需要将 Kafka 设置为自动创建主题。如果没有,则必须在启动连接器前使用 Kafka 管理工具创建主题。
5.2.9. 事件密钥控制 Debezium MongoDB 连接器的主题分区
MongoDB 连接器不会明确确定如何为事件分区主题。相反,它允许 Kafka 如何根据事件密钥确定分区主题。您可以通过在 Kafka Connect worker 配置中定义分区 器
实现的名称来更改 Kafka 的分区逻辑。
Kafka 只为写入单个主题分区的事件维护总顺序。按键对事件进行分区意味着,具有相同键的所有事件始终都到达同一分区。这样可确保特定文档的所有事件始终被完全排序。
5.2.10. Debezium MongoDB 连接器生成的事件代表事务边界
Debezium 可以生成代表事务元数据边界的事件,并增强更改数据事件消息。
Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。
对于每个事务 BEGIN
和 END
,Debezium 会生成一个包含以下字段的事件:
status
-
BEGIN
或END
id
- 唯一事务标识符的字符串。
event_count
(用于END
事件)- 事务发出的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
的数组,它通过源自给定数据收集的更改来提供事件数量。
以下示例显示了一个典型的信息:
{ "status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null } { "status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [ { "data_collection": "rs0.testDB.collectiona", "event_count": 1 }, { "data_collection": "rs0.testDB.collectionb", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则事务事件将写入名为 <topic. prefix>
。
.transaction
的主题
更改数据事件增强
如果启用了事务元数据,数据消息 Envelope
会增加一个新的 transaction
字段。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串。
total_order
- 事件在事务生成的所有事件中绝对位置。
data_collection_order
- 在事务发出的所有事件间,按数据收集位置。
下面是一个信息示例:
{ "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "1462833718356672513", "total_order": "1", "data_collection_order": "1" } }