第 12 章 Kafka Bridge
本章概述了 Red Hat Enterprise Linux 上的 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。要在本地环境中尝试 Kafka Bridge,请参阅本章后的 第 12.2 节 “Kafka Bridge quickstart”。
其他资源
- 要查看 API 文档,包括请求和响应示例,请参阅 Kafka Bridge API 参考。
- 要为分布式追踪配置 Kafka Bridge,请参阅 第 15.4 节 “为 Kafka Bridge 启用追踪”。
12.1. Kafka Bridge 概述 复制链接链接已复制到粘贴板!
Kafka Bridge 提供了一个 RESTful 接口,它允许基于 HTTP 的客户端与 Kafka 集群交互。它提供了与 AMQ Streams 的 Web API 连接的优点,而无需客户端应用程序来解释 Kafka 协议。
API 有两个主要的消费者
和主题 - 通过端点公开并可访问,以便与 Kafka 集群中的使用者和制作者交互。资源只与 Kafka Bridge 相关,而不是直接连接到 Kafka 的用户和制作者。
HTTP 请求
Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,使用以下方法:
- 发送消息到主题。
- 从主题检索消息。
- 检索主题的分区列表。
- 创建和删除用户。
- 订阅消费者到主题,以便他们开始从这些主题接收信息。
- 检索消费者订阅的主题列表。
- 取消订阅消费者的主题.
- 为消费者分配分区。
- 提交使用者偏移列表。
- 定位分区,以便消费者开始收到来自第一或最后一个偏移位置的信息,或给定的偏移位置。
该方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以使用 JSON 或二进制格式发送。
客户端可以在不需要使用原生 Kafka 协议的情况下生成和使用消息。
与 AMQ Streams 安装类似,您可以下载 Kafka Bridge 文件以便在 Red Hat Enterprise Linux 上安装。请参阅 第 12.1.5 节 “下载 Kafka Bridge 归档”。
有关为 KafkaBridge
资源配置主机和端口的详情,请参考 第 12.1.6 节 “配置 Kafka Bridge 属性”。
12.1.1. 身份验证和加密 复制链接链接已复制到粘贴板!
尚不支持 HTTP 客户端和 Kafka Bridge 之间的身份验证和加密。这意味着从客户端发送到 Kafka Bridge 的请求有:
- 不加密,必须使用 HTTP 而不是 HTTPS
- 在没有身份验证的情况下发送
您可以在 Kafka Bridge 和 Kafka 集群之间配置 TLS 或基于 SASL 的身份验证。
您可以通过其 属性文件 配置 Kafka Bridge 进行身份验证。
12.1.2. 对 Kafka Bridge 的请求 复制链接链接已复制到粘贴板!
指定数据格式和 HTTP 标头,以确保将有效的请求提交到 Kafka Bridge。
API 请求和响应正文始终编码为 JSON。
12.1.2.1. 内容类型标头 复制链接链接已复制到粘贴板!
必须为所有请求提交 Content-Type
标头。唯一的例外是 POST
请求正文为空,其中添加 Content-Type
标头将导致请求失败。
消费者操作(/consumers
端点)和制作者操作(/topics
端点)需要不同的 Content-Type
标头。
consumer 操作的 content-Type 标头
无论 嵌入式数据格式 是什么,消费者操作的POST
请求都必须提供以下 Content-Type
标头(如果请求正文包含数据):
Content-Type: application/vnd.kafka.v2+json
Content-Type: application/vnd.kafka.v2+json
producer 操作的 content-Type 标头
在执行制作者操作时,POST
请求必须提供 Content-Type
标头来指定 生成的消息的嵌入式数据格式。这可以是 json
或 二进制
。
嵌入式数据格式 | Content-Type 标头 |
---|---|
JSON |
|
二进制 |
|
嵌入式数据格式是为每个消费者设置的,如下一节中所述。
如果 POST
请求带有空的正文,则一定不能设置 Content-Type
。可以使用空正文来创建具有默认值的消费者。
12.1.2.2. 嵌入式数据格式 复制链接链接已复制到粘贴板!
嵌入式数据格式是通过 HTTP 传输的 Kafka 消息的格式,使用 Kafka Bridge 将生产者传输到消费者。支持两种嵌入式数据格式:JSON 或二进制。
当使用 /consumers/groupid
端点创建消费者时,POST
请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在请求正文中的 format
字段中指定,例如:
{ "name": "my-consumer", "format": "binary", ... }
{
"name": "my-consumer",
"format": "binary",
...
}
- 1
- 二进制嵌入式数据格式。
如果没有指定消费者的嵌入式数据格式,则会设置二进制格式。
创建消费者时指定的嵌入式数据格式必须与它将使用的 Kafka 信息的数据格式匹配。
如果您选择指定二进制嵌入的数据格式,后续的制作者请求必须在请求正文中提供二进制数据作为 Base64 编码的字符串。例如,当向 /topics/topicname
端点发出 POST
请求时,该值必须采用 Base64 编码:
制作者请求还必须提供与嵌入式数据格式对应的 Content-Type
标头,如 Content-Type: application/vnd.kafka.binary.v2+json
。
12.1.2.3. 消息格式 复制链接链接已复制到粘贴板!
使用 /topics
端点发送消息时,您可以在请求正文中输入消息有效负载,在 records
参数中输入消息有效负载。
records
参数可以包含这些可选字段:
-
message
键
-
消息
值
-
目标
分区
-
消息标头
到 /topics 的 POST
请求示例
- 1
- 二进制格式的标头值,并编码为 Base64。
12.1.2.4. 接受标头 复制链接链接已复制到粘贴板!
创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept
标头:
Accept: application/vnd.kafka.embedded-data-format.v2+json
Accept: application/vnd.kafka.embedded-data-format.v2+json
embedded-data-format 是 json
或 二进制
。
例如,当使用 JSON 的嵌入式数据格式检索订阅消费者的记录时,请包含此 Accept 标头:
Accept: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.json.v2+json
12.1.3. 为 Kafka Bridge 配置日志记录器 复制链接链接已复制到粘贴板!
AMQ Streams Kafka 网桥允许您为每个由相关 OpenAPI 规格定义的操作设置不同的日志级别。
每个操作都有一个对应的 API 端点,网桥通过该端点从 HTTP 客户端接收请求。您可以更改每个端点的日志级别,以生成有关传入和传出 HTTP 请求的更多或更精细的日志信息。
日志程序在 log4j.properties
文件中定义,它包括了对 healthy
和 ready
端点的默认配置:
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false
所有其他操作的日志级别默认设置为 INFO
。日志记录器的格式如下:
log4j.logger.http.openapi.operation.<operation-id>
log4j.logger.http.openapi.operation.<operation-id>
其中 <operation-id
> 是特定操作的标识符。以下是 OpenAPI 规格定义的操作列表:
-
createConsumer
-
deleteConsumer
-
订阅
-
unsubscribe
-
poll
-
分配
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
健康
-
ready
-
openapi
12.1.4. Kafka Bridge API 资源 复制链接链接已复制到粘贴板!
有关 REST API 端点和描述的完整列表,包括请求和响应示例,请参阅 Kafka Bridge API 参考。
12.1.5. 下载 Kafka Bridge 归档 复制链接链接已复制到粘贴板!
可以从红帽网站下载 AMQ Streams Kafka Bridge 的压缩发行版。
流程
- 从客户门户网站下载 最新版本的 Red Hat AMQ Streams Kafka Bridge 存档。
12.1.6. 配置 Kafka Bridge 属性 复制链接链接已复制到粘贴板!
此流程描述了如何配置 AMQ Streams Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。
您可以将 Kafka Bridge 配置为任何其他 Kafka 客户端,为 Kafka 相关的属性使用适当的前缀。
-
Kafka.
对于应用于生产者和消费者的常规配置,如服务器连接和安全性。 -
kafka.consumer.
用于只传递给消费者的特定于消费者的配置。 -
kafka.producer.
for producer 特定配置仅传递给制作者。
除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性提供通过 Cross-Origin Resource Sharing (CORS)启用和定义 Kafka Bridge 的访问控制。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问所选资源。要配置 CORS,您可以定义允许的资源源和 HTTP 方法列表来访问它们。请求中的其他 HTTP 标头 描述了允许访问 Kafka 集群的源。
流程
编辑 AMQ Streams Kafka Bridge 安装存档提供的
application.properties
文件。使用属性文件指定 Kafka 和 HTTP 相关属性,并启用分布式追踪。
配置标准 Kafka 相关属性,包括特定于 Kafka 用户和制作者的属性。
使用:
-
kafka.bootstrap.servers
来定义到 Kafka 集群的主机/端口连接 -
kafka.producer.acks
为 HTTP 客户端提供确认 kafka.consumer.auto.offset.reset
来确定如何在 Kafka 中管理偏移重置有关配置 Kafka 属性的更多信息,请参阅 Apache Kafka 网站
-
配置与 HTTP 相关的属性,以启用对 Kafka 集群的 HTTP 访问。
例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 启用或禁用分布式追踪。
bridge.tracing=jaeger
bridge.tracing=jaeger
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从属性中删除代码注释以启用分布式追踪
12.1.7. 安装 Kafka Bridge 复制链接链接已复制到粘贴板!
按照以下步骤在 Red Hat Enterprise Linux 上安装 AMQ Streams Kafka Bridge。
流程
- 如果您还没有这样做,将 AMQ Streams Kafka Bridge 安装存档解压缩到任何目录中。
使用配置属性作为参数运行 Kafka Bridge 脚本:
例如:
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 检查日志中的安装是否成功。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
Copy to Clipboard Copied! Toggle word wrap Toggle overflow