5.3. Debezium MongoDB コネクターのデータ変更イベントの説明


Debezium MongoDB コネクターは、データを挿入、更新、または削除する各ドキュメントレベルの操作に対してデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたコレクションによって異なります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。

以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。

Copy to Clipboard Toggle word wrap
{
 "schema": { 
1

   ...
  },
 "payload": { 
2

   ...
 },
 "schema": { 
3

   ...
 },
 "payload": { 
4

   ...
 },
}
表5.6 変更イベントの基本内容の概要
項目フィールド名説明

1

schema

最初の schema フィールドはイベントキーの一部です。イベントキーの payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、最初の schema フィールドには、変更されたドキュメントのキーの構造を記述されます。

2

payload

最初の payload フィールドはイベントキーの一部です。前述の schema フィールドによって記述された構造を持ち、変更されたドキュメントのキーが含まれます。

3

schema

2 つ目の schema フィールドはイベント値の一部です。イベント値の payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、2 つ目の schema は変更されたドキュメントの構造を記述します。通常、このスキーマには入れ子になったスキーマが含まれます。

4

payload

2 つ目の payload フィールドはイベント値の一部です。前述の schema フィールドによって記述された構造を持ち、変更されたドキュメントの実際のデータが含まれます。

デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のコレクションと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。

警告

MongoDB コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とコレクション名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。

論理サーバー名、データベース名、またはコレクション名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。

詳細は、以下のトピックを参照してください。

5.3.1. Debezium MongoDB 変更イベントのキー

変更イベントのキーには、変更されたドキュメントのキーのスキーマと、変更されたドキュメントの実際のキーのスキーマが含まれます。特定のコレクションでは、スキーマとそれに対応するペイロードの両方に単一の id フィールドが含まれます。このフィールドの値は、MongoDB Extended JSON のシリアライゼーションの厳格モード から派生する文字列として表されるドキュメントの識別子です。

論理名が fulfillment のコネクター、inventory データベースが含まれるレプリカセット、および以下のようなドキュメントが含まれる customers コレクションについて考えてみましょう。

ドキュメントの例

Copy to Clipboard Toggle word wrap
{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

変更イベントキーの例

customers コレクションへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers コレクションに前述の定義がある限り、customers コレクションへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。

Copy to Clipboard Toggle word wrap
{
  "schema": { 
1

    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 
2

    "optional": false, 
3

    "fields": [ 
4

      {
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": { 
5

    "id": "1004"
  }
}
表5.7 変更イベントキーの説明
項目フィールド名説明

1

schema

キーのスキーマ部分は、キーの payload 部分の内容を記述する Kafka Connect スキーマを指定します。

2

fulfillment.inventory.customers.Key

キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更したドキュメントのキーの構造を説明します。キースキーマ名の形式は connector-name.database-name.collection-name.Key です。この例では、以下のようになります。

  • fulfillment はこのイベントを生成したコネクターの名前です。
  • inventory は変更されたコレクションが含まれるデータベースです。
  • customers は更新されたドキュメントが含まれるコレクションです。

3

任意

イベントキーの payload フィールドに値が含まれる必要があるかどうかを示します。この例では、キーのペイロードに値が必要です。ドキュメントにキーがない場合、キーの payload フィールドの値は任意です。

4

fields

各フィールドの名前、型、および必要かどうかなど、payload で想定される各フィールドを指定します。

5

payload

この変更イベントが生成されたドキュメントのキーが含まれます。この例では、キーには型 string の 1 つの id フィールドが含まれ、その値は 1004 です。

この例では、整数の識別子を持つドキュメントを使用しますが、有効な MongoDB ドキュメント識別子は、ドキュメント識別子を含め、同じように動作します。ドキュメント識別子の場合、イベントキーの payload.id 値は、厳格モードを使用する MongoDB Extended JSON シリアライゼーションとして更新されたドキュメントの元の _id フィールドを表す文字列です。以下の表では、さまざまな型の _id フィールドを表す例を示します。

表5.8 イベントキーペイロードのドキュメント _id フィールドを表す例
タイプMongoDB _id の値キーのペイロード

Integer

1234

{ "id" : "1234" }

Float

12.34

{ "id" : "12.34" }

String

"1234"

{ "id" : "\"1234\"" }

Document

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

バイナリー

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }

5.3.2. Debezium MongoDB 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、 Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルドキュメントについて考えてみましょう。

ドキュメントの例

Copy to Clipboard Toggle word wrap
{
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org"
}

このドキュメントへの変更に対する変更イベントの値部分には、以下の各イベントタイプについて記述されています。

作成 イベント

以下の例は、customers コレクションにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

Copy to Clipboard Toggle word wrap
{
    "schema": { 
1

      "type": "struct",
      "fields": [
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 
2

          "version": 1,
          "field": "after"
        },
        {
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json",
          "version": 1,
          "field": "patch"
        },
        {
          "type": "struct",
          "fields": [
            {
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 
3

          "field": "source"
        },
        {
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 
4

      },
    "payload": { 
5

      "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", 
6

      "source": { 
7

        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c", 
8

      "ts_ms": 1558965515240 
9

    }
  }
表5.9 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のコレクションに生成するすべての変更イベントで同じになります。

2

name

schema セクションで、各 name フィールドは、値のペイロードのフィールドに対するスキーマを指定します。

io.debezium.data.Json はペイロードの afterpatch、および filter フィールドのスキーマです。このスキーマは customers コレクションに固有です。作成 イベントは、after フィールドが含まれる唯一のイベントです。更新 イベントには、filter フィールドと patchフィールドが含まれます。delete イベントには filter フィールドが含まれますが、after フィールドや patch フィールドは含まれません。

3

name

io.debezium.connector.mongo.Source はペイロードの sourceフィールドのスキーマです。このスキーマは MongoDB コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

dbserver1.inventory.customers.Envelope は、ペイロードの全体的な構造のスキーマで、dbserver1 はコネクター名、inventory はデータベース、customers はコレクションを指します。このスキーマはコレクションに固有です。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述するドキュメントよりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

after

イベント発生後のドキュメントの状態を指定する任意のフィールド。この例では、after フィールドには新しいドキュメントの _idfirst_namelast_name、および email フィールドの値が含まれます。after の値は常に文字列です。慣例により、ドキュメントの JSON 表現が含まれます。StreamsMongoDB oplog エントリーには、_create_ イベントと update イベントに対してのみドキュメントの完全な状態が含まれます (capture.mode オプションが change_streams_update_full に設定されている場合)。つまり、create イベントは capture.mode オプションに関係なく after フィールドが含まれる唯一のイベントです。

7

比較元

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 新しいドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子 (oplog イベントの h フィールド)。
  • MongoDB セッションの一意な識別子 lsid と、トランザクション内で変更が実行された場合のトランザクション番号 txnNumber (変更ストリームキャプチャモードのみ) です。

8

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によってドキュメントが作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

9

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

Chang Streams Capture モード

サンプル customers コレクションにある更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。update イベントに after 値が含まれるのは、capture.mode オプションが change_streams_update_full に設定されている場合のみです。capture.mode オプションが *_with_pre_image オプションのいずれかに設定されている場合、before 値が指定されます。この場合、新たな構造化フィールド updateDescription が追加されました。

  • updatedFields は、更新されたドキュメントフィールドの JSON 表現とその値を含む文字列フィールドです
  • removedFields は、ドキュメントから削除されたフィールド名のリストです。
  • truncatedArrays は、省略されたドキュメントのアレイのリストです。

以下は、コネクターによって customers コレクションでの更新に生成されるイベントの変更イベント値の例になります。

Copy to Clipboard Toggle word wrap
{
    "schema": { ... },
    "payload": {
      "op": "u", 
1

      "ts_ms": 1465491461815, 
2

      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 
3

      "after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", 
4

      "updateDescription": {
        "removedFields": null,
        "updatedFields": "{\"first_name\": \"Anne Marie\"}", 
5

        "truncatedArrays": null
      },
      "source": { 
6

        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }
表5.10 更新 イベント値フィールドの説明
項目フィールド名説明

1

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、u は操作によってドキュメントが更新されたことを示しています。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

before

変更前の実際の MongoDB ドキュメントの JSON 文字列表現が含まれています。+ キャプチャーモードが *_with_preimage オプションのいずれかに設定されていない場合、update イベント値には before フィールドが含まれません。

4

after

実際の MongoDB ドキュメントを表す JSON 文字列が含まれます。
キャプチャモードが change_streams_update_full に設定されていない場合、更新 イベントの値に after フィールドが含まれません。

5

updatedFields

ドキュメントの更新されたフィールド値の JSON 文字列表現が含まれます。この例では、更新により first_name フィールドが新しい値に変更されました。

6

比較元

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 更新されたドキュメントが含まれるコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB セッションの一意な識別子 lsid とトランザクション番号 txnNumber (変更がトランザクションの中で実行された場合) です。
警告

イベント内の after の値は、ドキュメントの at-point-of-time の値として処理される必要があります。この値は動的に計算されるのではなく、コレクションから取得される。このため、複数の更新が次々に行われる場合、すべての 更新 更新イベントには、文書に保存されている最後の値を表す同じ after 値が含まれる可能性がある。

アプリケーションが段階的な変更の進化に依存している場合は、updateDescription のみに依存する必要があります。

削除 イベント

delete change イベントの値は、createupdate と同じ schema 部分を持ちます。delete イベントの payload 部分には、同じコレクションの 作成更新 イベントとは異なる値が含まれます。特に、delete イベントには after 値も updateDescription 値も含まれません。以下は、customers コレクションのドキュメントの 削除 イベントの例になります。

Copy to Clipboard Toggle word wrap
{
    "schema": { ... },
    "payload": {
      "op": "d", 
1

      "ts_ms": 1465495462115, 
2

      "before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",
3

      "source": { 
4

        "version": "2.3.7.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }
表5.11 削除 イベント値フィールドの説明
項目フィールド名説明

1

op

操作の型を記述する必須の文字列。op フィールドの値は d で、ドキュメントが削除されたことを示します。

2

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

before

変更前の実際の MongoDB ドキュメントの JSON 文字列表現が含まれています。+ キャプチャーモードが *_with_preimage オプションのいずれかに設定されていない場合、update イベント値には before フィールドが含まれません。

4

比較元

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 または 更新 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン。
  • イベントを生成したコネクターの名前。
  • 生成されたイベントの namespace を形成し、コネクターが書き込む Kafka トピック名で使用される、MongoDB レプリカセットの論理名。
  • 削除されたドキュメントが含まれたコレクションおよびデータベースの名前。
  • イベントがスナップショットの一部である場合。
  • データベースで変更が加えられた時点のタイムスタンプおよびタイムスタンプ内のイベントの順序。
  • MongoDB 操作の一意の識別子 (oplog イベントの h フィールド)。
  • MongoDB セッションの一意な識別子 lsid と、トランザクション内で変更が実行された場合のトランザクション番号 txnNumber (変更ストリームキャプチャモードのみ) です。

MongoDB コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

一意に識別ドキュメントの MongoDB コネクターイベントはすべて同じキーを持ちます。ドキュメントが削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka がそのキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MongoDB コネクターは 削除 イベントを出力した後に、null 値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。tombstone イベントは、同じキーを持つすべてのメッセージを削除できることを Kafka に通知します。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.