13.4. 使用监听程序设置对 Kafka 集群的客户端访问


使用 Kafka 集群的地址,您可以提供对同一 OpenShift 集群中的客户端的访问权限;或者,提供对不同 OpenShift 命名空间或 OpenShift 外部客户端的外部访问权限。此流程演示了如何配置从 OpenShift 外部或从另一个 OpenShift 集群配置对 Kafka 集群的客户端访问。

Kafka 侦听器提供对 Kafka 集群的访问。客户端访问是通过以下配置进行保护的:

  1. 为 Kafka 集群配置了外部监听程序,并启用了 TLS 加密和 mTLS 身份验证,并启用了 Kafka 简单 授权。
  2. 为客户端创建一个 KafkaUser,带有 mTLS 身份验证,并为 简单 授权定义访问控制列表(ACL)。

您可以将监听程序配置为使用 mutual tlsscram-sha-512oauth 身份验证。mTLS 始终使用加密,但在使用 SCRAM-SHA-512 和 OAuth 2.0 身份验证时也建议使用加密。

您可以为 Kafka 代理配置简单oauthopa自定义授权。启用后,授权将应用到所有已启用的监听程序。

当您配置 KafkaUser 身份验证和授权机制时,请确保它们与对应的 Kafka 配置匹配:

  • KafkaUser.spec.authentication 匹配 Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorization 匹配 Kafka.spec.kafka.authorization

您应该至少有一个监听程序支持您要用于 KafkaUser 的身份验证。

注意

Kafka 用户和 Kafka 代理之间的身份验证取决于每个代理的身份验证设置。例如,如果在 Kafka 配置中也未启用,则无法验证带有 mTLS 的用户。

AMQ Streams Operator 自动配置过程并创建身份验证所需的证书:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便使用 Kafka 集群启用身份验证。
  • User Operator 根据所选的验证类型,创建代表客户端的用户以及用于客户端身份验证的安全凭证。

您可以将证书添加到客户端配置中。

在此过程中,会使用 Cluster Operator 生成的 CA 证书,但 您可以通过安装自己的证书 来替换它们。您还可以将侦听器 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书

证书以 PEM (.crt) 和 PKCS #12 (.p12) 格式提供。此流程使用 PEM 证书。使用带有 X.509 格式的证书的 PEM 证书。

注意

对于同一 OpenShift 集群和命名空间中的内部客户端,您可以在 Pod 规格中挂载集群 CA 证书。如需更多信息,请参阅配置内部客户端以信任集群 CA

先决条件

  • Kafka 集群可供在 OpenShift 集群外部运行的客户端进行连接
  • Cluster Operator 和 User Operator 在集群中运行

流程

  1. 使用 Kafka 侦听器配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证。
    • 在 Kafka 代理上启用授权。

      侦听器配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 
      1
      
          - name: external1 
      2
      
            port: 9094 
      3
      
            type: <listener_type> 
      4
      
            tls: true 
      5
      
            authentication:
              type: tls 
      6
      
            configuration: 
      7
      
              #...
          authorization: 
      8
      
            type: simple
            superUsers:
              - super-user-name 
      9
      
        # ...
      Copy to Clipboard Toggle word wrap

      1
      Generic Kafka listener schema reference 中描述了启用外部监听程序的配置选项。
      2
      用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听器类型指定为 route (只限 OpenShift), loadbalancer, nodeportingress (只限 Kubernetes)。内部监听程序指定为 internalcluster-ip
      5
      必需。侦听器上的 TLS 加密。对于 routeingress 类型监听程序,必须设置为 true。对于 mTLS 身份验证,也使用 authentication 属性。
      6
      侦听器上的客户端身份验证机制。对于使用 mTLS 的服务器和客户端身份验证,您可以指定 tls: trueauthentication.type: tls
      7
      (可选)根据监听器类型的要求,您可以指定额外的 监听程序配置
      8
      授权指定为 simple,它使用 AclAuthorizerStandardAuthorizer Kafka 插件。
      9
      (可选)无论 ACL 中定义的任何访问限制,Super 用户可以访问所有代理。
      警告

      OpenShift 路由地址由 Kafka 集群名称、侦听器名称、项目名称和路由器的域组成。例如,my-cluster-kafka-external1-bootstrap-my-project.domain.com (<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>)。每个 DNS 标签(between period ".")不得超过 63 个字符,地址的总长度不得超过 255 个字符。

  2. 创建或更新 Kafka 资源。

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

    Kafka 集群使用 mTLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务作为与 Kafka 集群连接的 bootstrap 地址

    一个服务也作为外部 bootstrap 地址为到 Kafka 集群的外部连接创建,使用 nodeport 监听器。

    在 secret <cluster _name> -cluster-ca-cert 中还创建了用于验证 kafka 代理的身份的集群 CA 证书。

    注意

    如果您在使用外部监听程序时需要扩展 Kafka 集群,可能会触发所有 Kafka 代理的滚动更新。这取决于配置。

  3. Kafka 资源的状态中检索可用于访问 Kafka 集群的 bootstrap 地址。

    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    例如:

    oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
    Copy to Clipboard Toggle word wrap

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

  4. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的身份验证类型。
    • 指定用于 简单 授权的授权 ACL。

      用户配置示例

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 
      1
      
      spec:
        authentication:
          type: tls 
      2
      
        authorization:
          type: simple
          acls: 
      3
      
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operations:
                - Describe
                - Read
            - resource:
                type: group
                name: my-group
                patternType: literal
              operations:
                - Read
      Copy to Clipboard Toggle word wrap

      1
      标签必须与 Kafka 集群的标签匹配。
      2
      将身份验证指定为 mutual tls
      3
      简单的授权需要附带的 ACL 规则列表应用到用户。规则定义了根据 用户名 (my-user)对 Kafka 资源允许的操作。
  5. 创建或修改 KafkaUser 资源。

    oc apply -f USER-CONFIG-FILE
    Copy to Clipboard Toggle word wrap

    创建了用户,以及名称与 KafkaUser 资源相同的 secret。secret 包含 mTLS 身份验证的公钥和私钥。

    secret 示例

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: <public_key> # Public key of the clients CA
      user.crt: <user_certificate> # Public key of the user
      user.key: <user_private_key> # Private key of the user
      user.p12: <store> # PKCS #12 store for user certificates and keys
      user.password: <password_for_store> # Protects the PKCS #12 store
    Copy to Clipboard Toggle word wrap

  6. 从 Kafka 集群的 &lt ;cluster-ca-cert secret 中提取集群 CA 证书。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
    Copy to Clipboard Toggle word wrap
  7. 从 < user_name&gt; secret 中提取用户 CA 证书。

    oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
    Copy to Clipboard Toggle word wrap
  8. 从 < user_name&gt; secret 中提取用户的私钥。

    oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
    Copy to Clipboard Toggle word wrap
  9. 使用 bootstrap 地址主机名和端口配置客户端,以连接到 Kafka 集群:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
    Copy to Clipboard Toggle word wrap
  10. 使用 truststore 凭证配置客户端,以验证 Kafka 集群的身份。

    指定公共集群 CA 证书。

    truststore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
    Copy to Clipboard Toggle word wrap

    SSL 是 mTLS 验证的指定的安全协议。为通过 TLS 进行 SCRAM-SHA-512 验证指定 SASL_SSL。PEM 是信任存储的文件格式。

  11. 使用密钥存储凭证配置客户端,以便在连接到 Kafka 集群时验证用户。

    指定公钥和私钥。

    keystore 配置示例

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
    Copy to Clipboard Toggle word wrap

    将密钥存储证书和私钥直接添加到配置中。以单行格式添加。在 BEGIN CERTIFICATEEND CERTIFICATE 分隔符之间,以换行符(\n)开始。使用 \n 结束原始证书中的每一行。

    keystore 配置示例

    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat