15.4. 示例:设置安全客户端访问


此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。它分为两个部分:

  • 保护 Kafka 代理
  • 保护用户对 Kafka 的访问

资源配置

对 Kafka 集群的客户端访问是通过以下配置进行保护的:

  1. 外部监听程序在 Kafka 资源中配置 TLS 加密和 mutual TLS (mTLS)身份验证,以及简单的授权。
  2. 为客户端创建一个 KafkaUser,使用 mTLS 身份验证,并为 简单 授权定义访问控制列表(ACL)。

必须至少为 KafkaUser 配置支持所需身份验证的监听程序。

可以为 mutual TLSSCRAM-SHA-512OAuth 身份验证配置监听程序。虽然 mTLS 总是使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用它。

Kafka 的授权选项包括 simpleOAuthOPAcustom。启用后,授权将应用到所有已启用的监听程序。

为确保 Kafka 和客户端之间的兼容性,配置以下身份验证和授权机制必须保持一致:

  • 对于 type: tlstype: scram-sha-512 验证类型,Kafka.spec.kafka.listeners[*].authentication 必须与 KafkaUser.spec.authentication匹配
  • 对于 type: simple authorization,Kafka.spec.kafka.authorization 必须与 KafkaUser.spec.authorization匹配

例如,只有在 Kafka 配置中也启用了用户时,才能对用户 mTLS 身份验证。

自动化和证书管理

Apache Kafka operator 的 Streams 会自动配置过程并创建身份验证所需的证书:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
  • User Operator 根据所选的验证类型,创建代表客户端的用户以及用于客户端身份验证的安全凭证。

您可以将证书添加到客户端配置中。

在此过程中,使用 Cluster Operator 生成的 CA 证书。或者,您可以通过安装自己的自定义 CA 证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA 管理的 Kafka 侦听器证书

证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。使用带有支持 X.509 证书格式的客户端的 PEM 证书。

注意

对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 Pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA

先决条件

  • Kafka 集群可供在 OpenShift 集群外部运行的客户端进行连接
  • Cluster Operator 和 User Operator 在集群中运行

15.4.1. 保护 Kafka 代理

  1. 使用 Kafka 侦听器配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证。
    • 在 Kafka 代理上启用授权。

      侦听器配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 
      1
      
          - name: external1 
      2
      
            port: 9094 
      3
      
            type: <listener_type> 
      4
      
            tls: true 
      5
      
            authentication:
              type: tls 
      6
      
            configuration: 
      7
      
              #...
          authorization: 
      8
      
            type: simple
            superUsers:
              - super-user-name 
      9
      
        # ...
      Copy to Clipboard Toggle word wrap

      1
      Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
      2
      用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听器类型指定为 route (只限 OpenShift), loadbalancer, nodeportingress (只限 Kubernetes)。内部监听程序指定为 internalcluster-ip
      5
      必需。侦听器上的 TLS 加密。对于 routeingress 类型监听程序,必须设置为 true。对于 mTLS 身份验证,也使用 authentication 属性。
      6
      侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定 tls: trueauthentication.type: tls
      7
      (可选)根据监听器类型的要求,您可以指定额外的 监听程序配置
      8
      授权指定为 simple,它使用 AclAuthorizerStandardAuthorizer Kafka 插件。
      9
      (可选)无论 ACL 中定义的任何访问限制,Super 用户可以访问所有代理。
      警告

      OpenShift 路由地址由 Kafka 集群名称、侦听器名称、项目名称和路由器的域组成。例如,my-cluster-kafka-external1-bootstrap-my-project.domain.com (<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>)。每个 DNS 标签(between period ".")不得超过 63 个字符,地址的总长度不得超过 255 个字符。

  2. Kafka 资源配置应用更改。

    Kafka 集群使用 mTLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务作为与 Kafka 集群连接的 bootstrap 地址

    一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用 nodeport 监听器。

    在 secret <cluster _name>-cluster-ca-cert 中还创建了用于验证 kafka 代理的身份的集群 CA 证书。

    注意

    如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。

  3. Kafka 资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。

    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    例如:

    oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

15.4.2. 保护用户对 Kafka 的访问

  1. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的身份验证类型。
    • 指定用于 简单 授权的授权 ACL。

      用户配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 
      1
      
      spec:
        authentication:
          type: tls 
      2
      
        authorization:
          type: simple
          acls: 
      3
      
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operations:
                - Describe
                - Read
            - resource:
                type: group
                name: my-group
                patternType: literal
              operations:
                - Read
      Copy to Clipboard Toggle word wrap

      1
      标签必须与 Kafka 集群的标签匹配。
      2
      将身份验证指定为 mutual tls
      3
      简单的授权需要附带的 ACL 规则列表应用到用户。规则定义了根据 用户名 (my-user)对 Kafka 资源允许的操作。
  2. KafkaUser 资源配置应用更改。

    创建了用户,以及名称与 KafkaUser 资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。

    使用用户凭证的 secret 示例

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
      user.crt: <user_certificate> # Public key of the user
      user.key: <user_private_key> # Private key of the user
      user.p12: <store> # PKCS #12 store for user certificates and keys
      user.password: <password_for_store> # Protects the PKCS #12 store
    Copy to Clipboard Toggle word wrap

  3. 从 Kafka 集群的 < cluster_name>-cluster-ca-cert secret 中提取集群 CA 证书。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
    Copy to Clipboard Toggle word wrap
  4. 从 < user_name&gt; secret 中提取用户 CA 证书。

    oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
    Copy to Clipboard Toggle word wrap
  5. 从 < user_name&gt; secret 中提取用户的私钥。

    oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
    Copy to Clipboard Toggle word wrap
  6. 使用 bootstrap 地址主机名和端口配置客户端,以连接到 Kafka 集群:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
    Copy to Clipboard Toggle word wrap
  7. 使用 truststore 凭证配置客户端,以验证 Kafka 集群的身份。

    指定公共集群 CA 证书。

    truststore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
    Copy to Clipboard Toggle word wrap

    SSL 是 mTLS 验证的指定的安全协议。为通过 TLS 进行 SCRAM-SHA-512 验证指定 SASL_SSL。PEM 是信任存储的文件格式。

  8. 使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。

    指定公钥和私钥。

    keystore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
    Copy to Clipboard Toggle word wrap

    将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在 BEGIN CERTIFICATEEND CERTIFICATE 分隔符之间,以换行符(\n)开始。使用 \n 结束原始证书中的每一行。

    keystore 配置示例

    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat