6.4. 配置 Kafka


Kafka 使用属性文件来存储静态配置。配置文件的建议位置为 /opt/kafka/config/server.properties。该配置文件必须可由 kafka 用户读取。

AMQ Streams 附带了一个示例配置文件,它突出显示了该产品的基本和高级功能。它可以在 AMQ Streams 安装目录中的 config/server.properties 下找到。

本章解释了最重要的配置选项。

6.4.1. ZooKeeper

Kafka 代理需要 ZooKeeper 存储其配置的一些部分,并协调集群(例如,决定哪个节点是哪个分区的领导)。ZooKeeper 集群的连接详情保存在配置文件中。字段 zookeeper.connect 包含 zookeeper 集群成员的主机名和端口列表。

例如:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Copy to Clipboard Toggle word wrap

Kafka 将使用这些地址来连接到 ZooKeeper 集群。使用这个配置,所有 Kafka znodes 都会直接在 ZooKeeper 数据库的根目录中创建。因此,此类 ZooKeeper 集群只能用于单个 Kafka 集群。要将多个 Kafka 集群配置为使用单个 ZooKeeper 集群,在 Kafka 配置文件中 ZooKeeper 连接字符串末尾指定基本(前缀)路径:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
Copy to Clipboard Toggle word wrap

6.4.2. 监听器

监听器用于连接到 Kafka 代理。每个 Kafka 代理都可以配置为使用多个监听程序。每个侦听器都需要不同的配置,以便它可以侦听不同的端口或网络接口。

要配置监听程序,请编辑配置文件中的 监听程序 属性(/opt/kafka/config/server.properties)。以逗号分隔列表的形式将 监听程序添加到监听程序 属性。配置每个属性,如下所示:

<listenerName>://<hostname>:<port>
Copy to Clipboard Toggle word wrap

如果 <hostname> 为空,则 Kafka 将使用 java.net.InetAddress.getCanonicalHostName() 类作为主机名。

多个监听器的配置示例

listeners=internal-1://:9092,internal-2://:9093,replication://:9094
Copy to Clipboard Toggle word wrap

当 Kafka 客户端连接到 Kafka 集群时,它首先连接到 bootstrap 服务器,这是集群节点之一。bootstrap 服务器为客户端提供集群中所有代理的列表,客户端会单独连接到每个代理。代理列表基于配置 的监听程序

advertised 监听程序

另外,您可以使用 advertised.listeners 属性为客户端提供不同于监听程序属性中给出的一系列不同的监听程序地址。如果其他网络基础架构(如代理)在客户端和代理之间,或者会使用外部 DNS 名称而不是 IP 地址,这很有用。

advertised.listeners 属性的格式与 listeners 属性相同。

公告监听程序的配置示例

listeners=internal-1://:9092,internal-2://:9093
advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235
Copy to Clipboard Toggle word wrap

注意

公告的监听程序的名称必须与监听程序属性中列出的名称匹配。

inter-broker 监听程序

inter-broker 监听程序 用于 Kafka 代理之间的通信。需要代理间通信:

  • 在不同代理间协调工作负载
  • 在存储在不同代理中的分区间复制消息
  • 处理控制器中的管理任务,如分区领导变化

inter-broker 侦听器可以分配给您选择的端口。当配置了多个监听程序时,您可以在 inter.broker.listener.name 属性中定义 inter-broker 监听程序的名称。

在这里,inter-broker 侦听器被命名为 REPLICATION

listeners=REPLICATION://0.0.0.0:9091
inter.broker.listener.name=REPLICATION
Copy to Clipboard Toggle word wrap

control plane 监听程序

默认情况下,控制器和其他代理之间的通信使用 inter-broker 侦听器。控制器负责协调管理任务,如分区领导变化。

您可以为控制器连接启用专用 control plane 侦听器。control plane 侦听器可以分配给您选择的端口。

要启用 control plane 侦听程序,请使用监听程序名称配置 control.plane.listener.name 属性:

listeners=CONTROLLER://0.0.0.0:9090,REPLICATION://0.0.0.0:9091
...
control.plane.listener.name=CONTROLLER
Copy to Clipboard Toggle word wrap

启用 control plane 侦听程序可能会提高集群性能,因为控制器通信不会因为代理中的数据复制不会延迟。数据复制通过 inter-broker 侦听器继续。

如果没有配置 control.plane.listener,控制器连接将使用 inter-broker 侦听器

6.4.3. 提交日志

Apache Kafka 将其从制作者接收的所有记录存储在提交日志中。提交日志包含 Kafka 需要提供的记录形式的实际数据。这些不是记录代理所执行的操作的应用程序日志文件。

日志目录

您可以使用 log.dirs 属性文件配置日志目录,将提交日志存储在一个或多个日志目录中。它应设置为在安装过程中创建的 /var/lib/kafka 目录:

log.dirs=/var/lib/kafka
Copy to Clipboard Toggle word wrap

出于性能考虑,您可以将 log.dirs 配置为多个目录,并将其每个目录放在不同的物理设备中,以提高磁盘 I/O 性能。例如:

log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
Copy to Clipboard Toggle word wrap

6.4.4. 代理 ID

代理 ID 是集群中每个代理的唯一标识符。您可以分配一个大于或等于 0 的整数作为代理 ID。代理 ID 用于在重启或崩溃后识别代理,因此 id 稳定且不会随时间变化。代理 ID 在代理属性文件中配置:

broker.id=1
Copy to Clipboard Toggle word wrap

6.4.5. ZooKeeper 身份验证

默认情况下,ZooZ 和 Kafka 之间的连接不会被身份验证。但是,Kafka 和 ZooKeeper 支持 Java 认证和授权服务(JAAS),可用于使用简单身份验证和安全层(SASL)设置身份验证。ZooKeeper 支持在本地存储的凭证中使用 DIGEST-MD5 SASL 机制进行身份验证。

6.4.5.1. JAAS 配置

ZooKeeper 连接的 SASL 身份验证必须在 JAAS 配置文件中配置。默认情况下,Kafka 将使用名为 Client 的 JAAS 上下文连接到 ZooKeeper。Client 上下文应该在 /opt/kafka/config/jass.conf 文件中配置。上下文必须启用 PLAIN SASL 身份验证,如下例所示:

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};
Copy to Clipboard Toggle word wrap

6.4.5.2. 启用 ZooKeeper 身份验证

这个流程描述了如何在连接到 ZooKeeper 时使用 SASL DIGEST-MD5 机制启用身份验证。

先决条件

启用 SASL DIGEST-MD5 身份验证

  1. 在所有 Kafka 代理节点上,创建或编辑 /opt/kafka/config/jaas.conf JAAS 配置文件并添加以下上下文:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="<Username>"
        password="<Password>";
    };
    Copy to Clipboard Toggle word wrap

    用户名和密码应该与 ZooKeeper 中配置相同。

    以下示例显示了 Client 上下文:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="123456";
    };
    Copy to Clipboard Toggle word wrap
  2. 逐一重启所有 Kafka 代理节点。要将 JAAS 配置传递给 Kafka 代理,请使用 KAFKA_OPTS 环境变量。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    Copy to Clipboard Toggle word wrap

    有关在多节点集群中重启代理的详情,请参考 第 4.3 节 “执行 Kafka 代理的安全滚动重启”

6.4.6. 授权

Kafka 代理中的授权使用授权器插件实现。

在本节中,我们描述了如何使用 Kafka 提供的 AclAuthorizer 插件。

或者,您可以使用自己的授权插件。例如,如果您使用 基于 OAuth 2.0 令牌的身份验证,您可以使用 OAuth 2.0 授权

6.4.6.1. 简单 ACL 授权器

Authorizer 插件(包括 AclAuthorizer )通过 authorizer.class.name 属性启用:

authorizer.class.name=kafka.security.auth.AclAuthorizer
Copy to Clipboard Toggle word wrap

所选授权器需要完全限定名称。对于 AclAuthorizer,完全限定名称为 kafka.security.auth.AclAuthorizer

6.4.6.1.1. ACL 规则

AclAuthorizer 使用 ACL 规则来管理 Kafka 代理的访问。

ACL 规则的格式定义:

主体 P 允许/拒绝来自主机 H 的 Kafka 资源 R 中的操作 O

例如,可以设置一个规则以便用户:

John 可以查看 主机 127.0.0.1中的主题 注释

Host 是 John 进行连接的机器的 IP 地址。

在大多数情况下,用户是生成者或消费者应用程序:

Consumer01 可以对来自主机 127.0.0.1 的消费者组账户写入权限。

如果没有 ACL 规则

如果给定资源没有 ACL 规则,则所有操作都会被拒绝。通过在 Kafka 配置文件 /opt/kafka/config/server.properties 中将属性 allow.everyone.if.no.acl.found 设置为 true 来更改此行为。

6.4.6.1.2. 主体

principal(主体)代表用户的身份。ID 格式取决于客户端用来连接到 Kafka 的身份验证机制:

  • 在没有身份验证的情况下连接时,user :ANONYMOUS.
  • 使用简单身份验证机制(如 PLAIN 或 SCRAM)连接时,user:< username >。

    例如 User:admin or User:user1

  • 使用 TLS 客户端身份验证 连接时,user:<DistinguishedName >。

    例如 User:CN=user1,O=MyCompany,L=Prague,C=CZ

  • 使用 Kerberos 连接时,user:<Kerberos username >。

DistinguishedName 是与客户端证书区分的名称。

Kerberos 用户名是 Kerberos 主体的主要部分,在使用 Kerberos 连接时默认使用它。您可以使用 sasl.kerberos.principal.to.local.rules 属性来配置 Kafka 主体如何从 Kerberos 主体构建。

6.4.6.1.3. 用户身份验证

要使用授权,您需要启用身份验证并供您的客户端使用。否则,所有连接都将具有主体 User:ANONYMOUS

有关身份验证方法的更多信息,请参阅加密和身份验证

6.4.6.1.4. 超级用户

超级用户被允许执行所有操作,无论 ACL 规则是什么。

超级用户使用属性 super.users 在 Kafka 配置文件中定义。

例如:

super.users=User:admin,User:operator
Copy to Clipboard Toggle word wrap
6.4.6.1.5. 副本代理身份验证

启用授权后,它将适用于所有监听程序和所有连接。这包括用于在代理间复制数据的 inter-broker 连接。如果启用授权,请确保使用身份验证进行代理连接,并授予代理使用足够权利的用户。例如,如果代理之间的身份验证使用 kafka-broker 用户,则超级用户配置必须包含用户名 super.users=User:kafka-broker

6.4.6.1.6. 支持的资源

您可以将 Kafka ACL 应用到这些资源类型:

  • topics
  • 消费者组
  • 集群
  • TransactionId
  • DelegationToken
6.4.6.1.7. 支持的操作

AclAuthorizer 授权对资源的操作。

下表中带有 X 的字段标记每个资源支持的操作。

Expand
表 6.1. 资源支持的操作
 topics消费者组集群

X

X

 

X

  

创建

  

X

删除

X

  

更改

X

  

Describe

X

X

X

ClusterAction

  

X

All

X

X

X

6.4.6.1.8. ACL 管理选项

ACL 规则使用 bin/kafka-acls.sh 工具进行管理,该工具作为 Kafka 发布软件包的一部分提供。

使用 kafka-acls.sh 参数选项来添加、列出和删除 ACL 规则,并执行其他功能。

参数需要双假设惯例,如 --add

Expand
选项类型描述默认

add

操作

添加 ACL 规则。

 

remove

操作

删除 ACL 规则。

 

list

操作

列出 ACL 规则。

 

authorizer

操作

授权者的完全限定类名称。

kafka.security.auth.AclAuthorizer

authorizer-properties

Configuration

传递给授权器进行初始化的键/值对。

对于 AclAuthorizer,示例值为:zookeeper.connect=zoo1.my-domain.com:2181

 

bootstrap-server

资源

主机/端口对以连接到 Kafka 集群。

使用这个选项或 authorizer 选项,不能同时使用它们。

command-config

资源

要传递给 Admin Client 的配置文件,与 bootstrap-server 参数结合使用。

 

cluster

资源

将集群指定为 ACL 资源。

 

topic

资源

将主题名称指定为 ACL 资源。

一个星号(*),用作通配符转换为 所有主题

单个命令可以指定多个 --topic 选项。

 

group

资源

将消费者组名称指定为 ACL 资源。

单个命令可以指定多个 --group 选项。

 

transactional-id

资源

将事务 ID 指定为 ACL 资源。

事务发送意味着,生产者发送到多个分区的所有消息都必须成功发送或不发送它们。

使用通配符星号 (*) 代表所有 ID

 

delegation-token

资源

将委托令牌指定为 ACL 资源。

使用通配符星号 (*) 代表所有令牌

 

resource-pattern-type

Configuration

指定 add 参数的资源模式,或为 listremove 参数指定资源模式过滤器值。

使用 literalprefixed 作为一个资源名称的资源特征类型。

使用 anymatch 作为资源模式过滤器值,或者特定模式类型过滤器。

literal

allow-principal

主体

添加到允许 ACL 规则的主体。

单个命令可以指定多个 --allow-principal 选项。

 

deny-principal

主体

添加到拒绝 ACL 规则的主体。

单个命令可以指定多个 --deny-principal 选项。

 

主体

主体

list 参数一起使用的主体名称,用于返回主体的 ACL 列表。

单个命令可以指定多个 --principal 选项。

 

allow-host

主机

允许访问 --allow-principal 中列出的主体的 IP 地址。

不支持主机名或 CIDR 范围。

如果指定了 --allow-principal,则默认为 * 代表"所有主机"。

deny-host

主机

拒绝访问 --deny-principal 中列出的主体的 IP 地址。

不支持主机名或 CIDR 范围。

如果指定了 --deny-principal,则默认为 * 代表"所有主机"。

operation

操作

允许或拒绝操作。

单个命令可以指定多个Multiple --operation 选项。

All

producer

快捷键

允许或拒绝消息制作者需要的所有操作的快捷方式(主题中为 WRITE 和 DESCRIBE,集群中为 CREATE)。

 

consumer

快捷键

允许或拒绝消息消费者所需的所有操作的快捷方式(主题为READ 和 DESCRIBE,READ on consumer group)。

 

idempotent

快捷键

--producer 参数一起使用时启用 idempotence 的快捷方式,以便消息精确向分区发送一次。

如果制作者被授权根据特定的事务 ID 发送消息,则会自动启用 Idepmotence。

 

force

快捷键

接受所有查询且不提示的快捷方式。

 

6.4.6.2. 启用授权

此流程描述了如何为 Kafka 代理中的授权启用 AclAuthorizer 插件。

先决条件

流程

  1. 编辑 /opt/kafka/config/server.properties Kafka 配置文件,以使用 AclAuthorizer

    authorizer.class.name=kafka.security.auth.AclAuthorizer
    Copy to Clipboard Toggle word wrap
  2. (重新)启动 Kafka 代理。

6.4.6.3. 添加 ACL 规则

当使用 AclAuthorizer 插件根据访问控制列表(ACL)控制对 Kafka 代理的访问时,您可以使用 kafka-acls.sh 工具添加新的 ACL 规则。

先决条件

流程

  • 使用 --add 选项运行 kafka-acls.sh

    示例:

  • 允许使用 MyConsumerGroup 消费者组从 myTopic 读取 user1user2 访问权限。

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
    Copy to Clipboard Toggle word wrap
  • 拒绝 user1 访问从 IP 地址主机 127.0.0.1 读取 myTopic

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
    Copy to Clipboard Toggle word wrap
  • 添加 user1 作为带有 MyConsumerGroupmyTopic 的消费者。

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
    Copy to Clipboard Toggle word wrap

6.4.6.4. 列出 ACL 规则

当使用 AclAuthorizer 插件根据访问控制列表(ACL)控制对 Kafka 代理的访问时,您可以使用 kafka-acls.sh 工具来列出现有的 ACL 规则。

先决条件

流程

  • 使用 --list 选项运行 kafka-acls.sh

    例如:

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic myTopic
    
    Current ACLs for resource `Topic:myTopic`:
    
    User:user1 has Allow permission for operations: Read from hosts: *
    User:user2 has Allow permission for operations: Read from hosts: *
    User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1
    User:user1 has Allow permission for operations: Describe from hosts: *
    User:user2 has Allow permission for operations: Describe from hosts: *
    User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1
    Copy to Clipboard Toggle word wrap

6.4.6.5. 删除 ACL 规则

当使用 AclAuthorizer 插件根据访问控制列表(ACL)控制对 Kafka 代理的访问时,您可以使用 kafka-acls.sh 工具删除现有 ACL 规则。

先决条件

流程

  • 使用 --remove 选项运行 kafka-acls.sh

    示例:

  • 删除 ACL,允许 user1user2 使用 MyConsumerGroup 消费者组从 myTopic 读取。

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
    Copy to Clipboard Toggle word wrap
  • 删除 ACL 添加 user1 作为带有 MyConsumerGroupmyTopic 的消费者。

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
    Copy to Clipboard Toggle word wrap
  • 删除 ACL 拒绝 user1 从 IP 地址主机 127.0.0.1 读取 myTopic 的访问。

    opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
    Copy to Clipboard Toggle word wrap

6.4.7. ZooKeeper 授权

当在 Kafka 和 ZooKeeper 之间启用身份验证时,您可以使用 ZooKeeper 访问控制列表(ACL)规则自动控制对存储在 ZooKeeper 中的 Kafka 元数据的访问。

6.4.7.1. ACL 配置

ZooKeeper ACL 规则的强制由 config/server.properties Kafka 配置文件中的 zookeeper.set.acl 属性控制。

属性默认被禁用,并通过设置为 true 来启用:

zookeeper.set.acl=true
Copy to Clipboard Toggle word wrap

如果启用了 ACL 规则,当在 ZooKeeper 中创建 znode 时,只有创建它的 Kafka 用户才可以修改或删除它。所有其他用户都具有只读访问权限。

Kafka 仅为新创建的 ZooKeeper znodes 设置 ACL 规则。如果只在集群首次启动后启用 ACL,zookeeper-security-migration.sh 工具可以在所有现有 znodes 上设置 ACL。

ZooKeeper 中数据的机密性

ZooKeeper 中存储的数据包括:

  • 主题名称及其配置
  • 当使用 SASL SCRAM 身份验证时,salted 和 hash 用户凭证。

但是 ZooKeeper 不存储使用 Kafka 发送和接收的任何记录。ZooKeeper 中存储的数据被认为是非机密。

如果要将数据视为机密(例如,主题名称包含客户 ID),则唯一可用于保护的选项会隔离网络级别的 ZooKeeper,并只允许访问 Kafka 代理。

6.4.7.2. 为新的 Kafka 集群启用 ZooKeeper ACL

此流程描述了如何在新 Kafka 集群的 Kafka 配置中启用 ZooKeeper ACL。仅在 Kafka 集群首次启动前使用此流程。有关在已在运行的集群中启用 ZooKeeper ACL,请参阅 第 6.4.7.3 节 “在现有 Kafka 集群中启用 ZooKeeper ACL”

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。
  • zookeeper 集群 已配置并运行
  • ZooKeeper 中启用了 客户端到服务器身份验证。
  • 在 Kafka 代理中,ZooKeeper 身份验证被启用
  • Kafka 代理还没有启动。

流程

  1. 编辑 /opt/kafka/config/server.properties Kafka 配置文件,在所有集群节点上将 zookeeper.set.acl 字段设置为 true

    zookeeper.set.acl=true
    Copy to Clipboard Toggle word wrap
  2. 启动 Kafka 代理。

6.4.7.3. 在现有 Kafka 集群中启用 ZooKeeper ACL

此流程描述了如何在 Kafka 配置中为正在运行的 Kafka 集群启用 ZooKeeper ACL。使用 zookeeper-security-migration.sh 工具,在所有存在的 znodes 中设置 ZooKeeper ACL。zookeeper-security-migration.sh 可作为 AMQ Streams 的一部分,并可在 bin 目录中找到。

先决条件

启用 ZooKeeper ACL

  1. 编辑 /opt/kafka/config/server.properties Kafka 配置文件,在所有集群节点上将 zookeeper.set.acl 字段设置为 true

    zookeeper.set.acl=true
    Copy to Clipboard Toggle word wrap
  2. 逐一重启所有 Kafka 代理。

    有关在多节点集群中重启代理的详情,请参考 第 4.3 节 “执行 Kafka 代理的安全滚动重启”

  3. 使用 zookeeper-security-migration.sh 工具在所有现有 ZooKeeper znodes 中设置 ACL。

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL>
    exit
    Copy to Clipboard Toggle word wrap

    例如:

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
    exit
    Copy to Clipboard Toggle word wrap

6.4.8. 加密和身份验证

AMQ Streams 支持加密和身份验证,它们被配置为监听程序配置的一部分。

6.4.8.1. 侦听器配置

Kafka 代理中的加密和验证会针对每个监听程序进行配置。有关 Kafka 侦听器配置的更多信息,请参阅 第 6.4.2 节 “监听器”

Kafka 代理中的每个监听程序都使用自己的安全协议配置。配置属性 listener.security.protocol.map 定义哪个监听程序使用哪个安全协议。它将每个侦听器名称映射到其安全协议。支持的安全协议有:

PLAINTEXT
无加密或身份验证的监听程序。
SSL
使用 TLS 加密(可选)使用 TLS 客户端证书进行身份验证的监听程序。
SASL_PLAINTEXT
没有加密的监听程序,但使用基于 SASL 的身份验证。
SASL_SSL
带有基于 TLS 的加密和基于 SASL 的验证的监听程序。

根据以下 监听程序配置

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
Copy to Clipboard Toggle word wrap

listener.security.protocol.map 可能如下所示:

listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
Copy to Clipboard Toggle word wrap

这会将侦听器 INT1 配置为使用带有 SASL 身份验证的未加密的连接,侦听器 INT2 使用 SASL 身份验证的加密连接,以及 REPLICATION 接口以使用 TLS 加密(可能与 TLS 客户端身份验证一起使用)。相同的安全协议可以多次使用。以下示例也是有效的配置:

listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
Copy to Clipboard Toggle word wrap

这样的配置会将 TLS 加密和 TLS 身份验证用于所有接口。以下章节将更详细地说明如何配置 TLS 和 SASL。

6.4.8.2. TLS 加密

Kafka 支持 TLS 来加密与 Kafka 客户端的通信。

要使用 TLS 加密和服务器身份验证,必须提供包含私钥和公钥的密钥存储。这通常使用 Java Keystore (JKS)格式的文件来完成。此文件的路径在 ssl.keystore.location 属性中设置。ssl.keystore.password 属性应该用于设置保护密钥存储的密码。例如:

ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456
Copy to Clipboard Toggle word wrap

在某些情况下,使用额外的密码来保护私钥。可以使用 ssl.key.password 属性设置此类密码。

Kafka 可以使用由证书颁发机构和自签名密钥签名的密钥。使用由证书颁发机构签名的密钥应始终是首选的方法。为了允许客户端验证其正在连接的 Kafka 代理的身份,证书应始终包含公告的主机名 (CN) 或 Subject Alternative Name(SAN)。

可以将不同的 SSL 配置用于不同的监听程序。所有以 ssl 开头的选项都可以加一个 listener.name.<NameOfTheListener>. 前缀,其中的监听程序的名称必须始终为小写。这将覆盖该特定监听器的默认 SSL 配置。以下示例演示了如何为不同的监听程序使用不同的 SSL 配置:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks
listener.name.replication.ssl.keystore.password=123456
Copy to Clipboard Toggle word wrap

其他 TLS 配置选项

除了上面描述的主要 TLS 配置选项外,Kafka 还支持很多选项来微调 TLS 配置。例如,启用或禁用 TLS / SSL 协议或密码套件:

ssl.cipher.suites
启用的密码套件列表。每个密码套件都是用于 TLS 连接的身份验证、加密、MAC 和密钥交换算法的组合。默认情况下启用所有可用的密码套件。
ssl.enabled.protocols
已启用的 TLS/SSL 协议列表。默认为 TLSv1.2,TLSv1.1,TLSv1

6.4.8.3. 启用 TLS 加密

这个步骤描述了如何在 Kafka 代理中启用加密。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。

流程

  1. 为集群中的所有 Kafka 代理生成 TLS 证书。证书应该在其 Common Name 或 Subject Alternative Name 中有其公告和 bootstrap 地址。
  2. 编辑所有集群节点上的 /opt/kafka/config/server.properties Kafka 配置文件:

    • 更改 listener.security.protocol.map 字段,为您要使用 TLS 加密的监听程序指定 SSL 协议。
    • 使用代理证书将 ssl.keystore.location 选项设置为 JKS 密钥存储的路径。
    • ssl.keystore.password 选项设置为用来保护密钥存储的密码。

      例如:

      listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
      listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
      ssl.keystore.location=/path/to/keystore/server-1.jks
      ssl.keystore.password=123456
      Copy to Clipboard Toggle word wrap
  3. (重新)启动 Kafka 代理

6.4.8.4. 身份验证

要进行身份验证,您可以使用:

6.4.8.4.1. TLS 客户端身份验证

TLS 客户端身份验证只能用于已经使用 TLS 加密的连接。要使用 TLS 客户端身份验证,可以为代理提供带有公钥的信任存储。这些密钥可用于验证连接到代理的客户端。truststore 应以 Java Keystore (JKS)格式提供,且应包含证书颁发机构的公钥。所有由信任存储中包含的证书颁发机构签名的公钥和私钥的客户端都将进行身份验证。信任存储的位置使用字段 ssl.truststore.location 设置。如果信任存储受密码保护,则密码应在 ssl.truststore.password 属性中设置。例如:

ssl.truststore.location=/path/to/keystore/server-1.jks
ssl.truststore.password=123456
Copy to Clipboard Toggle word wrap

配置信任存储后,必须使用 ssl.client.auth 属性启用 TLS 客户端身份验证。此属性可以设置为三个不同的值之一:

none
TLS 客户端身份验证已关闭。(默认值)
requested
TLS 客户端身份验证是可选的。系统将要求客户端使用 TLS 客户端证书进行身份验证,但不能选择。
required
客户端需要使用 TLS 客户端证书进行身份验证。

当客户端使用 TLS 客户端身份验证进行身份验证时,经过身份验证的主体名称是与经过身份验证的客户端证书区分名称。例如,具有可分辨名称 CN=someuser 的证书的用户将使用以下主体 CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown,C=Unknown 进行验证。如果没有使用 TLS 客户端身份验证,并且禁用 SASL,则主体名称为 ANONYMOUS

6.4.8.4.2. SASL 身份验证

SASL 身份验证是使用 Java 身份验证和授权服务(JAAS)配置的。JAAS 也用于对 Kafka 和 ZooKeeper 之间的连接进行身份验证。JAAS 使用自己的配置文件。此文件的建议位置为 /opt/kafka/config/jaas.conf。该文件必须由 kafka 用户读取。在运行 Kafka 时,使用 Java 系统属性 java.security.auth.login.config 来指定此文件的位置。启动代理节点时,必须将此属性传递给 Kafka:

KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh
Copy to Clipboard Toggle word wrap

通过普通未加密的连接以及 TLS 连接支持 SASL 身份验证。可以为每个监听程序单独启用 SASL。要启用它,listener.security.protocol.map 中的安全协议必须是 SASL_PLAINTEXTSASL_SSL

Kafka 中的 SASL 身份验证支持多个不同的机制:

PLAIN
根据用户名和密码实施身份验证。用户名和密码存储在 Kafka 配置中。
SCRAM-SHA-256SCRAM-SHA-512
使用 Salted Challenge Response Authentication Mechanism (SCRAM) 实现验证。SCRAM 凭证集中存储在 ZooKeeper 中。当 ZooKeeper 集群节点在私有网络中隔离时,可以使用 SCRAM。
GSSAPI
针对 Kerberos 服务器实现身份验证。
警告

PLAIN 机制通过网络以未加密的格式发送用户名和密码。因此,它应该只与 TLS 加密结合使用。

SASL 机制通过 JAAS 配置文件配置。Kafka 使用名为 KafkaServer 的 JAAS 上下文。在 JAAS 中配置后,必须在 Kafka 配置中启用 SASL 机制。这使用 sasl.enabled.mechanisms 属性完成。此属性包含以逗号分隔的启用机制列表:

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
Copy to Clipboard Toggle word wrap

如果用于 Inter-broker 通信的监听程序使用 SASL,则必须使用属性 sasl.mechanism.inter.broker.protocol 来指定它应使用的 SASL 机制。例如:

sasl.mechanism.inter.broker.protocol=PLAIN
Copy to Clipboard Toggle word wrap

将用于 openshift-broker 通信的用户名和密码,该通信必须使用字段 usernamepasswordKafkaServer JAAS 上下文中指定。

SASL PLAIN

要使用 PLAIN 机制,允许在 JAAS 上下文中指定允许连接的用户名和密码。以下示例显示了为 SASL PLAIN 身份验证配置的上下文。这个示例配置三个不同的用户:

  • admin
  • user1
  • user2
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};
Copy to Clipboard Toggle word wrap

与用户数据库的 JAAS 配置文件应保持在所有 Kafka 代理上同步。

当 SASL PLAIN 也用于内部代理身份验证时,usernamepassword 属性应包含在 JAAS 上下文中:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};
Copy to Clipboard Toggle word wrap

SASL SCRAM

Kafka 中的 SCRAM 身份验证由两个机制组成: SCRAM-SHA-256SCRAM-SHA-512。这些机制仅在使用的哈希算法中有所不同 - SHA-256 与更强大的 SHA-512。要启用 SCRAM 身份验证,HMQ 配置文件必须包括以下配置:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
Copy to Clipboard Toggle word wrap

当在 Kafka 配置文件中启用 SASL 身份验证时,可以列出这两个 SCRAM 机制。但是,对于代理间通信,只能选择其中一个。例如:

sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
Copy to Clipboard Toggle word wrap

SCRAM 机制的用户凭证存储在 ZooKeeper 中。kafka-configs.sh 工具可用于管理它们。例如,运行以下命令添加用户 user1 和密码 123456 :

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
Copy to Clipboard Toggle word wrap

要删除用户凭证,请使用:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
Copy to Clipboard Toggle word wrap

SASL GSSAPI

使用 Kerberos 进行身份验证的 SASL 机制称为 GSSAPI。要配置 Kerberos SASL 身份验证,应将以下配置添加到 JAAS 配置文件中:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};
Copy to Clipboard Toggle word wrap

Kerberos 主体中的域名必须始终为大写。

除了 JAAS 配置外,还需要在 Kafka 配置中的 sasl.kerberos.service.name 属性中指定 Kerberos 服务名称:

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
Copy to Clipboard Toggle word wrap

多个 SASL 机制

Kafka 可以同时使用多个 SASL 机制。不同的 JAAS 配置都可以添加到同一上下文中:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
Copy to Clipboard Toggle word wrap

启用多个机制时,客户端将能够选择想要使用的机制。

6.4.8.5. 启用 TLS 客户端身份验证

此流程描述了如何在 Kafka 代理中启用 TLS 客户端身份验证。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。
  • 启用 TLS 加密。

流程

  1. 准备一个 JKS 信任存储,其中包含用于签署用户证书的证书颁发机构的公钥。
  2. 编辑所有集群节点上的 /opt/kafka/config/server.properties Kafka 配置文件:

    • 使用用户证书的证书颁发机构,将 ssl.truststore.location 选项设置为 JKS 信任存储的路径。
    • ssl.truststore.password 选项设置为用来保护信任存储的密码。
    • ssl.client.auth 选项设置为 required

      例如:

      ssl.truststore.location=/path/to/truststore.jks
      ssl.truststore.password=123456
      ssl.client.auth=required
      Copy to Clipboard Toggle word wrap
  3. (重新)启动 Kafka 代理

6.4.8.6. 启用 SASL PLAIN 身份验证

这个步骤描述了如何在 Kafka 代理中启用 SASL PLAIN 身份验证。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。

流程

  1. 编辑或创建 /opt/kafka/config/jaas.conf JAAS 配置文件。此文件应包含您的所有用户及其密码。确保该文件在所有 Kafka 代理中都是相同的。

    例如:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        user_admin="123456"
        user_user1="123456"
        user_user2="123456";
    };
    Copy to Clipboard Toggle word wrap
  2. 编辑所有集群节点上的 /opt/kafka/config/server.properties Kafka 配置文件:

    • 更改 listener.security.protocol.map 字段,为您要使用 SASL PLAIN 身份验证的监听程序指定 SASL_PLAINTEXTSASL_SSL 协议。
    • sasl.enabled.mechanisms 选项设置为 PLAIN

      例如:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=PLAIN
      Copy to Clipboard Toggle word wrap
  3. 使用 KAFKA_OPTS 环境变量(重新)启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    Copy to Clipboard Toggle word wrap

6.4.8.7. 启用 SASL SCRAM 身份验证

此流程描述了如何在 Kafka 代理中启用 SASL SCRAM 身份验证。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。

流程

  1. 编辑或创建 /opt/kafka/config/jaas.conf JAAS 配置文件。为 KafkaServer 上下文启用 ScramLoginModule。确保该文件在所有 Kafka 代理中都是相同的。

    例如:

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };
    Copy to Clipboard Toggle word wrap
  2. 编辑所有集群节点上的 /opt/kafka/config/server.properties Kafka 配置文件:

    • 更改 listener.security.protocol.map 字段,为您要使用 SASL SCRAM 身份验证的监听程序指定 SASL_PLAINTEXTSASL_SSL 协议。
    • sasl.enabled.mechanisms 选项设置为 SCRAM-SHA-256SCRAM-SHA-512

      例如:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=SCRAM-SHA-512
      Copy to Clipboard Toggle word wrap
  3. 使用 KAFKA_OPTS 环境变量(重新)启动 Kafka 代理,将 JAAS 配置传递给 Kafka 代理。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    Copy to Clipboard Toggle word wrap

6.4.8.8. 添加 SASL SCRAM 用户

这个步骤描述了如何使用 SASL SCRAM 为身份验证添加新用户。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。
  • 启用 SASL SCRAM 身份验证。

流程

  • 使用 kafka-configs.sh 工具添加新的 SASL SCRAM 用户。

    bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
    Copy to Clipboard Toggle word wrap

    例如:

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
    Copy to Clipboard Toggle word wrap

6.4.8.9. 删除 SASL SCRAM 用户

这个流程描述了如何在使用 SASL SCRAM 身份验证时删除用户。

先决条件

  • AMQ Streams 安装在 将用作 Kafka 代理的所有主机上。
  • 启用 SASL SCRAM 身份验证。

流程

  • 使用 kafka-configs.sh 工具删除 SASL SCRAM 用户。

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
    Copy to Clipboard Toggle word wrap

    例如:

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
    Copy to Clipboard Toggle word wrap

6.4.9. 使用基于 OAuth 2.0 令牌的身份验证

AMQ Streams 支持使用 OAUTHBEARERPLAIN 机制使用 OAuth 2.0 身份验证

OAuth 2.0 启用应用程序之间的基于令牌的标准化身份验证和授权,使用中央授权服务器签发对资源有限访问权限的令牌。

您可以配置 OAuth 2.0 身份验证,然后配置 OAuth 2.0 授权

Kafka 代理和客户端都需要配置为使用 OAuth 2.0。OAuth 2.0 身份验证也可以与基于 simple 或 OPA 的 Kafka 授权一起使用。

使用 OAuth 2.0 身份验证时,应用程序客户端可以访问应用服务器(称为 资源服务器)上的资源,而无需公开帐户凭据。

应用程序客户端通过访问令牌作为身份验证方法传递,应用服务器也可以用来决定要授予的访问权限级别。授权服务器处理访问权限的授予和查询有关访问权限的查询。

在 AMQ Streams 的上下文中:

  • Kafka 代理作为 OAuth 2.0 资源服务器
  • Kafka 客户端充当 OAuth 2.0 应用程序客户端

Kafka 客户端在 Kafka 代理验证。代理和客户端根据需要与 OAuth 2.0 授权服务器通信,以获取或验证访问令牌。

对于 AMQ Streams 的部署,OAuth 2.0 集成提供:

  • Kafka 代理的服务器端 OAuth 2.0 支持
  • 对 Kafka MirrorMaker、Kafka Connect 和 Kafka Bridge 的客户端 OAuth 2.0 支持

RHEL 上的 AMQ Streams 包括两个 OAuth 2.0 库:

kafka-oauth-client
提供名为 io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 的自定义登录回调处理器类。要处理 OAUTHBEARER 身份验证机制,请使用 Apache Kafka 提供的 OAuthBearerLoginModule 的登录回调处理器。
kafka-oauth-common
提供 kafka-oauth-client 库所需的一些功能的帮助程序库。

提供的客户端库还依赖于一些额外的第三方库,例如: keycloak-corejackson-databindslf4j-api

我们建议使用 Maven 项目来打包您的客户端,以确保包含所有依赖项库。依赖项库可能会在以后的版本中有所变化。

6.4.9.1. OAuth 2.0 身份验证机制

AMQ Streams 支持 OAUTHBEARER 和 PLAIN 机制进行 OAuth 2.0 身份验证。这两种机制都允许 Kafka 客户端与 Kafka 代理建立经过身份验证的会话。客户端、授权服务器和 Kafka 代理之间的身份验证流因每种机制而异。

我们建议您将客户端配置为尽可能使用 OAUTHBEARER。OAUTHBEARER 提供比 PLAIN 更高的安全性,因为客户端凭证不会与 Kafka 代理共享。考虑仅在不支持 OAUTHBEARER 的 Kafka 客户端中使用 PLAIN。

您可以将 Kafka 代理监听程序配置为使用 OAuth 2.0 身份验证来连接客户端。如果需要,您可以在同一 oauth 侦听器上使用 OAUTHBEARER 和 PLAIN 机制。支持每个机制的属性必须在 oauth 侦听器配置中明确指定。

OAUTHBEARER 概述

要使用 OAUTHBEARER,请将 Kafka 代理的 OAuth 身份验证监听程序配置中的 sasl.enabled.mechanisms 设置为 OAUTHBEARER。有关详细配置,请参阅 第 6.4.9.2 节 “OAuth 2.0 Kafka 代理配置”

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
Copy to Clipboard Toggle word wrap

许多 Kafka 客户端工具使用在协议级别为 OAUTHBEARER 提供基本支持的库。为了支持应用程序开发,AMQ Streams 为上游 Kafka Client Java 库(但不适用于其他库)提供了一个 OAuth 回调处理器。因此,您不需要自行编写回调处理程序。应用客户端可以使用回调处理程序来提供访问令牌。使用 Go 等其他语言编写的客户端必须使用自定义代码连接到授权服务器并获取访问令牌。

使用 OAUTHBEARER 时,客户端发起带有 Kafka 代理进行凭证交换的会话,其中凭证采用由回调处理器提供的 bearer 令牌的形式。使用回调,您可以以三种方式之一配置令牌置备:

  • 客户端 ID 和 Secret (使用 OAuth 2.0 客户端凭证 机制)
  • 一个长期的访问令牌,在配置时手动获取
  • 一个长期的刷新令牌,在配置时手动获取
注意

OAUTHBEARER 身份验证只能由支持协议级别的 OAUTHBEARER 机制的 Kafka 客户端使用。

PLAIN 概述

要使用 PLAIN,请将 PLAIN 添加到 sasl.enabled.mechanisms 的值。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
Copy to Clipboard Toggle word wrap

PLAIN 是所有 Kafka 客户端工具使用的简单身份验证机制。要启用 PLAIN 以用于 OAuth 2.0 身份验证,AMQ Streams 提供了 OAuth 2.0 over PLAIN 服务器端的回调。

使用 PLAIN 的 AMQ Streams 实现,客户端凭证不会存储在 ZooKeeper 中。相反,客户端凭证会在兼容授权服务器后进行集中处理,这与使用 OAUTHBEARER 身份验证类似。

当与 OAuth 2.0 over PLAIN 回调一起使用时,Kafka 客户端使用以下方法之一与 Kafka 代理进行身份验证:

  • 客户端 ID 和 secret (使用 OAuth 2.0 客户端凭证 机制)
  • 一个长期的访问令牌,在配置时手动获取

对于这两种方法,客户端必须提供 PLAIN usernamepassword 属性,将凭证传递给 Kafka 代理。客户端使用这些属性传递客户端 ID 和 secret 或用户名和访问令牌。

客户端 ID 和 secret 用于获取访问令牌。

访问令牌作为 password 属性值传递。您可以使用或没有 $accessToken: 前缀来传递访问令牌。

  • 如果您在监听器配置中配置令牌端点(oauth.token.endpoint.uri),则需要前缀。
  • 如果您没有在监听器配置中配置令牌端点(oauth.token.endpoint.uri),则不需要前缀。Kafka 代理将密码解释为原始访问令牌。

如果将密码设置为访问令牌,则必须将用户名设置为 Kafka 代理从访问令牌获取的相同的主体名称。您可以使用 oauth.username.claim, oauth.fallback.username.claim, oauth.fallback.username.prefix, 和 oauth.userinfo.endpoint.uri 属性在监听器中指定用户名提取选项。用户名提取过程还取决于您的授权服务器;特别是,它将客户端 ID 映射到帐户名称。

注意

PLAIN 上的 OAuth 不支持使用(已弃用)OAuth 2.0 密码授权机制传递用户名和密码(密码授权)。

6.4.9.1.1. 使用属性或变量配置 OAuth 2.0

您可以使用 Java Authentication and Authorization Service(JAAS)属性或环境变量来配置 OAuth 2.0 设置。

  • JAAS 属性在 server.properties 配置文件中配置,并传递为 listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config 属性的键值对。
  • 如果使用环境变量,您仍然需要在 server.properties 文件中提供 listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config 属性,但您可以忽略其他 JAAS 属性。

    您可以使用大写或大写环境变量命名约定。

AMQ Streams OAuth 2.0 库使用以以下内容开头的属性:

6.4.9.2. OAuth 2.0 Kafka 代理配置

用于 OAuth 2.0 身份验证的 Kafka 代理配置涉及:

  • 在授权服务器中创建 OAuth 2.0 客户端
  • 在 Kafka 集群中配置 OAuth 2.0 身份验证
注意

与授权服务器的关系,Kafka 代理和 Kafka 客户端都被视为 OAuth 2.0 客户端。

6.4.9.2.1. 授权服务器上的 OAuth 2.0 客户端配置

要配置 Kafka 代理以验证会话启动期间收到的令牌,建议的做法是在授权服务器中创建一个 OAuth 2.0 client 定义(配置为 confidential),并启用了以下客户端凭证:

  • 客户端 ID kafka-broker (例如)
  • 客户端 ID 和 secret 作为身份验证机制
注意

在使用授权服务器的非公共内省端点时,您只需要使用客户端 ID 和 secret。在使用公共授权服务器端点时,通常不需要凭据,如快速本地 JWT 令牌验证一样。

6.4.9.2.2. Kafka 集群中的 OAuth 2.0 身份验证配置

要在 Kafka 集群中使用 OAuth 2.0 身份验证,您可以在 Kafka server.properties 文件中为 Kafka 集群启用 OAuth 身份验证监听程序配置。至少需要配置。您还可以配置 TLS 侦听器,其中 TLS 用于代理间通信。

您可以使用以下方法之一为授权服务器配置代理以进行令牌验证:

  • 快速本地令牌验证: JWKS 端点与签名的 JWT 格式的访问令牌相结合
  • 内省端点

您可以配置 OAUTHBEARER 或 PLAIN 身份验证,或两者。

以下示例显示了应用 全局 监听器配置的最低配置,这意味着,内部代理间通信通过与应用程序客户端相同的监听程序进行。

这个示例还显示了一个特定监听程序的 OAuth 2.0 配置,在其中您可以指定 listener.name.LISTENER-NAME.sasl.enabled.mechanisms 而不是 sasl.enabled.mechanismsLISTENER-NAME 是监听器的区分大小写的名称。在这里,我们将侦听器的 CLIENT 命名为 listener.name.client.sasl.enabled.mechanisms

这个示例使用 OAUTHBEARER 身份验证。

示例:使用 JWKS 端点进行 OAuth 2.0 身份验证的最小监听程序配置

sasl.enabled.mechanisms=OAUTHBEARER 
1

listeners=CLIENT://0.0.0.0:9092 
2

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 
3

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 
4

sasl.mechanism.inter.broker.protocol=OAUTHBEARER 
5

inter.broker.listener.name=CLIENT 
6

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 
7

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
8

  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 
9

  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 
10

  oauth.username.claim="preferred_username"  \ 
11

  oauth.client.id="kafka-broker" \ 
12

  oauth.client.secret="kafka-secret" \ 
13

  oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 
14

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
15

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 
16
Copy to Clipboard Toggle word wrap

1
为通过 SASL 进行凭证交换启用 OAUTHBEARER 机制。
2
配置要连接的客户端应用程序的监听程序。系统的 hostname 用作公告的主机名,客户端必须可以解析该主机名才能重新连接。本例中,侦听器名为 CLIENT
3
指定监听器的频道协议。SASL_SSL 用于 TLS。SASL_PLAINTEXT 用于未加密的连接(无 TLS),但存在丢失和截获 TCP 连接层的风险。
4
CLIENT 侦听器指定 OAUTHBEARER 机制。客户端名称(CLIENT)通常在 listeners 属性中使用大写指定,对于 listener.name 属性是小写 (listener.name.client),当为 listener.name.client.* 属性的一部分时是小写。
5
指定用于代理间通信的 OAUTHBEARER 机制。
6
指定用于代理间通信的监听程序。配置需要有效的规格。
7
在客户端监听器上配置 OAuth 2.0 身份验证。
8
配置客户端和 inter-broker 通信的身份验证设置。oauth.client.idoauth.client.secretauth.token.endpoint.uri 属性与 inter-broker 配置相关。
9
有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
10
JWKS 端点 URL。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
11
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。
12
Kafka 代理的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端
13
Kafka 代理的 secret,适用于所有代理。
14
到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 https:// urls。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
15
为代理间通信启用(且只需要)OAuth 2.0 身份验证。
16
(可选)当令牌过期时强制会话到期,并激活 Kafka 重新验证机制。如果指定的值小于访问令牌保留的时间,则客户端必须在实际令牌到期前重新验证。默认情况下,当访问令牌过期时,会话不会过期,客户端也不会尝试重新身份验证。

以下示例显示了 TLS 侦听器的最小配置,其中 TLS 用于代理间通信。

示例:用于 OAuth 2.0 身份验证的 TLS 侦听器配置

listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 
1

listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 
2

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password> 
3

listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 
4

listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 
5

listener.name.replication.ssl.keystore.location=<path_to_keystore> 
6

listener.name.replication.ssl.truststore.location=<path_to_truststore> 
7

listener.name.replication.ssl.client.auth=required 
8

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
9

  oauth.valid.issuer.uri="https://<oauth_server_address>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \
  oauth.username.claim="preferred_username" ;
Copy to Clipboard Toggle word wrap

1
相互代理通信和客户端应用程序需要单独的配置。
2
REPLICATION 侦听器配置为使用 TLS,并将 CLIENT 侦听器配置为通过未加密的通道使用 SASL。客户端可能会在生产环境中使用加密的频道(SASL_SSL)。
3
ssl. 属性定义 TLS 配置。
4
随机数生成器实施。如果没有设置,则使用 Java platform SDK 默认。
5
主机名验证。如果设置为空字符串,则会关闭主机名验证。如果没有设置,则默认值为 HTTPS,它会强制对服务器证书进行主机名验证。
6
侦听器的密钥存储的路径。
7
侦听器的信任存储的路径。
8
指定 REPLICATION 侦听器的客户端必须在建立 TLS 连接时与客户端证书进行身份验证(用于代理连接)。
9
为 OAuth 2.0 配置 CLIENT 侦听器。与授权服务器的连接应使用安全 HTTPS 连接。

以下示例显示了使用 PLAIN 身份验证机制通过 SASL 进行凭证交换的 OAuth 2.0 身份验证的最低配置。使用快速的本地令牌验证。

示例:用于 PLAIN 验证的最小监听程序配置

listeners=CLIENT://0.0.0.0:9092 
1

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 
2

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 
3

sasl.mechanism.inter.broker.protocol=OAUTHBEARER 
4

inter.broker.listener.name=CLIENT 
5

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 
6

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
7

  oauth.valid.issuer.uri="http://<auth_server>/auth/realms/<realm>" \ 
8

  oauth.jwks.endpoint.uri="https://<auth_server>/auth/realms/<realm>/protocol/openid-connect/certs" \ 
9

  oauth.username.claim="preferred_username"  \ 
10

  oauth.client.id="kafka-broker" \ 
11

  oauth.client.secret="kafka-secret" \ 
12

  oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 
13

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
14

listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 
15

listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 
16

  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 
17

  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 
18

  oauth.username.claim="preferred_username"  \ 
19

  oauth.token.endpoint.uri="http://<auth_server>/auth/realms/<realm>/protocol/openid-connect/token" ; 
20

connections.max.reauth.ms=3600000 
21
Copy to Clipboard Toggle word wrap

1
为要连接的客户端应用程序配置监听程序(本例中为 CLIENT )。系统的 hostname 用作公告的主机名,客户端必须可以解析该主机名才能重新连接。由于这是唯一配置的监听程序,它也用于代理间通信。
2
将示例 CLIENT 侦听器配置为通过未加密的频道使用 SASL。在生产环境中,客户端应使用加密频道(SASL_SSL)来保护对 TCP 连接层进行窃取和截获。
3
为通过 SASL 和 OAUTHBEARER 进行凭证交换启用 PLAIN 身份验证机制。OAUTHBEARER 也被指定,因为代理间通信需要它。Kafka 客户端可以选择使用哪些机制进行连接。
4
为代理间通信指定 OAUTHBEARER 身份验证机制。
5
为内部代理通信指定监听器(本例中为 CLIENT)。配置需要有效。
6
为 OAUTHBEARER 机制配置服务器回调处理程序。
7
使用 OAUTHBEARER 机制为客户端和内部代理通信配置身份验证设置。oauth.client.idoauth.client.secretoauth.token.endpoint.uri 属性与 inter-broker 配置相关。
8
有效的签发者 URI。只有来自此签发者的访问令牌才被接受。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
9
JWKS 端点 URL。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
10
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。
11
Kafka 代理的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端
12
Kafka 代理的 secret (所有代理都相同)。
13
到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 https:// urls。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
14
为代理间通信启用 OAuth 2.0 身份验证。
15
PLAIN 身份验证配置服务器回调处理程序。
16
使用 PLAIN 身份验证为客户端通信配置身份验证设置。

oauth.token.endpoint.uri 是一个可选属性,它使用 OAuth 2.0 客户端凭证机制启用 OAuth 2.0 over PLAIN。

17
有效的签发者 URI。只有来自此签发者的访问令牌才被接受。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
18
JWKS 端点 URL。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
19
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。
20
到您的授权服务器的 OAuth 2.0 令牌端点 URL。PLAIN 机制的其他配置。如果指定,客户端可以通过在使用 $accessToken: 前缀将访问令牌 作为密码 传递来通过 PLAIN 进行身份验证。

对于生产环境,请始终使用 https:// urls。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token

21
(可选)当令牌过期时强制会话到期,并激活 Kafka 重新验证机制。如果指定的值小于访问令牌保留的时间,则客户端必须在实际令牌到期前重新验证。默认情况下,当访问令牌过期时,会话不会过期,客户端也不会尝试重新身份验证。
6.4.9.2.3. 快速的本地 JWT 令牌验证配置

快速本地 JWT 令牌验证会在本地检查 JWT 令牌签名。

本地检查可确保令牌:

  • 使用一个访问令牌的 Bearer 的(typ)声明值符合类型
  • 有效(未过期)
  • 具有与 validIssuerURI 匹配的签发者

在配置监听程序时 指定有效的签发者 URI,以便任何未由授权服务器发布的令牌都被拒绝。

授权服务器不需要在快速本地 JWT 令牌验证过程中联系。您可以通过指定由 OAuth 2.0 授权服务器公开的 JWKs 端点 URI 来激活快速本地 JWT 令牌验证。端点包含验证已签名的 JWT 令牌的公钥,这些令牌由 Kafka 客户端作为凭证发送。

注意

所有与授权服务器通信都应使用 HTTPS 执行。

对于 TLS 侦听器,您可以配置证书信任存储并指向信任存储文件。

快速本地 JWT 令牌验证的属性示例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 
1

  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 
2

  oauth.jwks.refresh.seconds="300" \ 
3

  oauth.jwks.refresh.min.pause.seconds="1" \ 
4

  oauth.jwks.expiry.seconds="360" \ 
5

  oauth.username.claim="preferred_username" \ 
6

  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 
7

  oauth.ssl.truststore.password="<truststore_password>" \ 
8

  oauth.ssl.truststore.type="PKCS12" ; 
9
Copy to Clipboard Toggle word wrap

1
有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。例如 :https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
2
JWKS 端点 URL。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
3
端点刷新之间的周期(默认为 300)。
4
连续尝试刷新 JWKS 公钥之间的最小暂停(以秒为单位)。当遇到未知签名密钥时,JWKS 密钥刷新在常规定期调度后调度,且至少在最后一次刷新尝试后出现指定暂停。刷新密钥遵循 exponential backoff 规则,重试不成功刷新,且持续增加暂停,直到它到达 oauth.jwks.refresh.seconds。默认值为 1。
5
JWKs 证书在证书过期前被视为有效。默认为 360 秒。如果您指定了较长的时间,请考虑允许访问撤销的证书的风险。
6
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。
7
TLS 配置中使用的信任存储的位置。
8
访问信任存储的密码。
9
PKCS #12 格式的 truststore 类型。
6.4.9.2.4. OAuth 2.0 内省端点配置

使用 OAuth 2.0 内省端点进行令牌验证会将接收的访问令牌视为不透明。Kafka 代理向内省端点发送访问令牌,该端点使用验证所需的令牌信息做出响应。最重要的是,如果特定访问令牌有效,它会返回最新的信息,以及令牌何时过期的信息。

要配置基于 OAuth 2.0 内省的验证,您可以指定一个 内省端点 URI,而不是为快速本地 JWT 令牌验证指定的 JWKs 端点 URI。根据授权服务器,通常必须指定 client IDclient secret,因为内省端点通常受到保护。

内省端点的属性示例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/introspection" \ 
1

  oauth.client.id="kafka-broker" \ 
2

  oauth.client.secret="kafka-broker-secret" \ 
3

  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 
4

  oauth.ssl.truststore.password="<truststore_password>" \ 
5

  oauth.ssl.truststore.type="PKCS12" \ 
6

  oauth.username.claim="preferred_username" ; 
7
Copy to Clipboard Toggle word wrap

1
OAuth 2.0 内省端点 URI。例如: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect
2
Kafka 代理的客户端 ID。
3
Kafka 代理的 secret。
4
TLS 配置中使用的信任存储的位置。
5
访问信任存储的密码。
6
PKCS #12 格式的 truststore 类型。
7
在令牌中包含实际用户名的令牌声明(或密钥)。用户名是用于识别用户的 主体。该值将取决于身份验证流和使用的授权服务器。如果需要,您可以使用 JsonPath 表达式,如 "['user.info'].['user.id']",从令牌中的嵌套 JSON 属性检索用户名。

6.4.9.3. Kafka 代理的会话重新身份验证

您可以将 OAuth 侦听程序配置为使用 Kafka 会话在 Kafka 客户端和 Kafka 代理之间对 OAuth 2.0 会话进行重新身份验证。这个机制在定义的时间后强制实施客户端和代理之间经过身份验证的会话的过期。当会话过期时,客户端会立即通过重复使用现有连接而不是丢弃它来启动新的会话。

会话重新身份验证默认为禁用。您可以在 server.properties 文件中启用它。为启用了 OAUTHBEARER 或 PLAIN 的 TLS 侦听器设置 connections.max.reauth.ms 属性作为 SASL 机制。

您可以指定每个监听器的会话重新身份验证。例如:

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Copy to Clipboard Toggle word wrap

会话重新身份验证必须由客户端使用的 Kafka 客户端库支持。

会话重新身份验证可用于快速 本地 JWT内省端点 令牌验证。

客户端重新身份验证

当代理的经过身份验证的会话过期时,客户端必须通过向代理发送一个新的有效访问令牌来重新验证到现有会话,而无需丢弃连接。

如果令牌验证成功,则使用现有连接启动新的客户端会话。如果客户端无法重新验证,代理会在进一步尝试发送或接收消息时关闭连接。如果代理上启用了重新身份验证机制,则使用 Kafka 客户端库 2.2 或更高版本的 Java 客户端会自动重新验证。

如果使用,会话重新身份验证也适用于刷新令牌。当会话过期时,客户端会使用其刷新令牌来刷新访问令牌。然后,客户端使用新的访问令牌来重新验证现有连接。

OAUTHBEARER 和 PLAIN 的会话到期

配置会话重新身份验证后,会话到期对于 OAUTHBEARER 和 PLAIN 身份验证是不同的。

对于 OAUTHBEARER 和 PLAIN,使用 客户端 ID 和 secret 方法:

  • 代理的身份验证会话将在配置的 connections.max.reauth.ms 过期。
  • 如果访问令牌在配置的时间前过期,会话将提前过期。

对于 PLAIN,使用 长期访问令牌 方法:

  • 代理的身份验证会话将在配置的 connections.max.reauth.ms 过期。
  • 如果访问令牌在配置的时间前过期,则重新身份验证将失败。虽然尝试尝试会话重新身份验证,但 PLAIN 没有刷新令牌的机制。

如果没有配置 connection.max.reauth.ms,OAUTHBEARER 和 PLAIN 客户端可以无限期地连接到代理,而无需重新验证。经过身份验证的会话不会因为访问令牌到期而终止。但是,这可在配置授权时考虑,例如,使用 keycloak 授权或安装自定义授权器。

6.4.9.4. OAuth 2.0 Kafka 客户端配置

Kafka 客户端被配置为:

  • 从授权服务器获取有效访问令牌(客户端 ID 和 Secret)所需的凭证
  • 使用授权服务器提供的工具获取有效的长期访问令牌或刷新令牌

发送到 Kafka 代理的唯一信息是访问令牌。用于与授权服务器进行身份验证的凭证从不会发送到代理。

当客户端获取访问令牌时,不需要进一步与授权服务器通信。

最简单的机制是使用客户端 ID 和 Secret 进行身份验证。使用长期的访问令牌或长期的刷新令牌会增加复杂性,因为对授权服务器工具还有额外的依赖。

注意

如果您使用长期的访问令牌,您可能需要在授权服务器中配置客户端来提高令牌的最大生命周期。

如果 Kafka 客户端没有直接配置访问令牌,客户端会在 Kafka 会话发起授权服务器的过程中交换访问令牌的凭证。Kafka 客户端交换:

  • 客户端 ID 和 Secret
  • 客户端 ID、刷新令牌和(可选)secret
  • 使用客户端 ID 和(可选)secret 的用户名和密码

6.4.9.5. OAuth 2.0 客户端身份验证流

OAuth 2.0 身份验证流程取决于底层 Kafka 客户端和 Kafka 代理配置。流还必须由使用的授权服务器支持。

Kafka 代理监听程序配置决定客户端如何使用访问令牌进行身份验证。客户端可以传递客户端 ID 和机密来请求访问令牌。

如果侦听器配置为使用 PLAIN 身份验证,客户端可以通过客户端 ID 和 secret 或用户名和访问令牌进行身份验证。这些值作为 PLAIN 机制 usernamepassword 属性传递。

侦听器配置支持以下令牌验证选项:

  • 您可以根据 JWT 签名检查和本地令牌内省使用快速本地令牌验证,而无需联系授权服务器。授权服务器提供带有公共证书的 JWKS 端点,用于验证令牌中的签名。
  • 您可以使用调用授权服务器提供的令牌内省端点。每次建立新的 Kafka 代理连接时,代理会将从客户端接收的访问令牌传递给授权服务器。Kafka 代理检查响应,以确认令牌是否有效。
注意

授权服务器可能只允许使用不透明访问令牌,这意味着无法进行本地令牌验证。

也可以为以下类型的身份验证配置 Kafka 客户端凭证:

  • 使用之前生成的长期访问令牌直接进行本地访问
  • 与授权服务器联系,以获取要发布的新访问令牌(使用客户端 ID 和 secret,或刷新令牌,或者用户名和密码)

您可以使用 SASL OAUTHBEARER 机制为 Kafka 身份验证使用以下通信流。

使用客户端 ID 和 secret 的客户端以及代理委派到授权服务器的验证

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka 客户端使用客户端 ID 和 secret 从授权服务器请求访问令牌,以及可选的刷新令牌。或者,客户端也可以使用用户名和密码进行身份验证。
  2. 授权服务器生成新的访问令牌。
  3. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递访问令牌。
  4. Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
  5. 如果令牌有效,则会建立 Kafka 客户端会话。

使用客户端 ID 和 secret 的客户端,代理执行快速本地令牌验证

Client using client ID and secret with broker performing fast local token validation

  1. Kafka 客户端使用令牌端点、使用客户端 ID 和 secret 以及刷新令牌(可选)从令牌端点验证。或者,客户端也可以使用用户名和密码进行身份验证。
  2. 授权服务器生成新的访问令牌。
  3. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递访问令牌。
  4. Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。

使用长期访问令牌的客户端,带有代理委派验证到授权服务器

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递长期访问令牌。
  2. Kafka 代理通过使用自己的客户端 ID 和 secret,在授权服务器上调用令牌内省端点来验证访问令牌。
  3. 如果令牌有效,则会建立 Kafka 客户端会话。

使用长期访问令牌的客户端,代理执行快速本地验证

Client using long-lived access token with broker performing fast local validation

  1. Kafka 客户端使用 SASL OAUTHBEARER 机制通过 Kafka 代理进行身份验证,以传递长期访问令牌。
  2. Kafka 代理使用 JWT 令牌签名检查和本地令牌内省验证访问令牌。
警告

快速的本地 JWT 令牌签名验证仅适用于短期的令牌,因为如果已撤销令牌,就不会通过授权服务器检查该授权服务器。令牌到期时间写入到令牌,但可以随时进行撤销,因此不能在不联系授权服务器的情况下被考虑。任何发布的令牌都将被视为有效,直到过期为止。

您可以使用 OAuth PLAIN 机制对 Kafka 身份验证使用以下通信流。

使用客户端 ID 和 secret 的客户端以及代理获取客户端的访问令牌

Client using a client ID and secret with the broker obtaining the access token for the client

  1. Kafka 客户端或传递一个 clientId 作为用户名,以及一个 secret 作为密码。
  2. Kafka 代理使用令牌端点将 clientIdsecret 传递给授权服务器。
  3. 如果客户端凭据无效,授权服务器会返回一个新的访问令牌或错误。
  4. Kafka 代理使用以下方法之一验证令牌:

    1. 如果指定了令牌内省端点,Kafka 代理会通过调用授权服务器上的端点来验证访问令牌。如果令牌验证成功,则会建立会话。
    2. 如果使用本地令牌内省,则不会向授权服务器发出请求。Kafka 代理使用 JWT 令牌签名检查在本地验证访问令牌。

使用没有客户端 ID 和 secret 的长期访问令牌的客户端

Client using a long-lived access token without a client ID and secret

  1. Kafka 客户端会传递用户名和密码。密码提供在运行客户端前手动配置的访问令牌值。
  2. 密码通过或不使用 $accessToken: 字符串前缀来传递,具体取决于 Kafka 代理侦听程序是否配置了令牌端点来进行身份验证。

    1. 如果配置了令牌端点,则密码应加上前缀 $accessToken:,以便代理知道 password 参数包含访问令牌,而不是客户端 secret。Kafka 代理将用户名解释为帐户用户名。
    2. 如果没有在 Kafka 代理监听程序上配置令牌端点(强制 no-client-credentials 模式),则密码应在没有前缀的情况下提供访问令牌。Kafka 代理将用户名解释为帐户用户名。在这个模式中,客户端不使用客户端 ID 和 secret,password 参数始终解释为原始访问令牌。
  3. Kafka 代理使用以下方法之一验证令牌:

    1. 如果指定了令牌内省端点,Kafka 代理会通过调用授权服务器上的端点来验证访问令牌。如果令牌验证成功,则会建立会话。
    2. 如果使用本地令牌内省,则不会向授权服务器发出请求。Kafka 代理使用 JWT 令牌签名检查在本地验证访问令牌。

6.4.9.6. 配置 OAuth 2.0 身份验证

OAuth 2.0 用于在 Kafka 客户端和 AMQ Streams 组件间交互。

要将 OAuth 2.0 用于 AMQ Streams,您必须:

这个步骤描述了如何将 Red Hat Single Sign-On 部署为授权服务器,并配置它以与 AMQ Streams 集成。

授权服务器为身份验证和授权提供了一个中央点,以及用户、客户端和权限的管理。Red Hat Single Sign-On 具有域概念,其中 realm 代表一组独立的用户、客户端、权限和其他配置。您可以使用默认 master 域,或创建新域。每个 realm 会公开自己的 OAuth 2.0 端点,这意味着应用程序客户端和应用服务器都需要使用相同的域。

要将 OAuth 2.0 与 AMQ Streams 搭配使用,请使用部署 Red Hat Single Sign-On 来创建和管理身份验证域。

注意

如果您已经部署了 Red Hat Single Sign-On,您可以跳过部署步骤并使用您的当前部署。

开始前

您将需要熟悉使用 Red Hat Single Sign-On。

有关安装和管理说明,请参阅:

先决条件

  • AMQ Streams 和 Kafka 正在运行

对于 Red Hat Single Sign-On 部署:

流程

  1. 安装 Red Hat Single Sign-On。

    您可以从 ZIP 文件或使用 RPM 安装。

  2. 登录到 Red Hat Single Sign-On Admin 控制台,为 AMQ Streams 创建 OAuth 2.0 策略。

    部署 Red Hat Single Sign-On 时会提供登录详情。

  3. 创建并启用 realm。

    您可以使用现有的 master 域。

  4. 如果需要,调整域的会话和令牌超时。
  5. 创建名为 kafka-broker 的客户端。
  6. Settings 选项卡中设置:

    • 访问类型机密
    • Standard Flow EnabledOFF 为这个客户端禁用 Web 登录
    • Service Accounts EnabledON,允许此客户端在其自己的名称中进行身份验证
  7. 在继续操作前,点 Save
  8. Credentials 选项卡中,记录使用 AMQ Streams Kafka 集群配置的 secret。
  9. 对将连接到 Kafka 代理的任何应用程序客户端重复客户端创建步骤。

    为每个新客户端创建一个定义。

    在配置中,您将使用名称作为客户端 ID。

接下来要做什么

部署和配置授权服务器后,将 Kafka 代理配置为使用 OAuth 2.0

6.4.9.6.2. 为 Kafka 代理配置 OAuth 2.0 支持

此流程描述了如何配置 Kafka 代理,以便代理监听程序可以使用授权服务器使用 OAuth 2.0 身份验证。

我们建议通过配置 TLS 侦听程序在加密接口中使用 OAuth 2.0。不建议使用 plain 监听程序。

使用支持您选择的授权服务器的属性配置 Kafka 代理,以及您要实现的授权类型。

开始前

有关 Kafka 代理监听程序的配置和验证的更多信息,请参阅:

有关监听器配置中使用的属性的描述,请参阅:

先决条件

  • AMQ Streams 和 Kafka 正在运行
  • 部署 OAuth 2.0 授权服务器

流程

  1. server.properties 文件中配置 Kafka 代理监听程序配置。

    例如,使用 OAUTHBEARER 机制:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    Copy to Clipboard Toggle word wrap
  2. 将代理连接设置配置为 listener.name.client.oauthbearer.sasl.jaas.config 的一部分。

    此处的示例显示连接配置选项。

    示例 1:使用 JWKS 端点配置进行本地令牌验证

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.valid.issuer.uri="https://<oauth_server_address>/auth/realms/<realm_name>" \
      oauth.jwks.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/certs" \
      oauth.jwks.refresh.seconds="300" \
      oauth.jwks.refresh.min.pause.seconds="1" \
      oauth.jwks.expiry.seconds="360" \
      oauth.username.claim="preferred_username" \
      oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
      oauth.ssl.truststore.password="<truststore_password>" \
      oauth.ssl.truststore.type="PKCS12" ;
    listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
    Copy to Clipboard Toggle word wrap

    示例 2:通过 OAuth 2.0 内省端点将令牌验证委托给授权服务器

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.introspection.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/introspection" \
      # ...
    Copy to Clipboard Toggle word wrap

  3. 如果需要,配置对授权服务器的访问。

    生产环境通常需要这一步,除非使用 service mesh 等技术来配置容器外的安全频道。

    1. 提供用于连接安全授权服务器的自定义信任存储。对授权服务器的访问始终需要 SSL。

      设置属性来配置信任存储。

      例如:

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
        oauth.ssl.truststore.password="<truststore_password>" \
        oauth.ssl.truststore.type="PKCS12" ;
      Copy to Clipboard Toggle word wrap
    2. 如果证书主机名与访问 URL 主机名不匹配,您可以关闭证书主机名验证:

      oauth.ssl.endpoint.identification.algorithm=""
      Copy to Clipboard Toggle word wrap

      检查可确保与授权服务器的连接是真实的。您可能想要在非生产环境中关闭验证。

  4. 根据您选择的身份验证流配置附加属性:

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.token.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/token" \ 
    1
    
      oauth.custom.claim.check="@.custom == 'custom-value'" \ 
    2
    
      oauth.scope="<scope>" \ 
    3
    
      oauth.check.audience="true" \ 
    4
    
      oauth.audience="<audience>" \ 
    5
    
      oauth.valid.issuer.uri="https://https://<oauth_server_address>/auth/<realm_name>" \ 
    6
    
      oauth.client.id="kafka-broker" \ 
    7
    
      oauth.client.secret="kafka-broker-secret" \ 
    8
    
      oauth.connect.timeout.seconds=60 \ 
    9
    
      oauth.read.timeout.seconds=60 \ 
    10
    
      oauth.http.retries=2 \ 
    11
    
      oauth.http.retry.pause.millis=300 \ 
    12
    
      oauth.groups.claim="$.groups" \ 
    13
    
      oauth.groups.claim.delimiter="," ; 
    14
    Copy to Clipboard Toggle word wrap
    1
    到您的授权服务器的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 https:// urls。当使用 KeycloakAuthorizer 或启用了 OAuth 2.0 的监听程序时,需要用于代理间通信。
    2
    (可选) 自定义声明检查。JsonPath 过滤器查询,用于在验证期间将其他自定义规则应用到 JWT 访问令牌。如果访问令牌不包含必要的数据,它将被拒绝。使用 introspection 端点方法时,自定义检查将应用到内省端点响应 JSON。
    3
    (可选)传递给令牌端点 的范围 参数。获取访问令牌进行代理身份验证时使用的 scope。它还在使用 clientIdsecret 的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。
    4
    (可选)对象 检查。如果您的授权服务器提供 aud (audience)声明,并且希望强制进行受众检查,请将 ouath.check.audience 设置为 true。Audience 检查标识令牌的预期接收者。因此,Kafka 代理将拒绝在其 aud 声明中没有 clientId 的令牌。默认为 false
    5
    (可选)传递给令牌端点的 audience 参数。在获取用于代理身份验证的访问令牌时,需要使用 audience。它还在使用 clientIdsecret 的 PLAIN 客户端验证中用于 OAuth 2.0 的客户端名称。这只会影响获取令牌的能力,以及令牌的内容,具体取决于授权服务器。它不会影响监听器的令牌验证规则。
    6
    有效的签发者 URI。只有此签发者发布的访问令牌才会被接受。(始终需要。)
    7
    Kafka 代理配置的客户端 ID,适用于所有代理。这是在 授权服务器注册为 kafka-broker的客户端。当内省端点用于令牌验证时,或使用 KeycloakAuthorizer 时是必需的。
    8
    Kafka 代理配置的 secret,适用于所有代理。当代理必须向授权服务器进行身份验证时,必须指定客户端 secret、访问令牌或刷新令牌。
    9
    (可选)连接到授权服务器时的连接超时(以秒为单位)。默认值为 60。
    10
    (可选)连接到授权服务器时读取超时(以秒为单位)。默认值为 60。
    11
    将失败的 HTTP 请求重试到授权服务器的次数上限。默认值为 0,表示不会执行重试。要有效地使用这个选项,请考虑减少 oauth.connect.timeout.secondsoauth.read.timeout.seconds 选项的超时时间。但请注意,重试可能会阻止当前 worker 线程可用于其他请求,如果太多请求停滞,则可能会导致 Kafka 代理无响应。
    12
    尝试另一个到授权服务器的 HTTP 请求重试前等待的时间。默认情况下,这个时间被设置为零,这意味着不会应用暂停。这是因为导致失败请求的许多问题是针对每个请求的网络粘合或代理问题,可以快速解决。但是,如果您的授权服务器处于压力下或高流量,您可能希望将此选项设置为值 100 ms 或更多,以减少服务器上的负载,并增加成功重试的可能性。
    13
    JsonPath 查询,用于从 JWT 令牌或内省端点响应中提取组信息。默认不设置。这可供自定义授权器用于根据用户组做出授权决策。
    14
    当以单一分隔的字符串返回时,用于解析组信息的分隔符。默认值为 ','(comma)。
  5. 根据您如何应用 OAuth 2.0 身份验证以及所使用的授权服务器类型,添加额外的配置设置:

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.check.issuer=false \ 
    1
    
      oauth.fallback.username.claim="<client_id>" \ 
    2
    
      oauth.fallback.username.prefix="<client_account>" \ 
    3
    
      oauth.valid.token.type="bearer" \ 
    4
    
      oauth.userinfo.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/userinfo" ; 
    5
    Copy to Clipboard Toggle word wrap
    1
    如果您的授权服务器不提供 iss 声明,则无法执行签发者检查。在这种情况下,将 oauth.check.issuer 设置为 false,且不指定 oauth.valid.issuer.uri。默认为 true
    2
    授权服务器可能无法提供单个属性来识别常规用户和客户端。当客户端以自己的名称进行身份验证时,服务器可能会提供 客户端 ID。当用户使用用户名和密码进行身份验证时,若要获取刷新令牌或访问令牌,除了客户端 ID 外,服务器可能会提供一个 username 属性。如果主用户 ID 属性不可用,则使用这个 fallback 选项指定要使用的用户名声明(attribute)。如果需要,您可以使用 JsonPath 表达式,如 "['client.info'].['client.id']",从令牌中的嵌套 JSON 属性检索回退用户名。
    3
    oauth.fallback.username.claim 适用的情况下,可能还需要防止用户名声明的值与回退用户名声明之间的名称冲突。请考虑存在名为 producer 的客户端存在的情况,但也存在名为 producer 的常规用户。为了区分这两者,您可以使用此属性向客户端的用户 ID 添加前缀。
    4
    (仅在使用 oauth.introspection.endpoint.uri)取决于您使用的授权服务器,内省端点可能会或不返回 令牌类型 属性,或者可以包含不同的值。您可以指定来自内省端点的响应必须包含有效的令牌类型值。
    5
    (仅在使用 oauth.introspection.endpoint.uri)时,可以配置或实施授权服务器,以便在内省端点响应中提供任何可识别的信息。要获取用户 ID,您可以将 userinfo 端点的 URI 配置为回退。oauth.fallback.username.claimoauth.fallback.username.claimoauth.fallback.username.prefix 设置应用到 userinfo 端点的响应。

配置 Kafka producer 和消费者 API,以使用 OAuth 2.0 与 Kafka 代理交互。在客户端 pom.xml 文件中添加回调插件,然后为 OAuth 2.0 配置您的客户端。

在客户端配置中指定以下内容:

  • SASL (简单身份验证和安全层)安全协议:

    • SASL_SSL 用于通过 TLS 加密连接进行身份验证
    • SASL_PLAINTEXT 用于通过未加密的连接进行身份验证

      SASL_SSL 用于生产环境,SASL_PLAINTEXT 仅用于本地开发。使用 SASL_SSL 时,需要额外的 ssl.truststore 配置。安全连接(https://)需要 truststore 配置到 OAuth 2.0 授权服务器。要验证 OAuth 2.0 授权服务器,请将授权服务器的 CA 证书添加到客户端配置的信任存储中。您可以使用 PEM 或 PKCS #12 格式配置信任存储。

  • Kafka SASL 机制:

    • OAUTHBEARER 用于使用 bearer 令牌交换的凭证
    • PLAIN 传递客户端凭证(clientId + secret)或访问令牌
  • 实现 SASL 机制的 JAAS (Java 身份验证和授权服务)模块:

    • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 实现 OAUTHBEARER 机制
    • org.apache.kafka.common.security.plain.PlainLoginModule 实现 PLAIN 机制
  • SASL 验证方法,它支持以下验证方法:

    • OAuth 2.0 客户端凭证
    • OAuth 2.0 密码授权(已弃用)
    • 访问令牌
    • 刷新令牌

将 SASL 身份验证属性添加为 JAAS 配置(sasl.jaas.config)。如何配置身份验证属性取决于您用来访问 OAuth 2.0 授权服务器的身份验证方法。在此过程中,属性在属性文件中指定,然后加载到客户端配置中。

注意

您还可以将身份验证属性指定为环境变量,或指定为 Java 系统属性。对于 Java 系统属性,您可以使用 setProperty 设置它们,并使用 -D 选项在命令行中传递它们。

先决条件

  • AMQ Streams 和 Kafka 正在运行
  • 部署和配置 OAuth 2.0 授权服务器以 OAuth 访问 Kafka 代理
  • 为 OAuth 2.0 配置 Kafka 代理

流程

  1. 将支持 OAuth 2.0 的客户端库添加到 Kafka 客户端的 pom.xml 文件中:

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.13.0.redhat-00015</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 通过在属性文件中指定以下配置来配置客户端属性:

    • 安全协议
    • SASL 机制
    • JAAS 模块和身份验证属性,具体取决于所使用的方法

      例如,我们可以将以下内容添加到 client.properties 文件中:

      客户端凭证机制属性

      security.protocol=SASL_SSL 
      1
      
      sasl.mechanism=OAUTHBEARER 
      2
      
      ssl.truststore.location=/tmp/truststore.p12 
      3
      
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \ 
      4
      
        oauth.client.id="<client_id>" \ 
      5
      
        oauth.client.secret="<client_secret>" \ 
      6
      
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 
      7
      
        oauth.ssl.truststore.password="$STOREPASS" \ 
      8
      
        oauth.ssl.truststore.type="PKCS12" \ 
      9
      
        oauth.scope="<scope>" \ 
      10
      
        oauth.audience="<audience>" ; 
      11
      Copy to Clipboard Toggle word wrap

      1
      SASL_SSL 安全协议用于 TLS 加密连接。仅对本地开发使用 SASL_PLAINTEXT
      2
      指定为 OAUTHBEARERPLAIN 的 SASL 机制。
      3
      用于安全访问 Kafka 集群的 truststore 配置。
      4
      授权服务器令牌端点的 URI。
      5
      客户端 ID,这是在授权服务器中创建客户端时使用的名称。
      6
      在授权服务器中创建客户端时创建的 客户端 secret。
      7
      该位置包含授权服务器的公钥证书(truststore.p12)。
      8
      用于访问 truststore 的密码。
      9
      truststore 类型。
      10
      (可选)从令牌端点请求令牌的范围。授权服务器可能需要客户端来指定范围。
      11
      (可选)从令牌端点请求令牌的听众。授权服务器可能需要客户端来指定受众。

      密码授予机制属性

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 
      1
      
        oauth.client.secret="<client_secret>" \ 
      2
      
        oauth.password.grant.username="<username>" \ 
      3
      
        oauth.password.grant.password="<password>" \ 
      4
      
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" \
        oauth.scope="<scope>" \
        oauth.audience="<audience>" ;
      Copy to Clipboard Toggle word wrap

      1
      客户端 ID,这是在授权服务器中创建客户端时使用的名称。
      2
      (可选)在授权服务器中创建客户端时创建的客户端 secret。
      3
      password 授权身份验证的用户名。OAuth 密码授权配置(用户名和密码)使用 OAuth 2.0 密码授权方法。要使用密码授权,请在您的授权服务器上为客户端创建一个有限权限的用户帐户。帐户应像服务帐户一样操作。在进行身份验证需要用户帐户的环境中使用,但首先考虑使用刷新令牌。
      4
      密码以授予密码身份验证。
      注意

      SASL PLAIN 不支持使用 OAuth 2.0 密码授权方法传递用户名和密码(密码授权)。

      访问令牌属性

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.access.token="<access_token>" ; 
      1
      
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" \
      Copy to Clipboard Toggle word wrap

      1
      Kafka 客户端长期的访问令牌。

      刷新令牌属性

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 
      1
      
        oauth.client.secret="<client_secret>" \ 
      2
      
        oauth.refresh.token="<refresh_token>" ; 
      3
      
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" \
      Copy to Clipboard Toggle word wrap

      1
      客户端 ID,这是在授权服务器中创建客户端时使用的名称。
      2
      (可选)在授权服务器中创建客户端时创建的客户端 secret。
      3
      Kafka 客户端长期刷新令牌。
  3. 在 Java 客户端代码中输入 OAUTH 2.0 身份验证的客户端属性。

    显示客户端属性输入示例

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }
    Copy to Clipboard Toggle word wrap

  4. 验证 Kafka 客户端是否可以访问 Kafka 代理。

6.4.10. 使用基于 OAuth 2.0 令牌的授权

如果您在 Red Hat Single Sign-On 中使用 OAuth 2.0 进行基于令牌的身份验证,您还可以使用 Red Hat Single Sign-On 来配置授权规则来限制客户端对 Kafka 代理的访问。身份验证建立用户的身份。授权决定该用户的访问权限级别。

AMQ Streams 支持通过 Red Hat Single Sign-On Authorization Services 使用基于 OAuth 2.0 令牌的授权,它允许您集中管理安全策略和权限。

Red Hat Single Sign-On 中定义的安全策略和权限用于授予对 Kafka 代理上资源的访问权限。用户和客户端与允许对 Kafka 代理执行特定操作的策略进行匹配。

Kafka 允许所有用户默认对代理进行完全访问,同时还提供 AclAuthorizer 插件来配置基于 Access Control Lists (ACL)的授权。

ZooKeeper 存储 ACL 规则,以根据 用户名 授予或拒绝对资源的访问。但是,红帽单点登录基于 OAuth 2.0 令牌的授权在您希望实现对 Kafka 代理的访问控制方面具有更大的灵活性。另外,您可以将 Kafka 代理配置为使用 OAuth 2.0 授权和 ACL。

6.4.10.1. OAuth 2.0 授权机制

AMQ Streams 中的 OAuth 2.0 授权使用红帽单点登录服务器授权服务 REST 端点,通过在特定用户上应用定义的安全策略来扩展基于令牌的身份验证,并为该用户提供授予不同资源的权限列表。策略使用角色和组来匹配用户的权限。OAuth 2.0 授权根据从 Red Hat Single Sign-On Authorization Services 用户获得的授予者列表在本地强制实施权限。

6.4.10.1.1. Kafka 代理自定义授权器

AMQ Streams 提供了一个 Red Hat Single Sign-On authorizer (KeycloakAuthorizer)。为了可以使用 Red Hat Single Sign-On 提供的授权服务的 Red Hat Single Sign-On REST 端点,您可以在 Kafka 代理上配置自定义授权器。

授权程序根据需要从授权服务器获取授予权限的列表,并在 Kafka Broker 上本地强制实施授权,为每个客户端请求做出快速授权决策。

6.4.10.2. 配置 OAuth 2.0 授权支持

这个步骤描述了如何使用 Red Hat Single Sign-On Authorization Services 将 Kafka 代理配置为使用 OAuth 2.0 授权服务。

开始前

考虑某些用户所需的访问权限或希望限制某些用户。您可以使用 Red Hat Single Sign-On 角色客户端用户在 Red Hat Single Sign-On 中配置访问权限的组合。

通常,组用于根据机构部门或地理位置匹配用户。和 角色用于根据其功能匹配用户。

使用红帽单点登录,您可以在 LDAP 中存储用户和组,而客户端和角色不能以这种方式存储。存储和对用户数据的访问可能是您选择配置授权策略的一个因素。

注意

无论在 Kafka 代理上实现的授权是什么,超级用户始终对 Kafka 代理具有不受限制的访问。

先决条件

  • AMQ Streams 必须通过 Red Hat Single Sign-On 配置为使用 OAuth 2.0 进行基于令牌的身份验证。设置授权时,您可以使用相同的 Red Hat Single Sign-On 服务器端点。
  • 您需要了解如何管理 Red Hat Single Sign-On Authorization Services 的策略和权限,如 Red Hat Single Sign-On 文档所述

流程

  1. 访问 Red Hat Single Sign-On Admin 控制台,或使用 Red Hat Single Sign-On Admin CLI 为设置 OAuth 2.0 身份验证时创建的 Kafka 代理客户端启用授权服务。
  2. 使用 Authorization Services 为客户端定义资源、授权范围、策略和权限。
  3. 通过分配角色和组,将权限绑定到用户和客户端。
  4. 将 Kafka 代理配置为使用 Red Hat Single Sign-On 授权。

    将以下内容添加到 Kafka server.properties 配置文件中,以在 Kafka 中安装授权器:

    authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer
    principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
    Copy to Clipboard Toggle word wrap
  5. 为 Kafka 代理添加配置以访问授权服务器和授权服务。

    在此我们显示作为额外属性添加到 server.properties 的示例配置,但您也可以使用大写或大写的命名规则将它们定义为环境变量。

    strimzi.authorization.token.endpoint.uri="https://<auth_server_address>/auth/realms/REALM-NAME/protocol/openid-connect/token" 
    1
    
    strimzi.authorization.client.id="kafka" 
    2
    Copy to Clipboard Toggle word wrap
    1
    到 Red Hat Single Sign-On 的 OAuth 2.0 令牌端点 URL。对于生产环境,请始终使用 https:// urls。
    2
    在启用了授权服务的 Red Hat Single Sign-On 中 OAuth 2.0 客户端定义的客户端 ID。通常,kafka 被用作 ID。
  6. (可选)为特定 Kafka 集群添加配置。

    例如:

    strimzi.authorization.kafka.cluster.name="kafka-cluster" 
    1
    Copy to Clipboard Toggle word wrap
    1
    特定 Kafka 集群的名称。名称用于目标权限,因此可以在同一 Red Hat Single Sign-On 域中管理多个集群。默认值为 kafka-cluster
  7. (可选)与简单授权决定。

    例如:

    strimzi.authorization.delegate.to.kafka.acl="false" 
    1
    Copy to Clipboard Toggle word wrap
    1
    如果 Red Hat Single Sign-On Authorization Services 策略无法访问,将授权委派给 Kafka AclAuthorizer。默认值为 false
  8. (可选)将 TLS 连接的配置添加到授权服务器。

    例如:

    strimzi.authorization.ssl.truststore.location=<path_to_truststore> 
    1
    
    strimzi.authorization.ssl.truststore.password=<my_truststore_password> 
    2
    
    strimzi.authorization.ssl.truststore.type=JKS 
    3
    
    strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 
    4
    
    strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 
    5
    Copy to Clipboard Toggle word wrap
    1
    包含证书的信任存储的路径。
    2
    truststore 的密码。
    3
    truststore 类型。如果没有设置,则使用默认的 Java 密钥存储类型。
    4
    随机数生成器实施。如果没有设置,则使用 Java platform SDK 默认。
    5
    主机名验证。如果设置为空字符串,则会关闭主机名验证。如果没有设置,则默认值为 HTTPS,它会强制对服务器证书进行主机名验证。
  9. (可选)配置从授权服务器刷新授权。授予刷新作业的工作原理,方法是枚举活跃的令牌并请求每个令牌的最新授权。

    例如:

    strimzi.authorization.grants.refresh.period.seconds="120" 
    1
    
    strimzi.authorization.grants.refresh.pool.size="10" 
    2
    
    strimzi.authorization.grants.max.idle.time.seconds="300" 
    3
    
    strimzi.authorization.grants.gc.period.seconds="300" 
    4
    
    strimzi.authorization.reuse.grants="false" 
    5
    Copy to Clipboard Toggle word wrap
    1
    指定授权服务器的授予的频率(默认为每分钟一次)。要关闭刷新以进行调试,请将 设置为 "0"
    2
    指定授予刷新作业使用的线程池大小(并行级别)。默认值为 "5"
    3
    缓存中闲置授权的时间(以秒为单位)。默认值为 300。
    4
    连续运行作业之间的时间(以秒为单位)。默认值为 300。
    5
    控制是否为新会话获取最新的授权。禁用后,从 Red Hat Single Sign-On 检索并缓存该用户的权限。默认值为 true
  10. (可选)在与授权服务器通信时配置网络超时。

    例如:

    strimzi.authorization.connect.timeout.seconds="60" 
    1
    
    strimzi.authorization.read.timeout.seconds="60" 
    2
    
    strimzi.authorization.http.retries="2" 
    3
    Copy to Clipboard Toggle word wrap
    1
    连接到 Red Hat Single Sign-On 令牌端点时的连接超时(以秒为单位)。默认值为 60
    2
    连接到 Red Hat Single Sign-On 令牌端点时读取超时(以秒为单位)。默认值为 60
    3
    重新尝试(不暂停)授权服务器的 HTTP 请求失败的次数上限。默认值为 0, 表示不会执行重试。要有效地使用这个选项,请考虑减少 strimzi.authorization.connect.timeout.secondsstrimzi.authorization.read.timeout.seconds 选项的超时时间。但请注意,重试可能会阻止当前 worker 线程可用于其他请求,如果太多请求停滞,则可能会导致 Kafka 代理无响应。
  11. (可选)为令牌验证和授权启用 OAuth 2.0 指标。

    例如:

    oauth.enable.metrics="true" 
    1
    Copy to Clipboard Toggle word wrap
    1
    控制是否启用或禁用 OAuth 指标。默认值为 false
  12. 通过以客户端或具有特定角色的用户访问 Kafka 代理来验证配置的权限,确保它们具有必要的访问权限,或者没有应该具有访问权限。

6.4.11. 使用基于 OPA 策略的授权

开源策略代理(OPA)是一个开源策略引擎。您可以将 OPA 与 AMQ Streams 集成,作为基于策略的授权机制,允许在 Kafka 代理上进行客户端操作。

从客户端发出请求时,OPA 将根据为 Kafka 访问定义的策略评估请求,然后允许或拒绝请求。

注意

红帽不支持 OPA 服务器。

6.4.11.1. 定义 OPA 策略

在将 OPA 与 AMQ Streams 集成前,请考虑如何定义策略以提供精细的访问控制。

您可以为 Kafka 集群、消费者组和主题定义访问控制。例如,您可以定义一个授权策略,允许从制作者客户端写入到特定代理主题的访问。

为此,策略可能会指定:

  • 与制作者客户端关联的用户主体主机地址
  • 客户端允许的操作
  • 策略适用的 资源类型 (topic)和资源名称

允许或拒绝决策将写入到策略中,并根据提供的请求和客户端识别数据提供响应。

在我们的示例中,生成者客户端必须满足策略才能写入该主题。

6.4.11.2. 连接到 OPA

要启用 Kafka 访问 OPA 策略引擎以查询访问控制策略,您可以在您的 Kafka server.properties 文件中配置自定义 OPA authorizer 插件 (kafka-authorizer-opa-VERSION.jar)。

客户端发出请求时,OPA 策略引擎将通过指定的 URL 地址和 REST 端点来查询 OPA 策略引擎,该端点必须是定义的策略的名称。

该插件提供了客户端请求的详细信息 - 用户主体、操作和资源 - 以 JSON 格式针对策略进行检查。详细信息将包括客户端的唯一身份;例如,使用 TLS 身份验证时将区分名称与客户端证书进行区分。

OPA 使用数据向插件提供响应 - truefalse - 允许或拒绝请求。

6.4.11.3. 配置 OPA 授权支持

这个步骤描述了如何将 Kafka 代理配置为使用 OPA 授权。

开始前

考虑某些用户所需的访问权限或希望限制某些用户。您可以使用 用户和 Kafka 资源 的组合来定义 OPA 策略。

可以设置 OPA 从 LDAP 数据源加载用户信息。

注意

无论在 Kafka 代理上实现的授权是什么,超级用户始终对 Kafka 代理具有不受限制的访问。

先决条件

流程

  1. 编写授权客户端请求对 Kafka 代理执行操作所需的 OPA 策略。

    请参阅 定义 OPA 策略

    现在,将 Kafka 代理配置为使用 OPA。

  2. 为 Kafka 安装 OPA 授权器插件

    请参阅 连接到 OPA

    确保插件文件包含在 Kafka 类路径中。

  3. 在 Kafka server.properties 配置文件中添加以下内容以启用 OPA 插件:

    authorizer.class.name: com.bisnode.kafka.authorization.OpaAuthorizer
    Copy to Clipboard Toggle word wrap
  4. 在 Kafka 代理的 server.properties 中添加其他配置来访问 OPA 策略引擎和策略。

    例如:

    opa.authorizer.url=https://OPA-ADDRESS/allow 
    1
    
    opa.authorizer.allow.on.error=false 
    2
    
    opa.authorizer.cache.initial.capacity=50000 
    3
    
    opa.authorizer.cache.maximum.size=50000 
    4
    
    opa.authorizer.cache.expire.after.seconds=600000 
    5
    
    super.users=User:alice;User:bob 
    6
    Copy to Clipboard Toggle word wrap
    1
    (必需)授权器插件将查询的策略的 OAuth 2.0 令牌端点 URL。在本例中,策略称为 allow
    2
    标志指定在授权器插件无法与 OPA 策略引擎连接时,默认是否允许或拒绝客户端访问。
    3
    本地缓存的初始容量(以字节为单位)。使用缓存,以便插件不必查询每个请求的 OPA 策略引擎。
    4
    本地缓存的最大容量(以字节为单位)。
    5
    本地缓存的时间(以毫秒为单位),方法是从 OPA 策略引擎重新加载。
    6
    被视为超级用户的用户主体列表,以便在不查询 Open Policy Agent 策略的情况下始终允许它们。

    有关身份验证和授权选项的信息,请参阅 Open Policy Agent 网站

  5. 通过使用具有正确授权的客户端访问 Kafka 代理来验证配置的权限。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat