第 6 章 开发 Kafka 客户端
使用首选编程语言创建 Kafka 客户端,并将其连接到 Apache Kafka 的 Streams。
要与 Kafka 集群交互,客户端应用程序需要能够生成和使用消息。要开发并配置基本 Kafka 客户端应用程序,至少您必须执行以下操作:
- 设置配置以连接到 Kafka 集群
- 使用制作者和消费者发送和接收消息
设置用于连接 Kafka 集群和使用制作者和消费者的基本配置是开发 Kafka 客户端时的第一步。之后,您可以扩展以改进客户端应用程序的输入、安全性、性能、错误处理和功能。
先决条件
您可以创建一个客户端属性文件,其中包含以下内容的属性值:
流程
- 为您的编程语言选择一个 Kafka 客户端库,如 Java、Python、.NET 等。对于 Apache Kafka,只支持由红帽构建的客户端库。目前,Apache Kafka 的流仅提供 Java 客户端库。
- 通过软件包管理器或从其源下载库来手动安装库。
- 在代码中为您的 Kafka 客户端导入所需的类和依赖项。
根据您要创建的客户端类型,创建 Kafka 使用者或制作者对象。
客户端可以是 Kafka 使用者、生成者、流处理器和 admin。
提供配置属性以连接到 Kafka 集群,包括代理地址、端口和凭证(如果需要)。
对于本地 Kafka 部署,您可能以
localhost:9092
等地址开始。但是,当使用由 Streams for Apache Kafka 管理的 Kafka 集群时,您可以使用oc
命令从Kafka
自定义资源状态获取 bootstrap 地址:oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 此命令为 Kafka 集群上的客户端连接检索由监听程序公开的 bootstrap 地址。
- 使用 Kafka consumer 或 producer 对象订阅主题、生成信息或从 Kafka 集群检索信息。
- 请注意错误处理;在连接和与 Kafka 通信时,尤其是在高可用性和易于操作生产系统中非常重要。有效的错误处理是原型和生产级应用程序之间的关键区别,它只适用于 Kafka,还适用于任何强大的软件系统。
6.1. Kafka producer 应用程序示例 复制链接链接已复制到粘贴板!
这个基于 Java 的 Kafka producer 应用程序是一个自包含应用程序的示例,它生成信息到 Kafka 主题。客户端使用 Kafka Producer
API 异步发送消息,并有一些错误处理。
客户端实施 Callback
接口以进行消息处理。
要运行 Kafka producer 应用程序,您可以在 Producer
类中执行 主
方法。客户端使用 randomBytes
方法生成随机字节阵列作为消息有效负载。客户端生成信息到指定的 Kafka 主题,直到 NUM_MESSAGES
消息(示例配置中为50)被发送。producer 是 thread-safe,允许多个线程使用单个制作者实例。
Kafka 生成者实例设计为线程安全,允许多个线程共享单个制作者实例。
这个示例客户端提供了为特定用例构建更复杂的 Kafka 生成者的基本基础。您可以纳入额外的功能,例如 实施安全连接。
先决条件
-
在指定的
BOOTSTRAP_SERVERS
上运行的 Kafka 代理 -
生成消息的 Kafka 主题
TOPIC_NAME
。 - 客户端依赖项
在实施 Kafka producer 应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。
有关如何将依赖项添加到 Maven 项目中的 pom.xml
文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”。
配置
您可以通过 Producer
类中指定的以下常数来配置制作者应用程序:
BOOTSTRAP_SERVERS
- 连接到 Kafka 代理的地址和端口。
TOPIC_NAME
- 生成消息的 Kafka 主题的名称。
NUM_MESSAGES
- 在停止前要生成的消息数量。
MESSAGE_SIZE_BYTES
- 每个消息的大小(以字节为单位)。
PROCESSING_DELAY_MS
- 发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。
producer 应用程序示例
- 1
- 客户端使用
createKafkaProducer
方法创建一个 Kafka producer。producer 异步发送消息到 Kafka 主题。 - 2
- 字节数组用作发送到 Kafka 主题的每个消息的有效负载。
- 3
- 发送的最大消息数量由
NUM_MESSAGES
常量值决定。 - 4
- 消息率使用发送的每个消息之间的延迟控制。
- 5
- 制作者传递主题名称、消息计数值和消息值。
- 6
- 客户端使用提供的配置创建
KafkaProducer
实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群。 - 7
- 与 Kafka 代理的连接。
- 8
- 使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
- 9
- 用于处理键和值作为字节数组的适当序列化类。
- 10
- 在指定毫秒内引入消息发送进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出
InterruptedException
错误。 - 11
- 创建特定大小随机字节阵列的方法,它充当发送到 Kafka 主题的每个消息的有效负载。这个方法会生成一个随机整数,并添加
65
来代表 ascii 代码中的大写字母(65 是A
, 66 为B
,以此类推)。ascii 代码存储为 payload 数组中的单个字节。如果有效负载大小不大于零,它会抛出IllegalArgumentException
。 - 12
- 检查是否在异常后重试发送消息的方法。Kafka producer 会自动处理特定错误的重试,如连接错误。您可以自定义此方法以包含其他错误。返回 null 和指定例外,或返回没有实现
RetriableException
接口的例外。 - 13
- Kafka 代理确认了消息后调用的方法。成功时,会输出一条消息,其中包含消息的主题、分区和偏移位置的详细信息。如果在发送消息时 ocurred 出错,则会显示错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误是非严重的,则消息发送过程将继续。如果错误是致命的,将输出堆栈追踪,并且生成者终止。
错误处理
producer 应用程序发现的致命异常:
InterruptedException
-
当当前线程在暂停时中断时抛出错误。在停止或关闭制作者时,通常会中断。异常被重新增长为
RuntimeException
,后者终止制作者。 IllegalArgumentException
- 当生成者收到无效的或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException
-
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的制作者配置或调用
KafkaProducer
类不支持的方法,则会抛出异常。
producer 应用程序发现的非严重异常:
RetriableException
-
对于实现 Kafka 客户端库提供的
RetriableException
接口的异常抛出错误。
使用非严重错误时,生成者将继续发送消息。
默认情况下,Kafka 使用 at-least-once 消息交付语义进行操作,这意味着在特定情况下,消息可能会多次发送,可能会导致重复。要避免这种风险,请考虑在 Kafka producer 中启用事务。事务可以更强地保证一次交付。另外,您可以使用 retries
配置属性来控制制作者将在放弃前重试发送消息的次数。此设置会影响在消息发送错误期间 retriable
方法可能会返回 true
的次数。