8.2. Kafka 客户端的安全选项


使用 KafkaUser 资源为 Kafka 客户端配置身份验证机制、授权机制和访问权限。在配置安全性方面,客户端以用户表示。

您可以验证并授权用户访问 Kafka 代理。身份验证允许访问,授权限制对允许的操作的访问权限。

您还可以创建对 Kafka 代理不受限制访问权限的 超级用户

身份验证和授权机制必须与 用于访问 Kafka 代理的监听程序规格 匹配。

有关配置 KafkaUser 资源以安全访问 Kafka 代理的更多信息,请参阅 第 7.3 节 “使用监听程序设置对 Kafka 集群的客户端访问”

8.2.1. 识别用于用户处理的 Kafka 集群

KafkaUser 资源包含一个标签,用于定义它所属的 Kafka 集群的名称(从 Kafka 资源的名称中获得)。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
Copy to Clipboard Toggle word wrap

用户 Operator 使用该标签来识别 KafkaUser 资源并创建新用户,以及后续处理用户。

如果标签与 Kafka 集群不匹配,则 User Operator 无法识别 KafkaUser,且不会创建用户。

如果 KafkaUser 资源的状态为空,请检查您的标签。

8.2.2. 用户身份验证

使用 KafkaUser 自定义资源为需要访问 Kafka 集群的用户(客户端)配置身份验证凭证。使用 KafkaUser.spec 中的 authentication 属性配置凭据。通过指定 类型,您可以控制生成的凭据。

支持的验证类型:

  • TLS 用于 mTLS 身份验证
  • 使用外部证书的 mTLS 身份验证的 tls-external
  • SCRAM-sha-512 用于 SCRAM-SHA-512 验证

如果指定了 tlsscram-sha-512,User Operator 会在创建用户时创建身份验证凭据。如果指定了 tls-external,用户仍然使用 mTLS,但不会创建身份验证凭证。当您提供自己的证书时使用这个选项。如果没有指定身份验证类型,User Operator 不会创建用户或其凭证。

您可以使用 tls-external 来使用 User Operator 外部发布的证书与 mTLS 进行身份验证。User Operator 不会生成 TLS 证书或 secret。您仍然可以通过 User Operator 管理 ACL 规则和配额,方式与使用 tls 机制时相同。这意味着,您在指定 ACL 规则和配额时使用 CN=USER-NAME 格式。USER-NAME 是 TLS 证书中给出的通用名称。

8.2.2.1. mTLS 身份验证

要使用 mTLS 身份验证,请将 KafkaUser 资源中的 type 字段设置为 tls

启用 mTLS 身份验证的用户示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # ...
Copy to Clipboard Toggle word wrap

身份验证类型必须与用于访问 Kafka 集群的 Kafka 侦听器的等效配置匹配。

当用户由 User Operator 创建时,它会创建一个新的 secret,其名称与 KafkaUser 资源的名称相同。secret 包含 mTLS 的私钥和公钥。公钥包含在用户证书中,该证书在创建时由客户端 CA (证书授权)签名。所有密钥都是 X.509 格式。

注意

如果您使用 Cluster Operator 生成的客户端 CA,则当 Cluster Operator 更新客户端 CA 时,用户 Operator 生成的用户证书也会续订。

用户 secret 以 PEM 和 PKCS #12 的形式提供密钥和证书

使用用户凭证的 secret 示例

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: <public_key> # Public key of the clients CA
  user.crt: <user_certificate> # Public key of the user
  user.key: <user_private_key> # Private key of the user
  user.p12: <store> # PKCS #12 store for user certificates and keys
  user.password: <password_for_store> # Protects the PKCS #12 store
Copy to Clipboard Toggle word wrap

配置客户端时,您可以指定以下内容:

  • 公共集群 CA 证书的 Truststore 属性用于验证 Kafka 集群的身份
  • 用户身份验证凭证的 Keystore 属性用于验证客户端

配置取决于文件格式 (PEM 或 PKCS #12)。这个示例使用 PKCS #12 格式存储,以及访问存储中凭证所需的密码。

以 PKCS #12 格式使用 mTLS 的客户端配置示例

bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 
1

security.protocol=SSL 
2

ssl.truststore.location=/tmp/ca.p12 
3

ssl.truststore.password=<truststore_password> 
4

ssl.keystore.location=/tmp/user.p12 
5

ssl.keystore.password=<keystore_password> 
6
Copy to Clipboard Toggle word wrap

1
连接到 Kafka 集群的 bootstrap 服务器地址。
2
使用 TLS 加密时的安全协议选项。
3
truststore 位置包含了 Kafka 集群的公钥证书 (ca.p12)。在创建 Kafka 集群时,Cluster Operator 会在 < cluster_name>-cluster-ca-cert secret 中生成集群 CA 证书和密钥。
4
用于访问信任存储的密码(ca.password)。
5
密钥存储位置包含 Kafka 用户的公钥证书(user.p12)。
6
用于访问密钥存储的密码(user.password)。

要使用 User Operator 外部发布的证书使用 mTLS 身份验证,您需要将 KafkaUser 资源中的 type 字段设置为 tls-external。不会为用户创建 secret 和凭证。

具有 mTLS 身份验证的用户示例,它使用 User Operator 外部发布的证书

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls-external
  # ...
Copy to Clipboard Toggle word wrap

8.2.2.3. SCRAM-SHA-512 身份验证

要使用 SCRAM-SHA-512 身份验证机制,您需要将 KafkaUser 资源中的 type 字段设置为 scram-sha-512

启用 SCRAM-SHA-512 验证的用户示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  # ...
Copy to Clipboard Toggle word wrap

当用户由 User Operator 创建时,它会创建一个新的 secret,其名称与 KafkaUser 资源的名称相同。secret 包含在 password 键中生成的密码,该密钥使用 base64 编码。要使用密码,必须对其进行解码。

使用用户凭证的 secret 示例

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ= 
1

  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 
2
Copy to Clipboard Toggle word wrap

1
生成的密码 base64 编码。
2
SASL SCRAM-SHA-512 身份验证的 JAAS 配置字符串,base64 编码。

解码生成的密码:

echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode
Copy to Clipboard Toggle word wrap
8.2.2.3.1. 自定义密码配置

创建用户时,AMQ Streams 会生成一个随机密码。您可以使用自己的密码而不是 AMQ Streams 生成的密码。要做到这一点,使用密码创建一个 secret,并在 KafkaUser 资源中引用它。

为 SCRAM-SHA-512 验证设定密码的用户示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
    password:
      valueFrom:
        secretKeyRef:
          name: my-secret 
1

          key: my-password 
2

  # ...
Copy to Clipboard Toggle word wrap

1
包含预定义密码的 secret 名称。
2
存储在机密中的密码的密钥。

8.2.3. 用户授权

使用 KafkaUser 自定义资源为用户(客户端)配置需要访问 Kafka 集群的授权规则。使用 KafkaUser.spec 中的 authorization 属性配置规则。通过指定 类型,您可以控制使用哪些规则。

要使用简单的授权,您可以在 KafkaUser.spec.authorization 中将 type 属性设置为 simple。简单的授权使用 Kafka Admin API 管理 Kafka 集群中的 ACL 规则。User Operator 中的 ACL 管理是启用的,还是取决于您的 Kafka 集群中的授权配置。

  • 对于简单的授权,总是启用 ACL 管理。
  • 对于 OPA 授权,始终禁用 ACL 管理。授权规则在 OPA 服务器中配置。
  • 对于 Red Hat Single Sign-On 授权,您可以在 Red Hat Single Sign-On 中直接管理 ACL 规则。您还可以将授权委派给简单授权器作为配置中的回退选项。当启用到简单授权器时,用户 Operator 也会启用对 ACL 规则的管理。
  • 要使用自定义授权插件进行自定义授权,请使用 Kafka 自定义资源的 .spec.kafka.authorization 配置中的 supportsAdminApi 属性来启用或禁用支持。

授权是集群范围的。授权类型必须与 Kafka 自定义资源中的等效配置匹配。

如果没有启用 ACL 管理,AMQ Streams 会在包含任何 ACL 规则时拒绝资源。

如果您使用 User Operator 的独立部署,则默认启用 ACL 管理。您可以使用 STRIMZI_ACLS_ADMIN_API_SUPPORTED 环境变量来禁用它。

如果没有指定授权,User Operator 不会为用户置备任何访问权限。此类 KafkaUser 是否仍然可以访问资源,这取决于所使用的授权者。例如,对于 AclAuthorizer,这由 allow.everyone.if.no.acl.found 配置决定。

8.2.3.1. ACL 规则

AclAuthorizer 使用 ACL 规则来管理对 Kafka 代理的访问。

ACL 规则授予您在 acls 属性中指定的用户访问权限。

如需有关 AclRule 对象的更多信息,请参阅 AclRule 模式参考

8.2.3.2. 超级用户对 Kafka 代理的访问

如果用户被添加到 Kafka 代理配置中的超级用户列表中,无论 KafkaUser 中 ACL 中定义的任何授权限制,用户都可以不受限制地访问集群。

有关配置超级用户对代理的访问权限的更多信息,请参阅 Kafka 授权

8.2.3.3. 用户配额

您可以为 KafkaUser 资源配置 spec 来强制实施配额,以便用户不会超过配置对 Kafka 代理的访问级别。您可以设置基于大小的网络使用量以及基于时间的 CPU 使用率阈值。您还可以添加分区变异配额来控制用户请求接受更改分区的请求的速度。

带有用户配额的 KafkaUser 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 1048576 
1

    consumerByteRate: 2097152 
2

    requestPercentage: 55 
3

    controllerMutationRate: 10 
4
Copy to Clipboard Toggle word wrap

1
用户可以推送到 Kafka 代理的数据量的字节/秒配额
2
用户可以从 Kafka 代理获取的数据量的字节/秒配额
3
CPU 使用率限值作为客户端组的时间百分比
4
允许每秒的并发分区创建和删除操作数(变异)

有关这些属性的更多信息,请参阅 KafkaUserQuotas 模式参考

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat