12.2. Kafka Bridge quickstart
使用此快速入门在 Red Hat Enterprise Linux 上尝试 AMQ Streams Kafka Bridge。您将学习如何:
- 安装 Kafka Bridge
- 生成信息到 Kafka 集群中的主题和分区
- 创建 Kafka Bridge 使用者
- 执行基本的消费者操作,如将消费者订阅到主题并检索您生成的消息
在这个快速入门中,HTTP 请求被格式化为 curl 命令,您可以复制并粘贴到终端。
确定您有先决条件,然后按照本章中提供的顺序按照任务进行操作。
关于数据格式
在本快速入门中,您将以 JSON 格式生成和使用消息,而不是二进制。有关示例请求中使用的数据格式和 HTTP 标头的更多信息,请参阅 第 12.1.1 节 “身份验证和加密”。
Quickstart 的先决条件
12.2.1. 在本地部署 Kafka Bridge 复制链接链接已复制到粘贴板!
将 AMQ Streams Kafka Bridge 的实例部署到主机。使用安装存档提供的 application.properties 文件以应用默认配置设置。
流程
打开
application.properties文件,并检查是否定义了默认的HTTP 相关设置:http.enabled=true http.host=0.0.0.0 http.port=8080这会将 Kafka Bridge 配置为侦听端口 8080 上的请求。
使用配置属性作为参数运行 Kafka Bridge 脚本:
./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
接下来要做什么
12.2.2. 向主题和分区生成信息 复制链接链接已复制到粘贴板!
使用 topics 端点将消息生成到 JSON 格式的主题。
您可以为请求正文中的消息指定目标分区,如下所示。分区 端点提供了一种替代方法,为所有消息指定单一目标分区作为路径参数。
流程
使用
kafka-topics.sh实用程序创建 Kafka 主题:bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824指定三个分区。
验证是否已创建该主题:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic使用 Kafka Bridge,为您创建的主题生成三个信息:
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'-
sales-lead-0001发送到基于密钥哈希的分区。 -
Sales-lead-0002直接发送到分区 2。 -
Sales-lead-0003使用 round-robin 方法发送到bridge-quickstart-topic主题中的分区。
-
如果请求成功,Kafka Bridge 会返回一个
偏移数组,以及200(OK)代码和application/vnd.kafka.v2+json的content-type标头。对于每个消息,偏移数组描述:- 消息发送到的分区
分区的当前消息偏移
响应示例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
接下来要做什么
在生成消息到主题和分区后,创建一个 Kafka Bridge 消费者。
其他资源
- API 参考文档中的 POST /topics/{topicname}。
- API 参考文档中 POST /topics/{topicname}/partitions/{partitionid}。
12.2.3. 创建 Kafka Bridge 使用者 复制链接链接已复制到粘贴板!
在 Kafka 集群上执行任何消费者操作前,您必须首先使用 consumers 端点创建消费者。消费者被称为 Kafka Bridge 消费者。
流程
在名为
bridge-quickstart-consumer-group的新消费者组中创建一个 Kafka Bridge 使用者:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'-
消费者名为
bridge-quickstart-consumer,嵌入式数据格式被设置为json。 消费者不会自动向日志提交偏移,因为
enable.auto.commit设置为false。稍后您将在此快速入门中手动提交偏移量。注意如果您没有在请求正文中指定消费者名称,则 Kafka Bridge 会生成一个随机消费者名称。
如果请求成功,Kafka Bridge 会返回响应正文中的使用者 ID (
instance_id)和基本 URL (base_uri),以及200(OK)代码。响应示例
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
消费者名为
-
复制基础 URL (
base_uri),以便在这个快速入门中的其他消费者操作中使用。
接下来要做什么
现在,您已创建了 Kafka Bridge 消费者,您可以将其订阅到主题。
其他资源
- API 参考文档中的 POST /consumers/{groupid}。
12.2.4. 将 Kafka Bridge 消费者订阅到主题 复制链接链接已复制到粘贴板!
使用 订阅 端点将 Kafka Bridge 使用者订阅到一个或多个主题。订阅后,消费者开始接收生成到该主题的所有消息。
流程
将消费者订阅之前创建的
bridge-quickstart-topic主题,在 Produc ing 信息到主题和分区 :curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'topics数组可以包含单个主题(如上所示)或多个主题。如果要将消费者订阅与正则表达式匹配的多个主题,您可以使用topic_pattern字符串而不是topics数组。如果请求成功,Kafka Bridge 只会返回一个
204 No Content代码。
接下来要做什么
将 Kafka Bridge 消费者订阅到主题后,您可以从 消费者检索信息。
其他资源
12.2.5. 从 Kafka Bridge consumer 检索最新的信息 复制链接链接已复制到粘贴板!
通过从 records 端点请求数据,从 Kafka Bridge 使用者检索最新的消息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。
流程
- 为 Kafka Bridge consumer 生成额外消息,如 Produc ing 消息到主题和分区 中所述。
向
记录端点提交GET请求:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'创建并订阅 Kafka Bridge 消费者后,第一个 GET 请求将返回空响应,因为轮询操作会触发重新平衡过程来分配分区。
重复步骤 2,以从 Kafka Bridge 使用者检索信息。
Kafka Bridge 返回一条信息 数组,它带有 200 (OK)代码,它包括主题名称、键、值、分区和偏移来来包括
200(OK)代码。默认情况下,消息从最新的偏移中检索。HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...注意如果返回空响应,请生成更多记录到消费者,如 Produc ing 消息到主题和分区 中所述,然后尝试再次检索消息。
接下来要做什么
从 Kafka Bridge 消费者检索信息后,尝试向 日志提交偏移。
其他资源
12.2.6. 向日志提交偏移量 复制链接链接已复制到粘贴板!
使用 偏移 端点手动向日志提交 Kafka Bridge 消费者接收的所有消息。这是必要的,因为在 创建 Kafka Bridge 消费者 中的 Kafka Bridge 消费者 被配置,其 enable.auto.commit 设置为 false。
流程
将偏移提交到
bridge-quickstart-consumer的日志:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets因为没有提交请求正文,所以针对消费者收到的所有记录提交偏移。另外,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。
如果请求成功,Kafka Bridge 只会返回一个
204 No Content代码。
接下来要做什么
向日志提交偏移后,尝试查找查找 偏移的端点。
其他资源
12.2.7. 寻找分区偏移量 复制链接链接已复制到粘贴板!
使用 位置 端点配置 Kafka Bridge 消费者,从特定偏移中检索分区的消息,然后从最新的偏移中检索。这在 Apache Kafka 中被称为 seek 操作。
流程
请参阅
quickstart-bridge-topic主题的分区 0 的特定偏移:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'如果请求成功,Kafka Bridge 只会返回一个
204 No Content代码。向
记录端点提交GET请求:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'Kafka Bridge 从您看到的偏移返回信息。
通过查找同一分区的最后一个偏移来恢复默认消息检索行为。这一次,使用 位置/结束 端点。
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'如果请求成功,Kafka Bridge 会返回另一个
204 No Content代码。
您还可以使用 positions/beginning 端点查找一个或多个分区的第一个偏移。
接下来要做什么
在这个快速入门中,您使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge 使用者。
12.2.8. 删除 Kafka Bridge 使用者 复制链接链接已复制到粘贴板!
最后,删除您在此快速入门中使用的 Kafa Bridge 使用者。
流程
通过向 实例 端点发送
DELETE请求来删除 Kafka Bridge 使用者。curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer如果请求成功,Kafka Bridge 只会返回一个
204 No Content代码。
其他资源
- API 参考文档中 DELETE /consumers/{groupid}/instances/{name}。