3.3. Debezium Db2 连接器数据更改事件的描述


Debezium Db2 连接器会为每行级别 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件都包含一个键和值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您使用 schema registry,则消费者可以从 registry 获取 schema 的模式 ID。这使得每个事件都可以自我包含。

以下框架 JSON 显示更改事件的基本四个部分。但是,如何配置在应用程序中使用的 Kafka Connect converter 决定了更改事件中的这四个部分的表示。只有当您将转换器配置为生成它时,schema 字段才处于更改事件中。同样,只有在将转换器配置为生成它时,才会发生更改事件中的事件密钥和事件有效负载。如果您使用 JSON 转换器,并将其配置为生成所有四个基本更改事件部分,则更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 3.5. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是 event 键的一部分。它指定一个 Kafka Connect 模式,它描述了事件密钥 有效负载部分中的内容。换句话说,第一个 schema 字段描述了主密钥的结构,或者唯一键(如果表没有主密钥)用于更改的表。

可以通过设置 message.key.columns 连接器配置属性 来覆盖表的主键。在本例中,第一个 schema 字段描述了由该属性标识的密钥的结构。

2

payload

第一个 payload 字段是 event 键的一部分。它具有上一个 schema 字段描述的结构,其中包含更改的行的密钥。

3

schema

第二个 schema 字段是 event 值的一部分。它指定了 Kafka Connect 模式,它描述了事件值 有效负载部分中的内容。换句话说,第二个 模式 描述了更改的行的结构。通常,此架构包含嵌套模式。

4

payload

第二个 payload 字段是 event 值的一部分。它具有上一个 schema 字段描述的结构,其中包含更改的行的实际数据。

默认情况下,连接器流将事件记录更改为名称与事件原始表相同的名称的主题。如需更多信息,请参阅 主题名称

警告

Debezium Db2 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着,逻辑服务器名称必须以拉丁或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余的字符以及数据库和表名称中的每个字符都必须是字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效的字符,它将替换为下划线字符。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,且区分另一个名称的唯一字符无效,则可能会导致意外冲突,因此替换为下划线。

另外,对于数据库、模式和表的 Db2 名称可能区分大小写。这意味着连接器可能会向同一 Kafka 主题发出多个表的事件记录。

详情包括在以下主题中:

3.3.1. 关于 Debezium db2 中的键更改事件

更改事件的密钥包含更改表的密钥的 schema,以及更改的行的实际键。在连接器创建事件时,模式及其对应有效负载都包含更改表的 PRIMARY KEY (或唯一约束)中的每个列的字段。

请考虑以下 客户 表,然后是此表的更改事件键示例。

表示例

CREATE TABLE customers (
 ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
 FIRST_NAME VARCHAR(255) NOT NULL,
 LAST_NAME VARCHAR(255) NOT NULL,
 EMAIL VARCHAR(255) NOT NULL UNIQUE
);

更改事件键示例

捕获对 customers 表的更改的每个更改事件都有相同的事件键模式。只要 customers 表有以前的定义,捕获 customer 表更改的事件都有以下关键结构:在 JSON 中,它类似如下:

{
    "schema": {  1
        "type": "struct",
        "fields": [  2
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,  3
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Key"  4
    },
    "payload": {  5
        "ID": 1004
    }
}
表 3.6. 更改事件密钥的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了密钥 有效负载部分中的内容

2

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要它。

3

optional

指明 event 键必须在其 payload 字段中包含一个值。在本例中,密钥有效负载中的值是必需的。当表没有主密钥时,键的 payload 字段中的值是可选的。

4

mydatabase.MYSCHEMA.CUSTOMERS.Key

定义密钥有效负载结构的 schema 名称。这个模式描述了更改的表的主键的结构。键架构名称的格式为 connector-name.database-name.table-name.Key。在这个示例中:

  • mydatabase 是生成此事件的连接器的名称。
  • MYSCHEMA 是包含已更改表的数据库模式。
  • CUSTOMERS 是更新的表。

5

payload

包含生成此更改事件的行的键。在本例中,键包含一个 ID 字段,其值是 1004

3.3.2. 关于 Debezium Db2 更改事件中的值

更改事件中的值比键稍微复杂。与键一样,该值具有 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。更改创建、更新或删除数据的事件,并且具有 envelope 结构的值有效负载。

考虑用于显示更改事件键示例的相同示例:

表示例

CREATE TABLE customers (
 ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
 FIRST_NAME VARCHAR(255) NOT NULL,
 LAST_NAME VARCHAR(255) NOT NULL,
 EMAIL VARCHAR(255) NOT NULL UNIQUE
);

customers 表的每个更改事件的事件值部分都指定了相同的模式。事件值的有效负载因事件类型而异:

创建 事件

以下示例显示了连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:

{
  "schema": {  1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "FIRST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "LAST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "EMAIL"
          }
        ],
        "optional": true,
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value",  2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "FIRST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "LAST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "EMAIL"
          }
        ],
        "optional": true,
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
        ],
        "optional": false,
        "name": "io.debezium.connector.db2.Source",  3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope"  4
  },
  "payload": {  5
    "before": null,  6
    "after": {  7
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "john.doe@example.org"
    },
    "source": {  8
      "version": "2.1.4.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
    },
    "op": "c",  9
    "ts_ms": 1559729471739  10
  }
}
表 3.7. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,用于描述值有效负载的结构。每次连接器为特定表生成的更改时,更改事件的值模式都是相同的。

2

name

schema 部分中,每个 name 字段指定值有效负载中的字段的 schema。

mydatabase.MYSCHEMA.CUSTOMERS.Value 是有效负载之前和之后字段的 schema。 这个模式特定于 customers 表。连接器将此模式用于 MYSCHEMA.CUSTOMERS 表中的所有行。

beforeafter 字段的 schema 的名称格式为 logicalName.schemaName.tableName.Value,这样可确保架构名称在数据库中是唯一的。这意味着,在使用 Avro converter 时,每个逻辑源中的每个表生成的 Avro 模式都有自己的演进和历史记录。

3

name

io.debezium.connector.db2.Source 是有效负载的 source 字段的 schema。这个模式特定于 Db2 连接器。连接器将其用于它生成的所有事件。

4

name

mydatabase.MYSCHEMA.CUSTOMERS.Envelope 是载荷总体结构的 schema,其中 mydatabase 是数据库,MYSCHEMA 是架构,CUSTOMERS 是表。

5

payload

值的实际数据。这是更改事件提供的信息。

似乎,事件的 JSON 表示量大于它们描述的行。这是因为 JSON 表示必须包含 schema 部分和消息的 payload 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的信息大小。

6

之前

指定事件发生前行状态的可选字段。当 op 字段是 c 用于创建(如本例所示),before 字段为 null,因为此更改事件用于新内容。

7

after

指定事件发生后行状态的可选字段。在本例中,after 字段包含新行的 IDFIRST_NAMELAST_NAMEEMAIL 列的值。

8

source

描述事件源元数据的必需字段。 结构显示有关此更改的 Db2 信息,它提供了可追溯性。它还包含可用于与同一主题或其他事件进行比较的信息,以了解此事件是否在之前、之后或作为其他事件相同的提交的一部分发生。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 在数据库中进行更改的时间戳
  • 事件是持续快照的一部分
  • 包含新行的数据库、模式和表的名称
  • 更改 LSN
  • 提交 LSN (如果此事件是快照的一部分,则随机)

9

op

描述导致连接器生成事件的操作类型的必要字符串。在本例中,c 表示操作创建了行。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = read (仅适用于快照)

10

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

更新 事件

示例 customer 表中更新的更改事件值与该表的 create 事件相同。同样,update 事件值的有效负载具有相同的结构。但是,事件值 payload 在更新 事件中包含不同的值。以下是连接器在 customers 表中为更新生成的更改事件值示例:

{
  "schema": { ... },
  "payload": {
    "before": {  1
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "john.doe@example.org"
    },
    "after": {  2
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "noreply@example.org"
    },
    "source": {  3
      "version": "2.1.4.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559729995937,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
    },
    "op": "u",  4
    "ts_ms": 1559729998706  5
  }
}
表 3.8. 更新 事件值字段的描述
字段名称描述

1

之前

指定事件发生前行状态的可选字段。在 update 事件值中,before 字段包含每个表列的一个字段,以及在数据库提交前在该列中的值。在这个示例中,EMAIL 值为 john.doe@example.com

2

after

指定事件发生后行状态的可选字段。您可以比较 beforeafter 结构,以确定此行的更新是什么。在这个示例中,EMAIL 值现在是 noreply@example.com

3

source

描述事件源元数据的必需字段。source 字段结构包含与 create 事件中的相同字段,但某些值有所不同,例如,示例 update 事件具有不同的 LSN。您可以使用这些信息将此事件与其他事件进行比较,以了解此事件是否在与其他事件相同的提交之前、之后或作为与其他事件相同的提交的一部分发生。源元数据包括:

  • Debezium 版本
  • 连接器类型和名称
  • 在数据库中进行更改的时间戳
  • 事件是持续快照的一部分
  • 包含新行的数据库、模式和表的名称
  • 更改 LSN
  • 提交 LSN (如果此事件是快照的一部分,则随机)

4

op

描述操作类型的强制字符串。在 update 事件值中,op 字段值为 u,表示此行因为更新而改变。

5

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

注意

更新行的主/唯一键的列会更改行键的值。当键更改时,Debebe 会输出 三个 事件: DELETE 事件和带有行的旧键的 tombstone 事件,后跟带有行的新键的事件。

删除 事件

delete 更改事件中的值与为同一表的 createupdate 事件相同的 schema 部分。示例 customer 表的 delete 事件中的事件值 payload 类似如下:

{
  "schema": { ... },
  },
  "payload": {
    "before": {  1
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "noreply@example.org"
    },
    "after": null,  2
    "source": {  3
      "version": "2.1.4.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559730445243,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007"
    },
    "op": "d",  4
    "ts_ms": 1559730450205  5
  }
}
表 3.9. 删除 事件值字段的描述
字段名称描述

1

之前

指定事件发生前行状态的可选字段。在一个 delete 事件值中,before 字段包含在使用数据库提交删除行前的值。

2

after

指定事件发生后行状态的可选字段。在 delete 事件值中,after 字段为 null,表示行不再存在。

3

source

描述事件源元数据的必需字段。在一个 delete 事件值中,source 字段结构与同一表的 createupdate 事件相同。许多 source 字段值也相同。在 delete 事件值中,ts_ms 和 LSN 字段值以及其他值可能已更改。但是,delete 事件值中的 source 字段提供了相同的元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 在数据库中进行更改的时间戳
  • 事件是持续快照的一部分
  • 包含新行的数据库、模式和表的名称
  • 更改 LSN
  • 提交 LSN (如果此事件是快照的一部分,则随机)

4

op

描述操作类型的强制字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms

显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中 ,ts_ms 表示数据库中更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

删除 更改事件记录为消费者提供处理此行删除所需的信息。包括了旧值,因为有些使用者可能需要它们才能正确处理删除。

Db2 连接器事件旨在用于 Kafka 日志压缩。只要每个密钥至少保留最新的消息,日志压缩就可以删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,如果 Kafka 删除具有相同键的所有消息,消息值必须是 null。为了实现此目的,在 Debezium 的 Db2 连接器发出 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键有一个 null 值 。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.