import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import static java.time.Duration.ofMillis;
import static java.util.Collections.singleton;
public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
private static final long POLL_TIMEOUT_MS = 1_000L;
private static final String TOPIC_NAME = "my-topic";
private static final long NUM_MESSAGES = 50;
private static final long PROCESSING_DELAY_MS = 1_000L;
private KafkaConsumer<Long, byte[]> kafkaConsumer;
protected AtomicLong messageCount = new AtomicLong(0);
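// Offsets of processed records, staged here until the next commit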
private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
public static void main(String[] args) {
new Consumer().run();
}
public void run() {
System.out.println("Running consumer");
try (var consumer = createKafkaConsumer()) {
kafkaConsumer = consumer;
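// Subscribe to the topic, registering this class as the rebalance listener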
consumer.subscribe(singleton(TOPIC_NAME), this);
System.out.printf("Subscribed to %s%n", TOPIC_NAME);
while (messageCount.get() < NUM_MESSAGES) {
try {
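// Fetch the next batch of records, waiting up to POLL_TIMEOUT_MS for data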
ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS));
if (!records.isEmpty()) {
for (ConsumerRecord<Long, byte[]> record : records) {
System.out.printf("Record fetched from %s-%d with offset %d%n",
record.topic(), record.partition(), record.offset());
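// Simulate the time taken to process the record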
sleep(PROCESSING_DELAY_MS);
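// Stage the commit position: the offset to commit is the next offset to consume (record.offset() + 1)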
pendingOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
if (messageCount.incrementAndGet() == NUM_MESSAGES) {
break;
}
}
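// Commit the offsets staged during this batch; the result is handled asynchronously in onComplete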
consumer.commitAsync(pendingOffsets, this);
pendingOffsets.clear();
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
System.out.println("Invalid or no offset found, and auto.reset.policy unset, using latest");
consumer.seekToEnd(e.partitions());
consumer.commitSync();
} catch (Exception e) {
System.err.println(e.getMessage());
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
}
}
}
}
private KafkaConsumer<Long, byte[]> createKafkaConsumer() {
Properties props = new Properties();
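// Addresses for bootstrapping the connection to the Kafka cluster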
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
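// Unique client id, useful for correlating this instance in logs and metrics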
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
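// Consumer group id; the topic's partitions are balanced across the members of the group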
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
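// Deserializers matching the record key (Long) and value (byte[]) types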
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
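// Disable auto-commit so offsets are committed explicitly only after processing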
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
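// Start from the beginning of the topic when no committed offset is found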
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}
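// Pauses the polling thread to simulate per-record processing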
private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private boolean retriable(Exception e) {
// Only an in-progress rebalance or one of Kafka's retriable errors is worth retrying; anything else is fatal
if (e == null) {
return false;
} else if (e instanceof IllegalArgumentException
|| e instanceof UnsupportedOperationException) {
return false;
} else {
return e instanceof RebalanceInProgressException
|| e instanceof RetriableException;
}
}
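// Rebalance callback: logs the partitions this consumer now owns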
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf("Assigned partitions: %s%n", partitions);
}
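// Rebalance callback: commits pending offsets synchronously before ownership of the partitions is surrendered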
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("Revoked partitions: %s%n", partitions);
kafkaConsumer.commitSync(pendingOffsets);
pendingOffsets.clear();
}
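// Rebalance callback: partitions were lost without an orderly revocation, so pending offsets cannot be committed safely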
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
System.out.printf("Lost partitions: {}", partitions);
}
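// Commit callback for commitAsync: a non-retriable commit failure terminates the application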
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
System.err.println("Failed to commit offsets");
if (!retriable(e)) {
e.printStackTrace();
System.exit(1);
}
}
}
}