搜索

5.2. Debezium MongoDB 连接器的工作方式

download PDF

连接器支持的 MongoDB 拓扑概述可用于规划应用程序。

配置和部署 MongoDB 连接器时,它首先连接到 seed 地址中的 MongoDB 服务器,并确定每个可用副本集的详情。由于每个副本集都有自己的独立 oplog,因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制其使用的最大任务数量,如果没有足够的任务可用,则连接器会将多个副本集分配给每个任务,尽管该任务仍会为每个副本集使用单独的线程。

注意

当针对分片集群运行连接器时,请使用大于副本集的 tasks.max 值。这将允许连接器为每个副本集创建一个任务,并可让 Kafka Connect 协调、分发和管理所有可用 worker 进程中的任务。

以下主题详细介绍了 Debezium MongoDB 连接器的工作方式:

5.2.1. Debezium 连接器支持的 MongoDB 拓扑

MongoDB 连接器支持以下 MongoDB 拓扑:

MongoDB 副本集

Debezium MongoDB 连接器可以从单个 MongoDB 副本集 捕获更改。生产副本集 至少需要三个成员

要将 MongoDB 连接器与副本集搭配使用,您必须将连接器配置中的 mongodb.connection.string 属性的值设置为 副本集连接字符串。当连接器准备好从 MongoDB 更改流开始捕获更改时,它会启动一个连接任务。然后,连接任务使用指定的连接字符串来建立与可用副本集成员的连接。

警告

由于连接器管理数据库连接的方式的变化,此 Debezium 发行版本不再支持使用 mongodb.members.auto.discover 属性,以防止连接器执行成员资格发现。

MongoDB 分片集群

MongoDB 分片的集群 包括:

  • 一个或多个 分片,每个分片都部署为副本集;
  • 充当 集群配置服务器的单独副本集
  • 客户端需要连接到的一个或多个 routers (也称为 mongos)。它们会将请求路由到相关的分片。

    要将 MongoDB 连接器与分片集群搭配使用,在连接器配置中,将 mongodb.connection.string 属性的值设置为 分片集群连接字符串

警告

mongodb.connection.string 属性替换了已弃用的 mongodb.hosts 属性,用于为连接器提供 配置服务器副本的主机地址。在当前发行版本中,使用 mongodb.connection.string 为连接器提供 MongoDB 路由器的地址,也称为 mongos

注意

当连接器连接到分片集群时,它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当从集群中添加或删除分片时,连接器会动态调整任务数量,以补偿更改。

MongoDB 独立服务器
MongoDB 连接器无法监控独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果单机服务器转换为一个带有成员的副本集,则连接器可以正常工作。
注意

MongoDB 不建议在生产环境中运行独立服务器。如需更多信息,请参阅 MongoDB 文档

5.2.2. Debezium MongoDB 连接器如何为副本集和分片集群使用逻辑名称

连接器配置属性 topic.prefix 充当 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称: 作为所有主题名称的前缀,并在记录每个副本集的更改流位置时作为唯一标识符。

您应该为每个 MongoDB 连接器分配一个唯一的逻辑名称,以有意义的描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,以及字母数字字符或下划线的剩余字符。

5.2.3. Debezium MongoDB 连接器如何执行快照

当 Debezium 任务开始使用副本集时,它使用连接器的逻辑名称和副本集名称来查找一个 偏移 信息,该偏移之前停止读取更改的位置。如果找到偏移并且仍然存在于 oplog 中,则任务会立即进行 流传输更改,从记录的偏移位置开始。

但是,如果没有找到偏移,或者 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。这个过程首先记录 oplog 的当前位置,并记录为偏移(以及表示快照已启动的标记)。然后,该任务会继续复制每个集合,尽可能生成多个线程(最多 snapshot.max.threads 配置属性值)来并行执行此功能。连接器为其看到的每个文档记录一个单独的 读取事件。每个读取事件都包含对象的标识符、对象的完整状态和有关找到对象的 MongoDB 副本集 的源 信息。源信息还包括一个标记,表示该事件是在快照期间生成的。

此快照将继续,直到它复制了与连接器的过滤器匹配的所有集合。如果在任务快照完成前停止连接器,重启连接器会在重启连接器再次开始快照。

注意

在连接器执行任何副本集的快照时,尝试避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。要提供最大的控制,请为每个连接器运行单独的 Kafka Connect 集群。

您可以在以下部分找到有关快照的更多信息:

5.2.4. 临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。

然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供重新捕获集合数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 连接器配置会被修改来捕获不同的集合集。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或某些其他问题导致数据损坏。

您可以通过启动所谓的 临时快照来为之前捕获的集合重新运行快照。临时快照需要使用 信号集合。您可以通过向 Debezium 信号集合发送信号请求来发起临时快照。

当您启动现有集合的临时快照时,连接器会将内容附加到已用于集合的主题。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。

临时快照信号指定要包含在快照中的集合。快照可以捕获整个数据库的内容,或者仅捕获数据库中集合的子集。另外,快照也可以捕获数据库中集合内容的子集。

您可以通过将 execute-snapshot 消息发送到信号集合来指定要捕获的集合。将 execute-snapshot 信号类型设置为 增量,并提供快照中包含的集合名称,如下表所述:

表 5.1. 临时 execute-snapshot 信号记录的示例
字段默认

type

incremental

指定您要运行的快照类型。
设置类型是可选的。目前,您只能请求 增量 快照。

data-collections

N/A

包含与要快照的集合的完全限定域名匹配的正则表达式的数组。
名称的格式与 signal.data.collection 配置选项的格式相同。

additional-condition

N/A

可选字符串,根据集合的列指定条件,用于捕获集合内容的子集。

surrogate-key

N/A

可选字符串,指定连接器在快照过程中用作集合的主键的列名称。

触发临时快照

您可以通过向信号集合添加 execute-snapshot 信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个集合的开始和结束点。根据集合中的条目数量以及配置的块大小,Debezium 会将集合划分为块,并一次性执行每个块的快照。

目前,execute-snapshot 操作类型仅触发 增量快照。如需更多信息,请参阅 增加快照

5.2.5. 增量快照

为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号

在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个集合。您可以指定您希望快照捕获 的集合以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。

当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个集合行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:

  • 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
  • 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的时间点开始,而不是从开始获取集合。
  • 您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将集合添加到其 collection.include.list 属性中。

增量快照过程

当您运行增量快照时,Debezium 会按主密钥对每个集合进行排序,然后根据 配置的块大小 将集合分成块。然后,根据块工作块,它会捕获块中的每个集合行。对于它捕获的每行,快照会发出 READ 事件。该事件代表块的快照开始时的行值。

当快照继续进行时,其他进程可能会继续访问数据库,可能会修改集合记录。为了反映此类更改,INSERTUPDATEDELETE 操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。

Debezium 如何使用相同的主密钥在记录间解决冲突

在某些情况下,streaming 进程发出的 UPDATEDELETE 事件会停止序列。也就是说,流流过程可能会发出一个修改集合行的事件,该事件捕获包含该行的 READ 事件的块。当快照最终为行发出对应的 READ 事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。

快照窗口

为了帮助解决修改同一集合行的过期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口快照窗口分解了增量快照捕获指定集合块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ 操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ 事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ 事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。

连接器为每个快照块重复这个过程。

警告

增量快照需要预先排序主密钥。但是,字符串 不能保证稳定的排序作为编码,特殊字符可能会导致意外行为(Mongo sort String)。在执行增量快照时,请考虑将其他类型的用于主键。

分片集群的增量快照

分片集群的增量快照是 Debezium MongoDB 连接器的技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围

要将增量快照与分片 MongoDB 集群搭配使用,您必须为以下属性设置特定值:

5.2.5.1. 触发增量快照

目前,启动增量快照的唯一方法是向源数据库上的 信号集合发送临时快照 信号。

您可以使用 MongoDB insert () 方法向信号提交信号。

在 Debezium 检测到信号集合中的更改后,它会读取信号并运行请求的快照操作。

您提交的查询指定要包含在快照中的集合,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental

要指定快照中包含的集合,提供一个 data-collections 数组,它列出用于匹配集合的集合或用于匹配集合的正则表达式数组,例如
{"data-collections": ["public.Collection1", "public.Collection2"]}

增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。

注意

如果要包含在快照中的集合名称在数据库、模式或表的名称中包含句点(.),以将集合添加到 data-collections 数组中,您必须使用双引号转义名称的每个部分。

例如,要包含存在于 公共 数据库中的数据收集,其名称为 My.Collection,请使用以下格式:" public"."My.Collection"

前提条件

使用源信号频道来触发增量快照

  1. 在信号集合中插入快照信号文档:

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});

    例如,

    db.debeziumSignal.insert({ 1
    "type" : "execute-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    });

    命令中的 idtypedata 参数的值对应于 信号集合的字段

    下表描述了示例中的参数:

    表 5.2. MongoDB insert ()命令中的字段的描述,用于将增量快照信号发送到信号集合
    描述

    1

    db.debeziumSignal

    指定源数据库上信号集合的完全限定名称。

    2

    null

    _id 参数指定作为信号请求的 id 标识符分配的任意字符串。
    上例中的 insert 方法省略了可选的 _id 参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的 id 标识符。
    使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。相反,Debebe 会在快照期间生成自己的 id 字符串作为水位线信号。

    3

    execute-snapshot

    指定 type 参数指定信号要触发的操作。

    4

    data-collections

    信号的 data 字段所需的组件,用于指定集合名称或正则表达式数组,以匹配快照中包含的集合名称。
    数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号集合的名称相同。

    5

    incremental

    信号的 data 字段的可选 类型 组件,用于指定要运行的快照操作类型。
    目前,唯一有效的选项是默认值 incremental
    如果没有指定值,连接器将运行增量快照。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增加快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

字段名称描述

1

snapshot

指定要运行的快照操作类型。
目前,唯一有效的选项是默认值 incremental
在 SQL 查询中指定 type 值,您提交到信号集合是可选的。
如果没有指定值,连接器将运行增量快照。

2

op

指定事件类型。
快照事件的值是 r,表示 READ 操作。

5.2.5.2. 使用 Kafka 信号频道来触发增量快照

您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 execute-snapshotdata 字段必须具有以下字段:

表 5.3. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

additional-condition

N/A

可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。

execute-snapshot Kafka 消息示例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

带有额外条件的临时增量快照

Debezium 使用 additional-condition 字段来选择集合内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM <tableName> …​.

当快照请求包含 additional-condition 时,extra-condition 会附加到 SQL 查询中,例如:

SELECT * FROM <tableName> WHERE <additional-condition> …​.

例如,如果一个带有列的 id (主键)、颜色 和品牌 的产品 集合,如果您希望快照只包含 color='blue' 的内容,当您请求快照时,您可以附加一个 additional-condition 语句来过滤内容:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

您可以使用 additional-condition 语句根据多个列传递条件。例如,使用与上例中的相同 产品 集合,如果您希望快照只包含来自用于 color='blue'、和 brand='MyBrand' 的产品集合中的内容,您可以发送以下请求:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

5.2.5.3. 停止增量快照

您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入到信号集合中提交停止快照信号。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在正在进行时停止增量快照操作。

您提交的查询指定 增量 的快照操作,以及要删除的当前运行快照的集合。

先决条件

使用源信号频道停止增量快照

  1. 在信号集合中插入停止快照信号文档:

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});

    例如,

    db.debeziumSignal.insert({ 1
    "type" : "stop-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    });

    signal 命令中的 idtypedata 参数的值对应于 信号集合的字段

    下表描述了示例中的参数:

    表 5.4. 插入命令中的字段描述,用于将停止增量快照文档发送到信号集合
    描述

    1

    db.debeziumSignal

    指定源数据库上信号集合的完全限定名称。

    2

    null

    上例中的 insert 方法省略了可选的 _id 参数。由于文档没有明确为该参数分配值,因此 MongoDB 自动分配给文档的任意 id 将成为信号请求的 id 标识符。
    使用此字符串识别信号集合中条目的日志记录消息。Debezium 不使用此标识符字符串。

    3

    stop-snapshot

    type 参数指定信号旨在触发的操作。

    4

    data-collections

    信号的 data 字段的可选组件,用于指定集合名称或正则表达式数组,以匹配要从快照中删除的集合名称。
    数组列出了按照完全限定名称匹配集合的正则表达式,其格式与您在 signal.data.collection 配置属性中指定连接器信号集合的名称相同。如果省略了 data 字段的这一组件,信号将停止正在进行的整个增量快照。

    5

    incremental

    信号的 data 字段所需的组件,用于指定要停止的快照操作类型。
    目前,唯一有效的选项是 增量的
    如果没有指定 类型 值,信号将无法停止增量快照。

5.2.5.4. 使用 Kafka 信号频道停止增量快照

您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

message 的值是带有 typedata 字段的 JSON 对象。

信号类型是 stop-snapshotdata 字段必须具有以下字段:

表 5.5. 执行快照数据字段
字段默认

type

incremental

要执行的快照的类型。目前,Debeium 仅支持 增量 类型。
详情请查看下一节。

data-collections

N/A

可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

以下示例显示了典型的 stop-snapshot Kafka 信息:

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

5.2.6. Debezium MongoDB 连接器流更改事件记录

在副本集的连接器任务记录偏移后,它使用偏移来确定应该开始流更改的 oplog 中的位置。然后,任务(取决于配置)连接到副本集的主节点,或连接到副本集范围更改流,并开始从该位置流更改。它处理所有创建、插入和删除操作,并将其转换为 Debezium 更改事件。每个更改事件都包含找到操作的 oplog 中的位置,连接器会定期将其记录为最新的偏移。记录偏移的时间间隔由 offset.flush.interval.ms 进行管理,它是一个 Kafka Connect worker 配置属性。

当连接器被安全停止时,处理最后一个偏移会被记录,以便在重启后,连接器将继续保持关闭的位置。如果连接器的任务意外终止,则任务可能会在最后一次记录偏移后处理和生成事件,但在记录最后一个偏移前;重启时,连接器从最后 记录的 偏移开始,可能会生成之前在崩溃前生成的一些相同事件。

注意

当 Kafka 管道中的所有组件都正常运行时,Kafka 用户会 准确接收每个消息一次。但是,当出现问题时,Kafka 只能保证消费者 至少接收每个消息一次。为避免意外结果,使用者必须能够处理重复的消息。

如前文所述,连接器任务始终使用副本集的主节点从 oplog 中流更改,确保连接器尽可能看到最新的操作,并可以捕获比使用第二个aries 更低的延迟的变化。当副本集选择新主节点时,连接器会立即停止流更改,连接到新主节点,并在同一位置开始从新主节点流更改。同样,如果连接器遇到与副本集成员通信的问题,它会尝试使用 exponential backoff 来重新连接,因此不会大量副本集,并在连接后继续从最后一个离开的位置继续流更改。这样,连接器可以动态地调整副本集成员资格中的更改,并自动处理通信失败。

总之,MongoDB 连接器在大多数情况下继续运行。通信问题可能会导致连接器等待问题解决。

5.2.7. MongoDB 支持填充 Debezium 更改事件中的 before 字段

在 MongoDB 6.0 及更高版本中,您可以配置更改流来发出文档的预镜像状态,以填充 MongoDB 更改事件的 before 字段。要在 MongoDB 中启用预镜像,您必须使用 db.createCollection(), create, 或 collMod 为集合设置 changeStreamPreAndPostImages。要启用 Debezium MongoDB 在更改事件中包含预镜像,请将连接器的 capture.mode 设置为其中一个 *_with_pre_image 选项。

MongoDB 更改流事件的大小限制

MongoDB 更改流事件的大小限制为 16MB。因此,使用预镜像会增加超过这个阈值的可能性,这可能会导致失败。有关如何避免超过更改流限制的详情,请参考 MongoDB 文档

5.2.8. 接收 Debezium MongoDB 更改事件记录的默认 Kafka 主题名称

MongoDB 连接器将所有插入、更新和删除操作的事件写入每个集合中的文档到单个 Kafka 主题。Kafka 主题的名称始终使用 logicalName.databaseName.collectionName 格式,其中 logicalName 是连接器的逻辑名称(使用 topic.prefix 配置属性指定),databaseName 是创建发生在的数据库的名称,collectionName 是受影响的文档所在的 MongoDB 集合的名称。

例如,假设一个 MongoDB 副本集有一个 inventory 数据库,其中包含四个集合:products, products_on_hand, customers, 和 orders。如果监控这个数据库的连接器有一个逻辑名称 fulfillment,则这个连接器会在这四个 Kafka 主题上生成事件:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合(每个分片都包含集合文档的子集)的所有更改都会进入相同的 Kafka 主题。

您可以根据需要将 Kafka 设置为自动创建主题。如果没有,则必须在启动连接器前使用 Kafka 管理工具创建主题。

5.2.9. 事件密钥控制 Debezium MongoDB 连接器的主题分区

MongoDB 连接器不会明确确定如何为事件分区主题。相反,它允许 Kafka 如何根据事件密钥确定分区主题。您可以通过在 Kafka Connect worker 配置中定义分区 实现的名称来更改 Kafka 的分区逻辑。

Kafka 只为写入单个主题分区的事件维护总顺序。按键对事件进行分区意味着,具有相同键的所有事件始终都到达同一分区。这样可确保特定文档的所有事件始终被完全排序。

5.2.10. Debezium MongoDB 连接器生成的事件代表事务边界

Debezium 可以生成代表事务元数据边界的事件,并增强更改数据事件消息。

Debezium 接收事务元数据时的限制

Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。

对于每个事务 BEGINEND,Debezium 会生成一个包含以下字段的事件:

status
BEGINEND
id
唯一事务标识符的字符串。
event_count (用于 END 事件)
事务发出的事件总数。
data_collections (用于 END 事件)
data_collectionevent_count 的数组,它通过源自给定数据收集的更改来提供事件数量。

以下示例显示了一个典型的信息:

{
  "status": "BEGIN",
  "id": "1462833718356672513",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "rs0.testDB.collectiona",
      "event_count": 1
    },
    {
      "data_collection": "rs0.testDB.collectionb",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic. prefix>.transaction 的主题

更改数据事件增强

如果启用了事务元数据,数据消息 Envelope 会增加一个新的 transaction 字段。此字段以字段复合的形式提供有关每个事件的信息:

id
唯一事务标识符的字符串。
total_order
事件在事务生成的所有事件中绝对位置。
data_collection_order
在事务发出的所有事件间,按数据收集位置。

下面是一个信息示例:

{
  "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "1462833718356672513",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.