2.4. デプロイメントの確認
コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。
前提条件
「Red Hat Enterprise Linux での AMQ Streams を使用した Debezium のデプロイ」 の手順に基づいて、Red Hat Enterprise Linux にコネクターをデプロイしています。以下の手順を実行します。
ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。
$ curl -H "Accept:application/json" localhost:8083/connectors/
クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。
["inventory-connector"]
ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
コマンドは、以下の例のような出力を返します。
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
Kafka クラスター内のトピックのリストを表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、以下のシェルスクリプトを実行します。./kafka-topics.sh --bootstrap-server=localhost:9092 --list
Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの
snapshot.mode
、snapshot.include.collection.list
、およびtable.include.list
の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。トピックの内容を表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、kafka-console-consumer.sh
シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。
以下に例を示します。
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.1 統合変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
次の手順
各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法は、Debezium ユーザーガイド を参照してください。