第 3 章 查看更改事件


部署 Debezium MySQL 连接器后,它开始捕获 清单 数据库的更改。

当连接器启动时,它会将事件写入一组 Apache Kafka 主题,每个主题代表 MySQL 数据库中的一个表。每个主题的名称都以数据库服务器 dbserver1 的名称开头。

连接器写入以下 Kafka 主题:

dbserver1
架构更改主题为应用于要捕获的更改的 DDL 语句将写入到其中。
dbserver1.inventory.products
接收 inventory 数据库中 product 的更改事件记录。
dbserver1.inventory.products_on_hand
接收 inventory 数据库中 product _on_hand 表的更改事件记录。
dbserver1.inventory.customers
接收 库存 数据库中 客户 表的更改事件记录。
dbserver1.inventory.orders
接收 清单 数据库中的 排序 表的更改事件记录。

本教程的其余部分检查 dbserver1.inventory.customers Kafka 主题。正如您在主题中仔细查看,您会看到它如何代表不同类型的更改事件,并查找有关每个事件捕获的连接器的信息。

教程包含以下部分:

3.1. 查看 创建事件

通过查看 dbserver1.inventory.customers 主题,您可以看到 MySQL 连接器如何获取在 inventory 数据库中捕获的事件。在这种情况下,创建事件 会捕获向数据库添加新客户。

流程

  1. 打开一个新终端,再使用 kafka-console-consumer 来消耗主题开头的 dbserver1.inventory.customers 主题。

    该命令在运行 Kafka (my-cluster-kafka-0)的 Pod 中运行一个简单的使用者(kafka-console-consumer.sh):

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

    使用者返回四条消息(采用 JSON 格式),客户 表中的每一行都会返回一条消息。每个消息都包含相应表行的事件记录。

    每个事件都有两个 JSON 文档:一个 键和值 。键与行的主键对应,值显示行的详情(包括行的字段、每个字段的值以及在该行中执行的操作类型)。

  2. 对于最后的事件,请查看 密钥 的详细信息。

    以下是最后一次事件 的密钥 的详细信息(为可读性而格式化):

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }

    该事件有两个部分:一个 schema 和一个 有效负载模式 包含一个 Kafka Connect 模式,描述有效负载中的内容。在本例中,有效负载是名为 dbserver1.inventory.customers.Keystruct,它是可选的,并且有一个必填字段(类型为 int32)。

    有效负载 有一个 id 字段,值为 1004

    通过查看事件 的密钥,您可以看到此事件应用到 清单中的行。其 id 主键列值为 1004 的客户 表。

  3. 检查同一事件 的值 的详细信息。

    事件 的值 显示创建了行,并描述其所包含的内容(本例中为 idfirst_namelast_name,以及插入行 的电子邮件 )。

    以下是最后一次事件 的值 的详细信息(为可读性而格式化):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "1.9.7.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }

    此部分事件已长长,但与事件 的关键 相似,它还具有架构和 有效载荷该架构包含一个名为 dbserver1.inventory.customers.Envelope (版本 1)的 Kafka Connect 模式,它可以包含五个字段:

    op
    包含描述操作类型的字符串值的必需字段。MySQL 连接器的值为 c,用于创建(或插入)、u 用于更新、d 表示删除,以及 r 表示读取(在快照的情况下)。
    before
    可选字段(如果存在)包含事件 发生前 行的状态。该结构将由 dbserver1. inventory.customers.Value Kafka Connect 模式描述,dbserver1 连接器用于清单.customers 表中的所有行。
    after
    可选字段(如果存在)包含事件发生 行的状态。结构由 前面 使用的相同 dbserver1.inventory.customers.Value Kafka Connect 模式描述。
    source
    一个必需字段,其中包含一个描述事件的源元数据的结构(在 MySQL 中包含几个字段):连接器名称、正在记录该事件的 binlog 文件的名称、事件所出现的 binlog 文件中的位置、事件中的行(如果是多个字段)、受影响数据库和表的名称,以及受影响的数据库和表的名称。 进行更改的 MySQL 线程 ID (此事件是否为快照的一部分),以及 MySQL 服务器 ID 以及以秒为单位的时间戳。
    ts_ms
    如果存在可选字段,其中包含运行 Kafka Connect 任务的 JVM 时间(使用系统时钟),其中可处理该事件。
    注意

    事件的 JSON 表示比它们描述的行要长。这是因为,带有每个事件键和值,Kafka Connect 会发布描述 有效负载的 schema。这种结构可能会随着时间推移而变化。但是,拥有键和事件本身的模式,使得使用应用程序更容易理解消息,特别是随着时间发展而变化。

    Debezium MySQL 连接器根据数据库表的结构来构建这些架构。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 声明并更新其 Kafka 连接模式。这是每个事件的结构与来自事件发生时所源自于其的唯一方式。但是,包含单个表的所有事件的 Kafka 主题可能具有与表定义每个状态对应的事件。

    JSON 转换器包含每条消息中的键和值 schema,因此它会生成非常详细的事件。

  4. 将事件 的键和值 模式与 inventory 数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下声明:

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    这表明您检查的事件记录与数据库中的记录相匹配。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.