第5章 トピックからのメッセージの送受信
OpenShift にインストールされている Kafka クラスターとの間でメッセージを送受信します。
この手順では、Kafka クライアントを使用してメッセージを生成および消費する方法を説明します。クライアントを OpenShift にデプロイするか、ローカル Kafka クライアントを OpenShift クラスターに接続できます。いずれかまたは両方のオプションを使用して、Kafka クラスターのインストールをテストできます。ローカルクライアントの場合は、OpenShift ルート接続を使用して Kafka クラスターにアクセスします。
oc コマンドラインツールを使用して、Kafka クライアントをデプロイして実行します。
前提条件
ローカルプロデューサーおよびコンシューマーの場合:
- OpenShift で実行している Kafka クラスターへの外部アクセス用のルートを作成 している。
- Streams for Apache Kafka ソフトウェアダウンロードページ から最新の Kafka クライアントバイナリーにアクセスできる。
OpenShift クラスターにデプロイされた Kafka クライアントからのメッセージの送受信
プロデューサーおよびコンシューマーのクライアントを OpenShift クラスターにデプロイします。その後、クライアントを使用して、同じ namespace 内の Kafka クラスターとの間でメッセージを送受信できます。このデプロイメントでは、Kafka を実行するために Streams for Apache Kafka コンテナーイメージを使用します。
ocコマンドラインインターフェイスを使用して、Kafka プロデューサーをデプロイします。この例では、Kafka クラスター
my-clusterに接続する Kafka プロデューサーをデプロイします。my-topicという名前のトピックが作成されます。Kafka プロデューサーの OpenShift へのデプロイ
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記接続に失敗した場合は、Kafka クラスターが実行中で、正しいクラスター名が
bootstrap-serverとして指定されていることを確認してください。- コマンドプロンプトから、いくつかのメッセージを入力します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
streams-kafkaプロジェクトを選択します。 -
Pod のリストから、
kafka-producerをクリックして、プロデューサー Pod の詳細を表示します。 - Logs ページを選択して、入力したメッセージが存在することを確認します。
ocコマンドラインインターフェイスを使用して、Kafka コンシューマーをデプロイします。Kafka コンシューマーの OpenShift へのデプロイ
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コンシューマーは
my-topicに生成されたメッセージを消費しました。- コマンドプロンプトから、コンシューマーコンソールに着信メッセージが表示されていることを確認します。
-
OpenShift Web コンソールで Home > Projects ページに移動し、作成した
streams-kafkaプロジェクトを選択します。 -
Pod のリストから、
kafka-consumerをクリックして、コンシューマー Pod の詳細を表示します。 - Logs ページを選択して、消費したメッセージが存在することを確認します。
ローカルで実行されている Kafka クライアントからのメッセージの送受信
コマンドラインインターフェイスを使用して、ローカルマシンで Kafka プロデューサーとコンシューマーを実行します。
Streams for Apache Kafka ソフトウェアダウンロードページ から Streams for Apache Kafka <バージョン> バイナリーをダウンロードして展開します。
amq-streams-<version>-bin.zipファイルを任意の場所に解凍します。コマンドラインインターフェイスを開き、トピック
my-topicと TLS の認証プロパティーを使用して Kafka コンソールプロデューサーを起動します。OpenShift ルートを使用して Kafka ブローカーにアクセスする のに必要なプロパティーを追加します。
- 使用している OpenShift ルートのホスト名およびポート 443 を使用します。
パスワードと、ブローカー証明書用に作成したトラストストアへの参照を使用します。
ローカル Kafka プロデューサーの起動
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- プロデューサーが実行しているコマンドラインインターフェイスにメッセージを入力します。
- Enter を押してメッセージを送信します。
新しいコマンドラインインターフェイスタブまたはウィンドウを開き、Kafka コンソールコンシューマーを起動してメッセージを受信します。
プロデューサーと同じ接続の詳細を使用します。
ローカル Kafka コンシューマーの起動
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - コンシューマーコンソールに受信メッセージが表示されることを確認します。
- Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。