5.2. 设置客户端以进行安全访问


在 Kafka 代理上设置监听程序来支持安全连接后,下一步是将客户端应用程序配置为使用这些监听程序与 Kafka 集群通信。这包括根据监听器上配置的安全机制,为每个客户端提供适当的安全设置来使用集群进行身份验证。

5.2.1. 配置安全协议

配置客户端应用程序用来与 Kafka 代理监听程序中配置的协议匹配的安全协议。例如,使用 SSL (安全套接字层)进行 TLS 身份验证,或 SASL_SSL 用于 SASL (通过 SSL 进行简单身份验证和安全层)通过 TLS 加密进行身份验证。将 truststore 和 keystore 添加到支持访问 Kafka 集群所需的身份验证机制的客户端配置中。

truststore
truststore 包含用来验证 Kafka 代理真实性的可信证书颁发机构(CA)的公共证书。当客户端连接到安全 Kafka 代理时,可能需要验证代理的身份。
keystore
密钥存储包含客户端的私钥及其公共证书。当客户端希望向代理验证自身时,它会提供自己的证书。

如果使用 TLS 身份验证,您的 Kafka 客户端配置需要一个信任存储和密钥存储来连接到 Kafka 集群。如果您使用 SASL SCRAM-SHA-512,则通过用户名和密码凭证交换来执行身份验证,而不是数字证书,因此不需要密钥存储。SCRAM-SHA-512 是一个更加轻量级的机制,但它不像使用基于证书的身份验证一样安全。

注意

如果您有自己的证书基础架构并使用第三方 CA 的证书,则客户端的默认信任存储可能已经包含公共 CA 证书,您不需要将它们添加到客户端的信任存储中。如果客户端由默认信任存储中包含的公共 CA 证书签名,则客户端会自动信任服务器证书。

您可以创建一个 config.properties 文件来指定客户端应用程序使用的身份验证凭据。

在以下示例中,security.protocol 设置为 SSL,以在客户端和代理之间启用 TLS 身份验证和加密。

ssl.truststore.locationssl.truststore.password 属性指定信任存储的位置和密码。ssl.keystore.locationssl.keystore.password 属性指定密钥存储的位置和密码。

使用 PKCSsetuptools (Public-Key Cryptography Standards obtain)文件格式。您还可以使用 base64 编码的 PEM (Privacy Enhanced Mail)格式。

TLS 身份验证的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SSL
ssl.truststore.location = /path/to/ca.p12
ssl.truststore.password = truststore-password
ssl.keystore.location = /path/to/user.p12
ssl.keystore.password = keystore-password
client.id = my-client
Copy to Clipboard Toggle word wrap

在以下示例中,security.protocol 设置为 SASL_SSL,以在客户端和服务器间启用带有 TLS 加密的 SASL 身份验证。如果您只需要身份验证而不是加密,您可以使用 SASL 协议。用于身份验证的指定 SASL 机制是 SCRAM-SHA-512。可以使用不同的验证机制。sasl.jaas.config 属性指定身份验证凭据。

SCRAM-SHA-512 验证的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
ssl.truststore.location = path/to/truststore.p12
ssl.truststore.password = truststore_password
ssl.truststore.type = PKCS12
client.id = my-client
Copy to Clipboard Toggle word wrap

注意

对于不支持 PEM 格式的应用程序,您可以使用 OpenSSL 等工具将 PEM 文件转换为 PKCSGalaxy 格式。

5.2.2. 配置允许的 TLS 版本和密码套件

您可以纳入 SSL 配置和密码套件,以进一步保护客户端应用程序和 Kafka 集群之间的基于 TLS 的通信。在 Kafka 代理配置中指定支持的 TLS 版本和密码套件。如果要限制其使用的 TLS 版本和密码套件,您还可以将配置添加到您的客户端。客户端上的配置应该只使用在代理上启用的协议和密码套件。

在以下示例中,对于 Kafka 代理和客户端应用程序之间的通信,使用 security.protocol 启用 SSL。您可以将密码套件指定为用逗号分开的列表。ssl.cipher.suites 属性是 允许客户端使用的密码套件的逗号分隔列表。

Kafka 代理的 SSL 配置属性示例

security.protocol: "SSL"
ssl.enabled.protocols: "TLSv1.3", "TLSv1.2"
ssl.protocol: "TLSv1.3"
ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
Copy to Clipboard Toggle word wrap

ssl.enabled.protocols 属性指定可用于保护集群及其客户端之间的通信的可用 TLS 版本。在这种情况下,TLSv1.3TLSv1.2 都已启用。ssl.protocol 属性为所有连接设置默认 TLS 版本,且必须从启用的协议中选择。默认情况下,客户端使用 TLSv1.3 进行通信。如果客户端只支持 TLSv1.2,它仍然可以连接到代理并使用该支持的版本进行通信。同样,如果配置位于客户端,并且代理只支持 TLSv1.2,客户端将使用支持的版本。

Apache Kafka 支持的密码套件取决于您使用的 Kafka 版本和底层环境。检查提供最高级别的安全的最新支持的密码套件。

5.2.3. 使用访问控制列表(ACL)

您不必为客户端应用程序中的 ACL 明确配置任何内容。Kafka 代理在服务器端强制 ACL。当客户端向服务器发送请求以生成或消耗数据时,服务器会检查 ACL,以确定客户端(用户)是否授权执行请求的操作。如果客户端被授权,则处理请求;否则,请求将被拒绝并返回错误。但是,客户端仍必须经过身份验证并使用适当的安全协议来启用与 Kafka 集群的安全连接。

如果您在 Kafka 代理上使用访问控制列表(ACL),请确保正确设置 ACL 来限制客户端对要控制的主题和操作的访问。如果您使用 Open Policy Agent (OPA)策略来管理访问,在策略中配置了授权规则,因此您不需要针对 Kafka 代理指定 ACL。OAuth 2.0 提供了一些灵活性:您可以使用 OAuth 2.0 供应商来管理 ACL;或使用 OAuth 2.0 和 Kafka 的简单 授权来管理 ACL。

注意

ACL 适用于大多数类型的请求,不限于生成和消耗操作。例如,可以将 ACLS 应用到读操作,如描述主题或写入操作,如创建新主题。

5.2.4. 使用 OAuth 2.0 进行基于令牌的访问

使用 OAuth 2.0 开放标准进行 AMQ Streams 授权,通过 OAuth 2.0 供应商强制授权控制。OAuth 2.0 为应用程序提供了一种安全的方法来访问存储在其他系统中的用户数据。授权服务器可能会向客户端应用程序发出访问令牌来授予对 Kafka 集群的访问权限。

以下步骤描述了设置并使用 OAuth 2.0 进行令牌验证的一般方法:

  1. 使用代理和客户端凭证配置授权服务器,如客户端 ID 和 secret。
  2. 从授权服务器获取 OAuth 2.0 凭据。
  3. 使用 OAuth 2.0 凭证在 Kafka 代理中配置监听程序,并与授权服务器交互。
  4. 将 Oauth 2.0 依赖项添加到客户端库中。
  5. 使用 OAuth 2.0 凭证配置 Kafka 客户端,并与授权服务器交互。
  6. 在运行时获取访问令牌,它使用 OAuth 2.0 供应商验证客户端。

如果您在 Kafka 代理上为 OAuth 2.0 配置监听程序,您可以将客户端应用程序设置为使用 OAuth 2.0。除了访问 Kafka 集群的标准 Kafka 客户端配置外,还必须包含 OAuth 2.0 身份验证的特定配置。您还必须确保您使用的授权服务器可以被 Kafka 集群和客户端应用程序访问。

指定 SASL (Simple Authentication and Security Layer)安全协议和机制。在生产环境中,建议使用以下设置:

  • TLS 加密连接的 SASL_SSL 协议。
  • 用于使用 bearer 令牌交换的凭证的 OAUTHBEARER 机制

JAAS (Java 身份验证和授权服务)模块实施 SASL 机制。机制的配置取决于您使用的身份验证方法。例如,使用凭证交换,您可以添加 OAuth 2.0 访问令牌端点、访问令牌、客户端 ID 和客户端 secret。客户端连接到授权服务器的令牌端点(URL),以检查令牌是否仍然有效。您还需要一个信任存储,其中包含授权服务器用于经过身份验证的用户访问的公钥证书。

OAauth 2.0 的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = OAUTHBEARER
# ...
sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \
    oauth.access.token = <access_token> \
    oauth.client.id = "<client_id>" \
    oauth.client.secret = "<client_secret>" \
    oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \
    oauth.ssl.truststore.password = "<truststore_password>" \
    oauth.ssl.truststore.type = "PKCS12" \
Copy to Clipboard Toggle word wrap

有关将代理设置为使用 OAuth 2.0 的更多信息,请参阅以下指南:

5.2.5. 使用 Open Policy Agent (OPA)访问策略

使用带有 AMQ Streams 的 Open Policy Agent (OPA)策略代理来评估请求,以针对访问策略连接到您的 Kafka 集群。开放策略代理(OPA)是一个策略引擎,用于管理授权策略。策略集中访问控制,可以动态更新,无需对客户端应用程序进行更改。例如,您可以创建一个策略,仅允许特定用户(客户端)生成和使用消息到特定主题。

AMQ Streams 为 Kafka 授权使用 Open Policy Agent 插件作为授权器。

以下步骤描述了设置和使用 OPA 的一般方法:

  1. 设置 OPA 服务器实例。
  2. 定义提供管理 Kafka 集群的访问的授权规则的策略。
  3. 为 Kafka 代理创建配置,以接受 OPA 授权并与 OPA 服务器交互。
  4. 配置 Kafka 客户端,为授权访问 Kafka 集群提供凭证。

如果您在 Kafka 代理上为 OPA 配置了一个监听程序,您可以将客户端应用程序设置为使用 OPA。在监听器配置中,您可以指定一个 URL 来连接到 OPA 服务器并授权您的客户端应用程序。除了用于访问 Kafka 集群的标准 Kafka 客户端配置外,还必须添加凭证来与 Kafka 代理进行身份验证。代理通过向 OPA 服务器发送请求来评估授权策略,来检查客户端是否有必要的授权来执行请求的操作。您不需要信任存储或密钥存储来保护通信,因为策略引擎强制执行授权策略。

OPA 授权的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
# ...
Copy to Clipboard Toggle word wrap

注意

红帽不支持 OPA 服务器。

有关设置代理以使用 OPA 的更多信息,请参阅以下指南:

5.2.6. 在流传输消息时使用事务

通过在代理和制作者客户端应用中配置事务属性,您可以确保在单个事务中处理消息。事务为消息流增加了可靠性和一致性。

在代理上总是启用事务。您可以使用以下属性更改默认配置:

事务的 Kafka 代理配置属性示例

transaction.state.log.replication.factor = 3
transaction.state.log.min.isr = 2
transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000
Copy to Clipboard Toggle word wrap

这是生产环境的典型配置,它为内部 __transaction_state 主题创建 3 个副本。\__transaction_state 主题存储有关事务的信息。事务日志至少需要 2 个同步副本。清理间隔是检查超时事务和清理对应的事务日志之间的时间。

要为客户端配置添加事务属性,您可以为生产者和消费者设置以下属性:

事务的制作者客户端配置属性示例

transactional.id = unique-transactional-id
enable.idempotence = true
max.in.flight.requests.per.connection = 5
acks = all
retries=2147483647
transaction.timeout.ms = 30000
delivery.timeout = 25000
Copy to Clipboard Toggle word wrap

事务 ID 允许 Kafka 代理跟踪事务。它是制作者的唯一标识符,应当与一组特定分区一起使用。如果您需要为多组分区执行事务,您需要为每个组使用不同的事务 ID。启用 idempotence,以避免生成者实例创建重复消息。使用 idempotence 时,消息使用制作者 ID 和序列号进行跟踪。当代理收到消息时,它会检查制作者 ID 和序列号。如果已收到具有相同制作者 ID 和序列号的消息,代理会丢弃重复消息。

动态请求的最大数量被设置为 5,以便根据发送的顺序处理事务。分区最多可有 5 个动态请求,而不会影响消息的排序。

通过将 acks 设置为 all,生成者会等待来自主题分区的所有同步副本的确认,然后再将事务视为完成。这样可确保信息被大量写入(提交)到 Kafka 集群,并在代理失败时不会丢失它们。

事务超时指定客户端在恢复前完成事务的最长时间。交付超时指定制作者在超时前等待代理确认的最大时间。为确保消息在事务周期内发送,请将交付超时设置为小于事务超时。在指定重新发送失败的消息请求 的重试次数 时,请考虑网络延迟和消息吞吐量,并允许临时故障。

事务的消费者客户端配置属性示例

group.id = my-group-id
isolation.level = read_committed
enable.auto.commit = false
Copy to Clipboard Toggle word wrap

read_committed 隔离级别指定消费者只读取成功完成的事务的消息。消费者不会处理作为持续或失败事务一部分的任何消息。这样可确保消费者只读取属于完整事务一部分的消息。

在使用事务流消息时,务必要将 enable.auto.commit 设置为 false。如果设置为 true,则消费者会定期提交偏移,而无需考虑事务。这意味着消费者可以在事务完成前提交消息。通过将 enable.auto.commit 设置为 false,使用者只读取和提交已完全写入并提交的消息作为事务的一部分。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat