6.2.2. Debezium Oracle コネクターの変更イベントの値
メッセージキーと同様に、変更イベントメッセージの値には スキーマ セクションと ペイロード セクションがあります。Oracle コネクターによって生成されたすべての変更イベント値のペイロードセクションには、以下のフィールドを含む エンベロープ 構造があります。
op-
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの値は、作成(または挿入)、更新の場合は
u、削除の場合はd、読み取り(スナップショットの場合)の場合はcです。 before任意のフィールド。存在する場合は、イベント発生前の行の状態が含まれます。構造は、
server1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述され、server1コネクターはinventory.customersテーブルのすべての行に使用します。警告このフィールドとその要素を利用できるかどうかは、テーブルに適用する Supplemental Logging 設定により大きく左右されます。
after-
任意のフィールド。存在する場合は、イベント発生 後 の行の状態が含まれます。構造は、
以前で使用される同じserver1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述されます。 sourceイベントのソースメタデータを記述する構造が含まれる必須のフィールドです。Oracle の場合、これには以下のフィールドが含まれます。Debezium のバージョン、コネクター名、イベントが継続中のスナップショットの一部であるかどうか、トランザクション ID (スナップショット中でない)、変更時の SCN、および (スナップショット作成中)、およびソースデータベースでレコードが変更された時点を示すタイムスタンプ (スナップショットの作成中は、スナップショットの時点)。
ヒントcommit_scnフィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。このフィールドは、LogMiner 接続アダプターを使用している場合にのみ表示されます。ts_ms- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
当然ながら、イベントメッセージの値の スキーマ の部分には、このエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれています。
作成イベント
顧客 テーブルの 作成 イベント値を見てみましょう。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "server1.DEBEZIUM.CUSTOMERS.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": false,
"field": "FIRST_NAME"
},
{
"type": "string",
"optional": false,
"field": "LAST_NAME"
},
{
"type": "string",
"optional": false,
"field": "EMAIL"
}
],
"optional": true,
"name": "server1.DEBEZIUM.CUSTOMERS.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "txId"
},
{
"type": "string",
"optional": true,
"field": "scn"
},
{
"type": "string",
"optional": true,
"field": "commit_scn"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
},
"payload": {
"before": null,
"after": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "annek@noanswer.org"
},
"source": {
"version": "1.5.4.Final",
"name": "server1",
"ts_ms": 1520085154000,
"txId": "6.28.807",
"scn": "2122185",
"commit_scn": "2122185",
"snapshot": false
},
"op": "c",
"ts_ms": 1532592105975
}
}
前述のイベント の値 のスキーマ 部分を調べると、以下のスキーマが定義されている方法を確認できます。
- エンベロープ
-
ソース構造(Oracle コネクターに固有で、すべてのイベントで再利用)。 -
beforeフィールドおよびafterフィールドのテーブル固有のスキーマ。
before フィールドおよび after フィールドのスキーマ名は <logicalName>.< schemaName>. <tableName>.Value の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。つまり、Avro コンバーター を使用する場合、得られる各 論理ソース の 各テーブル の Avro スキーマには独自の進化と履歴があります。
このイベント の値 のペイロード 部分で、イベントに関する情報を提供します。行が作成された(op=c)、および after フィールドの値に、行の ID、FIRST_NAME、LAST、および _NAME EMAIL 列に挿入された値が含まれていることを表しています。
デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きくなります。これは、JSON 表現にはメッセージの スキーマ 部分と ペイロード 部分を含める必要があるためです。Avro コンバーター を使用して、コネクターが Kafka トピックに書き込むメッセージのサイズを大幅に小さくすることができます。
更新イベント
このテーブルの 更新 変更イベントの値は、作成 イベントと同じ スキーマ を持ちます。ペイロードは同じ構造を使用しますが、異なる値を保持します。以下に例を示します。
{
"schema": { ... },
"payload": {
"before": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "annek@noanswer.org"
},
"after": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "anne@example.com"
},
"source": {
"version": "1.5.4.Final",
"name": "server1",
"ts_ms": 1520085811000,
"txId": "6.9.809",
"scn": "2125544",
"commit_scn": "2125544",
"snapshot": false
},
"op": "u",
"ts_ms": 1532592713485
}
}
更新 イベントの値を 作成 (insert)イベントと比較すると、payload セクションの以下の相違点に注意してください。
-
opフィールドの値はuで、更新によってこの行が変更されたことを示しています。 -
beforeフィールドは、データベースのコミット前の行と値の状態を表しています。 -
afterフィールドは、更新された行の状態になり、ここでEMAILの値がanne@example.comであることを確認することができます。 -
ソースフィールド構造には以前と同じフィールドがありますが、このイベントはやり直すログとは異なる位置であるため、値は異なります。 -
The
ts_msは、Debezium がこのイベントを処理したタイムスタンプを示します。
payload セクションは、他のいくつかの有用な情報を示しています。たとえば、以前と 後の 構造を比較することで、コミットの結果として行がどのように変更されたかを判断できます。ソース 構造は、この変更の記録に関する情報を提供し、トレーサビリティーを提供します。また、本トピックおよび他のトピックの他のイベントとの関連で、このイベントがいつ発生したかも分かります。発生の前後関係、あるいは他のイベントと同じコミットの一部として発生したかが分かります。
行のプライマリーキー/一意キーの列が更新されると、行のキーの値が変更されます。その結果、Debezium は更新後に 3 つの イベントを出力します。
-
DELETEイベント。 - 行のキーが古い tombstone イベント
-
行に新しいキーを提供する
INSERTイベント。
削除 イベント
これまで、作成 および 更新 イベントの例を見てきました。ここで、同じテーブルの 削除 イベントの値を見てみましょう。削除イベント の作成および更新 の場合のように、値 のスキーマ 部分は同じです。
{
"schema": { ... },
"payload": {
"before": {
"ID": 1004,
"FIRST_NAME": "Anne",
"LAST_NAME": "Kretchmar",
"EMAIL": "anne@example.com"
},
"after": null,
"source": {
"version": "1.5.4.Final",
"name": "server1",
"ts_ms": 1520085153000,
"txId": "6.28.807",
"scn": "2122184",
"commit_scn": "2122184",
"snapshot": false
},
"op": "d",
"ts_ms": 1532592105960
}
}
ペイロード 部分を確認すると、作成または 更新 イベントペイロードと比べて多くの違いがあります。
-
opフィールドの値はdで、この行が削除されたことを示しています。 -
beforeフィールドは、データベースのコミットで削除した行の状態を表しています。 -
afterフィールドは null で、行が存在しないことを示します。 -
ソースフィールド構造には以前と同じ値が多数ありますが、ts_ms、scn、およびtxIdフィールドが異なります。 -
The
ts_msは、Debezium がこのイベントを処理したタイムスタンプを示します。
このイベントは、コンシューマーがこの行の削除を処理するのに使用できるあらゆる種類の情報を提供します。
Oracle コネクターのイベントは、Kafka ログコンパクション と動作するように設計されています。これにより、すべてのキーで少なくとも最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を開放できるようにします。
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、上述の 削除 イベントの値は引き続きログコンパクションで動作します。同じキーを共有する メッセージをすべて削除するよう Kafka に指示するには、メッセージ の値を null に設定する必要があります。これを可能にするために、デフォルトでは Debezium の Oracle コネクターは、同じキーと null 値を持つ特別な廃棄( tombstone )イベントで 削除イベント を常にフォローします。コネクタープロパティー tombstones.on.delete を設定すると、デフォルトの動作を変更できます。