2.4. 向主题和分区生成信息
使用 Kafka Bridge 使用主题端点,以 JSON 格式的信息向 Kafka 主题生成信息。
您可以使用主题端点将消息生成到 JSON 格式的 主题。您可以为请求正文中的消息指定目的地分区。分区 端点提供了一种替代方法,可为所有消息指定一个目标分区作为路径参数。
在这一过程中,会将消息生成到名为 bridge-quickstart-topic 的主题。
先决条件
Kafka 集群有一个具有三个分区的主题。
您可以使用
kafka-topics.sh实用程序来创建主题。创建三个分区的主题示例
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1验证该主题是否已创建
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
如果在 OpenShift 中部署了 AMQ Streams,您可以使用 KafkaTopic 自定义资源创建一个主题。
流程
使用 Kafka Bridge,为您创建的主题生成三条信息:
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'-
sales-lead-0001根据密钥的哈希发送到分区。 -
sales-lead-0002直接发送至分区 2。 -
sales-lead-0003使用 round-robin 方法发送到bridge-quickstart-topic主题中的一个分区。
-
如果请求成功,Kafka Bridge 会返回一个
偏移数组,以及一个200代码和一个content-type标头application/vnd.kafka.v2+json。对于每个消息,偏移数组描述:- 发送到消息的分区
分区的当前消息偏移
响应示例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
其他主题请求
发出其他 curl 请求以查找有关主题和分区的信息。
- 列出主题
curl -X GET \ http://localhost:8080/topics响应示例
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]- 获取主题配置和分区详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic响应示例
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }- 列出特定主题的分区
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions响应示例
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]- 列出特定主题分区的详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0响应示例
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }- 列出特定主题分区的偏移
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets响应示例
{ "beginning_offset": 0, "end_offset": 1 }
下一步做什么
在为主题和分区生成消息后,创建一个 Kafka Bridge 消费者。