第 12 章 Kafka Bridge
本章概述了 Red Hat Enterprise Linux 上的 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。要在您的本地环境中尝试 Kafka 网桥,请参阅本章后面的 第 12.2 节 “Kafka Bridge quickstart”。
其它资源
- 要查看 API 文档,包括请求和响应示例,请参阅 Kafka Bridge API 参考。
- 要为 Kafka Bridge 配置分布式追踪,请参阅 第 15.4 节 “为 Kafka 网桥启用追踪”。
12.1. Kafka 网桥概述
Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。它提供了与 AMQ Streams 的 Web API 连接的优势,不需要客户端应用程序来解释 Kafka 协议。
API 有两个主要资源:consumers
和 topics
,它们通过端点公开并可访问,以便与 Kafka 集群中的用户和生产者交互。资源仅与 Kafka 网桥相关,而不是与 Kafka 直接连接的消费者和生产者。
HTTP 请求
Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,其方法如下:
- 发送消息到一个主题。
- 从主题检索消息.
- 检索主题的分区列表。
- 创建和删除消费者.
- 订阅消费者了解主题,以便他们开始接收来自这些主题的信息。
- 检索消费者订阅的主题列表。
- 取消订阅消费者的主题.
- 将分区分配给消费者.
- 提交消费者偏移列表。
- 寻找分区,以便使用者开始接受来自第一个或最后一个偏移位置的信息,或者给定的偏移位置。
这些方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以 JSON 或二进制格式发送。
客户端可以生成和使用消息,而无需使用原生 Kafka 协议。
与 AMQ Streams 安装类似,您可以下载用于在 Red Hat Enterprise Linux 上安装的 Kafka Bridge 文件。请参阅 第 12.1.5 节 “下载 Kafka 网桥存档”。
有关为 KafkaBridge
资源配置主机和端口的详情请参考 第 12.1.6 节 “配置 Kafka Bridge 属性”。
12.1.1. 身份验证和加密
尚不支持 HTTP 客户端和 Kafka 网桥之间的身份验证和加密。这意味着,从客户端发送到 Kafka Bridge 的请求是:
- 未加密,且必须使用 HTTP 而不是 HTTPS
- 在没有验证的情况下发送
您可以在 Kafka Bridge 和 Kafka 集群间配置 TLS 或 SASL 的身份验证。
您已配置 Kafka Bridge 以通过其 属性文件 进行身份验证。
12.1.2. 对 Kafka Bridge 的请求
指定数据格式和 HTTP 标头,以确保向 Kafka Bridge 提交有效的请求。
API 请求和响应正文始终编码为 JSON。
12.1.2.1. 内容类型标头
必须为所有请求提交 Content-Type
标头。唯一的例外是,POST
请求正文为空时,添加 Content-Type
标头将导致请求失败。
消费者操作(/consumers
端点)和制作者操作(/topics
端点)需要不同的 Content-Type
标头。
用于消费者操作的内容类型标头
无论 嵌入式数据格式是什么,如果请求正文包含数据,对消费者操作的POST
请求必须提供以下 Content-Type
标头:
Content-Type: application/vnd.kafka.v2+json
用于制作者操作的 content-Type 标头
在执行制作者操作时,POST
请求必须提供 Content-Type
标头指定生成的信息的 嵌入式数据格式。这可以是 json
或 binary
。
嵌入式数据格式 | content-Type 标头 |
---|---|
JSON |
|
二进制 |
|
嵌入式数据格式为每个消费者设置,如下一节所述。
如果 POST
请求有空正文,则不能 设置 Content-Type
。空正文可用于创建具有默认值的消费者。
12.1.2.2. 嵌入式数据格式
嵌入式数据格式是 Kafka 消息通过 HTTP 从生产者传输到使用 Kafka 网桥的消费者的格式。支持两种嵌入的数据格式:JSON 或二进制。
当使用 /consumers/groupid
端点创建消费者时,POST
请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在请求正文的 format
字段中指定,例如:
{
"name": "my-consumer",
"format": "binary", 1
...
}
- 1
- 二进制嵌入式数据格式.
如果没有为使用者指定嵌入式数据格式,则会设置二进制格式。
创建使用者时指定的嵌入式数据格式必须与它将使用的 Kafka 消息的数据格式匹配。
如果您选择指定二进制嵌入式数据格式,后续制作者请求必须在请求正文中以 Base64 编码的字符串形式提供二进制数据。例如,当通过向 /topics/topicname
端点发出 POST
请求来发送信息时,value
必须使用 Base64 编码:
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
制作者请求还必须提供与嵌入式数据格式对应的 Content-Type
标头,如 Content-Type: application/vnd.kafka.binary.v2+json
。
12.1.2.3. 消息格式
当使用 /topics
端点发送消息时,您可以在请求正文中在 records
参数中输入消息有效负载。
records
参数可以包含这些可选字段:
-
message
key
-
message
value
-
目的地
partition
-
message
headers
对 /topics 的 POST
请求示例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
},
]
}'
- 1
- 二进制格式的标头值,编码为 Base64。
12.1.2.4. 接受标头
创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept
标头:
Accept: application/vnd.kafka.embedded-data-format.v2+json
embedded -data-format 是 json
或 binary
。
例如,当使用嵌入的 JSON 数据格式获取订阅的消费者的记录时,包括这个 Accept 标头:
Accept: application/vnd.kafka.json.v2+json
12.1.3. 为 Kafka Bridge 配置日志记录器
AMQ Streams Kafka 网桥允许您为相关 OpenAPI 规格定义的每个操作设置不同的日志级别。
每一操作具有对应的 API 端点,网桥通过它接收来自 HTTP 客户端的请求。您可以更改每个端点的日志级别,以生成有关传入和传出 HTTP 请求的更多或更少的精细日志信息。
日志记录器在 log4j.properties
文件中定义,该文件具有 healthy
和 ready
端点的以下默认配置:
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
所有其他操作的日志级别默认设置为 INFO
。日志记录器的格式如下:
log4j.logger.http.openapi.operation.<operation-id>
其中 <operation-id>
是特定操作的标识符。以下是 OpenAPI 规范定义的操作列表:
-
createConsumer
-
deleteConsumer
-
subscribe
-
unsubscribe
-
poll
-
assign
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
healthy
-
ready
-
openapi
12.1.4. Kafka Bridge API 资源
有关 REST API 端点和描述的完整列表,包括请求和响应示例,请查看 Kafka Bridge API 参考。
12.1.5. 下载 Kafka 网桥存档
从红帽网站下载 AMQ Streams Kafka Bridge 的压缩发行版。
流程
- 从 客户门户网站 下载最新版本的 Red Hat AMQ Streams Kafka Bridge 存档。
12.1.6. 配置 Kafka Bridge 属性
此流程描述了如何配置 AMQ Streams Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。
您可以使用与 Kafka 相关的属性的适当前缀,将 Kafka Bridge 配置为任何其他 Kafka 客户端。
-
kafka.
对于适用于生产者和消费者的一般配置,如服务器连接和安全。 -
kafka.consumer.
对于仅传递给使用者的特定于消费者的配置。 -
kafka.producer.
对于仅传递给制作者的特定制作者的配置。
除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性还提供通过跨操作资源共享(CORS)启用和定义 Kafka 网桥访问控制的功能。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问选定的资源。要配置 CORS,您可以定义允许的资源来源列表以及访问它们的 HTTP 方法。请求中的附加 HTTP 标头描述了允许访问 Kafka 集群的原始数据。
流程
编辑 AMQ Streams Kafka Bridge 安装存档提供的
application.properties
文件。使用 属性文件指定 Kafka 和 HTTP 相关的属性,并启用分布式追踪。
配置标准的 Kafka 相关属性,包括特定于 Kafka 用户和生产者的属性。
使用:
-
kafka.bootstrap.servers
定义到 Kafka 集群的主机/端口连接 -
kafka.producer.acks
向 HTTP 客户端提供确认 kafka.consumer.auto.offset.reset
确定如何在 Kafka 中管理重置偏移有关 Kafka 属性配置的更多信息,请参阅 Apache Kafka 网站
-
配置与 HTTP 相关的属性,以启用对 Kafka 集群的 HTTP 访问。
例如:
http.enabled=true http.host=0.0.0.0 http.port=8080 1 http.cors.enabled=true 2 http.cors.allowedOrigins=https://strimzi.io 3 http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
启用或禁用分布式追踪。
bridge.tracing=jaeger
从属性中删除代码注释,以启用分布式追踪
12.1.7. 安装 Kafka Bridge
按照以下步骤在 Red Hat Enterprise Linux 上安装 AMQ Streams Kafka Bridge。
流程
- 如果您还没有这样做,请将 AMQ Streams Kafka Bridge 安装存档解压缩到任何目录。
使用配置属性作为参数运行 Kafka Bridge 脚本:
例如:
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
检查日志中是否成功安装了。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092