第 12 章 KafkaListenerAuthenticationCustom schema reference


使用 in: GenericKafkaListener

KafkaListenerAuthenticationCustom schema 属性的完整列表

为监听程序配置自定义身份验证。

要配置自定义身份验证,请将 type 属性设置为 custom。自定义身份验证允许使用任何类型的 Kafka 支持的身份验证。

自定义 OAuth 身份验证配置示例

spec:
  kafka:
    config:
      principal.builder.class: SimplePrincipal.class
    listeners:
      - name: oauth-bespoke
        port: 9093
        type: internal
        tls: true
        authentication:
          type: custom
          sasl: true
          listenerConfig:
            oauthbearer.sasl.client.callback.handler.class: client.class
            oauthbearer.sasl.server.callback.handler.class: server.class
            oauthbearer.sasl.login.callback.handler.class: login.class
            oauthbearer.connections.max.reauth.ms: 999999999
            sasl.enabled.mechanisms: oauthbearer
            oauthbearer.sasl.jaas.config: |
              org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
          secrets:
            - name: example
Copy to Clipboard Toggle word wrap

生成协议映射,它使用 sasltls 值来确定要映射到监听程序的协议。

  • sasl = True, TLS = True SASL_SSL
  • sasl = False, TLS = True SSL
  • sasl = True, TLS = False SASL_PLAINTEXT
  • sasl = False, TLS = False PLAINTEXT

secret 挂载到 Kafka 代理节点的 容器中的 /opt/kafka/custom-authn-secrets/custom-listener-<listener_name>-<port>/<secret_name >。例如,示例配置中挂载的 secret (示例)位于 /opt/kafka/custom-authn-secrets/custom-listener-oauth-bespoke-9093/example

12.1. 设置自定义主体构建器

您可以在 Kafka 集群配置中设置自定义主体构建器。但是,主体构建器有以下要求:

  • 镜像上必须存在指定的主体构建器类。在自己构建之前,请检查是否已存在。您需要使用所需类重建 Apache Kafka 镜像的流。
  • 没有其他监听程序使用 oauth 类型身份验证。这是因为 OAuth 侦听器将自己的原则构建器附加到 Kafka 配置中。
  • 指定主体构建器与 Apache Kafka 的 Streams 兼容。

自定义主体构建器必须支持进行身份验证的对等证书,因为 Apache Kafka 的 Streams 使用它们来管理 Kafka 集群。

注意

Kafka 的默认主体构建器类 支持根据对等证书的名称构建主体。自定义主体构建器应使用 SSL peer 证书的名称提供类型为 user 的主体。

以下示例显示了一个自定义主体构建器,它满足 Apache Kafka 的 Streams 的 OAuth 要求。

自定义 OAuth 配置的主体构建器示例

public final class CustomKafkaPrincipalBuilder implements KafkaPrincipalBuilder {

    public KafkaPrincipalBuilder() {}

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            SSLSession sslSession = ((SslAuthenticationContext) context).session();
            try {
                return new KafkaPrincipal(
                    KafkaPrincipal.USER_TYPE, sslSession.getPeerPrincipal().getName());
            } catch (SSLPeerUnverifiedException e) {
                throw new IllegalArgumentException("Cannot use an unverified peer for authentication", e);
            }
        }

        // Create your own KafkaPrincipal here
        ...
    }
}
Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat