14.4. 使用监听程序设置对 Kafka 集群的客户端访问
使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或者,提供对不同 OpenShift 命名空间或 OpenShift 外部客户端的外部访问权限。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。
Kafka 侦听器提供对 Kafka 集群的访问。客户端访问是通过以下配置进行保护的:
-
为 Kafka 集群配置了外部监听程序,并启用了 TLS 加密和 mTLS 身份验证,并启用了 Kafka
简单授权。 -
为客户端创建一个
KafkaUser,带有 mTLS 身份验证,并为简单授权定义访问控制列表(ACL)。
您可以将监听程序配置为使用 mutual tls、scram-sha-512 或 oauth 身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用加密。
您可以为 Kafka 代理配置简单、oauth、opa 或 自定义授权。启用后,授权将应用到所有已启用的监听程序。
当您配置 KafkaUser 身份验证和授权机制时,请确保它们与对应的 Kafka 配置匹配:
-
KafkaUser.spec.authentication匹配Kafka.spec.kafka.listeners[*].authentication -
KafkaUser.spec.authorization匹配Kafka.spec.kafka.authorization
您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个代理的身份验证设置。例如,如果在 Kafka 配置中也未启用,则无法验证带有 mTLS 的用户。
Apache Kafka operator 的 Streams 会自动配置过程并创建身份验证所需的证书:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便使用 Kafka 集群启用身份验证。
- User Operator 根据所选的验证类型,创建代表客户端的用户以及用于客户端身份验证的安全凭证。
您可以将证书添加到客户端配置中。
在此过程中,会使用 Cluster Operator 生成的 CA 证书,但 您可以通过安装自己的证书 来替换它们。您还可以将侦听器 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书。
证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。使用带有 X.509 格式的证书的 PEM 证书。
对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 Pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA。
先决条件
- Kafka 集群可供在 OpenShift 集群外部运行的客户端进行连接
- Cluster Operator 和 User Operator 在集群中运行
流程
使用 Kafka 侦听器配置 Kafka 集群。
- 定义通过监听程序访问 Kafka 代理所需的身份验证。
在 Kafka 代理上启用授权。
侦听器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners:1 - name: external12 port: 90943 type: <listener_type>4 tls: true5 authentication: type: tls6 configuration:7 #... authorization:8 type: simple superUsers: - super-user-name9 # ...- 1
- 在 Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
- 2
- 用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
- 3
- Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 外部监听器类型指定为
route(只限 OpenShift),loadbalancer,nodeport或ingress(只限 Kubernetes)。内部监听程序指定为internal或cluster-ip。 - 5
- 必需。侦听器上的 TLS 加密。对于
route和ingress类型监听程序,必须设置为true。对于 mTLS 身份验证,也使用authentication属性。 - 6
- 侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定
tls: true和authentication.type: tls。 - 7
- (可选)根据监听器类型的要求,您可以指定额外的 监听程序配置。
- 8
- 授权指定为
simple,它使用AclAuthorizer和StandardAuthorizerKafka 插件。 - 9
- (可选)无论 ACL 中定义的任何访问限制,Super 用户可以访问所有代理。
警告OpenShift 路由地址由 Kafka 集群名称、侦听器名称、项目名称和路由器的域组成。例如,
my-cluster-kafka-external1-bootstrap-my-project.domain.com(<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>)。每个 DNS 标签(between period ".")不得超过 63 个字符,地址的总长度不得超过 255 个字符。
创建或更新
Kafka资源。oc apply -f <kafka_configuration_file>Kafka 集群使用 mTLS 身份验证配置 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建一个服务。
创建一个服务作为与 Kafka 集群连接的 bootstrap 地址。
一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用
nodeport监听器。在 secret <cluster
_name> -cluster-ca-cert 中还创建了用于验证 kafka 代理的身份的集群CA 证书。注意如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。
从
Kafka资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'例如:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
-
指定与
Kafka侦听器相同的身份验证类型。 指定用于
简单授权的授权 ACL。用户配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster1 spec: authentication: type: tls2 authorization: type: simple acls:3 - resource: type: topic name: my-topic patternType: literal operations: - Describe - Read - resource: type: group name: my-group patternType: literal operations: - Read
-
指定与
创建或修改
KafkaUser资源。oc apply -f USER-CONFIG-FILE创建了用户,以及名称与
KafkaUser资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。secret 示例
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: <public_key> # Public key of the clients CA user.crt: <user_certificate> # Public key of the user user.key: <user_private_key> # Private key of the user user.p12: <store> # PKCS #12 store for user certificates and keys user.password: <password_for_store> # Protects the PKCS #12 store从 Kafka 集群的 <
;cluster-ca-certsecret 中提取集群 CA 证书。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt从 <
user_name> secret 中提取用户 CA 证书。oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt从 <
user_name> secret 中提取用户的私钥。oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key使用 bootstrap 地址主机名和端口配置客户端,以连接到 Kafka 集群:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");使用 truststore 凭证配置客户端,以验证 Kafka 集群的身份。
指定公共集群 CA 证书。
truststore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");SSL 是 mTLS 验证的指定的安全协议。为通过 TLS 进行 SCRAM-SHA-512 验证指定
SASL_SSL。PEM 是信任存储的文件格式。使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。
指定公钥和私钥。
keystore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在
BEGIN CERTIFICATE和END CERTIFICATE分隔符之间,以换行符(\n)开始。使用\n结束原始证书中的每一行。keystore 配置示例
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");