Chapter 6. Developing a Kafka client


Create a Kafka client in your preferred programming language and connect it to Streams for Apache Kafka.

To interact with a Kafka cluster, client applications need to be able to produce and consume messages. To develop and configure a basic Kafka client application, as a minimum, you must do the following:

  • Set up configuration to connect to a Kafka cluster
  • Use producers and consumers to send and receive messages

Setting up the basic configuration for connecting to a Kafka cluster and using producers and consumers is the first step in developing a Kafka client. After that, you can expand the client application to improve its inputs, security, performance, error handling, and functionality.

Prerequisites

You can create a client properties file that contains the property values for connecting to the Kafka cluster, such as the bootstrap address and any required credentials.
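
For example, a minimal client properties file might look like the following sketch; the bootstrap address is illustrative, and the TLS properties apply only when connecting to a secured listener:

bootstrap.servers=my-cluster-kafka-bootstrap:9092
client.id=my-client
# Required only for a TLS-secured listener:
# security.protocol=SSL
# ssl.truststore.location=/path/to/truststore.p12
# ssl.truststore.password=<password>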

Procedure

  1. Choose a Kafka client library for your programming language, such as Java, Python, or .NET. However, only client libraries built by Red Hat are supported for Streams for Apache Kafka, and currently Streams for Apache Kafka provides only a Java client library.
  2. Install the library, either through a package manager or manually by downloading the library from its source.
  3. Import the necessary classes and dependencies for your Kafka client in your code.
  4. Create a Kafka consumer or producer object, depending on the type of client you want to create.

    A client can be a Kafka consumer, producer, Streams processor, or admin client.

  5. Provide the configuration properties to connect to the Kafka cluster, including the broker address, port, and credentials if necessary.

    For a local Kafka deployment, you might start with an address like localhost:9092. However, when working with a Kafka cluster managed by Streams for Apache Kafka, you can obtain the bootstrap address from the Kafka custom resource status using an oc command:

    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'

    This command retrieves the bootstrap addresses exposed by listeners for client connections on a Kafka cluster.

  6. Use the Kafka consumer or producer object to subscribe to topics, produce messages, or retrieve messages from the Kafka cluster. A short sketch after this procedure condenses these steps.
  7. Pay attention to error handling when connecting to and communicating with Kafka, especially in production systems where high availability and ease of operations are valued. Effective error handling is a key differentiator between a prototype and a production-grade application, and it applies not only to Kafka but to any robust software system.
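
The following minimal sketch condenses these steps for a producer; the bootstrap address, topic name, and serializers are illustrative assumptions, and the full examples in Section 6.1 and Section 6.2 add configuration constants and error handling:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class QuickStartProducer {
    public static void main(String[] args) {
        // Step 5: configuration properties for connecting to the cluster
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Step 4: create the producer object
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Step 6: produce a message; step 7: basic error handling in the callback
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
            producer.flush(); // ensure the record is sent before the producer closes
        }
    }
}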

6.1. Example Kafka producer application

This Java-based Kafka producer application is an example of a self-contained application that produces messages to a Kafka topic. The client uses the Kafka Producer API to send messages asynchronously, with some error handling.

The client implements the Callback interface for message handling.

To run the Kafka producer application, you execute the main method in the Producer class. The client generates a random byte array as the message payload using the randomBytes method. The client produces messages to the specified Kafka topic until NUM_MESSAGES messages (50 in the example configuration) have been sent. Kafka producer instances are designed to be thread-safe, allowing multiple threads to share a single producer instance.
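
As an illustration of producer thread safety, the following sketch, which is separate from the example application, shares a single producer across a small thread pool; the bootstrap address, topic name, thread count, and serializers are illustrative:

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SharedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int t = 0; t < 4; t++) {
                final int taskId = t;
                // All four tasks call send() on the same producer instance
                pool.submit(() -> producer.send(
                    new ProducerRecord<>("my-topic", "task-" + taskId, "hello")));
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS); // wait before closing the shared producer
        }
    }
}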

This example client provides a basic foundation for building more complex Kafka producers for specific use cases. You can incorporate additional functionality, such as implementing secure connections.

Prerequisites

  • Kafka brokers running on the specified BOOTSTRAP_SERVERS
  • A Kafka topic named TOPIC_NAME to which messages are produced
  • Client dependencies

Before implementing the Kafka producer application, your project must include the necessary dependencies. For a Java-based Kafka client, include the Kafka client JAR. This JAR file contains the Kafka libraries required for building and running the client.

For information on how to add the dependencies to a pom.xml file in a Maven project, see Section 3.1, “Adding a Kafka clients dependency to your Maven project”.

Configuration

You can configure the producer application through the following constants specified in the Producer class:

BOOTSTRAP_SERVERS
The address and port to connect to the Kafka brokers.
TOPIC_NAME
The name of the Kafka topic to produce messages to.
NUM_MESSAGES
The number of messages to produce before stopping.
MESSAGE_SIZE_BYTES
The size of each message in bytes.
PROCESSING_DELAY_MS
The delay in milliseconds between sending messages. This can simulate message processing time, which is useful for testing.

Example producer application

import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class Producer implements Callback {
    private static final Random RND = new Random(0);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 50;
    private static final int MESSAGE_SIZE_BYTES = 100;
    private static final long PROCESSING_DELAY_MS = 1000L;

    protected AtomicLong messageCount = new AtomicLong(0);

    public static void main(String[] args) {
        new Producer().run();
    }

    public void run() {
        System.out.println("Running producer");
        try (var producer = createKafkaProducer()) { // (1)
            byte[] value = randomBytes(MESSAGE_SIZE_BYTES); // (2)
            while (messageCount.get() < NUM_MESSAGES) { // (3)
                sleep(PROCESSING_DELAY_MS); // (4)
                producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); // (5)
                messageCount.incrementAndGet();
            }
        }
    }

    private KafkaProducer<Long, byte[]> createKafkaProducer() {
        Properties props = new Properties(); // (6)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // (7)
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); // (8)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); // (9)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new KafkaProducer<>(props);
    }

    private void sleep(long ms) { // (10)
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private byte[] randomBytes(int size) { // (11)
        if (size <= 0) {
            throw new IllegalArgumentException("Record size must be greater than zero");
        }
        byte[] payload = new byte[size];
        for (int i = 0; i < payload.length; ++i) {
            payload[i] = (byte) (RND.nextInt(26) + 65);
        }
        return payload;
    }

    private boolean retriable(Exception e) { // (12)
        if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) { // (13)
        if (e != null) {
            System.err.println(e.getMessage());
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        } else {
            System.out.printf("Record sent to %s-%d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}

(1) The client creates a Kafka producer using the createKafkaProducer method. The producer sends messages to the Kafka topic asynchronously.
(2) A byte array is used as the payload for each message sent to the Kafka topic.
(3) The maximum number of messages sent is determined by the NUM_MESSAGES constant value.
(4) The message rate is controlled with a delay between each message sent.
(5) The producer passes the topic name, the message count value, and the message value.
(6) The client creates the KafkaProducer instance using the provided configuration. You can use a properties file or add the configuration directly. For more information on the basic configuration, see Chapter 4, Configuring client applications for connecting to a Kafka cluster.
(7) The connection to the Kafka brokers.
(8) A unique client ID for the producer using a randomly generated UUID. A client ID is not required, but it is useful for tracking the source of requests.
(9) The appropriate serializer classes for handling keys and values as byte arrays.
(10) Method to introduce a delay to the message-sending process for a specified number of milliseconds. If the thread responsible for sending messages is interrupted while paused, an InterruptedException is thrown.
(11) Method to create a random byte array of a specific size, which serves as the payload for each message sent to the Kafka topic. The method generates a random integer and adds 65 to represent an uppercase letter in ASCII code (65 is A, 66 is B, and so on). The ASCII code is stored as a single byte in the payload array. If the payload size is not greater than zero, an IllegalArgumentException is thrown.
(12) Method to check whether to retry sending a message following an exception. The Kafka producer automatically handles retries for certain errors, such as connection errors. You can customize this method to include other errors. Returns false for the specified exceptions and for any exception that is not an instance of RetriableException.
(13) Method called when a message has been acknowledged by the Kafka broker. On success, a message is printed with the details of the topic, partition, and offset position for the message. If an error occurred when sending the message, an error message is printed. The method checks the exception and takes appropriate action based on whether it's a fatal or non-fatal error. If the error is non-fatal, the message-sending process continues. If the error is fatal, a stack trace is printed and the producer is terminated.

Error handling

Fatal exceptions caught by the producer application:

InterruptedException
Error thrown when the current thread is interrupted while paused. Interruption typically occurs when stopping or shutting down the producer. The exception is rethrown as a RuntimeException, which terminates the producer.
IllegalArgumentException
Error thrown when the producer receives invalid or inappropriate arguments. For example, the exception is thrown if the topic is missing.
UnsupportedOperationException
Error thrown when an operation is not supported or a method is not implemented. For example, the exception is thrown if an attempt is made to use an unsupported producer configuration or call a method that is not supported by the KafkaProducer class.

Non-fatal exceptions caught by the producer application:

RetriableException
Error thrown for any exception that implements the RetriableException interface provided by the Kafka client library.

With non-fatal errors, the producer continues to send messages.

Note

By default, Kafka operates with at-least-once message delivery semantics, which means that messages can be delivered more than once in certain scenarios, potentially leading to duplicates. To avoid this risk, consider enabling transactions in your Kafka producer. Transactions provide stronger guarantees of exactly-once delivery. Additionally, you can use the retries configuration property to control how many times the producer automatically retries sending a message before giving up. This setting determines how many delivery attempts the producer makes internally before an error is surfaced to the onCompletion callback.
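
The following is a minimal sketch of enabling transactions in a producer; it is separate from the example application, and the transactional.id value, topic name, and serializers are illustrative:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Illustrative ID; setting transactional.id also enables idempotence
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id with the broker
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction(); // records become visible to consumers atomically
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal for the producer: the transaction cannot be aborted, so just close
        } catch (KafkaException e) {
            producer.abortTransaction(); // discard the records sent in this transaction
        } finally {
            producer.close();
        }
    }
}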

6.2. Example Kafka consumer application

This Java-based Kafka consumer application is an example of a self-contained application that consumes messages from a Kafka topic. The client uses the Kafka Consumer API to fetch and process messages from a specified topic asynchronously, with some error handling. It follows at-least-once semantics by committing offsets after successfully processing messages.

The client implements the ConsumerRebalanceListener interface for partition handling and the OffsetCommitCallback interface for committing offsets.

To run the Kafka consumer application, you execute the main method in the Consumer class. The client consumes messages from the Kafka topic until NUM_MESSAGES messages (50 in the example configuration) have been consumed. The consumer is not designed to be safely accessed concurrently by multiple threads.
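
Because the consumer is not thread-safe, a common pattern is to run one consumer instance per thread within the same consumer group. The following sketch is separate from the example application; the group ID, topic name, thread count, and deserializers are illustrative:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPerThreadSketch {
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(ConsumerPerThreadSketch::runConsumer, "consumer-" + i).start();
        }
    }

    private static void runConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Each thread creates and uses its own consumer; instances are never shared
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("%s fetched %s-%d with offset %d%n",
                        Thread.currentThread().getName(),
                        record.topic(), record.partition(), record.offset());
                }
            }
        }
    }
}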

This example client provides a basic foundation for building more complex Kafka consumers for specific use cases. You can incorporate additional functionality, such as implementing secure connections.

Prerequisites

  • Kafka brokers running on the specified BOOTSTRAP_SERVERS
  • A Kafka topic named TOPIC_NAME from which messages are consumed
  • Client dependencies

Before implementing the Kafka consumer application, your project must include the necessary dependencies. For a Java-based Kafka client, include the Kafka client JAR. This JAR file contains the Kafka libraries required for building and running the client.

For information on how to add the dependencies to a pom.xml file in a Maven project, see Section 3.1, “Adding a Kafka clients dependency to your Maven project”.

Configuration

You can configure the consumer application through the following constants specified in the Consumer class:

BOOTSTRAP_SERVERS
The address and port to connect to the Kafka brokers.
GROUP_ID
The consumer group identifier.
POLL_TIMEOUT_MS
The maximum time to wait for new messages during each poll.
TOPIC_NAME
The name of the Kafka topic to consume messages from.
NUM_MESSAGES
The number of messages to consume before stopping.
PROCESSING_DELAY_MS
The delay in milliseconds between processing fetched messages. This can simulate message processing time, which is useful for testing.

Example consumer application

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

import static java.time.Duration.ofMillis;
import static java.util.Collections.singleton;

public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";
    private static final long POLL_TIMEOUT_MS = 1_000L;
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 50;
    private static final long PROCESSING_DELAY_MS = 1_000L;

    private KafkaConsumer<Long, byte[]> kafkaConsumer;
    protected AtomicLong messageCount = new AtomicLong(0);
    private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

    public static void main(String[] args) {
        new Consumer().run();
    }

    public void run() {
        System.out.println("Running consumer");
        try (var consumer = createKafkaConsumer()) { // (1)
            kafkaConsumer = consumer;
            consumer.subscribe(singleton(TOPIC_NAME), this); // (2)
            System.out.printf("Subscribed to %s%n", TOPIC_NAME);
            while (messageCount.get() < NUM_MESSAGES) { // (3)
                try {
                    ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS)); // (4)
                    if (!records.isEmpty()) { // (5)
                        for (ConsumerRecord<Long, byte[]> record : records) {
                            System.out.printf("Record fetched from %s-%d with offset %d%n",
                                record.topic(), record.partition(), record.offset());
                            sleep(PROCESSING_DELAY_MS); // (6)
                            pendingOffsets.put(new TopicPartition(record.topic(), record.partition()), // (7)
                                new OffsetAndMetadata(record.offset() + 1, null));
                            if (messageCount.incrementAndGet() == NUM_MESSAGES) {
                                break;
                            }
                        }
                        consumer.commitAsync(pendingOffsets, this); // (8)
                        pendingOffsets.clear();
                    }
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { // (9)
                    System.out.println("Invalid or no offset found, and auto.offset.reset unset, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                    if (!retriable(e)) {
                        e.printStackTrace();
                        System.exit(1);
                    }
                }
            }
        }
    }

    private KafkaConsumer<Long, byte[]> createKafkaConsumer() {
        Properties props = new Properties(); // (10)
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); // (11)
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); // (12)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); // (13)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); // (14)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // (15)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // (16)
        return new KafkaConsumer<>(props);
    }

    private void sleep(long ms) { // (17)
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean retriable(Exception e) { // (18)
        // Retry only for rebalance-in-progress or explicitly retriable exceptions
        if (e == null) {
            return false;
        } else if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RebalanceInProgressException || e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (19)
        System.out.printf("Assigned partitions: %s%n", partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (20)
        System.out.printf("Revoked partitions: %s%n", partitions);
        kafkaConsumer.commitSync(pendingOffsets);
        pendingOffsets.clear();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) { // (21)
        System.out.printf("Lost partitions: %s%n", partitions);
    }

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { // (22)
        if (e != null) {
            System.err.println("Failed to commit offsets");
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
}

(1) The client creates a Kafka consumer using the createKafkaConsumer method.
(2) The consumer subscribes to a specific topic. After subscribing to the topic, a confirmation message is printed.
(3) The maximum number of messages consumed is determined by the NUM_MESSAGES constant value.
(4) The next poll to fetch messages must be called within max.poll.interval.ms so that the consumer is not considered failed and a rebalance is not triggered.
(5) A condition to check that the records object containing the batch of messages fetched from Kafka is not empty. If the records object is empty, there are no new messages to process and the batch is skipped.
(6) Method to introduce a delay to the message-fetching process for a specified number of milliseconds.
(7) The consumer uses a pendingOffsets map to store the offsets of the consumed messages that need to be committed.
(8) After processing a batch of messages, the consumer asynchronously commits the offsets using the commitAsync method, implementing at-least-once semantics.
(9) A catch to handle invalid-offset errors when no auto-reset policy applies: the consumer seeks to the end of the partition and starts consuming from the latest available offset. The generic catch that follows handles all other errors; if an exception cannot be retried, a stack trace is printed and the consumer is terminated.
(10) The client creates the KafkaConsumer instance using the provided configuration. You can use a properties file or add the configuration directly. For more information on the basic configuration, see Chapter 4, Configuring client applications for connecting to a Kafka cluster.
(11) The connection to the Kafka brokers.
(12) A unique client ID for the consumer using a randomly generated UUID. A client ID is not required, but it is useful for tracking the source of requests.
(13) The group ID for consumer coordination of assignments to partitions.
(14) The appropriate deserializer classes for handling keys and values as byte arrays.
(15) Configuration to disable automatic offset commits.
(16) Configuration for the consumer to start consuming messages from the earliest available offset when no committed offset is found for a partition.
(17) Method to introduce a delay to the message-consuming process for a specified number of milliseconds. If the thread responsible for consuming messages is interrupted while paused, an InterruptedException is thrown.
(18) Method to check whether to retry committing a message following an exception. Null and the specified exceptions are not retried, nor is any exception that is not an instance of RebalanceInProgressException or RetriableException. You can customize this method to include other errors.
(19) Method to print a message to the console indicating the list of partitions that have been assigned to the consumer.
(20) Method called when the consumer is about to lose ownership of partitions during a consumer group rebalance. The method prints the list of partitions that are being revoked from the consumer. Any pending offsets are committed.
(21) Method called when the consumer loses ownership of partitions during a consumer group rebalance without having committed any pending offsets. The method prints the list of partitions lost by the consumer.
(22) Method called when the consumer is committing offsets to Kafka. If an error occurred when committing an offset, an error message is printed. The method checks the exception and takes appropriate action based on whether it's a fatal or non-fatal error. If the error is non-fatal, the offset-committing process continues. If the error is fatal, a stack trace is printed and the consumer is terminated.

Error handling

Fatal exceptions caught by the consumer application:

InterruptedException
Error thrown when the current thread is interrupted while paused. Interruption typically occurs when stopping or shutting down the consumer. The exception is rethrown as a RuntimeException, which terminates the consumer.
IllegalArgumentException
Error thrown when the consumer receives invalid or inappropriate arguments. For example, the exception is thrown if the topic is missing.
UnsupportedOperationException
Error thrown when an operation is not supported or a method is not implemented. For example, the exception is thrown if an attempt is made to use an unsupported consumer configuration or call a method that is not supported by the KafkaConsumer class.

Non-fatal exceptions caught by the consumer application:

OffsetOutOfRangeException
Error thrown when the consumer attempts to seek to an invalid offset for a partition, typically when the offset is outside the valid range of offsets for that partition, and auto-reset policy is not enabled. To recover, the consumer seeks to the end of the partition to commit the offset synchronously (commitSync). If auto-reset policy is enabled, the consumer seeks to the start or end of the partition depending on the setting.
NoOffsetForPartitionException
Error thrown when there is no committed offset for a partition or the requested offset is invalid, and auto-reset policy is not enabled. To recover, the consumer seeks to the end of the partition to commit the offset synchronously (commitSync). If auto-reset policy is enabled, the consumer seeks to the start or end of the partition depending on the setting.
RebalanceInProgressException
Error thrown during a consumer group rebalance when partitions are being assigned. Offset commits cannot be completed when the consumer is undergoing a rebalance.
RetriableException
Error thrown for any exception that implements the RetriableException interface provided by the Kafka client library.

With non-fatal errors, the consumer continues to process messages.

6.3. Using cooperative rebalancing with consumers

Kafka consumers use a partition assignment strategy determined by the rebalancing protocol in place. By default, Kafka employs the RangeAssignor protocol, which involves consumers relinquishing their partition assignments during a rebalance, leading to potential service disruptions.

To improve efficiency and reduce downtime, you can switch to the CooperativeStickyAssignor protocol, a cooperative rebalancing approach. Unlike the default protocol, cooperative rebalancing enables consumers to work together, retaining their partition assignments during a rebalance, and releasing partitions only when necessary to achieve a balance within the consumer group.

Procedure

  1. In the consumer configuration, use the partition.assignment.strategy property to switch to using CooperativeStickyAssignor as the protocol. For example, if the current configuration is partition.assignment.strategy=RangeAssignor, update it to partition.assignment.strategy=CooperativeStickyAssignor.

    Instead of modifying the consumer configuration file directly, you can also set the partition assignment strategy using props.put in the consumer application code:

    // ...
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    // ...
  2. Restart each consumer in the group one at a time, allowing them to rejoin the group after each restart.
Warning

After switching to the CooperativeStickyAssignor protocol, a RebalanceInProgressException may occur during consumer rebalancing, leading to unexpected stoppages of multiple Kafka clients in the same consumer group. Additionally, this issue may result in the duplication of uncommitted messages, even if the consumers have not changed their partition assignments during rebalancing. If you are using automatic offset commits (enable.auto.commit=true), you don't need to make any changes. If you are committing offsets manually (enable.auto.commit=false) and a RebalanceInProgressException occurs during the manual commit, change the consumer implementation to call poll() in the next loop iteration to complete the rebalancing process, as in the sketch that follows. For more information, see the CooperativeStickyAssignor article on the customer portal.
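
The following sketch illustrates one way to apply that guidance; the structure is an assumption rather than code from the article, and the process method is a hypothetical processing step:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;

public class CommitRetrySketch {
    static void pollLoop(KafkaConsumer<Long, byte[]> consumer) {
        Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
        while (true) {
            // poll() also drives an in-progress cooperative rebalance to completion
            ConsumerRecords<Long, byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, byte[]> record : records) {
                process(record); // hypothetical processing step
                pending.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            }
            try {
                if (!pending.isEmpty()) {
                    consumer.commitSync(pending);
                    pending.clear();
                }
            } catch (RebalanceInProgressException e) {
                // Keep the pending offsets and retry the commit after the next poll()
            }
        }
    }

    private static void process(ConsumerRecord<Long, byte[]> record) {
    }
}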
