第 3 章 查看更改事件


部署 Debezium MySQL 连接器后,它会开始捕获对 inventory 数据库的更改。

当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中一个表。每个主题的名称以数据库服务器的名称开头,dbserver1

连接器写入以下 Kafka 主题:

dbserver1
架构更改主题,其中 DDL 语句适用于正在捕获更改的表。
dbserver1.inventory.products
接收 inventory 数据库中 product 的更改事件记录。
dbserver1.inventory.products_on_hand
接收 inventory 数据库中 product _on_hand 表的更改事件记录。
dbserver1.inventory.customers
接收 inventory 数据库中 客户 表的更改事件记录。
dbserver1.inventory.orders
接收 清单 数据库中 订单 表的变更事件记录。

本教程的其余部分检查 dbserver1.inventory.customers Kafka 主题。当您仔细查看主题时,您会看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。

教程包含以下部分:

3.1. 查看 创建事件

查看 dbserver1.inventory.customers 主题,您可以看到 MySQL 连接器如何在 inventory 数据库中捕获创建事件。在这种情况下,创建 的事件捕获正添加到数据库中的新客户。

流程

  1. 打开一个新终端,再使用 kafka-console-consumer 从主题 开始使用 dbserver1.inventory. customers 主题。

    此命令在运行 Kafka(my-cluster-kafka-0)的 Pod 中运行一个简单的使用者(kafka-console-consumer.sh):

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

    使用者返回四个消息(采用 JSON 格式),每个消息对应 客户 表中的每一行。每个消息都包含相应表行的事件记录。

    每个事件都有两个 JSON 文档: 一个键一个值。键与行的主键对应,值显示行的详细信息(行包含的字段、每个字段的值以及行上执行的操作类型)。

  2. 对于最后一个事件,请查看 的详细信息。

    以下是最后一次事件 的密钥 的详细信息(便于阅读):

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }

    该事件有两个部分: schemapayload该模式 包含一个 Kafka 连接模式,描述有效负载中的内容。在本例中,有效负载是名为 dbserver1.inventory.customers.Key 的构造,它是可选的,并且有一个必填字段(类型为 int32)。

    有效负载 具有单个 id 字段,值设为 1004

    通过查看事件 的密钥,您可以看到此事件适用于 inventory.customers 表(其 id 主键列的值为 1004 的行)。

  3. 检查同一事件 的值 的详细信息。

    事件 的值 显示创建了行,并描述它包含的内容(在本例中,idfirst_namelast_name 以及插入行 的电子邮件 )。

    以下是最后一次事件 的值 的详细信息(便于阅读):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "1.9.5.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }

    这个部分事件比,但与事件 的密钥 一样,它也具有 schema有效负载。该架构包含名为 dbserver1.inventory.customers.Envelope (版本 1)的 Kafka Connect 模式,它包含五个字段:

    op
    包含描述操作类型的字符串值的必需字段。MySQL 连接器的值为 c,用于创建(或插入)、u 表示 update、d 表示删除,以及 r 以便读取(在快照中)。
    before
    可选字段(如果存在)包含事件 发生前 行的状态。其结构将由 dbserver1. inventory.customers.Value Kafka Connect 模式进行描述,该模式在 inventory.customers 表中使用 dbserver1 连接器。
    之后
    可选字段(如果存在)包含事件发生 行的状态。结构由 前面 使用的相同 dbserver1.inventory.customers.Value Kafka Connect 模式描述。
    source
    一个必需字段,其中包含描述事件的源元数据的结构,该元素是 MySQL 的源元数据,包含多个字段:连接器名称、记录事件的 binlog 文件的名称、该 binlog 文件中的位置,即事件内的行(如果有多个字段),受影响的数据库和表的名称, 进行更改的 MySQL 线程 ID,无论此事件是快照的一部分,如果可用,则 MySQL 服务器 ID 以及以秒为单位的时间戳。
    ts_ms
    一个可选字段(如果存在)包含时间(在运行 Kafka Connect 任务的 JVM 中)以及连接器处理事件的 JVM 中。
    注意

    事件的 JSON 表示要比它们描述的行要长。这是因为,对于每个事件键和值,Kafka Connect 提供了描述 有效负载的 schema。随着时间的推移,这个结构可能会改变。但是,在事件本身中使用键和值的 schema 使得使用应用程序更容易理解消息,特别是随着时间推移而变化。

    Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 声明并更新其 Kafka Connect 模式。这是构建每个事件的唯一方法,与在事件发生时所源自的表完全相同。但是,包含单一表的所有事件的 Kafka 主题可能会具有与表定义的每个状态对应的事件。

    JSON converter 在每个消息中包含键和值模式,因此它会生成非常详细的事件。

  4. 将事件 的键和值 模式与 inventory 数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    这表明您检查的事件记录与数据库中的记录相匹配。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.