5.3. Debezium MongoDB 连接器数据更改事件的描述


Debezium MongoDB 连接器为每个插入、更新或删除数据的文档级操作生成数据更改事件。每个事件包含一个键和值。密钥的结构和值取决于更改的集合。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您正在使用 schema registry,用户可以使用该模式 ID 从 registry 获取 schema。这使得每个事件都自包含。

以下框架 JSON 显示更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect converter,决定更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于更改事件中。同样,只有在您配置转换器来生成它时,事件密钥和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 5.6. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,用于描述事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了更改的文档的密钥结构。

2

payload

第一个 payload 字段是 event 键的一部分。它具有上一个 schema 字段描述的结构,它包含已更改的文档的密钥。

3

schema

第二个 schema 字段是事件值的一部分。它指定 Kafka Connect 模式,用于描述事件值 有效负载部分的内容。换句话说,第二个 模式 描述了更改的文档的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它具有上一个 schema 字段描述的结构,它包含已更改的文档的实际数据。

默认情况下,连接器流将事件记录改为名称与事件原始集合相同的主题。请参阅 主题名称

警告

MongoDB 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和名称和集合名称中的每个字符都必须是一个拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,它将使用下划线字符替换。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,且唯一与另一个名称区分名称的字符无效,这可能会导致意外冲突冲突,从而被下划线替换。

如需更多信息,请参阅以下主题:

5.3.1. 关于 Debezium MongoDB 中的键更改事件

更改事件的密钥包含更改的文档的密钥和更改的文档的实际密钥的 schema。对于给定的集合,schema 及其对应有效负载都包含一个 id 字段。此字段的值是文档的标识符,表示为来自 MongoDB 扩展 JSON 序列化严格模式 的字符串

考虑一个连接器,其逻辑名称为 fulfillment,包括一个 inventory 数据库的副本集,以及包含如下文档的 customers 集合。

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

更改事件键示例

每次捕获 客户 集合更改的事件都有相同的事件关键模式。只要 customers 集合有以前的定义,捕获 customer 集合更改的每个更改事件都有以下关键结构:在 JSON 中,它类似如下:

{
  "schema": { 1
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 5
    "id": "1004"
  }
}
表 5.7. 更改事件键的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 payload 部分的内容。

2

fulfillment.inventory.customers.Key

定义密钥有效负载结构的模式名称。这个模式描述了已更改的文档的密钥结构。键模式名称的格式是 connector-name.database-name.collection-name.Key。在本例中:

  • fulfillment 是生成此事件的连接器的名称。
  • inventory 是包含已更改的集合的数据库。
  • 客户 是包含已更新文档的集合。

3

optional

指明 event 键是否必须在其 payload 字段中包含一个值。在本例中,键有效负载中的值是必需的。当文档没有键时,键的 payload 字段中的值是可选的。

4

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要。

5

payload

包含生成此更改事件的文档的密钥。在本例中,键包含类型为 字符串的 id 字段,其值为 1004

这个示例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符的工作方式相同,包括文档标识符。对于文档标识符,事件键的 payload.id 值是字符串,它表示更新的文档的原始 _id 字段作为使用 strict 模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id 字段的示例。

表 5.8. 在事件键有效负载中代表文档 _id 字段的示例
类型MongoDB _id Value密钥的有效负载

整数

1234

{ "id" : "1234" }

浮点值

12.34

{ "id" : "12.34" }

字符串

"1234"

{ "id" : "\"1234\"" }

文档

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

二进制

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }

5.3.2. 关于 Debezium MongoDB 更改事件中的值

更改事件中的值比键复杂一些。与键一样,该值有一个 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。为创建、更新或删除数据的操作更改事件,它们都有一个带有 envelope 结构的值有效负载。

考虑用于显示更改事件键示例的相同示例文档:

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

每个事件类型都描述了更改此文档的更改事件的值部分:

创建 事件

以下示例显示了连接器为在 customers 集合中创建数据的操作生成的更改事件的值部分:

{
    "schema": { 1
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": { 5
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6
      "source": { 7
        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 8
      "ts_ms": 1558965515240 9
    }
  }
表 5.9. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,用于描述值有效负载的结构。当连接器为特定集合生成的每次更改事件中,更改事件的值模式都是相同的。

2

name

schema 部分中,每个 name 字段为值有效负载中的字段指定 schema。

io.debezium.data.Json 是、patchfilter 字段 有效负载的 schema。这个模式是针对 customers 集合的。create 事件是包含 after 字段的唯一事件类型。update 事件包含一个 filter 字段和一个 patch 字段。delete 事件包含一个 过滤器 字段,但不是 after 字段或 patch 字段。

3

name

io.debezium.connector.mongo.Source 是有效负载的 source 字段的 schema。这个模式特定于 MongoDB 连接器。连接器将其用于它生成的所有事件。

4

name

dbserver1.inventory.customers.Envelope 是负载总体结构的模式,其中 dbserver1 是连接器名称,inventory 是数据库,customers 是集合。这个模式特定于集合。

5

payload

值的实际数据。这是更改事件提供的信息。

可能会出现事件的 JSON 表示比它们描述的文档大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的信息大小。

6

after

指定事件发生后文档状态的可选字段。在本例中,post 字段包含 新文档的 _idfirst_namelast_nameemail 字段的值。after 值始终是一个字符串。按照惯例,它包含文档的 JSON 表示。MongoDB oplog 条目仅包含 _create_ 事件的完整文档,当 capture.mode 选项被设置为 change_streams_update_full 时还包括 update 事件的文档; 换句话说,无论 capture.mode 选项是什么,create 事件是唯一包含 after 字段的事件类型。

7

source

描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序以及事件是否为同一事务的一部分。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含新文档的集合和数据库名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符,以防更改是在事务中执行(仅更改流捕获模式)。

8

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,c 表示操作创建了文档。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = 读取(仅适用于快照)

9

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更改流捕获模式

示例 customers 集合中一个更新的改变事件的值有与那个集合的 create 事件相同的模式。同样,事件值有效负载具有相同的结构。但是,事件值有效负载在 update 事件中包含不同的值。只有在 capture.mode 选项被设置为 change_streams_update_full 时,update 事件才会包括一个 after 值。如果 capture.mode 选择被设置为 *_with_pre_image 选项之一,会提供一个 before 值。存在一个新的 structured 字段 updateDescription,本例中为几个额外的字段:

  • updatedFields 是一个字符串字段,其中包含更新的文档字段的 JSON 表示及其值
  • removedFields 是从文档中删除的字段名称列表
  • truncatedArrays 是文档中的数组列表,被截断

以下是连接器为 customer 集合中更新生成的更改事件值 的示例

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 3
      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 4
      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 5
        "truncatedArrays": null
      },
      "source": { 6
        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
表 5.10. 更新 事件值字段的描述
字段名称描述

1

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,u 表示操作更新了文档。

2

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

before

在更改前包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 *_with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

after

包含实际 MongoDB 文档的 JSON 字符串表示。
如果捕获模式没有设置为 change_streams_update_full,则 update 事件值不会包含 after 字段

5

updatedFields

包含文档更新字段值的 JSON 字符串表示。在本例中,更新将 first_name 字段改为新值。

6

source

描述事件源元数据的必需字段。此字段包含与同一集合的 create 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含更新文档的集合和数据库名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符,以防更改是在事务中执行的。
警告

事件中的 after 值应作为文档的 at-point-of-time 值进行处理。该值不会动态计算,但是从集合中获取的。因此,如果多个更新一个紧随另一个发生,则所有 update 事件都会包含在文档中存储的代表最后的值相同的 after 值。

如果您的应用程序依赖于逐步更改演进,则应该只依赖 updateDescription

删除 事件

delete 更改事件中的值与为同一集合的 createupdate 事件相同的 schema 部分。delete 事件中的 payload 部分包含与为同一集合的 createupdate 事件不同的值。特别是,delete 事件不包含 after 值和 updateDescription 值。以下是 customers 集合中文档的 delete 事件示例:

{
    "schema": { ... },
    "payload": {
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",3
      "source": { 4
        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
表 5.11. 删除 事件值字段的描述
字段名称描述

1

op

描述操作类型的强制字符串。op 字段值为 d,表示此文档已被删除。

2

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

before

在更改前包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 *_with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

source

描述事件源元数据的必需字段。此字段包含与同一集合的 createupdate 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含已删除文档的集合和数据库名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行更改时的时间戳,并在时间戳内发生事件。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符,以防更改是在事务中执行(仅更改流捕获模式)。

MongoDB 连接器事件被设计为使用 Kafka 日志压缩。只要保留每个密钥的最新消息,日志压缩就会启用删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

唯一标识的文档的所有 MongoDB 连接器事件都有完全相同的密钥。删除文档时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,要让 Kafka 删除具有该键的所有消息,消息值必须为 null。为了实现此目的,在 Debezium 的 MongoDB 连接器发出一个 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但有一个 null 值。tombstone 事件告知 Kafka,可以删除具有相同键的所有消息。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.