12.2. Kafka Bridge quickstart
使用此快速入门在 Red Hat Enterprise Linux 上尝试 AMQ Streams Kafka Bridge。您将学习如何:
- 安装 Kafka Bridge
- 生成信息到 Kafka 集群中的主题和分区
- 创建 Kafka Bridge 使用者
- 执行基本的消费者操作,如将消费者订阅到主题并检索您生成的消息
在这个快速入门中,HTTP 请求被格式化为 curl 命令,您可以复制并粘贴到终端。
确定您有先决条件,然后按照本章中提供的顺序按照任务进行操作。
关于数据格式
在本快速入门中,您将以 JSON 格式生成和使用消息,而不是二进制。有关示例请求中使用的数据格式和 HTTP 标头的更多信息,请参阅 第 12.1.1 节 “身份验证和加密”。
Quickstart 的先决条件
12.2.1. 在本地部署 Kafka Bridge 复制链接链接已复制到粘贴板!
将 AMQ Streams Kafka Bridge 的实例部署到主机。使用安装存档提供的 application.properties
文件以应用默认配置设置。
流程
打开
application.properties
文件,并检查是否定义了默认的HTTP 相关设置
:http.enabled=true http.host=0.0.0.0 http.port=8080
http.enabled=true http.host=0.0.0.0 http.port=8080
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 这会将 Kafka Bridge 配置为侦听端口 8080 上的请求。
使用配置属性作为参数运行 Kafka Bridge 脚本:
./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
接下来要做什么
12.2.2. 向主题和分区生成信息 复制链接链接已复制到粘贴板!
使用 topics 端点将消息生成到 JSON 格式的主题。
您可以为请求正文中的消息指定目标分区,如下所示。分区 端点提供了一种替代方法,为所有消息指定单一目标分区作为路径参数。
流程
使用
kafka-topics.sh
实用程序创建 Kafka 主题:bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 指定三个分区。
验证是否已创建该主题:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 Kafka Bridge,为您创建的主题生成三个信息:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
sales-lead-0001
发送到基于密钥哈希的分区。 -
Sales-lead-0002
直接发送到分区 2。 -
Sales-lead-0003
使用 round-robin 方法发送到bridge-quickstart-topic
主题中的分区。
-
如果请求成功,Kafka Bridge 会返回一个
偏移
数组,以及200
(OK)代码和application/vnd.kafka.v2+json
的content-type
标头。对于每个消息,偏移
数组描述:- 消息发送到的分区
分区的当前消息偏移
响应示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
接下来要做什么
在生成消息到主题和分区后,创建一个 Kafka Bridge 消费者。
其他资源
- API 参考文档中的 POST /topics/{topicname}。
- API 参考文档中 POST /topics/{topicname}/partitions/{partitionid}。
12.2.3. 创建 Kafka Bridge 使用者 复制链接链接已复制到粘贴板!
在 Kafka 集群上执行任何消费者操作前,您必须首先使用 consumers 端点创建消费者。消费者被称为 Kafka Bridge 消费者。
流程
在名为
bridge-quickstart-consumer-group
的新消费者组中创建一个 Kafka Bridge 使用者:Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
消费者名为
bridge-quickstart-consumer
,嵌入式数据格式被设置为json
。 消费者不会自动向日志提交偏移,因为
enable.auto.commit
设置为false
。稍后您将在此快速入门中手动提交偏移量。注意如果您没有在请求正文中指定消费者名称,则 Kafka Bridge 会生成一个随机消费者名称。
如果请求成功,Kafka Bridge 会返回响应正文中的使用者 ID (
instance_id
)和基本 URL (base_uri
),以及200
(OK)代码。响应示例
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
-
消费者名为
-
复制基础 URL (
base_uri
),以便在这个快速入门中的其他消费者操作中使用。
接下来要做什么
现在,您已创建了 Kafka Bridge 消费者,您可以将其订阅到主题。
其他资源
- API 参考文档中的 POST /consumers/{groupid}。
12.2.4. 将 Kafka Bridge 消费者订阅到主题 复制链接链接已复制到粘贴板!
使用 订阅 端点将 Kafka Bridge 使用者订阅到一个或多个主题。订阅后,消费者开始接收生成到该主题的所有消息。
流程
将消费者订阅之前创建的
bridge-quickstart-topic
主题,在 Produc ing 信息到主题和分区 :Copy to Clipboard Copied! Toggle word wrap Toggle overflow topics
数组可以包含单个主题(如上所示)或多个主题。如果要将消费者订阅与正则表达式匹配的多个主题,您可以使用topic_pattern
字符串而不是topics
数组。如果请求成功,Kafka Bridge 只会返回一个
204 No Content
代码。
接下来要做什么
将 Kafka Bridge 消费者订阅到主题后,您可以从 消费者检索信息。
其他资源
12.2.5. 从 Kafka Bridge consumer 检索最新的信息 复制链接链接已复制到粘贴板!
通过从 records 端点请求数据,从 Kafka Bridge 使用者检索最新的消息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。
流程
- 为 Kafka Bridge consumer 生成额外消息,如 Produc ing 消息到主题和分区 中所述。
向
记录
端点提交GET
请求:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 创建并订阅 Kafka Bridge 消费者后,第一个 GET 请求将返回空响应,因为轮询操作会触发重新平衡过程来分配分区。
重复步骤 2,以从 Kafka Bridge 使用者检索信息。
Kafka Bridge 返回一条信息 数组,它带有 200 (OK)代码,它包括主题名称、键、值、分区和偏移来来包括
200
(OK)代码。默认情况下,消息从最新的偏移中检索。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注意如果返回空响应,请生成更多记录到消费者,如 Produc ing 消息到主题和分区 中所述,然后尝试再次检索消息。
接下来要做什么
从 Kafka Bridge 消费者检索信息后,尝试向 日志提交偏移。
其他资源
12.2.6. 向日志提交偏移量 复制链接链接已复制到粘贴板!
使用 偏移 端点手动向日志提交 Kafka Bridge 消费者接收的所有消息。这是必要的,因为在 创建 Kafka Bridge 消费者 中的 Kafka Bridge 消费者 被配置,其 enable.auto.commit
设置为 false
。
流程
将偏移提交到
bridge-quickstart-consumer
的日志:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 因为没有提交请求正文,所以针对消费者收到的所有记录提交偏移。另外,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。
如果请求成功,Kafka Bridge 只会返回一个
204 No Content
代码。
接下来要做什么
向日志提交偏移后,尝试查找查找 偏移的端点。
其他资源
12.2.7. 寻找分区偏移量 复制链接链接已复制到粘贴板!
使用 位置 端点配置 Kafka Bridge 消费者,从特定偏移中检索分区的消息,然后从最新的偏移中检索。这在 Apache Kafka 中被称为 seek 操作。
流程
请参阅
quickstart-bridge-topic
主题的分区 0 的特定偏移:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果请求成功,Kafka Bridge 只会返回一个
204 No Content
代码。向
记录
端点提交GET
请求:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka Bridge 从您看到的偏移返回信息。
通过查找同一分区的最后一个偏移来恢复默认消息检索行为。这一次,使用 位置/结束 端点。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果请求成功,Kafka Bridge 会返回另一个
204 No Content
代码。
您还可以使用 positions/beginning 端点查找一个或多个分区的第一个偏移。
接下来要做什么
在这个快速入门中,您使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge 使用者。
12.2.8. 删除 Kafka Bridge 使用者 复制链接链接已复制到粘贴板!
最后,删除您在此快速入门中使用的 Kafa Bridge 使用者。
流程
通过向 实例 端点发送
DELETE
请求来删除 Kafka Bridge 使用者。curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果请求成功,Kafka Bridge 只会返回一个
204 No Content
代码。
其他资源
- API 参考文档中 DELETE /consumers/{groupid}/instances/{name}。