2.6. クラスターの使用
この手順では、メッセージを送受信するために Kafka プロデューサーとコンシューマーを開始する方法を示します。
前提条件
- AMQ Streams がホストにインストールされている。
- ZooKeeper および Kafka が稼働している。
この手順では、トピックが自動的に作成されます。または、クラスターを使用する前にトピックを設定および作成することもできます。トピックの作成と管理の詳細は、トピック を参照してください。
手順
Kafka コンソールプロデューサーを起動します。
/opt/kafka/bin/kafka-console-producer.sh --broker-list <bootstrap-address> --topic <topic-name>
以下に例を示します。
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
- プロデューサーが実行しているコンソールにメッセージを入力します。
Enter キー を押して送信します。
Kafka が新しいトピックを自動的に作成すると、トピックが存在しないという警告メッセージが表示される場合があります。
WARN Error while fetching metadata with correlation id 39 : {4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {my-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
新しいトピックへのメッセージの送信を開始した後、警告メッセージが再び表示されることはありません。
auto.create.topics.enable
設定プロパティーは、トピックの自動作成を有効にします。メッセージ受信側を開始します。
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <bootstrap-address> --topic <topic-name> --from-beginning
以下に例を示します。
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
- Crtl+C を押して、Kafka コンソールプロデューサーとコンシューマーを終了します。