4.3.2. Debezium MongoDB 変更イベントの値
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、ネストされたフィールドを含む、payload セクションの Envelope 構造を記述するスキーマが含まれます。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルドキュメントについて考えてみましょう。
ドキュメントの例
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
このドキュメントへの変更に対する変更イベントの値部分には、以下の各イベントタイプについて記述されています。
作成イベント
以下の例は、お客様が コレクションにデータを作成する操作に対してコネクターによって生成される変更イベントの値の部分を示しています。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "filter"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"patch": null,
"source": {
"version": "1.5.4.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c",
"ts_ms": 1558965515240
}
}
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のコレクションに生成するすべての変更イベントで同じになります。 |
| 2 |
|
|
| 3 |
|
|
| 4 |
|
|
| 5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
| 6 |
|
イベント発生後のドキュメントの状態を指定する任意のフィールド。この例では、 |
| 7 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
|
| 8 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
| 9 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
更新イベント
サンプル 顧客 コレクションの更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。更新 イベントは after 値を持ちません。その代わりに、以下の 2 つのフィールドがあります。
-
patchは、べき等更新操作の JSON 表現が含まれる文字列フィールドです。 -
filterは、更新の選択基準の JSON 表現が含まれる文字列フィールドです。フィルター文字列には、シャード化コレクションの複数のシャードキーフィールドを含めることができます。
以下は、お客様が コレクションでの更新に対してコネクターによって生成されるイベントの変更イベント値の例になります。
{
"schema": { ... },
"payload": {
"op": "u",
"ts_ms": 1465491461815,
"patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}",
"filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}",
"source": {
"version": "1.5.4.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、 |
| 2 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 3 |
|
ドキュメントへの実際の MongoDB のべき等変更の JSON 文字列表現が含まれます。この例では、更新で |
| 4 |
| 更新するドキュメントの特定に使用された MongoDB 選択基準の JSON 文字列表現が含まれます。 |
| 5 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。
|
Debezium の変更イベントでは、MongoDB は patch フィールドの内容を提供します。このフィールドの形式は、MongoDB データベースのバージョンによって異なります。したがって、新しい MongoDB データベースバージョンにアップグレードする場合は、形式が変更された可能性があるため注意してください。本書のサンプルは、MongoDB 3.4 から取得したため、ご使用のアプリケーションではイベントの形式が異なる場合があります。
MongoDB の oplog では、更新 イベントには変更されたドキュメントの前 または 後 の状態は含まれません。そのため、Debezium コネクターがこの情報を提供することはできません。ただし、Debezium コネクターは 作成 および 読み取り イベントでドキュメントの開始状態を提供します。ストリームのダウンストリームのコンシューマーは、ドキュメントごとに最新状態を維持し、新しいイベントの状態を保存された状態に比較することで、ドキュメント状態を再構築できます。Debezium コネクターはこの状態を維持できません。
削除 イベント
削除 変更イベントの値は、同じコレクションの 作成 および 更新 イベントと同じ スキーマ の部分になります。削除 イベントの ペイロード 部分には、同じコレクションの 作成 および 更新 イベントとは異なる値が含まれます。特に、削除 イベントには、値が after でも パッチ 値も含まれていません。以下は、お客様 コレクションのドキュメントの 削除 イベントの例になります。
{
"schema": { ... },
"payload": {
"op": "d",
"ts_ms": 1465495462115,
"filter": "{\"_id\" : {\"$numberLong\" : \"1004\"}}",
"source": {
"version": "1.5.4.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
操作の型を記述する必須の文字列。 |
| 2 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 3 |
| 削除するドキュメントの特定に使用された MongoDB 選択基準の JSON 文字列表現が含まれます。 |
| 4 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 または 更新 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。
|
MongoDB コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。
廃棄 (tombstone) イベント
一意に識別ドキュメントの MongoDB コネクターイベントはすべて同じキーを持ちます。ドキュメントが削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka がそのキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MongoDB コネクターは 削除 イベントを出力した後に 、 同じキーと null 値を持つ特別な廃棄(tombstone)イベントを出力します。tombstone イベントは、同じキーを持つすべてのメッセージを削除できることを Kafka に通知します。