5.3.4.2. イベント値の変更


メッセージキーと同様に、変更イベントメッセージの値には schema セクションおよび payload セクションがあります。SQL Server コネクターによって生成されるすべての変更イベント値の payload セクションには、以下のフィールドが含まれる 構造 があります。

  • op は、操作のタイプを記述する文字列の値が含まれる必須フィールドです。SQL Server コネクターの値は c create(または insert)、u 更新、d 削除、および r 読み取り(スナップショットの場合)になります。
  • before はオプションのフィールドで、イベント発生 の行の状態がある場合はその行の状態になります。この構造は、server1 コネクターが dbo.customers テーブルのすべての行に使用する server1.dbo.customers.Value Kafka Connect スキーマによって記述されます。
  • after はオプションのフィールドで、イベント発生 の行の状態が含まれる場合です。構造は、で使用される同じ server1.dbo.customers.Value Kafka Connect スキーマによって記述され beforeます。
  • source イベントのソースメタデータを記述する構造が含まれる必須フィールドです。この場合、SQL Server のフィールドには Debezium バージョン、コネクター名、イベントが進行中のスナップショットの一部であるかどうか、またはコミット LSN、変更が発生した時点の LSN、データベース、スキーマ、および変更の時点を表すタイムスタンプが含まれます。

    また、ストリーミング時に event_serial_no はフィールドが表示されます。これは、同じコミットがあり、LSN を変更するイベント間を区別するために使用されます。値が異なる場合は、ほとんどの場合 2 つの状況があり 1ます。

    • 更新イベントにはの値がに設定されます。これは 2、更新によって SQL Server の Change テーブルに 2 つのイベントが生成されるためです(ソースドキュメント)。最初の値には古い値が含まれ、2 つ目には新しい値が含まれます。したがって、1 つ目はドロップされ、Debezium 変更イベントを作成するために 2 つ目の値とともに使用されます。
    • プライマリーキーが更新されると、SQL Server は 2 つのレコードを発行して、古いプライマリーキーの値のレコードを delete 削除 insert し、新しいプライマリーキーでレコードを作成します。どちらの操作も同じコミットを共有し、LSN を変更します。また、イベント番号は 1 およびです 2
  • ts_ms は任意で、コネクターがイベントを処理した JVM のシステムクロックを使用して(Kafka Connect タスクを実行している JVM でシステムクロックを使用)。

当然ながら、イベントメッセージ値の スキーマ 部分には、この構造と内部のネストされたフィールドを記述するスキーマが含まれます。

5.3.4.2.1. イベントの作成

イベントの 作成 値を以下の customers 表で見てみましょう。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": {
      "version": "1.1.2.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c",
    "ts_ms": 1559729471739
  }
}

このイベントの schema 一部を確認すると、source 構造のスキーマ(SQL Server コネクターに固有で すべてのイベントで再利用される)のスキーマ、before および after フィールドのテーブル固有のスキーマが表示されます。

注記

before および after フィールドのスキーマ名は logicalName. schemaName . tableName.Value の形式であるため、他のすべてのテーブルの他のすべてのスキーマから完全に独立した名前になります。つまり、Avro Converter を使用すると、各 論理ソース各テーブル の Avro スキーマは、独自の進化と履歴になります。

このイベントの payload 一部を確認すると、イベントの情報が表示されます。つまり、行が作成されたこと(つまり op=c)、および after フィールドの値に、新しい挿入された行の、、、id first_name last_nameおよび email 列の値が含まれていることが示されています。

注記

これは、イベントの JSON 表現が記述する行よりもはるかに大きいことが分かります。JSON 表現にはメッセージの スキーマペイロード の部分を含める必要があるため、これは true です。を使用すると、Kafka トピックに書き込まれた実際のメッセージのサイズを大幅に縮小することもできます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.