第 6 章 Kafka Bridge
本章概述了 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。
- 要在您的本地环境中尝试 Kafka 网桥,请参阅本章后面的 第 6.2 节 “Kafka Bridge quickstart”。
- 有关配置步骤的详情请参考 第 2.5 节 “Kafka Bridge 集群配置”。
- 要查看 API 文档,请参阅 Kafka Bridge API 参考。
6.1. Kafka 网桥概述
您可以使用 AMQ Streams Kafka Bridge 作为接口,向 Kafka 集群发出特定类型的 HTTP 请求。
6.1.1. Kafka Bridge 接口
Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。 它提供了与 AMQ Streams 的 Web API 连接的优势,不需要客户端应用程序来解释 Kafka 协议。
API 有两个主要资源( 使用者和
主题
),它们通过端点公开并可访问,以便与 Kafka 集群中的用户和生产者交互。资源仅与 Kafka 网桥相关,而不是与 Kafka 直接连接的消费者和生产者。
6.1.1.1. HTTP 请求
Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,其方法如下:
- 发送消息到一个主题。
- 从主题检索消息.
- 检索主题的分区列表。
- 创建和删除消费者.
- 订阅消费者了解主题,以便他们开始接收来自这些主题的信息。
- 检索消费者订阅的主题列表。
- 取消订阅消费者的主题.
- 将分区分配给消费者.
- 提交消费者偏移列表。
- 寻找分区,以便使用者开始接受来自第一个或最后一个偏移位置的信息,或者给定的偏移位置。
这些方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以 JSON 或二进制格式发送。
客户端可以生成和使用消息,而无需使用原生 Kafka 协议。
其他资源
- 要查看 API 文档,包括请求和响应示例,请参阅 Kafka Bridge API 参考。
6.1.2. Kafka Bridge 支持的客户端
您可以使用 Kafka Bridge 将 内部和外部 HTTP 客户端应用程序与 Kafka 集群集成。
- 内部客户端
-
内部客户端是基于容器的 HTTP 客户端,与 Kafka Bridge 本身 在同一个 OpenShift 集群中运行。内部客户端可以访问在 Kafka
Bridge 自定义资源中定义的主机上的 Kafka
Bridge 和端口。 - 外部客户端
- 外部客户端是在 OpenShift 集群 外部 运行的 HTTP 客户端,其中部署并运行 Kafka Bridge。外部客户端可以通过 OpenShift Route、负载均衡器服务或使用 Ingress 访问 Kafka 网桥。
HTTP 内部和外部客户端集成
6.1.3. 保护 Kafka 网桥
AMQ Streams 目前不为 Kafka Bridge 提供任何加密、身份验证或授权。这意味着,从外部客户端发送到 Kafka Bridge 的请求是:
- 未加密,且必须使用 HTTP 而不是 HTTPS
- 在没有验证的情况下发送
但是,您可以使用其他方法保护 Kafka 网桥的安全,例如:
- 定义哪些容器集可以访问 Kafka 网桥的 OpenShift Network 策略。
- 带有身份验证或授权的反向代理,如 OAuth2 代理。
- API 网关.
- 具有 TLS 终止的入口或 OpenShift 路由。
当连接到 Kafka Broker 时,Kafka Bridge 支持 TLS 加密以及 TLS 和 SASL 身份验证。在 OpenShift 集群中,您可以配置:
- Kafka Bridge 和 Kafka 集群间的 TLS 或基于 SASL 的身份验证
- Kafka 网桥和 Kafka 集群之间的 TLS 加密连接。
如需更多信息,请参阅 第 2.5.1 节 “配置 Kafka 网桥”。
您可以使用 Kafka 代理中的 ACL 来限制可通过 Kafka Bridge 使用和生成的主题。
6.1.4. 访问 OpenShift 外部的 Kafka 网桥
部署后,AMQ Streams Kafka Bridge 只能由同一 OpenShift 集群中运行的应用程序访问。这些应用程序使用 kafka-bridge-name-bridge-service
服务来访问 API。
如果要使 Kafka Bridge 可供在 OpenShift 集群外运行的应用程序访问,您可以使用以下功能之一手动公开它:
- LoadBalancer 或 NodePort 类型的服务
- Ingress 资源
- OpenShift Routes
如果您决定创建服务,请使用 选择器
中的以下标签来配置服务要将流量路由到的 pod:
# ...
selector:
strimzi.io/cluster: kafka-bridge-name 1
strimzi.io/kind: KafkaBridge
#...
- 1
- OpenShift 集群中 Kafka Bridge 自定义资源的名称。
6.1.5. 对 Kafka Bridge 的请求
指定数据格式和 HTTP 标头,以确保向 Kafka Bridge 提交有效的请求。
6.1.5.1. 内容类型标头
API 请求和响应正文始终编码为 JSON。
在执行消费者操作时,如果存在非空正文,
POST
请求必须提供以下Content-Type
标头:Content-Type: application/vnd.kafka.v2+json
在执行制作者操作时,
POST 请求
必须提供Content-Type
标头,指定生成的消息的 嵌入式数据格式。这可以是json
或二进制
。嵌入式数据格式 content-Type 标头 JSON
content-Type: application/vnd.kafka.json.v2+json
二进制
content-Type: application/vnd.kafka.binary.v2+json
嵌入式数据格式为每个消费者设置,如下一节所述。
如果 POST 请求
具有空正文,则不能 设置 Content-Type
。空正文可用于创建具有默认值的消费者。
6.1.5.2. 嵌入式数据格式
嵌入式数据格式是 Kafka 消息通过 HTTP 从生产者传输到使用 Kafka 网桥的消费者的格式。支持两种嵌入式数据格式:JSON 和二进制.
在使用 /consumers/groupid
端点创建消费者时,POST 请求
正文必须指定嵌入的数据格式(JSON 或二进制)。这在 format
字段中指定,例如:
{
"name": "my-consumer",
"format": "binary", 1
...
}
- 1
- 二进制嵌入式数据格式.
创建使用者时指定的嵌入式数据格式必须与它将使用的 Kafka 消息的数据格式匹配。
如果您选择指定二进制嵌入式数据格式,后续制作者请求必须在请求正文中以 Base64 编码的字符串形式提供二进制数据。例如,在使用 /topics/topicname
端点发送信息时,records.value
必须采用 Base64 编码:
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
制作者请求还必须提供与嵌入式数据格式对应的 Content-Type
标头,如 Content-Type: application/vnd.kafka.binary.v2+json
。
6.1.5.3. 消息格式
使用 /topics 端点
发送消息时,您将在请求正文中的 record 参数
中输入消息有效负载。
records
参数可以包含任何这些可选字段:
-
消息
标头
-
消息
键
-
消息
值
-
目标
分区
到 /topics 的 POST
请求示例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
},
]
}'
- 1
- 二进制格式的标头值,编码为 Base64。
6.1.5.4. 接受标头
创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept
标头:
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
EMBEDDED-DATA-FORMAT
是 json
或 二进制
。
例如,当使用嵌入的 JSON 数据格式获取订阅的消费者的记录时,包括这个 Accept 标头:
Accept: application/vnd.kafka.json.v2+json
6.1.6. CORS
跨 Origin 资源共享(CORS)允许您指定在 Kafka 网桥 HTTP 配置 中访问 Kafka 集群的允许方法和原始 URL。
Kafka Bridge 的 CORS 配置示例
# ... cors: allowedOrigins: "https://strimzi.io" allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH" # ...
CORS 允许在不同域中的原始源之间 简单 且 预先理解的请求。
简单请求适用于使用 GET
、HEAD、POST
方法的标准请求。
预定义的请求会发送 HTTP OPTIONS 请求,作为检查实际请求是否安全发送的初始检查。确认后会发送实际请求。preflight 请求适用于需要更大保护的方法,如 PUT
和 DELETE
,以及使用非标准标头。
所有请求都需要其标头中有一个 Origin
值,这是 HTTP 请求的来源。
6.1.6.1. 简单请求
例如:这个简单请求标头将原始文件指定为 https://strimzi.io
。
Origin: https://strimzi.io
标头信息添加到请求中。
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
-H 'Origin: https://strimzi.io'\
-H 'content-type: application/vnd.kafka.v2+json'
在 Kafka Bridge 的响应中,会返回 Access-Control-Allow-Origin
标头。
HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 1
- 1
- 返回星号
(*
)表示资源可以被任何域访问。
6.1.6.2. Preflighted 请求
使用 a OPTIONS
方法将初始 preflight 请求发送到 Kafka Bridge。HTTP OPTIONS 请求发送标头信息,以检查 Kafka Bridge 是否允许实际请求。
这里的 preflight 请求检查 POST 请求
是否从 https://strimzi.io
有效。
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1 Origin: https://strimzi.io Access-Control-Request-Method: POST 1 Access-Control-Request-Headers: Content-Type 2
OPTIONS
被添加到 preflight 请求的标题信息中。
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \ -H 'Access-Control-Request-Method: POST' \ -H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge 响应初始请求,以确认请求被接受。响应标头返回允许的来源、方法和标头。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH Access-Control-Allow-Headers: content-type
如果原始或方法被拒绝,则返回错误消息。
实际请求不需要 Access-Control-Request-Method
标头,因为它已在 preflight 请求中确认,但它确实需要 origin 标头。
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'
响应中显示允许的来源 URL。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io
其他资源
获取 CORS 规格
6.1.7. Kafka Bridge API 资源
有关 REST API 端点和描述的完整列表,包括请求和响应示例,请查看 Kafka Bridge API 参考。
6.1.8. Kafka Bridge 部署
您可以使用 Cluster Operator 将 Kafka Bridge 部署到 OpenShift 集群中。
部署 Kafka Bridge 后,Cluster Operator 会在 OpenShift 集群中创建 Kafka Bridge 对象。对象包括 部署、服务和 pod,各自名称都以 Kafka Bridge 自定义资源中指定的名称命名。
其他资源
- 有关部署说明,请参阅 OpenShift 指南中的 Deploying AMQ Streams 中的将 Kafka Bridge 部署到 OpenShift 集群。
- 有关配置 Kafka 网桥的详情,请参考 第 2.5 节 “Kafka Bridge 集群配置”
-
有关为
KafkaBridge
资源配置主机和端口的详情请参考 第 2.5.1 节 “配置 Kafka 网桥”。 - 有关集成外部客户端的详情请参考 第 6.1.4 节 “访问 OpenShift 外部的 Kafka 网桥”。