2.4. デプロイメントの確認
コネクターが起動すると、設定されたデータベースのスナップショットが実行され、指定したテーブルごとにトピックが作成されます。
前提条件
- 「Red Hat Enterprise Linux 上の Streams for Apache Kafka を使用した Debezium のデプロイ」 の指示に従って、Red Hat Enterprise Linux にコネクターをデプロイしている。
手順
ホストのターミナルウィンドウで、次のコマンドを入力して、Kafka Connect API からコネクターのリストを要求します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -H "Accept:application/json" localhost:8083/connectors/
$ curl -H "Accept:application/json" localhost:8083/connectors/
クエリーは、デプロイされたコネクターの名前を返します。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow ["inventory-connector"]
["inventory-connector"]
ホストのターミナルウィンドウで次のコマンドを入力して、コネクターが実行しているタスクを表示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
コマンドは、以下の例のような出力を返します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", "task": 0 } ] }
Kafka クラスター内のトピックのリストを表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、以下のシェルスクリプトを実行します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow ./kafka-topics.sh --bootstrap-server=localhost:9092 --list
./kafka-topics.sh --bootstrap-server=localhost:9092 --list
Kafka ブローカーは、コネクターが作成するトピックのリストを返します。使用可能なトピックは、コネクターの
snapshot.mode
、snapshot.include.collection.list
、およびtable.include.list
の設定プロパティーの設定によって異なります。デフォルトでは、コネクターはデータベース内の非システムテーブルごとにトピックを作成します。トピックの内容を表示します。
ターミナルウィンドウから/opt/kafka/bin/
に移動し、kafka-console-consumer.sh
シェルスクリプトを実行して、前のコマンドで返されたトピックの 1 つの内容を表示します。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow ./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
./kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.1 Debezium 変更イベントの内容
Copy to Clipboard Copied! Toggle word wrap Toggle overflow {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00001","connector":"mysql","name":"inventory_connector_mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
次の手順
各コネクターで使用できる設定の詳細、および変更データのキャプチャーを有効にするためにソースデータベースを設定する方法は、Debezium ユーザーガイド を参照してください。