7.2. Debezium Oracle コネクターのデータ変更イベントの説明
Oracle コネクターが出力する全データ変更イベントにはキーと値があります。キーと値の構造は、変更イベントの発生元となるテーブルによって異なります。Debezium のトピック名を構築する方法は、トピック名を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名 が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベット文字またはアンダースコア ([a-z,A-Z,_]) で始まり、論理サーバー名の残りの文字と、スキーマおよびテーブル名の残りの文字は英数字またはアンダースコア ([a-z,A-Z,0-9,\_]) で始まる必要があります。コネクターは無効な文字をアンダースコアに自動的に置き換えます。
複数の論理サーバー名、スキーマ名、またはテーブル名の中で区別ができる文字のみが無効な場合に、アンダースコアに置き換えられると、命名で予期しない競合が発生する可能性があります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、トピックコンシューマーによる処理が困難になることがあります。変更可能なイベント構造の処理を容易にするため、Kafka Connect の各イベントは自己完結型となっています。すべてのメッセージキーと値には、スキーマ と ペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。
SYS
、SYSTEM
ユーザーアカウントが加える変更は、コネクターではキャプチャーされません。
データ変更イベントの詳細は、以下を参照してください。
7.2.1. Debezium Oracle コネクター変更イベントのキー
変更されたテーブルごとに変更イベントキーは、イベントの作成時にテーブルのプライマリーキー (または一意のキー制約) の各コラムにフィールドが存在するように設定されます。
たとえば、inventory
データベーススキーマに定義されている customers
テーブルには、以下の変更イベントキーが含まれる場合があります。
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
<topic.prefix>
.transaction
設定プロパティーの値が server1
に設定されている場合は、データベースの customers
テーブルで発生するすべての変更イベントの JSON 表現には以下のキー構造が使用されます。
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, "name": "server1.INVENTORY.CUSTOMERS.Key" }, "payload": { "ID": 1004 } }
キーのスキーマ
部分には、キーの部分の内容を記述する Kafka Connect スキーマが含まれます。上記の例では、payload
値はオプションではなく、構造は server1.DEBEZIUM.CUSTOMERS.Key
という名前のスキーマで定義され、タイプ int32
の id
という名前の必須フィールドが 1 つあります。キーの payload
フィールドの値から、id
フィールドが 1 つでその値が 1004
の構造 (JSON では単なるオブジェクト) であることが分かります。
つまり、このキーは、id
プライマリーキーの列にある値が 1004
の、inventory.customers
テーブルの行 (名前が server1
のコネクターからの出力) を記述していると解釈できます。
7.2.2. Debezium Oracle 変更イベントの値
変更イベントメッセージの値の構造は、メッセージの変更イベントの メッセージキーの構造 をミラーリングし、スキーマ セクションと ペイロード セクションの両方が含まれます。
変更イベント値のペイロード
変更イベント値のペイロードセクションの エンベロープ 構造には、以下のフィールドが含まれます。
op
-
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの変更イベント値のペイロードの
op
フィールドには、c
(作成または挿入)、u
(更新)、d
(delete)、またはr
(スナップショットを示す read) のいずれかの値が含まれます。 before
-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態を記述します。この構造は、
server1.INVENTORY.CUSTOMERS.Value
Kafka Connect スキーマによって記述され、server1
コネクターはinventory.customers
テーブルのすべての行に使用します。
after
-
オプションのフィールドで、存在する場合は、変更が発生した 後 の行の状態が含まれます。構造は、
before
フィールドに使用されるserver1.INVENTORY.CUSTOMERS.Value
Kafka Connect スキーマによって記述されます。 source
イベントのソースメタデータを記述する構造体を含む必須フィールド。Oracle コネクターの場合、構造には以下のフィールドが含まれます。
- Debezium のバージョン。
- コネクター名。
- イベントが進行中のスナップショットの一部であるかどうか。
- トランザクション ID(スナップショットの含まない)。
- 変更の SCN。
- ソースデータベースのレコードがいつ変更されたかを示すタイムスタンプ (スナップショットの場合、タイムスタンプはスナップショットの発生時刻を示す)。
変更を加えたユーザー名
ヒントcommit_scn
フィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。
ts_ms
- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックがベース) が含まれます。
変更イベント値のスキーマ
イベントメッセージの値の スキーマ 部分には、ペイロードのエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。
変更イベント値の詳細は、以下を参照してください。
作成 イベント
次の例は、イベントキーの変更 例で説明した customers
テーブルの create イベントの値を示しています。
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "string", "optional": true, "field": "txId" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "string", "optional": true, "field": "rs_id" }, { "type": "int64", "optional": true, "field": "ssn" }, { "type": "int32", "optional": true, "field": "redo_thread" }, { "type": "string", "optional": true, "field": "user_name" }, { "type": "boolean", "optional": true, "field": "snapshot" } ], "optional": false, "name": "io.debezium.connector.oracle.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.DEBEZIUM.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085154000, "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "c", "ts_ms": 1532592105975 } }
上記の例では、イベントが以下のスキーマを定義する方法に注目してください。
-
エンベロープ (
server1.DEBEZIUM.CUSTOMERS.Envelope
)。 -
ソース
構造 (io.debezium.connector.oracle.Source
、Oracle コネクターに固有で、すべてのイベントで再利用)。 -
before
フィールドおよびafter
フィールドのテーブル固有のスキーマ。
before
フィールドおよび after
フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value
の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。その結果、Avro コンバーター を使用した場合、各論理ソースのテーブルの Avro スキーマは、それぞれ独自に進化し、独自の履歴を持つことになります。
このイベントの 値 の payload
部分には、イベントに関する情報が含まれます。行が作成された (op=c
) ことを術士、および after
フィールドの値に、行の ID
、FIRST_NAME
、LAST_NAME
、および EMAIL
列に挿入された値が含まれていることを表しています。
デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。サイズが大きいのは、JSON 表現がメッセージのスキーマ部分とペイロード部分の両方を含んでいるためです。Avro コンバーター を使用すると、コネクターが Kafka トピックに記述するメッセージのサイズを小さくすることができます。
更新イベント
次の例は、前の create イベントと同じテーブルからコネクターが捕捉した 更新 変更イベントを示しています。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085811000, "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "u", "ts_ms": 1532592713485 } }
ペイロードは create (挿入) イベントのペイロードと同じですが、以下の値は異なります。
-
op
フィールドの値がu
であり、この行が更新により変更されたことを示す。 -
before
フィールドは、update
データベースのコミット前に存在する値が含まれる行の以前の状態を表示します。 -
after
フィールドは行の更新状態を示しており、EMAIL
の値は現在anne@example.com
に設定されています。 -
source
フィールドの構造は以前と同じフィールドを含みますが、コネクターが REDO ログ内の異なる位置からイベントをキャプチャしたため、値は異なります。 -
ts_ms
フィールドは、Debezium がいつイベントを処理したかを示すタイムスタンプを示す。
payload
セクションでは、他に有用な情報を複数示しています。たとえば、before
と after
の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース
構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。
-
DELETE
イベント。 - 行のキーが古い tombstone イベント
-
行に新しいキーを提供する
INSERT
イベント。
delete イベント
次の例は、先の create と update のイベント例で示したテーブルの delete イベントを示しています。delete イベントの schema
部分は、これらのイベントの schema
部分と同一です。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "after": null, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085153000, "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "d", "ts_ms": 1532592105960 } }
イベントの payload
部分は、create または update イベントのペイロードと比較して、いくつかの違いを示しています。
-
op
フィールドの値はd
であり、行が削除されたことを意味します。 -
before
フィールドは、データベースのコミットで削除された行の旧状態を示します。 -
after
フィールドの値がnull
場合、その行はもはや存在しないことを意味します。 -
source
フィールドの構造には、create または update イベントに存在する多くのキーが含まれるが、ts_ms
、scn
、txId
フィールドの値は異なっている。 -
ts_ms
は、Debezium がこのイベントを処理したタイミングを示すタイムスタンプを示します。
delete イベントは、コンシューマーがこの行の削除を処理するために必要な情報を提供する。
Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
行が削除されると、Kafka は同じキーを使用する以前のメッセージをすべて削除できるため、前の例で示された delete イベント値は、ログ圧縮でまだ機能します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null
に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null
以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete
を設定すると、デフォルトの動作を変更できます。
切り捨て (truncate) イベント
切り捨て (truncate) 変更イベントは、テーブルが切り捨てられていることを伝えます。この場合のメッセージキーは null
で、メッセージの値は以下のようになります。
{ "schema": { ... }, "payload": { "before": null, "after": null, "source": { 1 "version": "2.3.4.Final", "connector": "oracle", "name": "oracle_server", "ts_ms": 1638974535000, "snapshot": "false", "db": "ORCLPDB1", "sequence": null, "schema": "DEBEZIUM", "table": "TEST_TABLE", "txId": "02000a0037030000", "scn": "13234397", "commit_scn": "13271102", "lcr_position": null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user" }, "op": "t", 2 "ts_ms": 1638974558961, 3 "transaction": null } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベントのソースメタデータを記述する必須のフィールド。切り捨て (truncate) イベント値の
|
2 |
|
操作の型を記述する必須の文字列。 |
3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。
|
切り捨て (truncate) イベントは、テーブル全体に加えられた変更を表し、メッセージキーを持たないため、コンシューマーが 切り捨て られたイベントや変更イベントを受信する保証はありません (作成、更新 など)。たとえば、コンシューマーが異なるパーティションからイベントを読み込む場合、同じテーブルの truncate イベントを受信した後に、テーブルの update イベントを受信することがあります。順序は、トピックが単一のパーティションを使用する場合にのみ保証されます。
切り捨て (truncate) イベントをキャプチャーしない場合は、skipped.operations
オプションを使用して除外します。