第 5 章 从主题发送和接收信息
向在 OpenShift 上安装的 Kafka 集群发送或接收信息。
此流程描述了如何使用 Kafka 客户端生成和使用信息。您可以将客户端部署到 OpenShift,或将本地 Kafka 客户端连接到 OpenShift 集群。您可以使用其中一个或这两个选项来测试 Kafka 集群安装。对于本地客户端,您可以使用 OpenShift 路由连接访问 Kafka 集群。
您将使用 oc
命令行工具来部署并运行 Kafka 客户端。
先决条件
对于生成者和消费者:
从部署到 OpenShift 集群的 Kafka 客户端发送和接收信息
将生成者和消费者客户端部署到 OpenShift 集群。然后,您可以使用客户端从同一命名空间中的 Kafka 集群发送和接收信息。部署使用 Streams for Apache Kafka 容器镜像来运行 Kafka。
使用
oc
命令行界面部署 Kafka producer。本例部署一个 Kafka 生成者,它连接到 Kafka 集群
my-cluster
名为
my-topic
的主题被创建。将 Kafka producer 部署到 OpenShift
oc run kafka-producer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic
注意如果连接失败,请检查 Kafka 集群是否正在运行,并将正确的集群名称指定为
bootstrap-server
。- 从命令提示符输入多个信息。
-
在 OpenShift web 控制台中进入到 Home > Projects 页面,然后选择
amq-streams-kafka
项目。 -
从 pod 列表中,点
kafka-producer
以查看 producer pod 详情。 - 选择 Logs 页面,以检查您输入的消息是否存在。
使用
oc
命令行界面部署 Kafka 用户。将 Kafka 使用者部署到 OpenShift
oc run kafka-consumer -ti \ --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 \ --rm=true \ --restart=Never \ -- bin/kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-bootstrap:9092 \ --topic my-topic \ --from-beginning
消费者使用生成给
my-topic
的消息。- 从命令提示符,确认您在使用者控制台中看到传入的信息。
-
在 OpenShift web 控制台中进入到 Home > Projects 页面,然后选择
amq-streams-kafka
项目。 -
从 pod 列表中,点
kafka-consumer
查看使用者 pod 详情。 - 选择 Logs 页面,以检查您消耗的消息是否存在。
从本地运行的 Kafka 客户端发送和接收信息
使用命令行界面在本地机器上运行 Kafka 制作者和使用者。
从 Streams for Apache Kafka 软件下载页面下载并提取 Apache Kafka < version > 二进制文件的 Streams。
解压
amq-streams-<version>-bin.zip
文件到任何目标。打开命令行界面,并使用主题
my-topic
和 TLS 的身份验证属性启动 Kafka 控制台制作者。添加使用 OpenShift 路由访问 Kafka 代理所需的属性。
- 将主机名和端口 443 用于您正在使用的 OpenShift 路由。
使用密码并引用您为代理证书创建的信任存储。
启动本地 Kafka producer
kafka-console-producer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --producer-property security.protocol=SSL \ --producer-property ssl.truststore.password=password \ --producer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic
- 在运行制作者的命令行界面中键入您的消息。
- 按 enter 来发送邮件。
打开新的命令行界面标签页或窗口,并启动 Kafka 控制台使用者来接收信息。
使用与生成者相同的连接详情。
启动本地 Kafka 消费者
kafka-console-consumer.sh \ --bootstrap-server my-cluster-kafka-listener1-bootstrap-amq-streams-kafka.apps.ci-ln-50kcyvt-72292.origin-ci-int-gce.dev.rhcloud.com:443 \ --consumer-property security.protocol=SSL \ --consumer-property ssl.truststore.password=password \ --consumer-property ssl.truststore.location=client.truststore.jks \ --topic my-topic --from-beginning
- 确认您在使用者控制台中看到传入的信息。
- 按 Crtl+C 退出 Kafka 控制台生成者和消费者。