搜索

第 12 章 Kafka Bridge

download PDF

本章概述了 Red Hat Enterprise Linux 上的 AMQ Streams Kafka Bridge,并帮助您开始使用 REST API 与 AMQ Streams 交互。要在您的本地环境中尝试 Kafka 网桥,请参阅本章后面的 第 12.2 节 “Kafka Bridge quickstart”

其它资源

12.1. Kafka 网桥概述

Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。它提供了与 AMQ Streams 的 Web API 连接的优势,不需要客户端应用程序来解释 Kafka 协议。

API 有两个主要资源:consumerstopics,它们通过端点公开并可访问,以便与 Kafka 集群中的用户和生产者交互。资源仅与 Kafka 网桥相关,而不是与 Kafka 直接连接的消费者和生产者。

HTTP 请求

Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,其方法如下:

  • 发送消息到一个主题。
  • 从主题检索消息.
  • 检索主题的分区列表。
  • 创建和删除消费者.
  • 订阅消费者了解主题,以便他们开始接收来自这些主题的信息。
  • 检索消费者订阅的主题列表。
  • 取消订阅消费者的主题.
  • 将分区分配给消费者.
  • 提交消费者偏移列表。
  • 寻找分区,以便使用者开始接受来自第一个或最后一个偏移位置的信息,或者给定的偏移位置。

这些方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以 JSON 或二进制格式发送。

客户端可以生成和使用消息,而无需使用原生 Kafka 协议。

与 AMQ Streams 安装类似,您可以下载用于在 Red Hat Enterprise Linux 上安装的 Kafka Bridge 文件。请参阅 第 12.1.5 节 “下载 Kafka 网桥存档”

有关为 KafkaBridge 资源配置主机和端口的详情请参考 第 12.1.6 节 “配置 Kafka Bridge 属性”

12.1.1. 身份验证和加密

尚不支持 HTTP 客户端和 Kafka 网桥之间的身份验证和加密。这意味着,从客户端发送到 Kafka Bridge 的请求是:

  • 未加密,且必须使用 HTTP 而不是 HTTPS
  • 在没有验证的情况下发送

您可以在 Kafka Bridge 和 Kafka 集群间配置 TLS 或 SASL 的身份验证

您已配置 Kafka Bridge 以通过其 属性文件 进行身份验证。

12.1.2. 对 Kafka Bridge 的请求

指定数据格式和 HTTP 标头,以确保向 Kafka Bridge 提交有效的请求。

API 请求和响应正文始终编码为 JSON。

12.1.2.1. 内容类型标头

必须为所有请求提交 Content-Type 标头。唯一的例外是,POST 请求正文为空时,添加 Content-Type 标头将导致请求失败。

消费者操作(/consumers 端点)和制作者操作(/topics 端点)需要不同的 Content-Type 标头。

用于消费者操作的内容类型标头

无论 嵌入式数据格式是什么,如果请求正文包含数据,对消费者操作的POST 请求必须提供以下 Content-Type 标头:

Content-Type: application/vnd.kafka.v2+json

用于制作者操作的 content-Type 标头

在执行制作者操作时,POST 请求必须提供 Content-Type 标头指定生成的信息的 嵌入式数据格式。这可以是 jsonbinary

表 12.1. 数据格式的内容类型标头
嵌入式数据格式content-Type 标头

JSON

Content-Type: application/vnd.kafka.json.v2+json

二进制

Content-Type: application/vnd.kafka.binary.v2+json

嵌入式数据格式为每个消费者设置,如下一节所述。

如果 POST 请求有空正文,则不能 设置 Content-Type。空正文可用于创建具有默认值的消费者。

12.1.2.2. 嵌入式数据格式

嵌入式数据格式是 Kafka 消息通过 HTTP 从生产者传输到使用 Kafka 网桥的消费者的格式。支持两种嵌入的数据格式:JSON 或二进制。

当使用 /consumers/groupid 端点创建消费者时,POST 请求正文必须指定 JSON 或二进制的嵌入式数据格式。这在请求正文的 format 字段中指定,例如:

{
  "name": "my-consumer",
  "format": "binary", 1
...
}
1
二进制嵌入式数据格式.

如果没有为使用者指定嵌入式数据格式,则会设置二进制格式。

创建使用者时指定的嵌入式数据格式必须与它将使用的 Kafka 消息的数据格式匹配。

如果您选择指定二进制嵌入式数据格式,后续制作者请求必须在请求正文中以 Base64 编码的字符串形式提供二进制数据。例如,当通过向 /topics/topicname 端点发出 POST 请求来发送信息时,value 必须使用 Base64 编码:

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    },
  ]
}

制作者请求还必须提供与嵌入式数据格式对应的 Content-Type 标头,如 Content-Type: application/vnd.kafka.binary.v2+json

12.1.2.3. 消息格式

当使用 /topics 端点发送消息时,您可以在请求正文中在 records 参数中输入消息有效负载。

records 参数可以包含这些可选字段:

  • message key
  • message value
  • 目的地 partition
  • message headers

对 /topics 的 POST 请求示例

curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
            "partition": 2
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
              }
            ]
        },
    ]
}'

1
二进制格式的标头值,编码为 Base64。

12.1.2.4. 接受标头

创建消费者后,所有后续 GET 请求都必须以以下格式提供 Accept 标头:

Accept: application/vnd.kafka.embedded-data-format.v2+json

embedded -data-formatjsonbinary

例如,当使用嵌入的 JSON 数据格式获取订阅的消费者的记录时,包括这个 Accept 标头:

Accept: application/vnd.kafka.json.v2+json

12.1.3. 为 Kafka Bridge 配置日志记录器

AMQ Streams Kafka 网桥允许您为相关 OpenAPI 规格定义的每个操作设置不同的日志级别。

每一操作具有对应的 API 端点,网桥通过它接收来自 HTTP 客户端的请求。您可以更改每个端点的日志级别,以生成有关传入和传出 HTTP 请求的更多或更少的精细日志信息。

日志记录器在 log4j.properties 文件中定义,该文件具有 healthyready 端点的以下默认配置:

log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false

所有其他操作的日志级别默认设置为 INFO。日志记录器的格式如下:

log4j.logger.http.openapi.operation.<operation-id>

其中 <operation-id> 是特定操作的标识符。以下是 OpenAPI 规范定义的操作列表:

  • createConsumer
  • deleteConsumer
  • subscribe
  • unsubscribe
  • poll
  • assign
  • commit
  • send
  • sendToPartition
  • seekToBeginning
  • seekToEnd
  • seek
  • healthy
  • ready
  • openapi

12.1.4. Kafka Bridge API 资源

有关 REST API 端点和描述的完整列表,包括请求和响应示例,请查看 Kafka Bridge API 参考

12.1.5. 下载 Kafka 网桥存档

从红帽网站下载 AMQ Streams Kafka Bridge 的压缩发行版。

流程

12.1.6. 配置 Kafka Bridge 属性

此流程描述了如何配置 AMQ Streams Kafka Bridge 使用的 Kafka 和 HTTP 连接属性。

您可以使用与 Kafka 相关的属性的适当前缀,将 Kafka Bridge 配置为任何其他 Kafka 客户端。

  • kafka. 对于适用于生产者和消费者的一般配置,如服务器连接和安全。
  • kafka.consumer. 对于仅传递给使用者的特定于消费者的配置。
  • kafka.producer. 对于仅传递给制作者的特定制作者的配置。

除了启用对 Kafka 集群的 HTTP 访问外,HTTP 属性还提供通过跨操作资源共享(CORS)启用和定义 Kafka 网桥访问控制的功能。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问选定的资源。要配置 CORS,您可以定义允许的资源来源列表以及访问它们的 HTTP 方法。请求中的附加 HTTP 标头描述了允许访问 Kafka 集群的原始数据

流程

  1. 编辑 AMQ Streams Kafka Bridge 安装存档提供的 application.properties 文件。

    使用 属性文件指定 Kafka 和 HTTP 相关的属性,并启用分布式追踪。

    1. 配置标准的 Kafka 相关属性,包括特定于 Kafka 用户和生产者的属性。

      使用:

      • kafka.bootstrap.servers 定义到 Kafka 集群的主机/端口连接
      • kafka.producer.acks 向 HTTP 客户端提供确认
      • kafka.consumer.auto.offset.reset 确定如何在 Kafka 中管理重置偏移

        有关 Kafka 属性配置的更多信息,请参阅 Apache Kafka 网站

    2. 配置与 HTTP 相关的属性,以启用对 Kafka 集群的 HTTP 访问。

      例如:

      http.enabled=true
      http.host=0.0.0.0
      http.port=8080 1
      http.cors.enabled=true 2
      http.cors.allowedOrigins=https://strimzi.io 3
      http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
      1
      Kafka Bridge 侦听端口 8080 的默认 HTTP 配置。
      2
      设置为 true 以启用 CORS。
      3
      允许的 CORS 原始源以逗号分隔的列表。您可以使用 URL 或 Java 正则表达式。
      4
      以逗号分隔的 CORS 允许 HTTP 方法列表。
    3. 启用或禁用分布式追踪。

      bridge.tracing=jaeger

      从属性中删除代码注释,以启用分布式追踪

12.1.7. 安装 Kafka Bridge

按照以下步骤在 Red Hat Enterprise Linux 上安装 AMQ Streams Kafka Bridge。

流程

  1. 如果您还没有这样做,请将 AMQ Streams Kafka Bridge 安装存档解压缩到任何目录。
  2. 使用配置属性作为参数运行 Kafka Bridge 脚本:

    例如:

    ./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
  3. 检查日志中是否成功安装了。

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.