第4章 変更イベントの表示


Debezium MySQL コネクターのデプロイ後に、inventory データベースへの変更のキャプチャーが開始されます。

コネクターが起動すると、一連の Apache Kafka トピックにイベントが書き込まれます。各トピックは、MySQL データベース内のテーブルの 1 つを表します。各トピックの名前は、データベースサーバーの名前 dbserver1 で始まります。

コネクターは、次の Kafka トピックに書き込みます。

dbserver1
変更がキャプチャーされるテーブルに適用される DDL ステートメントが書き込まれるスキーマ変更トピック。
dbserver1.inventory.products
inventory データベースの products テーブルの変更イベントレコードを受け取ります。
dbserver1.inventory.products_on_hand
inventory データベースの products_on_hand テーブルの変更イベントレコードを受け取ります。
dbserver1.inventory.customers
inventory データベースの customers テーブルの変更イベントレコードを受け取ります。
dbserver1.inventory.orders
inventory データベースの orders テーブルの変更イベントレコードを受け取ります。

このチュートリアルの残りの部分では、dbserver1.inventory.customers Kafka トピックを調べます。トピックを詳しく見ていくと、さまざまな種類の変更イベントがどのように表されているかがわかり、各イベントをキャプチャーしたコネクターに関する情報が見つかります。

このチュートリアルには、次のセクションが含まれています。

4.1. 作成 イベントの表示

dbserver1.inventory.customers トピックを表示すると、MySQL コネクターが inventory データベースの 作成 イベントをどのようにキャプチャーしたが分かります。この場合、作成 イベントは、データベースに追加された新規顧客をキャプチャーします。

手順

  1. 新しいターミナルを開き、kafka-console-consumer を使用してトピックの最初から dbserver1.inventory.customers トピックを使用します。

    このコマンドは、Kafka (my-cluster-kafka-0) を実行している Pod で簡単なコンシューマー (kafka-console-consumer.sh) を実行します。

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

    コンシューマーは、customers テーブルの各行に 1 つずつ、4 つのメッセージ (JSON 形式) を返します。各メッセージには、対応するテーブル行のイベントレコードが含まれます。

    各イベントには、キー という 2 つの JSON ドキュメントがあります。キーは行のプライマリーキーに対応し、値は行の詳細 (行に含まれるフィールド、各フィールドの値、および行で実行された操作のタイプ) を表します。

  2. 最後のイベントでは、キー の詳細を確認します。

    最後のイベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }

    このイベントには、schemapayload の 2 つの部分があります。schema には、ペイロードの内容を記述する Kafka Connect スキーマが含まれています。この場合、ペイロードは dbserver1.inventory.customers.Key という名前の struct です。これはオプションではなく、必須フィールド (int32 型の ID) を 1 つ持っています。

    payload には、値が 1004id フィールドが 1 つあります。

    イベントの キー を確認すると、このイベントは id のプライマリーキー列の値が 1004 である inventory.customers テーブルの行に提供されることが分かります。

  3. 同じイベントの の詳細を確認します。

    イベントの は、行が作成されたことを示し、その行に含まれる内容 (この場合は挿入された行の idfirst_namelast_name、および email) を表します。

    最後のイベントの の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.5.4.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }

    イベントのこの部分ははるかに長くなりますが、イベントの キー と同様に schemapayload もあります。schema には、dbserver1.inventory.customers.Envelope (バージョン 1) という名前の Kafka Connect スキーマが含まれており、次の 5 つのフィールドがあります。

    op
    操作のタイプを記述する文字列値が含まれる必須フィールド。MySQL コネクターの値は、c (作成または挿入)、u (更新)、d (削除)、および r (読み取り) です (スナップショットの場合)。
    before
    任意のフィールド。存在する場合は、イベント発生 の行の状態が含まれます。この構造は、dbserver1.inventory.customers.Value Kafka Connect スキーマによって記述されます。dbserver1 コネクターは、このスキーマを inventory.customers テーブルのすべての行に使用します。
    after
    任意のフィールド。存在する場合は、イベント発生 の行の状態が含まれます。この構造は、before で使用されるのと同じ dbserver1.inventory.customers.Value Kafka Connect スキーマで記述されます。
    source
    イベントのソースメタデータを記述する構造が含まれる必須のフィールド。MySQL の場合、次の複数のフィールドが含まれます。コネクター名、イベントが記録された binlog ファイルの名前、binlog ファイルでのイベント発生位置、イベント内の行 (複数ある場合)、影響を受けるデータベースおよびテーブルの名前、変更を行った MySQL スレッド ID、このイベントはスナップショットの一部であったかどうか、MySQL サーバー ID (ある場合)、および秒単位のタイムスタンプ。
    ts_ms
    任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
    注記

    イベントの JSON 表現は、記述される行よりもはるかに長くなります。これは、Kafka Connect はすべてのイベントのキーと値とともに、ペイロード を記述する スキーマ を提供するためです。今後、この構造は変更される可能性があります。ただし、特に使用する側のアプリケーションが時間とともに進化する場合は、キーと値のスキーマがイベント自体にあると、メッセージを理解するのが非常に容易になります。

    Debezium MySQL コネクターは、データベーステーブルの構造に基づいてこれらのスキーマを構築します。DDL ステートメントを使用して MySQL データベースのテーブル定義を変更する場合、コネクターはこの DDL ステートメントを読み取り、Kafka Connect スキーマを更新します。これは、イベント発生時にイベントの発生元となったテーブルとまったく同じように各イベントを構造化する唯一の方法です。ただし、単一テーブルのイベントがすべて含まれる Kafka トピックには、テーブル定義の各状態に対応するイベントが含まれる場合があります。

    JSON コンバーターはすべてのメッセージにキーと値のスキーマを含めるため、非常に詳細なイベントを生成します。

  4. イベントの キー および スキーマを、inventory データベースの状態と比較します。MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    これは、確認したイベントレコードがデータベースのレコードと一致することを示しています。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.