7.2. 使用监听程序设置对 Kafka 集群的客户端访问
使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或提供对不同 OpenShift 命名空间或 OpenShift 之外的客户端的外部访问权限。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。
Kafka 侦听器提供访问。支持以下监听程序类型:
-
在同一 OpenShift 集群内连接的
内部
-
使用 OpenShift
Route
和默认 HAProxy 路由器的路由
-
LoadBalancer
使用负载均衡器服务 -
NodePort
使用 OpenShift 节点上的端口 -
Ingress
使用 OpenShift Ingress 和 Ingress NGINX Controller for Kubernetes -
cluster-ip
使用 per-brokerClusterIP
服务公开 Kafka
所选的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,节点端口提供了更好的选项。
在这个流程中:
-
为 Kafka 集群配置外部监听程序,启用 TLS 加密和 mTLS 身份验证,以及 Kafka
简单
授权。 -
为客户端创建
KafkaUser
,使用 mTLS 验证和为简单
授权定义的访问控制列表(ACL)创建。
您可以将监听程序配置为使用 mutual tls
、scram-sha-512
或 oauth
身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时,也建议加密。
您可以为 Kafka 代理配置简单的
、oauth
、opa
或 自定义
授权。启用后,授权将应用到所有启用的监听程序。
当您配置 KafkaUser
身份验证和授权机制时,请确保它们与对等的 Kafka 配置匹配:
-
KafkaUser.spec.authentication
匹配Kafka.spec.kafka.listeners[*].authentication
-
KafkaUser.spec.authorization
匹配Kafka.spec.kafka.authorization
您应该至少有一个监听程序支持您要用于 KafkaUser
的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个验证设置。例如,如果 Kafka 配置中还没有启用,则无法使用 mTLS 验证用户。
AMQ Streams operator 会自动完成配置过程,并创建身份验证所需的证书:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便通过 Kafka 集群启用身份验证。
- User Operator 根据所选的身份验证类型创建代表客户端和用于客户端身份验证的安全凭证的用户。
您可以将证书添加到客户端配置中。
在这一流程中,会使用 Cluster Operator 生成的 CA 证书,但您可以 通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书。
证书以 PEM (.crt)和 PKCS #12 (.p12)格式提供。这个过程使用 PEM 证书。将 PEM 证书用于 X.509 格式使用证书的客户端。
对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA。
先决条件
- Kafka 集群可供 OpenShift 集群外部运行的客户端进行连接
- Cluster Operator 和 User Operator 在集群中运行
流程
使用 Kafka 侦听器配置 Kafka 集群。
- 定义通过监听程序访问 Kafka 代理所需的身份验证。
在 Kafka 代理上启用授权。
监听程序配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 有关启用外部监听器的配置选项,请参考 Generic Kafka 侦听器 schema。
- 2
- 用于标识监听程序的名称。必须在 Kafka 集群中唯一。
- 3
- Kafka 内监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许的端口号为 9092 及更高版本,除了端口 9404 和 9999 除外,后者已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 将外部监听程序类型指定为
路由
、loadbalancer
、nodeport
或ingress
。内部监听程序被指定为internal
或cluster-ip
。 - 5
- 必需。监听器上的 TLS 加密。对于
route
和ingress
类型监听程序,它必须设置为true
。对于 mTLS 身份验证,也使用authentication
属性。 - 6
- 侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,请指定
tls: true
和authentication.type: tls
。 - 7
- (可选)根据监听程序类型的要求,您可以指定额外的 监听程序配置。
- 8
- 授权指定为
simple
,它使用AclAuthorizer
Kafka 插件。 - 9
- (可选)Super 用户可以访问所有代理,无论 ACL 中定义的任何访问限制如何。
警告OpenShift Route 地址由 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称组成。例如,
my-cluster-kafka-listener1-bootstrap-myproject
(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用路由
监听程序类型,请注意地址的整个长度不超过 63 个字符的最大值。
创建或更新
Kafka
资源。oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka 集群使用 mTLS 身份验证配置有 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建服务。
创建服务作为连接到 Kafka 集群的 bootstrap 地址。
一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用
nodeport
监听器。在 secret <cluster
_name> -cluster-ca-cert 中也创建了用于验证 kafka 代理身份的集群
CA 证书。注意如果您在使用外部监听程序时扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。
检索您用来从
Kafka
资源状态访问 Kafka 集群的 bootstrap 地址。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
创建或修改
KafkaUser
资源。oc apply -f USER-CONFIG-FILE
oc apply -f USER-CONFIG-FILE
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 创建用户,以及名称与
KafkaUser
资源的名称相同的 secret。secret 包含用于 mTLS 验证的公钥和私钥。secret 示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从 Kafka 集群的 <
cluster_name>-cluster-ca-cert
secret 中提取集群 CA 证书。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name>
; secret 中提取用户 CA 证书。oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name>
; secret 中提取用户的私钥。oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用用于连接 Kafka 集群的 bootstrap 地址主机名和端口配置客户端:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 truststore 凭证配置客户端,以验证 Kafka 集群的身份。
指定公共集群 CA 证书。
信任存储配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow SSL 是用于 mTLS 验证的指定安全协议。为 TLS 的 SCRAM-SHA-512 身份验证指定
SASL_SSL
。PEM 是 truststore 的文件格式。使用密钥存储凭证配置您的客户端,以在连接到 Kafka 集群时验证用户。
指定公共证书和私钥。
密钥存储配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将密钥存储证书和私钥直接添加到配置中。将 作为单行格式添加。在
BEGIN CERTIFICATE
和END CERTIFICATE
分隔符之间,以换行符(\n
)开头。结束原始证书中的每一行也带有\n
。密钥存储配置示例
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow