搜索

第 6 章 Kafka Bridge

download PDF

本章概述了 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。

6.1. Kafka 网桥概述

您可以使用 AMQ Streams Kafka Bridge 作为接口,向 Kafka 集群发出特定类型的 HTTP 请求。

6.1.1. Kafka Bridge 接口

Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。  它提供了与 AMQ Streams 的 Web API 连接的优势,不需要客户端应用程序来解释 Kafka 协议。

API 有两个主要资源( 使用者和 主题 ),它们通过端点公开并可访问,以便与 Kafka 集群中的用户和生产者交互。资源仅与 Kafka 网桥相关,而不是与 Kafka 直接连接的消费者和生产者。

6.1.1.1. HTTP 请求

Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,其方法如下:

  • 发送消息到一个主题。
  • 从主题检索消息.
  • 检索主题的分区列表。
  • 创建和删除消费者.
  • 订阅消费者了解主题,以便他们开始接收来自这些主题的信息。
  • 检索消费者订阅的主题列表。
  • 取消订阅消费者的主题.
  • 将分区分配给消费者.
  • 提交消费者偏移列表。
  • 寻找分区,以便使用者开始接受来自第一个或最后一个偏移位置的信息,或者给定的偏移位置。

这些方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以 JSON 或二进制格式发送。

客户端可以生成和使用消息,而无需使用原生 Kafka 协议。

其他资源

6.1.2. Kafka Bridge 支持的客户端

您可以使用 Kafka Bridge 将 内部和外部 HTTP 客户端应用程序与 Kafka 集群集成。

内部客户端
内部客户端是基于容器的 HTTP 客户端,与 Kafka Bridge 本身 在同一个 OpenShift 集群中运行。内部客户端可以访问在 Kafka Bridge 自定义资源中定义的主机上的 Kafka Bridge 和端口。
外部客户端
外部客户端是在 OpenShift 集群 外部 运行的 HTTP 客户端,其中部署并运行 Kafka Bridge。外部客户端可以通过 OpenShift Route、负载均衡器服务或使用 Ingress 访问 Kafka 网桥。

HTTP 内部和外部客户端集成

Internal and external HTTP producers and consumers exchange data with the Kafka brokers through the Kafka Bridge

6.1.3. 保护 Kafka 网桥

AMQ Streams 目前不为 Kafka Bridge 提供任何加密、身份验证或授权。这意味着,从外部客户端发送到 Kafka Bridge 的请求是:

  • 未加密,且必须使用 HTTP 而不是 HTTPS
  • 在没有验证的情况下发送

但是,您可以使用其他方法保护 Kafka 网桥的安全,例如:

  • 定义哪些容器集可以访问 Kafka 网桥的 OpenShift Network 策略。
  • 带有身份验证或授权的反向代理,如 OAuth2 代理。
  • API 网关.
  • 具有 TLS 终止的入口或 OpenShift 路由。

当连接到 Kafka Broker 时,Kafka Bridge 支持 TLS 加密以及 TLS 和 SASL 身份验证。在 OpenShift 集群中,您可以配置:

  • Kafka Bridge 和 Kafka 集群间的 TLS 或基于 SASL 的身份验证
  • Kafka 网桥和 Kafka 集群之间的 TLS 加密连接。

如需更多信息,请参阅 第 2.5.1 节 “配置 Kafka 网桥”

您可以使用 Kafka 代理中的 ACL 来限制可通过 Kafka Bridge 使用和生成的主题。

6.1.4. 访问 OpenShift 外部的 Kafka 网桥

部署后,AMQ Streams Kafka Bridge 只能由同一 OpenShift 集群中运行的应用程序访问。这些应用程序使用 kafka-bridge-name-bridge-service 服务来访问 API。

如果要使 Kafka Bridge 可供在 OpenShift 集群外运行的应用程序访问,您可以使用以下功能之一手动公开它:

  • LoadBalancer 或 NodePort 类型的服务
  • Ingress 资源
  • OpenShift Routes

如果您决定创建服务,请使用 选择器 中的以下标签来配置服务要将流量路由到的 pod:

  # ...
  selector:
    strimzi.io/cluster: kafka-bridge-name 1
    strimzi.io/kind: KafkaBridge
  #...
1
OpenShift 集群中 Kafka Bridge 自定义资源的名称。

6.1.5. 对 Kafka Bridge 的请求

指定数据格式和 HTTP 标头,以确保向 Kafka Bridge 提交有效的请求。

6.1.5.1. 内容类型标头

API 请求和响应正文始终编码为 JSON。

  • 在执行消费者操作时,如果存在非空正文,POST 请求必须提供以下 Content-Type 标头:

    Content-Type: application/vnd.kafka.v2+json
  • 在执行制作者操作时,POST 请求 必须提供 Content-Type 标头,指定生成的消息的 嵌入式数据格式。这可以是 json二进制

    嵌入式数据格式content-Type 标头

    JSON

    content-Type: application/vnd.kafka.json.v2+json

    二进制

    content-Type: application/vnd.kafka.binary.v2+json

嵌入式数据格式为每个消费者设置,如下一节所述。

如果 POST 请求 具有空正文,则不能 设置 Content-Type。空正文可用于创建具有默认值的消费者。

6.1.5.2. 嵌入式数据格式

嵌入式数据格式是 Kafka 消息通过 HTTP 从生产者传输到使用 Kafka 网桥的消费者的格式。支持两种嵌入式数据格式:JSON 和二进制.

在使用 /consumers/groupid 端点创建消费者时,POST 请求 正文必须指定嵌入的数据格式(JSON 或二进制)。这在 format 字段中指定,例如:

{
  "name": "my-consumer",
  "format": "binary", 1
...
}
1
二进制嵌入式数据格式.

创建使用者时指定的嵌入式数据格式必须与它将使用的 Kafka 消息的数据格式匹配。

如果您选择指定二进制嵌入式数据格式,后续制作者请求必须在请求正文中以 Base64 编码的字符串形式提供二进制数据。例如,在使用 /topics/topicname 端点发送信息时,records.value 必须采用 Base64 编码:

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    },
  ]
}

制作者请求还必须提供与嵌入式数据格式对应的 Content-Type 标头,如 Content-Type: application/vnd.kafka.binary.v2+json

6.1.5.3. 消息格式

使用 /topics 端点 发送消息时,您将在请求正文中的 record 参数 中输入消息有效负载。

records 参数可以包含任何这些可选字段:

  • 消息 标头
  • 消息
  • 消息
  • 目标 分区

到 /topics 的 POST 请求示例

curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
            "partition": 2
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
              }
            ]
        },
    ]
}'

1
二进制格式的标头值,编码为 Base64。

6.1.5.4. 接受标头

创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept 标头:

Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json

EMBEDDED-DATA-FORMATjson二进制

例如,当使用嵌入的 JSON 数据格式获取订阅的消费者的记录时,包括这个 Accept 标头:

Accept: application/vnd.kafka.json.v2+json

6.1.6. CORS

跨 Origin 资源共享(CORS)允许您指定在 Kafka 网桥 HTTP 配置 中访问 Kafka 集群的允许方法和原始 URL。

Kafka Bridge 的 CORS 配置示例

# ...
cors:
  allowedOrigins: "https://strimzi.io"
  allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH"
  # ...

CORS 允许在不同域中的原始源之间 简单预先理解的请求

简单请求适用于使用 GETHEAD、POST 方法的标准请求。

预定义的请求会发送 HTTP OPTIONS 请求,作为检查实际请求是否安全发送的初始检查。确认后会发送实际请求。preflight 请求适用于需要更大保护的方法,如 PUTDELETE,以及使用非标准标头。

所有请求都需要其标头中有一个 Origin 值,这是 HTTP 请求的来源。

6.1.6.1. 简单请求

例如:这个简单请求标头将原始文件指定为 https://strimzi.io

Origin: https://strimzi.io

标头信息添加到请求中。

curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
-H 'Origin: https://strimzi.io'\
-H 'content-type: application/vnd.kafka.v2+json'

在 Kafka Bridge 的响应中,会返回 Access-Control-Allow-Origin 标头。

HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 1
1
返回星号(*)表示资源可以被任何域访问。

6.1.6.2. Preflighted 请求

使用 a OPTIONS 方法将初始 preflight 请求发送到 Kafka Bridge。HTTP OPTIONS 请求发送标头信息,以检查 Kafka Bridge 是否允许实际请求。

这里的 preflight 请求检查 POST 请求 是否从 https://strimzi.io 有效。

OPTIONS /my-group/instances/my-user/subscription HTTP/1.1
Origin: https://strimzi.io
Access-Control-Request-Method: POST 1
Access-Control-Request-Headers: Content-Type 2
1
Kafka Bridge 已被警告,实际请求是 POST 请求
2
实际的请求将与 Content-Type 标头一起发送。

OPTIONS 被添加到 preflight 请求的标题信息中。

curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \
-H 'Access-Control-Request-Method: POST' \
-H 'content-type: application/vnd.kafka.v2+json'

Kafka Bridge 响应初始请求,以确认请求被接受。响应标头返回允许的来源、方法和标头。

HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io
Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH
Access-Control-Allow-Headers: content-type

如果原始或方法被拒绝,则返回错误消息。

实际请求不需要 Access-Control-Request-Method 标头,因为它已在 preflight 请求中确认,但它确实需要 origin 标头。

curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'

响应中显示允许的来源 URL。

HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io

其他资源

获取 CORS 规格

6.1.7. Kafka Bridge API 资源

有关 REST API 端点和描述的完整列表,包括请求和响应示例,请查看 Kafka Bridge API 参考

6.1.8. Kafka Bridge 部署

您可以使用 Cluster Operator 将 Kafka Bridge 部署到 OpenShift 集群中。

部署 Kafka Bridge 后,Cluster Operator 会在 OpenShift 集群中创建 Kafka Bridge 对象。对象包括 部署服务和 pod,各自名称都以 Kafka Bridge 自定义资源中指定的名称命名。

其他资源

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.