7.3. 使用监听程序设置对 Kafka 集群的客户端访问


使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或者,为不同 OpenShift 命名空间或外部 OpenShift 上的客户端提供外部访问。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。

Kafka 侦听器提供对 Kafka 集群的访问。客户端访问使用以下配置进行保护:

  1. 为 Kafka 集群配置了外部监听程序,它启用了 TLS 加密和 mTLS 身份验证,并启用了 Kafka 简单 授权。
  2. 为客户端创建一个 KafkaUser,它带有 mTLS 身份验证,以及为 简单 授权定义的访问控制列表(ACL)。

您可以将监听程序配置为使用 mutual tlsscram-sha-512oauth 身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用加密。

您可以为 Kafka 代理配置简单的oauthopa自定义授权。启用后,授权将应用到所有启用的监听程序。

当您配置 KafkaUser 身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:

  • KafkaUser.spec.authentication 匹配 Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorization matches Kafka.spec.kafka.authorization

您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。

注意

Kafka 用户和 Kafka 代理之间的身份验证取决于每个身份验证设置。例如,如果在 Kafka 配置中也没有启用,则无法使用 mTLS 验证用户。

AMQ Streams operator 自动配置过程并创建身份验证所需的证书:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便使用 Kafka 集群进行身份验证。
  • User Operator 根据所选的验证类型,创建代表客户端身份验证的用户和用于客户端身份验证的安全凭证。

您可以将证书添加到客户端配置中。

在此过程中,使用 Cluster Operator 生成的 CA 证书,但 您可以通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书

证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。将 PEM 证书与使用 X.509 格式的证书的客户端一起使用。

注意

对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA

先决条件

  • Kafka 集群可供在 OpenShift 集群外运行的客户端进行连接
  • Cluster Operator 和 User Operator 在集群中运行

流程

  1. 使用 Kafka 侦听器配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证。
    • 在 Kafka 代理上启用授权。

      侦听器配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 
      1
      
          - name: external 
      2
      
            port: 9094 
      3
      
            type: <listener_type> 
      4
      
            tls: true 
      5
      
            authentication:
              type: tls 
      6
      
            configuration: 
      7
      
              #...
          authorization: 
      8
      
            type: simple
            superUsers:
              - super-user-name 
      9
      
        # ...
      Copy to Clipboard Toggle word wrap

      1
      Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
      2
      用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 中监听程序使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高版本,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听器类型指定为 route (只限 OpenShift), loadbalancer, nodeportingress (只限 Kubernetes)。内部监听程序指定为 internalcluster-ip
      5
      必需。侦听器上的 TLS 加密。对于 routeingress 类型监听程序,它必须设置为 true。对于 mTLS 身份验证,也使用 authentication 属性。
      6
      侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定 tls: trueauthentication.type: tls
      7
      (可选)取决于监听器类型的要求,您可以指定额外的 监听程序配置
      8
      授权指定为 simple,它使用 AclAuthorizer Kafka 插件。
      9
      (可选)无论 ACL 中定义的任何访问限制,Super Super 用户可以访问所有代理。
      警告

      OpenShift Route 地址由 Kafka 集群的名称、监听程序的名称以及它在其中创建的命名空间的名称组成。例如,my-cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用 路由 监听程序类型,请注意地址的整个长度不超过 63 个字符的最大值。

  2. 创建或更新 Kafka 资源。

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

    Kafka 集群被配置为使用 mTLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务,以用作连接到 Kafka 集群的 bootstrap 地址

    一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用 nodeport 监听器。

    验证 kafka 代理的身份的集群 CA 证书也会在 secret < cluster_name> -cluster-ca-cert 中创建。

    注意

    如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。

  3. Kafka 资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。

    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    例如:

    oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

  4. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的身份验证类型。
    • 指定 简单 授权的授权 ACL。

      用户配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 
      1
      
      spec:
        authentication:
          type: tls 
      2
      
        authorization:
          type: simple
          acls: 
      3
      
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operations:
                - Describe
                - Read
            - resource:
                type: group
                name: my-group
                patternType: literal
              operations:
                - Read
      Copy to Clipboard Toggle word wrap

      1
      标签必须与 Kafka 集群的标签匹配。
      2
      身份验证指定为 mutual tls
      3
      简单的授权需要包含 ACL 规则列表才能应用到用户。规则根据 用户名 (my-user)定义 Kafka 资源允许的操作。
  5. 创建或修改 KafkaUser 资源。

    oc apply -f USER-CONFIG-FILE
    Copy to Clipboard Toggle word wrap

    创建用户,以及名称与 KafkaUser 资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。

    secret 示例

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: <public_key> # Public key of the clients CA
      user.crt: <user_certificate> # Public key of the user
      user.key: <user_private_key> # Private key of the user
      user.p12: <store> # PKCS #12 store for user certificates and keys
      user.password: <password_for_store> # Protects the PKCS #12 store
    Copy to Clipboard Toggle word wrap

  6. 从 Kafka 集群的 < cluster_name>-cluster-ca-cert secret 中提取集群 CA 证书。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
    Copy to Clipboard Toggle word wrap
  7. 从 < user_name&gt; secret 中提取用户 CA 证书。

    oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
    Copy to Clipboard Toggle word wrap
  8. 从 < user_name&gt; secret 中提取用户的私钥。

    oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
    Copy to Clipboard Toggle word wrap
  9. 使用 bootstrap 地址主机名和端口配置客户端以连接到 Kafka 集群:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
    Copy to Clipboard Toggle word wrap
  10. 使用 truststore 凭证配置客户端以验证 Kafka 集群的身份。

    指定公共集群 CA 证书。

    truststore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
    Copy to Clipboard Toggle word wrap

    SSL 是 mTLS 身份验证指定的安全协议。通过 TLS 为 SCRAM-SHA-512 身份验证指定 SASL_SSL。PEM 是信任存储的文件格式。

  11. 使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。

    指定公钥证书和私钥。

    keystore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
    Copy to Clipboard Toggle word wrap

    将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在 BEGIN CERTIFICATEEND CERTIFICATE 分隔符之间,以换行符(\n)开头。也来自原始证书的 每行

    keystore 配置示例

    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat