2.4. 将 Kafka 主题连接到 Kamelet Binding 中的数据 sink


要将 Kafka 主题连接到数据接收器,您可以创建一个 Kamelet Binding,如图 2.3 所示

Connecting a Kafka topic to a data sink 图 2.3 将 Kafka 主题连接到数据 sink

前提条件

  • 您知道您要发送事件的 Kafka 主题的名称。此流程中的示例使用 test-topic 来发送事件。这与您在 Kamelet Binding 中连接数据源中的 coffee 源中接收事件的主题与 Kafka 主题 相同。
  • 您知道 Kafka 实例的以下参数值:

    • BootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
    • Password - 在 Kafka 中进行身份验证的密码。
    • User - Kafka 身份验证的用户名。

      有关使用 OpenShift Streams 时如何获取这些值的详情,请参考 Obtaining Kafka 凭证

  • 您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是 SASL_SSL (默认值)。对于 AMQ 流上的 Kafka 集群,它是 PLAINTEXT
  • 您知道哪个 Kame 可让您添加到 Camel K 集成以及所需的实例参数。Kamelet 在 Kamelet Catalog 中提供:

    • kafka-source Kamelet - 使用 kafka-source Kamelet,因为 Kafka 主题是发送数据(它是数据制作者)。所需参数的示例值有:

      • bootstrapServers - "broker.url:9092"
      • password - "testpassword"
      • user - "testuser"
      • topic - "test-topic"
      • securityProtocol - 对于 OpenShift Streams 上的 Kafka 集群,您不需要设置此参数,因为 SASL_SSL 是默认值。对于 AMQ 流上的 Kafka 集群,此参数值为 "PLAINTEXT "。
    • log-sink Kamelet - 使用 log-sink 来记录它从 kafka-source Kamelet 接收的数据。(可选)指定 showStreams 参数以显示数据的消息正文。log-sink Kamelet 可用于调试目的。

流程

要将 Kafka 主题连接到数据接收器,请创建一个 Kamelet Binding:

  1. 在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. 为 Kamelet Binding 添加名称。在本例中,名称是 kafka-to-log,因为该绑定将 kafka-source Kamelet 连接到 log-sink Kamelet。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
      sink:
  3. 对于 Kamelet Binding 的源,指定 kafka-source Kamelet 并配置其参数。

    例如,当 Kafka 集群位于 OpenShift Streams 时(您不需要设置 securityProtocol 参数):

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
      sink:

    例如,当 Kafka 集群位于 AMQ Streams 时,您必须将 securityProtocol 参数设置为 "PLAINTEXT "。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT"
      sink:
  4. 对于 Kamelet Binding 的 sink,指定数据使用者 Kamelet (如 log-sink Kamelet),并为 Kamelet 配置任何参数,例如:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  5. 保存 YAML 文件(例如,kafka-to-log.yaml)。
  6. 登录您的 OpenShift 项目。
  7. 将 Kamelet Binding 作为资源添加到 OpenShift 命名空间:

    oc apply -f <kamelet 绑定 filename>

    例如:

    oc apply -f kafka-to-log.yaml

    Camel K 运算符通过使用 KameletBinding 资源生成并运行 Camel K 集成。构建可能需要几分钟时间。

  8. 查看 KameletBinding 资源的状态:

    oc get kameletbindings

  9. 查看其集成的状态:

    oc get integrations

  10. 查看集成的日志:

    kamel logs <integration> -n <project>

    例如:

    kamel logs kafka-to-log -n my-camel-k-kafka

    在输出中,您应看到 coffee 事件,例如:

    INFO  [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
  11. 要停止正在运行的集成,请删除关联的 Kamelet Binding 资源:

    oc delete kameletbindings/<kameletbinding-name>

    例如:

    oc delete kameletbindings/kafka-to-log

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.