12.2. Kafka Bridge quickstart


使用此快速入门在 Red Hat Enterprise Linux 上尝试 AMQ Streams Kafka Bridge。您将学习如何:

  • 安装 Kafka Bridge
  • 生成信息到 Kafka 集群中的主题和分区
  • 创建 Kafka Bridge 使用者
  • 执行基本的消费者操作,如将消费者订阅到主题并检索您生成的消息

在这个快速入门中,HTTP 请求被格式化为 curl 命令,您可以复制并粘贴到终端。

确定您有先决条件,然后按照本章中提供的顺序按照任务进行操作。

关于数据格式

在本快速入门中,您将以 JSON 格式生成和使用消息,而不是二进制。有关示例请求中使用的数据格式和 HTTP 标头的更多信息,请参阅 第 12.1.1 节 “身份验证和加密”

12.2.1. 在本地部署 Kafka Bridge

将 AMQ Streams Kafka Bridge 的实例部署到主机。使用安装存档提供的 application.properties 文件以应用默认配置设置。

流程

  1. 打开 application.properties 文件,并检查是否定义了默认的 HTTP 相关设置

    http.enabled=true
    http.host=0.0.0.0
    http.port=8080

    这会将 Kafka Bridge 配置为侦听端口 8080 上的请求。

  2. 使用配置属性作为参数运行 Kafka Bridge 脚本:

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties

接下来要做什么

12.2.2. 向主题和分区生成信息

使用 topics 端点将消息生成到 JSON 格式的主题。

您可以为请求正文中的消息指定目标分区,如下所示。分区 端点提供了一种替代方法,为所有消息指定单一目标分区作为路径参数。

流程

  1. 使用 kafka-topics.sh 实用程序创建 Kafka 主题:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824

    指定三个分区。

  2. 验证是否已创建该主题:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
  3. 使用 Kafka Bridge,为您创建的主题生成三个信息:

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    • sales-lead-0001 发送到基于密钥哈希的分区。
    • Sales-lead-0002 直接发送到分区 2。
    • Sales-lead-0003 使用 round-robin 方法发送到 bridge-quickstart-topic 主题中的分区。
  4. 如果请求成功,Kafka Bridge 会返回一个 偏移 数组,以及 200 (OK)代码和 application/vnd.kafka.v2+jsoncontent-type 标头。对于每个消息,偏移 数组描述:

    • 消息发送到的分区
    • 分区的当前消息偏移

      响应示例

      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }

接下来要做什么

在生成消息到主题和分区后,创建一个 Kafka Bridge 消费者

其他资源

12.2.3. 创建 Kafka Bridge 使用者

在 Kafka 集群上执行任何消费者操作前,您必须首先使用 consumers 端点创建消费者。消费者被称为 Kafka Bridge 消费者

流程

  1. 在名为 bridge-quickstart-consumer-group 的新消费者组中创建一个 Kafka Bridge 使用者:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    • 消费者名为 bridge-quickstart-consumer,嵌入式数据格式被设置为 json
    • 消费者不会自动向日志提交偏移,因为 enable.auto.commit 设置为 false。稍后您将在此快速入门中手动提交偏移量。

      注意

      如果您没有在请求正文中指定消费者名称,则 Kafka Bridge 会生成一个随机消费者名称。

      如果请求成功,Kafka Bridge 会返回响应正文中的使用者 ID (instance_id)和基本 URL (base_uri),以及 200 (OK)代码。

      响应示例

      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }

  2. 复制基础 URL (base_uri),以便在这个快速入门中的其他消费者操作中使用。

接下来要做什么

现在,您已创建了 Kafka Bridge 消费者,您可以将其订阅到主题

其他资源

12.2.4. 将 Kafka Bridge 消费者订阅到主题

使用 订阅 端点将 Kafka Bridge 使用者订阅到一个或多个主题。订阅后,消费者开始接收生成到该主题的所有消息。

流程

  • 将消费者订阅之前创建的 bridge-quickstart-topic 主题,在 Produc ing 信息到主题和分区

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'

    topics 数组可以包含单个主题(如上所示)或多个主题。如果要将消费者订阅与正则表达式匹配的多个主题,您可以使用 topic_pattern 字符串而不是 topics 数组。

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

接下来要做什么

将 Kafka Bridge 消费者订阅到主题后,您可以从 消费者检索信息

其他资源

12.2.5. 从 Kafka Bridge consumer 检索最新的信息

通过从 records 端点请求数据,从 Kafka Bridge 使用者检索最新的消息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。

流程

  1. 为 Kafka Bridge consumer 生成额外消息,如 Produc ing 消息到主题和分区 中所述。
  2. 记录 端点提交 GET 请求:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    创建并订阅 Kafka Bridge 消费者后,第一个 GET 请求将返回空响应,因为轮询操作会触发重新平衡过程来分配分区。

  3. 重复步骤 2,以从 Kafka Bridge 使用者检索信息。

    Kafka Bridge 返回一条信息 数组,它带有 200 (OK)代码,它包括主题名称、键、值、分区和偏移来来包括 200 (OK)代码。默认情况下,消息从最新的偏移中检索。

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    注意

    如果返回空响应,请生成更多记录到消费者,如 Produc ing 消息到主题和分区 中所述,然后尝试再次检索消息。

接下来要做什么

从 Kafka Bridge 消费者检索信息后,尝试向 日志提交偏移

其他资源

12.2.6. 向日志提交偏移量

使用 偏移 端点手动向日志提交 Kafka Bridge 消费者接收的所有消息。这是必要的,因为在 创建 Kafka Bridge 消费者 中的 Kafka Bridge 消费者 被配置,其 enable.auto.commit 设置为 false

流程

  • 将偏移提交到 bridge-quickstart-consumer 的日志:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets

    因为没有提交请求正文,所以针对消费者收到的所有记录提交偏移。另外,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

接下来要做什么

向日志提交偏移后,尝试查找查找 偏移的端点

其他资源

12.2.7. 寻找分区偏移量

使用 位置 端点配置 Kafka Bridge 消费者,从特定偏移中检索分区的消息,然后从最新的偏移中检索。这在 Apache Kafka 中被称为 seek 操作。

流程

  1. 请参阅 quickstart-bridge-topic 主题的分区 0 的特定偏移:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

  2. 记录 端点提交 GET 请求:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'

    Kafka Bridge 从您看到的偏移返回信息。

  3. 通过查找同一分区的最后一个偏移来恢复默认消息检索行为。这一次,使用 位置/结束 端点。

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'

    如果请求成功,Kafka Bridge 会返回另一个 204 No Content 代码。

注意

您还可以使用 positions/beginning 端点查找一个或多个分区的第一个偏移。

接下来要做什么

在这个快速入门中,您使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge 使用者

12.2.8. 删除 Kafka Bridge 使用者

最后,删除您在此快速入门中使用的 Kafa Bridge 使用者。

流程

  • 通过向 实例 端点发送 DELETE 请求来删除 Kafka Bridge 使用者。

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

其他资源

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部