6.3. 身份验证
要验证到 Kafka 集群的客户端连接,可以使用以下选项:
- TLS 客户端身份验证
- 在加密连接中使用 X.509 证书 TLS (传输层安全)
- Kafka SASL
- 使用支持的身份验证机制的 Kafka SASL (简单身份验证和安全层)
- OAuth 2.0
- 基于 OAuth 2.0 令牌的身份验证
SASL 身份验证支持普通未加密的连接和 TLS 连接的各种机制:
-
PLAIN
- 基于用户名和密码进行身份验证。 -
SCRAM-SHA-256
和SCRAM-SHA-512
- 使用 Salted Challenge Response Authentication Mechanism (SCRAM)进行身份验证。 -
GSSAPI
- 对 Kerberos 服务器进行身份验证。
PLAIN
机制通过网络发送用户名和密码,格式为未加密的格式。它应该只与 TLS 加密结合使用。
6.3.1. 启用 TLS 客户端身份验证
在 Kafka 代理中启用 TLS 客户端身份验证,以增强连接到已使用 TLS 加密的 Kafka 节点的安全性。
使用 ssl.client.auth
属性使用以下值之一设置 TLS 身份验证:
-
none
- TLS 客户端身份验证为 off (默认) -
requested
- 可选 TLS 客户端身份验证 -
必需
- 客户端必须使用 TLS 客户端证书进行身份验证
当客户端使用 TLS 客户端身份验证进行身份验证时,经过身份验证的主体名称会派生自客户端证书中的可分辨名称。例如,具有可分辨名称 CN=someuser
证书的用户将使用主体 CN=someuser,OU=Unknown,O=Unknown,L=Unknown,C=Unknown,C=Unknown,C=Unknown 进行验证
。这个主体名称为经过身份验证的用户或实体提供唯一标识符。如果没有使用 TLS 客户端身份验证,并且禁用 SASL,则主体名称默认为 ANONYMOUS
。
流程
- 准备一个 JKS (Java Keystore )信任存储,其中包含用于签署用户证书的 CA (认证机构)的公钥。
编辑所有集群节点上的 Kafka 配置文件,如下所示:
-
使用
ssl.truststore.location
属性指定 JKS 信任存储的路径。 -
如果信任存储受密码保护,请使用
ssl.truststore.password
属性设置密码。 将
ssl.client.auth
属性设置为required
。TLS 客户端身份验证配置
ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=123456 ssl.client.auth=required
-
使用
- (重新)启动 Kafka 代理。
6.3.2. 启用 SASL PLAIN 客户端身份验证
在 Kafka 中启用 SASL PLAIN 身份验证,以增强连接到 Kafka 节点的安全性。
SASL 身份验证通过使用 KafkaServer
JAAS 上下文的 Java 身份验证和授权服务(JAAS)启用。您可以在专用文件中或直接在 Kafka 配置中定义 JAAS 配置。
专用文件的建议位置为 /opt/kafka/config/jaas.conf
。确保该文件可由 kafka
用户读取。保留 JAAS 配置文件在所有 Kafka 节点上同步。
先决条件
- 每个主机上安装了 Apache Kafka 的流,且配置文件可用。
流程
编辑或创建
/opt/kafka/config/jaas.conf
JAAS 配置文件以启用PlainLoginModule
并指定允许的用户名和密码。确保该文件在所有 Kafka 代理中都是相同的。
JAAS 配置
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
编辑所有集群节点上的 Kafka 配置文件,如下所示:
-
使用
listener.security.protocol.map
属性在特定监听程序上启用 SASL PLAIN 身份验证。指定SASL_PLAINTEXT
或SASL_SSL
。 将
sasl.enabled.mechanisms
属性设置为PLAIN
。SASL 普通配置
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=PLAIN
-
使用
(重新)使用
KAFKA_OPTS
环境变量启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理:su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.3.3. 启用 SASL SCRAM 客户端身份验证
在 Kafka 中启用 SASL SCRAM 身份验证,以增强连接到 Kafka 节点的安全性。
SASL 身份验证通过使用 KafkaServer
JAAS 上下文的 Java 身份验证和授权服务(JAAS)启用。您可以在专用文件中或直接在 Kafka 配置中定义 JAAS 配置。
专用文件的建议位置为 /opt/kafka/config/jaas.conf
。确保该文件可由 kafka
用户读取。保留 JAAS 配置文件在所有 Kafka 节点上同步。
先决条件
- 每个主机上安装了 Apache Kafka 的流,且配置文件可用。
流程
编辑或创建
/opt/kafka/config/jaas.conf
JAAS 配置文件以启用ScramLoginModule
。确保该文件在所有 Kafka 代理中都是相同的。
JAAS 配置
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
编辑所有集群节点上的 Kafka 配置文件,如下所示:
-
使用
listener.security.protocol.map
属性在特定监听程序上启用 SASL SCRAM 身份验证。指定SASL_PLAINTEXT
或SASL_SSL
。 将
sasl.enabled.mechanisms
选项设置为SCRAM-SHA-256
或SCRAM-SHA-512
。例如:
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-512
-
使用
(重新)使用
KAFKA_OPTS
环境变量启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.3.4. 启用多个 SASL 机制
使用 SASL 身份验证时,您可以启用多个机制。Kafka 可以同时使用多个 SASL 机制。启用多个机制后,您可以选择特定的客户端使用的机制。
要使用多个机制,您可以设置每个机制所需的配置。您可以将不同的 KafkaServer
JAAS 配置添加到同一上下文中,并使用 sasl.mechanism.inter.broker.protocol
属性在 Kafka 配置中启用多个机制作为用逗号分开的列表。
JAAS 配置多个 SASL 机制
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.scram.ScramLoginModule required; };
启用 SASL 机制
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
6.3.5. 为 inter-broker 身份验证启用 SASL
在 Kafka 节点间启用 SASL SCRAM 身份验证,以增强代理连接的安全性。除了将 SASL 身份验证用于客户端连接到 Kafka 集群外,您还可以使用 SASL 进行代理身份验证。与 SASL 用于客户端连接不同,您只能为代理通信选择一个机制。
先决条件
- ZooKeeper 安装在每个主机上,且配置文件可用。
如果您使用 SCRAM 机制,请在 Kafka 集群中注册 SCRAM 凭证。
对于 Kafka 集群中的所有节点,将 inter-broker SASL SCRAM 用户添加到 ZooKeeper 中。这样可确保在 Kafka 集群运行前为 bootstrap 更新用于身份验证的凭证。
注册 inter-broker SASL SCRAM 用户
bin/kafka-configs.sh \ --zookeeper localhost:2181 \ --alter \ --add-config 'SCRAM-SHA-512=[password=changeit]' \ --entity-type users \ --entity-name kafka
流程
使用
sasl.mechanism.inter.broker.protocol
属性在 Kafka 配置中指定 inter-broker SASL 机制。inter-broker SASL 机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
(可选)如果您使用 SCRAM 机制,请通过 添加 SCRAM 用户在 Kafka 集群上注册 SCRAM 凭证。
这样可确保在 Kafka 集群运行前为 bootstrap 更新用于身份验证的凭证。
使用
username
和 password 字段在KafkaServer
JAAS 上下文中指定用于代理间通信的用户名和密码
。inter-broker JAAS 上下文
KafkaServer { org.apache.kafka.common.security.plain.ScramLoginModule required username="admin" password="123456" # ... };
6.3.6. 添加 SASL SCRAM 用户
此流程概述了在 Kafka 中使用 SASL SCRAM 注册新用户进行身份验证的步骤。SASL SCRAM 身份验证增强了客户端连接的安全性。
流程
使用
kafka-configs.sh
工具添加新的 SASL SCRAM 用户。/opt/kafka/kafka-configs.sh \ --bootstrap-server <broker_host>:<port> \ --alter \ --add-config 'SCRAM-SHA-512=[password=<password>]' \ --entity-type users --entity-name <username>
例如:
/opt/kafka/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --add-config 'SCRAM-SHA-512=[password=123456]' \ --entity-type users \ --entity-name user1
6.3.7. 删除 SASL SCRAM 用户
此流程概述了在 Kafka 中使用 SASL SCRAM 删除注册的用户的步骤。
流程
使用
kafka-configs.sh
工具删除 SASL SCRAM 用户。/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server <broker_host>:<port> \ --alter \ --delete-config 'SCRAM-SHA-512' \ --entity-type users \ --entity-name <username>
例如:
/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --delete-config 'SCRAM-SHA-512' \ --entity-type users \ --entity-name user1
6.3.8. 启用 Kerberos (GSSAPI)身份验证
Apache Kafka 的流支持使用 Kerberos (GSSAPI)身份验证协议来保护对 Kafka 集群的单点登录访问。GSSAPI 是 Kerberos 功能的 API 包装程序,从底层实现更改中模拟应用程序。
Kerberos 是一种网络身份验证系统,其允许客户端和服务器使用对称加密和信任的第三方 KDC 来互相进行身份验证。
此流程演示了如何为 Apache Kafka 配置流,以便 Kafka 客户端可以使用 Kerberos (GSSAPI)身份验证访问 Kafka 和 ZooKeeper。
该流程假设已在 Red Hat Enterprise Linux 主机上设置了 Kerberos krb5 资源服务器。
该流程显示,带有如何配置的示例:
- 服务主体
- 使用 Kerberos 登录的 Kafka 代理
- zookeeper 使用 Kerberos 登录
- 生产者和消费者客户端使用 Kerberos 身份验证访问 Kafka
这些说明描述了在单个主机上为单个 ZooKeeper 和 Kafka 安装设置的 Kerberos,为生成者和消费者客户端提供额外的配置。
先决条件
要配置 Kafka 和 ZooKeeper 来验证和授权 Kerberos 凭证,您需要:
- 访问 Kerberos 服务器
- 每个 Kafka 代理主机上的 Kerberos 客户端
有关在代理主机上设置 Kerberos 服务器和客户端的详情请参考 RHEL 设置配置中的 Kerberos 示例。
为身份验证添加服务主体
在 Kerberos 服务器中,为 ZooKeeper、Kafka 代理和 Kafka 生成者和消费者客户端创建服务主体(用户)。
服务主体必须采用 SERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-REALM 的形式。
创建通过 Kerberos KDC 存储主体密钥的服务主体和 keytab。
确保 Kerberos 主体中的域名采用大写。
例如:
-
zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM
-
kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM
-
producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
ZooKeeper 服务主体必须与 Kafka
config/server.properties
文件中的zookeeper.connect
配置具有相同的主机名:zookeeper.connect=node1.example.redhat.com:2181
如果主机名不相同,则使用 localhost,身份验证将失败。
-
在主机上创建一个目录并添加 keytab 文件:
例如:
/opt/kafka/krb5/zookeeper-node1.keytab /opt/kafka/krb5/kafka-node1.keytab /opt/kafka/krb5/kafka-producer1.keytab /opt/kafka/krb5/kafka-consumer1.keytab
确保
kafka
用户可以访问目录:chown kafka:kafka -R /opt/kafka/krb5
将 ZooKeeper 配置为使用 Kerberos 登录
配置 ZooKeeper 以使用 Kerberos 密钥分发中心(KDC)使用之前为 zookeeper
创建的用户主体和 keytabs 进行身份验证。
创建或修改
opt/kafka/config/jaas.conf
文件来支持 ZooKeeper 客户端和服务器操作:Client { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true 1 storeKey=true 2 useTicketCache=false 3 keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" 4 principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; 5 }; Server { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true useTicketCache=false keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; QuorumServer { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; QuorumLearner { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; };
编辑
opt/kafka/config/zookeeper.properties
以使用更新的 JAAS 配置:# ... requireClientAuthScheme=sasl jaasLoginRenew=3600000 1 kerberos.removeHostFromPrincipal=false 2 kerberos.removeRealmFromPrincipal=false 3 quorum.auth.enableSasl=true 4 quorum.auth.learnerRequireSasl=true 5 quorum.auth.serverRequireSasl=true quorum.auth.learner.loginContext=QuorumLearner 6 quorum.auth.server.loginContext=QuorumServer quorum.auth.kerberos.servicePrincipal=zookeeper/_HOST 7 quorum.cnxn.threads.size=20
- 1
- 控制登录续订的频率(以毫秒为单位),可针对票据续订间隔进行调整。默认值为一小时。
- 2
- 指定主机名是否用作登录主体名称的一部分。如果将单个 keytab 用于集群中的所有节点,则会将其设置为
true
。但是,建议为每个代理主机生成单独的 keytab 和完全限定主体以进行故障排除。 - 3
- 控制域名称是否从 Kerberos 协商的主体名称中分离。建议将此设置设置为
false
。 - 4
- 为 ZooKeeper 服务器和客户端启用 SASL 身份验证机制。
- 5
RequireSasl
属性控制仲裁事件是否需要 SASL 身份验证,如 master 选举机制。- 6
loginContext
属性标识用于对指定组件进行身份验证的 JAAS 配置中登录上下文的名称。loginContext 名称与opt/kafka/config/jaas.conf
文件中相关部分的名称对应。- 7
- 控制用于组成用于识别的主体名称的命名惯例。占位符
_HOST
会自动解析为运行时由server.1
属性定义的主机名。
使用 JVM 参数启动 ZooKeeper,以指定 Kerberos 登录配置:
su - kafka export EXTRA_ARGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
如果您不使用默认服务名称(
zookeeper
),请使用-Dzookeeper.sasl.client.username=NAME
参数添加名称。注意如果您使用
/etc/krb5.conf
位置,则不需要在启动 ZooKeeper, Kafka, 或 Kafka 生成者和消费者时不需要指定-Djava.security.krb5.conf=/etc/krb5.conf
。
将 Kafka 代理服务器配置为使用 Kerberos 登录
使用之前为 kafka
创建的 user principals 和 keytabs,将 Kafka 配置为使用 Kerberos 密钥分发中心(KDC)进行身份验证。
使用以下元素修改
opt/kafka/config/jaas.conf
文件:KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/kafka-node1.keytab" principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true useTicketCache=false keyTab="/opt/kafka/krb5/kafka-node1.keytab" principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; };
通过修改
config/server.properties
文件中的监听程序配置来配置 Kafka 集群中的每个代理,以便监听程序使用 SASL/GSSAPI 登录。将 SASL 协议添加到监听器的安全协议映射中,并删除所有不需要的协议。
例如:
# ... broker.id=0 # ... listeners=SECURE://:9092,REPLICATION://:9094 1 inter.broker.listener.name=REPLICATION # ... listener.security.protocol.map=SECURE:SASL_PLAINTEXT,REPLICATION:SASL_PLAINTEXT 2 # .. sasl.enabled.mechanisms=GSSAPI 3 sasl.mechanism.inter.broker.protocol=GSSAPI 4 sasl.kerberos.service.name=kafka 5 ...
使用 JVM 参数启动 Kafka 代理,以指定 Kerberos 登录配置:
su - kafka export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
如果代理和 ZooKeeper 集群之前已经配置并使用基于非 Kerberos 的身份验证系统,则可以启动 ZooKeeper 和 broker 集群,并检查日志中是否存在配置错误。
启动代理和 Zookeeper 实例后,集群现在被配置为 Kerberos 身份验证。
配置 Kafka producer 和消费者客户端以使用 Kerberos 身份验证
使用之前为 producer1
和 consumer1
创建的用户主体和 keytabs 配置 Kafka producer 和 使用者客户端,以使用 Kerberos 密钥分发中心(KDC)进行身份验证。
将 Kerberos 配置添加到制作者或消费者配置文件中。
例如:
/opt/kafka/config/producer.properties
# ... sasl.mechanism=GSSAPI 1 security.protocol=SASL_PLAINTEXT 2 sasl.kerberos.service.name=kafka 3 sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ 4 useKeyTab=true \ useTicketCache=false \ storeKey=true \ keyTab="/opt/kafka/krb5/producer1.keytab" \ principal="producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; # ...
/opt/kafka/config/consumer.properties
# ... sasl.mechanism=GSSAPI security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ useKeyTab=true \ useTicketCache=false \ storeKey=true \ keyTab="/opt/kafka/krb5/consumer1.keytab" \ principal="consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; # ...
运行客户端,验证您可以从 Kafka 代理发送和接收信息。
制作者客户端:
export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-producer.sh --producer.config /opt/kafka/config/producer.properties --topic topic1 --bootstrap-server node1.example.redhat.com:9094
消费者客户端:
export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-consumer.sh --consumer.config /opt/kafka/config/consumer.properties --topic topic1 --bootstrap-server node1.example.redhat.com:9094
其他资源
- Kerberos man pages: krb5.conf(5), kinit(1), klist(1), 和 kdestroy(1)
- RHEL 设置配置中的 Kerberos 服务器示例
- 使用 Kerberos 票据进行 Kafka 集群验证的客户端应用程序示例