使用 AMQ Streams Kafka Bridge
使用 AMQ Streams Kafka Bridge 与 Kafka 集群连接
摘要
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Kafka Bridge 概述
使用 AMQ Streams Kafka Bridge 向 Kafka 集群发出 HTTP 请求。
您可以使用 Kafka Bridge 将 HTTP 客户端应用程序与 Kafka 集群集成。
HTTP 客户端集成
1.1. 运行 Kafka Bridge
安装 AMQ Streams Kafka Bridge,以便在与 Kafka 集群相同的环境中运行。
您可以将 Kafka Bridge 安装工件下载到主机机器中。要在本地环境中尝试 Kafka Bridge,请参阅 Kafka Bridge Quickstart。
务必要注意,Kafka Bridge 的每个实例都维护自己的一组内存消费者(和订阅),它们代表 HTTP 客户端连接到 Kafka Broker。这意味着,每个 HTTP 客户端都必须维护同一 Kafka Bridge 实例的关联性,才能访问创建的任何订阅。另外,当 Kafka Bridge 的实例时,内存消费者和订阅将会丢失。如果 Kafka Bridge 重启,HTTP 客户端负责重新创建任何使用者和订阅。
1.1.1. 在 OpenShift 上运行 Kafka Bridge
如果在 OpenShift 上部署了 AMQ Streams,您可以使用 AMQ Streams Cluster Operator 将 Kafka Bridge 部署到 OpenShift 集群。配置和部署 Kafka Bridge 作为 KafkaBridge
资源。您需要一个正在运行的 Kafka 集群,它由 Cluster Operator 部署到 OpenShift 命名空间中。您可以配置部署来访问 OpenShift 集群外的 Kafka Bridge。
HTTP 客户端必须保持与 Kafka Bridge 相同的实例的关联性,才能访问它们创建的任何用户或订阅。因此,不建议每个 OpenShift Deployment 运行多个 Kafka Bridge 副本。如果 Kafka Bridge pod 重启(例如,因为 OpenShift 将工作负载重新定位到另一节点),则 HTTP 客户端必须重新创建任何消费者或订阅。
有关部署和配置 Kafka Bridge 作为 KafkaBridge
资源的详情,请参考 AMQ Streams 文档。
1.2. Kafka Bridge 接口
Kafka Bridge 提供了一个 RESTful 接口,它允许基于 HTTP 的客户端与 Kafka 集群交互。 它提供了与 AMQ Streams 的 Web API 连接的优点,而无需客户端应用程序来解释 Kafka 协议。
API 有两个主要资源 - 消费者(consumer)
和主题(topic)
- 通过端点公开并可访问,以便与 Kafka 集群中的消费者和制作者交互。资源只与 Kafka Bridge 相关,而不是直接连接到 Kafka 的用户和制作者。
1.2.1. HTTP 请求
Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,使用以下方法:
- 发送消息到主题。
- 从主题检索消息。
- 检索主题的分区列表。
- 创建和删除用户。
- 订阅消费者到主题,以便他们开始从这些主题接收信息。
- 检索消费者订阅的主题列表。
- 取消订阅消费者的主题.
- 为消费者分配分区。
- 提交使用者偏移列表。
- 定位分区,以便消费者开始收到来自第一或最后一个偏移位置的信息,或给定的偏移位置。
该方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以使用 JSON 或二进制格式发送。
客户端可以在不需要使用原生 Kafka 协议的情况下生成和使用消息。
1.3. Kafka Bridge OpenAPI 规格
Kafka Bridge API 使用 OpenAPI 规格(OAS)。OAS 提供了一个标准框架,用于描述和实施 HTTP API。
Kafka Bridge OpenAPI 规格采用 JSON 格式。您可以在 Kafka Bridge 源下载文件的 src/main/resources/
文件夹中找到 OpenAPI JSON 文件。下载文件可从 客户门户网站 获得。
您还可以使用 GET /openapi
方法 以 JSON 格式检索 OpenAPI v2 规格。
其他资源
1.4. 保护到 Kafka 集群的连接
您可以在 Kafka Bridge 和 Kafka 集群间配置以下内容:
- TLS 或基于 SASL 的身份验证
- TLS 加密连接
您可以通过其 属性文件 配置 Kafka Bridge 以进行身份验证。
您还可以使用 Kafka 代理中的 ACL 来限制可以使用 Kafka Bridge 使用并生成的主题。
在 OpenShift 上运行 Kafka Bridge 时,使用 KafkaBridge
资源配置身份验证。
1.5. 保护 Kafka Bridge HTTP 接口
Kafka Bridge 不支持 HTTP 客户端和 Kafka Bridge 之间的身份验证和加密。从客户端发送到 Kafka Bridge 的请求会在没有身份验证或加密的情况下发送。请求必须使用 HTTP 而不是 HTTPS。
您可以将 Kafka Bridge 与以下工具合并,以保护它:
- 定义哪些 pod 可以访问 Kafka Bridge 的网络策略和防火墙
- 反向代理(例如 OAuth 2.0)
- API 网关
1.6. 对 Kafka Bridge 的请求
指定数据格式和 HTTP 标头,以确保向 Kafka Bridge 提交有效请求。
1.6.1. 内容类型标头
API 请求和响应正文始终编码为 JSON。
在执行消费者操作时,如果有非空正文,
POST
请求必须提供以下Content-Type
标头:Content-Type: application/vnd.kafka.v2+json
在执行制作者操作时,
POST
请求必须提供Content-Type
标头,指定 生成的消息的嵌入式数据格式。这可以是json
或二进制
。嵌入式数据格式 content-Type 标头 JSON
content-Type: application/vnd.kafka.json.v2+json
二进制
content-Type: application/vnd.kafka.binary.v2+json
嵌入式数据格式为每个消费者设置,如下一部分所述。
如果 POST
请求具有空正文,则不得设置 Content-Type
。可以使用空正文来创建具有默认值的消费者。
1.6.2. 嵌入式数据格式
嵌入式数据格式是使用 Kafka Bridge 将传输的 Kafka 消息的格式(通过 HTTP)从制作者传输到消费者。支持两种嵌入式数据格式:JSON 和 binary。
使用 /consumers/groupid
端点创建消费者时,POST
请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在 format
字段中指定,例如:
{
"name": "my-consumer",
"format": "binary", 1
# ...
}
- 1
- 二进制嵌入式数据格式。
创建消费者时指定的嵌入式数据格式必须与所使用的 Kafka 信息的数据格式匹配。
如果您选择指定二进制嵌入式数据格式,则后续制作者请求必须在请求正文中提供二进制数据作为 Base64 编码的字符串。例如,当使用 /topics/topicname
端点发送消息时,记录.value
必须以 Base64 编码:
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
生成者请求还必须提供一个与嵌入式数据格式对应的 Content-Type
标头,如 Content-Type: application/vnd.kafka.binary.v2+json
。
1.6.3. 消息格式
使用 /topics
端点发送消息时,您可以在 记录
参数中输入请求正文中的消息有效负载。
records
参数可以包含这些可选字段:
-
消息标头
-
Message
键
-
消息
值
-
目标
分区
到 /topics 的 POST
请求示例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001",
"partition": 2,
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
}
]
}'
- 1
- 二进制格式的标头值,并以 Base64 编码。
1.6.4. 接受标头
创建消费者后,所有后续 GET 请求都必须以以下格式提供一个 Accept
标头:
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
EMBEDDED-DATA-FORMAT
是 json
或 二进制
。
例如,当使用 JSON 的嵌入式数据格式检索订阅的消费者的记录时,请包括此 Accept 标头:
Accept: application/vnd.kafka.json.v2+json
1.7. CORS
通常,HTTP 客户端无法在不同域间发出请求。
例如,假设您与 Kafka 集群一起部署的 Kafka Bridge 可使用 http://my-bridge.io
域访问。HTTP 客户端可以使用 URL 与 Kafka Bridge 交互,并通过 Kafka 集群交换信息。但是,您的客户端在 http://my-web-application.io
域中作为 Web 应用程序运行。客户端(源)域与 Kafka Bridge (目标)域不同。由于相同的原始策略限制,来自客户端的请求会失败。您可以使用跨Origin Resource Sharing (CORS)来避免这种情况。
CORS 允许在不同的域中的原始源之间的 simple 和 preflighted 请求。
简单请求适用于使用 GET
、HEAD
、POST
方法的标准请求。
preflighted 请求发送 HTTP OPTIONS 请求,作为初始检查实际请求是否安全发送。确认时,会发送实际请求。preflight 请求适合需要更多保护的方法,如 PUT
和 DELETE
,并使用非标准标头。
所有请求都需要在其标头中有一个 origin 值,即 HTTP 请求的来源。
CORS 允许您指定允许的方法和原始 URL 来访问 Kafka Bridge HTTP 配置中 Kafka 集群。
Kafka Bridge 的 CORS 配置示例
# ... http.cors.enabled=true http.cors.allowedOrigins=http://my-web-application.io http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
1.7.1. 简单请求
例如,这个简单请求标头将源指定为 http://my-web-application.io
。
Origin: http://my-web-application.io
标头信息添加到请求中以使用记录。
curl -v -X GET HTTP-BRIDGE-ADDRESS/consumers/my-group/instances/my-consumer/records \
-H 'Origin: http://my-web-application.io'\
-H 'content-type: application/vnd.kafka.v2+json'
在 Kafka Bridge 的响应中,会返回 Access-Control-Allow-Origin
标头。它包含可以将 HTTP 请求发送到网桥的域列表。
HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 1
- 1
- 返回星号(
DSL
)显示资源可以被任何域访问。
1.7.2. preflighted 请求
初始 preflight 请求使用 OPTIONS
方法发送到 Kafka Bridge。HTTP OPTIONS 请求发送标头信息,以检查 Kafka Bridge 是否允许实际请求。
此处 preflight 请求检查 POST
请求是否有效 http://my-web-application.io
。
OPTIONS /my-group/instances/my-consumer/subscription HTTP/1.1 Origin: http://my-web-application.io Access-Control-Request-Method: POST 1 Access-Control-Request-Headers: Content-Type 2
OPTIONS
添加到 preflight 请求的标头信息中。
curl -v -X OPTIONS -H 'Origin: http://my-web-application.io' \ -H 'Access-Control-Request-Method: POST' \ -H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge 响应初始请求,以确认请求将被接受。响应标头返回允许的源、方法和标头。
HTTP/1.1 200 OK Access-Control-Allow-Origin: http://my-web-application.io Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH Access-Control-Allow-Headers: content-type
如果 origin 或 method 被拒绝,则返回错误消息。
实际请求不需要 Access-Control-Request-Method
标头,因为它在 preflight 请求中确认,但它需要原始标头。
curl -v -X POST HTTP-BRIDGE-ADDRESS/topics/bridge-topic \
-H 'Origin: http://my-web-application.io' \
-H 'content-type: application/vnd.kafka.v2+json'
响应显示允许原始 URL。
HTTP/1.1 200 OK Access-Control-Allow-Origin: http://my-web-application.io
其他资源
1.8. 为 Kafka Bridge 配置日志记录器
您可以为 Kafka Bridge OpenAPI 规格定义的每个操作设置不同的日志级别。
每个操作都有一个对应的 API 端点,网桥从 HTTP 客户端接收请求。您可以更改每个端点的日志级别,以生成更多或不太精细的日志记录信息,以及传入和传出的 HTTP 请求。
日志记录器在 log4j2.properties
文件中定义,该文件对 healthy
和 ready
端点有以下默认配置:
logger.healthy.name = http.openapi.operation.healthy logger.healthy.level = WARN logger.ready.name = http.openapi.operation.ready logger.ready.level = WARN
所有其他操作的日志级别默认设置为 INFO
。日志记录器的格式如下:
logger.<operation_id>.name = http.openapi.operation.<operation_id> logger.<operation_id>_level = _<LOG_LEVEL>
其中 <operation_id
> 是特定操作的标识符。
OpenAPI 规格定义的操作列表
-
createConsumer
-
deleteConsumer
-
Subscription
-
unsubscribe
-
poll
-
分配
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
healthy
-
ready
-
openapi
其中 <LOG_LEVEL > 是 log4j2 定义的日志级别(例如 INFO
,DEBUG
, …)。
第 2 章 Kafka Bridge Quickstart
使用此快速入门在本地开发环境中尝试 AMQ Streams Kafka Bridge。
您将了解如何进行以下操作:
- 为 Kafka 集群中的主题和分区生成信息
- 创建 Kafka Bridge 使用者
- 执行基本消费者操作,如订阅消费者到主题并检索您生成的消息
在这个快速入门中,HTTP 请求被格式化为 curl 命令,您可以复制并粘贴到终端中。
确保您有先决条件,然后按照本章中提供的顺序跟踪任务。
在此快速入门中,您将以 JSON 格式生成和使用消息。
快速入门的先决条件
- Kafka 集群在主机上运行。
2.1. 下载 Kafka Bridge 归档
AMQ Streams Kafka Bridge 的压缩发布可用于下载。
流程
- 从客户门户网站下载 AMQ Streams Kafka Bridge 归档的最新版本。
2.2. 安装 Kafka Bridge
使用 Kafka Bridge 归档提供的脚本来安装 Kafka Bridge。安装存档提供的 application.properties
文件提供默认配置设置。
以下默认属性值将 Kafka Bridge 配置为侦听端口 8080 上的请求。
默认配置属性
http.host=0.0.0.0 http.port=8080
流程
- 如果您还没有这样做,将 Kafka Bridge 安装存档解压缩到任何目录中。
使用配置属性作为参数运行 Kafka Bridge 脚本:
例如:
./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
检查日志中的安装是否成功。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
接下来要做什么
2.3. 生成消息到主题和分区
使用 Kafka Bridge 使用主题端点以 JSON 格式向 Kafka 主题生成信息。
您可以使用主题端点以 JSON 格式生成消息到 主题。您可以为请求正文中的消息指定目标分区。分区 端点提供了一种替代的方法,用来为所有消息指定一个目标分区作为 path 参数。
在此过程中,消息将生成给名为 bridge-quickstart-topic
的主题。
先决条件
Kafka 集群有三个分区的主题。
您可以使用
kafka-topics.sh
实用程序来创建主题。有三个分区的主题创建示例
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
验证是否已创建主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
如果在 OpenShift 上部署了 AMQ Streams,您可以使用 KafkaTopic
自定义资源创建一个主题。
流程
使用 Kafka Bridge,为您创建的主题生成三个信息:
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
Sales-lead-0001
根据密钥的哈希发送到分区。 -
Sales-lead-0002
直接发送到分区 2。 -
Sales-lead-0003
使用 round-robin 方法发送到bridge-quickstart-topic
主题中的分区。
-
如果请求成功,Kafka Bridge 会返回
偏移
数组,以及200
代码和application/vnd.kafka.v2+json
的content-type
标头。对于每个消息,偏移
数组描述:- 消息发送到的分区
分区的当前消息偏移
响应示例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
其他主题请求
发出其他 curl 请求来查找主题和分区的信息。
- 列出主题
curl -X GET \ http://localhost:8080/topics
响应示例
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]
- 获取主题配置和分区详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic
响应示例
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }
- 列出特定主题的分区
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions
响应示例
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]
- 列出特定主题分区的详情
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
响应示例
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }
- 列出特定主题分区的偏移
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
响应示例
{ "beginning_offset": 0, "end_offset": 1 }
接下来要做什么
在向主题和分区生成信息后,创建一个 Kafka Bridge 消费者。
2.4. 创建 Kafka Bridge 使用者
在 Kafka 集群中执行任何消费者操作前,您必须首先使用使用者端点创建 消费者。消费者被称为 Kafka Bridge 消费者。
流程
在名为
bridge-quickstart-consumer-group
的新消费者组中创建一个 Kafka Bridge 使用者:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
-
消费者命名为
bridge-quickstart-consumer
,嵌入式数据格式被设置为json
。 - 定义了一些基本配置设置。
消费者不会自动向日志提交偏移,因为
enable.auto.commit
设置为false
。稍后,您将在此快速入门中手动提交偏移。如果请求成功,Kafka Bridge 会返回响应正文中的消费者 ID (
instance_id
)和基础 URL (base_uri
),以及200
代码。响应示例
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
消费者命名为
-
复制本快速入门中其他消费者操作使用的基本 URL (
base_uri
)。
接下来要做什么
现在,您已创建了 Kafka Bridge 使用者,您可以将其订阅到主题。
2.5. 将 Kafka Bridge 使用者订阅到主题
创建 Kafka Bridge 使用者后,使用订阅端点 订阅 一个或多个主题。订阅后,消费者开始接收生成至该主题的所有消息。
流程
将消费者订阅到之前创建的
bridge-quickstart-topic
主题,在 Produc ing 信息到主题和分区中 :curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
topics
数组可以包含单个主题(如下所示)或多个主题。如果要订阅消费者到与正则表达式匹配的多个主题,您可以使用topic_pattern
字符串而不是topics
数组。如果请求成功,Kafka Bridge 只返回一个
204
(无内容)代码。
使用 Apache Kafka 客户端时,HTTP 订阅操作会将主题添加到本地消费者的订阅中。在运行多个 HTTP 轮询操作后,加入消费者组并获取分区分配,启动分区重新平衡和 join-group 进程。请注意,初始 HTTP 轮询操作可能无法返回任何记录。
接下来要做什么
将 Kafka Bridge 使用者订阅到主题后,您可以从 消费者检索信息。
2.6. 从 Kafka Bridge 使用者检索最新的信息
通过从 记录 端点请求数据,从 Kafka Bridge 使用者检索最新的消息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。
流程
- 为 Kafka Bridge 使用者生成额外消息,如在 主题和分区中修复消息 中所述。
将
GET
请求提交到记录
端点:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
创建并订阅 Kafka Bridge 使用者后,第一个 GET 请求会返回一个空响应,因为轮询操作会启动重新平衡过程来分配分区。
重复步骤 2,以从 Kafka Bridge 使用者检索信息。
Kafka Bridge 返回一条消息,其中包含主题名称、键、值、分区和偏移,以及响应正文中的
200
代码。默认情况下,消息从最新的偏移中检索。HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
注意如果返回空响应,请为消费者生成更多记录,如 将消息到主题和分区 中所述,然后尝试再次检索消息。
接下来要做什么
从 Kafka Bridge 使用者检索消息后,尝试向 日志提交偏移。
2.7. 将偏移结果提交给日志
使用 偏移 端点,为 Kafka Bridge 消费者接收的所有消息手动将偏移时间提交到日志中。这是必要的,因为您在创建 Kafka Bridge 消费者 时使用 enable.auto.commit
设置配置为 false
。
流程
将偏移提交到
bridge-quickstart-consumer
的日志:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
由于没有提交请求正文,所以偏移量用于消费者收到的所有记录。或者,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。
如果请求成功,Kafka Bridge 只返回一个
204
代码。
接下来要做什么
向日志提交偏移后,请尝试尝试查找 偏移的端点。
2.8. 分区偏移
使用 位置 端点配置 Kafka Bridge 使用者,以从特定偏移中检索分区的信息,然后从最新的偏移中检索。这在 Apache Kafka 中称为 seek 操作。
流程
查找
quickstart-bridge-topic
主题的分区 0 的特定偏移:curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
如果请求成功,Kafka Bridge 只返回一个
204
代码。将
GET
请求提交到记录
端点:curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge 从您找到的偏移返回信息。
通过查找同一分区的最后一个偏移来恢复默认消息检索行为。此时,使用 位置/端点。
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
如果请求成功,Kafka Bridge 会返回另一个
204
代码。
您还可以使用 位置/关闭 端点查找一个或多个分区的第一个偏移量。
接下来要做什么
在这个快速入门中,您已使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge 使用者。
2.9. 删除 Kafka Bridge 使用者
删除您在这个快速入门中使用的 Kafka Bridge 使用者。
流程
通过向 实例 端点发送
DELETE
请求来删除 Kafka Bridge 使用者。curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
如果请求成功,Kafka Bridge 会返回
204
代码。
第 3 章 Kafka Bridge 配置
使用配置属性配置 Kafka Bridge 的部署。配置 Kafka 并指定与 Kafka 交互所需的 HTTP 连接详情。您还可以使用配置属性在 Kafka Bridge 中启用和使用分布式追踪。分布式追踪允许您跟踪分布式系统中应用程序之间的事务进度。
在 OpenShift 上运行 Kafka Bridge 时,使用 KafkaBridge
资源配置属性。
3.1. 配置 Kafka Bridge 属性
这个步骤描述了如何配置 Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。
您可以使用与 Kafka 相关的属性的适当前缀,将 Kafka Bridge 配置为任何其他 Kafka 客户端。
-
kafka.
用于适用于制作者和消费者的常规配置,如服务器连接和安全。 -
kafka.consumer.
用于仅传递给消费者的特定于消费者的配置。 -
kafka.producer.
用于仅传递给制作者的特定于制作者的配置。
除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性还提供通过跨图形资源共享(CORS)为 Kafka Bridge 启用和定义访问控制的功能。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问所选资源。要配置 CORS,您可以定义允许的资源来源和 HTTP 方法列表来访问它们。请求中的其他 HTTP 标头描述了允许访问 Kafka 集群的 CORS 来源。
流程
编辑 Kafka Bridge 安装存档提供的
application.properties
文件。使用属性文件指定 Kafka 和 HTTP 相关属性。
配置标准的 Kafka 相关属性,包括特定于 Kafka 使用者和制作者的属性。
使用:
-
kafka.bootstrap.servers
以定义到 Kafka 集群的主机/端口连接 -
kafka.producer.acks
为 HTTP 客户端提供确认 kafka.consumer.auto.offset.reset
,以确定如何在 Kafka 中管理偏移的重置有关配置 Kafka 属性的更多信息,请参阅 Apache Kafka 网站
-
配置与 HTTP 相关的属性,以启用对 Kafka 集群的 HTTP 访问。
例如:
bridge.id=my-bridge http.host=0.0.0.0 http.port=8080 1 http.cors.enabled=true 2 http.cors.allowedOrigins=https://strimzi.io 3 http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
- 保存配置文件。
3.2. 配置分布式追踪
启用分布式追踪来跟踪 Kafka Bridge 使用和生成的消息,以及来自客户端应用程序的 HTTP 请求。
application.properties
文件中提供了启用追踪的属性。要启用分布式追踪,请执行以下操作:
-
设置
bridge.tracing
属性值,以启用您要使用的追踪。可能的值有jaeger
和opentelemetry
。 - 为追踪设置环境变量。
使用默认配置,OpenSSH 追踪使用 OTLP 作为导出器协议。通过配置 OTLP 端点,您仍然可以使用 Jaeger 后端实例获取 trace。
从版本 1.35 开始,Jaeger 支持 OTLP 协议。旧的 Jaeger 版本无法使用 OTLP 协议获得 trace。
OpenTelemetry 和 OpenTracing 是收集追踪数据的 API 规格,作为指标数据的 span。span 代表特定的操作。trace 是一个或多个 span 的集合。
当 Kafka Bridge 执行以下操作时,会生成 trace:
- 将信息从 Kafka 发送到消费者 HTTP 客户端
- 从制作者 HTTP 客户端接收信息,以发送到 Kafka
Jaeger 实现所需的 API,并在用户界面中显示 trace 数据的视觉化来进行分析。
要获得端到端追踪,您必须在 HTTP 客户端中配置追踪。
OpenTracing 项目现已存档,因此 AMQ Streams 已被弃用对 OpenTracing 的支持。如果可能,我们将保持对 bridge.tracing=jaeger
追踪的支持,直到 2023 年 6 月为止。请尽快迁移到 OpenTelemetry。
先决条件
流程
编辑 Kafka Bridge 安装存档提供的
application.properties
文件。使用
bridge.tracing
属性启用您要使用的追踪。启用 OpenTelemetry 的配置示例
#bridge.tracing=jaeger 1 bridge.tracing=opentelemetry 2
启用追踪后,您可以在运行 Kafka Bridge 脚本时初始化追踪。
- 保存配置文件。
设置用于追踪的环境变量。
OpenTelemetry 的环境变量
OTEL_SERVICE_NAME=my-tracing-service 1 OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 2
OpenTracing 的环境变量
JAEGER_SERVICE_NAME=my-jaeger-service 1 JAEGER_AGENT_HOST=localhost 2 JAEGER_AGENT_PORT=6831 3
使用启用了用于追踪的属性运行 Kafka Bridge 脚本:
运行启用了 OpenTelemetry 的 Kafka Bridge
./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
Kafka Bridge 的内部使用者和制作者现在被启用以进行追踪。
3.2.1. 使用 OpenTelemetry 指定追踪系统
您可以指定 OpenTelemetry 支持的其他追踪系统,而不是默认的 OTLP 追踪系统。
如果要将另一个追踪系统与 OpenTelemetry 搭配使用,请执行以下操作:
- 将追踪系统的库添加到 Kafka classpath 中。
将追踪系统的名称添加为额外的导出器环境变量。
如果不使用 OTLP 时的其他环境变量
OTEL_SERVICE_NAME=my-tracing-service OTEL_TRACES_EXPORTER=zipkin 1 OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans 2
其他资源
第 4 章 AMQ Streams Kafka Bridge API 参考
4.1. 概述
AMQ Streams Kafka Bridge 提供了一个 REST API,用于将基于 HTTP 的客户端应用程序与 Kafka 集群集成。您可以使用 API 创建和管理消费者,并通过 HTTP 发送和接收记录,而不是原生 Kafka 协议。
4.1.1. 版本信息
版本 : 0.1.0
4.1.2. Tags
- 消费者 :消费者操作在 Kafka 集群中创建消费者,并执行常见操作,如订阅主题、检索已处理记录和提交偏移。
- producer : Producer 操作将记录发送到指定主题或主题分区。
- seek : seek 操作使消费者能够开始从给定的偏移位置接收消息。
- 主题 :将消息发送到指定主题或主题分区的主题操作(可选)在请求中包含消息密钥。您还可以检索主题和主题元数据。
4.1.3. 使用
-
application/json
4.1.4. generate
-
application/json
4.2. 定义
4.2.1. AssignedTopicPartitions
类型 : < string, < integer (int32)> array > map
4.2.2. BridgeInfo
有关 Kafka Bridge 实例的信息。
名称 | 模式 |
---|---|
bridge_version | 字符串 |
4.2.3. 消费者
名称 | 描述 | 模式 |
---|---|---|
auto.offset.reset |
重置消费者的偏移位置。如果设置为 | 字符串 |
consumer.request.timeout.ms |
设置消费者等待请求的最大时间(以毫秒为单位)。如果在没有响应的情况下达到超时周期,则返回错误。默认为 | 整数 |
enable.auto.commit |
如果设置为 | 布尔值 |
fetch.min.bytes |
设置接收消费者的最小数据量(以字节为单位)。代理会等待数据发送超过这个数量。默认为 | 整数 |
格式 |
使用者的允许消息格式,可以是 | 字符串 |
isolated.level |
如果设置为 | 字符串 |
name | consumer 实例的唯一名称。名称在消费者组范围内是唯一的。名称用于 URL。如果没有指定名称,则会分配随机生成的名称。 | 字符串 |
4.2.4. ConsumerRecord
名称 | 模式 |
---|---|
标头 | |
偏移 | 整数(int64) |
分区 | integer (int32) |
Topic | 字符串 |
4.2.5. ConsumerRecordList
type : < ConsumerRecord > array
4.2.6. CreatedConsumer
名称 | 描述 | 模式 |
---|---|---|
base_uri | 用于构建 URI 以进行针对此消费者实例的后续请求的基本 URI。 | 字符串 |
instance_id | 组中消费者实例的唯一 ID。 | 字符串 |
4.2.7. Error
名称 | 模式 |
---|---|
error_code | integer (int32) |
message | 字符串 |
4.2.8. KafkaHeader
名称 | 描述 | 模式 |
---|---|---|
key | 字符串 | |
需要 |
标头值(二进制格式)base64 编码的 | 字符串(字节) |
4.2.9. KafkaHeaderList
类型 : < KafkaHeader > 数组
4.2.10. OffsetCommitSeek
名称 | 模式 |
---|---|
需要 偏移 | 整数(int64) |
分区 | integer (int32) |
主题 | 字符串 |
4.2.11. OffsetCommitSeekList
名称 | 模式 |
---|---|
偏移 | < OffsetCommitSeek > 数组 |
4.2.12. OffsetRecordSent
名称 | 模式 |
---|---|
偏移 | 整数(int64) |
分区 | integer (int32) |
4.2.13. OffsetRecordSentList
名称 | 模式 |
---|---|
偏移 | < OffsetRecordSent > array |
4.2.14. OffsetsSummary
名称 | 模式 |
---|---|
beginning_offset | 整数(int64) |
end_offset | 整数(int64) |
4.2.15. 分区
名称 | 模式 |
---|---|
分区 | integer (int32) |
Topic | 字符串 |
4.2.16. PartitionMetadata
名称 | 模式 |
---|---|
领导 | integer (int32) |
分区 | integer (int32) |
replicas | < ; replica > 数组 |
4.2.17. 分区
名称 | 模式 |
---|---|
分区 | < ; partition > 数组 |
4.2.18. ProducerRecord
名称 | 模式 |
---|---|
标头 | |
分区 | integer (int32) |
4.2.19. ProducerRecordList
名称 | 模式 |
---|---|
记录 | < ProducerRecord > array |
4.2.20. ProducerRecordToPartition
名称 | 模式 |
---|---|
标头 |
4.2.21. ProducerRecordToPartitionList
名称 | 模式 |
---|---|
记录 | < ProducerRecordToPartition > array |
4.2.22. replica
名称 | 模式 |
---|---|
broker | integer (int32) |
in_sync | 布尔值 |
领导 | 布尔值 |
4.2.23. SubscribedTopicList
名称 | 模式 |
---|---|
分区 | < AssignedTopicPartitions > 数组 |
topics |
4.2.24. TopicMetadata
名称 | 描述 | 模式 |
---|---|---|
配置 | 每个主题配置覆盖 | < string, string > map |
name | 主题的名称 | 字符串 |
分区 | < PartitionMetadata > array |
4.2.25. 主题
名称 | 描述 | 模式 |
---|---|---|
topic_pattern | 匹配多个主题的正则表达式主题模式 | 字符串 |
topics | < string > 数字 |
4.3. 路径
4.3.1. GET /
4.3.1.1. 描述
以 JSON 格式检索 Kafka Bridge 实例的信息。
4.3.1.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 有关 Kafka Bridge 实例的信息。 |
4.3.1.3. generate
-
application/json
4.3.1.4. HTTP 响应示例
4.3.1.4.1. 响应 200
{ "bridge_version" : "0.16.0" }
4.3.2. POST /consumers/{groupid}
4.3.2.1. 描述
在给定的消费者组中创建一个消费者实例。您可以选择指定使用者名称和支持的配置选项。它返回一个基本 URI,必须用来构建针对这个消费者实例的后续请求的 URL。
4.3.2.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 要创建消费者的消费者组的 ID。 | 字符串 |
Body |
body | 使用者的名称和配置。名称在消费者组范围内是唯一的。如果没有指定名称,则会分配随机生成的名称。所有参数都是可选的。以下示例中显示了支持的配置选项。 |
4.3.2.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 消费者创建成功。 | |
409 | Kafka Bridge 中已存在具有指定名称的消费者实例。 | |
422 | 一个或多个使用者配置选项具有无效的值。 |
4.3.2.4. 使用
-
application/vnd.kafka.v2+json
4.3.2.5. generate
-
application/vnd.kafka.v2+json
4.3.2.6. Tags
- 消费者
4.3.2.7. HTTP 请求示例
4.3.2.7.1. 请求正文
{ "name" : "consumer1", "format" : "binary", "auto.offset.reset" : "earliest", "enable.auto.commit" : false, "fetch.min.bytes" : 512, "consumer.request.timeout.ms" : 30000, "isolation.level" : "read_committed" }
4.3.2.8. HTTP 响应示例
4.3.2.8.1. 响应 200
{ "instance_id" : "consumer1", "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1" }
4.3.2.8.2. 响应 409
{ "error_code" : 409, "message" : "A consumer instance with the specified name already exists in the Kafka Bridge." }
4.3.2.8.3. 响应 422
{ "error_code" : 422, "message" : "One or more consumer configuration options have invalid values." }
4.3.3. DELETE /consumers/{groupid}/instances/{name}
4.3.3.1. 描述
删除指定的消费者实例。此操作的请求必须使用从 POST
请求返回的基本 URL (包括主机和端口)到用于创建此消费者的 /consumers/{groupid}
。
4.3.3.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要删除的使用者的名称。 | 字符串 |
4.3.3.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 使用者被成功移除。 | 无内容 |
404 | 未找到指定的消费者实例。 |
4.3.3.4. 使用
-
application/vnd.kafka.v2+json
4.3.3.5. generate
-
application/vnd.kafka.v2+json
4.3.3.6. Tags
- 消费者
4.3.3.7. HTTP 响应示例
4.3.3.7.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.4. POST /consumers/{groupid}/instances/{name}/assignments
4.3.4.1. 描述
为消费者分配一个或多个主题分区。
4.3.4.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要为其分配主题分区的使用者的名称。 | 字符串 |
Body |
body | 要分配给消费者的主题分区列表。 |
4.3.4.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 成功分配的分区。 | 无内容 |
404 | 未找到指定的消费者实例。 | |
409 | 主题、分区和模式的订阅是互斥的。 |
4.3.4.4. 使用
-
application/vnd.kafka.v2+json
4.3.4.5. generate
-
application/vnd.kafka.v2+json
4.3.4.6. Tags
- 消费者
4.3.4.7. HTTP 请求示例
4.3.4.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
4.3.4.8. HTTP 响应示例
4.3.4.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.4.8.2. 响应 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
4.3.5. POST /consumers/{groupid}/instances/{name}/offsets
4.3.5.1. 描述
提交消费者偏移列表。要提交消费者获取的所有记录的偏移,请将请求正文留空。
4.3.5.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 使用者的名称。 | 字符串 |
Body |
body | 提交到消费者偏移日志的消费者偏移列表。您可以指定一个或多个主题分区来提交偏移。 |
4.3.5.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 提交成功。 | 无内容 |
404 | 未找到指定的消费者实例。 |
4.3.5.4. 使用
-
application/vnd.kafka.v2+json
4.3.5.5. generate
-
application/vnd.kafka.v2+json
4.3.5.6. Tags
- 消费者
4.3.5.7. HTTP 请求示例
4.3.5.7.1. 请求正文
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
4.3.5.8. HTTP 响应示例
4.3.5.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.6. POST /consumers/{groupid}/instances/{name}/positions
4.3.6.1. 描述
配置订阅的消费者,以便在下一次从给定的主题分区获取一组记录时从特定的偏移偏移偏移。这会覆盖消费者的默认获取行为。您可以指定一个或多个主题分区。
4.3.6.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅的消费者的名称。 | 字符串 |
Body |
body | 订阅消费者下次从中获取记录的分区偏移列表。 |
4.3.6.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | seek 已成功执行。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
4.3.6.4. 使用
-
application/vnd.kafka.v2+json
4.3.6.5. generate
-
application/vnd.kafka.v2+json
4.3.6.6. Tags
- 消费者
- seek
4.3.6.7. HTTP 请求示例
4.3.6.7.1. 请求正文
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
4.3.6.8. HTTP 响应示例
4.3.6.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.7. POST /consumers/{groupid}/instances/{name}/positions/beginning
4.3.7.1. 描述
配置订阅的消费者,以查找(随后读取)一个或多个给定主题分区中的第一个偏移。
4.3.7.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅的消费者的名称。 | 字符串 |
Body |
body | 消费者订阅的主题分区列表。消费者将在指定分区中看到第一个偏移量。 |
4.3.7.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 请参阅成功执行开始。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
4.3.7.4. 使用
-
application/vnd.kafka.v2+json
4.3.7.5. generate
-
application/vnd.kafka.v2+json
4.3.7.6. Tags
- 消费者
- seek
4.3.7.7. HTTP 请求示例
4.3.7.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
4.3.7.8. HTTP 响应示例
4.3.7.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.8. POST /consumers/{groupid}/instances/{name}/positions/end
4.3.8.1. 描述
配置订阅的消费者,以查找(并从中读取)在一个或多个给定主题分区的末尾的偏移量。
4.3.8.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅的消费者的名称。 | 字符串 |
Body |
body | 消费者订阅的主题分区列表。消费者将在指定分区中看到最后一个偏移量。 |
4.3.8.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 寻求成功执行的最终操作。 | 无内容 |
404 | 未找到指定的消费者实例,或者指定的消费者实例没有分配指定分区之一。 |
4.3.8.4. 使用
-
application/vnd.kafka.v2+json
4.3.8.5. generate
-
application/vnd.kafka.v2+json
4.3.8.6. Tags
- 消费者
- seek
4.3.8.7. HTTP 请求示例
4.3.8.7.1. 请求正文
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
4.3.8.8. HTTP 响应示例
4.3.8.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.9. GET /consumers/{groupid}/instances/{name}/records
4.3.9.1. 描述
检索订阅的消费者的记录,包括消息值、主题和分区。此操作的请求必须使用从 POST
请求返回的基本 URL (包括主机和端口)到用于创建此消费者的 /consumers/{groupid}
。
4.3.9.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 从中检索记录的订阅消费者的名称。 | 字符串 |
查询 |
max_bytes | 响应中包含的未编码键和值的最大大小(以字节为单位)。否则,会返回带有代码 422 的错误响应。 | 整数 |
查询 |
timeout | HTTP 网桥在超时请求前检索记录的最长时间(以毫秒为单位)。 | 整数 |
4.3.9.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 轮询请求执行成功。 | |
404 | 未找到指定的消费者实例。 | |
406 |
消费者创建请求中使用的 | |
422 | 响应超过消费者可以接收的最大字节数 |
4.3.9.4. generate
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
-
application/vnd.kafka.v2+json
4.3.9.5. Tags
- 消费者
4.3.9.6. HTTP 响应示例
4.3.9.6.1. 响应 200
[ { "topic" : "topic", "key" : "key1", "value" : { "foo" : "bar" }, "partition" : 0, "offset" : 2 }, { "topic" : "topic", "key" : "key2", "value" : [ "foo2", "bar2" ], "partition" : 1, "offset" : 3 } ]
[ { "topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100, }, { "topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101, } ]
4.3.9.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.9.6.3. 响应 406
{ "error_code" : 406, "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request." }
4.3.9.6.4. 响应 422
{ "error_code" : 422, "message" : "Response exceeds the maximum number of bytes the consumer can receive" }
4.3.10. POST /consumers/{groupid}/instances/{name}/subscription
4.3.10.1. 描述
订阅消费者订阅一个或多个主题。您可以描述消费者将在列表(主题类型)或 topic_pattern
字段中订阅 的主题
。每个调用都会替换订阅者的订阅。
4.3.10.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要订阅主题的消费者名称。 | 字符串 |
Body |
body | 消费者要订阅的主题列表。 |
4.3.10.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 消费者订阅成功。 | 无内容 |
404 | 未找到指定的消费者实例。 | |
409 | 主题、分区和模式的订阅是互斥的。 | |
422 |
必须指定列表( |
4.3.10.4. 使用
-
application/vnd.kafka.v2+json
4.3.10.5. generate
-
application/vnd.kafka.v2+json
4.3.10.6. Tags
- 消费者
4.3.10.7. HTTP 请求示例
4.3.10.7.1. 请求正文
{ "topics" : [ "topic1", "topic2" ] }
4.3.10.8. HTTP 响应示例
4.3.10.8.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.10.8.2. 响应 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
4.3.10.8.3. 响应 422
{ "error_code" : 422, "message" : "A list (of Topics type) or a topic_pattern must be specified." }
4.3.11. GET /consumers/{groupid}/instances/{name}/subscription
4.3.11.1. 描述
检索消费者订阅的主题列表。
4.3.11.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 订阅的消费者的名称。 | 字符串 |
4.3.11.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 订阅的主题和分区列表。 | |
404 | 未找到指定的消费者实例。 |
4.3.11.4. generate
-
application/vnd.kafka.v2+json
4.3.11.5. Tags
- 消费者
4.3.11.6. HTTP 响应示例
4.3.11.6.1. 响应 200
{ "topics" : [ "my-topic1", "my-topic2" ], "partitions" : [ { "my-topic1" : [ 1, 2, 3 ] }, { "my-topic2" : [ 1 ] } ] }
4.3.11.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.12. DELETE /consumers/{groupid}/instances/{name}/subscription
4.3.12.1. 描述
从所有主题取消订阅消费者。
4.3.12.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
GroupId | 订阅消费者所属的消费者组的 ID。 | 字符串 |
路径 |
name | 要从主题中取消订阅的消费者名称。 | 字符串 |
4.3.12.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 消费者成功取消订阅。 | 无内容 |
404 | 未找到指定的消费者实例。 |
4.3.12.4. Tags
- 消费者
4.3.12.5. HTTP 响应示例
4.3.12.5.1. 响应 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
4.3.13. GET /healthy
4.3.13.1. 描述
检查网桥是否正在运行。这不一定表示它已准备好接受请求。
4.3.13.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 网桥处于健康状态 | 无内容 |
500 | 网桥不是健康 | 无内容 |
4.3.14. GET /metrics
4.3.14.1. 描述
以 Prometheus 格式检索网桥指标。
4.3.14.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | Prometheus 格式的指标成功检索。 | 字符串 |
4.3.14.3. generate
-
text/plain
4.3.15. GET /openapi
4.3.15.1. 描述
以 JSON 格式检索 OpenAPI v2 规格。
4.3.15.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | JSON 格式的 OpenAPI v2 规格成功检索。 | 字符串 |
4.3.15.3. generate
-
application/json
4.3.16. GET /ready
4.3.16.1. 描述
检查网桥是否已就绪,并且可以接受请求。
4.3.16.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
204 | 网桥已就绪 | 无内容 |
500 | 网桥未就绪 | 无内容 |
4.3.17. GET /topics
4.3.17.1. 描述
检索所有主题的列表。
4.3.17.2. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题列表。 | < string > 数字 |
4.3.17.3. generate
-
application/vnd.kafka.v2+json
4.3.17.4. Tags
- 主题
4.3.17.5. HTTP 响应示例
4.3.17.5.1. 响应 200
[ "topic1", "topic2" ]
4.3.18. POST /topics/{topicname}
4.3.18.1. 描述
将一个或多个记录发送到给定的主题,可选指定分区、密钥或两者。
4.3.18.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
topicName | 将记录发送到或从中检索元数据的主题名称。 | 字符串 |
查询 |
async | 在发送记录时是否立即返回,而不是等待元数据。如果指定,则不会返回偏移量。默认为false。 | 布尔值 |
Body |
body |
4.3.18.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 成功发送的记录。 | |
404 | 未找到指定的主题。 | |
422 | 记录列表无效。 |
4.3.18.4. 使用
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
4.3.18.5. generate
-
application/vnd.kafka.v2+json
4.3.18.6. Tags
- 制作者
- 主题
4.3.18.7. HTTP 请求示例
4.3.18.7.1. 请求正文
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2", "partition" : 1 }, { "value" : "value3" } ] }
4.3.18.8. HTTP 响应示例
4.3.18.8.1. 响应 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
4.3.18.8.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
4.3.18.8.3. 响应 422
{ "error_code" : 422, "message" : "The record list contains invalid records." }
4.3.19. GET /topics/{topicname}
4.3.19.1. 描述
检索给定主题的元数据。
4.3.19.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
topicName | 将记录发送到或从中检索元数据的主题名称。 | 字符串 |
4.3.19.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题元数据 |
4.3.19.4. generate
-
application/vnd.kafka.v2+json
4.3.19.5. Tags
- 主题
4.3.19.6. HTTP 响应示例
4.3.19.6.1. 响应 200
{ "name" : "topic", "offset" : 2, "configs" : { "cleanup.policy" : "compact" }, "partitions" : [ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ] }
4.3.20. GET /topics/{topicname}/partitions
4.3.20.1. 描述
检索主题的分区列表。
4.3.20.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
topicName | 将记录发送到或从中检索元数据的主题名称。 | 字符串 |
4.3.20.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 分区列表 | < PartitionMetadata > array |
404 | 未找到指定的主题。 |
4.3.20.4. generate
-
application/vnd.kafka.v2+json
4.3.20.5. Tags
- 主题
4.3.20.6. HTTP 响应示例
4.3.20.6.1. 响应 200
[ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ]
4.3.20.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
4.3.21. POST /topics/{topicname}/partitions/{partitionid}
4.3.21.1. 描述
将一个或多个记录发送到给定的主题分区,可选指定密钥。
4.3.21.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
partitionid | 要从中发送记录或检索元数据的分区 ID。 | 整数 |
路径 |
topicName | 将记录发送到或从中检索元数据的主题名称。 | 字符串 |
查询 |
async | 在发送记录时是否立即返回,而不是等待元数据。如果指定,则不会返回偏移量。默认为false。 | 布尔值 |
Body |
body | 要发送到给定主题分区的记录列表,包括值(必需)和键(可选)。 |
4.3.21.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 成功发送的记录。 | |
404 | 未找到指定的主题分区。 | |
422 | 记录无效。 |
4.3.21.4. 使用
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
4.3.21.5. generate
-
application/vnd.kafka.v2+json
4.3.21.6. Tags
- 制作者
- 主题
4.3.21.7. HTTP 请求示例
4.3.21.7.1. 请求正文
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2" } ] }
4.3.21.8. HTTP 响应示例
4.3.21.8.1. 响应 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
4.3.21.8.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
4.3.21.8.3. 响应 422
{ "error_code" : 422, "message" : "The record is not valid." }
4.3.22. GET /topics/{topicname}/partitions/{partitionid}
4.3.22.1. 描述
检索主题分区分区元数据。
4.3.22.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
partitionid | 要从中发送记录或检索元数据的分区 ID。 | 整数 |
路径 |
topicName | 将记录发送到或从中检索元数据的主题名称。 | 字符串 |
4.3.22.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 分区元数据 | |
404 | 未找到指定的主题分区。 |
4.3.22.4. generate
-
application/vnd.kafka.v2+json
4.3.22.5. Tags
- 主题
4.3.22.6. HTTP 响应示例
4.3.22.6.1. 响应 200
{ "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }
4.3.22.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
4.3.23. GET /topics/{topicname}/partitions/{partitionid}/offsets
4.3.23.1. 描述
检索主题分区的偏移摘要。
4.3.23.2. 参数
类型 | Name | 描述 | 模式 |
---|---|---|---|
路径 |
partitionid | 分区 ID。 | 整数 |
路径 |
topicName | 包含分区的主题名称。 | 字符串 |
4.3.23.3. 响应
HTTP 代码 | 描述 | 模式 |
---|---|---|
200 | 主题分区的偏移摘要。 | |
404 | 未找到指定的主题分区。 |
4.3.23.4. generate
-
application/vnd.kafka.v2+json
4.3.23.5. Tags
- 主题
4.3.23.6. HTTP 响应示例
4.3.23.6.1. 响应 200
{ "beginning_offset" : 10, "end_offset" : 50 }
4.3.23.6.2. 响应 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
附录 A. 使用您的订阅
AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 Zip 和 Tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 在 INTEGRATION AND AUTOMATION 目录中找到 AMQ Streams for Apache Kafka 项。
- 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
使用 DNF 安装软件包
要安装软件包以及所有软件包的依赖软件包,请使用:
dnf install <package_name>
要从本地目录中安装之前下载的软件包,请使用:
dnf install <path_to_download_package>
更新于 2023-11-22