4.6. Apache Kafka 的 Knative 代理实现
对于生产环境就绪的 Knative Eventing 部署,红帽建议为 Apache Kafka 使用 Knative 代理实现。代理是 Knative 代理的 Apache Kafka 原生实现,它将 CloudEvents 直接发送到 Kafka 实例。
Knative 代理与 Kafka 有一个原生集成,用于存储和路由事件。它可以更好地与 Kafka 集成用于代理,并在其他代理类型中触发模型,并减少网络跃点。Knative 代理实现的其他优点包括:
- 最少一次的交付保证
- 根据 CloudEvents 分区扩展排序事件交付
- 数据平面的高可用性
- 水平扩展数据平面
Apache Kafka 的 Knative 代理实现使用二进制内容模式将传入的 CloudEvents 存储为 Kafka 记录。这意味着,所有 CloudEvent 属性和扩展都会在 Kafka 记录上映射,而 CloudEvent 的 data
规格与 Kafka 记录的值对应。
4.6.1. 当没有配置为 default 代理类型时,创建 Apache Kafka 代理
如果您的 OpenShift Serverless 部署没有配置为使用 Kafka 代理作为默认代理类型,您仍可使用以下步骤创建基于 Kafka 的代理。
4.6.1.1. 使用 YAML 创建 Apache Kafka 代理
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 代理,您必须创建一个 YAML 文件来定义 Broker
对象,然后使用 oc apply
命令应用它。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
创建一个基于 Kafka 的代理作为 YAML 文件:
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 name: example-kafka-broker spec: config: apiVersion: v1 kind: ConfigMap name: kafka-broker-config 2 namespace: knative-eventing
应用基于 Kafka 的代理 YAML 文件:
$ oc apply -f <filename>
4.6.1.2. 创建使用外部管理的 Kafka 主题的 Apache Kafka 代理
如果要在不创建自己的内部主题的情况下使用 Kafka 代理,您可以使用外部管理的 Kafka 主题。要做到这一点,您必须创建一个使用 kafka.eventing.knative.dev/external.topic
注解的 Kafka Broker
对象。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在 OpenShift Container Platform 集群中。 - 您可以访问一个 Kafka 实例,如 Red Hat AMQ Streams,并创建了 Kafka 主题。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
创建一个基于 Kafka 的代理作为 YAML 文件:
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: Kafka 1 kafka.eventing.knative.dev/external.topic: <topic_name> 2 ...
应用基于 Kafka 的代理 YAML 文件:
$ oc apply -f <filename>
4.6.1.3. 使用隔离数据平面的 Apache Kafka 的 Knative Broker 实现
带有隔离数据平面的 Apache Kafka 的 Knative Broker 实现只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。
有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围。
Apache Kafka 的 Knative Broker 实现有 2 个平面:
- Control plane(控制平面)
- 由与 Kubernetes API 通信、监视自定义对象和管理数据平面的控制器组成。
- 数据平面
-
侦听传入事件、与 Apache Kafka 通信以及向事件接收器发送事件的组件集合。Apache Kafka 数据平面的 Knative Broker 实现是事件流的位置。实现由
kafka-broker-receiver
和kafka-broker-dispatcher
部署组成。
当您配置 Kafka 的代理类时,Apache Kafka
的 Knative Broker 实现会使用共享的数据平面。这意味着 knative-eventing
命名空间中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署用于集群中的所有 Apache Kafka Broker。
但是,当您配置 KafkaNamespaced
的代理类时,Apache Kafka 代理控制器会为代理存在的每个命名空间创建一个新的数据平面。该数据平面都会被该命名空间中的所有 KafkaNamespaced
代理使用。这提供了数据平面之间的隔离,因此用户命名空间中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署仅用于该命名空间中的代理。
由于具有单独的数据平面,这个安全功能会创建更多部署并使用更多资源。除非有这样的隔离要求,请使用一个带有 Kafka
类的 常规 代理。
4.6.1.4. 为使用隔离的数据平面的 Apache Kafka 创建 Knative 代理
带有隔离数据平面的 Apache Kafka 的 Knative Broker 实现只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。
有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围。
要创建 KafkaNamespaced
代理,您必须将 eventing.knative.dev/broker.class
注解设置为 KafkaNamespaced
。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在 OpenShift Container Platform 集群中。 - 您可以访问 Apache Kafka 实例,如 Red Hat AMQ Streams,并创建了 Kafka 主题。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
使用 YAML 文件创建基于 Apache Kafka 的代理:
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: annotations: eventing.knative.dev/broker.class: KafkaNamespaced 1 name: default namespace: my-namespace 2 spec: config: apiVersion: v1 kind: ConfigMap name: my-config 3 ...
应用基于 Apache Kafka 的代理 YAML 文件:
$ oc apply -f <filename>
spec.config
中的 ConfigMap
对象必须与 Broker
对象位于同一个命名空间中:
apiVersion: v1 kind: ConfigMap metadata: name: my-config namespace: my-namespace data: ...
使用 KafkaNamespaced
类创建第一个 Broker
对象后,在命名空间中创建 kafka-broker-receiver
和 kafka-broker-dispatcher
部署。因此,同一命名空间中的 KafkaNamespaced
类的所有代理都将使用相同的数据平面。如果命名空间中没有 KafkaNamespaced
类的代理,则会删除命名空间中的数据平面。
4.6.2. 配置 Apache Kafka 代理设置
您可以通过创建配置映射并在 Kafka Broker
对象中引用此配置映射,配置复制因素、bootstrap 服务器和 Kafka 代理的主题分区数量。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源(CR)已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
流程
修改
kafka-broker-config
配置映射,或创建自己的配置映射来包含以下配置:apiVersion: v1 kind: ConfigMap metadata: name: <config_map_name> 1 namespace: <namespace> 2 data: default.topic.partitions: <integer> 3 default.topic.replication.factor: <integer> 4 bootstrap.servers: <list_of_servers> 5
重要default.topic.replication.factor
值必须小于或等于集群中的 Kafka 代理实例数量。例如,如果您只有一个 Kafka 代理,则default.topic.replication.factor
值应该不超过"1"
。Kafka 代理配置映射示例
apiVersion: v1 kind: ConfigMap metadata: name: kafka-broker-config namespace: knative-eventing data: default.topic.partitions: "10" default.topic.replication.factor: "3" bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
应用配置映射:
$ oc apply -f <config_map_filename>
指定 Kafka
Broker
对象的配置映射:Broker 对象示例
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: <broker_name> 1 namespace: <namespace> 2 annotations: eventing.knative.dev/broker.class: Kafka 3 spec: config: apiVersion: v1 kind: ConfigMap name: <config_map_name> 4 namespace: <namespace> 5 ...
应用代理:
$ oc apply -f <broker_filename>
4.6.3. Apache Kafka 的 Knative 代理实现的安全配置
Kafka 集群通常使用 TLS 或 SASL 身份验证方法进行保护。您可以使用 TLS 或 SASL 将 Kafka 代理或频道配置为针对受保护的 Red Hat AMQ Streams 集群进行操作。
红帽建议您同时启用 SASL 和 TLS。
4.6.3.1. 为 Apache Kafka 代理配置 TLS 身份验证
Apache Kafka 客户端和服务器使用 传输层安全性 (TLS) 来加密 Knative 和 Kafka 之间的流量,以及用于身份验证。TLS 是 Apache Kafka 的 Knative 代理实现唯一支持的流量加密方法。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您有一个 Kafka 集群 CA 证书存储为一个
.pem
文件。 -
您有一个 Kafka 集群客户端证书,并存储为
.pem
文件的密钥。 -
安装 OpenShift CLI (
oc
) 。
流程
在
knative-eventing
命名空间中创建证书文件作为 secret:$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=caroot.pem \ --from-file=user.crt=certificate.pem \ --from-file=user.key=key.pem
重要使用密钥名称
ca.crt
、user.crt
和user.key
。不要更改它们。编辑
KnativeKafka
CR,并在broker
spec 中添加对 secret 的引用:apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...
4.6.3.2. 为 Apache Kafka 代理配置 SASL 身份验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
安装 OpenShift CLI (
oc
) 。
流程
在
knative-eventing
命名空间中创建证书文件作为 secret:$ oc create secret -n knative-eventing generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=user="my-sasl-user"
-
使用键名
ca.crt
,password
, 和sasl.mechanism
.不要更改它们。 如果要将 SASL 与公共 CA 证书搭配使用,您必须在创建 secret 时使用
tls.enabled=true
标志,而不是使用ca.crt
参数。例如:$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-literal=tls.enabled=true \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
-
使用键名
编辑
KnativeKafka
CR,并在broker
spec 中添加对 secret 的引用:apiVersion: operator.serverless.openshift.io/v1alpha1 kind: KnativeKafka metadata: namespace: knative-eventing name: knative-kafka spec: broker: enabled: true defaultConfig: authSecretName: <secret_name> ...