4.3. Debezium MongoDB 连接器数据更改事件的描述


Debezium MongoDB 连接器为每个插入、更新或删除数据的文档级别操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的集合。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。

以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema 字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 4.4. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是 event 键的一部分。它指定一个 Kafka Connect 模式,它描述了事件密钥 有效负载部分中的内容。换句话说,第一个 schema 字段描述了更改的文档的密钥结构。

2

payload

第一个 payload 字段是 event 键的一部分。它具有上一个 schema 字段描述的结构,其中包含已更改的文档的密钥。

3

schema

第二个 schema 字段是 event 值的一部分。它指定了 Kafka Connect 模式,它描述了事件值 有效负载部分中的内容。换句话说,第二个 模式 描述了更改的文档的结构。通常,此架构包含嵌套模式。

4

payload

第二个 payload 字段是 event 值的一部分。它具有上一个 schema 字段描述的结构,其中包含已更改的文档的实际数据。

默认情况下,连接器流将事件记录更改为名称与事件原始集合相同的名称的主题。请参阅 主题名称

警告

MongoDB 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及数据库和集合名称中的每个字符都必须是一个字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。

如需更多信息,请参阅以下主题:

4.3.1. 关于 Debezium MongoDB 更改事件中的键

更改事件的密钥包含更改文档的密钥的 schema,以及更改的文档的实际密钥。对于给定集合,schema 及其对应有效负载都包含单个 id 字段。此字段的值是文档的标识符,表示为来自 MongoDB 扩展 JSON 序列化严格模式的字符串

考虑一个连接器,其逻辑名称为 fulfillment,包括一个 inventory 数据库的副本集,以及包含如下文档的 customers 集合。

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

更改事件键示例

捕获对 customers 集合的更改的每个更改事件都有相同的事件键模式。只要 customers 集合有以前的定义,捕获 客户 集合更改的事件都有以下关键结构:在 JSON 中,它类似如下:

{
  "schema": { 1
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 5
    "id": "1004"
  }
}
表 4.5. 更改事件密钥的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 有效负载部分中的内容

2

fulfillment.inventory.customers.Key

定义密钥有效负载结构的 schema 名称。这个模式描述了文档的密钥结构。Key 模式名称的格式为 connector-name.database-name.collection-name.Key。在这个示例中:

  • fulfillment 是生成此事件的连接器的名称。
  • 清单是 包含更改的集合的数据库。
  • 客户 是包含更新的文档的集合。

3

optional

指明 event 键必须在其 payload 字段中包含一个值。在本例中,密钥有效负载中的值是必需的。当文档没有键时,键的 payload 字段中的值是可选的。

4

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要它。

5

payload

包含生成此更改事件的文档的密钥。在本例中,键包含一个类型为 stringid 字段,其值是 1004

本例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符都的工作方式相同,包括文档标识符。对于文档标识符,事件的 payload.id 值是一个字符串,它代表更新的文档的原始 _id 字段作为一个 MongoDB 扩展 JSON 序列化,它使用严格的模式。下表提供了如何表示不同类型的 _id 字段的示例。

表 4.6. 代表事件密钥有效负载中的文档 _id 字段的示例
类型MongoDB _id Value密钥的有效负载

整数

1234

{ "id" : "1234" }

浮点值

12.34

{ "id" : "12.34" }

字符串

"1234"

{ "id" : "\"1234\"" }

文档

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

二进制

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }

4.3.2. 关于 Debezium MongoDB 更改事件中的值

更改事件中的值比键稍微复杂。与键一样,该值具有 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。

考虑用于显示更改事件键示例的相同示例文档:

文档示例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

对每个事件类型都描述了更改事件的值部分:

创建 事件

以下示例显示了连接器为 在客户 集合中创建数据的操作生成的更改事件的值部分:

{
    "schema": { 1
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": { 5
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6
      "source": { 7
        "version": "2.1.4.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 8
      "ts_ms": 1558965515240 9
    }
  }
表 4.7. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,用于描述值有效负载的结构。每次连接器为特定集合生成的更改时,更改事件的值模式都是相同的。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

io.debezium.data.Json 是有效负载 after,patch, 和 filter 字段的 schema。这个模式是针对 customers 集合的。create 事件是唯一包含 after 字段的事件类型。更新 事件包含 filter 字段和 patch 字段。delete 事件包含一个 filter 字段,而不是 after 字段或 patch 字段。

3

name

io.debezium.connector.mongo.Source 是有效负载的 source 字段的 schema。这个模式特定于 MongoDB 连接器。连接器将其用于它生成的所有事件。

4

name

dbserver1.inventory.customers.Envelope 是负载总体结构的模式,其中 dbserver1 是连接器名称,inventory 是数据库,customers 是集合。这个模式特定于集合。

5

payload

值的实际数据。这是更改事件提供的信息。

这可能似乎,事件的 JSON 表示要大于它们描述的文档。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的信息大小。

6

after

指定事件发生后文档状态的可选字段。在本例中,after 字段包含新文档的 _idfirst_namelast_nameemail 字段的值。after 值始终是一个字符串。按照惯例,它包含文档的 JSON 表示。MongoDB oplog 条目仅包含 _create_ 事件以及更新事件的文档的完整状态,当 capture.mode 选项被设置为 change_streams_ update _full; 换句话说,无论 capture.mode 选项是什么,create 事件是唯一包含 after 字段的事件类型。

7

source

描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序,以及事件是否为同一事务的一部分。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含新文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行了更改的时间戳,并在时间戳内事件被修改。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • 在事务中执行更改时,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符(仅更改流捕获模式)。

8

op

描述导致连接器生成事件的操作类型的必要字符串。在本例中,c 表示操作创建了文档。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = read (仅适用于快照)

9

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更改流捕获模式

示例 customers 集合中一个更新的改变事件的值有与那个集合的 create 事件相同的模式。同样,事件的 payload 具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。只有在 capture.mode 选项被设置为 change_streams_ update _full 时,更新事件才会包括 after 值。如果 capture.mode 选项被设置为 一 个,则 会提供一个 before 值。新的 structured 字段 更新Description,本例中为几个附加字段:

  • updatedFields 是一个字符串字段,其中包含更新的文档字段的 JSON 表示及其值
  • removedFields 是从文档中删除的字段名称列表
  • 截断的Arrays 是被截断的文档中的数组列表

以下是连接器为 customers 集合中更新生成的改变事件值的示例:

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 3
      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 4
      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 5
        "truncatedArrays": null
      },
      "source": { 6
        "version": "2.1.4.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
表 4.8. 更新 事件值字段的描述
字段名称描述

1

op

描述导致连接器生成事件的操作类型的必要字符串。在本例中,u 表示操作更新了文档。

2

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

之前

在更改前,包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 sVirt _with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

after

包含实际 MongoDB 文档的 JSON 字符串表示。
如果捕获模式没有设置为 change_streams_update_full,则 update 事件值不会包含 after 字段

5

updatedFields

包含文档更新字段值的 JSON 字符串表示。在本例中,更新将 first_name 字段改为一个新的值。

6

source

描述事件源元数据的必需字段。此字段包含与同一集合的 create 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含更新文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行了更改的时间戳,并在时间戳内事件被修改。
  • 在事务中执行更改时,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符。
警告

事件中的 after 值应作为文档的 at-of-time 值进行处理。该值不会被动态计算,而是从集合中获取。因此,如果多个更新一个紧随另一个发生,则所有 update 事件都会包含在文档中存储的代表最后的值相同的 after 值。

如果您的应用程序依赖于逐步变化,那么您应该只依赖 更新描述

删除 事件

delete 更改事件中的值与为同一集合的 createupdate 事件相同的 schema 部分。delete 事件中的 payload 部分包含与为同一集合的 createupdate 事件不同的值。特别是 delete 事件既不包含 after 值,也不包含 updateDescription 值。以下是 customers 集合中文档的 delete 事件示例:

{
    "schema": { ... },
    "payload": {
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",3
      "source": { 4
        "version": "2.1.4.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
表 4.9. 删除 事件值字段的描述
字段名称描述

1

op

描述操作类型的强制字符串。op 字段值为 d,表示此文档已被删除。

2

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

3

之前

在更改前,包含实际 MongoDB 文档的 JSON 字符串表示。如果捕获模式没有设置为 sVirt _with_preimage 选项之一,则 update 事件值不包含 before 字段。

4

source

描述事件源元数据的必需字段。此字段包含与同一集合的 createupdate 事件相同的信息,但它们的值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:

  • Debezium 版本。
  • 生成事件的连接器的名称。
  • MongoDB 副本集的逻辑名称,它组成了生成的事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。
  • 包含已删除文档的集合和数据库的名称。
  • 如果事件是快照的一部分。
  • 在数据库中进行了更改的时间戳,并在时间戳内事件被修改。
  • MongoDB 操作的唯一标识符( oplog 事件中的 h 字段)。
  • 在事务中执行更改时,MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符(仅更改流捕获模式)。

MongoDB 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

唯一标识文档的所有 MongoDB 连接器事件都完全相同。删除文档时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,对于 Kafka 删除具有该键的所有消息,消息值必须是 null。为了实现此目的,在 Debezium 的 MongoDB 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但为 null 值。tombstone 事件会通知 Kafka,所有具有相同键的消息都可被删除。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.