第 12 章 Kafka Bridge


本章概述了 Red Hat Enterprise Linux 上的 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。要在本地环境中尝试 Kafka Bridge,请参阅本章后的 第 12.2 节 “Kafka Bridge quickstart”

其他资源

12.1. Kafka Bridge 概述

Kafka Bridge 提供了一个 RESTful 接口,它允许基于 HTTP 的客户端与 Kafka 集群交互。它提供了与 AMQ Streams 的 Web API 连接的优点,而无需客户端应用程序来解释 Kafka 协议。

API 有两个主要的消费者 和主题 - 通过端点公开并可访问,以便与 Kafka 集群中的使用者和制作者交互。资源只与 Kafka Bridge 相关,而不是直接连接到 Kafka 的用户和制作者。

HTTP 请求

Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,使用以下方法:

  • 发送消息到主题。
  • 从主题检索消息。
  • 检索主题的分区列表。
  • 创建和删除用户。
  • 订阅消费者到主题,以便他们开始从这些主题接收信息。
  • 检索消费者订阅的主题列表。
  • 取消订阅消费者的主题.
  • 为消费者分配分区。
  • 提交使用者偏移列表。
  • 定位分区,以便消费者开始收到来自第一或最后一个偏移位置的信息,或给定的偏移位置。

该方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以使用 JSON 或二进制格式发送。

客户端可以在不需要使用原生 Kafka 协议的情况下生成和使用消息。

与 AMQ Streams 安装类似,您可以下载 Kafka Bridge 文件以便在 Red Hat Enterprise Linux 上安装。请参阅 第 12.1.5 节 “下载 Kafka Bridge 归档”

有关为 KafkaBridge 资源配置主机和端口的详情,请参考 第 12.1.6 节 “配置 Kafka Bridge 属性”

12.1.1. 身份验证和加密

尚不支持 HTTP 客户端和 Kafka Bridge 之间的身份验证和加密。这意味着从客户端发送到 Kafka Bridge 的请求有:

  • 不加密,必须使用 HTTP 而不是 HTTPS
  • 在没有身份验证的情况下发送

您可以在 Kafka Bridge 和 Kafka 集群之间配置 TLS 或基于 SASL 的身份验证

您可以通过其 属性文件 配置 Kafka Bridge 进行身份验证。

12.1.2. 对 Kafka Bridge 的请求

指定数据格式和 HTTP 标头,以确保将有效的请求提交到 Kafka Bridge。

API 请求和响应正文始终编码为 JSON。

12.1.2.1. 内容类型标头

必须为所有请求提交 Content-Type 标头。唯一的例外是 POST 请求正文为空,其中添加 Content-Type 标头将导致请求失败。

消费者操作(/consumers 端点)和制作者操作(/topics 端点)需要不同的 Content-Type 标头。

consumer 操作的 content-Type 标头

无论 嵌入式数据格式 是什么,消费者操作的POST 请求都必须提供以下 Content-Type 标头(如果请求正文包含数据):

Content-Type: application/vnd.kafka.v2+json
Copy to Clipboard Toggle word wrap

producer 操作的 content-Type 标头

在执行制作者操作时,POST 请求必须提供 Content-Type 标头来指定 生成的消息的嵌入式数据格式。这可以是 json二进制

Expand
表 12.1. 用于数据格式的 Content-Type 标头
嵌入式数据格式Content-Type 标头

JSON

content-Type: application/vnd.kafka.json.v2+json

二进制

content-Type: application/vnd.kafka.binary.v2+json

嵌入式数据格式是为每个消费者设置的,如下一节中所述。

如果 POST 请求带有空的正文,则一定不能设置 Content-Type。可以使用空正文来创建具有默认值的消费者。

12.1.2.2. 嵌入式数据格式

嵌入式数据格式是通过 HTTP 传输的 Kafka 消息的格式,使用 Kafka Bridge 将生产者传输到消费者。支持两种嵌入式数据格式:JSON 或二进制。

当使用 /consumers/groupid 端点创建消费者时,POST 请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在请求正文中的 format 字段中指定,例如:

{
  "name": "my-consumer",
  "format": "binary", 
1

...
}
Copy to Clipboard Toggle word wrap
1
二进制嵌入式数据格式。

如果没有指定消费者的嵌入式数据格式,则会设置二进制格式。

创建消费者时指定的嵌入式数据格式必须与它将使用的 Kafka 信息的数据格式匹配。

如果您选择指定二进制嵌入的数据格式,后续的制作者请求必须在请求正文中提供二进制数据作为 Base64 编码的字符串。例如,当向 /topics/topicname 端点发出 POST 请求时,该值必须采用 Base64 编码:

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    },
  ]
}
Copy to Clipboard Toggle word wrap

制作者请求还必须提供与嵌入式数据格式对应的 Content-Type 标头,如 Content-Type: application/vnd.kafka.binary.v2+json

12.1.2.3. 消息格式

使用 /topics 端点发送消息时,您可以在请求正文中输入消息有效负载,在 records 参数中输入消息有效负载。

records 参数可以包含这些可选字段:

  • message
  • 消息
  • 目标 分区
  • 消息标头

到 /topics 的 POST 请求示例

curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
            "partition": 2
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 
1

              }
            ]
        },
    ]
}'
Copy to Clipboard Toggle word wrap

1
二进制格式的标头值,并编码为 Base64。

12.1.2.4. 接受标头

创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept 标头:

Accept: application/vnd.kafka.embedded-data-format.v2+json
Copy to Clipboard Toggle word wrap

embedded-data-formatjson二进制

例如,当使用 JSON 的嵌入式数据格式检索订阅消费者的记录时,请包含此 Accept 标头:

Accept: application/vnd.kafka.json.v2+json
Copy to Clipboard Toggle word wrap

12.1.3. 为 Kafka Bridge 配置日志记录器

AMQ Streams Kafka 网桥允许您为每个由相关 OpenAPI 规格定义的操作设置不同的日志级别。

每个操作都有一个对应的 API 端点,网桥通过该端点从 HTTP 客户端接收请求。您可以更改每个端点的日志级别,以生成有关传入和传出 HTTP 请求的更多或更精细的日志信息。

日志程序在 log4j.properties 文件中定义,它包括了对 healthyready 端点的默认配置:

log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false
Copy to Clipboard Toggle word wrap

所有其他操作的日志级别默认设置为 INFO。日志记录器的格式如下:

log4j.logger.http.openapi.operation.<operation-id>
Copy to Clipboard Toggle word wrap

其中 <operation-id > 是特定操作的标识符。以下是 OpenAPI 规格定义的操作列表:

  • createConsumer
  • deleteConsumer
  • 订阅
  • unsubscribe
  • poll
  • 分配
  • commit
  • send
  • sendToPartition
  • seekToBeginning
  • seekToEnd
  • seek
  • 健康
  • ready
  • openapi

12.1.4. Kafka Bridge API 资源

有关 REST API 端点和描述的完整列表,包括请求和响应示例,请参阅 Kafka Bridge API 参考

12.1.5. 下载 Kafka Bridge 归档

可以从红帽网站下载 AMQ Streams Kafka Bridge 的压缩发行版。

流程

12.1.6. 配置 Kafka Bridge 属性

此流程描述了如何配置 AMQ Streams Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。

您可以将 Kafka Bridge 配置为任何其他 Kafka 客户端,为 Kafka 相关的属性使用适当的前缀。

  • Kafka. 对于应用于生产者和消费者的常规配置,如服务器连接和安全性。
  • kafka.consumer. 用于只传递给消费者的特定于消费者的配置。
  • kafka.producer. for producer 特定配置仅传递给制作者。

除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性提供通过 Cross-Origin Resource Sharing (CORS)启用和定义 Kafka Bridge 的访问控制。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问所选资源。要配置 CORS,您可以定义允许的资源源和 HTTP 方法列表来访问它们。请求中的其他 HTTP 标头 描述了允许访问 Kafka 集群的源

流程

  1. 编辑 AMQ Streams Kafka Bridge 安装存档提供的 application.properties 文件。

    使用属性文件指定 Kafka 和 HTTP 相关属性,并启用分布式追踪。

    1. 配置标准 Kafka 相关属性,包括特定于 Kafka 用户和制作者的属性。

      使用:

      • kafka.bootstrap.servers 来定义到 Kafka 集群的主机/端口连接
      • kafka.producer.acks 为 HTTP 客户端提供确认
      • kafka.consumer.auto.offset.reset 来确定如何在 Kafka 中管理偏移重置

        有关配置 Kafka 属性的更多信息,请参阅 Apache Kafka 网站

    2. 配置与 HTTP 相关的属性,以启用对 Kafka 集群的 HTTP 访问。

      例如:

      http.enabled=true
      http.host=0.0.0.0
      http.port=8080 
      1
      
      http.cors.enabled=true 
      2
      
      http.cors.allowedOrigins=https://strimzi.io 
      3
      
      http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 
      4
      Copy to Clipboard Toggle word wrap
      1
      Kafka Bridge 的默认 HTTP 配置来侦听端口 8080。
      2
      设置为 true 以启用 CORS。
      3
      允许 CORS 来源的逗号分隔列表。您可以使用 URL 或 Java 正则表达式。
      4
      CORS 允许用逗号分隔的 HTTP 方法列表。
    3. 启用或禁用分布式追踪。

      bridge.tracing=jaeger
      Copy to Clipboard Toggle word wrap

      从属性中删除代码注释以启用分布式追踪

12.1.7. 安装 Kafka Bridge

按照以下步骤在 Red Hat Enterprise Linux 上安装 AMQ Streams Kafka Bridge。

流程

  1. 如果您还没有这样做,将 AMQ Streams Kafka Bridge 安装存档解压缩到任何目录中。
  2. 使用配置属性作为参数运行 Kafka Bridge 脚本:

    例如:

    ./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
    Copy to Clipboard Toggle word wrap
  3. 检查日志中的安装是否成功。

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat