4.3.2. Debezium MongoDB 変更イベントの値


変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、 Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルドキュメントについて考えてみましょう。

ドキュメントの例

{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

このドキュメントへの変更に対する変更イベントの値部分には、以下の各イベントタイプについて記述されています。

作成 イベント

以下の例は、customers コレクションにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

{
    "schema": { 1
      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": { 5
      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 6
      "patch": null,
      "source": { 7
        "version": "1.9.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 8
      "ts_ms": 1558965515240 9
    }
  }
表4.6 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のコレクションに生成するすべての変更イベントで同じになります。

2

name

schema セクションで、各 name フィールドは、値のペイロードのフィールドに対するスキーマを指定します。

io.debezium.data.Json はペイロードの afterpatch、および filter フィールドのスキーマです。このスキーマは customers コレクションに固有です。作成 イベントは、after フィールドが含まれる唯一のイベントです。更新 イベントには、filter フィールドと patchフィールドが含まれます。delete イベントには filter フィールドが含まれますが、after フィールドや patch フィールドは含まれません。

3

name

io.debezium.connector.mongo.Source はペイロードの sourceフィールドのスキーマです。このスキーマは MongoDB コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

dbserver1.inventory.customers.Envelope は、ペイロードの全体的な構造のスキーマで、dbserver1 はコネクター名、inventory はデータベース、customers はコレクションを指します。このスキーマはコレクションに固有です。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述するドキュメントよりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

after

イベント発生後のドキュメントの状態を指定する任意のフィールド。この例では、after フィールドには新しいドキュメントの _idfirst_namelast_name、および email フィールドの値が含まれます。after の値は常に文字列です。慣例により、ドキュメントの JSON 表現が含まれます。MongoDB の oplog エントリーには、_create_ イベントと update イベント (capture.mode オプションを change_streams_update_full に設定した場合) のときだけドキュメントの完全な状態が含まれます。言い換えると、capture.mode オプションを oplogchange_streams にしたときに 事後 フィールドを含むイベントは create イベントだけです。

7

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 新しいドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsidtxnNumber フィールドを表す stxnid といいうフィールドです (oplog capture モードのみ)。
  • MongoDB セッションの一意な識別子 lsid と、トランザクション内で変更が実行された場合のトランザクション番号 txnNumber (変更ストリームキャプチャモードのみ) です。

8

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によってドキュメントが作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

9

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

Oplog キャプチャーモード (レガシー)

サンプル customers コレクションにある更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。更新 イベントには after の値はありません。その代わりに、以下の 2 つのフィールドがあります。

  • patch は、べき等更新操作の JSON 表現が含まれる文字列フィールドです。
  • filter は、更新の選択基準の JSON 表現が含まれる文字列フィールドです。filter 文字列には、シャード化コレクションの複数のシャードキーフィールドを含めることができます。

以下は、コネクターによって customers コレクションでの更新に生成されるイベントの変更イベント値の例になります。

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}", 3
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 4
      "source": { 5
        "version": "1.9.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
表4.7 更新 イベント値フィールドの説明
項目フィールド名説明

1

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、u は操作によってドキュメントが更新されたことを示しています。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

patch

ドキュメントへの実際の MongoDB のべき等変更の JSON 文字列表現が含まれます。この例では、更新で first_name フィールドを新しい値に変更されています。

更新 イベント値には after フィールドが含まれません。

4

filter

更新するドキュメントの特定に使用された MongoDB 選択基準の JSON 文字列表現が含まれます。

5

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 更新されたドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsid および txnNumber フィールドを表す stxnid という名前のフィールドです。
警告

Debezium 変更イベントでは、MongoDB は patch フィールドの内容を提供します。このフィールドの形式は、MongoDB データベースのバージョンによって異なります。したがって、新しい MongoDB データベースバージョンにアップグレードする場合は、形式が変更された可能性があるため注意してください。本書のサンプルは、MongoDB 3.4 から取得したため、ご使用のアプリケーションではイベントの形式が異なる場合があります。

注記

MongoDB の oplog では、更新 イベントには変更されたドキュメントの または の状態は含まれません。そのため、Debezium コネクターがこの情報を提供することはできません。ただし、Debezium コネクターは 作成 および 読み取り イベントでドキュメントの開始状態を提供します。ストリームのダウンストリームのコンシューマーは、ドキュメントごとに最新状態を維持し、新しいイベントの状態を保存された状態に比較することで、ドキュメント状態を再構築できます。Debezium コネクターはこの状態を維持できません。

Chang Streams Capture モード

サンプル customers コレクションにある更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。更新 イベントは capture.mode オプションが change_streams_update_full に設定されている場合のみ、after 値を持つようになります。この場合、新たな構造化フィールド updateDescription が追加されました。

  • updatedFields は、更新されたドキュメントフィールドの JSON 表現とその値を含む文字列フィールドです
  • removedFields は、ドキュメントから削除されたフィールド名のリストです。
  • truncatedArrays は、省略されたドキュメントのアレイの一覧です。

以下は、コネクターによって customers コレクションでの更新に生成されるイベントの変更イベント値の例になります。

{
    "schema": { ... },
    "payload": {
      "op": "u", 1
      "ts_ms": 1465491461815, 2
      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 3
      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 4
        "truncatedArrays": null
      },
      "source": { 5
        "version": "1.9.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
表4.8 更新 イベント値フィールドの説明
項目フィールド名説明

1

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、u は操作によってドキュメントが更新されたことを示しています。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

after

実際の MongoDB ドキュメントを表す JSON 文字列が含まれます。
キャプチャモードが change_streams_update_full に設定されていない場合、更新 イベントの値に after フィールドが含まれません。

4

updatedFields

ドキュメントの更新されたフィールド値の JSON 文字列表現が含まれます。この例では、更新により first_name フィールドが新しい値に変更されました。

5

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 更新されたドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB セッションの一意な識別子 lsid とトランザクション番号 txnNumber (変更がトランザクションの中で実行された場合) です。
警告

イベント内の after の値は、ドキュメントの at-point-of-time の値として処理される必要があります。この値は動的に計算されるのではなく、コレクションから取得される。このため、複数の更新が次々に行われる場合、すべての 更新 更新イベントには、文書に保存されている最後の値を表す同じ after 値が含まれる可能性がある。

アプリケーションが段階的な変更の進化に依存している場合は、updateDescription のみに依存する必要があります。

削除 イベント

delete change イベントの値は、createupdate と同じ schema 部分を持ちます。delete イベントの payload 部分には、同じコレクションの 作成更新 イベントとは異なる値が含まれます。特に、delete イベントは、after 値も patch 値も updateDescription 値も含まない。以下は、customers コレクションのドキュメントの 削除 イベントの例になります。

{
    "schema": { ... },
    "payload": {
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}", 3
      "source": { 4
        "version": "1.9.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
表4.9 削除 イベント値フィールドの説明
項目フィールド名説明

1

op

操作の型を記述する必須の文字列。op フィールドの値は d で、ドキュメントが削除されたことを示します。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

filter

削除するドキュメントを特定するために使った MongoDB の選択基準の JSON 文字列表現が含まれます (oplog capture モードのみ)。

4

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 または 更新 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 削除されたドキュメントが含まれたコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子。これは MongoDB のバージョンに依存します。これは、oplog イベントの h フィールド、または oplog イベントの lsidtxnNumber フィールドを表す stxnid といいうフィールドです (oplog capture モードのみ)。
  • MongoDB セッションの一意な識別子 lsid と、トランザクション内で変更が実行された場合のトランザクション番号 txnNumber (変更ストリームキャプチャモードのみ) です。

MongoDB コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

一意に識別ドキュメントの MongoDB コネクターイベントはすべて同じキーを持ちます。ドキュメントが削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka がそのキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MongoDB コネクターは 削除 イベントを出力した後に、null 値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。tombstone イベントは、同じキーを持つすべてのメッセージを削除できることを Kafka に通知します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.