12.2. Kafka Bridge quickstart


使用此快速入门在 Red Hat Enterprise Linux 上尝试 AMQ Streams Kafka Bridge。您将学习如何:

  • 安装 Kafka Bridge
  • 生成信息到 Kafka 集群中的主题和分区
  • 创建 Kafka Bridge 使用者
  • 执行基本的消费者操作,如将消费者订阅到主题并检索您生成的消息

在这个快速入门中,HTTP 请求被格式化为 curl 命令,您可以复制并粘贴到终端。

确定您有先决条件,然后按照本章中提供的顺序按照任务进行操作。

关于数据格式

在本快速入门中,您将以 JSON 格式生成和使用消息,而不是二进制。有关示例请求中使用的数据格式和 HTTP 标头的更多信息,请参阅 第 12.1.1 节 “身份验证和加密”

12.2.1. 在本地部署 Kafka Bridge

将 AMQ Streams Kafka Bridge 的实例部署到主机。使用安装存档提供的 application.properties 文件以应用默认配置设置。

流程

  1. 打开 application.properties 文件,并检查是否定义了默认的 HTTP 相关设置

    http.enabled=true
    http.host=0.0.0.0
    http.port=8080
    Copy to Clipboard Toggle word wrap

    这会将 Kafka Bridge 配置为侦听端口 8080 上的请求。

  2. 使用配置属性作为参数运行 Kafka Bridge 脚本:

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
    Copy to Clipboard Toggle word wrap

接下来要做什么

12.2.2. 向主题和分区生成信息

使用 topics 端点将消息生成到 JSON 格式的主题。

您可以为请求正文中的消息指定目标分区,如下所示。分区 端点提供了一种替代方法,为所有消息指定单一目标分区作为路径参数。

流程

  1. 使用 kafka-topics.sh 实用程序创建 Kafka 主题:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1 --config retention.ms=7200000 --config segment.bytes=1073741824
    Copy to Clipboard Toggle word wrap

    指定三个分区。

  2. 验证是否已创建该主题:

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
    Copy to Clipboard Toggle word wrap
  3. 使用 Kafka Bridge,为您创建的主题生成三个信息:

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap
    • sales-lead-0001 发送到基于密钥哈希的分区。
    • Sales-lead-0002 直接发送到分区 2。
    • Sales-lead-0003 使用 round-robin 方法发送到 bridge-quickstart-topic 主题中的分区。
  4. 如果请求成功,Kafka Bridge 会返回一个 偏移 数组,以及 200 (OK)代码和 application/vnd.kafka.v2+jsoncontent-type 标头。对于每个消息,偏移 数组描述:

    • 消息发送到的分区
    • 分区的当前消息偏移

      响应示例

      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }
      Copy to Clipboard Toggle word wrap

接下来要做什么

在生成消息到主题和分区后,创建一个 Kafka Bridge 消费者

其他资源

12.2.3. 创建 Kafka Bridge 使用者

在 Kafka 集群上执行任何消费者操作前,您必须首先使用 consumers 端点创建消费者。消费者被称为 Kafka Bridge 消费者

流程

  1. 在名为 bridge-quickstart-consumer-group 的新消费者组中创建一个 Kafka Bridge 使用者:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    Copy to Clipboard Toggle word wrap
    • 消费者名为 bridge-quickstart-consumer,嵌入式数据格式被设置为 json
    • 消费者不会自动向日志提交偏移,因为 enable.auto.commit 设置为 false。稍后您将在此快速入门中手动提交偏移量。

      注意

      如果您没有在请求正文中指定消费者名称,则 Kafka Bridge 会生成一个随机消费者名称。

      如果请求成功,Kafka Bridge 会返回响应正文中的使用者 ID (instance_id)和基本 URL (base_uri),以及 200 (OK)代码。

      响应示例

      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }
      Copy to Clipboard Toggle word wrap

  2. 复制基础 URL (base_uri),以便在这个快速入门中的其他消费者操作中使用。

接下来要做什么

现在,您已创建了 Kafka Bridge 消费者,您可以将其订阅到主题

其他资源

12.2.4. 将 Kafka Bridge 消费者订阅到主题

使用 订阅 端点将 Kafka Bridge 使用者订阅到一个或多个主题。订阅后,消费者开始接收生成到该主题的所有消息。

流程

  • 将消费者订阅之前创建的 bridge-quickstart-topic 主题,在 Produc ing 信息到主题和分区

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'
    Copy to Clipboard Toggle word wrap

    topics 数组可以包含单个主题(如上所示)或多个主题。如果要将消费者订阅与正则表达式匹配的多个主题,您可以使用 topic_pattern 字符串而不是 topics 数组。

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

接下来要做什么

将 Kafka Bridge 消费者订阅到主题后,您可以从 消费者检索信息

其他资源

12.2.5. 从 Kafka Bridge consumer 检索最新的信息

通过从 records 端点请求数据,从 Kafka Bridge 使用者检索最新的消息。在生产环境中,HTTP 客户端可以重复调用此端点(在循环中)。

流程

  1. 为 Kafka Bridge consumer 生成额外消息,如 Produc ing 消息到主题和分区 中所述。
  2. 记录 端点提交 GET 请求:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'
    Copy to Clipboard Toggle word wrap

    创建并订阅 Kafka Bridge 消费者后,第一个 GET 请求将返回空响应,因为轮询操作会触发重新平衡过程来分配分区。

  3. 重复步骤 2,以从 Kafka Bridge 使用者检索信息。

    Kafka Bridge 返回一条信息 数组,它带有 200 (OK)代码,它包括主题名称、键、值、分区和偏移来来包括 200 (OK)代码。默认情况下,消息从最新的偏移中检索。

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    Copy to Clipboard Toggle word wrap
    注意

    如果返回空响应,请生成更多记录到消费者,如 Produc ing 消息到主题和分区 中所述,然后尝试再次检索消息。

接下来要做什么

从 Kafka Bridge 消费者检索信息后,尝试向 日志提交偏移

其他资源

12.2.6. 向日志提交偏移量

使用 偏移 端点手动向日志提交 Kafka Bridge 消费者接收的所有消息。这是必要的,因为在 创建 Kafka Bridge 消费者 中的 Kafka Bridge 消费者 被配置,其 enable.auto.commit 设置为 false

流程

  • 将偏移提交到 bridge-quickstart-consumer 的日志:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
    Copy to Clipboard Toggle word wrap

    因为没有提交请求正文,所以针对消费者收到的所有记录提交偏移。另外,请求正文可以包含数组(OffsetCommitSeekList),用于指定您要为其提交偏移的主题和分区。

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

接下来要做什么

向日志提交偏移后,尝试查找查找 偏移的端点

其他资源

12.2.7. 寻找分区偏移量

使用 位置 端点配置 Kafka Bridge 消费者,从特定偏移中检索分区的消息,然后从最新的偏移中检索。这在 Apache Kafka 中被称为 seek 操作。

流程

  1. 请参阅 quickstart-bridge-topic 主题的分区 0 的特定偏移:

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

  2. 记录 端点提交 GET 请求:

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'
    Copy to Clipboard Toggle word wrap

    Kafka Bridge 从您看到的偏移返回信息。

  3. 通过查找同一分区的最后一个偏移来恢复默认消息检索行为。这一次,使用 位置/结束 端点。

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap

    如果请求成功,Kafka Bridge 会返回另一个 204 No Content 代码。

注意

您还可以使用 positions/beginning 端点查找一个或多个分区的第一个偏移。

接下来要做什么

在这个快速入门中,您使用 AMQ Streams Kafka Bridge 在 Kafka 集群上执行几个常见操作。现在 ,您可以删除之前创建的 Kafka Bridge 使用者

12.2.8. 删除 Kafka Bridge 使用者

最后,删除您在此快速入门中使用的 Kafa Bridge 使用者。

流程

  • 通过向 实例 端点发送 DELETE 请求来删除 Kafka Bridge 使用者。

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
    Copy to Clipboard Toggle word wrap

    如果请求成功,Kafka Bridge 只会返回一个 204 No Content 代码。

其他资源

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat