7.2. 使用基于 OAuth 2.0 令牌的身份验证
Apache Kafka 的流支持使用 OAuth 2.0 进行基于令牌的身份验证。OAuth 2.0 授权服务器处理访问权限的授予和查询有关访问权限的查询。Kafka 客户端在 Kafka 代理验证。代理和客户端根据需要与授权服务器通信,以获取或验证访问令牌。
对于 Apache Kafka 部署 Streams,OAuth 2.0 集成提供以下支持:
- Kafka 代理的服务器端 OAuth 2.0 身份验证
- Kafka MirrorMaker、Kafka Connect 和 Kafka Bridge 的客户端 OAuth 2.0 身份验证
RHEL 上的 Apache Kafka Streams 包括两个 OAuth 2.0 库:
kafka-oauth-client-
提供名为
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler的自定义登录回调处理器类。要处理OAUTHBEARER身份验证机制,请使用 Apache Kafka 提供的OAuthBearerLoginModule的登录回调处理器。 kafka-oauth-common-
提供
kafka-oauth-client库所需的一些功能的帮助程序库。
提供的客户端库还依赖于一些额外的第三方库,例如: keycloak-core、jackson-databind 和 slf4j-api。
我们建议使用 Maven 项目来打包您的客户端,以确保包含所有依赖项库。依赖项库可能会在以后的版本中有所变化。
7.2.1. 在监听程序中配置 OAuth 2.0 身份验证 复制链接链接已复制到粘贴板!
要使用 OAuth 2.0 身份验证保护 Kafka 代理,请将 Kafka 侦听器配置为使用 OAUth 2.0 身份验证和 Kafka server.properties 文件中的客户端身份验证机制,并根据身份验证中使用的身份验证机制和令牌验证类型添加更多配置。
至少需要配置。您还可以配置 TLS 侦听器,其中 TLS 用于代理间通信。我们建议使用 OAuth 2.0 身份验证以及 TLS 加密。如果没有加密,则通过令牌 aft 对网络进行窃取和未授权的访问会受到攻击。
当您将身份验证类型定义为 OAuth 2.0 时,您可以根据验证类型添加配置,可以是 fast local JWT validation 或 token validation using an introspection endpoint。
启用 SASL 身份验证机制
以下一个或多个 SASL 机制用于客户端来交换凭证并与 Kafka 建立经过身份验证的会话。
OAUTHBEARER使用
OAUTHBEARER身份验证机制,凭证交换使用 OAuth 回调处理程序提供的 bearer 令牌。令牌置备可以配置为使用以下方法:- 客户端 ID 和机密(使用 OAuth 2.0 客户端凭据机制)
- 客户端 ID 和客户端断言
- 长期访问令牌
- 手动获取的长期刷新令牌
建议
OAUTHBEARER提供比PLAIN更高的安全性,尽管只能供支持协议级别的OAUTHBEARER机制的 Kafka 客户端使用。客户端凭证永远不会与 Kafka 共享。PLAINPLAIN是所有 Kafka 客户端工具使用的简单身份验证机制。考虑仅在不支持OAUTHBEARER的 Kafka 客户端中使用PLAIN。使用PLAIN身份验证机制,可以将凭证交换配置为使用以下方法之一:- 客户端 ID 和机密(使用 OAuth 2.0 客户端凭据机制)
-
long-lived 访问令牌
重复使用的方法,客户端必须为 Kafka属性。提供用户名和密码
凭证在兼容的授权服务器后集中处理,类似于如何使用
OAUTHBEARER身份验证。用户名提取过程取决于授权服务器配置。
OAUTHBEARER 机制的监听程序配置示例
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
# ...
- 1
- 为通过 SASL 进行凭证交换启用
OAUTHBEARER机制。 - 2
- 配置要连接的客户端应用程序的监听程序。系统的
hostname用作公告的主机名,客户端必须可以解析该主机名才能重新连接。本例中,侦听器名为CLIENT。 - 3
- 指定监听器的频道协议。
SASL_SSL用于 TLS。SASL_PLAINTEXT用于未加密的连接(无 TLS),但存在丢失和截获 TCP 连接层的风险。 - 4
- 为 CLIENT 侦听器指定
OAUTHBEARER机制。客户端名称(CLIENT)通常在listeners属性中使用大写指定,对于listener.name属性是小写 (listener.name.client),当为listener.name.client.*属性的一部分时是小写。 - 5
- 指定用于代理间通信的
OAUTHBEARER机制。 - 6
- 指定用于代理间通信的监听程序。配置需要有效的规格。
- 7
- 在客户端监听器上配置 OAuth 2.0 身份验证。
使用属性或变量配置 OAuth 2.0
使用 Java 身份验证和授权服务(JAAS)属性或环境变量配置 OAuth 2.0 设置。
-
JAAS 属性在
server.properties配置文件中配置,并作为 listener.name.<listener_name>.oauthbearer.sasl.jaas.config属性的键值对传递。 如果使用环境变量,您仍然需要在
server.properties文件中提供listener.name.<listener_name>.oauthbearer.sasl.jaas.config属性,但您可以省略其他 JAAS 属性。您可以使用大写或大写环境变量命名约定。
Apache Kafka OAuth 2.0 库的 Streams 使用以以下开头的属性:
-
OAuth.用于配置身份验证 -
strimzi.到 configure OAuth 2.0 authorization
配置快速本地 JWT 令牌验证
快速本地 JWT 令牌验证涉及在本地检查 JWT 令牌签名,以确保令牌满足以下条件:
-
包含
Bearer的typ(类型)或token_type标头声明值,以指示它是一个访问令牌 - 当前有效且未过期
-
具有与
validIssuerURI匹配的签发者
在配置监听器时,您可以指定 validIssuerURI 属性,以便任何没有由授权服务器发布的令牌都会被拒绝。
授权服务器不需要在快速本地 JWT 令牌验证过程中联系。您可以通过指定 jwksEndpointUri 属性来激活快速本地 JWT 令牌验证,即 OAuth 2.0 授权服务器公开的端点。端点包含验证已签名的 JWT 令牌的公钥,这些令牌由 Kafka 客户端作为凭证发送。
与授权服务器的所有通信都应使用 TLS 加密来执行。您可以配置证书信任存储并指向信任存储文件。
您可能需要配置 userNameClaim,以从 JWT 令牌正确提取用户名。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。
如果要使用 Kafka ACL 授权,请在身份验证过程中根据用户名来识别用户。( JWT 令牌中的 子 声明通常是唯一 ID,而不是用户名。)
快速本地 JWT 令牌验证配置示例
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.jwks.refresh.seconds="300" \
oauth.jwks.refresh.min.pause.seconds="1" \
oauth.jwks.expiry.seconds="360" \
oauth.username.claim="preferred_username" \
oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
oauth.ssl.truststore.password="<truststore_password>" \
oauth.ssl.truststore.type="PKCS12" ;
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
- 1
- 为 OAuth 2.0 配置 CLIENT 侦听器。与授权服务器的连接应使用安全 HTTPS 连接。
- 2
- 有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。(始终需要。)
- 3
- JWKS 端点 URL。
- 4
- 端点刷新之间的周期(默认为 300)。
- 5
- 连续尝试刷新 JWKS 公钥之间的最小暂停(以秒为单位)。当遇到未知签名密钥时,JWKS 密钥刷新在常规定期调度后调度,且至少在最后一次刷新尝试后出现指定暂停。刷新密钥遵循 exponential backoff 规则,重试不成功刷新,且持续增加暂停,直到它到达
oauth.jwks.refresh.seconds。默认值为 1。 - 6
- JWKs 证书在证书过期前被视为有效。默认为
360秒。如果您指定了较长的时间,请考虑允许访问撤销的证书的风险。 - 7
- 在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如
"['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。 - 8
- TLS 配置中使用的信任存储的位置。
- 9
- 访问信任存储的密码。
- 10
- PKCS #12 格式的 truststore 类型。
- 11
- (可选)当令牌过期时强制会话到期,并激活 Kafka 重新验证机制。如果指定的值小于访问令牌保留的时间,则客户端必须在实际令牌到期前重新验证。默认情况下,当访问令牌过期时,会话不会过期,客户端也不会尝试重新身份验证。
使用内省端点配置令牌验证
使用 OAuth 2.0 内省端点进行令牌验证会将接收的访问令牌视为不透明。Kafka 代理向内省端点发送访问令牌,该端点使用验证所需的令牌信息做出响应。最重要的是,如果特定访问令牌有效,它会返回最新的信息,以及令牌何时过期的信息。
要配置基于 OAuth 2.0 内省的验证,您可以指定一个 内省端点 URI,而不是为快速本地 JWT 令牌验证指定的 JWKs 端点 URI。根据授权服务器,通常必须指定 client ID 和 client secret,因为内省端点通常受到保护。
使用内省端点的令牌验证配置示例
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-broker-secret" \
oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
oauth.ssl.truststore.password="<truststore_password>" \
oauth.ssl.truststore.type="PKCS12" \
oauth.username.claim="preferred_username" ;
向授权服务器受保护的端点验证代理
通常,授权服务器的证书端点(oauth.jwks.endpoint.uri)可以公开访问,而内省端点(oauth.introspection.endpoint.uri)则受到保护。但是,这可能因授权服务器配置而异。
Kafka 代理可以使用 HTTP 身份验证方案以两种方式之一向授权服务器受保护的端点进行身份验证:
- HTTP 基本身份验证 使用客户端 ID 和 secret。
- HTTP Bearer 身份验证 使用 bearer 令牌。
要配置 HTTP 基本身份验证,请设置以下属性:
-
oauth.client.id -
oauth.client.secret
对于 HTTP Bearer 身份验证,请设置以下属性之一:
-
oauth.server.bearer.token.location,用于指定包含 bearer 令牌的磁盘上的文件路径。 -
oauth.server.bearer.token,以明文形式指定 bearer 令牌。
包括其他配置选项
根据身份验证要求和您使用的授权服务器指定其他设置。其中一些属性仅适用于某些身份验证机制,或者在与其他属性结合使用时。
例如,当使用 OAUth over PLAIN 时,访问令牌作为带有或没有 $accessToken: 前缀的 密码 属性值传递。
-
如果您在监听器配置中配置令牌端点(
oauth.token.endpoint.uri),则需要前缀。 - 如果您没有在监听器配置中配置令牌端点,则不需要前缀。Kafka 代理将密码解释为原始访问令牌。
如果将密码设置为访问令牌,则必须将用户名设置为 Kafka 代理从访问令牌获取的相同的主体名称。您可以使用 oauth.username.claim,oauth.username.prefix,oauth.fallback.username.claim,oauth.fallback.username.prefix, 和 oauth.userinfo.endpoint.uri 属性在监听器中指定用户名提取选项。用户名提取过程还取决于您的授权服务器;特别是,它将客户端 ID 映射到帐户名称。
PLAIN 机制不支持密码授权身份验证。使用客户端凭证(客户端 ID + secret)或访问令牌进行身份验证。
其他配置设置示例
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
# ...
oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \
oauth.custom.claim.check="@.custom == 'custom-value'" \
oauth.scope="<scope>" \
oauth.check.audience="true" \
oauth.audience="<audience>" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-broker-secret" \
oauth.connect.timeout.seconds=60 \
oauth.read.timeout.seconds=60 \
oauth.http.retries=2 \
oauth.http.retry.pause.millis=300 \
oauth.groups.claim="$.groups" \
oauth.groups.claim.delimiter="," \
oauth.include.accept.header="false" ;
oauth.check.issuer=false \
oauth.username.prefix="user-account-" \
oauth.fallback.username.claim="client_id" \
oauth.fallback.username.prefix="service-account-" \
oauth.valid.token.type="bearer" \
oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ;
- 1
- 到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用
https://urls。当使用KeycloakAuthorizer或启用了 OAuth 2.0 的监听程序时,需要用于代理间通信。 - 2
- (可选) 自定义声明检查。JsonPath 过滤器查询,用于在验证期间将其他自定义规则应用到 JWT 访问令牌。如果访问令牌不包含必要的数据,它将被拒绝。使用 introspection 端点方法时,自定义检查将应用到内省端点响应 JSON。
- 3
- (可选)传递给令牌端点
的范围参数。获取访问令牌进行代理身份验证时使用的 scope。它还在使用clientId和secret的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。 - 4
- (可选)对象 检查。如果您的授权服务器提供
aud(audience)声明,并且希望强制进行受众检查,请将ouath.check.audience设置为true。Audience 检查标识令牌的预期接收者。因此,Kafka 代理将拒绝在其aud声明中没有clientId的令牌。默认为false。 - 5
- (可选)传递给令牌端点的
audience参数。在获取用于代理身份验证的访问令牌时,需要使用 audience。它还在使用clientId和secret的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。 - 6
- Kafka 代理配置的客户端 ID,适用于所有代理。这是在 授权服务器注册为
kafka-broker的客户端。当内省端点用于令牌验证时,或使用KeycloakAuthorizer时是必需的。 - 7
- Kafka 代理配置的 secret,适用于所有代理。当代理必须向授权服务器进行身份验证时,必须指定客户端 secret、访问令牌或刷新令牌。
- 8
- (可选)连接到授权服务器时的连接超时(以秒为单位)。默认值为 60。
- 9
- (可选)连接到授权服务器时读取超时(以秒为单位)。默认值为 60。
- 10
- 将失败的 HTTP 请求重试到授权服务器的次数上限。默认值为 0,表示不会执行重试。要有效地使用这个选项,请考虑减少
oauth.connect.timeout.seconds和oauth.read.timeout.seconds选项的超时时间。但请注意,重试可能会阻止当前 worker 线程可用于其他请求,如果太多请求停滞,则可能会导致 Kafka 代理无响应。 - 11
- 尝试另一个到授权服务器的 HTTP 请求重试前等待的时间。默认情况下,这个时间被设置为零,这意味着不会应用暂停。这是因为导致失败请求的许多问题是针对每个请求的网络粘合或代理问题,可以快速解决。但是,如果您的授权服务器处于压力下或高流量,您可能希望将此选项设置为值 100 ms 或更多,以减少服务器上的负载,并增加成功重试的可能性。
- 12
- JsonPath 查询,用于从 JWT 令牌或内省端点响应中提取组信息。默认不设置。这可供自定义授权器用于根据用户组做出授权决策。
- 13
- 当以单一分隔的字符串返回时,用于解析组信息的分隔符。默认值为 ','(comma)。
- 14
- (可选)将
oauth.include.accept.header设置为false,以从请求中删除Accept标头。如果包含标头在与授权服务器通信时导致问题,您可以使用此设置。 - 15
- 如果您的授权服务器不提供
iss声明,则无法执行签发者检查。在这种情况下,将oauth.check.issuer设置为false,且不指定oauth.valid.issuer.uri。默认为true。 - 16
- 构造用户 ID 时使用的前缀。这只有在配置了
oauth.username.claim时生效。 - 17
- 授权服务器可能无法提供单个属性来识别常规用户和客户端。当客户端以自己的名称进行身份验证时,服务器可能会提供 客户端 ID 属性。当用户使用用户名和密码进行身份验证时,若要获取刷新令牌或访问令牌,除了客户端 ID 外,服务器可能会提供一个 username 属性。如果主用户 ID 属性不可用,则使用这个 fallback 选项指定要使用的用户名声明(attribute)。如果需要,您可以使用 JsonPath 表达式,如
"['client.info'].['client.id']",从令牌中的嵌套 JSON 属性检索回退用户名。 - 18
- 当
oauth.fallback.username.claim适用的情况下,可能还需要防止用户名声明的值与回退用户名声明之间的名称冲突。请考虑存在名为producer的客户端存在的情况,但也存在名为producer的常规用户。为了区分这两者,您可以使用此属性向客户端的用户 ID 添加前缀。 - 19
- (仅在使用
oauth.introspection.endpoint.uri)取决于您使用的授权服务器,内省端点可能会或不返回 令牌类型 属性,或者可以包含不同的值。您可以指定来自内省端点的响应必须包含有效的令牌类型值。 - 20
- (仅在使用
oauth.introspection.endpoint.uri)时,可以配置或实施授权服务器,以便在内省端点响应中提供任何可识别的信息。要获取用户 ID,您可以将userinfo端点的 URI 配置为回退。oauth.username.claim,oauth.username.prefix,oauth.fallback.username.claim, 和oauth.fallback.username.prefix设置也会应用到userinfo端点的响应。
为代理间通信配置监听程序
以下示例将 OAUTHBEARER 机制用于最小配置中的快速令牌验证,其中 inter-broker 通信通过与应用程序客户端相同的监听程序进行。
oauth.client.id、oauth.client.secret 和 auth.token.endpoint.uri 属性与 inter-broker 通信相关。
使用 OAUTHBEARER 机制的 inter-broker 配置示例
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-secret" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
以下示例显示了用于内部代理通信的 TLS 侦听器的最低配置。
使用 TLS 的 inter-broker 配置配置示例
sasl.enabled.mechanisms=OAUTHBEARER
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password>
listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS
listener.name.replication.ssl.keystore.location=<path_to_keystore>
listener.name.replication.ssl.truststore.location=<path_to_truststore>
listener.name.replication.ssl.client.auth=required
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" ;
- 1
- 相互代理通信和客户端应用程序需要单独的配置。
- 2
- 将 REPLICATION 侦听器配置为使用 TLS,并将 CLIENT 侦听器配置为通过未加密的通道使用 SASL。客户端可能会在生产环境中使用加密的频道(
SASL_SSL)。 - 3
ssl.属性定义 TLS 配置。- 4
- 随机数生成器实施。如果没有设置,则使用 Java platform SDK 默认。
- 5
- 主机名验证。如果设置为空字符串,则会关闭主机名验证。如果没有设置,则默认值为
HTTPS,它会强制对服务器证书进行主机名验证。 - 6
- 侦听器的密钥存储的路径。
- 7
- 侦听器的信任存储的路径。
- 8
- 指定 REPLICATION 侦听器的客户端必须在建立 TLS 连接时与客户端证书进行身份验证(用于代理连接)。
以下示例使用 PLAIN 机制在最低配置中快速验证令牌验证,其中 inter-broker 通信通过与应用程序客户端相同的监听程序。
使用 PLAIN 机制的 inter-broker 配置示例
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https:<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-secret" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
7.2.2. 在客户端应用程序中配置 OAuth 2.0 复制链接链接已复制到粘贴板!
要在客户端应用程序中配置 OAuth 2.0,您必须指定以下内容:
- SASL (简单身份验证和安全层)安全协议
- SASL 机制
- JAAS (Java 身份验证和授权服务)模块
- 访问授权服务器的身份验证属性
配置 SASL 协议
在客户端配置中指定 SASL 协议:
-
SASL_SSL用于通过 TLS 加密连接进行身份验证 -
SASL_PLAINTEXT用于通过未加密的连接进行身份验证
将 SASL_SSL 用于生产环境,SASL_PLAINTEXT 仅用于本地开发。
使用 SASL_SSL 时,需要额外的 ssl.truststore 配置。安全连接(https://)到 OAuth 2.0 授权服务器需要 truststore 配置。要验证 OAuth 2.0 授权服务器,请将授权服务器的 CA 证书添加到客户端配置的信任存储中。您可以使用 PEM 或 PKCS #12 格式配置信任存储。
配置 SASL 身份验证机制
在客户端配置中指定 SASL 机制:
-
OAUTHBEARER用于使用 bearer 令牌交换的凭证 -
PLAIN传递客户端凭证(clientId + secret)或访问令牌
配置 JAAS 模块
指定将 SASL 身份验证机制实现为 sasl.jaas.config 属性值的 JAAS 模块:
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule实现OAUTHBEARER机制 -
org.apache.kafka.common.security.plain.PlainLoginModule实现PLAIN机制
对于 OAUTHBEARER 机制,Apache Kafka 的 Streams 为使用 Kafka 客户端 Java 库启用凭证交换的客户端提供了一个回调处理器。对于其他语言中的客户端,可能需要自定义代码来获取访问令牌。对于 PLAIN 机制,Apache Kafka 的 Streams 提供了服务器端回调来启用凭证交换。
为了可以使用 OAUTHBEARER 机制,您还必须将自定义 io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 类添加为回调处理程序。JaasClientOauthLoginCallbackHandler 处理 OAuth 回调到授权服务器,以便在客户端登录期间处理访问令牌。这可启用自动令牌续订,确保在用户不干预的情况下进行持续身份验证。另外,它使用 OAuth 2.0 密码授权方法为客户端处理登录凭证。
配置身份验证属性
将客户端配置为使用凭据或访问令牌进行 OAuth 2.0 身份验证。
- 使用客户端凭证
- 使用客户端凭据涉及使用必要的凭据(客户端 ID 和机密,或者客户端 ID 和客户端断言)配置客户端,以从授权服务器获取有效的访问令牌。这是最简单的机制。
- 使用访问令牌
- 使用访问令牌时,客户端配置了有效的长期访问令牌,或者从授权服务器获取刷新令牌。使用访问令牌会增加复杂性,因为对授权服务器工具还有额外的依赖。如果您使用长期的访问令牌,您可能需要在授权服务器中配置客户端来提高令牌的最大生命周期。
发送到 Kafka 的唯一信息是访问令牌。用于获取令牌的凭证永远不会发送到 Kafka。当客户端获取访问令牌时,不需要进一步与授权服务器通信。
SASL 身份验证属性支持以下验证方法:
- OAuth 2.0 客户端凭证
- 访问令牌或服务帐户令牌
- 刷新令牌
- OAuth 2.0 密码授权(已弃用)
将身份验证属性作为 JAAS 配置(sasl.jaas.config 和 sasl.login.callback.handler.class)。
如果客户端应用程序没有直接配置访问令牌,客户端会在 Kafka 会话启动过程中交换访问令牌的以下一组凭证集:
- 客户端 ID 和 secret
- 客户端 ID 和客户端断言
- 客户端 ID、刷新令牌和(可选)secret
- 使用客户端 ID 和(可选)secret 的用户名和密码
您还可以将身份验证属性指定为环境变量,或指定为 Java 系统属性。对于 Java 系统属性,您可以使用 setProperty 设置它们,并使用 -D 选项在命令行中传递它们。
使用客户端 secret 的客户端凭证配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
SASL_SSL安全协议用于 TLS 加密连接。仅对本地开发使用SASL_PLAINTEXT。- 2
- 指定为
OAUTHBEARER或PLAIN的 SASL 机制。 - 3
- 用于安全访问 Kafka 集群的 truststore 配置。
- 4
- 授权服务器令牌端点的 URI。
- 5
- 客户端 ID,这是在授权服务器中创建客户端时使用的名称。
- 6
- 在授权服务器中创建客户端时创建的 客户端 secret。
- 7
- 该位置包含授权服务器的公钥证书(
truststore.p12)。 - 8
- 用于访问 truststore 的密码。
- 9
- truststore 类型。
- 10
- (可选)从令牌端点请求令牌的
范围。授权服务器可能需要客户端来指定范围。 - 11
- (可选)从令牌端点请求令牌的
听众。授权服务器可能需要客户端来指定受众。
使用客户端断言的客户端凭证配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.assertion.location="<path_to_client_assertion_token_file>" \
oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
密码授予配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.password.grant.username="<username>" \
oauth.password.grant.password="<password>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
访问令牌配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token="<access_token>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Kafka 客户端长期的访问令牌。或者,可以使用
oauth.access.token.location指定包含访问令牌的文件。
OpenShift 服务帐户令牌配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- 指向文件系统中的服务帐户令牌的位置(假设客户端已部署为 OpenShift 容器集)
刷新令牌配置示例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.refresh.token="<refresh_token>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
7.2.3. OAuth 2.0 客户端身份验证流 复制链接链接已复制到粘贴板!
OAuth 2.0 身份验证流程取决于底层 Kafka 客户端和 Kafka 代理配置。流还必须由使用的授权服务器支持。
Kafka 代理监听程序配置决定客户端如何使用访问令牌进行身份验证。客户端可以传递客户端 ID 和机密来请求访问令牌。
如果监听程序配置为使用 PLAIN 身份验证,客户端可以通过客户端 ID 和 secret 或用户名和访问令牌进行身份验证。这些值作为 PLAIN 机制 属性传递。
的用户名和密码
侦听器配置支持以下令牌验证选项:
- 您可以根据 JWT 签名检查和本地令牌内省使用快速本地令牌验证,而无需联系授权服务器。授权服务器提供带有公共证书的 JWKS 端点,用于验证令牌中的签名。
- 您可以使用调用授权服务器提供的令牌内省端点。每次建立新的 Kafka 代理连接时,代理会将从客户端接收的访问令牌传递给授权服务器。Kafka 代理检查响应,以确认令牌是否有效。
授权服务器可能只允许使用不透明访问令牌,这意味着无法进行本地令牌验证。
也可以为以下类型的身份验证配置 Kafka 客户端凭证:
- 使用之前生成的长期访问令牌直接进行本地访问
- 与授权服务器联系,以便发布新的访问令牌(使用客户端 ID 和凭证、刷新令牌或用户名和密码)
7.2.3.1. 使用 SASL OAUTHBEARER 机制的客户端身份验证流示例 复制链接链接已复制到粘贴板!
您可以使用 SASL OAUTHBEARER 机制为 Kafka 身份验证使用以下通信流。
使用客户端 ID 和凭证的客户端,代理委派验证到授权服务器
- Kafka 客户端使用客户端 ID 和凭证从授权服务器请求访问令牌,以及可选的刷新令牌。或者,客户端也可以使用用户名和密码进行身份验证。
- 授权服务器生成新的访问令牌。
-
Kafka 客户端使用 SASL
OAUTHBEARER机制通过 Kafka 代理进行身份验证,以传递访问令牌。 - Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
- 如果令牌有效,则会建立 Kafka 客户端会话。
使用客户端 ID 和凭证的客户端执行快速本地令牌验证
- Kafka 客户端使用令牌端点、使用客户端 ID 和凭证以及刷新令牌(可选)从令牌端点验证。或者,客户端也可以使用用户名和密码进行身份验证。
- 授权服务器生成新的访问令牌。
-
Kafka 客户端使用 SASL
OAUTHBEARER机制通过 Kafka 代理进行身份验证,以传递访问令牌。 - Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。
使用长期访问令牌的客户端,带有代理委派验证到授权服务器
-
Kafka 客户端使用 SASL
OAUTHBEARER机制与 Kafka 代理进行身份验证,以传递长期访问令牌。 - Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
- 如果令牌有效,则会建立 Kafka 客户端会话。
使用长期访问令牌的客户端,代理执行快速本地验证
-
Kafka 客户端使用 SASL
OAUTHBEARER机制与 Kafka 代理进行身份验证,以传递长期访问令牌。 - Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。
快速的本地 JWT 令牌签名验证仅适用于短期的令牌,因为如果已撤销令牌,就不会通过授权服务器检查该授权服务器。令牌到期时间写入到令牌,但可以随时进行撤销,因此不能在不联系授权服务器的情况下被考虑。任何发布的令牌都将被视为有效,直到过期为止。
7.2.3.2. 使用 SASL PLAIN 机制的客户端验证流示例 复制链接链接已复制到粘贴板!
您可以使用 OAuth PLAIN 机制为 Kafka 身份验证使用以下通信流。
使用客户端 ID 和 secret 的客户端以及代理获取客户端的访问令牌
-
Kafka 客户端或传递一个
clientId作为用户名,以及一个secret作为密码。 -
Kafka 代理使用令牌端点将
clientId和secret传递给授权服务器。 - 如果客户端凭据无效,授权服务器会返回一个新的访问令牌或错误。
Kafka 代理使用以下方法之一验证令牌:
- 如果指定了令牌内省端点,Kafka 代理会通过调用授权服务器上的端点来验证访问令牌。如果令牌验证成功,则会建立会话。
- 如果使用本地令牌内省,则不会向授权服务器发出请求。Kafka 代理使用 JWT 令牌签名检查在本地验证访问令牌。
使用没有客户端 ID 和 secret 的长期访问令牌的客户端
- Kafka 客户端会传递用户名和密码。密码提供在运行客户端前手动配置的访问令牌值。
密码通过或不使用
$accessToken:字符串前缀来传递,具体取决于 Kafka 代理侦听程序是否配置了令牌端点来进行身份验证。-
如果配置了令牌端点,则密码应加上前缀
$accessToken:,以便代理知道 password 参数包含访问令牌,而不是客户端 secret。Kafka 代理将用户名解释为帐户用户名。 -
如果没有在 Kafka 代理监听程序上配置令牌端点(强制
no-client-credentials 模式),则密码应在没有前缀的情况下提供访问令牌。Kafka 代理将用户名解释为帐户用户名。在这个模式中,客户端不使用客户端 ID 和 secret,password参数始终解释为原始访问令牌。
-
如果配置了令牌端点,则密码应加上前缀
Kafka 代理使用以下方法之一验证令牌:
- 如果指定了令牌内省端点,Kafka 代理会通过调用授权服务器上的端点来验证访问令牌。如果令牌验证成功,则会建立会话。
- 如果使用本地令牌内省,则不会向授权服务器发出请求。Kafka 代理使用 JWT 令牌签名检查在本地验证访问令牌。
7.2.4. 重新验证会话 复制链接链接已复制到粘贴板!
您可以将 OAuth 侦听程序配置为使用 Kafka 会话在 Kafka 客户端和 Kafka 代理之间对 OAuth 2.0 会话进行重新身份验证。这个机制在定义的时间后强制实施客户端和代理之间经过身份验证的会话的过期。当会话过期时,客户端会立即通过重复使用现有连接而不是丢弃它来启动新的会话。
会话重新身份验证默认为禁用。要启用它,请在 server.properties 文件中为 connections.max.reauth.ms 属性设置一个时间值。如需示例配置,请参阅 第 7.2.1 节 “在监听程序中配置 OAuth 2.0 身份验证”。
会话重新身份验证必须由客户端使用的 Kafka 客户端库支持。
会话重新身份验证可用于快速 本地 JWT 或 内省端点 令牌验证。
客户端重新身份验证
当代理的经过身份验证的会话过期时,客户端必须通过向代理发送一个新的有效访问令牌来重新验证到现有会话,而无需丢弃连接。
如果令牌验证成功,则使用现有连接启动新的客户端会话。如果客户端无法重新验证,代理会在进一步尝试发送或接收消息时关闭连接。如果代理上启用了重新身份验证机制,则使用 Kafka 客户端库 2.2 或更高版本的 Java 客户端会自动重新验证。
如果使用,会话重新身份验证也适用于刷新令牌。当会话过期时,客户端会使用其刷新令牌来刷新访问令牌。然后,客户端使用新的访问令牌来重新验证现有连接。
OAUTHBEARER 和 PLAIN 的会话到期
配置会话重新身份验证后,会话到期对于 OAUTHBEARER 和 PLAIN 身份验证是不同的。
对于 OAUTHBEARER 和 PLAIN,使用 客户端 ID 和 secret 方法:
-
代理的身份验证会话将在配置的
connections.max.reauth.ms过期。 - 如果访问令牌在配置的时间前过期,会话将提前过期。
对于 PLAIN,使用 长期访问令牌 方法:
-
代理的身份验证会话将在配置的
connections.max.reauth.ms过期。 - 如果访问令牌在配置的时间前过期,则重新身份验证将失败。虽然尝试尝试会话重新身份验证,但 PLAIN 没有刷新令牌的机制。
如果没有配置 connection.max.reauth.ms,OAUTHBEARER 和 PLAIN 客户端可以无限期地连接到代理,而无需重新验证。经过身份验证的会话不会因为访问令牌到期而终止。
但是,这在配置授权时可以考虑,例如使用 keycloak 授权或安装自定义授权器。
7.2.5. 示例:启用 OAuth 2.0 身份验证 复制链接链接已复制到粘贴板!
本例演示了如何使用 OAUth 2.0 身份验证配置对 Kafka 集群的客户端访问。此流程描述了在 Kafka 侦听程序和 Kafka Java 客户端上设置 OAuth 2.0 身份验证所需的配置。
7.2.5.1. 为 Kafka 代理配置 OAuth 2.0 支持 复制链接链接已复制到粘贴板!
此流程描述了如何配置 Kafka 代理,以便代理监听程序可以使用授权服务器使用 OAuth 2.0 身份验证。
我们建议通过配置 TLS 侦听程序在加密接口中使用 OAuth 2.0。不建议使用 plain 监听程序。
使用支持您选择的授权服务器的属性配置 Kafka 代理,以及您要实现的授权类型。
先决条件
- 每个主机上安装了 Apache Kafka 的流,且配置文件可用。
- 部署 OAuth 2.0 授权服务器。
流程
在
server.properties文件中配置 Kafka 代理监听程序配置。例如,使用 OAUTHBEARER 机制:
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler将代理连接设置配置为
listener.name.client.oauthbearer.sasl.jaas.config的一部分。如果需要,配置对授权服务器的访问。
生产环境通常需要这一步,除非使用 service mesh 等技术来配置容器外的安全频道。
提供用于连接安全授权服务器的自定义信任存储。对授权服务器的访问始终需要 SSL。
设置属性来配置信任存储。
例如:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ oauth.ssl.truststore.password="<truststore_password>" \ oauth.ssl.truststore.type="PKCS12" ;如果证书主机名与访问 URL 主机名不匹配,您可以关闭证书主机名验证:
oauth.ssl.endpoint.identification.algorithm=""检查可确保与授权服务器的连接是真实的。您可能想要在非生产环境中关闭验证。
接下来要做什么
7.2.5.2. 在 Kafka Java 客户端中设置 OAuth 2.0 复制链接链接已复制到粘贴板!
配置 Kafka producer 和消费者 API,以使用 OAuth 2.0 与 Kafka 代理交互。在客户端 pom.xml 文件中添加回调插件,然后为 OAuth 2.0 配置您的客户端。
如何配置身份验证属性取决于您用来访问 OAuth 2.0 授权服务器的身份验证方法。在此过程中,属性在属性文件中指定,然后加载到客户端配置中。
先决条件
- Apache Kafka 和 Kafka 的流正在运行
- 部署和配置 OAuth 2.0 授权服务器以 OAuth 访问 Kafka 代理
- 为 OAuth 2.0 配置 Kafka 代理
流程
将支持 OAuth 2.0 的客户端库添加到 Kafka 客户端的
pom.xml文件中:<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.15.0.redhat-00010</version> </dependency>根据 OAuth 2.0 身份验证方法配置客户端:
例如,在
client.properties文件中指定验证方法的属性。在 Java 客户端代码中输入 OAUTH 2.0 身份验证的客户端属性。
显示客户端属性输入示例
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }- 验证 Kafka 客户端是否可以访问 Kafka 代理。