3.4.6.2. 変更イベントの値
変更イベントメッセージの値は、少し複雑です。キーメッセージと同様に、schema セクションと payload セクションがあります。MongoDB コネクターによって生成されたすべての変更イベント値の payload セクションには、以下のフィールドを含む エンベロープ 構造があります。
-
opは、操作のタイプを記述する文字列値が含まれる必須フィールドです。MongoDB コネクターの値は、c(作成または挿入)、u(更新)、d(削除)、およびr(読み取り(初期同期の場合))です。 -
afterはオプションのフィールドであり、存在する場合はイベント発生 後 のドキュメントの状態が含まれます。MongoDB の oplog エントリーには 作成 イベントのドキュメントの完全な状態のみが含まれるため、これらは after フィールドが含まれるイベントのみです。 -
sourceは、イベントのソースメタデータを記述する構造が含まれる必須のフィールドです。MongoDB の場合には、Debezium バージョン、論理名、レプリカセット名、コレクションの namespace、MongoDB タイムスタンプ(タイムスタンプ内のイベントの ordinal)、MongoDB 操作の識別子(例:MongoDB 操作の識別子)が含まれます。 oplog イベントのhフィールド、およびイベントが意図的な同期中に発生した場合の初期同期フラグ。 -
ts_msは任意です。存在する場合は、コネクターがイベントを処理した時間(Kafka Connect タスクを実行している JVM のシステムクロックを使用)が含まれます。
当然ながら、イベントメッセージの値のschema 部分には、このエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。
customers テーブルの 作成/読み取り イベントの値を見てみましょう。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope"
},
"payload": {
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"patch": null,
"source": {
"version": "1.0.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "r",
"ts_ms": 1558965515240
}
}
このイベントの 値 の スキーマ 部分を確認すると、エンベロープ のスキーマがコレクションに固有のものであること、および ソース 構造のスキーマ(MongoDB コネクターに固有ですべてのイベントで再利用)を確認できます。また、after の値は常に文字列であり、慣例によりドキュメントの JSON 表現が含まれることに注意してください。
このイベントの 値 の payload 部分を確認すると、イベントの情報を見ることができます。つまり、初期同期の一部としてドキュメントが読み取られたことが記述されています( op=r および initsync=true以降)。また、after フィールドの値にドキュメントの JSON 文字列表現が含まれていることを確認します。
イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。JSON 表現にはメッセージのスキーマ 部分と ペイロード 部分を含める必要があるため、これは True です。
このコレクションの 更新 変更イベントの値は、実際にはまったく同じ スキーマ を持ち、そのペイロードは同じですが、異なる値を保持します。具体的には、更新 イベントには after の値がなく、代わりにべき等更新操作の JSON 表現が含まれる パッチ 文字列があります。以下に例を示します。
{
"schema": { ... },
"payload": {
"op": "u",
"ts_ms": 1465491461815,
"patch": "{\"$set\":{\"first_name\":\"Anne Marie\"}}",
"source": {
"version": "1.0.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
これを 挿入 イベントの 値と比較すると、payload セクションにいくつかの違いがあります。
-
opフィールドの値はuになっており、更新によってこのドキュメントが変更されたことを示しています。 -
パッチフィールドが表示され、ドキュメントに対する実際の MongoDB べき等変更の文字列化された JSON 表現があります。この例では、first_nameフィールドを新しい値に設定する必要があります。 -
afterフィールドが表示されなくなる -
sourceフィールド構造には以前と同じフィールドがありますが、このイベントは oplog の異なる位置にあるため、値は異なります。 -
ts_msは、Debezium がこのイベントを処理したタイムスタンプを示します。
patch フィールドの内容は MongoDB 自体で提供され、正確な形式は特定のデータベースバージョンによって異なります。したがって、MongoDB インスタンスを新しいバージョンにアップグレードする際に、形式の変更の可能性を準備する必要があります。
本書のすべての例は MongoDB 3.4 から取得され、別のサンプルを使用する場合は異なる場合があります。
MongoDB の oplog の更新イベントには変更されたドキュメントの before または after 状態がないため、コネクターがこの情報を提供する方法はありません。ただし、create または read イベントに は 開始状態が含まれるため、ストリームのダウンストリームコンシューマーは、各ドキュメントの最新状態を維持し、各イベントをその状態に適用することで、実際に状態を完全に再構築できます。Debezium コネクターはこのような状態を維持できないため、これを行うことができません。
これまでは、作成/読み取り と 更新 イベントの例を見てきました。次に、同じテーブルの 削除 イベントの値を見てみましょう。このコレクションの 削除 イベントの値には全く同じ スキーマ があり、そのペイロードは同じですが、異なる値を保持します。特に、削除 イベントには after の値や patch の値は含まれません。
{
"schema": { ... },
"payload": {
"op": "d",
"ts_ms": 1465495462115,
"source": {
"version": "1.0.3.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
これを他のイベントの値と比較すると、payload セクションにいくつかの違いがあります。
-
opフィールドの値はdになっており、このドキュメントが削除されたことを示しています。 -
パッチフィールドが表示されない -
afterフィールドが表示されない -
sourceフィールド構造には以前と同じフィールドがありますが、このイベントは oplog の異なる位置にあるため、値は異なります。 -
ts_msは、Debezium がこのイベントを処理したタイムスタンプを示します。
MongoDB コネクターは実際には他の種類のイベントを提供します。各 削除 イベントの後に、同じキーだが null 値を持つ廃棄( tombstone )イベントの後に、Kafka ログコンパクションメカニズムがそのキーを持つ すべて のメッセージを削除できることを示す十分な情報を Kafka に提供します。
MongoDB コネクターイベントはすべて Kafka ログコンパクション と動作するように設計されています。これにより、すべてのキーの最新のメッセージが保持される限り、古いメッセージを削除できます。これは、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を回収する方法です。
一意に識別されたドキュメントの MongoDB コネクターイベントはすべて同じキーを持ち、最新のイベントのみが保持される Kafka に通知されます。また、tombstone イベントは、同じキーを持つ すべて のメッセージを削除できることを Kafka に通知します。