5.2.2. Debezium MySQL 変更イベントの値


変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、ネストされたフィールドを含む、payload セクションの Envelope 構造を記述するスキーマが含まれます。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

このテーブルへの変更に対する変更イベントの値部分には以下について記述されています。

作成イベント

以下の例は、顧客 テーブルのデータを作成する操作に対してコネクターによって生成される変更イベントの値の部分を示しています。

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" 4
  },
  "payload": { 5
    "op": "c", 6
    "ts_ms": 1465491411815, 7
    "before": null, 8
    "after": { 9
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 10
      "version": "1.5.4.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
表5.5 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

スキーマ セクションで、各 name フィールドは、値のペイロードのフィールドに対するスキーマを指定します。

mysql-server-1.inventory.customers.Value は、beforeafter ペイロードのスキーマです。このスキーマは customers テーブルに固有です。

before および after フィールドのスキーマ名はlogicalName.tableName.Value の形式で、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。

3

name

io.debezium.connector.mysql.Source は、ペイロードの ソースフィールドの スキーマです。このスキーマは MySQL コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

mysql-server-1.inventory.customers.Envelope 、ペイロードの全体的な構造のスキーマ です

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。ただし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read(スナップショットのみに適用)

7

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

8

before

イベント発生前の行の状態を指定する任意のフィールド。この例では、op フィールドが create の c になる場合、この変更イベントは新規コンテンツ用であるため、before フィールドは null になります。

9

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには新しい行の idfirst_name、last_name、およびメール コラム の値が含まれます。

10

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MySQL スレッドの ID (スナップショット以外)
  • MySQL サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

binlog_rows_query_log_events MySQL 設定オプションが有効で、コネクター設定 include.query プロパティーが有効な場合、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

更新イベント

サンプル 顧客 テーブルの更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターが 顧客 テーブルの更新に対して生成するイベントの変更イベント値の例になります。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 2
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 3
      "version": "1.5.4.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581029100,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 4
    "ts_ms": 1465581029523 5
  }
}
表5.6 更新 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の before フィールドには、データベースのコミット前に各テーブル列のフィールドと、その列にあった値が含まれます。この例では、first_name 値は Anne です。

2

after

イベント発生後の行の状態を指定する任意のフィールド。構造 の前後に 比較して、この行への更新内容を判断できます。この例では、first_name 値は Anne Marie になります。

3

source

イベントのソースメタデータを記述する必須のフィールド。ソースフィールド 構造には create イベントと同じフィールドがありますが、一部の値が異なります。たとえば、更新 イベントは binlog の異なる位置からです。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 更新された行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MySQL スレッドの ID (スナップショット以外)
  • MySQL サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

binlog_rows_query_log_events MySQL 設定オプションが有効で、コネクター設定 include.query プロパティーが有効な場合、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

4

op

操作の型を記述する必須の文字列。更新 イベント値の op フィールドの値は u で、更新によってこの行が変更されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。

プライマリーキーの更新

行のプライマリーキーフィールドを変更する UPDATE 操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE イベントレコードの代わりに、コネクターは古いキーの DELETE イベントレコードと、新しい(更新された)キーの CREATE イベントレコードを出力します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。

  • DELETE イベントレコードには、メッセージヘッダーとして __debezium.newkey があります。このヘッダーの値は、更新された行の新しいプライマリーキーです。
  • CREATE イベントレコードには、メッセージヘッダーとして __debezium.oldkey があります。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。

削除 イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ スキーマ の部分になります。サンプル 顧客 テーブルの 削除 イベントの ペイロード 部分は以下のようになります。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 2
    "source": { 3
      "version": "1.5.4.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 4
    "ts_ms": 1465581902461 5
  }
}
表5.7 削除 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値では、ソースフィールドの 構造は、同じテーブルの 作成 および 更新 イベントと同じです。多くの ソースフィールド 値も同じです。削除 イベント値の the ts_ms および pos フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の ソースフィールド は同じメタデータを提供します。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 更新された行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MySQL スレッドの ID (スナップショット以外)
  • MySQL サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

binlog_rows_query_log_events MySQL 設定オプションが有効で、コネクター設定 include.query プロパティーが有効な場合、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、この行が削除されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。

MySQL コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MySQL コネクターは 削除 イベントを出力した後に 同じキーと null 値を持つ特別な廃棄(tombstone)イベントを出力します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.