第5章 トピックからのメッセージの送受信
OpenShift にインストールされている Kafka クラスターとの間でメッセージを送受信します。
この手順では、Kafka クライアントを使用してメッセージを生成および消費する方法を説明します。クライアントを OpenShift にデプロイするか、ローカル Kafka クライアントを OpenShift クラスターに接続できます。いずれかまたは両方のオプションを使用して、Kafka クラスターのインストールをテストできます。ローカルクライアントの場合は、OpenShift ルート接続を使用して Kafka クラスターにアクセスします。
oc
コマンドラインツールを使用して、Kafka クライアントをデプロイして実行します。
前提条件
ローカルプロデューサーおよびコンシューマーの場合:
- OpenShift で実行している Kafka クラスターへの外部アクセス用のルートを作成 している。
- Streams for Apache Kafka ソフトウェアダウンロードページ から最新の Kafka クライアントバイナリーにアクセスできる。
OpenShift クラスターにデプロイされた Kafka クライアントからのメッセージの送受信
プロデューサーおよびコンシューマーのクライアントを OpenShift クラスターにデプロイします。その後、クライアントを使用して、同じ namespace 内の Kafka クラスターとの間でメッセージを送受信できます。このデプロイメントでは、Kafka を実行するために Streams for Apache Kafka コンテナーイメージを使用します。
oc
コマンドラインインターフェイスを使用して、Kafka プロデューサーをデプロイします。この例では、Kafka クラスター
my-cluster
に接続する Kafka プロデューサーをデプロイします。my-topic
という名前のトピックが作成されます。Kafka プロデューサーの OpenShift へのデプロイ
Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc run kafka-producer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic
oc run kafka-producer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic
注記接続に失敗した場合は、Kafka クラスターが実行中で、正しいクラスター名が
bootstrap-server
として指定されていることを確認してください。- コマンドプロンプトから、いくつかのメッセージを入力します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
amq-streams-kafka
プロジェクトを選択します。 -
Pod のリストから、
kafka-producer
をクリックして、プロデューサー Pod の詳細を表示します。 - Logs ページを選択して、入力したメッセージが存在することを確認します。
oc
コマンドラインインターフェイスを使用して、Kafka コンシューマーをデプロイします。Kafka コンシューマーの OpenShift へのデプロイ
Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc run kafka-consumer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning
oc run kafka-consumer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning
コンシューマーは
my-topic
に生成されたメッセージを消費しました。- コマンドプロンプトから、コンシューマーコンソールに着信メッセージが表示されていることを確認します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
amq-streams-kafka
プロジェクトを選択します。 -
Pod のリストから、
kafka-consumer
をクリックして、コンシューマー Pod の詳細を表示します。 - Logs ページを選択して、消費したメッセージが存在することを確認します。
ローカルで実行されている Kafka クライアントからのメッセージの送受信
コマンドラインインターフェイスを使用して、ローカルマシンで Kafka プロデューサーとコンシューマーを実行します。
Streams for Apache Kafka ソフトウェアダウンロードページ から Streams for Apache Kafka <バージョン> バイナリーをダウンロードして展開します。
amq-streams-<version>-bin.zip
ファイルを任意の場所に解凍します。コマンドラインインターフェイスを開き、トピック
my-topic
と TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。OpenShift ルートを使用して Kafka ブローカーにアクセスする のに必要なプロパティーを追加します。
- 使用している OpenShift ルートのホスト名およびポート 443 を使用します。
パスワードと、ブローカー証明書用に作成したトラストストアへの参照を使用します。
ローカル Kafka プロデューサーの起動
Copy to Clipboard Copied! Toggle word wrap Toggle overflow kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
- プロデューサーが実行しているコマンドラインインターフェイスにメッセージを入力します。
- Enter を押してメッセージを送信します。
新しいコマンドラインインターフェイスタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。
プロデューサーと同じ接続の詳細を使用します。
ローカル Kafka コンシューマーの起動
Copy to Clipboard Copied! Toggle word wrap Toggle overflow kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning
kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
- Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。