开发 Kafka 客户端应用程序
开发客户端应用程序以使用 AMQ Streams 与 Kafka 交互
摘要
前言
对红帽文档提供反馈
我们感谢您对我们文档的反馈。
要改进,创建一个 JIRA 问题并描述您推荐的更改。提供尽可能多的详细信息,以便我们快速解决您的请求。
前提条件
-
您有红帽客户门户网站帐户。此帐户可让您登录到 Red Hat Jira Software 实例。
如果您没有帐户,系统会提示您创建一个帐户。
流程
- 点以下内容: Create issue。
- 在 Summary 文本框中输入问题的简短描述。
在 Description 文本框中提供以下信息:
- 找到此问题的页面的 URL。
-
有关此问题的详细描述。
您可以将信息保留在任何其他字段中的默认值。
- 添加 reporter 名称。
- 点 Create 将 JIRA 问题提交到文档团队。
感谢您花时间来提供反馈。
第 1 章 开发客户端概述
为 Apache Kafka 安装开发 Kafka 客户端应用程序,它可以生成消息、使用消息或两者。您可以开发在 RHEL 上的 OpenShift 或 Streams for Apache Kafka 的 Streams for Apache Kafka 的客户端应用程序。
消息包含可选键和值,其中包含消息数据以及标头和相关元数据。键标识邮件的主题或消息的属性。如果您需要以与发送相同的顺序处理一组消息,则必须使用相同的密钥。
消息会批量发送。消息包含标头和元数据,用于提供客户端过滤和路由的详情,如消息的时间戳和偏移位置。
Kafka 为开发客户端应用程序提供客户端 API。Kafka 生成者和消费者 API 是与客户端应用程序中的 Kafka 集群交互的主要方法。API 控制消息流。producer API 将信息发送到 Kafka 主题,而使用者 API 从主题读取消息。
Apache Kafka 的流支持使用 Java 编写的客户端。如何开发您的客户端取决于您的特定用例。数据持久性可能是优先级或高吞吐量。通过配置客户端和代理,可以满足这些要求。但是,所有客户端必须能够连接到给定 Kafka 集群中的所有代理。
1.1. 支持 HTTP 客户端
作为在客户端中使用 Kafka producer 和消费者 API 的替代选择,您可以设置和使用 Apache Kafka Bridge 的 Streams。Kafka Bridge 提供了一个 RESTful 接口,它允许基于 HTTP 的客户端与 Kafka 集群交互。它提供与 Strimzi 的 Web API 连接的优点,而无需解释 Kafka 协议的客户端应用程序。Kafka 通过 TCP 使用二进制协议。
如需更多信息,请参阅将 流用于 Apache Kafka Bridge。
1.2. 调整生产者和消费者
您可以添加更多配置属性来优化 Kafka 客户端的性能。当您有一些时间来分析您的客户端和服务器配置性能时,您可能需要执行此操作。
如需更多信息,请参阅 Kafka 配置调整。
1.3. 监控客户端交互
分布式追踪有助于对消息的端到端跟踪。您可以在 Kafka 消费者和制作者客户端应用程序中启用追踪。
如需更多信息,请参阅以下指南中的分布式追踪文档:
当使用术语客户端应用程序时,我们特别引用使用 Kafka 生成者和消费者向 Kafka 集群发送和接收信息的应用程序。我们不引用其他 Kafka 组件,如 Kafka Connect 或 Kafka Streams,它们都有自己的不同的用例和功能。
第 2 章 客户端开发先决条件
开发客户端用于 Apache Kafka 的 Streams 需要满足以下先决条件。
- 您有红帽帐户。
- 您有一个 Kafka 集群在 Apache Kafka 的 Streams 中运行。
- Kafka 代理使用监听程序为安全客户端连接配置。
- 已为集群创建主题。
- 您有一个 IDE 来开发和测试您的客户端。
- 已安装了 JDK 11 或更高版本。
第 3 章 将客户端依赖项添加到 Maven 项目中
如果要开发基于 Java 的 Kafka 客户端,您可以将包括 Kafka 客户端的红帽依赖项(包括 Kafka Streams)添加到 Maven 项目的 pom.xml
文件中。对于 Apache Kafka,只支持由红帽构建的客户端库。
您可以将以下工件作为依赖项添加:
kafka-clients
包含 Kafka
Producer
、Consumer
和AdminClient
API。-
Producer
API 可让应用程序将数据发送到 Kafka 代理。 -
Consumer
API 可让应用程序消耗 Kafka 代理中的数据。 -
AdminClient
API 提供了管理 Kafka 集群的功能,包括主题、代理和其他组件。
-
kafka-streams
包含
KafkaStreams
API。Kafka Streams 可让应用程序从一个或多个输入流接收数据。您可以使用此 API 对数据流运行一系列实时操作,如映射、过滤和加入。您可以使用 Kafka Streams 将结果写入一个或多个输出流。它是 Red Hat Maven 存储库中提供的
kafka-streams
JAR 软件包的一部分。
3.1. 在 Maven 项目中添加 Kafka 客户端依赖项
将 Kafka 客户端的红帽依赖项添加到 Maven 项目中。
先决条件
-
具有现有
pom.xml
的 Maven 项目。
流程
将 Red Hat Maven 存储库添加到 Maven 项目的
pom.xml
文件的 <repositories
> 部分。<repositories> <repository> <id>redhat-maven</id> <url>https://maven.repository.redhat.com/ga/</url> </repository> </repositories>
将
kafka-clients
作为 <dependency
> 添加到 Maven 项目的pom.xml
文件中。<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.0.redhat-00004</version> </dependency> </dependencies>
- 构建 Maven 项目,将 Kafka 客户端依赖项添加到项目中。
3.2. 在 Maven 项目中添加 Kafka Streams 依赖项
将 Kafka Streams 的红帽依赖项添加到 Maven 项目中。
先决条件
-
具有现有
pom.xml
的 Maven 项目。
流程
将 Red Hat Maven 存储库添加到 Maven 项目的
pom.xml
文件的 <repositories
> 部分。<repositories> <repository> <id>redhat-maven</id> <url>https://maven.repository.redhat.com/ga/</url> </repository> </repositories>
将
kafka-streams
作为 <dependency
> 添加到 Maven 项目的pom.xml
文件中。<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.7.0.redhat-00004</version> </dependency> </dependencies>
- 构建 Maven 项目,将 Kafka Streams 依赖项添加到项目中。
3.3. 在您的 Maven 项目中添加 OAuth 2.0 依赖项
将 OAuth 2.0 的红帽依赖项添加到您的 Maven 项目中。
先决条件
-
具有现有
pom.xml
的 Maven 项目。
流程
将 Red Hat Maven 存储库添加到 Maven 项目的
pom.xml
文件的 <repositories
> 部分。<repositories> <repository> <id>redhat-maven</id> <url>https://maven.repository.redhat.com/ga/</url> </repository> </repositories>
将
kafka-oauth-client
作为 <dependency
> 添加到 Maven 项目的pom.xml
文件中。<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.15.0.redhat-00006</version> </dependency>
- 构建 Maven 项目,将 OAuth 2.0 依赖项添加到项目中。
第 4 章 配置客户端应用程序以连接到 Kafka 集群
要连接到 Kafka 集群,客户端应用程序必须使用标识代理并启用连接的最小属性集进行配置。另外,您需要添加一个 serializer/deserializer 机制,来将信息转换为 Kafka 使用的字节数组格式。在开发消费者客户端时,您首先向 Kafka 集群添加初始连接,该连接用于发现所有可用的代理。建立连接时,您可以开始消耗来自 Kafka 主题的信息,或向它们生成信息。
虽然不需要,但建议使用唯一的客户端 ID,以便您可以在日志和指标集合中对客户端进行身份。
您可以在属性文件中配置属性。使用属性文件意味着您可以在不编译代码的情况下修改配置。
例如,您可以使用以下代码在 Java 客户端中载入属性:
将配置属性加载到客户端中
Properties props = new Properties(); try (InputStream propStream = Files.newInputStream(Paths.get(filename))) { props.load(propStream); }
您还可以使用直接将属性添加到配置对象中的代码中。例如,您可以将 setProperty ()
方法用于 Java 客户端应用程序。当您只配置少量属性时,直接添加属性是有用的选项。
4.1. 基本制作者客户端配置
开发制作者客户端时,配置以下内容:
- 与 Kafka 集群的连接
- 将消息密钥转换为 Kafka 代理的字节的序列化器
- 将消息值转换为 Kafka 代理的字节的序列化器
如果要发送和存储压缩消息,您可能还会添加压缩类型。
基本制作者客户端配置属性
client.id = my-producer-id 1 bootstrap.servers = my-cluster-kafka-bootstrap:9092 2 key.serializer = org.apache.kafka.common.serialization.StringSerializer 3 value.serializer = org.apache.kafka.common.serialization.StringSerializer 4
直接将制作者客户端配置添加到代码中
Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
KafkaProducer
为它发送的消息指定字符串键和值类型。使用的序列化器必须能够将指定类型的键和值转换为字节,然后才能将它们发送到 Kafka。
4.2. 基本消费者客户端配置
开发消费者客户端时,请配置以下内容:
- 与 Kafka 集群的连接
- 将从 Kafka 代理获取的字节数转换为客户端应用程序可以理解的消息键的反序列化器
- 将从 Kafka 代理获取的字节数转换为客户端应用程序可理解的消息值的反序列化器
通常,您还会添加消费者组 ID,以将消费者与消费者组关联。消费者组是一种逻辑实体,用于将大型数据流的处理从一个或多个主题分发到并行消费者。消费者使用 group.id
分组,允许消息分散到成员中。在给定的消费者组中,每个主题分区都由一个消费者读取。单个消费者可以处理许多分区。要获得最大并行性,为每个分区创建一个消费者。如果消费者超过分区,一些消费者保持闲置状态,在出现故障时可以接管。
基本消费者客户端配置属性
client.id = my-consumer-id 1 group.id = my-group-id 2 bootstrap.servers = my-cluster-kafka-bootstrap:9092 3 key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 4 value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 5
直接将消费者客户端配置添加到代码中
Properties props = new Properties(); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
KafkaConsumer
指定它接收的消息的字符串键和值类型。使用的序列化器必须能够将从 Kafka 接收的字节数转换为指定类型。
每个消费者组必须具有唯一的 group.id
。如果您重启具有相同 group.id
的消费者,它会在停止前从其离开的位置恢复其消息。
第 5 章 配置安全连接
保护 Kafka 集群和客户端应用程序之间的连接有助于确保集群和客户端之间的通信的机密性、完整性和真实性。
要实现安全连接,您可以引入与身份验证、加密和授权相关的配置:
- 身份验证
- 使用身份验证机制来验证客户端应用程序的身份。
- 加密
- 启用使用 SSL/TLS 加密在客户端和服务器间传输数据加密。
- 授权
- 根据客户端应用程序的身份验证身份,控制 Kafka 代理上允许的客户端访问和操作。
在不进行身份验证的情况下无法使用授权。如果没有启用身份验证,则无法确定客户端的身份,因此无法强制执行授权规则。这意味着,即使定义了授权规则,也不会在不进行身份验证的情况下强制实施它们。
在 Apache Kafka 的 Streams 中,监听程序用于配置 Kafka 代理和客户端之间的网络连接。侦听器配置选项决定了代理如何侦听传入客户端连接以及如何管理安全访问。所需的确切配置取决于您选择的身份验证、加密和授权机制。
您可以配置 Kafka 代理和客户端应用程序以启用安全功能。保护到 Kafka 集群的客户端连接的一般概述如下:
- 安装 Apache Kafka 组件的流,包括 Kafka 集群。
- 对于 TLS,为每个代理和客户端应用程序生成 TLS 证书。
- 在代理配置中配置监听程序以进行安全连接。
- 配置客户端应用程序以进行安全连接。
根据您用来与 Kafka 代理建立安全和验证连接的机制配置客户端应用程序。Kafka 代理使用的身份验证、加密和授权必须与连接客户端应用程序使用的身份验证、加密和授权匹配。客户端应用程序和代理需要在安全协议和配置上达成一致,以便实现安全通信。例如,Kafka 客户端和 Kafka 代理必须使用相同的 TLS 版本和密码套件。
客户端和服务器间的安全配置不匹配可能会导致连接失败或潜在的安全漏洞。仔细配置和测试代理和客户端应用程序非常重要,以确保它们被正确保护并可安全地通信。
5.1. 为安全访问设置代理
在为安全访问配置客户端应用程序前,您必须首先在 Kafka 集群中设置代理来支持您要使用的安全机制。要启用安全连接,您可以使用安全机制的适当配置创建监听程序。
5.1.1. 建立到在 RHEL 上运行的 Kafka 集群的安全连接
当在 RHEL 上使用 Streams for Apache Kafka 时,保护与 Kafka 集群的客户端连接的一般概述如下:
- 在 RHEL 服务器中安装 Apache Kafka 组件的 Streams,包括 Kafka 集群。
- 对于 TLS,为 Kafka 集群中的所有代理生成 TLS 证书。
在代理配置属性文件中配置监听程序。
- 为 Kafka 集群监听程序配置身份验证,如 TLS 或 SASL SCRAM-SHA-512。
-
为 Kafka 集群上所有启用的监听程序配置授权,如
简单
授权。
- 对于 TLS,为每个客户端应用程序生成 TLS 证书。
-
创建
config.properties
文件,以指定客户端应用程序使用的连接详情和身份验证凭证。 启动 Kafka 客户端应用程序并连接到 Kafka 集群。
-
使用
config.properties
文件中定义的属性连接到 Kafka 代理。
-
使用
- 验证客户端是否可以成功连接到 Kafka 集群,并安全地使用和生成信息。
其他资源
有关设置代理的更多信息,请参阅以下指南:
5.1.2. 为 RHEL 上的 Kafka 集群配置安全监听程序
使用配置属性文件在 Kafka 中配置监听程序。要为 Kafka 代理配置安全连接,您可以在此文件中为 TLS、SASL 和其他与安全相关的配置设置相关的属性。
以下是在 Kafka 代理的 server.properties
配置文件中指定的 TLS 侦听器的示例配置,其密钥存储和信任存储采用 PKCSbusybox 格式:
server.properties
中的监听程序配置示例
listeners = listener_1://0.0.0.0:9093, listener_2://0.0.0.0:9094 listener.security.protocol.map = listener_1:SSL, listener_2:PLAINTEXT ssl.keystore.type = PKCS12 ssl.keystore.location = /path/to/keystore.p12 ssl.keystore.password = <password> ssl.truststore.type = PKCS12 ssl.truststore.location = /path/to/truststore.p12 ssl.truststore.password = <password> ssl.client.auth = required authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer. super.users = User:superuser
listener 属性指定每个监听程序名称,以及代理侦听的 IP 地址和端口。协议映射告知
listener_1
侦听器将 SSL 协议用于使用 TLS 加密的客户端。listener_2
为不使用 TLS 加密的客户端提供 PLAINTEXT 连接。密钥存储包含代理的私钥和证书。truststore 包含用于验证客户端应用程序身份的可信证书。ssl.client.auth
属性强制执行客户端身份验证。
Kafka 集群使用简单的授权。授权器设置为 SimpleAclAuthorizer
。为所有监听器上的非受限访问定义了单个超级用户。Apache Kafka 的流支持 Kafka SimpleAclAuthorizer
和自定义授权器插件。
如果我们使用 listener.name.<name_of_listener>
为配置属性添加前缀,则配置特定于该监听程序。
这只是一个示例配置。某些配置选项特定于监听器的类型。如果您使用 OAuth 2.0 或 Open Policy Agent (OPA),还必须配置特定监听器中的授权服务器或 OPA 服务器的访问权限。您可以根据特定要求和环境创建监听程序。
有关监听器配置的更多信息,请参阅 Apache Kafka 文档。
使用 ACL 来微调访问
您可以使用访问控制列表(ACL)来微调对 Kafka 集群的访问。要创建和管理访问控制列表(ACL),请使用 kafka-acls.sh
命令行工具。ACL 将访问规则应用到客户端应用程序。
在以下示例中,第一个 ACL 授予名为 my-topic
的特定主题的读取和描述权限。resource.patternType
设置为 literal
,这意味着资源名称必须完全匹配。
第二个 ACL 授予名为 my-group
的特定消费者组的读取权限。resource.patternType
设置为 prefix
,这意味着资源名称必须与前缀匹配。
ACL 配置示例
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \ --allow-principal User:my-user --operation Read --operation Describe --topic my-topic --resource-pattern-type literal \ --allow-principal User:my-user --operation Read --group my-group --resource-pattern-type prefixed
5.1.3. 建立到在 OpenShift 上运行的 Kafka 集群的安全连接
在 OpenShift 中使用流 for Apache Kafka 时,一般概述了保护与 Kafka 集群的客户端连接,如下所示:
使用 Cluster Operator 在 OpenShift 环境中部署 Kafka 集群。使用
Kafka
自定义资源配置和安装集群并创建监听程序。- 为监听程序配置身份验证,如 TLS 或 SASL SCRAM-SHA-512。Cluster Operator 会创建一个包含集群 CA 证书的 secret,以验证 Kafka 代理的身份。
-
为所有启用的监听程序配置授权,如
简单
授权。
使用 User Operator 创建一个代表您的客户端的 Kafka 用户。使用
KafkaUser
自定义资源配置和创建用户。- 为与监听器验证机制匹配的 Kafka 用户(客户端)配置身份验证。User Operator 会创建一个 secret,其中包含客户端用于与 Kafka 集群进行身份验证的客户端证书和私钥。
- 为您的 Kafka 用户(客户端)配置与监听器的授权机制匹配的授权。授权规则允许对 Kafka 集群的特定操作。
-
创建
config.properties
文件,以指定客户端应用程序连接到集群所需的连接详情和身份验证凭证。 启动 Kafka 客户端应用程序并连接到 Kafka 集群。
-
使用
config.properties
文件中定义的属性连接到 Kafka 代理。
-
使用
- 验证客户端是否可以成功连接到 Kafka 集群,并安全地使用和生成信息。
其他资源
有关设置代理的更多信息,请参阅在 OpenShift 中为 Apache Kafka 配置流。
5.1.4. 在 OpenShift 中为 Kafka 集群配置安全监听程序
当您使用 Apache Kafka
的 Streams 部署 Kafka 自定义资源时,您可以将监听程序配置添加到 Kafka spec
中。使用监听程序配置在 Kafka 中安全连接。要为 Kafka 代理配置安全连接,请在监听器级别设置 TLS、SASL 和其他与安全相关的配置的相关属性。
外部监听程序提供对 OpenShift 集群外部的 Kafka 集群的客户端访问。Apache Kafka 的 Streams 创建监听程序服务和 bootstrap 地址,以便根据配置启用对 Kafka 集群的访问。例如,您可以创建使用以下连接机制的外部监听程序:
- 节点端口
- loadBalancers
- OpenShift 路由
以下是 Kafka
资源的 nodeport
侦听器配置示例:
Kafka
资源中的监听程序配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... listeners: - name: plaintext port: 9092 type: internal tls: false configuration: useServiceDnsDomain: true - name: tls port: 9093 type: internal tls: true authentication: type: tls - name: external port: 9094 type: route tls: true authentication: type: tls authorization: type: simple superUsers: - CN=superuser # ...
listener 属性配置有三个监听程序: plaintext
、tls
和 external
。外部
侦听器的类型是 nodeport
,它使用 TLS 进行加密和身份验证。当使用 Cluster Operator 创建 Kafka 集群时,CA 证书会自动生成。您可以将集群 CA 添加到客户端应用程序的信任存储中,以验证 Kafka 代理的身份。另外,您可以将 Apache Kafka 的 Streams 配置为使用在代理或监听程序级别的您自己的证书。当客户端应用程序需要不同的安全配置时,可能需要在监听器级别使用证书。在监听器级别使用证书也会添加额外的控制和安全层。
使用配置供应商插件将配置数据加载到生成者和消费者客户端。配置提供程序插件从 secret 或 ConfigMap 加载配置数据。例如,您可以告诉供应商自动从 Strimzi secret 获取证书。如需更多信息,请参阅 OpenShift 上运行的 Apache Kafka 文档 的流。
Kafka 集群使用简单的授权。authorization 属性 type 设置为 simple
。为所有监听器上的非受限访问定义了单个超级用户。Apache Kafka 的流支持 Kafka SimpleAclAuthorizer
和自定义授权器插件。
这只是一个示例配置。某些配置选项特定于监听器的类型。如果您使用 OAuth 2.0 或 Open Policy Agent (OPA),还必须配置特定监听器中的授权服务器或 OPA 服务器的访问权限。您可以根据特定要求和环境创建监听程序。
有关监听器配置的更多信息,请参阅 GenericKafkaListener
模式参考。
当使用 路由类型
监听程序对 OpenShift 上的 Kafka 集群进行客户端访问时,会启用 TLS 透传功能。OpenShift 路由设计为使用 HTTP 协议,但也可用于为其他协议代理网络流量,包括 Apache Kafka 使用的 Kafka 协议。客户端建立与路由的连接,路由使用 TLS Server Name Indication (SNI)扩展将流量转发到 OpenShift 集群中运行的代理,以获取目标主机名。SNI 扩展允许路由为每个连接正确识别目标代理。
使用 ACL 来微调访问
您可以使用访问控制列表(ACL)来微调对 Kafka 集群的访问。要添加访问控制列表(ACL),您需要配置 KafkaUser
自定义资源。当您创建 KafkaUser
时,Apache Kafka 的 Streams 会自动管理创建和更新 ACL。ACL 将访问规则应用到客户端应用程序。
在以下示例中,第一个 ACL 授予名为 my-topic
的特定主题的读取和描述权限。resource.patternType
设置为 literal
,这意味着资源名称必须完全匹配。
第二个 ACL 授予名为 my-group
的特定消费者组的读取权限。resource.patternType
设置为 prefix
,这意味着资源名称必须与前缀匹配。
KafkaUser
资源中的 ACL 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... authorization: type: simple acls: - resource: type: topic name: my-topic patternType: literal operations: - Read - Describe - resource: type: group name: my-group patternType: prefix operations: - Read
如果在配置 Kafka 用户时将 tls-external
指定为身份验证选项,您可以使用自己的客户端证书,而不是 User Operator 生成的客户端证书。
5.2. 设置客户端以进行安全访问
在 Kafka 代理上设置监听程序来支持安全连接后,下一步是将客户端应用程序配置为使用这些监听程序与 Kafka 集群通信。这包括根据监听器上配置的安全机制,为每个客户端提供适当的安全设置来使用集群进行身份验证。
5.2.1. 配置安全协议
配置客户端应用程序用来与 Kafka 代理监听程序中配置的协议匹配的安全协议。例如,使用 SSL
(安全套接字层)进行 TLS 身份验证,或 SASL_SSL
用于 SASL (通过 SSL 进行简单身份验证和安全层)通过 TLS 加密进行身份验证。将 truststore 和 keystore 添加到支持访问 Kafka 集群所需的身份验证机制的客户端配置中。
- truststore
- truststore 包含用来验证 Kafka 代理真实性的可信证书颁发机构(CA)的公共证书。当客户端连接到安全 Kafka 代理时,可能需要验证代理的身份。
- Keystore
- 密钥存储包含客户端的私钥及其公共证书。当客户端希望向代理验证自身时,它会提供自己的证书。
如果使用 TLS 身份验证,您的 Kafka 客户端配置需要一个信任存储和密钥存储来连接到 Kafka 集群。如果您使用 SASL SCRAM-SHA-512,则通过用户名和密码凭证交换来执行身份验证,而不是数字证书,因此不需要密钥存储。SCRAM-SHA-512 是一个更加轻量级的机制,但它不像使用基于证书的身份验证一样安全。
如果您有自己的证书基础架构并使用第三方 CA 的证书,则客户端的默认信任存储可能已经包含公共 CA 证书,您不需要将它们添加到客户端的信任存储中。如果客户端由默认信任存储中包含的公共 CA 证书签名,则客户端会自动信任服务器证书。
您可以创建一个 config.properties
文件来指定客户端应用程序使用的身份验证凭据。
在以下示例中,security.protocol
设置为 SSL
,以在客户端和代理之间启用 TLS 身份验证和加密。
ssl.truststore.location
和 ssl.truststore.password
属性指定信任存储的位置和密码。ssl.keystore.location
和 ssl.keystore.password
属性指定密钥存储的位置和密码。
使用 PKCSsetuptools (Public-Key Cryptography Standards obtain)文件格式。您还可以使用 base64 编码的 PEM (Privacy Enhanced Mail)格式。
TLS 身份验证的客户端配置属性示例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SSL ssl.truststore.location = /path/to/ca.p12 ssl.truststore.password = truststore-password ssl.keystore.location = /path/to/user.p12 ssl.keystore.password = keystore-password client.id = my-client
在以下示例中,security.protocol
设置为 SASL_SSL
,以在客户端和服务器间启用带有 TLS 加密的 SASL 身份验证。如果您只需要身份验证而不是加密,您可以使用 SASL
协议。用于身份验证的指定 SASL 机制是 SCRAM-SHA-512
。可以使用不同的验证机制。sasl.jaas.config
属性指定身份验证凭据。
SCRAM-SHA-512 验证的客户端配置属性示例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; ssl.truststore.location = path/to/truststore.p12 ssl.truststore.password = truststore_password ssl.truststore.type = PKCS12 client.id = my-client
对于不支持 PEM 格式的应用程序,您可以使用 OpenSSL 等工具将 PEM 文件转换为 PKCSGalaxy 格式。
5.2.2. 配置允许的 TLS 版本和密码套件
您可以纳入 SSL 配置和密码套件,以进一步保护客户端应用程序和 Kafka 集群之间的基于 TLS 的通信。在 Kafka 代理配置中指定支持的 TLS 版本和密码套件。如果要限制其使用的 TLS 版本和密码套件,您还可以将配置添加到您的客户端。客户端上的配置应该只使用在代理上启用的协议和密码套件。
在以下示例中,对于 Kafka 代理和客户端应用程序之间的通信,使用 security.protocol
启用 SSL。您可以将密码套件指定为用逗号分开的列表。ssl.cipher.suites 属性是
允许客户端使用的密码套件的逗号分隔列表。
Kafka 代理的 SSL 配置属性示例
security.protocol: "SSL" ssl.enabled.protocols: "TLSv1.3", "TLSv1.2" ssl.protocol: "TLSv1.3" ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
ssl.enabled.protocols
属性指定可用于保护集群及其客户端之间的通信的可用 TLS 版本。在这种情况下,TLSv1.3
和 TLSv1.2
都已启用。ssl.protocol
属性为所有连接设置默认 TLS 版本,且必须从启用的协议中选择。默认情况下,客户端使用 TLSv1.3
进行通信。如果客户端只支持 TLSv1.2,它仍然可以连接到代理并使用该支持的版本进行通信。同样,如果配置位于客户端,并且代理只支持 TLSv1.2,客户端将使用支持的版本。
Apache Kafka 支持的密码套件取决于您使用的 Kafka 版本和底层环境。检查提供最高级别的安全的最新支持的密码套件。
5.2.3. 使用访问控制列表(ACL)
您不必为客户端应用程序中的 ACL 明确配置任何内容。Kafka 代理在服务器端强制 ACL。当客户端向服务器发送请求以生成或消耗数据时,服务器会检查 ACL,以确定客户端(用户)是否授权执行请求的操作。如果客户端被授权,则处理请求;否则,请求将被拒绝并返回错误。但是,客户端仍必须经过身份验证并使用适当的安全协议来启用与 Kafka 集群的安全连接。
如果您在 Kafka 代理上使用访问控制列表(ACL),请确保正确设置 ACL 来限制客户端对要控制的主题和操作的访问。如果您使用 Open Policy Agent (OPA)策略来管理访问,在策略中配置了授权规则,因此您不需要针对 Kafka 代理指定 ACL。OAuth 2.0 提供了一些灵活性:您可以使用 OAuth 2.0 供应商来管理 ACL;或使用 OAuth 2.0 和 Kafka 的简单
授权来管理 ACL。
ACL 适用于大多数类型的请求,不限于生成和消耗操作。例如,可以将 ACLS 应用到读操作,如描述主题或写入操作,如创建新主题。
5.2.4. 使用 OAuth 2.0 进行基于令牌的访问
使用 OAuth 2.0 开放式标准与 Apache Kafka 的流授权,通过 OAuth 2.0 供应商强制授权控制。OAuth 2.0 为应用程序提供了一种安全的方法来访问存储在其他系统中的用户数据。授权服务器可能会向客户端应用程序发出访问令牌来授予对 Kafka 集群的访问权限。
以下步骤描述了设置并使用 OAuth 2.0 进行令牌验证的一般方法:
- 使用代理和客户端凭证配置授权服务器,如客户端 ID 和 secret。
- 从授权服务器获取 OAuth 2.0 凭据。
- 使用 OAuth 2.0 凭证在 Kafka 代理中配置监听程序,并与授权服务器交互。
- 将 Oauth 2.0 依赖项添加到客户端库中。
- 使用 OAuth 2.0 凭证配置 Kafka 客户端,并与授权服务器交互。
- 在运行时获取访问令牌,它使用 OAuth 2.0 供应商验证客户端。
如果您在 Kafka 代理上为 OAuth 2.0 配置监听程序,您可以将客户端应用程序设置为使用 OAuth 2.0。除了访问 Kafka 集群的标准 Kafka 客户端配置外,还必须包含 OAuth 2.0 身份验证的特定配置。您还必须确保您使用的授权服务器可以被 Kafka 集群和客户端应用程序访问。
指定 SASL (Simple Authentication and Security Layer)安全协议和机制。在生产环境中,建议使用以下设置:
-
TLS 加密连接的
SASL_SSL
协议。 -
用于使用 bearer 令牌交换的凭证的
OAUTHBEARER
机制
JAAS (Java 身份验证和授权服务)模块实施 SASL 机制。机制的配置取决于您使用的身份验证方法。例如,使用凭证交换,您可以添加 OAuth 2.0 访问令牌端点、访问令牌、客户端 ID 和客户端 secret。客户端连接到授权服务器的令牌端点(URL),以检查令牌是否仍然有效。您还需要一个信任存储,其中包含授权服务器用于经过身份验证的用户访问的公钥证书。
OAauth 2.0 的客户端配置属性示例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = OAUTHBEARER # ... sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \ oauth.access.token = <access_token> \ oauth.client.id = "<client_id>" \ oauth.client.secret = "<client_secret>" \ oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \ oauth.ssl.truststore.password = "<truststore_password>" \ oauth.ssl.truststore.type = "PKCS12" \
其他资源
有关将代理设置为使用 OAuth 2.0 的更多信息,请参阅以下指南:
5.2.5. 使用 Open Policy Agent (OPA)访问策略
使用带有 Apache Kafka 的 Streams 的 Open Policy Agent (OPA)策略代理来评估请求,以针对访问策略连接到您的 Kafka 集群。开放策略代理(OPA)是一个策略引擎,用于管理授权策略。策略集中访问控制,可以动态更新,无需对客户端应用程序进行更改。例如,您可以创建一个策略,仅允许特定用户(客户端)生成和使用消息到特定主题。
Apache Kafka 的 Streams 使用 Open Policy Agent 插件作为授权器。
以下步骤描述了设置和使用 OPA 的一般方法:
- 设置 OPA 服务器实例。
- 定义提供管理 Kafka 集群的访问的授权规则的策略。
- 为 Kafka 代理创建配置,以接受 OPA 授权并与 OPA 服务器交互。
- 配置 Kafka 客户端,为授权访问 Kafka 集群提供凭证。
如果您在 Kafka 代理上为 OPA 配置了一个监听程序,您可以将客户端应用程序设置为使用 OPA。在监听器配置中,您可以指定一个 URL 来连接到 OPA 服务器并授权您的客户端应用程序。除了用于访问 Kafka 集群的标准 Kafka 客户端配置外,还必须添加凭证来与 Kafka 代理进行身份验证。代理通过向 OPA 服务器发送请求来评估授权策略,来检查客户端是否有必要的授权来执行请求的操作。您不需要信任存储或密钥存储来保护通信,因为策略引擎强制执行授权策略。
OPA 授权的客户端配置属性示例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; # ...
红帽不支持 OPA 服务器。
其他资源
有关设置代理以使用 OPA 的更多信息,请参阅以下指南:
5.2.6. 在流传输消息时使用事务
通过在代理和制作者客户端应用中配置事务属性,您可以确保在单个事务中处理消息。事务为消息流增加了可靠性和一致性。
在代理上总是启用事务。您可以使用以下属性更改默认配置:
事务的 Kafka 代理配置属性示例
transaction.state.log.replication.factor = 3 transaction.state.log.min.isr = 2 transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000
这是生产环境的典型配置,它为内部 __transaction_state
主题创建 3 个副本。\__transaction_state
主题存储有关事务的信息。事务日志至少需要 2 个同步副本。清理间隔是检查超时事务和清理对应的事务日志之间的时间。
要为客户端配置添加事务属性,您可以为生产者和消费者设置以下属性:
事务的制作者客户端配置属性示例
transactional.id = unique-transactional-id enable.idempotence = true max.in.flight.requests.per.connection = 5 acks = all retries=2147483647 transaction.timeout.ms = 30000 delivery.timeout = 25000
事务 ID 允许 Kafka 代理跟踪事务。它是制作者的唯一标识符,应当与一组特定分区一起使用。如果您需要为多组分区执行事务,您需要为每个组使用不同的事务 ID。启用 idempotence,以避免生成者实例创建重复消息。使用 idempotence 时,消息使用制作者 ID 和序列号进行跟踪。当代理收到消息时,它会检查制作者 ID 和序列号。如果已收到具有相同制作者 ID 和序列号的消息,代理会丢弃重复消息。
动态请求的最大数量被设置为 5,以便根据发送的顺序处理事务。分区最多可有 5 个动态请求,而不会影响消息的排序。
通过将 acks
设置为 all
,生成者会等待来自主题分区的所有同步副本的确认,然后再将事务视为完成。这样可确保信息被大量写入(提交)到 Kafka 集群,并在代理失败时不会丢失它们。
事务超时指定客户端在恢复前完成事务的最长时间。交付超时指定制作者在超时前等待代理确认的最大时间。为确保消息在事务周期内发送,请将交付超时设置为小于事务超时。在指定重新发送失败的消息请求 的重试次数
时,请考虑网络延迟和消息吞吐量,并允许临时故障。
事务的消费者客户端配置属性示例
group.id = my-group-id isolation.level = read_committed enable.auto.commit = false
read_committed
隔离级别指定消费者只读取成功完成的事务的消息。消费者不会处理作为持续或失败事务一部分的任何消息。这样可确保消费者只读取属于完整事务一部分的消息。
在使用事务流消息时,务必要将 enable.auto.commit
设置为 false
。如果设置为 true
,则消费者会定期提交偏移,而无需考虑事务。这意味着消费者可以在事务完成前提交消息。通过将 enable.auto.commit
设置为 false
,使用者只读取和提交已完全写入并提交的消息作为事务的一部分。
第 6 章 开发 Kafka 客户端
使用首选编程语言创建 Kafka 客户端,并将其连接到 Apache Kafka 的 Streams。
要与 Kafka 集群交互,客户端应用程序需要能够生成和使用消息。要开发并配置基本 Kafka 客户端应用程序,至少您必须执行以下操作:
- 设置配置以连接到 Kafka 集群
- 使用制作者和消费者发送和接收消息
设置用于连接 Kafka 集群和使用制作者和消费者的基本配置是开发 Kafka 客户端时的第一步。之后,您可以扩展以改进客户端应用程序的输入、安全性、性能、错误处理和功能。
先决条件
您可以创建一个客户端属性文件,其中包含以下内容的属性值:
流程
- 为您的编程语言选择一个 Kafka 客户端库,如 Java、Python、.NET 等。对于 Apache Kafka,只支持由红帽构建的客户端库。目前,Apache Kafka 的流仅提供 Java 客户端库。
- 通过软件包管理器或从其源下载库来手动安装库。
- 在代码中为您的 Kafka 客户端导入所需的类和依赖项。
根据您要创建的客户端类型,创建 Kafka 使用者或制作者对象。
客户端可以是 Kafka 使用者、生成者、流处理器和 admin。
提供配置属性以连接到 Kafka 集群,包括代理地址、端口和凭证(如果需要)。
对于本地 Kafka 部署,您可能以
localhost:9092
等地址开始。但是,当使用由 Streams for Apache Kafka 管理的 Kafka 集群时,您可以使用oc
命令从Kafka
自定义资源状态获取 bootstrap 地址:oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
此命令为 Kafka 集群上的客户端连接检索由监听程序公开的 bootstrap 地址。
- 使用 Kafka consumer 或 producer 对象订阅主题、生成信息或从 Kafka 集群检索信息。
- 请注意错误处理;在连接和与 Kafka 通信时,尤其是在高可用性和易于操作生产系统中非常重要。有效的错误处理是原型和生产级应用程序之间的关键区别,它只适用于 Kafka,还适用于任何强大的软件系统。
6.1. Kafka producer 应用程序示例
这个基于 Java 的 Kafka producer 应用程序是一个自包含应用程序的示例,它生成信息到 Kafka 主题。客户端使用 Kafka Producer
API 异步发送消息,并有一些错误处理。
客户端实施 Callback
接口以进行消息处理。
要运行 Kafka producer 应用程序,您可以在 Producer
类中执行 主
方法。客户端使用 randomBytes
方法生成随机字节阵列作为消息有效负载。客户端生成信息到指定的 Kafka 主题,直到 NUM_MESSAGES
消息(示例配置中为50)被发送。producer 是 thread-safe,允许多个线程使用单个制作者实例。
Kafka 生成者实例设计为线程安全,允许多个线程共享单个制作者实例。
这个示例客户端提供了为特定用例构建更复杂的 Kafka 生成者的基本基础。您可以纳入额外的功能,例如 实施安全连接。
先决条件
-
在指定的
BOOTSTRAP_SERVERS
上运行的 Kafka 代理 -
生成消息的 Kafka 主题
TOPIC_NAME
。 - 客户端依赖项
在实施 Kafka producer 应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。
有关如何将依赖项添加到 Maven 项目中的 pom.xml
文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”。
配置
您可以通过 Producer
类中指定的以下常数来配置制作者应用程序:
BOOTSTRAP_SERVERS
- 连接到 Kafka 代理的地址和端口。
TOPIC_NAME
- 生成消息的 Kafka 主题的名称。
NUM_MESSAGES
- 在停止前要生成的消息数量。
MESSAGE_SIZE_BYTES
- 每个消息的大小(以字节为单位)。
PROCESSING_DELAY_MS
- 发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。
producer 应用程序示例
import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongSerializer; public class Producer implements Callback { private static final Random RND = new Random(0); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "my-topic"; private static final long NUM_MESSAGES = 50; private static final int MESSAGE_SIZE_BYTES = 100; private static final long PROCESSING_DELAY_MS = 1000L; protected AtomicLong messageCount = new AtomicLong(0); public static void main(String[] args) { new Producer().run(); } public void run() { System.out.println("Running producer"); try (var producer = createKafkaProducer()) { 1 byte[] value = randomBytes(MESSAGE_SIZE_BYTES); 2 while (messageCount.get() < NUM_MESSAGES) { 3 sleep(PROCESSING_DELAY_MS); 4 producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); 5 messageCount.incrementAndGet(); } } } private KafkaProducer<Long, byte[]> createKafkaProducer() { Properties props = new Properties(); 6 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 7 props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 8 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); 9 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return new KafkaProducer<>(props); } private void sleep(long ms) { 10 try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } private byte[] randomBytes(int size) { 11 if (size <= 0) { throw new IllegalArgumentException("Record size must be greater than zero"); } byte[] payload = new byte[size]; for (int i = 0; i < payload.length; ++i) { payload[i] = (byte) (RND.nextInt(26) + 65); } return payload; } private boolean retriable(Exception e) { 12 if (e instanceof IllegalArgumentException || e instanceof UnsupportedOperationException || !(e instanceof RetriableException)) { return false; } else { return true; } } @Override public void onCompletion(RecordMetadata metadata, Exception e) { 13 if (e != null) { System.err.println(e.getMessage()); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } else { System.out.printf("Record sent to %s-%d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } } }
- 1
- 客户端使用
createKafkaProducer
方法创建一个 Kafka producer。producer 异步发送消息到 Kafka 主题。 - 2
- 字节数组用作发送到 Kafka 主题的每个消息的有效负载。
- 3
- 发送的最大消息数量由
NUM_MESSAGES
常量值决定。 - 4
- 消息率使用发送的每个消息之间的延迟控制。
- 5
- 制作者传递主题名称、消息计数值和消息值。
- 6
- 客户端使用提供的配置创建
KafkaProducer
实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群。 - 7
- 与 Kafka 代理的连接。
- 8
- 使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
- 9
- 用于处理键和值作为字节数组的适当序列化类。
- 10
- 在指定毫秒内引入消息发送进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出
InterruptedException
错误。 - 11
- 创建特定大小随机字节阵列的方法,它充当发送到 Kafka 主题的每个消息的有效负载。这个方法会生成一个随机整数,并添加
65
来代表 ascii 代码中的大写字母(65 是A
, 66 为B
,以此类推)。ascii 代码存储为 payload 数组中的单个字节。如果有效负载大小不大于零,它会抛出IllegalArgumentException
。 - 12
- 检查是否在异常后重试发送消息的方法。Kafka producer 会自动处理特定错误的重试,如连接错误。您可以自定义此方法以包含其他错误。返回 null 和指定例外,或返回没有实现
RetriableException
接口的例外。 - 13
- Kafka 代理确认了消息后调用的方法。成功时,会输出一条消息,其中包含消息的主题、分区和偏移位置的详细信息。如果在发送消息时 ocurred 出错,则会显示错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误是非严重的,则消息发送过程将继续。如果错误是致命的,将输出堆栈追踪,并且生成者终止。
错误处理
producer 应用程序发现的致命异常:
InterruptedException
-
当当前线程在暂停时中断时抛出错误。在停止或关闭制作者时,通常会中断。异常被重新增长为
RuntimeException
,后者终止制作者。 IllegalArgumentException
- 当生成者收到无效的或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException
-
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的制作者配置或调用
KafkaProducer
类不支持的方法,则会抛出异常。
producer 应用程序发现的非严重异常:
RetriableException
-
对于实现 Kafka 客户端库提供的
RetriableException
接口的异常抛出错误。
使用非严重错误时,生成者将继续发送消息。
默认情况下,Kafka 使用 at-least-once 消息交付语义进行操作,这意味着在特定情况下,消息可能会多次发送,可能会导致重复。要避免这种风险,请考虑在 Kafka producer 中启用事务。事务可以更强地保证一次交付。另外,您可以使用 retries
配置属性来控制制作者将在放弃前重试发送消息的次数。此设置会影响在消息发送错误期间 retriable
方法可能会返回 true
的次数。
6.2. Kafka 消费者应用程序示例
这个基于 Java 的 Kafka 消费者应用程序是一个自包含应用程序的示例,它使用来自 Kafka 主题的信息。客户端使用 Kafka Consumer
API 来异步从指定主题获取和处理信息,并有一些错误处理。它遵循 at-least-once 语义,方法是在成功处理消息后提交偏移。
客户端实现 ConsumerRebalanceListener
接口以用于分区处理,以及用于提交偏移的 OffsetCommitCallback
接口。
要运行 Kafka 消费者应用程序,您可以在 Consumer
类中执行 主
方法。客户端会消耗来自 Kafka 主题的信息,直到 NUM_MESSAGES
信息(示例配置中为50)被使用。消费者不会被多个线程安全地访问。
这个示例客户端为为特定用例构建更复杂的 Kafka 用户提供了基本基础。您可以纳入额外的功能,例如 实施安全连接。
先决条件
-
在指定的
BOOTSTRAP_SERVERS
上运行的 Kafka 代理 -
名为
TOPIC_NAME
的 Kafka 主题,从中消耗消息。 - 客户端依赖项
在实施 Kafka 消费者应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。
有关如何将依赖项添加到 Maven 项目中的 pom.xml
文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”。
配置
您可以通过在 Consumer
类中指定的以下常数来配置消费者应用程序:
BOOTSTRAP_SERVERS
- 连接到 Kafka 代理的地址和端口。
GROUP_ID
- 消费者组标识符。
POLL_TIMEOUT_MS
- 每次轮询期间等待新消息的最长时间。
TOPIC_NAME
- 要使用消息的 Kafka 主题的名称。
NUM_MESSAGES
- 在停止前使用的消息数量。
PROCESSING_DELAY_MS
- 发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。
消费者应用程序示例
import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import static java.time.Duration.ofMillis; import static java.util.Collections.singleton; public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "my-group"; private static final long POLL_TIMEOUT_MS = 1_000L; private static final String TOPIC_NAME = "my-topic"; private static final long NUM_MESSAGES = 50; private static final long PROCESSING_DELAY_MS = 1_000L; private KafkaConsumer<Long, byte[]> kafkaConsumer; protected AtomicLong messageCount = new AtomicLong(0); private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>(); public static void main(String[] args) { new Consumer().run(); } public void run() { System.out.println("Running consumer"); try (var consumer = createKafkaConsumer()) { 1 kafkaConsumer = consumer; consumer.subscribe(singleton(TOPIC_NAME), this); 2 System.out.printf("Subscribed to %s%n", TOPIC_NAME); while (messageCount.get() < NUM_MESSAGES) { 3 try { ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS)); 4 if (!records.isEmpty()) { 5 for (ConsumerRecord<Long, byte[]> record : records) { System.out.printf("Record fetched from %s-%d with offset %d%n", record.topic(), record.partition(), record.offset()); sleep(PROCESSING_DELAY_MS); 6 pendingOffsets.put(new TopicPartition(record.topic(), record.partition()), 7 new OffsetAndMetadata(record.offset() + 1, null)); if (messageCount.incrementAndGet() == NUM_MESSAGES) { break; } } consumer.commitAsync(pendingOffsets, this); 8 pendingOffsets.clear(); } } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { 9 System.out.println("Invalid or no offset found, and auto.reset.policy unset, using latest"); consumer.seekToEnd(e.partitions()); consumer.commitSync(); } catch (Exception e) { System.err.println(e.getMessage()); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } } } } private KafkaConsumer<Long, byte[]> createKafkaConsumer() { Properties props = new Properties(); 10 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 11 props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 12 props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); 13 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 14 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 15 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 16 return new KafkaConsumer<>(props); } private void sleep(long ms) { 17 try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } private boolean retriable(Exception e) { 18 if (e == null) { return false; } else if (e instanceof IllegalArgumentException || e instanceof UnsupportedOperationException || !(e instanceof RebalanceInProgressException) || !(e instanceof RetriableException)) { return false; } else { return true; } } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 19 System.out.printf("Assigned partitions: %s%n", partitions); } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 20 System.out.printf("Revoked partitions: %s%n", partitions); kafkaConsumer.commitSync(pendingOffsets); pendingOffsets.clear(); } @Override public void onPartitionsLost(Collection<TopicPartition> partitions) { 21 System.out.printf("Lost partitions: {}", partitions); } @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { 22 if (e != null) { System.err.println("Failed to commit offsets"); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } } }
- 1
- 客户端使用
createKafkaConsumer
方法创建一个 Kafka 使用者。 - 2
- 消费者订阅特定主题。订阅主题后,会输出确认信息。
- 3
- 所消耗的消息的最大数量由
NUM_MESSAGES
常量值决定。 - 4
- 在
session.timeout.ms
中必须调用下一个轮询来获取消息,以避免重新平衡。 - 5
- 检查包含从 Kafka 获取的批处理消息的
records
对象是否不为空。如果records
对象为空,则没有要处理的新消息,并跳过进程。 - 6
- 为指定毫秒数显示消息获取过程的方法。
- 7
- 消费者使用
待处理的Offset 映射来存储需要提交的已用消息的偏移量
。 - 8
- 在处理批处理消息后,消费者异步使用
commitAsync
方法提交偏移,实现 at-least-once 语义。 - 9
- 在消耗消息和自动重置策略时,用于处理非严重和严重错误的捕获。对于非严重错误,消费者寻求分区的末尾,并开始使用最新的可用偏移。如果无法重试异常,则会输出堆栈追踪,并终止消费者。
- 10
- 客户端使用提供的配置创建
KafkaConsumer
实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群。 - 11
- 与 Kafka 代理的连接。
- 12
- 使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
- 13
- 用于与分区的消费者协调的组 ID。
- 14
- 用于处理键和值作为字节数数组的适当反序列化器类。
- 15
- 配置,以禁用自动偏移提交。
- 16
- 在没有为分区找到提交偏移时,消费者开始使用来自最早可用偏移的消息。
- 17
- 在指定毫秒内引入消息消耗进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出
InterruptedException
错误。 - 18
- 检查是否在异常后重试提交消息的方法。不重试 null 和 specified 异常,也不是没有实现
RebalanceInProgressException
或RetriableException
接口的例外。您可以自定义此方法以包含其他错误。 - 19
- 将消息输出到控制台的方法指示已分配给消费者的分区列表。
- 20
- 当消费者要在消费者组重新平衡期间丢失分区的所有权时调用的方法。该方法打印从消费者撤销的分区列表。提交任何待处理的偏移。
- 21
- 当消费者在消费者重新平衡过程中丢失分区的所有权时调用的方法,但无法提交任何待处理的偏移。该方法打印使用者丢失的分区列表。
- 22
- 当消费者向 Kafka 提交偏移时调用的方法。如果在提交偏移时 ocurred 错误,则会打印错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误不是严重的,则偏移提交过程将继续。如果错误是致命的,则会输出堆栈追踪,并且使用者终止。
错误处理
消费者应用程序捕获的致命异常:
InterruptedException
-
当当前线程在暂停时中断时抛出错误。在停止或关闭消费者时通常会中断。异常被重新增长为
RuntimeException
,后者终止消费者。 IllegalArgumentException
- 当使用者收到无效或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException
-
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的消费者配置或调用
KafkaConsumer
类不支持的方法,则会抛出异常。
消费者应用程序发现的非严重异常:
OffsetOutOfRangeException
-
当消费者试图查看分区无效偏移时抛出错误,通常是当偏移超出该分区的有效偏移范围时,并且未启用自动重置策略。要恢复,消费者查找分区末尾以同步提交偏移(
commitSync
)。如果启用了 auto-reset 策略,则根据设置,消费者查找到分区的开始或结束。 NoOffsetForPartitionException
-
当分区没有提交偏移或请求的偏移无效时抛出错误,且没有启用自动重置策略。要恢复,消费者查找分区末尾以同步提交偏移(
commitSync
)。如果启用了 auto-reset 策略,则根据设置,消费者查找到分区的开始或结束。 RebalanceInProgressException
- 当分配了分区时,消费者组重新平衡过程中抛出错误。当消费者进入重新平衡时,无法完成偏移提交。
RetriableException
-
对于实现 Kafka 客户端库提供的
RetriableException
接口的异常抛出错误。
使用非严重错误时,使用者会继续处理消息。
6.3. 将合作重新平衡与消费者一起使用
Kafka 用户使用由重新平衡协议决定的分区分配策略。默认情况下,Kafka 使用 RangeAssignor
协议,它涉及用户在重新平衡过程中重新精简其分区分配,从而导致潜在的服务中断。
为提高效率并缩短停机时间,您可以切换到 CooperativeStickyAssignor
协议,这是一个合作重新平衡方法。与默认协议不同,合作重新平衡使消费者能够一起工作,在重新平衡期间保留其分区分配,只有在需要在消费者组中实现平衡时才释放分区。
流程
在消费者配置中,使用
partition.assignment.strategy
属性切换到使用CooperativeStickyAssignor
作为协议。例如,如果当前配置是partition.assignment.strategy=RangeAssignor, CooperativeStickyAssignor
,请将其更新为partition.assignment.strategy=CooperativeStickyAssignor
。您还可以使用消费者应用程序代码中的
props.put
设置分区分配策略,而不是直接修改消费者配置文件:# ... props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); # ...
- 一次重启组中的每个消费者,允许它们在每次重启后重新加入组。
切换到 CooperativeStickyAssignor
协议后,可能会在消费者重新平衡过程中发生 RebalanceInProgressException
,从而导致同一消费者组中的多个 Kafka 客户端的意外停止页面。另外,这个问题可能会导致意外的消息重复,即使 Kafka 用户在重新平衡过程中没有更改其分区分配。如果您使用自动偏移提交(enable.auto.commit=true
),则不需要进行任何更改。如果您要手动提交偏移(enable.auto.commit=false
),并在手动提交过程中发生 RebalanceInProgressException
,请将消费者实施更改为下一循环中调用 poll ()
以完成消费者重新平衡过程。如需更多信息,请参阅 客户门户网站中的合作StickyAssignor
文章。
附录 A. 使用您的订阅
Apache Kafka 的流通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 Zip 和 Tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 在 INTEGRATION AND AUTOMATION 目录中找到 Apache Kafka for Apache Kafka 的流。
- 选择 Apache Kafka 产品所需的流。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
使用 DNF 安装软件包
要安装软件包以及所有软件包的依赖软件包,请使用:
dnf install <package_name>
要从本地目录中安装之前下载的软件包,请使用:
dnf install <path_to_download_package>
更新于 2024-04-30