7.3. 使用监听程序设置对 Kafka 集群的客户端访问
使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或者,为不同 OpenShift 命名空间或外部 OpenShift 上的客户端提供外部访问。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。
Kafka 侦听器提供对 Kafka 集群的访问。客户端访问使用以下配置进行保护:
-
为 Kafka 集群配置了外部监听程序,它启用了 TLS 加密和 mTLS 身份验证,并启用了 Kafka
简单授权。 -
为客户端创建一个
KafkaUser,它带有 mTLS 身份验证,以及为简单授权定义的访问控制列表(ACL)。
您可以将监听程序配置为使用 mutual tls、scram-sha-512 或 oauth 身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用加密。
您可以为 Kafka 代理配置简单的、oauth、opa 或 自定义授权。启用后,授权将应用到所有启用的监听程序。
当您配置 KafkaUser 身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:
-
KafkaUser.spec.authentication匹配Kafka.spec.kafka.listeners[*].authentication -
KafkaUser.spec.authorizationmatchesKafka.spec.kafka.authorization
您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个身份验证设置。例如,如果在 Kafka 配置中也没有启用,则无法使用 mTLS 验证用户。
AMQ Streams operator 自动配置过程并创建身份验证所需的证书:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便使用 Kafka 集群进行身份验证。
- User Operator 根据所选的验证类型,创建代表客户端身份验证的用户和用于客户端身份验证的安全凭证。
您可以将证书添加到客户端配置中。
在此过程中,使用 Cluster Operator 生成的 CA 证书,但 您可以通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书。
证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。将 PEM 证书与使用 X.509 格式的证书的客户端一起使用。
对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA。
先决条件
- Kafka 集群可供在 OpenShift 集群外运行的客户端进行连接
- Cluster Operator 和 User Operator 在集群中运行
流程
使用 Kafka 侦听器配置 Kafka 集群。
- 定义通过监听程序访问 Kafka 代理所需的身份验证。
在 Kafka 代理上启用授权。
侦听器配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
- 2
- 用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
- 3
- Kafka 中监听程序使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高版本,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 外部监听器类型指定为
route(只限 OpenShift),loadbalancer,nodeport或ingress(只限 Kubernetes)。内部监听程序指定为internal或cluster-ip。 - 5
- 必需。侦听器上的 TLS 加密。对于
route和ingress类型监听程序,它必须设置为true。对于 mTLS 身份验证,也使用authentication属性。 - 6
- 侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定
tls: true和authentication.type: tls。 - 7
- (可选)取决于监听器类型的要求,您可以指定额外的 监听程序配置。
- 8
- 授权指定为
simple,它使用AclAuthorizerKafka 插件。 - 9
- (可选)无论 ACL 中定义的任何访问限制,Super Super 用户可以访问所有代理。
警告OpenShift Route 地址由 Kafka 集群的名称、监听程序的名称以及它在其中创建的命名空间的名称组成。例如,
my-cluster-kafka-listener1-bootstrap-myproject(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用路由监听程序类型,请注意地址的整个长度不超过 63 个字符的最大值。
创建或更新
Kafka资源。oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka 集群被配置为使用 mTLS 身份验证配置 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建一个服务。
创建一个服务,以用作连接到 Kafka 集群的 bootstrap 地址。
一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用
nodeport监听器。验证 kafka 代理的身份的集群 CA 证书也会在 secret <
cluster_name> -cluster-ca-cert中创建。注意如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。
从
Kafka资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
创建或修改
KafkaUser资源。oc apply -f USER-CONFIG-FILE
oc apply -f USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow 创建用户,以及名称与
KafkaUser资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。secret 示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 从 Kafka 集群的 <
cluster_name>-cluster-ca-certsecret 中提取集群 CA 证书。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crtoc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crtCopy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name> secret 中提取用户 CA 证书。oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crtoc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crtCopy to Clipboard Copied! Toggle word wrap Toggle overflow 从 <
user_name> secret 中提取用户的私钥。oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.keyoc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.keyCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 bootstrap 地址主机名和端口配置客户端以连接到 Kafka 集群:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用 truststore 凭证配置客户端以验证 Kafka 集群的身份。
指定公共集群 CA 证书。
truststore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow SSL 是 mTLS 身份验证指定的安全协议。通过 TLS 为 SCRAM-SHA-512 身份验证指定
SASL_SSL。PEM 是信任存储的文件格式。使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。
指定公钥证书和私钥。
keystore 配置示例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在
BEGIN CERTIFICATE和END CERTIFICATE分隔符之间,以换行符(\n)开头。也来自原始证书的每行。keystore 配置示例
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");Copy to Clipboard Copied! Toggle word wrap Toggle overflow