第 12 章 KafkaListenerAuthenticationCustom schema reference
使用 in: GenericKafkaListener
KafkaListenerAuthenticationCustom schema 属性的完整列表
为监听程序配置自定义身份验证。
要配置自定义身份验证,请将 type 属性设置为 custom。自定义身份验证允许使用任何类型的 Kafka 支持的身份验证。
自定义 OAuth 身份验证配置示例
spec:
kafka:
config:
principal.builder.class: SimplePrincipal.class
listeners:
- name: oauth-bespoke
port: 9093
type: internal
tls: true
authentication:
type: custom
sasl: true
listenerConfig:
oauthbearer.sasl.client.callback.handler.class: client.class
oauthbearer.sasl.server.callback.handler.class: server.class
oauthbearer.sasl.login.callback.handler.class: login.class
oauthbearer.connections.max.reauth.ms: 999999999
sasl.enabled.mechanisms: oauthbearer
oauthbearer.sasl.jaas.config: |
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
secrets:
- name: example
生成协议映射,它使用 sasl 和 tls 值来确定要映射到监听程序的协议。
-
sasl = True, TLS = True
SASL_SSL -
sasl = False, TLS = True
SSL -
sasl = True, TLS = False
SASL_PLAINTEXT -
sasl = False, TLS = False
PLAINTEXT
secret 挂载到 Kafka 代理节点的 容器中的 /opt/kafka/custom-authn-secrets/custom-listener-<listener_name>-<port>/<secret_name >。例如,示例配置中挂载的 secret (示例)位于 /opt/kafka/custom-authn-secrets/custom-listener-oauth-bespoke-9093/example。
12.1. 设置自定义主体构建器 复制链接链接已复制到粘贴板!
您可以在 Kafka 集群配置中设置自定义主体构建器。但是,主体构建器有以下要求:
- 镜像上必须存在指定的主体构建器类。在自己构建之前,请检查是否已存在。您需要使用所需类重建 Apache Kafka 镜像的流。
-
没有其他监听程序使用
oauth类型身份验证。这是因为 OAuth 侦听器将自己的原则构建器附加到 Kafka 配置中。 - 指定主体构建器与 Apache Kafka 的 Streams 兼容。
自定义主体构建器必须支持进行身份验证的对等证书,因为 Apache Kafka 的 Streams 使用它们来管理 Kafka 集群。
Kafka 的默认主体构建器类 支持根据对等证书的名称构建主体。自定义主体构建器应使用 SSL peer 证书的名称提供类型为 user 的主体。
以下示例显示了一个自定义主体构建器,它满足 Apache Kafka 的 Streams 的 OAuth 要求。
自定义 OAuth 配置的主体构建器示例
public final class CustomKafkaPrincipalBuilder implements KafkaPrincipalBuilder {
public KafkaPrincipalBuilder() {}
@Override
public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof SslAuthenticationContext) {
SSLSession sslSession = ((SslAuthenticationContext) context).session();
try {
return new KafkaPrincipal(
KafkaPrincipal.USER_TYPE, sslSession.getPeerPrincipal().getName());
} catch (SSLPeerUnverifiedException e) {
throw new IllegalArgumentException("Cannot use an unverified peer for authentication", e);
}
}
// Create your own KafkaPrincipal here
...
}
}