6.2. Debezium Oracle コネクターのデータ変更イベントの説明
Oracle コネクターが出力する全データ変更イベントにはキーと値があります。キーと値の構造は、変更イベントの発生元となるテーブルによって異なります。Debezium のトピック名を構築する方法は、トピック名を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名 が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベット文字またはアンダースコア ([a-z,A-Z,_]) で始まり、論理サーバー名の残りの文字と、スキーマおよびテーブル名の残りの文字は英数字またはアンダースコア ([a-z,A-Z,0-9,\_]) で始まる必要があります。コネクターは無効な文字をアンダースコアに自動的に置き換えます。
複数の論理サーバー名、スキーマ名、またはテーブル名の中で区別ができる文字のみが無効な場合に、アンダースコアに置き換えられると、命名で予期しない競合が発生する可能性があります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、トピックコンシューマーによる処理が困難になることがあります。変更可能なイベント構造の処理を容易にするため、Kafka Connect の各イベントは自己完結型となっています。すべてのメッセージキーと値には、スキーマ と ペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。
SYS
、SYSTEM
ユーザーアカウントが加える変更は、コネクターではキャプチャーされません。
データ変更イベントの詳細は、以下を参照してください。
6.2.1. Debezium Oracle コネクター変更イベントのキー
変更されたテーブルごとに変更イベントキーは、イベントの作成時にテーブルのプライマリーキー (または一意のキー制約) の各コラムにフィールドが存在するように設定されます。
たとえば、inventory
データベーススキーマに定義されている customers
テーブルには、以下の変更イベントキーが含まれる場合があります。
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
<database.server.name>
.transaction
設定プロパティーの値が server1
に設定されている場合は、データベースの customers
テーブルで発生するすべての変更イベントの JSON 表現には以下のキー構造が使用されます。
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, "name": "server1.INVENTORY.CUSTOMERS.Key" }, "payload": { "ID": 1004 } }
キーのスキーマ
部分には、キーの部分の内容を記述する Kafka Connect スキーマが含まれます。上記の例では、payload
値はオプションではなく、構造は server1.DEBEZIUM.CUSTOMERS.Key
という名前のスキーマで定義され、タイプ int32
の id
という名前の必須フィールドが 1 つあります。キーの payload
フィールドの値から、id
フィールドが 1 つでその値が 1004
の構造 (JSON では単なるオブジェクト) であることが分かります。
つまり、このキーは、id
プライマリーキーの列にある値が 1004
の、inventory.customers
テーブルの行 (名前が server1
のコネクターからの出力) を記述していると解釈できます。
6.2.2. Debezium Oracle 変更イベントの値
message キーと同様に、変更イベントメッセージの値には schema セクションと payload セクションがあります。Oracle コネクターによって生成されたすべての変更イベント値の payload セクションは、以下のフィールドを含む envelope 構造となっています。
op
-
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの値は、
c
(作成または挿入)、u
(更新)、d
(削除)、およびr
(読み取り、初回のスナップショットでない場合) です。 before
-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態が含まれます。この構造は、
server1.INVENTORY.CUSTOMERS.Value
Kafka Connect スキーマによって記述され、server1
コネクターはinventory.customers
テーブルのすべての行に使用します。
after
-
任意のフィールド。存在する場合は、イベント発生後の行の状態が含まれます。この構造は、
before
で使用されるのと同じserver1.INVENTORY.CUSTOMERS.Value
Kafka Connect スキーマで記述されます。 source
- Oracle に、Debezium バージョン、コネクター名、イベントが作成中のスナップショットかどうか、トランザクション ID (スナップショット作成中ではない)、変更の SCN、ソースとなるデータベースでレコードが変更された時点を表すタイムスタンプ (スナップショット中、スナップショットの作成時点) といったフィールドが含まれる場合に、イベントのソースメタデータを記述する構造を含む必須のフィールドです。
commit_scn
フィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。このフィールドは、LogMiner 接続ファクトリーを使用している場合にのみ表示されます。
ts_ms
- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
当然ながら、イベントメッセージの値のschema 部分には、このエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。
作成 イベント
customers
テーブルの create イベント値を見てみましょう。
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "string", "optional": true, "field": "txId" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "boolean", "optional": true, "field": "snapshot" } ], "optional": false, "name": "io.debezium.connector.oracle.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.DEBEZIUM.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "source": { "version": "1.7.2.Final", "name": "server1", "ts_ms": 1520085154000, "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", "snapshot": false }, "op": "c", "ts_ms": 1532592105975 } }
前述のイベントの value の schema
部分を調べると、以下のスキーマが定義されている方法を確認できます。
- エンベロープ
-
ソース
構造 (Oracle コネクターに固有で、すべてのイベントで再利用)。 -
before
フィールドおよびafter
フィールドのテーブル固有のスキーマ。
before
フィールドおよび after
フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value
の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。
このイベントの 値 の payload
部分には、イベントに関する情報が含まれます。行が作成された (op=c
) ことを術士、および after
フィールドの値に、行の ID
、FIRST_NAME
、LAST_NAME
、および EMAIL
列に挿入された値が含まれていることを表しています。
デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。JSON 表現にはメッセージのスキーマ 部分と ペイロード 部分を含める必要があるため、これは True です。Avro コンバーター を使用すると、コネクターが Kafka トピックに記述するメッセージのサイズを大幅に小さくすることができます。
更新イベント
このテーブル 更新 変更イベントの値には、作成 イベントと同じ スキーマ があります。ペイロードは同じ構造を使用しますが、保持する値が異なります。以下に例を示します。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "source": { "version": "1.7.2.Final", "name": "server1", "ts_ms": 1520085811000, "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", "snapshot": false }, "op": "u", "ts_ms": 1532592713485 } }
更新 イベントの値を 作成 (insert) イベントと比較して、payload
セクションの以下の相違点に注意してください。
-
op
フィールドの値はu
になっており、更新によってこの行が変更されたことを示しています。 -
before
フィールドは、データベースのコミット前の行と値の状態を表しています。 -
after
フィールドの行の状態が更新され、ここでEMAIL
の値がanne@example.com
であることを確認することができます。 -
source
フィールドの構造には以前と同じフィールドがありますが、このイベントは redo ログの位置とは異なるめ、値は異なります。 -
ts_ms
は、Debezium がこのイベントを処理したタイムスタンプを示します。
payload
セクションでは、他に有用な情報を複数示しています。たとえば、before
と after
の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース
構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。
-
DELETE
イベント。 - 行のキーが古い tombstone イベント
-
行に新しいキーを提供する
INSERT
イベント。
delete イベント
ここまでで、create と update イベントのサンプルを確認しました。次に、同じテーブルの 削除 イベントの値を見てみましょう。create と update イベントと同様に、delete
イベントの場合は、値の schema
部分は全く同じです。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "after": null, "source": { "version": "1.7.2.Final", "name": "server1", "ts_ms": 1520085153000, "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", "snapshot": false }, "op": "d", "ts_ms": 1532592105960 } }
ペイロード
部分を確認すると、create または updateイベントペイロードと比べて多くの違いがあります。
-
op
フィールドの値はd
になっており、この行が削除されたことを示しています。 -
before
フィールドは、データベースのコミットで削除した行の状態を表しています。 -
after
フィールドが null で、行が存在しなくなったことが分かります。 -
変更された
ts_ms
、scn
およびtxId
フィールドの除き、source
フィールド構造には以前と同じ値が多数あります。 -
ts_ms
は、Debezium がこのイベントを処理したタイムスタンプを示します。
このイベントでは、この行の削除の処理に使用できるあらゆる種類の情報をコンシューマーに提供します。
Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、 Kafka がストレージ領域を確保できるようにします。
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、上記の削除 イベントの値はログコンパクションで動作します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null
に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null
以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete
を設定すると、デフォルトの動作を変更できます。