13.4. 使用监听程序设置对 Kafka 集群的客户端访问
使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或者,提供对不同 OpenShift 命名空间或 OpenShift 外部客户端的外部访问权限。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。
Kafka 侦听器提供对 Kafka 集群的访问。客户端访问是通过以下配置进行保护的:
-
为 Kafka 集群配置了外部监听程序,并启用了 TLS 加密和 mTLS 身份验证,并启用了 Kafka
简单授权。 -
为客户端创建一个
KafkaUser,带有 mTLS 身份验证,并为简单授权定义访问控制列表(ACL)。
您可以将监听程序配置为使用 mutual tls、scram-sha-512 或 oauth 身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用加密。
您可以为 Kafka 代理配置简单、oauth、opa 或 自定义授权。启用后,授权将应用到所有已启用的监听程序。
当您配置 KafkaUser 身份验证和授权机制时,请确保它们与对应的 Kafka 配置匹配:
-
KafkaUser.spec.authentication匹配Kafka.spec.kafka.listeners[*].authentication -
KafkaUser.spec.authorization匹配Kafka.spec.kafka.authorization
您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个代理的身份验证设置。例如,如果在 Kafka 配置中也未启用,则无法验证带有 mTLS 的用户。
AMQ Streams Operator 自动配置过程并创建身份验证所需的证书:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便使用 Kafka 集群启用身份验证。
- User Operator 根据所选的验证类型,创建代表客户端的用户以及用于客户端身份验证的安全凭证。
您可以将证书添加到客户端配置中。
在此过程中,会使用 Cluster Operator 生成的 CA 证书,但 您可以通过安装自己的证书 来替换它们。您还可以将侦听器 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书。
证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。使用带有 X.509 格式的证书的 PEM 证书。
对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 Pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA。
先决条件
- Kafka 集群可供在 OpenShift 集群外部运行的客户端进行连接
- Cluster Operator 和 User Operator 在集群中运行
流程
使用 Kafka 侦听器配置 Kafka 集群。
- 定义通过监听程序访问 Kafka 代理所需的身份验证。
在 Kafka 代理上启用授权。
侦听器配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 在 Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
- 2
- 用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
- 3
- Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 外部监听器类型指定为
route(只限 OpenShift),loadbalancer,nodeport或ingress(只限 Kubernetes)。内部监听程序指定为internal或cluster-ip。 - 5
- 必需。侦听器上的 TLS 加密。对于
route和ingress类型监听程序,必须设置为true。对于 mTLS 身份验证,也使用authentication属性。 - 6
- 侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定
tls: true和authentication.type: tls。 - 7
- (可选)根据监听器类型的要求,您可以指定额外的 监听程序配置。
- 8
- 授权指定为
simple,它使用AclAuthorizer和StandardAuthorizerKafka 插件。 - 9
- (可选)无论 ACL 中定义的任何访问限制,Super 用户可以访问所有代理。
警告OpenShift 路由地址由 Kafka 集群名称、侦听器名称、项目名称和路由器的域组成。例如,
my-cluster-kafka-external1-bootstrap-my-project.domain.com(<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>)。每个 DNS 标签(between period ".")不得超过 63 个字符,地址的总长度不得超过 255 个字符。
创建或更新
Kafka资源。oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka 集群使用 mTLS 身份验证配置 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建一个服务。
创建一个服务作为与 Kafka 集群连接的 bootstrap 地址。
一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用
nodeport监听器。在 secret <cluster
_name> -cluster-ca-cert 中还创建了用于验证 kafka 代理的身份的集群CA 证书。注意如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。
从
Kafka资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
创建或修改
KafkaUser资源。oc apply -f USER-CONFIG-FILE
oc apply -f USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow 创建了用户,以及名称与
KafkaUser资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。secret 示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从 Kafka 集群的 <
;cluster-ca-certsecret 中提取集群 CA 证书。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crtoc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crtCopy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name> secret 中提取用户 CA 证书。oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crtoc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crtCopy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name> secret 中提取用户的私钥。oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.keyoc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.keyCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 bootstrap 地址主机名和端口配置客户端,以连接到 Kafka 集群:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 truststore 凭证配置客户端,以验证 Kafka 集群的身份。
指定公共集群 CA 证书。
truststore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow SSL 是 mTLS 验证的指定的安全协议。为通过 TLS 进行 SCRAM-SHA-512 验证指定
SASL_SSL。PEM 是信任存储的文件格式。使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。
指定公钥和私钥。
keystore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在
BEGIN CERTIFICATE和END CERTIFICATE分隔符之间,以换行符(\n)开始。使用\n结束原始证书中的每一行。keystore 配置示例
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");Copy to Clipboard Copied! Toggle word wrap Toggle overflow