7.8. 在 Debezium MongoDB 更改事件状态后提取源文档
Debezium MongoDB 连接器会发出数据更改信息,以表示 MongoDB 集合中发生的每个操作。这些事件消息的复杂结构代表原始数据库事件的详情。但是,一些下游用户可能无法以原始格式处理消息。例如,为了代表数据收集中的嵌套文档,连接器会以包含嵌套字段的格式发出事件消息。要支持接收器连接器,或其他无法处理原始消息层次结构格式的消费者,您可以使用 Debezium MongoDB 事件扁平化(ExtractNewDocumentState)单一消息转换(SMT)。SMT 简化了原始消息的结构,并可以其他方式修改消息,以便更轻松地处理数据。
事件扁平化转换是一个 Kafka Connect SMT。
本章中的信息描述了 Debezium MongoDB 连接器的事件扁平化单一消息转换(SMT)。有关与关系数据库一起使用的对等 SMT 的详情,请查看 新建记录状态 Extraction SMT 的文档。
以下主题提供详情:
- 第 7.8.1 节 “Debezium MongoDB 更改事件结构的描述”
- 第 7.8.2 节 “Debezium MongoDB 事件扁平化转换的行为”
- 第 7.8.3 节 “配置 Debezium MongoDB 事件扁平化转换”
- 第 7.8.4 节 “MongoDB 事件消息中编码数组的选项”
- 第 7.8.5 节 “在 MongoDB 事件消息中扁平化嵌套结构”
-
第 7.8.6 节 “Debezium MongoDB 连接器如何报告
$unset
操作删除的字段名称” - 第 7.8.7 节 “确定原始数据库操作的类型”
- 第 7.8.8 节 “使用 MongoDB 事件扁平化 SMT 在 Kafka 记录中添加 Debezium 元数据”
- 第 7.8.9 节 “用于应用 MongoDB 的选项会有选择地提取新的文档状态转换”
- 第 7.8.10 节 “MongoDB 的 Debezium 事件扁平化转换的配置选项”
- 已知限制
7.8.1. Debezium MongoDB 更改事件结构的描述
Debezium MongoDB 连接器会生成具有复杂结构的更改事件。每个事件消息包括以下部分:
- 源元数据
包括但不仅限于以下字段:
- 更改了集合中数据的操作类型(创建/插入、更新或删除)。
- 发生更改的数据库和集合的名称。
- 标识更改时间的时间戳。
- 可选的事务信息。
- 文档数据
- 数据
前
当 Debezium 连接器的
capture.mode
设置为以下值之一时,运行 MongoDB 6.0 及之后的版本中存在此字段:-
change_streams_with_pre_image
. change_streams_update_full_with_pre_image
.如需更多信息,请参阅 MongoDB pre-image 支持
-
- 数据
后
代表当前操作后文档中存在的值的 JSON 字符串。事件消息中的
after
字段取决于事件类型和连接器配置。MongoDBinsert
操作的create
事件始终包含一个after
字段,无论capture.mode
设置是什么。对于更新
事件,只有在将capture.mode
设置为以下值之一时才会出现after
字段:-
change_streams_update_full
change_streams_update_full_with_pre_image
.注意更改事件消息中的
after
值不一定代表事件后面的文档的状态。该值不会被动态计算;在连接器捕获更改事件后,它会查询集合来检索文档的当前值。例如,假设一种情况,它有多个操作
a
,b
, 和c
在快速成功中修改文档。当连接器处理时,修改a
,它会查询集合以获取完整的文档。同时,改变b
和c
发生。当连接器收到对更改a
的完整文档的响应时,可能会收到一个基于b
或c
后续更改的文档版本。如需更多信息,请参阅capture.mode
属性的文档。
-
- 数据
以下片段显示了 创建
更改事件的基本结构,连接器在 MongoDB insert
操作后发出:
{ "op": "c", "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}", "source": { ... } }
上例中的 after
字段复杂格式提供有关源数据库中发生的更改的详细信息。但是,一些消费者无法处理包含嵌套值的消息。要将原始消息的复杂嵌套字段转换为更简单、更通用的兼容结构,请将事件扁平化 SMT 用于 MongoDB。SMT 扁平化消息中嵌套字段的结构,如下例所示:
{ "field1" : "newvalue1", "field2" : "newvalue2" }
有关 Debezium MongoDB 连接器生成的消息的默认结构的更多信息,请参阅 连接器文档。
7.8.2. Debezium MongoDB 事件扁平化转换的行为
MongoDB 的事件扁平化 SMT 会从 Debezium MongoDB 连接器发送的 create
或 update
更改事件消息中拉取 after
字段。在 SMT 处理原始更改事件消息后,它会生成一个只包含 after
字段内容的简化版本。
根据您的用例,您可以将 ExtractNewDocumentState SMT 应用到 Debezium MongoDB 连接器,或应用于使用 Debezium 连接器生成的消息的接收器连接器。如果您将 SMT 应用到 Debezium MongoDB 连接器,则 SMT 会修改连接器在发送到 Apache Kafka 前发出的消息。为确保 Kafka 以原始格式保留完整的 Debezium 更改事件信息,请将 SMT 应用到接收器连接器。
当您使用事件扁平化 SMT 处理从 MongoDB 连接器发出的消息时,SMT 会将原始消息中的记录结构转换为正确输入的 Kafka Connect 记录,这些记录可以被典型的接收器连接器使用。例如,SMT 将原始消息中代表 后
信息的 JSON 字符串转换为任何消费者可以处理的模式结构。
另外,您还可以将 MongoDB 的事件扁平化 SMT 配置为在处理过程中以其他方式修改消息。如需更多信息,请参阅配置 主题。
7.8.3. 配置 Debezium MongoDB 事件扁平化转换
为使用 Debezium MongoDB 连接器发出的消息的接收器连接器配置 event flattening (ExtractNewDocumentState) SMT。
以下主题提供详情:
- 第 7.8.3.1 节 “示例:Debebe MongoDB event flattening-transformation 的基本配置”
- 第 7.8.4 节 “MongoDB 事件消息中编码数组的选项”
- 第 7.8.5 节 “在 MongoDB 事件消息中扁平化嵌套结构”
-
第 7.8.6 节 “Debezium MongoDB 连接器如何报告
$unset
操作删除的字段名称” - 第 7.8.7 节 “确定原始数据库操作的类型”
- 第 7.8.8 节 “使用 MongoDB 事件扁平化 SMT 在 Kafka 记录中添加 Debezium 元数据”
- 第 7.8.9 节 “用于应用 MongoDB 的选项会有选择地提取新的文档状态转换”
- 第 7.8.10 节 “MongoDB 的 Debezium 事件扁平化转换的配置选项”
7.8.3.1. 示例:Debebe MongoDB event flattening-transformation 的基本配置
要获得 SMT 的默认行为,请在不指定任何选项的情况下将 SMT 添加到 sink 连接器配置中,如下例所示:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
与任何 Kafka Connect 连接器配置一样,您可以将 transforms=
设置为多个,用逗号分开的 SMT 别名。Kafka Connect 应用按列出顺序指定的转换。
您可以为使用 MongoDB 事件扁平化 SMT 的连接器设置多个选项。以下示例显示了为连接器设置 drop.tombstones
、delete.handling.mode
和 add.headers
选项的配置:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=drop transforms.unwrap.add.headers=op
有关上例中配置选项的更多信息,请参阅 配置主题。
自定义配置
连接器可能会发出许多类型的事件消息(如 heartbeat 消息、tombstone 消息或有关事务的元数据消息)。要将转换应用到事件的子集,您可以定义 SMT predicate 语句,该语句仅有选择地将转换 应用到特定的事件。
7.8.4. MongoDB 事件消息中编码数组的选项
默认情况下,事件扁平化 SMT 会将 MongoDB 数组转换为与 Apache Kafka Connect 或 Apache Avro 模式兼容的数组。虽然 MongoDB 数组可以包含多个元素,但 Kafka 数组中的所有元素都必须是相同的类型。
要确保 SMT 以满足环境需求的方式对数组进行编码,您可以指定 array.encoding
配置选项。以下示例显示了设置阵列编码的配置:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.array.encoding=<array|document>
根据配置,SMT 使用以下编码方法之一处理源消息中的数组实例:
- 数组编码
-
如果将
array.encoding
设置为array
(默认),则 SMT 编码使用数组
数据类型对原始消息中的数组进行编码。为确保正确处理,数组实例中的所有元素都必须相同类型。这个选项是一个限制,但它可让下游客户端轻松处理数组。 - 文档编码
-
如果将
array.encoding
设置为document
,则 SMT 会将源中的每个阵列转换为 structs 中的一个 struct,类似于 BSON 序列化。主 结构 包含名为_0
、_1
、_2
等字段,其中每个字段名称代表原始阵列中元素的索引。SMT 使用源数组中对等元素的值填充这些索引字段的值。索引名称以下划线作为前缀,因为 Avro 编码会禁止以数字字符开头的字段名称。
以下示例演示了 Debezium MongoDB 连接器如何代表包含异构数据类型的数组的数据库文档:
例 7.2. 示例:记录包含多个数据类型的数组的编码
{ "_id": 1, "a1": [ { "a": 1, "b": "none" }, { "a": "c", "d": "something" } ] }
如果将 array.encoding
设置为 document
,则 SMT 会将前面的文档转换为以下格式:
{ "_id": 1, "a1": { "_0": { "a": 1, "b": "none" }, "_1": { "a": "c", "d": "something" } } }
文档
编码选项可让 SMT 处理由异构元素组成的任意数组。但是,在使用这个选项前,请始终验证接收器连接器和其他下游用户是否能够处理包含多个数据类型的数组。
7.8.5. 在 MongoDB 事件消息中扁平化嵌套结构
当数据库操作涉及嵌入式文档时,Debebe MongoDB 连接器会发出一个 Kafka 事件记录,其结构反映了原始文档的层次结构。也就是说,事件消息代表嵌套文档作为一组嵌套字段结构。在下游连接器无法处理包含嵌套结构的消息的环境中,您可以将事件扁平化 SMT 配置为在消息中扁平化分层结构。扁平消息结构更适合表类存储。
要将 SMT 配置为扁平化嵌套结构,请将 flatten.struct
配置选项设为 true
。在转换的消息中,字段名称会被构建,以便与文档源保持一致。SMT 通过将父文档字段的名称与嵌套文档字段的名称连接来重命名每个扁平化的字段。由 flatten.struct.delimiter
选项定义的分隔符将分隔名称的组件。struct.delimiter
的默认值是一个下划线字符(_
)。
以下示例显示了指定 SMT flattens 嵌套结构的配置:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.flatten.struct=<true|false> transforms.unwrap.flatten.struct.delimiter=<string>
以下示例显示了由 MongoDB 连接器发出的事件消息。消息包括了一个文档 a
的字段,其中包含两个嵌套文档 (b
和 c
) 的字段:
{ "_id": 1, "a": { "b": 1, "c": "none" }, "d": 100 }
以下示例中的消息显示了 MongoDB 的 SMT 后的输出,其中包含前面的消息中的嵌套结构:
{ "_id": 1, "a_b": 1, "a_c": "none", "d": 100 }
在生成的消息中,嵌套在原始消息中的 b
和 c
字段会被扁平化并重命名。重命名的字段将父文档 a
的名称与嵌套文档的名称连接在一起: a_b
和 a_c
。新字段名称的组件由下划线字符分开,如 struct.delimiter
配置属性的设置所定义,
7.8.6. Debezium MongoDB 连接器如何报告 $unset
操作删除的字段名称
在 MongoDB 中,$unset
operator 和 $rename
operator 都从文档中删除字段。因为 MongoDB 集合是无模式的,所以在更新从文档中删除了字段后,无法推断更新文档中缺少的字段的名称。为了支持接收器连接器或其他可能需要删除字段信息的用户,Debezium 会发出更新信息,其中包含一个 removedFields
元素,用于列出已删除字段的名称。
以下示例显示一个操作的一个更新消息的部分,这导致删除了 a
字段:
"payload": { "op": "u", "ts_ms": "...", "ts_us" : "...", "ts_ns" : "...", "before": "{ ... }", "after": "{ ... }", "updateDescription": { "removedFields": ["a"], "updatedFields": null, "truncatedArrays": null } }
在上例中,before
和 after
代表文档更新以前和以后的源文档的状态。只有在设置了连接器的 capture.mode
时,连接器才会发出这些字段,如以下列表所述:
before
field在更改前提供文档的状态。只有在将
capture.mode
设置为以下值之一时,才会显示此字段:-
change_streams_with_pre_image
-
change_streams_update_full_with_pre_image
.
-
after
字段更改后提供文档的完整状态。只有在将
capture.mode
设置为以下值之一时,才会显示此字段:-
change_streams_update_full
-
change_streams_update_full_with_pre_image
.
-
假设配置为捕获完整文档的连接器,当 ExtractNewDocumentState
SMT 收到 $unset
事件 的更新
消息时,SMT 会通过代表已删除字段来对消息进行编码,如下例所示:
{ "id": 1, "a": null }
对于没有配置为捕获完整文档的连接器,当 SMT 收到 $unset
操作的更新事件时,它会生成以下输出信息:
{ "a": null }
7.8.7. 确定原始数据库操作的类型
在 SMT 扁平化事件消息后,生成的消息不再指示生成事件的操作是 创建
、更新
或初始快照 读取
。通常,您可以通过将连接器配置为公开有关 删除
附带的 tombstone 或重写事件的信息,来识别删除操作。有关配置连接器以公开事件消息中的 tombstones 和 rewrites 的信息的更多信息,请参阅 drop.tombstones
和 delete.handling.mode
属性。
要在事件消息中报告数据库操作的类型,SMT 可以在以下元素之一中添加 op
字段:
- 事件消息正文。
- 消息标头。
例如,要添加一个标头属性来显示原始操作的类型,添加转换,然后将 add.headers
属性添加到连接器配置中,如下例所示:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.headers=op
根据前面的配置,SMT 通过向消息添加一个 op
标头来报告事件类型,并为它分配一个字符串值来标识操作类型。分配的字符串值基于原始 MongoDB 更改事件消息 中的 op
字段值。
7.8.8. 使用 MongoDB 事件扁平化 SMT 在 Kafka 记录中添加 Debezium 元数据
MongoDB 的事件扁平化 SMT 可以将原始更改事件消息的元数据字段添加到简化的消息。添加的元数据字段以双下划线("__"
)作为前缀。在事件记录中添加元数据可以包括内容,如发生更改事件的集合名称,或者包含特定于连接器的字段,如副本集名称。目前,SMT 只能从以下更改事件子结构中添加字段:source
, transaction
和 updateDescription
。
有关 MongoDB 更改事件结构的更多信息,请参阅 MongoDB 连接器文档。
例如,您可以指定以下配置,将副本集名称(rs
)和更改事件的集合名称添加到最终扁平化的事件记录中:
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.fields=rs,collection
前面的配置会导致以下内容被添加到扁平化的记录中:
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
如果您希望 SMT 添加元数据字段来删除事件,请将 delete
. handling.mode 选项的值设置为
重写
。
7.8.9. 用于应用 MongoDB 的选项会有选择地提取新的文档状态转换
除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。
有关如何有选择地应用 SMT 的更多信息,请参阅为转换配置 SMT predicate。
7.8.10. MongoDB 的 Debezium 事件扁平化转换的配置选项
下表描述了 MongoDB 事件扁平化 SMT 的配置选项。
属性 | 默认 | 描述 |
---|---|---|
| 指定 SMT 在对原始事件读取的数组进行编码时使用的格式。设置以下选项之一:
有关 | |
| SMT flattens 结构(structs)在原始事件消息中连接嵌套属性的名称(由可配置的分隔符分开),以形成一个简单的字段名称。 | |
|
当 | |
|
Debezium 为每个 注意
计划在以后的发行版本中删除这个选项。在这一位置,使用 | |
|
指定 SMT 如何处理 Debezium 为
注意
计划在以后的发行版本中删除这个选项。在这一位置,使用 | |
没有默认值 |
Debezium 为每个 注意
这个选项的设置优先于您可以为已弃用的 设置以下选项之一:
| |
|
当设置为 | |
__ (double-underscore) | 将此可选字符串设置为前缀标头。 | |
没有默认值 |
指定以逗号分隔的列表,没有空格,以及您希望 SMT 添加到简化消息的标头中的元数据字段。当原始消息包含重复的字段名称时,您可以通过提供 struct 的名称和字段名称来标识要修改的特定字段,如
另外,您还可以覆盖字段的原始名称,并通过在列表中添加以下格式的条目来为其分配新名称:
例如: version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
您指定的新名称值区分大小写。 | |
__ (double-underscore) | 指定字段名称前缀的可选字符串。 | |
没有默认值 |
将这个选项设置为用逗号分开的列表,没有空格,即 metadata 字段要添加到简化的 Kafka 消息的
例如: version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
您指定的新名称值区分大小写。
当 SMT 在简化消息的 |
已知限制
- 因为 MongoDB 是一个无模式数据库,因此在使用 Debezium 对基于 schema 的数据关系数据库的更改时,确保一致的列定义,所以名称相同类型的字段中必须存储相同类型的数据。
- 配置 SMT 以与接收器连接器兼容的格式生成信息。如果接收器连接器需要 "flat" 消息结构,但它收到对源 MongoDB 文档中的数组编码的消息,接收器连接器无法处理消息。