使用 AMQ Streams Kafka Bridge
使用 AMQ Streams Kafka Bridge 连接到 Kafka 集群
摘要
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Kafka Bridge 概述
使用 AMQ Streams Kafka Bridge 向 Kafka 集群发出 HTTP 请求。
您可以使用 Kafka Bridge 将 HTTP 客户端应用程序与您的 Kafka 集群集成。
HTTP 客户端集成
1.1. 运行 Kafka Bridge
安装 AMQ Streams Kafka Bridge,以和您的 Kafka 集群在相同的环境中运行。
您可以下载 Kafka Bridge 安装工件并将其添加到主机机器中。要尝试使用本地环境中的 Kafka Bridge,请参阅 Kafka Bridge Quickstart。
如果在 OpenShift 中部署了 AMQ Streams,您可以使用 AMQ Streams Cluster Operator 将 Kafka Bridge 部署到 OpenShift 集群。您需要一个正在运行的 Kafka 集群,该集群由 Cluster Operator 在 OpenShift 命名空间中部署。您可以配置部署,以访问 OpenShift 集群外的 Kafka Bridge。
其他资源
- AMQ Streams 文档 描述了如何使用 AMQ Streams 部署 Kafka Bridge
1.2. Kafka Bridge 接口
Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。 它提供与 AMQ Streams 的 Web API 连接的优点,而无需客户端应用程序来解释 Kafka 协议。
API 有两个主要资源( 使用者
和主题
),它们可以通过端点公开并可访问,以便与 Kafka 集群中的消费者和制作者交互。这些资源仅与 Kafka Bridge 相关,而不与与 Kafka 连接的使用者和制作者相关。
1.2.1. HTTP 请求
Kafka Bridge 支持到 Kafka 集群的 HTTP 请求,其方法如下:
- 发送消息到主题。
- 从主题检索消息。
- 检索主题的分区列表。
- 创建和删除消费者。
- 将消费者订阅到主题,以便它们开始从这些主题收到信息。
- 检索消费者订阅的主题列表。
- 取消订阅消费者的主题。
- 为消费者分配分区。
- 提交使用者偏移列表。
- 寻求分区,以便消费者开始从第一或最后一个偏移位置收到信息,或给定的偏移位置。
该方法提供 JSON 响应和 HTTP 响应代码错误处理。可以使用 JSON 或二进制格式发送消息。
客户端可以在不需要使用原生 Kafka 协议的情况下生成和使用消息。
1.3. Kafka Bridge OpenAPI 规格
Kafka Bridge API 使用 OpenAPI 规格(OAS)。OAS 为描述和实施 HTTP API 提供了标准框架。
Kafka Bridge OpenAPI 规格采用 JSON 格式。您可以在 Kafka Bridge 源下载文件的 src/main/resources/
文件夹中找到 OpenAPI JSON 文件。下载文件可从 客户门户网站 获得。
您还可以使用 GET /openapi
方法 以 JSON 格式检索 OpenAPI v2 规格。
其他资源
1.4. 保护到 Kafka 集群的连接
您可以在 Kafka Bridge 和 Kafka 集群间配置以下内容:
- 基于 TLS 或 SASL 的身份验证
- TLS 加密的连接
您可以通过 其属性文件 配置 Kafka Bridge 进行身份验证。
您还可以使用 Kafka 代理中的 ACL 来限制可以使用 Kafka Bridge消耗和生成的主题。
其他资源
1.5. 保护 Kafka Bridge HTTP 接口
Kafka Bridge 不支持 HTTP 客户端和 Kafka Bridge 间的身份验证和加密。从客户端发送到 Kafka Bridge 的请求在没有身份验证或加密的情况下发送。请求必须使用 HTTP 而不是 HTTPS。
您可以将 Kafka Bridge 与以下工具合并:
- 定义哪些 pod 可以访问 Kafka Bridge 的网络策略和防火墙
- 反向代理(如 OAuth 2.0)
- API 网关
1.6. 对 Kafka Bridge 的请求
指定数据格式和 HTTP 标头,以确保将有效请求提交到 Kafka Bridge。
1.6.1. 内容类型标头
API 请求和响应正文始终以 JSON 格式编码。
在执行消费者操作时,如果出现非空正文,
POST
请求必须提供以下Content-Type
标头:Content-Type: application/vnd.kafka.v2+json
在执行制作者操作时,
POST
请求必须提供Content-Type
标头来指定所生成消息的 嵌入式数据格式。这可以是json
或二进制
。嵌入式数据格式 content-Type 标头 JSON
content-Type: application/vnd.kafka.json.v2+json
二进制
content-Type: application/vnd.kafka.binary.v2+json
嵌入式数据格式按使用者设置,如下一节中所述。
如果 POST
请求有一个空的正文,则不能 设置 Content-Type
。空正文可用于创建具有默认值的使用者。
1.6.2. 嵌入式数据格式
内嵌的数据格式是通过 HTTP 传输的 Kafka 消息的格式,从制作者到使用 Kafka Bridge 的消费者。支持两种嵌入式数据格式:JSON 和二进制。
在使用 /consumers/groupid
端点创建消费者时,POST
请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在 格式字段中指定
,例如:
{
"name": "my-consumer",
"format": "binary", 1
# ...
}
- 1
- 二进制嵌入式数据格式。
在创建消费者时指定的嵌入式数据格式必须与其要使用的 Kafka 消息的数据格式匹配。
如果您选择指定二进制嵌入式数据格式,则后续生产者请求必须以 Base64 编码的字符串形式提供二进制数据。例如,在使用 /topics/topicname
端点发送消息时,records.value
必须采用 Base64 编码:
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
制作者请求还必须提供一个与嵌入式数据格式对应的 Content-Type
标头,如 Content-Type: application/vnd.kafka.binary.v2+json
。
1.6.3. 消息格式
使用 /topics
端点发送消息时,您可以在请求正文中输入消息有效负载,在 records
参数中输入。
records
参数可以包含任何这些可选字段:
-
Message
headers
-
Message
key
-
消息
值
-
目的地
分区
到 /topics 的 POST
请求示例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
},
]
}'
- 1
- 二进制格式的标头值,并编码为 Base64。
1.6.4. 接受标头
创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept
标头:
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
EMBEDDED-DATA-FORMAT
为 json
或 二进制
。
例如,在使用 JSON 的嵌入式数据格式检索已订阅的消费者的记录时,包括以下 Accept 标头:
Accept: application/vnd.kafka.json.v2+json
1.7. CORS
通过跨资源共享(CORS),您可以指定允许的方法和用于访问 Kafka Bridge HTTP 配置中的 Kafka 集群的来源 URL。
Kafka Bridge 的 CORS 配置示例
# ... http.cors.enabled=true http.cors.allowedOrigins=https://strimzi.io http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
CORS 允许在不同的域中的源 源之间 简单 和预先 动态的请求。
简单请求适用于使用 GET
、HEAD
和POST
方法的标准请求。
预先修改的请求将 HTTP OPTIONS 请求作为初始检查,用于检查实际请求是安全的发送。确认时会发送实际请求。preflight 请求适合需要更大保护的方法,如 PUT
和 DELETE
,并使用非标准标头。
所有请求都需要在其标头中有一个 origin 值,即 HTTP 请求的来源。
1.7.1. 简单请求
例如:这个简单请求标头将原始卷指定为 https://strimzi.io
。
Origin: https://strimzi.io
向请求中添加标头信息。
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
-H 'Origin: https://strimzi.io'\
-H 'content-type: application/vnd.kafka.v2+json'
在 Kafka Bridge 的响应中,会返回 Access-Control-Allow-Origin
标头。
HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 1
- 1
- 返回星号(
*
)显示任何域可以访问的资源。
1.7.2. Preflighted 请求
使用 OPTIONS
方法将初始 preflight 请求发送到 Kafka Bridge。HTTP OPTIONS 请求会发送标头信息,以检查 Kafka Bridge 是否允许实际请求。
此处显示 POST
请求有效的 preflight 请求检查 https://strimzi.io
。
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1 Origin: https://strimzi.io Access-Control-Request-Method: POST 1 Access-Control-Request-Headers: Content-Type 2
OPTIONS
添加到 preflight 请求的标题信息中。
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \ -H 'Access-Control-Request-Method: POST' \ -H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge 响应初始请求,以确认请求将被接受。响应标头返回允许的源、方法和标头。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH Access-Control-Allow-Headers: content-type
如果 origin 或 方法被拒绝,则返回错误消息。
实际请求不需要 Access-Control-Request-Method
标头,因为它在 preflight 请求中确认,但它需要 origin 标头。
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'
该响应显示允许原始 URL。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io
其他资源
1.8. 为 Kafka Bridge 配置日志记录器
您可以为 Kafka Bridge OpenAPI 规格定义的每个操作设置不同的日志级别。
每个操作都有一个对应的 API 端点,网桥通过它接收来自 HTTP 客户端的请求。您可以在每个端点上更改日志级别,以生成有关传入和传出 HTTP 请求的更精细的日志信息。
日志记录器在 log4j.properties
文件中定义,具有以下用于健康和 就绪端点的默认配置
:
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
所有其他操作的日志级别默认设置为 INFO
。日志记录器的格式如下:
log4j.logger.http.openapi.operation.<operation_id>
其中 <operation_id
> 是特定操作的标识符。
OpenAPI 规格定义的操作列表
-
createConsumer
-
deleteConsumer
-
subscribe
-
取消订阅
-
Poll
-
分配
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
寻求
-
健康
-
ready
-
OpenAPI
第 2 章 Kafka Bridge Quickstart
使用此快速入门尝试本地开发环境中的 AMQ Streams Kafka Bridge。
您将学习如何进行以下操作:
- 生成 Kafka 集群中主题和分区的信息
- 创建 Kafka Bridge consumer
- 执行基本消费者操作,如订阅消费者到主题并检索您生成的消息
在此快速入门中,HTTP 请求被格式化为 curl 命令,您可复制并粘贴到终端中。
请确定您具有先决条件,然后按照本章中提供的信息进行操作。
在此快速入门中,您将以 JSON 格式生成和使用消息。
快速入门先决条件
- Kafka 集群在主机机器上运行。
2.1. 下载 Kafka Bridge 归档
AMQ Streams Kafka Bridge 的 zip 发行版本可下载。
流程
- 从 客户门户网站 下载 AMQ Streams Kafka Bridge 归档的最新版本。
2.2. 配置 Kafka Bridge 属性
这个步骤描述了如何配置 AMQ Streams Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。
您可以使用针对 Kafka 相关属性的适当前缀,将 Kafka Bridge 配置为任何其他 Kafka 客户端。
-
Kafka.。
对于适用于生产者和消费者的常规配置,如服务器连接和安全性。 -
Kafka.consumer
.对于只传递给消费者的消费者特定配置。 -
Kafka.producer。
用于传递给制作者的特定生产配置。
除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性提供通过 Cross-Origin Resource Sharing(CORS)启用和定义 Kafka Bridge 的访问控制的功能。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问所选资源。要配置 CORS,您可以定义允许的资源来源列表和 HTTP 方法来访问它们。请求中的其他 HTTP 标头描述了允许访问 Kafka 集群的 CORS 源。
先决条件
流程
编辑 AMQ Streams Kafka Bridge 安装存档提供的
application.properties
文件。使用 属性文件来指定 Kafka 和 HTTP 相关属性,以及启用分布式追踪。
配置标准 Kafka 相关属性,包括特定于 Kafka 用户和制作者的属性。
使用:
-
Kafka.bootstrap.servers
用于定义到 Kafka 集群的主机/端口连接 -
Kafka.producer.acks
为 HTTP 客户端提供确认信息 Kafka
.consumer.auto.offset.reset
以确定如何在 Kafka 中管理重置偏移有关配置 Kafka 属性的更多信息,请参阅 Apache Kafka 网站
-
配置 HTTP 相关属性,以启用对 Kafka 集群的 HTTP 访问。
例如:
bridge.id=my-bridge http.enabled=true http.host=0.0.0.0 http.port=8080 1 http.cors.enabled=true 2 http.cors.allowedOrigins=https://strimzi.io 3 http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
2.3. 安装 Kafka Bridge
按照以下步骤安装 AMQ Streams Kafka Bridge。
流程
- 如果您还没有这样做,请将 Kafka Bridge 安装存档解压缩到任何目录。
使用配置属性作为参数运行 Kafka Bridge 脚本:
例如:
./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
检查日志中的安装是否成功。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
2.4. 向主题和分区生成信息
使用 Kafka Bridge 使用主题端点,以 JSON 格式的信息向 Kafka 主题生成信息。
您可以使用主题端点将消息生成到 JSON 格式的 主题。您可以为请求正文中的消息指定目的地分区。分区 端点提供了一种替代方法,可为所有消息指定一个目标分区作为路径参数。
在这一过程中,会将消息生成到名为 bridge-quickstart-topic
的主题。
先决条件
Kafka 集群有一个具有三个分区的主题。
您可以使用
kafka-topics.sh
实用程序来创建主题。创建三个分区的主题示例
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
验证该主题是否已创建
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
如果在 OpenShift 中部署了 AMQ Streams,您可以使用 KafkaTopic
自定义资源创建一个主题。
流程
使用 Kafka Bridge,为您创建的主题生成三条信息:
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
sales-lead-0001
根据密钥的哈希发送到分区。 -
sales-lead-0002
直接发送至分区 2。 -
sales-lead-0003
使用 round-robin 方法发送到bridge-quickstart-topic
主题中的一个分区。
-
如果请求成功,Kafka Bridge 会返回一个
偏移
数组,以及一个200
代码和一个content-type
标头application/vnd.kafka.v2+json
。对于每个消息,偏移
数组描述:- 发送到消息的分区
分区的当前消息偏移
响应示例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
其他主题请求
发出其他 curl 请求以查找有关主题和分区的信息。
- 列出主题
curl -X GET \ http://localhost:8080/topics
响应示例
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]
- 获取主题配置和分区详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic
响应示例
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }
- 列出特定主题的分区
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions
响应示例
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]
- 列出特定主题分区的详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
响应示例
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }
- 列出特定主题分区的偏移
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
响应示例
{ "beginning_offset": 0, "end_offset": 1 }
下一步做什么
在为主题和分区生成消息后,创建一个 Kafka Bridge 消费者。
2.5. 创建 Kafka Bridge consumer
在 Kafka 集群中执行任何消费者操作前,您必须首先使用使用者端点创建一个 消费者。使用者被称为 Kafka Bridge consumer。
流程
在名为 bridge-quickstart-consumer-group 的新消费者组中,创建一个名为
bridge-quickstart-consumer-group
的 Kafka Bridge consumer:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
-
使用者名为
bridge-quickstart-consumer
,嵌入式数据格式设置为json
。 - 定义一些基本配置设置。
使用者不会自动向日志提交偏移,因为
enable.auto.commit
设置为false
。在此快速入门中,您将手动提交误差。如果请求成功,Kafka Bridge 会返回响应正文中的消费者 ID(
instance_id
)和基本 URL(base_uri
),以及200
代码。响应示例
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
使用者名为
-
复制用于此快速入门的其他消费者操作的基本 URL(
base_uri
)。
下一步做什么
现在,您已创建了一个 Kafka Bridge 消费者,您可将其订阅到主题。
2.6. 为主题订阅 Kafka Bridge consumer
创建 Kafka Bridge 消费者后,使用订阅端点将其 订阅 到一个或多个主题。订阅后,消费者开始接收生成到该主题的所有信息。
流程
将使用者订阅到您之前创建的
网桥-quickstart-topic
主题,并放入 主题和分区中 :curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
主题
数组可以包含单个主题(如此处所示)或多个主题。如果要将消费者订阅多个与正则表达式匹配的主题,您可以使用topic_pattern
字符串而不是主题
数组。如果请求成功,则 Kafka Bridge 只返回一个
204
(No Content)代码。
下一步做什么
在向主题订阅 Kafka Bridge consumer 后,您可以从 消费者检索信息。
2.7. 从 Kafka Bridge consumer 检索最新的信息
通过从 记录 端点请求数据,从 Kafka Bridge consumer 检索最新的信息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。
流程
- 为 Kafka Bridge consumer 生成额外的信息,如 将消息设置为主题和分区 中所述。
将
GET
请求提交到记录
端点:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
创建并订阅 Kafka Bridge 消费者后,第一个 GET 请求会返回空响应,因为轮询操作会启动重新平衡过程来分配分区。
重复步骤 2,以从 Kafka Bridge consumer 检索信息。
Kafka Bridge 在响应正文和
200
代码之间返回一系列消息为:username name、key、value、partition 和 offsetsHistoryLimit-jaxbin。默认情况下,消息从最新的偏移检索。HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
注意如果返回了空响应,为消费者生成更多记录,如将消息 设置为主题和分区,然后尝试再次检索消息。
下一步做什么
从 Kafka Bridge consumer 检索信息后,尝试向 日志提交偏移。
2.8. 对日志提交偏移
使用 偏移 端点手动将偏移地提交到 Kafka Bridge 消费者接收的所有消息的日志中。这是必要的,因为您之前创建的 Kafka Bridge consumer 在创建 Kafka Bridge consumer 中,使用 enable.auto.commit
设置配置为 false
。
流程
将偏移提交到
bridge-quickstart-consumer
的日志:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
因为没有提交请求正文,所以会提交偏移用于消费者接收的所有记录。另外,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。
如果请求成功,则 Kafka Bridge 只返回
204
代码。
下一步做什么
在为日志提交误差后,请尝试查找查找 偏移的端点。
2.9. 为分区寻求偏移
使用 位置 端点配置 Kafka Bridge consumer,以从特定的偏移检索分区的信息,然后从最新的偏移中检索分区的信息。这在 Apache Kafka 中被称为 寻求操作。
流程
为
quickstart-bridge-topic
主题的分区 0 寻求特定的偏移:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
如果请求成功,则 Kafka Bridge 只返回
204
代码。将
GET
请求提交到记录
端点:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge 返回您想到的偏移信息。
通过寻找同一分区的最后偏移来恢复默认信息检索行为。此时使用 positions/end 端点。
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
如果请求成功,Kafka Bridge 会返回另一个
204
代码。
您还可以使用 位置/更好的端点来 寻找一个或多个分区的第一个偏移量。
下一步做什么
在本快速入门中,您已使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge consumer。
2.10. 删除 Kafka Bridge consumer
删除您整个快速入门中使用的 Kafka Bridge consumer。
流程
通过向 实例 端点发送
DELETE
请求来删除 Kafka Bridge consumer。curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
如果请求成功,Kafka Bridge 会返回
204
代码。
第 3 章 AMQ Streams Kafka Bridge API 参考
3.1. 概述
AMQ Streams Kafka Bridge 提供了一个 REST API,用于将基于 HTTP 的客户端应用程序与 Kafka 集群集成。您可以使用 API 来创建和管理消费者,并通过 HTTP 发送和接收记录,而不是通过原生 Kafka 协议。
3.1.1. 版本信息
版本 : 0.1.0
3.1.2. Tags
- 消费者:消费者操作在 Kafka 集群中创建消费者,并执行常见的操作,如订阅主题、检索已处理的记录以及提交偏移。
- 制作者:生产程序操作,以将记录发送到指定的主题或主题分区。
- 寻求 : Seek 操作,使使用者能够开始从给定的偏移位置接收消息。
- 主题:要将消息发送到指定的主题或主题分区(包括请求中的消息键)的主题操作。您还可以检索主题和主题元数据。
3.1.3. 使用
-
application/json
3.1.4. produces
-
application/json
3.2. 定义
3.2.1. AssignedTopicPartitions
类型 : < string, < integer(int32)> 数组 > map
3.2.2. BridgeInfo
有关 Kafka Bridge 实例的信息。
名称 | 模式 |
---|---|
bridge_version | 字符串 |
3.2.3. 消费者
名称 | 描述 | 模式 |
---|---|---|
auto.offset.reset |
重置消费者的偏移位置。如果设置为 | 字符串 |
consumer.request.timeout.ms |
为使用者设置请求请求的最大时间(以毫秒为单位)。如果在没有响应的情况下达到超时时间,则会返回错误。默认为 | 整数 |
enable.auto.commit |
如果设置为 | 布尔值 |
fetch.min.bytes |
为消费者设置接收的最小数据量(以字节为单位)。代理会等待发送数据超过这个数量。默认值为 | 整数 |
格式 |
consumer 的允许消息格式,可以是 | 字符串 |
isolation.level |
如果设置为 | 字符串 |
name | consumer 实例的唯一名称。该名称在 consumer 组的范围内是唯一的。名称在 URL 中使用。如果没有指定名称,则会分配一个随机生成的名称。 | 字符串 |
3.2.4. ConsumerRecord
名称 | 模式 |
---|---|
标头 | |
key | 字符串 |
offset | integer (int64) |
分区 | integer (int32) |
主题 | 字符串 |
value | 字符串 |
3.2.5. ConsumerRecordList
类型 : < ConsumerRecord > 数组
3.2.6. CreatedConsumer
名称 | 描述 | 模式 |
---|---|---|
base_uri | 用于构建用于针对此消费者实例的后续请求的 URI 的基础 URI。 | 字符串 |
instance_id | 组中消费者实例的唯一 ID。 | 字符串 |
3.2.7. Error
名称 | 模式 |
---|---|
error_code | integer (int32) |
message | 字符串 |
3.2.8. KafkaHeader
名称 | 描述 | 模式 |
---|---|---|
key | 字符串 | |
|
标头值采用二进制格式, base64 编码的 | 字符串(字节) |
3.2.9. KafkaHeaderList
键入 : < KafkaHeader > 数组
3.2.10. OffsetCommitSeek
名称 | 模式 |
---|---|
偏移 | integer (int64) |
需要 | integer (int32) |
需要 | 字符串 |
3.2.11. OffsetCommitSeekList
名称 | 模式 |
---|---|
偏移 | < OffsetCommitSeek > array |
3.2.12. OffsetRecordSent
名称 | 模式 |
---|---|
offset | integer (int64) |
分区 | integer (int32) |
3.2.13. OffsetRecordSentList
名称 | 模式 |
---|---|
偏移 | < OffsetRecordSent > array |
3.2.14. OffsetsSummary
名称 | 模式 |
---|---|
beginning_offset | integer (int64) |
end_offset | integer (int64) |
3.2.15. 分区
名称 | 模式 |
---|---|
分区 | integer (int32) |
主题 | 字符串 |
3.2.16. PartitionMetadata
名称 | 模式 |
---|---|
领导 | integer (int32) |
分区 | integer (int32) |
replicas | < ; replica > 数组 |
3.2.17. 分区
名称 | 模式 |
---|---|
分区 | < ; partition > 数组 |
3.2.18. ProducerRecord
名称 | 模式 |
---|---|
标头 | |
分区 | integer (int32) |
3.2.19. ProducerRecordList
名称 | 模式 |
---|---|
记录 | < ProducerRecord > 数组 |
3.2.20. ProducerRecordToPartition
键入 : 对象
3.2.21. ProducerRecordToPartitionList
名称 | 模式 |
---|---|
记录 | < ProducerRecordToPartition > 数组 |
3.2.22. replica
名称 | 模式 |
---|---|
代理 | integer (int32) |
in_sync | 布尔值 |
领导 | 布尔值 |
3.2.23. SubscribedTopicList
名称 | 模式 |
---|---|
分区 | < AssignedTopicPartitions > 数组 |
主题 |
3.2.24. TopicMetadata
名称 | 描述 | 模式 |
---|---|---|
配置 | 每个主题配置覆盖 | < string, string > map |
name | 主题的名称 | 字符串 |
分区 | < PartitionMetadata > 数组 |
3.2.25. 主题
名称 | 描述 | 模式 |
---|---|---|
topic_pattern | 匹配多个主题的正则表达式主题模式 | 字符串 |
主题 | < string > 数字 |
3.3. 路径
3.3.1. GET /
3.3.1.1. 描述
检索有关 Kafka Bridge 实例的信息,采用 JSON 格式。
3.3.1.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 有关 Kafka Bridge 实例的信息。 |
3.3.1.3. produces
-
application/json
3.3.1.4. HTTP 响应示例
3.3.1.4.1. 响应 200
{ "bridge_version" : "0.16.0" }
3.3.2. POST /consumers/{groupid}
3.3.2.1. 描述
在给定的消费者组中创建一个消费者实例。您可以选择指定消费者名称和支持的配置选项。它会返回一个基础 URI,它必须用于构建针对此消费者实例的后续请求的 URL。
3.3.2.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 在其中创建消费者的消费者组的 ID。 | 字符串 |
Body |
body | 消费者的名称和配置。该名称在 consumer 组的范围内是唯一的。如果没有指定名称,则会分配一个随机生成的名称。所有参数都是可选的。以下示例中显示了支持的配置选项。 |
3.3.2.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 消费者创建成功。 | |
409 | Kafka Bridge 中已存在具有指定名称的消费者实例。 | |
422 | 一个或多个消费者配置选项具有无效的值。 |
3.3.2.4. 使用
-
application/vnd.kafka.v2+json
3.3.2.5. produces
-
application/vnd.kafka.v2+json
3.3.2.6. Tags
- 消费者
3.3.2.7. HTTP 请求示例
3.3.2.7.1. 请求正文
{ "name" : "consumer1", "format" : "binary", "auto.offset.reset" : "earliest", "enable.auto.commit" : false, "fetch.min.bytes" : 512, "consumer.request.timeout.ms" : 30000, "isolation.level" : "read_committed" }
3.3.2.8. HTTP 响应示例
3.3.2.8.1. 响应 200
{ "instance_id" : "consumer1", "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1" }
3.3.2.8.2. 响应 409
{ "error_code" : 409, "message" : "A consumer instance with the specified name already exists in the Kafka Bridge." }
3.3.2.8.3. 响应 422
{ "error_code" : 422, "message" : "One or more consumer configuration options have invalid values." }
3.3.3. DELETE /consumers/{groupid}/instances/{name}
3.3.3.1. 描述
删除指定的消费者实例。对此操作 MUST 的请求使用从 POST
请求到 /consumers/{groupid}
的响应中返回的基本 URL(包括主机和端口),用于创建这个消费者。
3.3.3.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要删除的消费者的名称。 | 字符串 |
3.3.3.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 消费者成功删除。 | 无内容 |
404 | 未找到指定的消费者实例。 |
3.3.3.4. 使用
-
application/vnd.kafka.v2+json
3.3.3.5. produces
-
application/vnd.kafka.v2+json
3.3.3.6. Tags
- 消费者
3.3.3.7. HTTP 响应示例
3.3.3.7.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.4. POST /consumers/{groupid}/instances/{name}/assignments
3.3.4.1. 描述
为消费者分配一个或多个主题分区。
3.3.4.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 将主题分区分配到的使用者的名称。 | 字符串 |
Body |
body | 要分配给消费者的主题分区列表。 |
3.3.4.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 成功分配的分区。 | 无内容 |
404 | 未找到指定的消费者实例。 | |
409 | 主题、分区和模式的订阅是互斥的。 |
3.3.4.4. 使用
-
application/vnd.kafka.v2+json
3.3.4.5. produces
-
application/vnd.kafka.v2+json
3.3.4.6. Tags
- 消费者
3.3.4.7. HTTP 请求示例
3.3.4.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
3.3.4.8. HTTP 响应示例
3.3.4.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.4.8.2. 响应 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
3.3.5. POST /consumers/{groupid}/instances/{name}/offsets
3.3.5.1. 描述
提交使用者偏移列表。要提交消费者获取的所有记录的偏移,请将请求正文留空。
3.3.5.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | consumer 的名称。 | 字符串 |
Body |
body | 提交到使用者偏移提交日志的使用者偏移列表。您可以指定一个或多个主题分区以提交偏移量。 |
3.3.5.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 提交成功。 | 无内容 |
404 | 未找到指定的消费者实例。 |
3.3.5.4. 使用
-
application/vnd.kafka.v2+json
3.3.5.5. produces
-
application/vnd.kafka.v2+json
3.3.5.6. Tags
- 消费者
3.3.5.7. HTTP 请求示例
3.3.5.7.1. 请求正文
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
3.3.5.8. HTTP 响应示例
3.3.5.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.6. POST /consumers/{groupid}/instances/{name}/positions
3.3.6.1. 描述
配置订阅的使用者,使其在下次从给定主题分区获取一组记录时从特定的偏移中获取偏移。这会覆盖消费者的默认获取行为。您可以指定一个或多个主题分区。
3.3.6.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅消费者的名称。 | 字符串 |
Body |
body | 订阅消费者下一次获取记录的分区偏移列表。 |
3.3.6.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 寻道成功。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
3.3.6.4. 使用
-
application/vnd.kafka.v2+json
3.3.6.5. produces
-
application/vnd.kafka.v2+json
3.3.6.6. Tags
- 消费者
- 寻求
3.3.6.7. HTTP 请求示例
3.3.6.7.1. 请求正文
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
3.3.6.8. HTTP 响应示例
3.3.6.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.7. POST /consumers/{groupid}/instances/{name}/positions/beginning
3.3.7.1. 描述
将订阅的消费者配置为寻道(随后读取)一个或多个给定主题分区中的第一个偏移。
3.3.7.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅消费者的名称。 | 字符串 |
Body |
body | 消费者订阅的主题分区列表。使用者将查找指定分区中的第一个偏移。 |
3.3.7.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 寻求成功执行的开始。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
3.3.7.4. 使用
-
application/vnd.kafka.v2+json
3.3.7.5. produces
-
application/vnd.kafka.v2+json
3.3.7.6. Tags
- 消费者
- 寻求
3.3.7.7. HTTP 请求示例
3.3.7.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
3.3.7.8. HTTP 响应示例
3.3.7.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.8. POST /consumers/{groupid}/instances/{name}/positions/end
3.3.8.1. 描述
将订阅的消费者配置为寻道(随后从读取)偏移量在一个或多个给定主题分区的末尾。
3.3.8.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅消费者的名称。 | 字符串 |
Body |
body | 消费者订阅的主题分区列表。使用者将查找指定分区中的最后偏移量。 |
3.3.8.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 寻求成功执行的最终操作。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
3.3.8.4. 使用
-
application/vnd.kafka.v2+json
3.3.8.5. produces
-
application/vnd.kafka.v2+json
3.3.8.6. Tags
- 消费者
- 寻求
3.3.8.7. HTTP 请求示例
3.3.8.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
3.3.8.8. HTTP 响应示例
3.3.8.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.9. GET /consumers/{groupid}/instances/{name}/records
3.3.9.1. 描述
检索订阅的消费者的记录,包括消息值、主题和分区。对此操作 MUST 的请求使用从 POST
请求到 /consumers/{groupid}
的响应中返回的基本 URL(包括主机和端口),用于创建这个消费者。
3.3.9.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要从中检索记录的订阅消费者的名称。 | 字符串 |
查询 |
max_bytes | 响应中包含的未编码键和值的最大大小,以字节为单位。否则,会返回带有代码 422 的错误响应。 | 整数 |
查询 |
Timeout | HTTP Bridge 在超时请求前会花费检索记录的最长时间,以毫秒为单位。 | 整数 |
3.3.9.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 轮询请求已成功执行。 | |
404 | 未找到指定的消费者实例。 | |
406 |
消费者创建请求中使用的格式与此请求的 Accept 标头中的嵌入式格式不匹配,或者网桥从主题获取消息(非 JSON 编码)。 | |
422 | 响应超过消费者可接收的最大字节数 |
3.3.9.4. produces
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
-
application/vnd.kafka.v2+json
3.3.9.5. Tags
- 消费者
3.3.9.6. HTTP 响应示例
3.3.9.6.1. 响应 200
[ { "topic" : "topic", "key" : "key1", "value" : { "foo" : "bar" }, "partition" : 0, "offset" : 2 }, { "topic" : "topic", "key" : "key2", "value" : [ "foo2", "bar2" ], "partition" : 1, "offset" : 3 } ]
[ { "topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100, }, { "topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101, } ]
3.3.9.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.9.6.3. 响应 406
{ "error_code" : 406, "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request." }
3.3.9.6.4. 响应 422
{ "error_code" : 422, "message" : "Response exceeds the maximum number of bytes the consumer can receive" }
3.3.10. POST /consumers/{groupid}/instances/{name}/subscription
3.3.10.1. 描述
订阅一个或多个主题的消费者。您可以描述消费者在列表( 主题
类型)或 topic_pattern
字段所订阅的主题。每个调用替换了订阅者的订阅。
3.3.10.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅主题的消费者的名称。 | 字符串 |
Body |
body | 消费者订阅的主题列表。 |
3.3.10.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 消费者订阅成功。 | 无内容 |
404 | 未找到指定的消费者实例。 | |
409 | 主题、分区和模式的订阅是互斥的。 | |
422 |
必须指定列表( |
3.3.10.4. 使用
-
application/vnd.kafka.v2+json
3.3.10.5. produces
-
application/vnd.kafka.v2+json
3.3.10.6. Tags
- 消费者
3.3.10.7. HTTP 请求示例
3.3.10.7.1. 请求正文
{ "topics" : [ "topic1", "topic2" ] }
3.3.10.8. HTTP 响应示例
3.3.10.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.10.8.2. 响应 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
3.3.10.8.3. 响应 422
{ "error_code" : 422, "message" : "A list (of Topics type) or a topic_pattern must be specified." }
3.3.11. GET /consumers/{groupid}/instances/{name}/subscription
3.3.11.1. 描述
检索消费者订阅的主题列表。
3.3.11.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅消费者的名称。 | 字符串 |
3.3.11.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 订阅主题和分区的列表。 | |
404 | 未找到指定的消费者实例。 |
3.3.11.4. produces
-
application/vnd.kafka.v2+json
3.3.11.5. Tags
- 消费者
3.3.11.6. HTTP 响应示例
3.3.11.6.1. 响应 200
{ "topics" : [ "my-topic1", "my-topic2" ], "partitions" : [ { "my-topic1" : [ 1, 2, 3 ] }, { "my-topic2" : [ 1 ] } ] }
3.3.11.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.12. DELETE /consumers/{groupid}/instances/{name}/subscription
3.3.12.1. 描述
取消订阅来自所有主题的消费者。
3.3.12.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要取消订阅主题的消费者的名称。 | 字符串 |
3.3.12.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 消费者成功退订。 | 无内容 |
404 | 未找到指定的消费者实例。 |
3.3.12.4. Tags
- 消费者
3.3.12.5. HTTP 响应示例
3.3.12.5.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
3.3.13. GET /healthy
3.3.13.1. 描述
检查网桥是否正在运行。这不一定意味着它已准备好接受请求。
3.3.13.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 网桥是健康的 | 无内容 |
3.3.14. GET /openapi
3.3.14.1. 描述
以 JSON 格式检索 OpenAPI v2 规格。
3.3.14.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 成功检索 JSON 格式的 OpenAPI v2 规格。 | 字符串 |
3.3.14.3. produces
-
application/json
3.3.15. GET /ready
3.3.15.1. 描述
检查网桥是否已准备就绪,并可以接受请求。
3.3.15.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 网桥已就绪 | 无内容 |
3.3.16. GET /topics
3.3.16.1. 描述
检索所有主题的列表。
3.3.16.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题列表。 | < string > 数字 |
3.3.16.3. produces
-
application/vnd.kafka.v2+json
3.3.16.4. Tags
- 主题
3.3.16.5. HTTP 响应示例
3.3.16.5.1. 响应 200
[ "topic1", "topic2" ]
3.3.17. POST /topics/{topicname}
3.3.17.1. 描述
将一个或多个记录发送到给定主题,可以选择指定分区、密钥或两者。
3.3.17.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 主题名称 | 用于将记录发送到或检索元数据的主题名称。 | 字符串 |
查询 |
async | 在发送记录时是否立即返回,而不是等待元数据。如果指定,则不会返回任何偏移。默认为false。 | 布尔值 |
Body |
body |
3.3.17.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 成功发送的记录. | |
404 | 未找到指定的主题。 | |
422 | 记录列表无效。 |
3.3.17.4. 使用
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
3.3.17.5. produces
-
application/vnd.kafka.v2+json
3.3.17.6. Tags
- producer
- 主题
3.3.17.7. HTTP 请求示例
3.3.17.7.1. 请求正文
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2", "partition" : 1 }, { "value" : "value3" } ] }
3.3.17.8. HTTP 响应示例
3.3.17.8.1. 响应 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
3.3.17.8.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
3.3.17.8.3. 响应 422
{ "error_code" : 422, "message" : "The record list contains invalid records." }
3.3.18. GET /topics/{topicname}
3.3.18.1. 描述
检索有关给定主题的元数据。
3.3.18.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 主题名称 | 用于将记录发送到或检索元数据的主题名称。 | 字符串 |
3.3.18.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题元数据 |
3.3.18.4. produces
-
application/vnd.kafka.v2+json
3.3.18.5. Tags
- 主题
3.3.18.6. HTTP 响应示例
3.3.18.6.1. 响应 200
{ "name" : "topic", "offset" : 2, "configs" : { "cleanup.policy" : "compact" }, "partitions" : [ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ] }
3.3.19. GET /topics/{topicname}/partitions
3.3.19.1. 描述
检索主题的分区列表。
3.3.19.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 主题名称 | 用于将记录发送到或检索元数据的主题名称。 | 字符串 |
3.3.19.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 分区列表 | < PartitionMetadata > 数组 |
404 | 未找到指定的主题。 |
3.3.19.4. produces
-
application/vnd.kafka.v2+json
3.3.19.5. Tags
- 主题
3.3.19.6. HTTP 响应示例
3.3.19.6.1. 响应 200
[ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ]
3.3.19.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
3.3.20. POST /topics/{topicname}/partitions/{partitionid}
3.3.20.1. 描述
将一个或多个记录发送到给定的主题分区(可选)指定密钥。
3.3.20.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 partitionid | 用于将记录发送到或检索元数据的分区 ID。 | 整数 |
路径 |
需要 主题名称 | 用于将记录发送到或检索元数据的主题名称。 | 字符串 |
Body |
body | 要发送到给定主题分区的记录列表,包括值(必需)和一个键(可选)。 |
3.3.20.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 成功发送的记录. | |
404 | 未找到指定的主题分区。 | |
422 | 记录无效。 |
3.3.20.4. 使用
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
3.3.20.5. produces
-
application/vnd.kafka.v2+json
3.3.20.6. Tags
- producer
- 主题
3.3.20.7. HTTP 请求示例
3.3.20.7.1. 请求正文
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2" } ] }
3.3.20.8. HTTP 响应示例
3.3.20.8.1. 响应 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
3.3.20.8.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
3.3.20.8.3. 响应 422
{ "error_code" : 422, "message" : "The record is not valid." }
3.3.21. GET /topics/{topicname}/partitions/{partitionid}
3.3.21.1. 描述
检索主题分区的分区元数据。
3.3.21.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 partitionid | 用于将记录发送到或检索元数据的分区 ID。 | 整数 |
路径 |
需要 主题名称 | 用于将记录发送到或检索元数据的主题名称。 | 字符串 |
3.3.21.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 分区元数据 | |
404 | 未找到指定的主题分区。 |
3.3.21.4. produces
-
application/vnd.kafka.v2+json
3.3.21.5. Tags
- 主题
3.3.21.6. HTTP 响应示例
3.3.21.6.1. 响应 200
{ "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }
3.3.21.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
3.3.22. GET /topics/{topicname}/partitions/{partitionid}/offsets
3.3.22.1. 描述
检索主题分区的偏移摘要。
3.3.22.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
需要 partitionid | 分区的 ID。 | 整数 |
路径 |
需要 主题名称 | 包含分区的主题的名称。 | 字符串 |
3.3.22.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题分区的偏移概述。 | |
404 | 未找到指定的主题分区。 |
3.3.22.4. produces
-
application/vnd.kafka.v2+json
3.3.22.5. Tags
- 主题
3.3.22.6. HTTP 响应示例
3.3.22.6.1. 响应 200
{ "beginning_offset" : 10, "end_offset" : 50 }
3.3.22.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
附录 A. 使用您的订阅
AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 Zip 和 Tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 在 INTEGRATION 和 AUTOMATION 类别中找到 Apache Kafka 条目的 AMQ Streams。
- 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
使用 DNF 安装软件包
要安装软件包以及所有软件包的依赖软件包,请使用:
dnf install <package_name>
要从本地目录中安装之前下载的软件包,请使用:
dnf install <path_to_download_package>
修订 2022-10-08 21:37:51 +1000