2.5. Apache Kafka 的源
您可以创建一个 Apache Kafka 源,从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn
) CLI 或直接创建 KafkaSource
对象并使用 OpenShift CLI (oc
) 创建 Kafka 源来应用它。
请参阅有关 为 Apache Kafka 安装 Knative 代理 的文档。
2.5.1. 使用 Web 控制台创建 Apache Kafka 事件源 复制链接链接已复制到粘贴板!
在集群中安装 Apache Kafka 的 Knative 代理实现后,您可以使用 Web 控制台创建 Apache Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 已登陆到 web 控制台。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
- 在 Developer 视角中,进入到 +Add 页面并选择 Event Source。
- 在 Event Sources 页面中,在 Type 部分选择 Kafka Source。
配置 Kafka Source 设置:
- 添加用逗号分开的 Bootstrap 服务器列表。
- 添加以逗号分隔的标题列表。
- 添加一个 消费者组。
- 为您创建的服务帐户选择 Service Account Name。
在 Target 部分中,选择您的事件 sink。这可以是 Resource 或一个 URI :
- 选择 Resource 来使用频道、代理或服务作为事件源的事件 sink。
- 选择 URI 来指定事件路由到的 Uniform Resource Identifier (URI)。
- 输入 Kafka 事件源的 名称。
- 点 Create。
验证
您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。
- 在 Developer 视角中,导航到 Topology。
查看 Kafka 事件源和接收器。
2.5.2. 使用 Knative CLI 创建 Apache Kafka 事件源 复制链接链接已复制到粘贴板!
您可以使用 kn source kafka create
命令,使用 Knative (kn
) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving 和
KnativeKafka
自定义资源(CR)已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
已安装 Knative (
kn
) CLI。 -
可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (
oc
)。
流程
要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:
kn service create event-display \ --image quay.io/openshift-knative/showcase
$ kn service create event-display \ --image quay.io/openshift-knative/showcase
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 创建
KafkaSource
CR:kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注意将此命令中的占位符值替换为源名称、引导服务器和主题的值。
--servers
、--topics
和--consumergroup
选项指定到 Kafka 集群的连接参数。--consumergroup
选项是可选的。可选:查看您创建的
KafkaSource
CR 的详情:kn source kafka describe <kafka_source_name>
$ kn source kafka describe <kafka_source_name>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输出示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
验证步骤
触发 Kafka 实例将信息发送到主题:
oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在提示符后输入信息。这个命令假设:
-
Kafka 集群安装在
kafka
命名空间中。 -
KafkaSource
对象已被配置为使用my-topic
主题。
-
Kafka 集群安装在
通过查看日志来验证消息是否显示:
oc logs $(oc get pod -o name | grep event-display) -c user-container
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输出示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.5.2.1. Knative CLI sink 标记 复制链接链接已复制到粘贴板!
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
kn source binding create bind-heartbeat \ --namespace sinkbinding-example \ --subject "Job:batch/v1:app=heartbeat-cron" \ --sink http://event-display.svc.cluster.local \ --ce-override "sink=bound"
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
2.5.3. 使用 YAML 创建 Apache Kafka 事件源 复制链接链接已复制到粘贴板!
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource
对象,然后使用 oc apply
命令应用它。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
安装 OpenShift CLI (
oc
) 。
流程
创建
KafkaSource
对象作为 YAML 文件:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重要仅支持 OpenShift Serverless 上的
KafkaSource
对象的v1beta1
API 版本。不要使用这个 API 的v1alpha1
版本,因为这个版本现已弃用。KafkaSource
对象示例Copy to Clipboard Copied! Toggle word wrap Toggle overflow 应用
KafkaSource
YAML 文件:oc apply -f <filename>
$ oc apply -f <filename>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
验证
输入以下命令验证 Kafka 事件源是否已创建:
oc get pods
$ oc get pods
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输出示例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.5.4. 为 Apache Kafka 源配置 SASL 身份验证 复制链接链接已复制到粘贴板!
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
已安装 OpenShift (
oc
) CLI。
流程
在所选命名空间中创建证书文件作为 secret:
oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \
1 --from-literal=user="my-sasl-user"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- SASL 类型可以是
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
创建或修改 Kafka 源,使其包含以下
spec
配置:Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 如果您使用公共云 Kafka 服务,则不需要
caCert
spec。