5.2. Debezium MySQL コネクターのデータ変更イベントの説明
Debezium MySQL コネクターは、行レベルの INSERT
、UPDATE
、および DELETE
操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema
フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
最初の |
2 |
|
最初の |
3 |
|
2 つ目の |
4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。
MySQL コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
詳細は以下を参照してください。
5.2.1. Debezium MySQL 変更イベントのキー
変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY
(または一意の制約) に存在した各列のフィールドが含まれます。
以下の customers
テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;
customers
テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers
テーブルに前述の定義がある限り、customers
テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。
{ "schema": { 1 "type": "struct", "name": "mysql-server-1.inventory.customers.Key", 2 "optional": false, 3 "fields": [ 4 { "field": "id", "type": "int32", "optional": false } ] }, "payload": { 5 "id": 1001 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
キーのスキーマ部分は、キーの |
2 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.
|
3 |
|
イベントキーの |
4 |
|
各フィールドの名前、型、および必要かどうかなど、 |
5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
5.2.2. Debezium MySQL 変更イベントの値
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema
セクションと payload
セクションがあります。schema
セクションには、入れ子のフィールドを含む、Envelope
セクションの payload
構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;
このテーブルへの変更に対する変更イベントの値部分には以下について記述されています。
作成 イベント
以下の例は、customers
テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mysql-server-1.inventory.customers.Envelope" 4 }, "payload": { 5 "op": "c", 6 "ts_ms": 1465491411815, 7 "before": null, 8 "after": { 9 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 10 "version": "2.5.4.Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')" } } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
6 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
7 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
8 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 |
9 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
10 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
|
更新イベント
サンプル customers
テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers
テーブルでの更新に生成されるイベントの変更イベント値の例になります。
{ "schema": { ... }, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "version": "2.5.4.Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004" }, "op": "u", 4 "ts_ms": 1465581029523 5 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。 |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE
イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。
プライマリーキーの更新
行のプライマリーキーフィールドを変更する UPDATE
操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE
イベントレコードの代わりにコネクターが古いキーの DELETE
イベントレコードと、新しい (更新された) キーの CREATE
イベントレコードを出力します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。
-
DELETE
イベントレコードには、メッセージヘッダーとして__debezium.newkey
が含まれます。このヘッダーの値は、更新された行の新しいプライマリーキーです。 -
CREATE
イベントレコードには、メッセージヘッダーとして__debezium.oldkey
が含まれます。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。
delete イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema
の部分になります。サンプル customers
テーブルの 削除 イベントの payload
部分は以下のようになります。
{ "schema": { ... }, "payload": { "before": { 1 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, 2 "source": { 3 "version": "2.5.4.Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581902300, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004" }, "op": "d", 4 "ts_ms": 1465581902461 5 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
|
4 |
|
操作の型を記述する必須の文字列。 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。
MySQL コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
廃棄 (tombstone) イベント
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null
である必要があります。これを可能にするために、Debezium の MySQL コネクターは 削除 イベントを出力した後に、null
値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。
切り捨て (truncate) イベント
切り捨て (truncate) 変更イベントは、テーブルが切り捨てられていることを伝えます。この場合のメッセージキーは null
で、メッセージの値は以下のようになります。
{ "schema": { ... }, "payload": { "source": { 1 "version": "2.5.4.Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004" }, "op": "t", 2 "ts_ms": 1465581029523 3 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベントのソースメタデータを記述する必須のフィールド。切り捨て (truncate) イベント値の
|
2 |
|
操作の型を記述する必須の文字列。 |
3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。
+ |
1 つの TRUNCATE
ステートメントが複数のテーブルに適用された場合、切り捨てられたテーブルごとに 1 つの切り捨て (truncate) 変更イベントレコードが出力されます。
切り捨て (truncate) イベントは、テーブル全体に加えた変更を表し、メッセージキーを持たないので、単一のパーティションを持つトピックを使用しない限り、テーブルに関する変更イベント (作成、更新 など) とそのテーブルの 切り捨て (truncate) イベントの順番は保証されません。たとえば、これらのイベントが異なるパーティションから読み取られる場合、コンシューマーは 更新 イベントを 切り捨て (truncate) イベントの後でのみ受け取る可能性があります。