第4章 変更イベントの表示
Debezium MySQL コネクターのデプロイ後に、inventory
データベースへの変更のキャプチャーが開始されます。
コネクターが起動すると、一連の Apache Kafka トピックにイベントが書き込まれます。各トピックは、MySQL データベース内のテーブルの 1 つを表します。各トピックの名前は、データベースサーバーの名前 dbserver1
で始まります。
コネクターは、次の Kafka トピックに書き込みます。
dbserver1
- 変更がキャプチャーされるテーブルに適用される DDL ステートメントが書き込まれるスキーマ変更トピック。
dbserver1.inventory.products
-
inventory
データベースのproducts
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.products_on_hand
-
inventory
データベースのproducts_on_hand
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.customers
-
inventory
データベースのcustomers
テーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.orders
-
inventory
データベースのorders
テーブルの変更イベントレコードを受け取ります。
このチュートリアルの残りの部分では、dbserver1.inventory.customers
Kafka トピックを調べます。トピックを詳しく見ていくと、さまざまな種類の変更イベントがどのように表されているかがわかり、各イベントをキャプチャーしたコネクターに関する情報が見つかります。
このチュートリアルには、次のセクションが含まれています。
4.1. 作成 イベントの表示
dbserver1.inventory.customers
トピックを表示すると、MySQL コネクターが inventory
データベースの 作成 イベントをどのようにキャプチャーしたが分かります。この場合、作成 イベントは、データベースに追加された新規顧客をキャプチャーします。
手順
新しいターミナルを開き、
kafka-console-consumer
を使用してトピックの最初からdbserver1.inventory.customers
トピックを使用します。このコマンドは、Kafka (
my-cluster-kafka-0
) を実行している Pod で簡単なコンシューマー (kafka-console-consumer.sh
) を実行します。$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
コンシューマーは、
customers
テーブルの各行に 1 つずつ、4 つのメッセージ (JSON 形式) を返します。各メッセージには、対応するテーブル行のイベントレコードが含まれます。各イベントには、キー と 値 という 2 つの JSON ドキュメントがあります。キーは行のプライマリーキーに対応し、値は行の詳細 (行に含まれるフィールド、各フィールドの値、および行で実行された操作のタイプ) を表します。
最後のイベントでは、キー の詳細を確認します。
最後のイベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
このイベントには、
schema
とpayload
の 2 つの部分があります。schema
には、ペイロードの内容を記述する Kafka Connect スキーマが含まれています。この場合、ペイロードはdbserver1.inventory.customers.Key
という名前のstruct
です。これはオプションではなく、必須フィールド (int32
型のID
) を 1 つ持っています。payload
には、値が1004
のid
フィールドが 1 つあります。イベントの キー を確認すると、このイベントは
id
のプライマリーキー列の値が1004
であるinventory.customers
テーブルの行に提供されることが分かります。同じイベントの 値 の詳細を確認します。
イベントの 値 は、行が作成されたことを示し、その行に含まれる内容 (この場合は挿入された行の
id
、first_name
、last_name
、およびemail
) を表します。最後のイベントの 値の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "2.7.3.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691, "ts_us": 1486500577691547, "ts_ns": 1486500577691547930 } }
イベントのこの部分ははるかに長くなりますが、イベントの キー と同様に
schema
とpayload
もあります。schema
には、dbserver1.inventory.customers.Envelope
(バージョン 1) という名前の Kafka Connect スキーマが含まれており、次の 5 つのフィールドがあります。op
-
操作のタイプを記述する文字列値が含まれる必須フィールド。MySQL コネクターの値は、
c
(作成または挿入)、u
(更新)、d
(削除)、およびr
(読み取り) です (スナップショットの場合)。 before
-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態が含まれます。この構造は、
dbserver1.inventory.customers.Value
Kafka Connect スキーマによって記述されます。dbserver1
コネクターは、このスキーマをinventory.customers
テーブルのすべての行に使用します。 after
-
任意のフィールド。存在する場合は、イベント発生 後 の行の状態が含まれます。この構造は、
before
で使用されるのと同じdbserver1.inventory.customers.Value
Kafka Connect スキーマで記述されます。 source
-
イベントのソースメタデータを記述する構造が含まれる必須のフィールド。MySQL の場合、次の複数のフィールドが含まれます。コネクター名、イベントが記録された
binlog
ファイルの名前、binlog
ファイルでのイベント発生位置、イベント内の行 (複数ある場合)、影響を受けるデータベースおよびテーブルの名前、変更を行った MySQL スレッド ID、このイベントはスナップショットの一部であったかどうか、MySQL サーバー ID (ある場合)、および秒単位のタイムスタンプ。 ts_ms
- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
注記イベントの JSON 表現は、記述される行よりもはるかに長くなります。これは、Kafka Connect はすべてのイベントのキーと値とともに、ペイロード を記述する スキーマ を提供するためです。今後、この構造は変更される可能性があります。ただし、特に使用する側のアプリケーションが時間とともに進化する場合は、キーと値のスキーマがイベント自体にあると、メッセージを理解するのが非常に容易になります。
Debezium MySQL コネクターは、データベーステーブルの構造に基づいてこれらのスキーマを構築します。DDL ステートメントを使用して MySQL データベースのテーブル定義を変更する場合、コネクターはこの DDL ステートメントを読み取り、Kafka Connect スキーマを更新します。これは、イベント発生時にイベントの発生元となったテーブルとまったく同じように各イベントを構造化する唯一の方法です。ただし、単一テーブルのイベントがすべて含まれる Kafka トピックには、テーブル定義の各状態に対応するイベントが含まれる場合があります。
JSON コンバーターはすべてのメッセージにキーと値のスキーマを含めるため、非常に詳細なイベントを生成します。
イベントの キー および 値 スキーマを、
inventory
データベースの状態と比較します。MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
これは、確認したイベントレコードがデータベースのレコードと一致することを示しています。