6.2. Debezium MySQL 连接器数据更改事件的描述


Debezium MySQL 连接器为每个行级 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件包含一个键和值。键的结构和值取决于已更改的表。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,而用户很难处理这些事件。要解决这个问题,每个事件都包含其内容的 schema,或者如果您正在使用 schema registry,用户可以使用该模式 ID 从 registry 获取 schema。这使得每个事件都自包含。

以下框架 JSON 显示更改事件的基本四部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect converter,决定更改事件中的这四个部分的表示。只有在将转换器配置为生成它时,schema 字段才会处于更改事件中。同样,只有在您配置转换器来生成它时,事件密钥和事件有效负载才会处于更改事件中。如果您使用 JSON 转换程序,并将其配置为生成所有四个基本更改事件部分,更改事件具有此结构:

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表 6.7. 更改事件基本内容概述
字段名称描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,用于描述事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了主密钥的结构,如果表没有主键,则描述主键的结构。

可以通过设置 message.key.columns 连接器配置属性 来覆盖表的主键。在这种情况下,第一个 schema 字段描述了该属性标识的键的结构。

2

payload

第一个 payload 字段是 event 键的一部分。它具有前面的 schema 字段描述的结构,它包含已更改的行的密钥。

3

schema

第二个 schema 字段是事件值的一部分。它指定 Kafka Connect 模式,用于描述事件值 有效负载部分的内容。换句话说,第二个 模式 描述了已更改的行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它具有上一个 schema 字段描述的结构,它包含已更改的行的实际数据。

默认情况下,连接器流将事件记录改为与事件原始表相同的主题。请参阅 主题名称

警告

MySQL 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称和数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,它将使用下划线字符替换。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,且唯一与另一个名称区分名称的字符无效,这可能会导致意外冲突冲突,从而被下划线替换。

详情包括在以下主题中:

6.2.1. 关于 Debezium MySQL 中的键更改事件

更改事件的密钥包含更改表的密钥和更改行的实际键的 schema。当连接器创建事件时,schema 及其对应有效负载都会包含更改表的 PRIMARY KEY (或唯一约束)中每个列的字段。

考虑以下 客户 表,后跟此表的更改事件键的示例。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

每次捕获 customer 表的更改事件都有相同的事件关键模式。只要 customers 表有以前的定义,可以捕获 customer 表更改的事件都有以下关键结构:在 JSON 中,它类似如下:

{
 "schema": { 1
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 5
    "id": 1001
  }
}
表 6.8. 更改事件键的描述
字段名称描述

1

schema

键的 schema 部分指定一个 Kafka Connect 模式,它描述了键的 payload 部分的内容。

2

mysql-server-1.inventory.customers.Key

定义密钥有效负载结构的模式名称。这个 schema 描述了已更改的表的主键的结构。键模式名称的格式是 connector-name.database-name.table-name.Key.在本例中:

  • MySQL-server-1 是生成此事件的连接器的名称。
  • inventory 是包含已更改表的数据库。
  • 客户 是更新的表。

3

optional

指明 event 键是否必须在其 payload 字段中包含一个值。在本例中,键有效负载中的值是必需的。当表没有主键时,键的 payload 字段中的值是可选的。

4

fields

指定 有效负载中 预期的每个字段,包括每个字段的名称、类型以及是否需要。

5

payload

包含生成此更改事件的行的密钥。在本例中,键 包含一个 id 字段,其值为 1001

6.2.2. 关于 Debezium MySQL 更改事件中的值

更改事件中的值比键复杂一些。与键一样,该值有一个 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的 schema,包括其嵌套字段。为创建、更新或删除数据的操作更改事件,它们都有一个带有 envelope 结构的值有效负载。

考虑用于显示更改事件键示例的相同示例表:

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

对这个表的更改事件的值部分描述:

创建 事件

以下示例显示了一个更改事件的值部分,连接器为在 customer 表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 4
  },
  "payload": { 5
    "op": "c", 6
    "ts_ms": 1465491411815, 7
    "before": null, 8
    "after": { 9
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 10
      "version": "2.3.7.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
表 6.9. 创建 事件值字段的描述
字段名称描述

1

schema

值的 schema,用于描述值有效负载的结构。当连接器为特定表生成的每次更改事件中,更改事件的值模式都是相同的。

2

name

schema 部分中,每个 name 字段指定值的有效负载中字段的 schema。

mysql-server-1.inventory.customers.Value 是有效负载 beforeafter 字段的 schema。这个模式特定于 customers 表。

beforeafter 字段的模式名称格式为 logicalName.tableName.Value,这样可确保 schema 名称在数据库中是唯一的。这意味着,在使用 Avro converter 时,每个逻辑源中的每个表生成的 Avro 模式都有自己的 evolution 和 history。

3

name

io.debezium.connector.mysql.Source 是有效负载的 source 字段的 schema。这个模式特定于 MySQL 连接器。连接器将其用于它生成的所有事件。

4

name

mysql-server-1.inventory.customers.Envelope 是负载总体结构的模式,其中 dbserver1 是连接器名称,inventory 是数据库,customers 是表。

5

payload

值的实际数据。这是更改事件提供的信息。

可能会出现事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减少连接器流到 Kafka 主题的信息大小。

6

op

描述导致连接器生成事件的操作类型的强制字符串。在本例中,c 表示操作创建了行。有效值为:

  • c = create
  • u = update
  • d = delete
  • r = 读取(仅适用于快照)

7

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

8

before

指定事件发生前行状态的可选字段。当 op 字段是 c 用于创建(如本例所示),before 字段为 null,因为此更改事件用于新内容。

9

after

指定事件发生后行状态的可选字段。在本例中,after 字段包含新行的 idfirst_namelast_nameemail 列的值。

10

source

描述事件源元数据的必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,以及事件的来源、事件发生的顺序以及事件是否为同一事务的一部分。源元数据包括:

  • Debezium 版本
  • 连接器名称
  • 记录事件的 binlog 名称
  • binlog 位置
  • 事件中的行
  • 如果事件是快照的一部分
  • 包含新行的数据库和表的名称
  • 创建事件的 MySQL 线程的 ID (仅限非快照)
  • MySQL 服务器 ID (如果可用)
  • 在数据库中进行更改时的时间戳

如果启用了 binlog_rows_query_log_events MySQL 配置选项,并且启用了连接器配置 include.query 属性,source 字段还提供 query 字段,其中包含导致更改事件的原始 SQL 语句。

更新 事件

示例 customers 表中一个更新的改变事件的值有与那个表的 create 事件相同的模式。同样,事件值有效负载具有相同的结构。但是,事件值有效负载在 update 事件中包含不同的值。以下是连接器为 customer 表中更新生成的更改事件值 的示例

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 2
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 3
      "version": "2.3.7.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581029100,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 4
    "ts_ms": 1465581029523 5
  }
}
表 6.10. 更新 事件值字段的描述
字段名称描述

1

before

指定事件发生前行状态的可选字段。在 update 事件值中,before 字段包含每个表列的字段,以及数据库提交前该列中的值。在本例中,first_name 值为 Anne。

2

after

指定事件发生后行状态的可选字段。您可以比较 beforeafter 结构,以确定这个行的更新是什么。在示例中,first_name 值现在是 Anne Marie

3

source

描述事件源元数据的必需字段。source 字段结构与 create 事件中的字段相同,但一些值有所不同,例如,示例 update 事件来自 binlog 中的不同位置。源元数据包括:

  • Debezium 版本
  • 连接器名称
  • 记录事件的 binlog 名称
  • binlog 位置
  • 事件中的行
  • 如果事件是快照的一部分
  • 包含更新行的数据库和表的名称
  • 创建事件的 MySQL 线程的 ID (仅限非快照)
  • MySQL 服务器 ID (如果可用)
  • 在数据库中进行更改时的时间戳

如果启用了 binlog_rows_query_log_events MySQL 配置选项,并且启用了连接器配置 include.query 属性,source 字段还提供 query 字段,其中包含导致更改事件的原始 SQL 语句。

4

op

描述操作类型的强制字符串。在 update 事件值中,op 字段值为 u,表示此行因为更新而改变。

5

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

注意

更新行 primary/unique 键的列会更改行的键值。当键更改时,Debezium 会输出 三个 事件:一个 DELETE 事件,以及一个带有行的旧键的 tombstone 事件,后跟一个带有行的新键的事件。详情在下一节中。

主密钥更新

更改行的主键字段的 UPDATE 操作称为主密钥更改。对于主键更改,以代替 UPDATE 事件记录,连接器为旧密钥发出 DELETE 事件记录,并为新的(updated)密钥的 CREATE 事件记录。这些事件具有常见的结构和内容,另外,每个事件都有与主密钥更改相关的消息标头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息标头。此标头的值是更新行的新主键。
  • CREATE 事件记录具有 __debezium.oldkey 作为消息标头。此标头的值是更新行所具有的前一个主键(旧的)主键。

删除 事件

delete 更改事件中的值与为同一表的 createupdate 事件相同的 schema 部分。示例 customer 表的 delete 事件中 payload 部分类似如下:

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 2
    "source": { 3
      "version": "2.3.7.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 4
    "ts_ms": 1465581902461 5
  }
}
表 6.11. 删除 事件值字段的描述
字段名称描述

1

before

指定事件发生前行状态的可选字段。在一个 delete 事件值中,before 字段包含在使用数据库提交删除行前的值。

2

after

指定事件发生后行状态的可选字段。在 delete 事件值中,after 字段为 null,表示行不再存在。

3

source

描述事件源元数据的必需字段。在一个 delete 事件值中,source 字段结构与同一表的 createupdate 事件相同。许多 source 字段值也相同。在 delete 事件值中,ts_mspos 字段值以及其他值可能已更改。但是 delete 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本
  • 连接器名称
  • 记录事件的 binlog 名称
  • binlog 位置
  • 事件中的行
  • 如果事件是快照的一部分
  • 包含更新行的数据库和表的名称
  • 创建事件的 MySQL 线程的 ID (仅限非快照)
  • MySQL 服务器 ID (如果可用)
  • 在数据库中进行更改时的时间戳

如果启用了 binlog_rows_query_log_events MySQL 配置选项,并且启用了连接器配置 include.query 属性,source 字段还提供 query 字段,其中包含导致更改事件的原始 SQL 语句。

4

op

描述操作类型的强制字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

删除 更改事件记录为消费者提供处理此行删除所需的信息。包含旧值,因为有些用户可能需要它们才能正确处理删除。

MySQL 连接器事件被设计为使用 Kafka 日志压缩。只要保留每个密钥的最新消息,日志压缩就会启用删除一些旧的消息。这可让 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

tombstone 事件

删除行时,delete 事件值仍可用于日志压缩,因为 Kafka 您可以删除具有相同键的所有之前信息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须为 null。为了实现此目的,在 Debezium 的 MySQL 连接器发出 一个 delete 事件后,连接器会发出一个特殊的 tombstone 事件,它具有相同的键但有一个 null 值。

Truncate 事件

截断 更改事件信号,表示表已被截断。在这种情况下,message 键为 null,message 值类似如下:

{
    "schema": { ... },
    "payload": {
        "source": { 1
            "version": "2.3.7.Final",
            "name": "mysql-server-1",
            "connector": "mysql",
            "name": "mysql-server-1",
            "ts_ms": 1465581029100,
            "snapshot": false,
            "db": "inventory",
            "table": "customers",
            "server_id": 223344,
            "gtid": null,
            "file": "mysql-bin.000003",
            "pos": 484,
            "row": 0,
            "thread": 7,
            "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
        },
        "op": "t", 2
        "ts_ms": 1465581029523 3
    }
}
表 6.12. truncate 事件值字段的描述
字段名称描述

1

source

描述事件源元数据的必需字段。在 truncate 事件值中,source 字段结构与为同一表的 create, update, 和 delete 事件相同,提供此元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 记录事件的 binlog 名称
  • binlog 位置
  • 事件中的行
  • 如果事件是快照的一部分
  • 数据库和表的名称
  • 截断事件的 MySQL 线程的 ID (仅限非快照)
  • MySQL 服务器 ID (如果可用)
  • 在数据库中进行更改时的时间戳

2

op

描述操作类型的强制字符串。op 字段值为 t,表示此表已被截断。

3

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

+ 在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

如果单个 TRUNCATE 语句应用到多个表,则会为每个删节的表发出一个 truncate 更改事件记录。

请注意,由于 truncate 事件代表对整个表进行的更改,且没有消息密钥,除非您使用单个分区的主题,否则与表相关的更改事件没有排序保证(创建更新 等)和 截断 该表的事件。例如,当这些事件从不同的分区读取时,消费者只能在该表的 truncate 事件后收到 update 事件。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.