6.2. 配置 Knative Kafka
Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。Kafka 为事件源、频道、代理和事件 sink 功能提供选项。
除了作为 OpenShift Serverless 核心安装一部分的 Knative Eventing 组件外,集群管理员还可安装 KnativeKafka
自定义资源 (CR) 。
IBM Z 和 IBM Power Systems 目前不支持 Knative Kafka。
KnativeKafka
CR 为用户提供其他选项,例如:
- Kafka 源
- Kafka 频道
- Kafka 代理
- Kafka 接收器
6.2.1. 安装 Knative Kafka
Knative Kafka 提供集成选项,供您在 OpenShift Serverless 中使用支持的 Apache Kafka 消息流平台。如果您已安装 KnativeKafka 自定义资源,则 OpenShift Serverless 安装中提供了 Knative Kafka
功能。
先决条件
- 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
- 您可以访问 Red Hat AMQ Streams 集群。
-
如果要使用验证步骤,请安装 OpenShift CLI (
oc
) 。 - 在 OpenShift Container Platform 上具有集群管理员权限。
- 已登陆到 OpenShift Container Platform Web 控制台。
流程
-
在 Administrator 视角中,进入 Operators
Installed Operators。 - 检查页面顶部的 Project 下拉菜单是否已设置为 Project: knative-eventing。
- 在 OpenShift Serverless Operator 的 Provided APIs 列表中,找到 Knative Kafka 复选框并点 Create Instance。
在 Create Knative Kafka 页面中配置 KnativeKafka 对象。
重要要在集群中使用 Kafka 频道、源、代理或 sink,您需要将要使用的属性的 enabled 选项设置为 true。这些交换机默认设置为 false。另外,要使用 Kafka 频道、代理或接收器,您必须指定 bootstrap 服务器。
KnativeKafka
自定义资源示例apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: name: knative-kafka namespace: knative-eventing spec: channel: enabled: true 1 bootstrapServers: <bootstrap_servers> 2 source: enabled: true 3 broker: enabled: true 4 defaultConfig: bootstrapServers: <bootstrap_servers> 5 numPartitions: <num_partitions> 6 replicationFactor: <replication_factor> 7 sink: enabled: true 8
- 1
- 让开发人员在集群中使用
KafkaChannel
频道类型。 - 2
- 以逗号分隔的 AMQ Streams 集群中的 bootstrap 服务器列表。
- 3
- 让开发人员在集群中使用
KafkaSource
事件源类型。 - 4
- 让开发人员在集群中使用 Knative Kafka 代理实现。
- 5
- 来自 Red Hat AMQ Streams 集群的 bootstrap 服务器的逗号分隔列表。
- 6
- 定义 Kafka 主题分区的数量,由
Broker
对象支持。默认值为10
。 - 7
- 定义 Kafka 主题的复制因素,由
Broker
对象支持。默认值为3
。 - 8
- 让开发人员在集群中使用 Kafka sink。
注意replicationFactor
值必须小于或等于 Red Hat AMQ Streams 集群的节点数量。- 建议您在不需要完全控制 KnativeKafka 对象创建的简单配置中使用该表单。
- 对于更复杂的配置,建议编辑 YAML,这可以完全控制 KnativeKafka 对象的创建。您可以通过点 Create Knative Kafka 页面右上角的 Edit YAML 链接来访问 YAML。
- 完成 Kafka 的任何可选配置后,点 Create。您会自动定向到 Knative Kafka 标签页,其中 knative-kafka 在资源列表中。
验证
- 点 Knative Kafka 选项卡中的 knative-kafka 资源。您会自动定向到 Knative Kafka Overview 页面。
查看资源的 Conditions 列表,并确认其状态为 True。
如果条件的状态为 Unknown 或 False,请等待几分钟刷新页面。
检查是否已创建 Knative Kafka 资源:
$ oc get pods -n knative-eventing
输出示例
NAME READY STATUS RESTARTS AGE kafka-broker-dispatcher-7769fbbcbb-xgffn 2/2 Running 0 44s kafka-broker-receiver-5fb56f7656-fhq8d 2/2 Running 0 44s kafka-channel-dispatcher-84fd6cb7f9-k2tjv 2/2 Running 0 44s kafka-channel-receiver-9b7f795d5-c76xr 2/2 Running 0 44s kafka-controller-6f95659bf6-trd6r 2/2 Running 0 44s kafka-source-dispatcher-6bf98bdfff-8bcsn 2/2 Running 0 44s kafka-webhook-eventing-68dc95d54b-825xs 2/2 Running 0 44s
6.2.2. Knative Kafka 的安全配置
Kafka 集群通常使用 TLS 或 SASL 身份验证方法进行保护。您可以使用 TLS 或 SASL 将 Kafka 代理或频道配置为针对受保护的 Red Hat AMQ Streams 集群进行操作。
红帽建议您同时启用 SASL 和 TLS。
6.2.2.1. 为 Kafka 代理配置 TLS 身份验证
Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。
先决条件
- 在 OpenShift Container Platform 上具有集群管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您有一个 Kafka 集群 CA 证书存储为一个
.pem
文件。 -
您有一个 Kafka 集群客户端证书,并存储为
.pem
文件的密钥。 -
安装 OpenShift CLI (
oc
) 。
流程
在
knative-eventing
命名空间中创建证书文件作为 secret:$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=caroot.pem \ --from-file=user.crt=certificate.pem \ --from-file=user.key=key.pem
重要使用密钥名称
ca.crt
、user.crt
和user.key
。不要更改它们。编辑
KnativeKafka
CR,并在broker
spec 中添加对 secret 的引用:apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
6.2.2.2. 为 Kafka 代理配置 SASL 身份验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
安装 OpenShift CLI (
oc
) 。
流程
在
knative-eventing
命名空间中创建证书文件作为 secret:$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=user="my-sasl-user"
-
使用键名
ca.crt
,password
, 和sasl.mechanism
.不要更改它们。 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用
tls.enabled=true
标志,而不是使用ca.crt
参数。例如:$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-literal=tls.enabled=true \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
使用键名
编辑
KnativeKafka
CR,并在broker
spec 中添加对 secret 的引用:apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
6.2.2.3. 为 Kafka 频道配置 TLS 验证
Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。
先决条件
- 在 OpenShift Container Platform 上具有集群管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您有一个 Kafka 集群 CA 证书存储为一个
.pem
文件。 -
您有一个 Kafka 集群客户端证书,并存储为
.pem
文件的密钥。 -
安装 OpenShift CLI (
oc
) 。
流程
在所选命名空间中创建证书文件作为 secret:
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-file=user.crt=certificate.pem \ --from-file=user.key=key.pem
重要使用密钥名称
ca.crt
、user.crt
和user.key
。不要更改它们。编辑
KnativeKafka
自定义资源:$ oc edit knativekafka
引用您的 secret 和 secret 的命名空间:
apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: channel: authSecretName: <kafka_auth_secret> authSecretNamespace: <kafka_auth_secret_namespace> bootstrapServers: <bootstrap_servers> enabled: true source: enabled: true
注意确保指定 bootstrap 服务器中的匹配端口。
例如:
apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: channel: authSecretName: tls-user authSecretNamespace: kafka bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094 enabled: true source: enabled: true
6.2.2.4. 为 Kafka 频道配置 SASL 验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
安装 OpenShift CLI (
oc
) 。
流程
在所选命名空间中创建证书文件作为 secret:
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
使用键名
ca.crt
,password
, 和sasl.mechanism
.不要更改它们。 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用
tls.enabled=true
标志,而不是使用ca.crt
参数。例如:$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-literal=tls.enabled=true \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
使用键名
编辑
KnativeKafka
自定义资源:$ oc edit knativekafka
引用您的 secret 和 secret 的命名空间:
apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: channel: authSecretName: <kafka_auth_secret> authSecretNamespace: <kafka_auth_secret_namespace> bootstrapServers: <bootstrap_servers> enabled: true source: enabled: true
注意确保指定 bootstrap 服务器中的匹配端口。
例如:
apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: channel: authSecretName: scram-user authSecretNamespace: kafka bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093 enabled: true source: enabled: true
6.2.2.5. 为 Kafka 源配置 SASL 身份验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
已安装 OpenShift (
oc
) CLI。
流程
在所选命名空间中创建证书文件作为 secret:
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL 类型可以是
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
创建或修改 Kafka 源,使其包含以下
spec
配置:apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- 如果您使用公有云 Kafka 服务,则不需要
caCert
spec,如 Red Hat OpenShift Streams for Apache Kafka。
6.2.2.6. 为 Kafka sink 配置安全性
Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Knative Kafka 唯一支持的流量加密方法。
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源(CR)已安装在 OpenShift Container Platform 集群中。 -
在
KnativeKafka
CR 中启用了 Kafka sink。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您有一个 Kafka 集群 CA 证书存储为一个
.pem
文件。 -
您有一个 Kafka 集群客户端证书,并存储为
.pem
文件的密钥。 -
已安装 OpenShift (
oc
) CLI。 -
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
流程
在与
KafkaSink
对象相同的命名空间中创建一个 secret:重要证书和密钥必须采用 PEM 格式。
对于使用 SASL 时没有加密的身份验证:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_PLAINTEXT \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-literal=user=<username> \ --from-literal=password=<password>
对于使用 TLS 的 SASL 和加密进行身份验证:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-literal=user=<username> \ --from-literal=password=<password>
- 1
- 如果您使用公有云管理 Kafka 服务,可以省略
ca.crt
来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
使用 TLS 进行身份验证和加密:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-file=user.crt=<my_cert.pem_file_path> \ --from-file=user.key=<my_key.pem_file_path>
- 1
- 如果您使用公有云管理 Kafka 服务,可以省略
ca.crt
来使用系统的根 CA,如用于 Apache Kafka 的 Red Hat OpenShift Streams。
创建或修改
KafkaSink
对象,并在auth
spec 中添加对 secret 的引用:apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink_name> namespace: <namespace> spec: ... auth: secret: ref: name: <secret_name> ...
应用
KafkaSink
对象:$ oc apply -f <filename>
6.2.3. 配置 Kafka 代理设置
您可以通过创建配置映射并在 Kafka Broker
对象中引用此配置映射,配置复制因素、bootstrap 服务器和 Kafka 代理的主题分区数量。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源(CR)已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
修改
kafka-broker-config
配置映射,或创建自己的配置映射来包含以下配置:apiVersion: v1 kind: ConfigMap metadata: name: <config_map_name> 1 namespace: <namespace> 2 data: default.topic.partitions: <integer> 3 default.topic.replication.factor: <integer> 4 bootstrap.servers: <list_of_servers> 5
重要default.topic.replication.factor
值必须小于或等于集群中的 Kafka 代理实例数量。例如,如果您只有一个 Kafka 代理,则default.topic.replication.factor
值应该不超过"1"
。Kafka 代理配置映射示例
apiVersion: v1 kind: ConfigMap metadata: name: kafka-broker-config namespace: knative-eventing data: default.topic.partitions: "10" default.topic.replication.factor: "3" bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
应用配置映射:
$ oc apply -f <config_map_filename>
指定 Kafka
Broker
对象的配置映射:Broker 对象示例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: <broker_name> 1 namespace: <namespace> 2 annotations: eventing.knative.dev/broker.class: Kafka 3 spec: config: apiVersion: v1 kind: ConfigMap name: <config_map_name> 4 namespace: <namespace> 5 ...
应用代理:
$ oc apply -f <broker_filename>
其他资源