搜索

2.5. Apache Kafka 的源

download PDF

您可以创建一个 Apache Kafka 源,从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn) CLI 或直接创建 KafkaSource 对象并使用 OpenShift CLI (oc) 创建 Kafka 源来应用它。

注意

请参阅有关 为 Apache Kafka 安装 Knative 代理 的文档。

2.5.1. 使用 Web 控制台创建 Apache Kafka 事件源

在集群中安装 Apache Kafka 的 Knative 代理实现后,您可以使用 Web 控制台创建 Apache Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 已登陆到 web 控制台。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,进入到 +Add 页面并选择 Event Source
  2. Event Sources 页面中,在 Type 部分选择 Kafka Source
  3. 配置 Kafka Source 设置:

    1. 添加用逗号分开的 Bootstrap 服务器列表。
    2. 添加以逗号分隔的标题列表。
    3. 添加一个 消费者组
    4. 为您创建的服务帐户选择 Service Account Name
    5. Target 部分中,选择您的事件 sink。这可以是 Resource 或一个 URI

      1. 选择 Resource 来使用频道、代理或服务作为事件源的事件 sink。
      2. 选择 URI 来指定事件路由到的 Uniform Resource Identifier (URI)。
    6. 输入 Kafka 事件源的 名称
  4. Create

验证

您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。

  1. Developer 视角中,导航到 Topology
  2. 查看 Kafka 事件源和接收器。

    在 Topology 视图中查看 Kafka 源和服务

2.5.2. 使用 Knative CLI 创建 Apache Kafka 事件源

您可以使用 kn source kafka create 命令,使用 Knative (kn) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka 自定义资源(CR)已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 已安装 Knative (kn) CLI。
  • 可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (oc)。

流程

  1. 要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:

    $ kn service create event-display \
        --image quay.io/openshift-knative/showcase
  2. 创建 KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注意

    将此命令中的占位符值替换为源名称、引导服务器和主题的值。

    --servers--topics--consumergroup 选项指定到 Kafka 集群的连接参数。--consumergroup 选项是可选的。

  3. 可选:查看您创建的 KafkaSource CR 的详情:

    $ kn source kafka describe <kafka_source_name>

    输出示例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

验证步骤

  1. 触发 Kafka 实例将信息发送到主题:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    在提示符后输入信息。这个命令假设:

    • Kafka 集群安装在 kafka 命名空间中。
    • KafkaSource 对象已被配置为使用 my-topic 主题。
  2. 通过查看日志来验证消息是否显示:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

2.5.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

2.5.3. 使用 YAML 创建 Apache Kafka 事件源

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 创建 KafkaSource 对象作为 YAML 文件:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    用户组是一组使用相同组群 ID 的用户,并消耗一个标题中的数据。
    2
    主题提供数据存储的目的地。每个主题都被分成一个或多个分区。
    3
    sink 指定事件从源发送到的位置。
    重要

    仅支持 OpenShift Serverless 上的 KafkaSource 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

    KafkaSource 对象示例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. 应用 KafkaSource YAML 文件:

    $ oc apply -f <filename>

验证

  • 输入以下命令验证 Kafka 事件源是否已创建:

    $ oc get pods

    输出示例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

2.5.4. 为 Apache Kafka 源配置 SASL 身份验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 已安装 OpenShift (oc) CLI。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL 类型可以是 PLAINSCRAM-SHA-256SCRAM-SHA-512
  2. 创建或修改 Kafka 源,使其包含以下 spec 配置:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    如果您使用公共云 Kafka 服务,则不需要 caCert spec。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.