6.2. Kafka 消费者应用程序示例
这个基于 Java 的 Kafka 消费者应用程序是一个自包含应用程序的示例,它使用来自 Kafka 主题的信息。客户端使用 Kafka Consumer API 来异步从指定主题获取和处理信息,并有一些错误处理。它遵循 at-least-once 语义,方法是在成功处理消息后提交偏移。
客户端实现 ConsumerRebalanceListener 接口以用于分区处理,以及用于提交偏移的 OffsetCommitCallback 接口。
要运行 Kafka 消费者应用程序,您可以在 Consumer 类中执行 主 方法。客户端会消耗来自 Kafka 主题的信息,直到 NUM_MESSAGES 信息(示例配置中为50)被使用。消费者不会被多个线程安全地访问。
这个示例客户端为为特定用例构建更复杂的 Kafka 用户提供了基本基础。您可以纳入额外的功能,例如 实施安全连接。
先决条件
-
在指定的
BOOTSTRAP_SERVERS上运行的 Kafka 代理 -
名为
TOPIC_NAME的 Kafka 主题,从中消耗消息。 - 客户端依赖项
在实施 Kafka 消费者应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。
有关如何将依赖项添加到 Maven 项目中的 pom.xml 文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”。
Configuration
您可以通过在 Consumer 类中指定的以下常数来配置消费者应用程序:
BOOTSTRAP_SERVERS- 连接到 Kafka 代理的地址和端口。
GROUP_ID- 消费者组标识符。
POLL_TIMEOUT_MS- 每次轮询期间等待新消息的最长时间。
TOPIC_NAME- 要使用消息的 Kafka 主题的名称。
NUM_MESSAGES- 在停止前使用的消息数量。
PROCESSING_DELAY_MS- 发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。
消费者应用程序示例
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import static java.time.Duration.ofMillis;
import static java.util.Collections.singleton;
public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
private static final long POLL_TIMEOUT_MS = 1_000L;
private static final String TOPIC_NAME = "my-topic";
private static final long NUM_MESSAGES = 50;
private static final long PROCESSING_DELAY_MS = 1_000L;
private KafkaConsumer<Long, byte[]> kafkaConsumer;
protected AtomicLong messageCount = new AtomicLong(0);
private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
public static void main(String[] args) {
new Consumer().run();
}
public void run() {
System.out.println("Running consumer");
try (var consumer = createKafkaConsumer()) {
kafkaConsumer = consumer;
consumer.subscribe(singleton(TOPIC_NAME), this);
System.out.printf("Subscribed to %s%n", TOPIC_NAME);
while (messageCount.get() < NUM_MESSAGES) {
try {
ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS));
if (!records.isEmpty()) {
for (ConsumerRecord<Long, byte[]> record : records) {
System.out.printf("Record fetched from %s-%d with offset %d%n",
record.topic(), record.partition(), record.offset());
sleep(PROCESSING_DELAY_MS);
pendingOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
if (messageCount.incrementAndGet() == NUM_MESSAGES) {
break;
}
}
consumer.commitAsync(pendingOffsets, this);
pendingOffsets.clear();
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
System.out.println("Invalid or no offset found, and auto.reset.policy unset, using latest");
consumer.seekToEnd(e.partitions());
consumer.commitSync();
} catch (Exception e) {
System.err.println(e.getMessage());
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
}
}
}
}
private KafkaConsumer<Long, byte[]> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private boolean retriable(Exception e) {
if (e == null) {
return false;
} else if (e instanceof IllegalArgumentException
|| e instanceof UnsupportedOperationException
|| !(e instanceof RebalanceInProgressException)
|| !(e instanceof RetriableException)) {
return false;
} else {
return true;
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("Assigned partitions: %s%n", partitions);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("Revoked partitions: %s%n", partitions);
kafkaConsumer.commitSync(pendingOffsets);
pendingOffsets.clear();
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
System.out.printf("Lost partitions: {}", partitions);
}
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
System.err.println("Failed to commit offsets");
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
}
}
}
- 1
- 客户端使用
createKafkaConsumer方法创建一个 Kafka 使用者。 - 2
- 消费者订阅特定主题。订阅主题后,会输出确认信息。
- 3
- 所消耗的消息的最大数量由
NUM_MESSAGES常量值决定。 - 4
- 在
session.timeout.ms中必须调用下一个轮询来获取消息,以避免重新平衡。 - 5
- 检查包含从 Kafka 获取的批处理消息的
records对象是否不为空。如果records对象为空,则没有要处理的新消息,并跳过进程。 - 6
- 为指定毫秒数显示消息获取过程的方法。
- 7
- 消费者使用
待处理的Offset 映射来存储需要提交的已用消息的偏移量。 - 8
- 在处理批处理消息后,消费者异步使用
commitAsync方法提交偏移,实现 at-least-once 语义。 - 9
- 在消耗消息和自动重置策略时,用于处理非严重和严重错误的捕获。对于非严重错误,消费者寻求分区的末尾,并开始使用最新的可用偏移。如果无法重试异常,则会输出堆栈追踪,并终止消费者。
- 10
- 客户端使用提供的配置创建
KafkaConsumer实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群。 - 11
- 与 Kafka 代理的连接。
- 12
- 使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
- 13
- 用于与分区的消费者协调的组 ID。
- 14
- 用于处理键和值作为字节数数组的适当反序列化器类。
- 15
- 配置,以禁用自动偏移提交。
- 16
- 在没有为分区找到提交偏移时,消费者开始使用来自最早可用偏移的消息。
- 17
- 在指定毫秒内引入消息消耗进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出
InterruptedException错误。 - 18
- 检查是否在异常后重试提交消息的方法。不重试 null 和 specified 异常,也不是没有实现
RebalanceInProgressException或RetriableException接口的例外。您可以自定义此方法以包含其他错误。 - 19
- 将消息输出到控制台的方法指示已分配给消费者的分区列表。
- 20
- 当消费者要在消费者组重新平衡期间丢失分区的所有权时调用的方法。该方法打印从消费者撤销的分区列表。提交任何待处理的偏移。
- 21
- 当消费者在消费者重新平衡过程中丢失分区的所有权时调用的方法,但无法提交任何待处理的偏移。该方法打印使用者丢失的分区列表。
- 22
- 当消费者向 Kafka 提交偏移时调用的方法。如果在提交偏移时 ocurred 错误,则会打印错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误不是严重的,则偏移提交过程将继续。如果错误是致命的,则会输出堆栈追踪,并且使用者终止。
错误处理
消费者应用程序捕获的致命异常:
InterruptedException-
当当前线程在暂停时中断时抛出错误。在停止或关闭消费者时通常会中断。异常被重新增长为
RuntimeException,后者终止消费者。 IllegalArgumentException- 当使用者收到无效或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException-
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的消费者配置或调用
KafkaConsumer类不支持的方法,则会抛出异常。
消费者应用程序发现的非严重异常:
OffsetOutOfRangeException-
当消费者试图查看分区无效偏移时抛出错误,通常是当偏移超出该分区的有效偏移范围时,并且未启用自动重置策略。要恢复,消费者查找分区末尾以同步提交偏移(
commitSync)。如果启用了 auto-reset 策略,则根据设置,消费者查找到分区的开始或结束。 NoOffsetForPartitionException-
当分区没有提交偏移或请求的偏移无效时抛出错误,且没有启用自动重置策略。要恢复,消费者查找分区末尾以同步提交偏移(
commitSync)。如果启用了 auto-reset 策略,则根据设置,消费者查找到分区的开始或结束。 RebalanceInProgressException- 当分配了分区时,消费者组重新平衡过程中抛出错误。当消费者进入重新平衡时,无法完成偏移提交。
RetriableException-
对于实现 Kafka 客户端库提供的
RetriableException接口的异常抛出错误。
使用非严重错误时,使用者会继续处理消息。