5.15. 使用 Knative Kafka
Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。Kafka 为事件源、频道、代理和事件 sink 功能提供选项。
如果集群管理员安装了 KnativeKafka
自定义资源,则 OpenShift Serverless 安装中就会提供 Knative Kafka 功能。
IBM Z 和 IBM Power Systems 目前不支持 Knative Kafka。
Knative Kafka 提供了额外的选项,例如:
- Kafka 源
- Kafka 频道
- Kafka 代理
- Kafka 接收器
5.15.1. Kafka 事件交付和重试
在事件驱动的架构中使用 Kafka 组件会提供"至少一次"事件交付。这意味着,会在收到返回代码值前重试操作。这使您的应用对丢失的事件更具弹性,但可能会导致发送重复的事件。
对于 Kafka 事件源,默认会尝试发送事件的固定次数。对于 Kafka 频道,只有在 Kafka 频道 Delivery
规格中配置了它们时才会进行重试。
有关交付保证的更多信息,请参阅 事件交付 文档。
5.15.2. Kafka 源
您可以创建一个 Kafka 源从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn
) CLI 或直接创建 KafkaSource
对象并使用 OpenShift CLI (oc
) 创建 Kafka 源来应用它。
5.15.2.1. 使用 Web 控制台创建 Kafka 事件源
在集群中安装了 Knative Kafka 后,您可以使用 Web 控制台创建 Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 已登陆到 web 控制台。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
- 在 Developer 视角中,进入到 +Add 页面并选择 Event Source。
- 在 Event Sources 页面中,在 Type 部分选择 Kafka Source。
配置 Kafka Source 设置:
- 添加用逗号分开的 Bootstrap 服务器列表。
- 添加以逗号分隔的标题列表。
- 添加一个 消费者组。
- 为您创建的服务帐户选择 Service Account Name。
- 为事件源选择 Sink。Sink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI。
- 输入 Kafka 事件源的 名称。
- 点 Create。
验证
您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。
- 在 Developer 视角中,导航到 Topology。
查看 Kafka 事件源和接收器。
5.15.2.2. 使用 Knative CLI 创建 Kafka 事件源
您可以使用 kn source kafka create
命令,使用 Knative (kn
) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving 和
KnativeKafka
自定义资源(CR)已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
已安装 Knative (
kn
) CLI。 -
可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (
oc
)。
流程
要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
创建
KafkaSource
CR:$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注意将此命令中的占位符值替换为源名称、引导服务器和主题的值。
--servers
、--topics
和--consumergroup
选项指定到 Kafka 集群的连接参数。--consumergroup
选项是可选的。可选:查看您创建的
KafkaSource
CR 的详情:$ kn source kafka describe <kafka_source_name>
输出示例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
验证步骤
触发 Kafka 实例将信息发送到主题:
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
在提示符后输入信息。这个命令假设:
-
Kafka 集群安装在
kafka
命名空间中。 -
KafkaSource
对象已被配置为使用my-topic
主题。
-
Kafka 集群安装在
通过查看日志来验证消息是否显示:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
5.15.2.2.1. Knative CLI sink 标记
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
5.15.2.3. 使用 YAML 创建 Kafka 事件源
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource
对象,然后使用 oc apply
命令应用它。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
安装 OpenShift CLI (
oc
) 。
流程
创建
KafkaSource
对象作为 YAML 文件:apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
重要仅支持 OpenShift Serverless 上的
KafkaSource
对象的v1beta1
API 版本。不要使用这个 API 的v1alpha1
版本,因为这个版本现已弃用。KafkaSource
对象示例apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
应用
KafkaSource
YAML 文件:$ oc apply -f <filename>
验证
输入以下命令验证 Kafka 事件源是否已创建:
$ oc get pods
输出示例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
5.15.3. Kafka 代理
对于生产环境就绪的 Knative Eventing 部署,红帽建议您使用 Knative Kafka 代理实现。Kafka 代理是 Knative 代理的 Apache Kafka 原生实现,它将 CloudEvents 直接发送到 Kafka 实例。
Kafka 代理禁用联邦信息处理标准 (FIPS) 模式。
Kafka 代理具有与 Kafka 的原生集成,用于存储和路由事件。它可以更好地与 Kafka 集成用于代理,并在其他代理类型中触发模型,并减少网络跃点。Kafka 代理实现的其他优点包括:
- 最少一次的交付保证
- 根据 CloudEvents 分区扩展排序事件交付
- 数据平面的高可用性
- 水平扩展数据平面
Knative Kafka 代理使用二进制内容模式将传入的 CloudEvents 存储为 Kafka 记录。这意味着,所有 CloudEvent 属性和扩展都会在 Kafka 记录上映射,而 CloudEvent 的 data
规格与 Kafka 记录的值对应。
有关使用 Kafka 代理的详情,请参考创建代理。
5.15.4. 使用 YAML 创建 Kafka 频道
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述频道,并以可重复的方式描述频道。您可以通过创建一个 Kafka 频道,创建由 Kafka 主题支持的 Knative Eventing 频道。要使用 YAML 创建 Kafka 频道,您必须创建一个 YAML 文件来定义 KafkaChannel
对象,然后使用 oc apply
命令应用它。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在 OpenShift Container Platform 集群中。 -
安装 OpenShift CLI (
oc
) 。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
创建一个
KafkaChannel
对象作为一个 YAML 文件:apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel metadata: name: example-channel namespace: default spec: numPartitions: 3 replicationFactor: 1
重要仅支持 OpenShift Serverless 上的
KafkaChannel
对象的v1beta1
API 版本。不要使用这个 API 的v1alpha1
版本,因为这个版本现已弃用。应用
KafkaChannel
YAML 文件:$ oc apply -f <filename>
5.15.5. Kafka 接收器
如果集群管理员在集群中启用了 Kafka,则 Kafka sink 是事件 sink 类型。您可以使用 Kafka sink 将事件直接从事件源发送到 Kafka 主题。
5.15.5.1. 使用 Kafka sink
您可以创建一个称为 Kafka sink 的事件 sink,用于将事件发送到 Kafka 主题。使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。默认情况下,Kafka sink 使用二进制内容模式,其效率比结构化模式更高效。要使用 YAML 创建 Kafka sink,您必须创建一个 YAML 文件来定义 KafkaSink
对象,然后使用 oc apply
命令应用它。
先决条件
-
在集群中安装了 OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源 (CR) 。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
安装 OpenShift CLI (
oc
) 。
流程
创建一个
KafkaSink
对象定义作为一个 YAML 文件:Kafka sink YAML
apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink-name> namespace: <namespace> spec: topic: <topic-name> bootstrapServers: - <bootstrap-server>
要创建 Kafka sink,请应用
KafkaSink
YAML 文件:$ oc apply -f <filename>
配置事件源,以便在其 spec 中指定 sink:
连接到 API 服务器源的 Kafka sink 示例
apiVersion: sources.knative.dev/v1alpha2 kind: ApiServerSource metadata: name: <source-name> 1 namespace: <namespace> 2 spec: serviceAccountName: <service-account-name> 3 mode: Resource resources: - apiVersion: v1 kind: Event sink: ref: apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink name: <sink-name> 4