4.10. 使用基于 OAuth 2.0 令牌的身份验证


AMQ Streams 支持使用 OAUTHBEARERPLAIN 机制使用 OAuth 2.0 身份验证。

OAuth 2.0 启用应用程序之间的基于令牌的标准化身份验证和授权,使用中央授权服务器签发对资源有限访问权限的令牌。

Kafka 代理和客户端都需要配置为使用 OAuth 2.0。您可以配置 OAuth 2.0 身份验证,然后配置 OAuth 2.0 授权

注意

OAuth 2.0 身份验证可与 基于 ACL 的 Kafka 授权 一起使用,无论使用的授权服务器是什么。

使用 OAuth 2.0 身份验证时,应用程序客户端可以访问应用服务器(称为 资源服务器)上的资源,而无需公开帐户凭据。

应用程序客户端通过访问令牌作为身份验证方法传递,应用服务器也可以用来决定要授予的访问权限级别。授权服务器处理访问权限的授予和查询有关访问权限的查询。

在 AMQ Streams 的上下文中:

  • Kafka 代理作为 OAuth 2.0 资源服务器
  • Kafka 客户端充当 OAuth 2.0 应用程序客户端

Kafka 客户端在 Kafka 代理验证。代理和客户端根据需要与 OAuth 2.0 授权服务器通信,以获取或验证访问令牌。

对于 AMQ Streams 的部署,OAuth 2.0 集成提供:

  • Kafka 代理的服务器端 OAuth 2.0 支持
  • Kafka MirrorMaker、Kafka Connect 和 Kafka Bridge 的客户端 OAuth 2.0 支持

RHEL 上的 AMQ Streams 包括两个 OAuth 2.0 库:

kafka-oauth-client
提供名为 io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 的自定义登录回调处理器类。要处理 OAUTHBEARER 身份验证机制,请使用 Apache Kafka 提供的 OAuthBearerLoginModule 的登录回调处理器。
kafka-oauth-common
提供 kafka-oauth-client 库所需的一些功能的帮助程序库。

提供的客户端库还依赖于一些额外的第三方库,例如: keycloak-corejackson-databindslf4j-api

我们建议使用 Maven 项目来打包您的客户端,以确保包含所有依赖项库。依赖项库可能会在以后的版本中有所变化。

为 Kafka 客户端 Java 库提供了一个 OAuth 回调处理程序,因此您不需要为 Java 客户端编写自己的回调处理程序。应用客户端可以使用回调处理程序来提供访问令牌。使用 Go 等其他语言编写的客户端必须使用自定义代码连接到授权服务器并获取访问令牌。

其他资源

4.10.1. OAuth 2.0 身份验证机制

AMQ Streams 支持 OAUTHBEARER 和 PLAIN 机制进行 OAuth 2.0 身份验证。这两种机制都允许 Kafka 客户端与 Kafka 代理建立经过身份验证的会话。客户端、授权服务器和 Kafka 代理之间的身份验证流因每种机制而异。

我们建议您将客户端配置为尽可能使用 OAUTHBEARER。OAUTHBEARER 提供比 PLAIN 更高的安全性,因为客户端凭证不会与 Kafka 代理共享。考虑仅在不支持 OAUTHBEARER 的 Kafka 客户端中使用 PLAIN。

如有必要,可以在相同的 OAuth 身份验证监听程序配置中一起启用 OAUTHBEARER 和 PLAIN。

OAUTHBEARER 概述

Kafka 支持 OAUTHBEARER 身份验证机制,但必须明确配置它。许多 Kafka 客户端工具使用在协议级别为 OAUTHBEARER 提供基本支持的库。

使用 OAUTHBEARER 时,客户端发起带有 Kafka 代理进行凭证交换的会话,其中凭证采用由回调处理器提供的 bearer 令牌的形式。使用回调,您可以以三种方式之一配置令牌置备:

  • 客户端 ID 和机密(使用 OAuth 2.0 客户端凭证机制
  • 一个长期的访问令牌,在配置时手动获取
  • 一个长期的刷新令牌,在配置时手动获取

要使用 OAUTHBEARER,您必须在 Kafka 代理的 OAuth 身份验证监听程序配置中将 sasl.enabled.mechanisms 设置为 OAUTHBEARER。有关详细配置,请参阅 第 4.10.2 节 “OAuth 2.0 Kafka 代理配置”

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
Copy to Clipboard Toggle word wrap
注意

OAUTHBEARER 身份验证只能由支持协议级别的 OAUTHBEARER 机制的 Kafka 客户端使用。

PLAIN 概述

PLAIN 是所有 Kafka 客户端工具支持的简单身份验证机制,包括 kafkacat 等开发人员工具。要启用 PLAIN 以用于 OAuth 2.0 身份验证,RHEL 上的 AMQ Streams 包括服务器端回调。PLAIN 的 AMQ Streams 实现被称为 OAuth 2.0,与 PLAIN 相比

使用 OAuth 2.0 over PLAIN 时,客户端凭证不会存储在 ZooKeeper 中。相反,它们会在兼容授权服务器后集中处理,类似于使用 OAUTHBEARER 身份验证时。

当与 OAuth 2.0 over PLAIN 回调一起使用时,Kafka 客户端使用以下方法之一与 Kafka 代理进行身份验证:

  • 客户端 ID 和 secret (使用 OAuth 2.0 客户端凭证机制
  • 一个长期的访问令牌,在配置时手动获取

必须启用客户端才能使用 PLAIN 身份验证,并提供 用户名和密码 。如果密码的前缀为 $accessToken:,后跟访问令牌的值,则 Kafka 代理会将密码解释为访问令牌。否则,Kafka 代理会将 用户名 解释为客户端 ID,密码 作为客户端 secret。

如果将密码设置为访问令牌,则必须将用户名设置为 Kafka 代理从访问令牌获取的相同的主体名称。此过程取决于如何使用 userNameClaimfallbackUserNameClaimfallbackUsernamePrefixuserInfoEndpointUri 来配置用户名提取。它还取决于您的授权服务器;特别是,它如何将客户端 ID 映射到帐户名称。

您可以在 Kafka 代理的 OAuth 身份验证监听程序配置中启用 PLAIN。为此,请将 PLAIN 添加到 sasl.enabled.mechanisms 的值。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
Copy to Clipboard Toggle word wrap

有关详细配置,请参阅 第 4.10.2 节 “OAuth 2.0 Kafka 代理配置”

4.10.1.1. 使用属性或变量配置 OAuth 2.0

您可以使用 Java Authentication and Authorization Service(JAAS)属性或环境变量来配置 OAuth 2.0 设置。

  • JAAS 属性在 server.properties 配置文件中配置,并传递为 listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config 属性的键值对。
  • 如果使用环境变量,您仍然需要在 server.properties 文件中提供 listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config 属性,但您可以忽略其他 JAAS 属性。

    您可以使用大写或大写环境变量命名约定。

AMQ Streams OAuth 2.0 库使用以以下内容开头的属性:

4.10.2. OAuth 2.0 Kafka 代理配置

用于 OAuth 2.0 身份验证的 Kafka 代理配置涉及:

  • 在授权服务器中创建 OAuth 2.0 客户端
  • 在 Kafka 集群中配置 OAuth 2.0 身份验证
注意

与授权服务器的关系,Kafka 代理和 Kafka 客户端都被视为 OAuth 2.0 客户端。

4.10.2.1. 授权服务器上的 OAuth 2.0 客户端配置

要配置 Kafka 代理以验证会话启动期间收到的令牌,建议的做法是在授权服务器中创建一个 OAuth 2.0 client 定义(配置为 confidential),并启用了以下客户端凭证:

  • 客户端 ID kafka-broker (例如)
  • 客户端 ID 和 secret 作为身份验证机制
注意

在使用授权服务器的非公共内省端点时,您只需要使用客户端 ID 和 secret。在使用公共授权服务器端点时,通常不需要凭据,如快速本地 JWT 令牌验证一样。

4.10.2.2. Kafka 集群中的 OAuth 2.0 身份验证配置

要在 Kafka 集群中使用 OAuth 2.0 身份验证,您可以在 Kafka server.properties 文件中为 Kafka 集群启用 OAuth 身份验证监听程序配置。至少需要配置。您还可以配置 TLS 侦听器,其中 TLS 用于代理间通信。

您可以使用以下方法之一为授权服务器配置代理以进行令牌验证:

  • 快速本地令牌验证: JWKS 端点与签名的 JWT 格式的访问令牌相结合
  • 内省端点

您可以配置 OAUTHBEARER 或 PLAIN 身份验证,或两者。

以下示例显示了应用 全局 监听器配置的最低配置,这意味着,内部代理间通信通过与应用程序客户端相同的监听程序进行。

这个示例还显示了一个特定监听程序的 OAuth 2.0 配置,在其中您可以指定 listener.name.LISTENER-NAME.sasl.enabled.mechanisms 而不是 sasl.enabled.mechanismsLISTENER-NAME 是监听器的区分大小写的名称。在这里,我们将侦听器的 CLIENT 命名为 listener.name.client.sasl.enabled.mechanisms

这个示例使用 OAUTHBEARER 身份验证。

示例:使用 JWKS 端点进行 OAuth 2.0 身份验证的最小监听程序配置

sasl.enabled.mechanisms=OAUTHBEARER 
1

listeners=CLIENT://0.0.0.0:9092 
2

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 
3

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 
4

sasl.mechanism.inter.broker.protocol=OAUTHBEARER 
5

inter.broker.listener.name=CLIENT 
6

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 
7

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
8

  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 
9

  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 
10

  oauth.username.claim="preferred_username"  \ 
11

  oauth.client.id="kafka-broker" \ 
12

  oauth.client.secret="kafka-secret" \ 
13

  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 
14

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
15

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 
16
Copy to Clipboard Toggle word wrap

1
为通过 SASL 进行凭证交换启用 OAUTHBEARER 机制。
2
配置要连接的客户端应用程序的监听程序。系统的 hostname 用作公告的主机名,客户端必须可以解析该主机名才能重新连接。本例中,侦听器名为 CLIENT
3
指定监听器的频道协议。SASL_SSL 用于 TLS。SASL_PLAINTEXT 用于未加密的连接(无 TLS),但存在丢失和截获 TCP 连接层的风险。
4
CLIENT 侦听器指定 OAUTHBEARER 机制。客户端名称(CLIENT)通常在 listeners 属性中使用大写指定,对于 listener.name 属性是小写 (listener.name.client),当为 listener.name.client.* 属性的一部分时是小写。
5
指定用于代理间通信的 OAUTHBEARER 机制。
6
指定用于代理间通信的监听程序。配置需要有效的规格。
7
在客户端监听器上配置 OAuth 2.0 身份验证。
8
配置客户端和 inter-broker 通信的身份验证设置。oauth.client.idoauth.client.secretauth.token.endpoint.uri 属性与 inter-broker 配置相关。
9
有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
10
JWKS 端点 URL。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
11
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。
12
Kafka 代理的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端
13
Kafka 代理的 secret,适用于所有代理。
14
到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 HTTP。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
15
为代理间通信启用(且只需要)OAuth 2.0 身份验证。
16
(可选)当令牌过期时强制会话到期,并激活 Kafka 重新验证机制。如果指定的值小于访问令牌保留的时间,则客户端必须在实际令牌到期前重新验证。默认情况下,当访问令牌过期时,会话不会过期,客户端也不会尝试重新身份验证。

以下示例显示了 TLS 侦听器的最小配置,其中 TLS 用于代理间通信。

示例:用于 OAuth 2.0 身份验证的 TLS 侦听器配置

listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 
1

listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 
2

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=KEYSTORE-PASSWORD 
3

listener.name.replication.ssl.truststore.password=TRUSTSTORE-PASSWORD
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 
4

listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 
5

listener.name.replication.ssl.keystore.location=PATH-TO-KEYSTORE 
6

listener.name.replication.ssl.truststore.location=PATH-TO-TRUSTSTORE 
7

listener.name.replication.ssl.client.auth=required 
8

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \
  oauth.username.claim="preferred_username" ; 
9
Copy to Clipboard Toggle word wrap

1
相互代理通信和客户端应用程序需要单独的配置。
2
REPLICATION 侦听器配置为使用 TLS,并将 CLIENT 侦听器配置为通过未加密的通道使用 SASL。客户端可能会在生产环境中使用加密的频道(SASL_SSL)。
3
ssl. 属性定义 TLS 配置。
4
随机数生成器实施。如果没有设置,则使用 Java platform SDK 默认。
5
主机名验证。如果设置为空字符串,则会关闭主机名验证。如果没有设置,则默认值为 HTTPS,它会强制对服务器证书进行主机名验证。
6
侦听器的密钥存储的路径。
7
侦听器的信任存储的路径。
8
指定 REPLICATION 侦听器的客户端必须在建立 TLS 连接时与客户端证书进行身份验证(用于代理连接)。
9
为 OAuth 2.0 配置 CLIENT 侦听器。与授权服务器的连接应使用安全 HTTPS 连接。

以下示例显示了使用 PLAIN 身份验证机制通过 SASL 进行凭证交换的 OAuth 2.0 身份验证的最低配置。使用快速的本地令牌验证。

示例:用于 PLAIN 验证的最小监听程序配置

listeners=CLIENT://0.0.0.0:9092 
1

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 
2

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 
3

sasl.mechanism.inter.broker.protocol=OAUTHBEARER 
4

inter.broker.listener.name=CLIENT 
5

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 
6

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
7

  oauth.valid.issuer.uri="http://AUTH_SERVER/auth/realms/REALM" \ 
8

  oauth.jwks.endpoint.uri="https://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/certs" \ 
9

  oauth.username.claim="preferred_username"  \ 
10

  oauth.client.id="kafka-broker" \ 
11

  oauth.client.secret="kafka-secret" \ 
12

  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 
13

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
14

listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 
15

listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 
16

  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 
17

  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 
18

  oauth.username.claim="preferred_username"  \ 
19

  oauth.token.endpoint.uri="http://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/token" ; 
20

connections.max.reauth.ms=3600000 
21
Copy to Clipboard Toggle word wrap

1
为要连接的客户端应用程序配置监听程序(本例中为 CLIENT )。系统的 hostname 用作公告的主机名,客户端必须可以解析该主机名才能重新连接。由于这是唯一配置的监听程序,它也用于代理间通信。
2
将示例 CLIENT 侦听器配置为通过未加密的频道使用 SASL。在生产环境中,客户端应使用加密频道(SASL_SSL)来保护对 TCP 连接层进行窃取和截获。
3
为通过 SASL 和 OAUTHBEARER 进行凭证交换启用 PLAIN 身份验证机制。OAUTHBEARER 也被指定,因为代理间通信需要它。Kafka 客户端可以选择使用哪些机制进行连接。

所有平台上的所有客户端都支持 PLAIN 身份验证。Kafka 客户端必须启用 PLAIN 机制并设置 用户名 和密码。PLAIN 可以用来使用 OAuth 访问令牌或 OAuth clientIdsecret (客户端凭据)进行身份验证。这个行为还由指定了 oauth.token.endpoint.uri 来控制。

如果指定了 oauth.token.endpoint.uri,并且客户端将密码设置为以字符串 $accessToken: 开头,服务器会将密码解释为帐户用户名。 否则,用户名 被解释为 clientId 和密码 客户端 secret,代理用来在客户端名称中获取访问令牌。

如果没有指定 oauth.token.endpoint.uri密码 总是被解释为访问令牌,用户名 始终被解释为帐户用户名,它必须与从令牌中提取的主体 id 匹配。这被称为 'no-client-credentials' 模式,因为客户端必须始终自行获取访问令牌,且无法使用 clientIdsecret

4
为代理间通信指定 OAUTHBEARER 身份验证机制。
5
为内部代理通信指定监听器(本例中为 CLIENT)。配置需要有效。
6
为 OAUTHBEARER 机制配置服务器回调处理程序。
7
使用 OAUTHBEARER 机制为客户端和内部代理通信配置身份验证设置。oauth.client.idoauth.client.secretoauth.token.endpoint.uri 属性与 inter-broker 配置相关。
8
有效的签发者 URI。只有来自此签发者的访问令牌才被接受。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
9
JWKS 端点 URL。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
10
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是标识用户 的主体。该值取决于身份验证流和使用的授权服务器。
11
Kafka 代理的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端
12
Kafka 代理的 secret (所有代理都相同)。
13
到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 HTTPS。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
14
为代理间通信启用 OAuth 2.0 身份验证。
15
PLAIN 身份验证配置服务器回调处理程序。
16
使用 PLAIN 身份验证为客户端通信配置身份验证设置。

oauth.token.endpoint.uri 是一个可选属性,它使用 OAuth 2.0 客户端凭证机制启用 OAuth 2.0 over PLAIN。

17
有效的签发者 URI。只有来自此签发者的访问令牌才被接受。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
18
JWKS 端点 URL。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
19
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是标识用户 的主体。该值取决于身份验证流和使用的授权服务器。
20
到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 HTTP。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token

其他 PLAIN 机制配置,允许客户端通过传递 clientIdsecret 作为用户名和密码 进行验证,如点 3 所述。如果没有指定,客户端只能通过将访问令牌作为 password 参数通过 PLAIN 进行身份验证。

21
(可选)当令牌过期时强制会话到期,并激活 Kafka 重新验证机制。如果指定的值小于访问令牌保留的时间,则客户端必须在实际令牌到期前重新验证。默认情况下,当访问令牌过期时,会话不会过期,客户端也不会尝试重新身份验证。

4.10.2.3. 快速的本地 JWT 令牌验证配置

快速本地 JWT 令牌验证会在本地检查 JWT 令牌签名。

本地检查可确保令牌:

  • 使用一个访问令牌的 Bearer 的(typ)声明值符合类型
  • 有效(未过期)
  • 具有与 validIssuerURI 匹配的签发者

在配置监听程序时 指定有效的签发者 URI,以便任何未由授权服务器发布的令牌都被拒绝。

授权服务器不需要在快速本地 JWT 令牌验证过程中联系。您可以通过指定由 OAuth 2.0 授权服务器公开的 JWKs 端点 URI 来激活快速本地 JWT 令牌验证。端点包含验证已签名的 JWT 令牌的公钥,这些令牌由 Kafka 客户端作为凭证发送。

注意

所有与授权服务器通信都应使用 HTTPS 执行。

对于 TLS 侦听器,您可以配置证书信任存储并指向信任存储文件。

快速本地 JWT 令牌验证的属性示例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 
1

  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 
2

  oauth.jwks.refresh.seconds="300" \ 
3

  oauth.jwks.refresh.min.pause.seconds="1" \ 
4

  oauth.jwks.expiry.seconds="360" \ 
5

  oauth.username.claim="preferred_username" \ 
6

  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 
7

  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 
8

  oauth.ssl.truststore.type="PKCS12" ; 
9
Copy to Clipboard Toggle word wrap

1
有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
2
JWKS 端点 URL。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
3
端点刷新之间的周期(默认为 300)。
4
连续尝试刷新 JWKS 公钥之间的最小暂停(以秒为单位)。当遇到未知签名密钥时,JWKS 密钥刷新在常规定期调度后调度,且至少在最后一次刷新尝试后出现指定暂停。刷新密钥遵循 exponential backoff 规则,重试不成功刷新,且持续增加暂停,直到它到达 oauth.jwks.refresh.seconds。默认值为 1。
5
JWKs 证书在证书过期前被视为有效。默认为 360 秒。如果您指定了较长的时间,请考虑允许访问撤销的证书的风险。
6
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。
7
TLS 配置中使用的信任存储的位置。
8
访问信任存储的密码。
9
PKCS #12 格式的 truststore 类型。

4.10.2.4. OAuth 2.0 内省端点配置

使用 OAuth 2.0 内省端点进行令牌验证会将接收的访问令牌视为不透明。Kafka 代理向内省端点发送访问令牌,该端点使用验证所需的令牌信息做出响应。最重要的是,如果特定访问令牌有效,它会返回最新的信息,以及令牌何时过期的信息。

要配置基于 OAuth 2.0 内省的验证,您可以指定一个 内省端点 URI,而不是为快速本地 JWT 令牌验证指定的 JWKs 端点 URI。根据授权服务器,通常必须指定 client IDclient secret,因为内省端点通常受到保护。

内省端点的属性示例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/introspection" \ 
1

  oauth.client.id="kafka-broker" \ 
2

  oauth.client.secret="kafka-broker-secret" \ 
3

  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 
4

  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 
5

  oauth.ssl.truststore.type="PKCS12" \ 
6

  oauth.username.claim="preferred_username" ; 
7
Copy to Clipboard Toggle word wrap

1
OAuth 2.0 内省端点 URI。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect
2
Kafka 代理的客户端 ID。
3
Kafka 代理的 secret。
4
TLS 配置中使用的信任存储的位置。
5
访问信任存储的密码。
6
PKCS #12 格式的 truststore 类型。
7
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体oauth.username.claim 的值取决于使用的授权服务器。

4.10.3. Kafka 代理的会话重新身份验证

您可以将 OAuth 侦听程序配置为使用 Kafka 会话在 Kafka 客户端和 Kafka 代理之间对 OAuth 2.0 会话进行重新身份验证。这个机制在定义的时间后强制实施客户端和代理之间经过身份验证的会话的过期。当会话过期时,客户端会立即通过重复使用现有连接而不是丢弃它来启动新的会话。

会话重新身份验证默认为禁用。您可以在 server.properties 文件中启用它。为启用了 OAUTHBEARER 或 PLAIN 的 TLS 侦听器设置 connections.max.reauth.ms 属性作为 SASL 机制。

您可以指定每个监听器的会话重新身份验证。例如:

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Copy to Clipboard Toggle word wrap

会话重新身份验证必须由客户端使用的 Kafka 客户端库支持。

会话重新身份验证可用于快速 本地 JWT内省端点 令牌验证。

客户端重新身份验证

当代理的经过身份验证的会话过期时,客户端必须通过向代理发送一个新的有效访问令牌来重新验证到现有会话,而无需丢弃连接。

如果令牌验证成功,则使用现有连接启动新的客户端会话。如果客户端无法重新验证,代理会在进一步尝试发送或接收消息时关闭连接。如果代理上启用了重新身份验证机制,则使用 Kafka 客户端库 2.2 或更高版本的 Java 客户端会自动重新验证。

如果使用,会话重新身份验证也适用于刷新令牌。当会话过期时,客户端会使用其刷新令牌来刷新访问令牌。然后,客户端使用新的访问令牌来重新验证现有连接。

OAUTHBEARER 和 PLAIN 的会话到期

配置会话重新身份验证后,会话到期对于 OAUTHBEARER 和 PLAIN 身份验证是不同的。

对于 OAUTHBEARER 和 PLAIN,使用 客户端 ID 和 secret 方法:

  • 代理的身份验证会话将在配置的 connections.max.reauth.ms 过期。
  • 如果访问令牌在配置的时间前过期,会话将提前过期。

对于 PLAIN,使用 长期访问令牌 方法:

  • 代理的身份验证会话将在配置的 connections.max.reauth.ms 过期。
  • 如果访问令牌在配置的时间前过期,则重新身份验证将失败。虽然尝试尝试会话重新身份验证,但 PLAIN 没有刷新令牌的机制。

如果没有配置 connection.max.reauth.ms,OAUTHBEARER 和 PLAIN 客户端可以无限期地连接到代理,而无需重新验证。经过身份验证的会话不会因为访问令牌到期而终止。但是,这可在配置授权时考虑,例如,使用 keycloak 授权或安装自定义授权器。

4.10.4. OAuth 2.0 Kafka 客户端配置

Kafka 客户端被配置为:

  • 与授权服务器进行身份验证所需的凭证(客户端 ID 和 Secret),以获取有效的访问令牌
  • 使用授权服务器提供的工具获取有效的长期 访问令牌或 刷新令牌

发送到 Kafka 代理的唯一信息是访问令牌。用于与授权服务器进行身份验证的凭证 永远不会 发送到代理。当客户端获取访问令牌时,不需要进一步与授权服务器通信。

最简单的机制是使用客户端 ID 和 Secret 进行身份验证。使用长期的访问令牌或长期的刷新令牌会增加复杂性,因为对授权服务器工具还有额外的依赖。

注意

如果您使用长期的访问令牌,您可能需要在授权服务器中配置客户端来提高令牌的最大生命周期。

如果 Kafka 客户端没有直接配置访问令牌,客户端会在 Kafka 会话发起授权服务器的过程中交换访问令牌的凭证。Kafka 客户端交换:

  • 客户端 ID 和 Secret
  • 客户端 ID、刷新令牌和(可选)Secret

4.10.5. OAuth 2.0 客户端身份验证流

在本节中,我们在 Kafka 会话启动过程中解释和视觉化 Kafka 客户端、Kafka 代理和授权服务器之间的通信流。流取决于客户端和服务器配置。

当 Kafka 客户端将访问令牌作为凭证发送到 Kafka 代理时,需要验证令牌。

根据所使用的授权服务器,以及可用选项,您可能需要使用:

  • 根据 JWT 签名检查和本地令牌内省快速本地令牌验证,而无需联系授权服务器
  • 授权服务器提供的 OAuth 2.0 内省端点

使用快速本地令牌验证需要授权服务器为 JWKS 端点提供用于验证令牌签名的公共证书。

另一种选择是在授权服务器上使用 OAuth 2.0 内省端点。每次建立新的 Kafka 代理连接时,代理会将从客户端接收的访问令牌传递给授权服务器,并检查响应以确认令牌是否有效。

也可以为以下内容配置 Kafka 客户端凭证:

  • 使用之前生成的长期访问令牌直接进行本地访问
  • 与授权服务器联系以获取要发布的新访问令牌
注意

授权服务器可能只允许使用不透明访问令牌,这意味着无法进行本地令牌验证。

4.10.5.1. 客户端身份验证流示例

您可以在 Kafka 会话身份验证过程中看到与 Kafka 客户端和代理的不同配置的通信流。

使用客户端 ID 和 secret 的客户端以及代理委派到授权服务器的验证

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka 客户端使用客户端 ID 和 secret 从授权服务器请求访问令牌,以及可选的刷新令牌。
  2. 授权服务器生成新的访问令牌。
  3. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递访问令牌。
  4. Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
  5. 如果令牌有效,则会建立 Kafka 客户端会话。

使用客户端 ID 和 secret 的客户端,代理执行快速本地令牌验证

Client using client ID and secret with broker performing fast local token validation

  1. Kafka 客户端使用令牌端点(使用客户端 ID 和 secret)以及刷新令牌(可选)通过授权服务器进行身份验证。
  2. 授权服务器生成新的访问令牌。
  3. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递访问令牌。
  4. Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。

使用长期访问令牌的客户端,带有代理委派验证到授权服务器

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递长期访问令牌。
  2. Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
  3. 如果令牌有效,则会建立 Kafka 客户端会话。

使用长期访问令牌的客户端,代理执行快速本地验证

Client using long-lived access token with broker performing fast local validation

  1. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递长期访问令牌。
  2. Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。
警告

快速的本地 JWT 令牌签名验证仅适用于短期的令牌,因为如果已撤销令牌,就不会通过授权服务器检查该授权服务器。令牌到期时间写入到令牌,但可以随时进行撤销,因此不能在不联系授权服务器的情况下被考虑。任何发布的令牌都将被视为有效,直到过期为止。

4.10.6. 配置 OAuth 2.0 身份验证

OAuth 2.0 用于在 Kafka 客户端和 AMQ Streams 组件间交互。

要将 OAuth 2.0 用于 AMQ Streams,您必须:

这个步骤描述了如何将 Red Hat Single Sign-On 部署为授权服务器,并配置它以与 AMQ Streams 集成。

授权服务器为身份验证和授权提供了一个中央点,以及用户、客户端和权限的管理。Red Hat Single Sign-On 具有域概念,其中 realm 代表一组独立的用户、客户端、权限和其他配置。您可以使用默认 master 域,或创建新域。每个 realm 会公开自己的 OAuth 2.0 端点,这意味着应用程序客户端和应用服务器都需要使用相同的域。

要将 OAuth 2.0 与 AMQ Streams 搭配使用,您需要部署授权服务器才能创建和管理身份验证域。

注意

如果您已经部署了 Red Hat Single Sign-On,您可以跳过部署步骤并使用您的当前部署。

开始前

您将需要熟悉使用 Red Hat Single Sign-On。

有关安装和管理说明,请参阅:

先决条件

  • AMQ Streams 和 Kafka 正在运行

对于 Red Hat Single Sign-On 部署:

流程

  1. 安装 Red Hat Single Sign-On。

    您可以从 ZIP 文件或使用 RPM 安装。

  2. 登录到 Red Hat Single Sign-On Admin 控制台,为 AMQ Streams 创建 OAuth 2.0 策略。

    部署 Red Hat Single Sign-On 时会提供登录详情。

  3. 创建并启用 realm。

    您可以使用现有的 master 域。

  4. 如果需要,调整域的会话和令牌超时。
  5. 创建名为 kafka-broker 的客户端。
  6. Settings 选项卡中设置:

    • 访问类型机密
    • Standard Flow EnabledOFF 为这个客户端禁用 Web 登录
    • Service Accounts EnabledON,允许此客户端在其自己的名称中进行身份验证
  7. 在继续操作前,点 Save
  8. Credentials 选项卡中,记录使用 AMQ Streams Kafka 集群配置的 secret。
  9. 对将连接到 Kafka 代理的任何应用程序客户端重复客户端创建步骤。

    为每个新客户端创建一个定义。

    在配置中,您将使用名称作为客户端 ID。

接下来要做什么

部署和配置授权服务器后,将 Kafka 代理配置为使用 OAuth 2.0

4.10.6.2. 为 Kafka 代理配置 OAuth 2.0 支持

此流程描述了如何配置 Kafka 代理,以便代理监听程序可以使用授权服务器使用 OAuth 2.0 身份验证。

我们建议通过配置 TLS 侦听程序在加密接口中使用 OAuth 2.0。不建议使用 plain 监听程序。

使用支持您选择的授权服务器的属性配置 Kafka 代理,以及您要实现的授权类型。

开始前

有关 Kafka 代理监听程序的配置和验证的更多信息,请参阅:

有关监听器配置中使用的属性的描述,请参阅:

先决条件

  • AMQ Streams 和 Kafka 正在运行
  • 部署 OAuth 2.0 授权服务器

流程

  1. server.properties 文件中配置 Kafka 代理监听程序配置。

    例如,使用 OAUTHBEARER 机制:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    Copy to Clipboard Toggle word wrap
  2. 将代理连接设置配置为 listener.name.client.oauthbearer.sasl.jaas.config 的一部分。

    此处的示例显示连接配置选项。

    示例 1:使用 JWKS 端点配置进行本地令牌验证

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME" \
      oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs" \
      oauth.jwks.refresh.seconds="300" \
      oauth.jwks.refresh.min.pause.seconds="1" \
      oauth.jwks.expiry.seconds="360" \
      oauth.username.claim="preferred_username" \
      oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
      oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
      oauth.ssl.truststore.type="PKCS12" ;
    listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
    Copy to Clipboard Toggle word wrap

    示例 2:通过 OAuth 2.0 内省端点将令牌验证委托给授权服务器

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/introspection" \
      # ...
    Copy to Clipboard Toggle word wrap

  3. 如果需要,配置对授权服务器的访问。

    生产环境通常需要这一步,除非使用 service mesh 等技术来配置容器外的安全频道。

    1. 提供用于连接安全授权服务器的自定义信任存储。对授权服务器的访问始终需要 SSL。

      设置属性来配置信任存储。

      例如:

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
        oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
        oauth.ssl.truststore.type="PKCS12" ;
      Copy to Clipboard Toggle word wrap
    2. 如果证书主机名与访问 URL 主机名不匹配,您可以关闭证书主机名验证:

      oauth.ssl.endpoint.identification.algorithm=""
      Copy to Clipboard Toggle word wrap

      检查可确保与授权服务器的连接是真实的。您可能想要在非生产环境中关闭验证。

  4. 根据您选择的身份验证流配置附加属性。

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" \ 
    1
    
      oauth.custom.claim.check="@.custom == 'custom-value'" \ 
    2
    
      oauth.scope="SCOPE" \ 
    3
    
      oauth.check.audience="true" \ 
    4
    
      oauth.audience="AUDIENCE" \ 
    5
    
      oauth.valid.issuer.uri="https://https://AUTH-SERVER-ADDRESS/auth/REALM-NAME" \ 
    6
    
      oauth.client.id="kafka-broker" \ 
    7
    
      oauth.client.secret="kafka-broker-secret" \ 
    8
    
      oauth.refresh.token="REFRESH-TOKEN-FOR-KAFKA-BROKERS" \ 
    9
    
      oauth.access.token="ACCESS-TOKEN-FOR-KAFKA-BROKERS" ; 
    10
    Copy to Clipboard Toggle word wrap
    1
    到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 HTTP。当使用 KeycloakRBACAuthorizer 或启用了 OAuth 2.0 的监听程序时,需要用于 Inter-broker 间通信。
    2
    (可选) 自定义声明检查。JsonPath 过滤器查询,用于在验证期间将其他自定义规则应用到 JWT 访问令牌。如果访问令牌不包含必要的数据,它将被拒绝。使用 introspection 端点方法时,自定义检查将应用到内省端点响应 JSON。
    3
    (可选)传递给令牌端点 的范围 参数。获取访问令牌进行代理身份验证时使用的 scope。它还在使用 clientIdsecret 的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。
    4
    (可选)对象 检查。如果您的授权服务器提供 aud (audience)声明,并且希望强制进行受众检查,请将 ouath.check.audience 设置为 true。Audience 检查标识令牌的预期接收者。因此,Kafka 代理将拒绝在其 aud 声明中没有 clientId 的令牌。默认为 false
    5
    (可选)传递给令牌端点的 audience 参数。在获取用于代理身份验证的访问令牌时,需要使用 audience。它还在使用 clientIdsecret 的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。
    6
    有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。(始终需要。)
    7
    Kafka 代理配置的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端。当内省端点用于令牌验证时,或使用 KeycloakRBACAuthorizer 时是必需的。
    8
    Kafka 代理配置的 secret,适用于所有代理。当代理必须向授权服务器进行身份验证时,必须指定客户端 secret、访问令牌或刷新令牌。
    9
    (可选) Kafka 代理的长期刷新令牌。
    10
    (可选) Kafka 代理的长期访问令牌。
  5. 根据您如何应用 OAuth 2.0 身份验证以及所使用的授权服务器类型,添加额外的配置设置:

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.check.issuer=false \ 
    1
    
      oauth.fallback.username.claim="CLIENT-ID" \ 
    2
    
      oauth.fallback.username.prefix="CLIENT-ACCOUNT" \ 
    3
    
      oauth.valid.token.type="bearer" \ 
    4
    
      oauth.userinfo.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/userinfo" ; 
    5
    Copy to Clipboard Toggle word wrap
    1
    如果您的授权服务器不提供 iss 声明,则无法执行签发者检查。在这种情况下,将 oauth.check.issuer 设置为 false,且不指定 oauth.valid.issuer.uri。默认为 true
    2
    授权服务器可能无法提供单个属性来识别常规用户和客户端。当客户端以自己的名称进行身份验证时,服务器可能会提供 客户端 ID。当用户使用用户名和密码进行身份验证时,若要获取刷新令牌或访问令牌,除了客户端 ID 外,服务器可能会提供一个 username 属性。如果主用户 ID 属性不可用,则使用这个 fallback 选项指定要使用的用户名声明(attribute)。
    3
    oauth.fallback.username.claim 适用的情况下,可能还需要防止用户名声明的值与回退用户名声明之间的名称冲突。请考虑存在名为 producer 的客户端存在的情况,但也存在名为 producer 的常规用户。为了区分这两者,您可以使用此属性向客户端的用户 ID 添加前缀。
    4
    (仅在使用 oauth.introspection.endpoint.uri)取决于您使用的授权服务器,内省端点可能会或不返回 令牌类型 属性,或者可以包含不同的值。您可以指定来自内省端点的响应必须包含有效的令牌类型值。
    5
    (仅在使用 oauth.introspection.endpoint.uri)时,可以配置或实施授权服务器,以便在内省端点响应中提供任何可识别的信息。要获取用户 ID,您可以将 userinfo 端点的 URI 配置为回退。oauth.fallback.username.claimoauth.fallback.username.claimoauth.fallback.username.prefix 设置应用到 userinfo 端点的响应。

4.10.6.3. 将 Kafka Java 客户端配置为使用 OAuth 2.0

这个步骤描述了如何配置 Kafka producer 和消费者 API,以使用 OAuth 2.0 与 Kafka 代理交互。

将客户端回调插件添加到 pom.xml 文件中,并配置系统属性。

先决条件

  • AMQ Streams 和 Kafka 正在运行
  • 部署和配置 OAuth 2.0 授权服务器以 OAuth 访问 Kafka 代理
  • 为 OAuth 2.0 配置 Kafka 代理

流程

  1. 将支持 OAuth 2.0 的客户端库添加到 Kafka 客户端的 pom.xml 文件中:

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.9.0.redhat-00001</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 为回调配置系统属性:

    例如:

    System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token”); 
    1
    
    System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "CLIENT-NAME"); 
    2
    
    System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "CLIENT_SECRET"); 
    3
    
    System.setProperty(ClientConfig.OAUTH_SCOPE, "SCOPE-VALUE") 
    4
    Copy to Clipboard Toggle word wrap
    1
    授权服务器令牌端点的 URI。
    2
    客户端 ID,这是在授权服务器中创建客户端时使用的名称。
    3
    在授权服务器中创建客户端时创建的 客户端 secret。
    4
    (可选)从令牌端点请求令牌的范围。授权服务器可能需要客户端来指定范围。
  3. 在 Kafka 客户端配置中的 TLS 加密连接上启用 OAUTHBEARER OR PLAIN 机制。

    例如:

    为 Kafka 客户端启用 OAUTHBEARER

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
    Copy to Clipboard Toggle word wrap

    为 Kafka 客户端启用 PLAIN

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$CLIENT_ID_OR_ACCOUNT_NAME\" password=\"$SECRET_OR_ACCESS_TOKEN\" ;");
    props.put("security.protocol", "SASL_SSL"); 
    1
    
    props.put("sasl.mechanism", "PLAIN");
    Copy to Clipboard Toggle word wrap

    1
    在这里,我们使用 SASL_SSL 进行 TLS 连接。仅对本地开发使用 SASL_PLAINTEXT
  4. 验证 Kafka 客户端是否可以访问 Kafka 代理。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat