2.3. コネクターのデプロイメントの確認
コネクターがエラーなしで正常に起動すると、コネクターでキャプチャーするように設定した各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから、確認するコネクターの名前をクリックします (例: inventory-connector)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc describe KafkaConnector <connector-name> -n <project>
以下に例を示します。
oc describe KafkaConnector inventory-connector -n debezium
このコマンドは、以下の出力のようなステータス情報を返します。
例2.3
KafkaConnector
リソースのステータスName: inventory-connector Namespace: debezium Labels: strimzi.io/cluster=my-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory_connector inventory_connector.inventory.addresses inventory_connector.inventory.customers inventory_connector.inventory.geom inventory_connector.inventory.orders inventory_connector.inventory.products inventory_connector.inventory.products_on_hand Events: <none>
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 - KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc get kafkatopics
このコマンドは、以下の出力のようなステータス情報を返します。
例2.4
KafkaTopic
リソースのステータスNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs my-cluster 1 1 True connect-cluster-offsets my-cluster 25 1 True connect-cluster-status my-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True inventory-connector---a96f69b23d6118ff415f772679da623fbbb99421 my-cluster 1 1 True inventory-connector.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 my-cluster 1 1 True inventory-connector.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b my-cluster 1 1 True inventory-connector.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 my-cluster 1 1 True inventory-connector.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d my-cluster 1 1 True inventory-connector.inventory.products---df0746db116844cee2297fab611c21b56f82dcef my-cluster 1 1 True inventory-connector.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 my-cluster 1 1 True schema-changes.inventory my-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
以下に例を示します。
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector.inventory.products_on_hand
トピック名を指定する形式は、手順 1 で返された
oc describe
コマンドと同じです (例:inventory_connector.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から 読み込み (op" ="r"
) イベントを生成したことを示しています。product_id
レコードのbefore
状態はnull
であり、レコードに以前の値が存在しないことを示します。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
これで Debezium コネクターが inventory
データベースからキャプチャーする変更イベントを表示する 準備が整いました。