搜索

2.3. 将数据源连接到 Kamelet Binding 中的 Kafka 主题

download PDF

要将数据源连接到 Kafka 主题,您可以创建一个 Kamelet Binding,如图 2.2 所示

Connecting a data source to a Kafka topic 图 2.2 将数据源连接到 Kafka 主题

先决条件

  • 您知道要将事件发送到的 Kafka 主题的名称。

    此流程中的示例使用 test-topic 接收事件。

  • 您知道 Kafka 实例的以下参数值:

    • bootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
    • Password - 向 Kafka 进行身份验证的密码。对于 OpenShift Streams,这是 credentials.json 文件中的 密码。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。
    • User - 向 Kafka 进行身份验证的用户名。对于 OpenShift Streams,这是 credentials.json 文件中的 clientID。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。

      有关如何使用 OpenShift Streams 时如何获取这些值的详情,请参考 获取 Kafka 凭证

    • securityProtocol - 您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是 SASL_SSL (默认)。对于 AMQ 流上的 Kafka 集群,它是 PLAINTEXT
  • 您知道您要添加到 Camel K 集成中的 Kamelets 和所需的实例参数。

    此流程的 Kamelets 示例包括:

    • coffee-source Kamelet - 它有一个可选参数 周期,用于指定发送每个事件的频率。您可以将代码从 Example source Kamelet 复制到名为 coffee-source.kamelet.yaml 文件的文件,然后运行以下命令将其作为资源添加到命名空间中:

      oc apply -f coffee-source.kamelet.yaml

    • Kamelet Catalog 中提供的 kafka-sink Kamelet。您可以使用 kafka-sink Kamelet,因为 Kafka 主题在这个绑定中接收数据(这是数据消费者)。

流程

要将数据源连接到 Kafka 主题,请创建一个 Kamelet Binding:

  1. 在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. 为 Kamelet Binding 添加名称。在本例中,名称是 coffees-to-kafka,因为绑定将 coffee-source Kamelet 连接到 kafka-sink Kamelet。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
      sink:
  3. 对于 Kamelet Binding 的源,指定一个数据源 Kamelet (例如,coffee-source Kamelet)生成事件,其中包含有关 coffee 的数据,并为 Kamelet 配置任何参数。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
  4. 对于 Kamelet Binding 的 sink,请指定 kafka-sink Kamelet 及其所需属性。

    例如,当 Kafka 集群位于 OpenShift Streams 中时:

    • 对于 user 属性,指定 clientID,例如: srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094
    • 对于 password 属性,指定密码,例如: facf3df1-3c8d-4253-aa87-8c95ca5e1225
    • 您不需要设置 securityProtocol 属性。

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443"
            password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225"
            topic: "test-topic"
            user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"

      对于另一个示例,当 Kafka 集群位于 AMQ Streams 时,将 securityProtocol 属性设置为 "PLAINTEXT"

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "broker.url:9092"
            password: "testpassword"
            topic: "test-topic"
            user: "testuser"
            securityProtocol: "PLAINTEXT"
  5. 保存 YAML 文件(例如,coffees-to-kafka.yaml)。
  6. 登录您的 OpenShift 项目。
  7. 将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:

    oc apply -f <kamelet binding filename>

    例如:

    oc apply -f coffees-to-kafka.yaml

    Camel K operator 使用 KameletBinding 资源生成并运行 Camel K 集成。构建可能需要几分钟时间。

  8. 查看 KameletBinding 资源的状态:

    oc get kameletbindings

  9. 查看其集成的状态:

    oc get integrations

  10. 查看集成的日志:

    kamel logs <integration> -n <project>

    例如:

    kamel logs coffees-to-kafka -n my-camel-k-kafka

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.