3.3. Debezium Db2 コネクターのデータ変更イベントの説明
Debezium Db2 コネクターは、行レベルの INSERT
、UPDATE
、 および DELETE
操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema
フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
最初の |
2 |
|
最初の |
3 |
|
2 つ目の |
4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。詳細は、トピック名 を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または \_) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
また、データベース、スキーマ、およびテーブルの Db2 名では、大文字と小文字を区別することができます。つまり、コネクターは同じ Kafka トピックに複数のテーブルのイベントレコードを出力できます。
詳細は以下を参照してください。
3.3.1. Debezium db2 変更イベントのキー
変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY
(または一意の制約) に存在した各列のフィールドが含まれます。
以下の customers
テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。
テーブルの例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
変更イベントキーの例
customers
テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers
テーブルに前述の定義がある限り、customers
テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。
{ "schema": { 1 "type": "struct", "fields": [ 2 { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, 3 "name": "mydatabase.MYSCHEMA.CUSTOMERS.Key" 4 }, "payload": { 5 "ID": 1004 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
キーのスキーマ部分は、キーの |
2 |
|
各フィールドの名前、型、および必要かどうかなど、 |
3 |
|
イベントキーの |
4 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.
|
5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
3.3.2. Debezium Db2 変更イベントの値
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema
セクションと payload
セクションがあります。schema
セクションには、入れ子のフィールドを含む、 Envelope
セクションの payload
構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
テーブルの例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
customers
テーブルのすべての変更イベントのイベント値部分は同じスキーマを指定します。イベント値のペイロードは、イベント型によって異なります。
作成 イベント
以下の例は、customers
テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, ], "optional": false, "name": "io.debezium.connector.db2.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope" 4 }, "payload": { 5 "before": null, 6 "after": { 7 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "john.doe@example.org" }, "source": { 8 "version": "2.3.4.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729468470, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", }, "op": "c", 9 "ts_ms": 1559729471739 10 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
6 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 |
7 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
8 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
9 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
10 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
更新イベント
サンプル customers
テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、更新イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers
テーブルでの更新に生成されるイベントの変更イベント値の例になります。
{ "schema": { ... }, "payload": { "before": { 1 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "john.doe@example.org" }, "after": { 2 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "noreply@example.org" }, "source": { 3 "version": "2.3.4.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729995937, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007", }, "op": "u", 4 "ts_ms": 1559729998706 5 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。 |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE
イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。
削除 イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema
の部分になります。サンプル customers
テーブルの 削除 イベントのイベント値 payload
は以下のようになります。
{ "schema": { ... }, }, "payload": { "before": { 1 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "noreply@example.org" }, "after": null, 2 "source": { 3 "version": "2.3.4.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559730445243, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007" }, "op": "d", 4 "ts_ms": 1559730450205 5 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
|
4 |
|
操作の型を記述する必須の文字列。 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。
Db2 コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null
である必要があります。これを可能にするために、Debezium の Db2 コネクターは 削除 イベントを出力した後に、null
値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。