搜索

6.3. 身份验证

download PDF

要验证到 Kafka 集群的客户端连接,可以使用以下选项:

TLS 客户端身份验证
在加密连接中使用 X.509 证书 TLS (传输层安全)
Kafka SASL
使用支持的身份验证机制的 Kafka SASL (简单身份验证和安全层)
OAuth 2.0
基于 OAuth 2.0 令牌的身份验证

SASL 身份验证支持普通未加密的连接和 TLS 连接的各种机制:

  • PLAIN - 基于用户名和密码进行身份验证。
  • SCRAM-SHA-256SCRAM-SHA-512 - 使用 Salted Challenge Response Authentication Mechanism (SCRAM)进行身份验证。
  • GSSAPI - 对 Kerberos 服务器进行身份验证。
警告

PLAIN 机制通过网络发送用户名和密码,格式为未加密的格式。它应该只与 TLS 加密结合使用。

6.3.1. 启用 TLS 客户端身份验证

在 Kafka 代理中启用 TLS 客户端身份验证,以增强连接到已使用 TLS 加密的 Kafka 节点的安全性。

使用 ssl.client.auth 属性使用以下值之一设置 TLS 身份验证:

  • none - TLS 客户端身份验证为 off (默认)
  • requested - 可选 TLS 客户端身份验证
  • 必需 - 客户端必须使用 TLS 客户端证书进行身份验证

当客户端使用 TLS 客户端身份验证进行身份验证时,经过身份验证的主体名称会派生自客户端证书中的可分辨名称。例如,具有可分辨名称 CN=someuser 证书的用户将使用主体 CN=someuser,OU=Unknown,O=Unknown,L=Unknown,C=Unknown,C=Unknown,C=Unknown 进行验证。这个主体名称为经过身份验证的用户或实体提供唯一标识符。如果没有使用 TLS 客户端身份验证,并且禁用 SASL,则主体名称默认为 ANONYMOUS

先决条件

流程

  1. 准备一个 JKS (Java Keystore )信任存储,其中包含用于签署用户证书的 CA (认证机构)的公钥。
  2. 编辑所有集群节点上的 Kafka 配置文件,如下所示:

    • 使用 ssl.truststore.location 属性指定 JKS 信任存储的路径。
    • 如果信任存储受密码保护,请使用 ssl.truststore.password 属性设置密码。
    • ssl.client.auth 属性设置为 required

      TLS 客户端身份验证配置

      ssl.truststore.location=/path/to/truststore.jks
      ssl.truststore.password=123456
      ssl.client.auth=required

  3. (重新)启动 Kafka 代理。

6.3.2. 启用 SASL PLAIN 客户端身份验证

在 Kafka 中启用 SASL PLAIN 身份验证,以增强连接到 Kafka 节点的安全性。

SASL 身份验证通过使用 KafkaServer JAAS 上下文的 Java 身份验证和授权服务(JAAS)启用。您可以在专用文件中或直接在 Kafka 配置中定义 JAAS 配置。

专用文件的建议位置为 /opt/kafka/config/jaas.conf。确保该文件可由 kafka 用户读取。保留 JAAS 配置文件在所有 Kafka 节点上同步。

先决条件

流程

  1. 编辑或创建 /opt/kafka/config/jaas.conf JAAS 配置文件以启用 PlainLoginModule 并指定允许的用户名和密码。

    确保该文件在所有 Kafka 代理中都是相同的。

    JAAS 配置

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        user_admin="123456"
        user_user1="123456"
        user_user2="123456";
    };

  2. 编辑所有集群节点上的 Kafka 配置文件,如下所示:

    • 使用 listener.security.protocol.map 属性在特定监听程序上启用 SASL PLAIN 身份验证。指定 SASL_PLAINTEXTSASL_SSL
    • sasl.enabled.mechanisms 属性设置为 PLAIN

      SASL 普通配置

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=PLAIN

  3. (重新)使用 KAFKA_OPTS 环境变量启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理:

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

6.3.3. 启用 SASL SCRAM 客户端身份验证

在 Kafka 中启用 SASL SCRAM 身份验证,以增强连接到 Kafka 节点的安全性。

SASL 身份验证通过使用 KafkaServer JAAS 上下文的 Java 身份验证和授权服务(JAAS)启用。您可以在专用文件中或直接在 Kafka 配置中定义 JAAS 配置。

专用文件的建议位置为 /opt/kafka/config/jaas.conf。确保该文件可由 kafka 用户读取。保留 JAAS 配置文件在所有 Kafka 节点上同步。

先决条件

流程

  1. 编辑或创建 /opt/kafka/config/jaas.conf JAAS 配置文件以启用 ScramLoginModule

    确保该文件在所有 Kafka 代理中都是相同的。

    JAAS 配置

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };

  2. 编辑所有集群节点上的 Kafka 配置文件,如下所示:

    • 使用 listener.security.protocol.map 属性在特定监听程序上启用 SASL SCRAM 身份验证。指定 SASL_PLAINTEXTSASL_SSL
    • sasl.enabled.mechanisms 选项设置为 SCRAM-SHA-256SCRAM-SHA-512

      例如:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=SCRAM-SHA-512
  3. (重新)使用 KAFKA_OPTS 环境变量启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

6.3.4. 启用多个 SASL 机制

使用 SASL 身份验证时,您可以启用多个机制。Kafka 可以同时使用多个 SASL 机制。启用多个机制后,您可以选择特定的客户端使用的机制。

要使用多个机制,您可以设置每个机制所需的配置。您可以将不同的 KafkaServer JAAS 配置添加到同一上下文中,并使用 sasl.mechanism.inter.broker.protocol 属性在 Kafka 配置中启用多个机制作为用逗号分开的列表。

JAAS 配置多个 SASL 机制

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

启用 SASL 机制

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512

6.3.5. 为 inter-broker 身份验证启用 SASL

在 Kafka 节点间启用 SASL SCRAM 身份验证,以增强代理连接的安全性。除了将 SASL 身份验证用于客户端连接到 Kafka 集群外,您还可以使用 SASL 进行代理身份验证。与 SASL 用于客户端连接不同,您只能为代理通信选择一个机制。

先决条件

  • ZooKeeper 安装在每个主机上,且配置文件可用。
  • 如果您使用 SCRAM 机制,请在 Kafka 集群中注册 SCRAM 凭证。

    对于 Kafka 集群中的所有节点,将 inter-broker SASL SCRAM 用户添加到 ZooKeeper 中。这样可确保在 Kafka 集群运行前为 bootstrap 更新用于身份验证的凭证。

    注册 inter-broker SASL SCRAM 用户

    bin/kafka-configs.sh \
    --zookeeper localhost:2181 \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=changeit]' \
    --entity-type users \
    --entity-name kafka

流程

  1. 使用 sasl.mechanism.inter.broker.protocol 属性在 Kafka 配置中指定 inter-broker SASL 机制。

    inter-broker SASL 机制

    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

  2. (可选)如果您使用 SCRAM 机制,请通过 添加 SCRAM 用户在 Kafka 集群上注册 SCRAM 凭证。

    这样可确保在 Kafka 集群运行前为 bootstrap 更新用于身份验证的凭证。

  3. 使用 username 和 password 字段在 KafkaServer JAAS 上下文中指定用于代理间通信的 用户名和密码

    inter-broker JAAS 上下文

    KafkaServer {
        org.apache.kafka.common.security.plain.ScramLoginModule required
        username="admin"
        password="123456"
        # ...
    };

6.3.6. 添加 SASL SCRAM 用户

此流程概述了在 Kafka 中使用 SASL SCRAM 注册新用户进行身份验证的步骤。SASL SCRAM 身份验证增强了客户端连接的安全性。

先决条件

流程

  • 使用 kafka-configs.sh 工具添加新的 SASL SCRAM 用户。

    /opt/kafka/kafka-configs.sh \
    --bootstrap-server <broker_host>:<port> \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=<password>]' \
    --entity-type users --entity-name <username>

    例如:

    /opt/kafka/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=123456]' \
    --entity-type users \
    --entity-name user1

6.3.7. 删除 SASL SCRAM 用户

此流程概述了在 Kafka 中使用 SASL SCRAM 删除注册的用户的步骤。

先决条件

流程

  • 使用 kafka-configs.sh 工具删除 SASL SCRAM 用户。

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server <broker_host>:<port> \
    --alter \
    --delete-config 'SCRAM-SHA-512' \
    --entity-type users \
    --entity-name <username>

    例如:

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --alter \
    --delete-config 'SCRAM-SHA-512' \
    --entity-type users \
    --entity-name user1

6.3.8. 启用 Kerberos (GSSAPI)身份验证

Apache Kafka 的流支持使用 Kerberos (GSSAPI)身份验证协议来保护对 Kafka 集群的单点登录访问。GSSAPI 是 Kerberos 功能的 API 包装程序,从底层实现更改中模拟应用程序。

Kerberos 是一种网络身份验证系统,其允许客户端和服务器使用对称加密和信任的第三方 KDC 来互相进行身份验证。

此流程演示了如何为 Apache Kafka 配置流,以便 Kafka 客户端可以使用 Kerberos (GSSAPI)身份验证访问 Kafka 和 ZooKeeper。

该流程假设已在 Red Hat Enterprise Linux 主机上设置了 Kerberos krb5 资源服务器。

该流程显示,带有如何配置的示例:

  1. 服务主体
  2. 使用 Kerberos 登录的 Kafka 代理
  3. zookeeper 使用 Kerberos 登录
  4. 生产者和消费者客户端使用 Kerberos 身份验证访问 Kafka

这些说明描述了在单个主机上为单个 ZooKeeper 和 Kafka 安装设置的 Kerberos,为生成者和消费者客户端提供额外的配置。

先决条件

要配置 Kafka 和 ZooKeeper 来验证和授权 Kerberos 凭证,您需要:

  • 访问 Kerberos 服务器
  • 每个 Kafka 代理主机上的 Kerberos 客户端

有关在代理主机上设置 Kerberos 服务器和客户端的详情请参考 RHEL 设置配置中的 Kerberos 示例

为身份验证添加服务主体

在 Kerberos 服务器中,为 ZooKeeper、Kafka 代理和 Kafka 生成者和消费者客户端创建服务主体(用户)。

服务主体必须采用 SERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-REALM 的形式。

  1. 创建通过 Kerberos KDC 存储主体密钥的服务主体和 keytab。

    确保 Kerberos 主体中的域名采用大写。

    例如:

    • zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM

      ZooKeeper 服务主体必须与 Kafka config/server.properties 文件中的 zookeeper.connect 配置具有相同的主机名:

      zookeeper.connect=node1.example.redhat.com:2181

      如果主机名不相同,则使用 localhost,身份验证将失败。

  2. 在主机上创建一个目录并添加 keytab 文件:

    例如:

    /opt/kafka/krb5/zookeeper-node1.keytab
    /opt/kafka/krb5/kafka-node1.keytab
    /opt/kafka/krb5/kafka-producer1.keytab
    /opt/kafka/krb5/kafka-consumer1.keytab
  3. 确保 kafka 用户可以访问目录:

    chown kafka:kafka -R /opt/kafka/krb5
将 ZooKeeper 配置为使用 Kerberos 登录

配置 ZooKeeper 以使用 Kerberos 密钥分发中心(KDC)使用之前为 zookeeper 创建的用户主体和 keytabs 进行身份验证。

  1. 创建或修改 opt/kafka/config/jaas.conf 文件来支持 ZooKeeper 客户端和服务器操作:

    Client {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true 1
        storeKey=true 2
        useTicketCache=false 3
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" 4
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; 5
    };
    
    Server {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
    
    QuorumServer {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
    
    QuorumLearner {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
    1
    设置为 true,以从 keytab 获取主体密钥。
    2
    设置为 true 以存储主体密钥。
    3
    设置为 true,以从票据缓存获取 Ticket Granting Ticket (TGT)。
    4
    keyTab 属性指向从 Kerberos KDC 复制的 keytab 文件的位置。位置和文件必须可由 kafka 用户读取。
    5
    principal 属性配置为与 KDC 主机上创建的完全限定域名匹配,其格式是 SERVICE-NAME/FULLY-QUALIFIED-HOST@DOMAIN-NAME
  2. 编辑 opt/kafka/config/zookeeper.properties 以使用更新的 JAAS 配置:

    # ...
    
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000 1
    kerberos.removeHostFromPrincipal=false 2
    kerberos.removeRealmFromPrincipal=false 3
    quorum.auth.enableSasl=true 4
    quorum.auth.learnerRequireSasl=true 5
    quorum.auth.serverRequireSasl=true
    quorum.auth.learner.loginContext=QuorumLearner 6
    quorum.auth.server.loginContext=QuorumServer
    quorum.auth.kerberos.servicePrincipal=zookeeper/_HOST 7
    quorum.cnxn.threads.size=20
    1
    控制登录续订的频率(以毫秒为单位),可针对票据续订间隔进行调整。默认值为一小时。
    2
    指定主机名是否用作登录主体名称的一部分。如果将单个 keytab 用于集群中的所有节点,则会将其设置为 true。但是,建议为每个代理主机生成单独的 keytab 和完全限定主体以进行故障排除。
    3
    控制域名称是否从 Kerberos 协商的主体名称中分离。建议将此设置设置为 false
    4
    为 ZooKeeper 服务器和客户端启用 SASL 身份验证机制。
    5
    RequireSasl 属性控制仲裁事件是否需要 SASL 身份验证,如 master 选举机制。
    6
    loginContext 属性标识用于对指定组件进行身份验证的 JAAS 配置中登录上下文的名称。loginContext 名称与 opt/kafka/config/jaas.conf 文件中相关部分的名称对应。
    7
    控制用于组成用于识别的主体名称的命名惯例。占位符 _HOST 会自动解析为运行时由 server.1 属性定义的主机名。
  3. 使用 JVM 参数启动 ZooKeeper,以指定 Kerberos 登录配置:

    su - kafka
    export EXTRA_ARGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

    如果您不使用默认服务名称(zookeeper),请使用 -Dzookeeper.sasl.client.username=NAME 参数添加名称。

    注意

    如果您使用 /etc/krb5.conf 位置,则不需要在启动 ZooKeeper, Kafka, 或 Kafka 生成者和消费者时不需要指定 -Djava.security.krb5.conf=/etc/krb5.conf

将 Kafka 代理服务器配置为使用 Kerberos 登录

使用之前为 kafka 创建的 user principals 和 keytabs,将 Kafka 配置为使用 Kerberos 密钥分发中心(KDC)进行身份验证。

  1. 使用以下元素修改 opt/kafka/config/jaas.conf 文件:

    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/kafka-node1.keytab"
        principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        keyTab="/opt/kafka/krb5/kafka-node1.keytab"
        principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
  2. 通过修改 config/server.properties 文件中的监听程序配置来配置 Kafka 集群中的每个代理,以便监听程序使用 SASL/GSSAPI 登录。

    将 SASL 协议添加到监听器的安全协议映射中,并删除所有不需要的协议。

    例如:

    # ...
    broker.id=0
    # ...
    listeners=SECURE://:9092,REPLICATION://:9094 1
    inter.broker.listener.name=REPLICATION
    # ...
    listener.security.protocol.map=SECURE:SASL_PLAINTEXT,REPLICATION:SASL_PLAINTEXT 2
    # ..
    sasl.enabled.mechanisms=GSSAPI 3
    sasl.mechanism.inter.broker.protocol=GSSAPI 4
    sasl.kerberos.service.name=kafka 5
    ...
    1
    配置了两个监听器:一个安全监听程序用于与客户端的通用通信(支持 TLS 用于通信),以及用于代理间通信的复制监听程序。
    2
    对于启用了 TLS 的监听程序,协议名称为 SASL_PLAINTEXT。对于启用了 TLS 的连接器,协议名称为 SASL_PLAINTEXT。如果不需要 SSL,您可以删除 ssl.* 属性。
    3
    Kerberos 验证的 SASL 机制是 GSSAPI
    4
    用于代理间通信的 Kerberos 身份验证。
    5
    用于指定用于身份验证请求的服务名称,以将其与可能使用相同的 Kerberos 配置的其他服务区分开来。
  3. 使用 JVM 参数启动 Kafka 代理,以指定 Kerberos 登录配置:

    su - kafka
    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    如果代理和 ZooKeeper 集群之前已经配置并使用基于非 Kerberos 的身份验证系统,则可以启动 ZooKeeper 和 broker 集群,并检查日志中是否存在配置错误。

    启动代理和 Zookeeper 实例后,集群现在被配置为 Kerberos 身份验证。

配置 Kafka producer 和消费者客户端以使用 Kerberos 身份验证

使用之前为 producer1consumer1 创建的用户主体和 keytabs 配置 Kafka producer 和 使用者客户端,以使用 Kerberos 密钥分发中心(KDC)进行身份验证。

  1. 将 Kerberos 配置添加到制作者或消费者配置文件中。

    例如:

    /opt/kafka/config/producer.properties

    # ...
    sasl.mechanism=GSSAPI 1
    security.protocol=SASL_PLAINTEXT 2
    sasl.kerberos.service.name=kafka 3
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ 4
        useKeyTab=true  \
        useTicketCache=false \
        storeKey=true  \
        keyTab="/opt/kafka/krb5/producer1.keytab" \
        principal="producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    # ...

    1
    配置 Kerberos (GSSAPI)身份验证。
    2
    Kerberos 使用 SASL 纯文本(用户名/密码)安全协议。
    3
    在 Kerberos KDC 中配置的 Kafka 的服务主体(user)。
    4
    使用 jaas.conf 中定义的相同属性,为 JAAS 配置。

    /opt/kafka/config/consumer.properties

    # ...
    sasl.mechanism=GSSAPI
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true  \
        useTicketCache=false \
        storeKey=true  \
        keyTab="/opt/kafka/krb5/consumer1.keytab" \
        principal="consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    # ...

  2. 运行客户端,验证您可以从 Kafka 代理发送和接收信息。

    制作者客户端:

    export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-producer.sh --producer.config /opt/kafka/config/producer.properties  --topic topic1 --bootstrap-server node1.example.redhat.com:9094

    消费者客户端:

    export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-consumer.sh --consumer.config /opt/kafka/config/consumer.properties  --topic topic1 --bootstrap-server node1.example.redhat.com:9094

其他资源

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.