第 6 章 开发 Kafka 客户端
使用首选编程语言创建 Kafka 客户端,并将其连接到 Apache Kafka 的 Streams。
要与 Kafka 集群交互,客户端应用程序需要能够生成和使用消息。要开发并配置基本 Kafka 客户端应用程序,至少您必须执行以下操作:
- 设置配置以连接到 Kafka 集群
- 使用制作者和消费者发送和接收消息
设置用于连接 Kafka 集群和使用制作者和消费者的基本配置是开发 Kafka 客户端时的第一步。之后,您可以扩展以改进客户端应用程序的输入、安全性、性能、错误处理和功能。
先决条件
您可以创建一个客户端属性文件,其中包含以下内容的属性值:
流程
- 为您的编程语言选择一个 Kafka 客户端库,如 Java、Python、.NET 等。对于 Apache Kafka,只支持由红帽构建的客户端库。目前,Apache Kafka 的流仅提供 Java 客户端库。
- 通过软件包管理器或从其源下载库来手动安装库。
- 在代码中为您的 Kafka 客户端导入所需的类和依赖项。
根据您要创建的客户端类型,创建 Kafka 使用者或制作者对象。
客户端可以是 Kafka 使用者、生成者、流处理器和 admin。
提供配置属性以连接到 Kafka 集群,包括代理地址、端口和凭证(如果需要)。
对于本地 Kafka 部署,您可能以
localhost:9092
等地址开始。但是,当使用由 Streams for Apache Kafka 管理的 Kafka 集群时,您可以使用oc
命令从Kafka
自定义资源状态获取 bootstrap 地址:Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
此命令为 Kafka 集群上的客户端连接检索由监听程序公开的 bootstrap 地址。
- 使用 Kafka consumer 或 producer 对象订阅主题、生成信息或从 Kafka 集群检索信息。
- 请注意错误处理;在连接和与 Kafka 通信时,尤其是在高可用性和易于操作生产系统中非常重要。有效的错误处理是原型和生产级应用程序之间的关键区别,它只适用于 Kafka,还适用于任何强大的软件系统。
6.1. Kafka producer 应用程序示例
这个基于 Java 的 Kafka producer 应用程序是一个自包含应用程序的示例,它生成信息到 Kafka 主题。客户端使用 Kafka Producer
API 异步发送消息,并有一些错误处理。
客户端实施 Callback
接口以进行消息处理。
要运行 Kafka producer 应用程序,您可以在 Producer
类中执行 主
方法。客户端使用 randomBytes
方法生成随机字节阵列作为消息有效负载。客户端生成信息到指定的 Kafka 主题,直到 NUM_MESSAGES
消息(示例配置中为50)被发送。producer 是 thread-safe,允许多个线程使用单个制作者实例。
Kafka 生成者实例设计为线程安全,允许多个线程共享单个制作者实例。
这个示例客户端提供了为特定用例构建更复杂的 Kafka 生成者的基本基础。您可以纳入额外的功能,例如 实施安全连接。
先决条件
-
在指定的
BOOTSTRAP_SERVERS
上运行的 Kafka 代理 -
生成消息的 Kafka 主题
TOPIC_NAME
。 - 客户端依赖项
在实施 Kafka producer 应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。
有关如何将依赖项添加到 Maven 项目中的 pom.xml
文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”。
配置
您可以通过 Producer
类中指定的以下常数来配置制作者应用程序:
BOOTSTRAP_SERVERS
- 连接到 Kafka 代理的地址和端口。
TOPIC_NAME
- 生成消息的 Kafka 主题的名称。
NUM_MESSAGES
- 在停止前要生成的消息数量。
MESSAGE_SIZE_BYTES
- 每个消息的大小(以字节为单位)。
PROCESSING_DELAY_MS
- 发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。
producer 应用程序示例
import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongSerializer; public class Producer implements Callback { private static final Random RND = new Random(0); private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String TOPIC_NAME = "my-topic"; private static final long NUM_MESSAGES = 50; private static final int MESSAGE_SIZE_BYTES = 100; private static final long PROCESSING_DELAY_MS = 1000L; protected AtomicLong messageCount = new AtomicLong(0); public static void main(String[] args) { new Producer().run(); } public void run() { System.out.println("Running producer"); try (var producer = createKafkaProducer()) { byte[] value = randomBytes(MESSAGE_SIZE_BYTES); while (messageCount.get() < NUM_MESSAGES) { sleep(PROCESSING_DELAY_MS); producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); messageCount.incrementAndGet(); } } } private KafkaProducer<Long, byte[]> createKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); return new KafkaProducer<>(props); } private void sleep(long ms) { try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } private byte[] randomBytes(int size) { if (size <= 0) { throw new IllegalArgumentException("Record size must be greater than zero"); } byte[] payload = new byte[size]; for (int i = 0; i < payload.length; ++i) { payload[i] = (byte) (RND.nextInt(26) + 65); } return payload; } private boolean retriable(Exception e) { if (e instanceof IllegalArgumentException || e instanceof UnsupportedOperationException || !(e instanceof RetriableException)) { return false; } else { return true; } } @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { System.err.println(e.getMessage()); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } else { System.out.printf("Record sent to %s-%d with offset %d%n", metadata.topic(), metadata.partition(), metadata.offset()); } } }
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;
public class Producer implements Callback {
private static final Random RND = new Random(0);
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "my-topic";
private static final long NUM_MESSAGES = 50;
private static final int MESSAGE_SIZE_BYTES = 100;
private static final long PROCESSING_DELAY_MS = 1000L;
protected AtomicLong messageCount = new AtomicLong(0);
public static void main(String[] args) {
new Producer().run();
}
public void run() {
System.out.println("Running producer");
try (var producer = createKafkaProducer()) {
byte[] value = randomBytes(MESSAGE_SIZE_BYTES);
while (messageCount.get() < NUM_MESSAGES) {
sleep(PROCESSING_DELAY_MS);
producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this);
messageCount.incrementAndGet();
}
}
}
private KafkaProducer<Long, byte[]> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return new KafkaProducer<>(props);
}
private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private byte[] randomBytes(int size) {
if (size <= 0) {
throw new IllegalArgumentException("Record size must be greater than zero");
}
byte[] payload = new byte[size];
for (int i = 0; i < payload.length; ++i) {
payload[i] = (byte) (RND.nextInt(26) + 65);
}
return payload;
}
private boolean retriable(Exception e) {
if (e instanceof IllegalArgumentException
|| e instanceof UnsupportedOperationException
|| !(e instanceof RetriableException)) {
return false;
} else {
return true;
}
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.err.println(e.getMessage());
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
} else {
System.out.printf("Record sent to %s-%d with offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
}
- 1
- 客户端使用
createKafkaProducer
方法创建一个 Kafka producer。producer 异步发送消息到 Kafka 主题。 - 2
- 字节数组用作发送到 Kafka 主题的每个消息的有效负载。
- 3
- 发送的最大消息数量由
NUM_MESSAGES
常量值决定。 - 4
- 消息率使用发送的每个消息之间的延迟控制。
- 5
- 制作者传递主题名称、消息计数值和消息值。
- 6
- 客户端使用提供的配置创建
KafkaProducer
实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群。 - 7
- 与 Kafka 代理的连接。
- 8
- 使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
- 9
- 用于处理键和值作为字节数组的适当序列化类。
- 10
- 在指定毫秒内引入消息发送进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出
InterruptedException
错误。 - 11
- 创建特定大小随机字节阵列的方法,它充当发送到 Kafka 主题的每个消息的有效负载。这个方法会生成一个随机整数,并添加
65
来代表 ascii 代码中的大写字母(65 是A
, 66 为B
,以此类推)。ascii 代码存储为 payload 数组中的单个字节。如果有效负载大小不大于零,它会抛出IllegalArgumentException
。 - 12
- 检查是否在异常后重试发送消息的方法。Kafka producer 会自动处理特定错误的重试,如连接错误。您可以自定义此方法以包含其他错误。返回 null 和指定例外,或返回没有实现
RetriableException
接口的例外。 - 13
- Kafka 代理确认了消息后调用的方法。成功时,会输出一条消息,其中包含消息的主题、分区和偏移位置的详细信息。如果在发送消息时 ocurred 出错,则会显示错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误是非严重的,则消息发送过程将继续。如果错误是致命的,将输出堆栈追踪,并且生成者终止。
错误处理
producer 应用程序发现的致命异常:
InterruptedException
-
当当前线程在暂停时中断时抛出错误。在停止或关闭制作者时,通常会中断。异常被重新增长为
RuntimeException
,后者终止制作者。 IllegalArgumentException
- 当生成者收到无效的或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException
-
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的制作者配置或调用
KafkaProducer
类不支持的方法,则会抛出异常。
producer 应用程序发现的非严重异常:
RetriableException
-
对于实现 Kafka 客户端库提供的
RetriableException
接口的异常抛出错误。
使用非严重错误时,生成者将继续发送消息。
默认情况下,Kafka 使用 at-least-once 消息交付语义进行操作,这意味着在特定情况下,消息可能会多次发送,可能会导致重复。要避免这种风险,请考虑在 Kafka producer 中启用事务。事务可以更强地保证一次交付。另外,您可以使用 retries
配置属性来控制制作者将在放弃前重试发送消息的次数。此设置会影响在消息发送错误期间 retriable
方法可能会返回 true
的次数。