4.3. Debezium MongoDB 连接器数据更改事件的描述
Debezium MongoDB 连接器为每个插入、更新或删除数据的文档级操作生成数据更改事件。每个事件包含一个键和值。密钥的结构和值取决于更改的集合。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您正在使用 schema registry,用户可以使用该模式 ID 从 registry 获取 schema。这使得每个事件都自包含。
以下框架 JSON 显示更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect converter,决定更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema
字段才会处于更改事件中。同样,只有在您配置转换器来生成它时,事件密钥和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,更改事件具有此结构:
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录改为名称与事件原始集合相同的主题。请参阅 主题名称。
MongoDB 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和名称和集合名称中的每个字符都必须是一个拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,它将使用下划线字符替换。
如果逻辑服务器名称、数据库名称或集合名称包含无效字符,且唯一与另一个名称区分名称的字符无效,这可能会导致意外冲突冲突,从而被下划线替换。
如需更多信息,请参阅以下主题:
4.3.1. 关于 Debezium MongoDB 中的键更改事件
更改事件的密钥包含更改的文档的密钥和更改的文档的实际密钥的 schema。对于给定的集合,schema 及其对应有效负载都包含一个 id
字段。此字段的值是文档的标识符,表示为来自 MongoDB 扩展 JSON 序列化严格模式 的字符串。
考虑一个连接器,其逻辑名称为 fulfillment
,包括一个 inventory
数据库的副本集,以及包含如下文档的 customers
集合。
文档示例
{ "_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }
更改事件键示例
每次捕获 客户
集合更改的事件都有相同的事件关键模式。只要
集合有以前的定义,捕获 customer 集合更改的每个更改事件都有以下关键结构:在 JSON 中,它类似如下:
customers
{ "schema": { 1 "type": "struct", "name": "fulfillment.inventory.customers.Key", 2 "optional": false, 3 "fields": [ 4 { "field": "id", "type": "string", "optional": false } ] }, "payload": { 5 "id": "1004" } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 |
2 |
|
定义密钥有效负载结构的模式名称。这个模式描述了已更改的文档的密钥结构。键模式名称的格式是 connector-name.database-name.collection-name.
|
3 |
|
指明 event 键是否必须在其 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的文档的密钥。在本例中,键包含类型为 |
这个示例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符的工作方式相同,包括文档标识符。对于文档标识符,事件键的 payload.id
值是字符串,它表示更新的文档的原始 _id
字段作为使用 strict 模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id
字段的示例。
类型 | MongoDB _id Value | 密钥的有效负载 |
---|---|---|
整数 | 1234 |
|
浮点值 | 12.34 |
|
字符串 | "1234" |
|
文档 |
|
|
ObjectId |
|
|
二进制 |
|
|
4.3.2. 关于 Debezium MongoDB 更改事件中的值
更改事件中的值比键复杂一些。与键一样,该值有一个 schema
部分和 payload
部分。schema
部分包含描述 payload
部分的 Envelope
结构的 schema,包括其嵌套字段。为创建、更新或删除数据的操作更改事件,它们都有一个带有 envelope 结构的值有效负载。
考虑用于显示更改事件键示例的相同示例文档:
文档示例
{ "_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }
每个事件类型都描述了更改此文档的更改事件的值部分:
创建 事件
以下示例显示了连接器为在 customers
集合中创建数据的操作生成的更改事件的值部分:
{ "schema": { 1 "type": "struct", "fields": [ { "type": "string", "optional": true, "name": "io.debezium.data.Json", 2 "version": 1, "field": "after" }, { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "patch" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "rs" }, { "type": "string", "optional": false, "field": "collection" }, { "type": "int32", "optional": false, "field": "ord" }, { "type": "int64", "optional": true, "field": "h" } ], "optional": false, "name": "io.debezium.connector.mongo.Source", 3 "field": "source" }, { "type": "string", "optional": true, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope" 4 }, "payload": { 5 "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6 "source": { 7 "version": "2.3.4.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999 }, "op": "c", 8 "ts_ms": 1558965515240 9 } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
| 值的 schema,用于描述值有效负载的结构。当连接器为特定集合生成的每次更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值的实际数据。这是更改事件提供的信息。 |
6 |
|
指定事件发生后文档状态的可选字段。在本例中,post |
7 |
| 描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序以及事件是否为同一事务的一部分。源元数据包括:
|
8 |
|
描述导致连接器生成事件的操作类型的强制字符串。在本例中,
|
9 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更改流捕获模式
示例 customers
集合中一个更新的改变事件的值有与那个集合的 create 事件相同的模式。同样,事件值有效负载具有相同的结构。但是,事件值有效负载在 update 事件中包含不同的值。只有在 capture.mode
选项被设置为 change_streams_update_full
时,update 事件才会包括一个 after
值。如果 capture.mode
选择被设置为 *_with_pre_image
选项之一,会提供一个 before
值。存在一个新的 structured 字段 updateDescription
,本例中为几个额外的字段:
-
updatedFields
是一个字符串字段,其中包含更新的文档字段的 JSON 表示及其值 -
removedFields
是从文档中删除的字段名称列表 -
truncatedArrays
是文档中的数组列表,被截断
以下是连接器为 customer 集合中更新生成的更改事件值 的示例
:
{ "schema": { ... }, "payload": { "op": "u", 1 "ts_ms": 1465491461815, 2 "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 3 "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 4 "updateDescription": { "removedFields": null, "updatedFields": "{\"first_name\": \"Anne Marie\"}", 5 "truncatedArrays": null }, "source": { 6 "version": "2.3.4.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 1, "h": null, "tord": null, "stxnid": null, "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}", "txnNumber":1 } } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述导致连接器生成事件的操作类型的强制字符串。在本例中, |
2 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
在更改前包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 |
4 |
|
包含实际 MongoDB 文档的 JSON 字符串表示。 |
5 |
|
包含文档更新字段值的 JSON 字符串表示。在本例中,更新将 |
6 |
| 描述事件源元数据的必需字段。此字段包含与同一集合的 create 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
事件中的 after
值应作为文档的 at-point-of-time 值进行处理。该值不会动态计算,但是从集合中获取的。因此,如果多个更新一个紧随另一个发生,则所有 update 事件都会包含在文档中存储的代表最后的值相同的 after
值。
如果您的应用程序依赖于逐步更改演进,则应该只依赖 updateDescription
。
删除 事件
delete 更改事件中的值与为同一集合的 create 和 update 事件相同的 schema
部分。delete 事件中的 payload
部分包含与为同一集合的 create 和 update 事件不同的值。特别是,delete 事件不包含 after
值和 updateDescription
值。以下是 customers
集合中文档的 delete 事件示例:
{ "schema": { ... }, "payload": { "op": "d", 1 "ts_ms": 1465495462115, 2 "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",3 "source": { 4 "version": "2.3.4.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999 } } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述操作类型的强制字符串。 |
2 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
在更改前包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 |
4 |
| 描述事件源元数据的必需字段。此字段包含与同一集合的 create 或 update 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
MongoDB 连接器事件被设计为使用 Kafka 日志压缩。只要保留每个密钥的最新消息,日志压缩就会启用删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
tombstone 事件
唯一标识的文档的所有 MongoDB 连接器事件都有完全相同的密钥。删除文档时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,要让 Kafka 删除具有该键的所有消息,消息值必须为 null
。为了实现此目的,在 Debezium 的 MongoDB 连接器发出一个 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但有一个 null
值。tombstone 事件告知 Kafka,可以删除具有相同键的所有消息。