第4章 変更イベントの表示
Debezium MySQL コネクターのデプロイ後に、inventory データベースへの変更のキャプチャーが開始されます。
コネクターが起動すると、一連の Apache Kafka トピックにイベントが書き込まれます。各トピックは、MySQL データベース内のテーブルの 1 つを表します。各トピックの名前は、データベースサーバーの名前 dbserver1 で始まります。
コネクターは、次の Kafka トピックに書き込みます。
dbserver1- 変更がキャプチャーされるテーブルに適用される DDL ステートメントが書き込まれるスキーマ変更トピック。
dbserver1.inventory.products-
inventoryデータベースのproductsテーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.products_on_hand-
inventoryデータベースのproducts_on_handテーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.customers-
inventoryデータベースのcustomersテーブルの変更イベントレコードを受け取ります。 dbserver1.inventory.orders-
inventoryデータベースのordersテーブルの変更イベントレコードを受け取ります。
このチュートリアルの残りの部分では、dbserver1.inventory.customers Kafka トピックを調べます。トピックを詳しく見ていくと、さまざまな種類の変更イベントがどのように表されているかがわかり、各イベントをキャプチャーしたコネクターに関する情報が見つかります。
このチュートリアルには、次のセクションが含まれています。
4.1. 作成 イベントの表示 リンクのコピーリンクがクリップボードにコピーされました!
dbserver1.inventory.customers トピックを表示すると、MySQL コネクターが inventory データベースの 作成 イベントをどのようにキャプチャーしたが分かります。この場合、作成 イベントは、データベースに追加された新規顧客をキャプチャーします。
手順
新しいターミナルを開き、
kafka-console-consumerを使用してトピックの最初からdbserver1.inventory.customersトピックを使用します。このコマンドは、Kafka (
my-cluster-kafka-0) を実行している Pod で簡単なコンシューマー (kafka-console-consumer.sh) を実行します。oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customersCopy to Clipboard Copied! Toggle word wrap Toggle overflow コンシューマーは、
customersテーブルの各行に 1 つずつ、4 つのメッセージ (JSON 形式) を返します。各メッセージには、対応するテーブル行のイベントレコードが含まれます。各イベントには、キー と 値 という 2 つの JSON ドキュメントがあります。キーは行のプライマリーキーに対応し、値は行の詳細 (行に含まれるフィールド、各フィールドの値、および行で実行された操作のタイプ) を表します。
最後のイベントでは、キー の詳細を確認します。
最後のイベントの キー の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このイベントには、
schemaとpayloadの 2 つの部分があります。schemaには、ペイロードの内容を記述する Kafka Connect スキーマが含まれています。この場合、ペイロードはdbserver1.inventory.customers.Keyという名前のstructです。これはオプションではなく、必須フィールド (int32型のID) を 1 つ持っています。payloadには、値が1004のidフィールドが 1 つあります。イベントの キー を確認すると、このイベントは
idのプライマリーキー列の値が1004であるinventory.customersテーブルの行に提供されることが分かります。同じイベントの 値 の詳細を確認します。
イベントの 値 は、行が作成されたことを示し、その行に含まれる内容 (この場合は挿入された行の
id、first_name、last_name、およびemail) を表します。最後のイベントの 値 の詳細は以下のとおりです (書式を調整して読みやすくしてあります)。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow イベントのこの部分ははるかに長くなりますが、イベントの キー と同様に
schemaとpayloadもあります。schemaには、dbserver1.inventory.customers.Envelope(バージョン 1) という名前の Kafka Connect スキーマが含まれており、次の 5 つのフィールドがあります。op-
操作のタイプを記述する文字列値が含まれる必須フィールド。MySQL コネクターの値は、
c(作成または挿入)、u(更新)、d(削除)、およびr(読み取り) です (スナップショットの場合)。 before-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態が含まれます。この構造は、
dbserver1.inventory.customers.ValueKafka Connect スキーマによって記述されます。dbserver1コネクターは、このスキーマをinventory.customersテーブルのすべての行に使用します。 after-
任意のフィールド。存在する場合は、イベント発生 後 の行の状態が含まれます。この構造は、
beforeで使用されるのと同じdbserver1.inventory.customers.ValueKafka Connect スキーマで記述されます。 source-
イベントのソースメタデータを記述する構造が含まれる必須のフィールド。MySQL の場合、次の複数のフィールドが含まれます。コネクター名、イベントが記録された
binlogファイルの名前、binlogファイルでのイベント発生位置、イベント内の行 (複数ある場合)、影響を受けるデータベースおよびテーブルの名前、変更を行った MySQL スレッド ID、このイベントはスナップショットの一部であったかどうか、MySQL サーバー ID (ある場合)、および秒単位のタイムスタンプ。 ts_ms- 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックを使用) が含まれます。
注記イベントの JSON 表現は、記述される行よりもはるかに長くなります。これは、Kafka Connect はすべてのイベントのキーと値とともに、ペイロード を記述する スキーマ を提供するためです。今後、この構造は変更される可能性があります。ただし、特に使用する側のアプリケーションが時間とともに進化する場合は、キーと値のスキーマがイベント自体にあると、メッセージを理解するのが非常に容易になります。
Debezium MySQL コネクターは、データベーステーブルの構造に基づいてこれらのスキーマを構築します。DDL ステートメントを使用して MySQL データベースのテーブル定義を変更する場合、コネクターはこの DDL ステートメントを読み取り、Kafka Connect スキーマを更新します。これは、イベント発生時にイベントの発生元となったテーブルとまったく同じように各イベントを構造化する唯一の方法です。ただし、単一テーブルのイベントがすべて含まれる Kafka トピックには、テーブル定義の各状態に対応するイベントが含まれる場合があります。
JSON コンバーターはすべてのメッセージにキーと値のスキーマを含めるため、非常に詳細なイベントを生成します。
イベントの キー および 値 スキーマを、
inventoryデータベースの状態と比較します。MySQL コマンドラインクライアントを実行しているターミナルで、以下のステートメントを実行します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow これは、確認したイベントレコードがデータベースのレコードと一致することを示しています。