Chapter 6. Developing a Kafka client
Create a Kafka client in your preferred programming language and connect it to Streams for Apache Kafka.
To interact with a Kafka cluster, client applications need to be able to produce and consume messages. To develop and configure a basic Kafka client application, as a minimum, you must do the following:
- Set up configuration to connect to a Kafka cluster
- Use producers and consumers to send and receive messages
Setting up the basic configuration for connecting to a Kafka cluster and using producers and consumers is the first step in developing a Kafka client. After that, you can expand into improving the inputs, security, performance, error handling, and functionality of the client application.
Prerequisites
You can create a client properties file that contains property values for the following:
Procedure
- Choose a Kafka client library for your programming language, such as Java, Python, or .NET. Only client libraries built by Red Hat are supported for Streams for Apache Kafka. Currently, Streams for Apache Kafka provides only a Java client library.
- Install the library, either through a package manager or manually by downloading the library from its source.
- Import the necessary classes and dependencies for your Kafka client in your code.
- Create a Kafka consumer or producer object, depending on the type of client you want to create.
A client can be a Kafka consumer, producer, Streams processor, or admin.
- Provide the configuration properties to connect to the Kafka cluster, including the broker address, port, and credentials if necessary.
For a local Kafka deployment, you might start with an address like localhost:9092. However, when working with a Kafka cluster managed by Streams for Apache Kafka, you can obtain the bootstrap address from the Kafka custom resource status using an oc command:
    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'
This command retrieves the bootstrap addresses exposed by listeners for client connections on a Kafka cluster.
- Use the Kafka consumer or producer object to subscribe to topics, produce messages, or retrieve messages from the Kafka cluster.
- Pay attention to error handling; it’s vitally important when connecting and communicating with Kafka, especially in production systems where high availability and ease of operations are valued. Effective error handling is a key differentiator between a prototype and a production-grade application, and it applies not only to Kafka but also to any robust software system.
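The connection step above can be sketched in Java as follows. This is a minimal sketch and is not part of the Streams for Apache Kafka examples. It collects the connection settings in a plain java.util.Properties object using the standard Kafka configuration key names, so it compiles without the Kafka client library. The class name ClientConfigSketch and the localhost:9092 address are placeholders. With the kafka-clients dependency on the classpath, you would add serializer or deserializer settings and pass the properties to a client constructor such as new KafkaProducer<>(props).

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not from the product examples): minimum connection settings.
class ClientConfigSketch {

    // Builds connection properties using the standard Kafka configuration keys.
    static Properties connectionProperties(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.isBlank()) {
            throw new IllegalArgumentException("Bootstrap servers must be set");
        }
        Properties props = new Properties();
        // Comma-separated host:port pairs used for the initial connection to the cluster
        props.put("bootstrap.servers", bootstrapServers);
        // Optional: a unique client ID makes it easier to track the source of requests
        props.put("client.id", "client-" + UUID.randomUUID());
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address; use the bootstrap address reported by your cluster
        Properties props = connectionProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```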
6.1. Example Kafka producer application
This Java-based Kafka producer application is an example of a self-contained application that produces messages to a Kafka topic. The client uses the Kafka Producer API to send messages asynchronously, with some error handling.
The client implements the Callback interface to handle the result of each asynchronous send request.
To run the Kafka producer application, you execute the main method in the Producer class. The client generates a random byte array as the message payload using the randomBytes method. The client produces messages to a specified Kafka topic until NUM_MESSAGES messages (50 in the example configuration) have been sent. The producer is thread-safe, allowing multiple threads to use a single producer instance.
This example client provides a basic foundation for building more complex Kafka producers for specific use cases. You can incorporate additional functionality, such as implementing secure connections.
Prerequisites
- Kafka brokers running on the specified BOOTSTRAP_SERVERS
- A Kafka topic named TOPIC_NAME to which messages are produced
- Client dependencies
Before implementing the Kafka producer application, your project must include the necessary dependencies. For a Java-based Kafka client, include the Kafka client JAR. This JAR file contains the Kafka libraries required for building and running the client.
For information on how to add the dependencies to a pom.xml file in a Maven project, see Section 3.1, “Adding a Kafka clients dependency to your Maven project”.
Configuration
You can configure the producer application through the following constants specified in the Producer class:
BOOTSTRAP_SERVERS - The address and port to connect to the Kafka brokers.
TOPIC_NAME - The name of the Kafka topic to produce messages to.
NUM_MESSAGES - The number of messages to produce before stopping.
MESSAGE_SIZE_BYTES - The size of each message in bytes.
PROCESSING_DELAY_MS - The delay in milliseconds between sending messages. This can simulate message processing time, which is useful for testing.
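The following minimal sketch shows how these constants feed the producer configuration described in callouts 6 to 9 below, building the settings as plain java.util.Properties. It is not the example's createKafkaProducer method. The class name and constant values are assumptions. The serializer class names are given as strings so the sketch compiles without the Kafka client library. The resulting properties are what you would pass to new KafkaProducer<byte[], byte[]>(props).

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical sketch of the producer settings; values are placeholders.
class ProducerConfigSketch {
    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    static Properties producerProperties() {
        Properties props = new Properties();
        // Connection to the Kafka brokers
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Unique client ID built from a random UUID (optional, helps track requests)
        props.put("client.id", "client-" + UUID.randomUUID());
        // Keys and values are handled as byte arrays
        props.put("key.serializer", BYTE_ARRAY_SERIALIZER);
        props.put("value.serializer", BYTE_ARRAY_SERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        producerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```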
Example producer application
1. The client creates a Kafka producer using the createKafkaProducer method. The producer sends messages to the Kafka topic asynchronously.
2. A byte array is used as the payload for each message sent to the Kafka topic.
3. The maximum number of messages sent is determined by the NUM_MESSAGES constant value.
4. The message rate is controlled with a delay between each message sent.
5. The producer passes the topic name, the message count value, and the message value.
6. The client creates the KafkaProducer instance using the provided configuration. You can use a properties file or add the configuration directly. For more information on the basic configuration, see Chapter 4, Configuring client applications for connecting to a Kafka cluster.
7. The connection to the Kafka brokers.
8. A unique client ID for the producer using a randomly generated UUID. A client ID is not required, but it is useful to track the source of requests.
9. The appropriate serializer classes for handling keys and values as byte arrays.
10. Method to introduce a delay to the message sending process for a specified number of milliseconds. If the thread responsible for sending messages is interrupted while paused, it throws an InterruptedException error.
11. Method to create a random byte array of a specific size, which serves as the payload for each message sent to the Kafka topic. The method generates a random integer and adds 65 to represent an uppercase letter in ASCII code (65 is A, 66 is B, and so on). The ASCII code is stored as a single byte in the payload array. If the payload size is not greater than zero, it throws an IllegalArgumentException.
12. Method to check whether to retry sending a message following an exception. The Kafka producer automatically handles retries for certain errors, such as connection errors. You can customize this method to include other errors. Returns false for null and specified exceptions, and for exceptions that are not subclasses of the RetriableException class.
13. Method called when a send request completes, either because the Kafka broker acknowledged the message or because an error occurred. On success, a message is printed with the details of the topic, partition, and offset position for the message. If an error occurred when sending the message, an error message is printed. The method checks the exception and takes appropriate action based on whether it is a fatal or non-fatal error. If the error is non-fatal, the message sending process continues. If the error is fatal, a stack trace is printed and the producer is terminated.
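The payload generation in callout 11 can be sketched as follows. This shows how such a method might be written and is not the example's exact code. The class name PayloadSketch and the exception message are hypothetical. Bounding the random integer to 0..25 before adding 65 keeps every byte within the uppercase range A to Z.

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

// Hypothetical sketch of a payload generator along the lines of callout 11.
class PayloadSketch {
    private static final Random RANDOM = new Random();

    // Returns `size` random uppercase ASCII letters ('A' = 65 ... 'Z' = 90), one byte each.
    static byte[] randomBytes(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("Payload size must be greater than zero");
        }
        byte[] payload = new byte[size];
        for (int i = 0; i < payload.length; i++) {
            // nextInt(26) yields 0..25; adding 65 maps it onto 'A'..'Z'
            payload[i] = (byte) (RANDOM.nextInt(26) + 65);
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = randomBytes(10);
        System.out.println(new String(payload, StandardCharsets.US_ASCII));
    }
}
```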
Error handling
Fatal exceptions caught by the producer application:
InterruptedException - Error thrown when the current thread is interrupted while paused. Interruption typically occurs when stopping or shutting down the producer. The exception is rethrown as a RuntimeException, which terminates the producer.
IllegalArgumentException - Error thrown when the producer receives invalid or inappropriate arguments. For example, the exception is thrown if the topic is missing.
UnsupportedOperationException - Error thrown when an operation is not supported or a method is not implemented. For example, the exception is thrown if an attempt is made to use an unsupported producer configuration or call a method that is not supported by the KafkaProducer class.
Non-fatal exceptions caught by the producer application:
RetriableException - Error thrown for any exception that is a subclass of the RetriableException class provided by the Kafka client library.
With non-fatal errors, the producer continues to send messages.
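The handling of InterruptedException described above can be sketched with a small delay helper, similar in purpose to the method in callout 10. The names are hypothetical. Restoring the thread's interrupt flag before rethrowing is an addition based on common Java practice. The example is not documented to do this.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical delay helper that treats interruption as a fatal error.
class SleepSketch {

    // Pauses for the given number of milliseconds. If the thread is interrupted,
    // the interrupt flag is restored and the exception is rethrown as a RuntimeException.
    static void sleep(long ms) {
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        sleep(10);
        System.out.println("Resumed after delay");
    }
}
```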
By default, Kafka operates with at-least-once message delivery semantics, which means that messages can be delivered more than once in certain scenarios, potentially leading to duplicates. To avoid this risk, consider enabling transactions in your Kafka producer. Transactions provide stronger guarantees of exactly-once delivery. Additionally, you can use the retries configuration property to control how many times the producer will retry sending a message before giving up. This setting affects how many times the retriable method may return true during a message send error.
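The following sketch shows the settings mentioned above as plain java.util.Properties, assuming you want transactional sends. Setting transactional.id turns on the transactional producer. Your code must then call initTransactions(), beginTransaction(), and commitTransaction() (or abortTransaction()) on the KafkaProducer. Those calls are not shown here because they need the Kafka library. Exactly-once delivery also depends on the consumer side: consumers must set isolation.level=read_committed to skip messages from aborted transactions. The transactional ID and retry count are placeholders. In recent client versions, retries defaults to a very large value and the overall retry window is bounded by delivery.timeout.ms, so consider the two settings together.

```java
import java.util.Properties;

// Hypothetical sketch of settings related to duplicate avoidance and retries.
class DeliverySettingsSketch {

    // Producer side: enables transactions for the given placeholder transactional ID.
    static Properties transactionalProducerSettings(String transactionalId, int retries) {
        if (retries < 1) {
            // The idempotent producer, which transactions rely on, requires retries > 0
            throw new IllegalArgumentException("retries must be greater than zero");
        }
        Properties props = new Properties();
        // A stable ID that identifies this transactional producer across restarts
        props.put("transactional.id", transactionalId);
        // Transactions require idempotence, which in turn requires acks=all
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        // Number of resend attempts for retriable errors
        props.put("retries", Integer.toString(retries));
        return props;
    }

    // Consumer side: read only messages from committed transactions.
    static Properties readCommittedConsumerSettings() {
        Properties props = new Properties();
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(transactionalProducerSettings("my-transactional-id", 10));
        System.out.println(readCommittedConsumerSettings());
    }
}
```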
6.2. Example Kafka consumer application
This Java-based Kafka consumer application is an example of a self-contained application that consumes messages from a Kafka topic. The client uses the Kafka Consumer API to fetch and process messages from a specified topic, committing offsets asynchronously, with some error handling. It follows at-least-once semantics by committing offsets after successfully processing messages.
The client implements the ConsumerRebalanceListener interface for partition handling and the OffsetCommitCallback interface for committing offsets.
To run the Kafka consumer application, you execute the main method in the Consumer class. The client consumes messages from the Kafka topic until NUM_MESSAGES messages (50 in the example configuration) have been consumed. The consumer is not designed to be safely accessed concurrently by multiple threads.
This example client provides a basic foundation for building more complex Kafka consumers for specific use cases. You can incorporate additional functionality, such as implementing secure connections.
Prerequisites
- Kafka brokers running on the specified BOOTSTRAP_SERVERS
- A Kafka topic named TOPIC_NAME from which messages are consumed
- Client dependencies
Before implementing the Kafka consumer application, your project must include the necessary dependencies. For a Java-based Kafka client, include the Kafka client JAR. This JAR file contains the Kafka libraries required for building and running the client.
For information on how to add the dependencies to a pom.xml file in a Maven project, see Section 3.1, “Adding a Kafka clients dependency to your Maven project”.
Configuration
You can configure the consumer application through the following constants specified in the Consumer class:
BOOTSTRAP_SERVERS - The address and port to connect to the Kafka brokers.
GROUP_ID - The consumer group identifier.
POLL_TIMEOUT_MS - The maximum time to wait for new messages during each poll.
TOPIC_NAME - The name of the Kafka topic to consume messages from.
NUM_MESSAGES - The number of messages to consume before stopping.
PROCESSING_DELAY_MS - The delay in milliseconds between processing messages. This can simulate message processing time, which is useful for testing.
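The following minimal sketch builds the consumer configuration described in callouts 10 to 16 below as plain java.util.Properties. It is not the example's createKafkaConsumer method. The class name and constant values are assumptions. The deserializer class names are strings so the sketch compiles without the Kafka client library. You would pass the result to new KafkaConsumer<byte[], byte[]>(props).

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical sketch of the consumer settings; values are placeholders.
class ConsumerConfigSketch {
    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String GROUP_ID = "my-group";
    static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Connection to the Kafka brokers
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Unique client ID built from a random UUID (optional, helps track requests)
        props.put("client.id", "client-" + UUID.randomUUID());
        // Consumers that share a group ID divide the topic's partitions between them
        props.put("group.id", GROUP_ID);
        // Keys and values are handled as byte arrays
        props.put("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.put("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        // The application commits offsets itself, after processing (at-least-once)
        props.put("enable.auto.commit", "false");
        // With no committed offset for a partition, start from the earliest message
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```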
Example consumer application
1. The client creates a Kafka consumer using the createKafkaConsumer method.
2. The consumer subscribes to a specific topic. After subscribing to the topic, a confirmation message is printed.
3. The maximum number of messages consumed is determined by the NUM_MESSAGES constant value.
4. The next poll to fetch messages must be called within max.poll.interval.ms to avoid a rebalance.
5. A condition to check that the records object containing the batch of messages fetched from Kafka is not empty. If the records object is empty, there are no new messages to process and the process is skipped.
6. Method to introduce a delay to the message fetching process for a specified number of milliseconds.
7. The consumer uses a pendingOffsets map to store the offsets of the consumed messages that need to be committed.
8. After processing a batch of messages, the consumer asynchronously commits the offsets using the commitAsync method, implementing at-least-once semantics.
9. A catch block to handle non-fatal and fatal errors that occur when consuming messages and no auto-reset policy is set. For non-fatal errors, the consumer seeks to the end of the partition and starts consuming from the latest available offset. If an exception cannot be retried, a stack trace is printed, and the consumer is terminated.
10. The client creates the KafkaConsumer instance using the provided configuration. You can use a properties file or add the configuration directly. For more information on the basic configuration, see Chapter 4, Configuring client applications for connecting to a Kafka cluster.
11. The connection to the Kafka brokers.
12. A unique client ID for the consumer using a randomly generated UUID. A client ID is not required, but it is useful to track the source of requests.
13. The group ID for consumer coordination of assignments to partitions.
14. The appropriate deserializer classes for handling keys and values as byte arrays.
15. Configuration to disable automatic offset commits.
16. Configuration for the consumer to start consuming messages from the earliest available offset when no committed offset is found for a partition.
17. Method to introduce a delay to the message consuming process for a specified number of milliseconds. If the thread responsible for consuming messages is interrupted while paused, it throws an InterruptedException error.
18. Method to check whether to retry committing offsets following an exception. Null and specified exceptions are not retried, nor are exceptions that are not instances of RebalanceInProgressException or RetriableException. You can customize this method to include other errors.
19. Method to print a message to the console indicating the list of partitions that have been assigned to the consumer.
20. Method called when the consumer is about to lose ownership of partitions during a consumer group rebalance. The method prints the list of partitions that are being revoked from the consumer. Any pending offsets are committed.
21. Method called when the consumer has lost ownership of partitions without the opportunity to commit any pending offsets. The method prints the list of partitions lost by the consumer.
22. Method called when an asynchronous offset commit completes. If an error occurred when committing an offset, an error message is printed. The method checks the exception and takes appropriate action based on whether it is a fatal or non-fatal error. If the error is non-fatal, the offset committing process continues. If the error is fatal, a stack trace is printed and the consumer is terminated.
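The pendingOffsets bookkeeping in callouts 7 and 8 relies on a Kafka convention. A committed offset is the position of the next message to read, so after processing the message at offset n, the consumer stores n + 1. The following sketch models this with plain Java types so it runs without the Kafka library. The real client keys the map by TopicPartition, wraps the value in OffsetAndMetadata, and passes it to commitAsync. The class and method names are hypothetical. Committing only the offsets of processed messages gives at-least-once delivery: if the consumer fails before a commit, the uncommitted messages are delivered again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of pendingOffsets bookkeeping using plain Java types.
class PendingOffsetsSketch {
    // Key: "topic-partition" as a stand-in for TopicPartition;
    // value: the next offset the group should read from that partition
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    // Call only after the message at this offset has been fully processed
    void markProcessed(String topic, int partition, long offset) {
        // The committed position is offset + 1; keep the highest one per partition
        pendingOffsets.merge(topic + "-" + partition, offset + 1, Long::max);
    }

    // Returns a snapshot of the offsets to commit and clears the pending map
    Map<String, Long> drainForCommit() {
        Map<String, Long> snapshot = new HashMap<>(pendingOffsets);
        pendingOffsets.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        PendingOffsetsSketch tracker = new PendingOffsetsSketch();
        tracker.markProcessed("my-topic", 0, 41);
        tracker.markProcessed("my-topic", 0, 42);
        tracker.markProcessed("my-topic", 1, 7);
        System.out.println(tracker.drainForCommit());
    }
}
```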
Error handling
Fatal exceptions caught by the consumer application:
InterruptedException - Error thrown when the current thread is interrupted while paused. Interruption typically occurs when stopping or shutting down the consumer. The exception is rethrown as a RuntimeException, which terminates the consumer.
IllegalArgumentException - Error thrown when the consumer receives invalid or inappropriate arguments. For example, the exception is thrown if the topic is missing.
UnsupportedOperationException - Error thrown when an operation is not supported or a method is not implemented. For example, the exception is thrown if an attempt is made to use an unsupported consumer configuration or call a method that is not supported by the KafkaConsumer class.
Non-fatal exceptions caught by the consumer application:
OffsetOutOfRangeException - Error thrown when the consumer attempts to seek to an invalid offset for a partition, typically when the offset is outside the valid range of offsets for that partition, and auto-reset policy is not enabled. To recover, the consumer seeks to the end of the partition and commits the offset synchronously (commitSync). If auto-reset policy is enabled, the consumer seeks to the start or end of the partition depending on the setting.
NoOffsetForPartitionException - Error thrown when there is no committed offset for a partition or the requested offset is invalid, and auto-reset policy is not enabled. To recover, the consumer seeks to the end of the partition and commits the offset synchronously (commitSync). If auto-reset policy is enabled, the consumer seeks to the start or end of the partition depending on the setting.
RebalanceInProgressException - Error thrown during a consumer group rebalance when partitions are being assigned. Offset commits cannot be completed when the consumer is undergoing a rebalance.
RetriableException - Error thrown for any exception that is a subclass of the RetriableException class provided by the Kafka client library.
With non-fatal errors, the consumer continues to process messages.
6.3. Using cooperative rebalancing with consumers
Kafka consumers use a partition assignment strategy determined by the rebalancing protocol in place. By default, Kafka employs the RangeAssignor protocol, which involves consumers relinquishing their partition assignments during a rebalance, leading to potential service disruptions.
To improve efficiency and reduce downtime, you can switch to the CooperativeStickyAssignor protocol, a cooperative rebalancing approach. Unlike the default protocol, cooperative rebalancing enables consumers to work together, retaining their partition assignments during a rebalance, and releasing partitions only when necessary to achieve a balance within the consumer group.
Procedure
- In the consumer configuration, use the partition.assignment.strategy property to switch to using CooperativeStickyAssignor as the protocol. For example, if the current configuration is partition.assignment.strategy=RangeAssignor, CooperativeStickyAssignor, update it to partition.assignment.strategy=CooperativeStickyAssignor.
Instead of modifying the consumer configuration file directly, you can also set the partition assignment strategy using props.put in the consumer application code:
    // ...
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    // ...
- Restart each consumer in the group one at a time, allowing them to rejoin the group after each restart.
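If you set the strategy in a properties object rather than a file, the starting and target configurations of the procedure above can be sketched as follows. This sketch uses the fully qualified assignor class names from the org.apache.kafka.clients.consumer package. The class and method names are hypothetical. Listing both assignors lets a group contain consumers with either setting during the rolling restart, because the group can only use a strategy that all members support.

```java
import java.util.Properties;

// Hypothetical sketch of the starting and target assignor configurations.
class AssignorConfigSketch {
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String COOPERATIVE_STICKY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    // Starting point: both assignors listed, RangeAssignor preferred
    static Properties beforeSwitch() {
        Properties props = new Properties();
        props.put("partition.assignment.strategy", RANGE + "," + COOPERATIVE_STICKY);
        return props;
    }

    // Target: cooperative rebalancing only; roll it out one consumer at a time
    static Properties afterSwitch() {
        Properties props = new Properties();
        props.put("partition.assignment.strategy", COOPERATIVE_STICKY);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Before: " + beforeSwitch().getProperty("partition.assignment.strategy"));
        System.out.println("After:  " + afterSwitch().getProperty("partition.assignment.strategy"));
    }
}
```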
After switching to the CooperativeStickyAssignor protocol, a RebalanceInProgressException may occur during consumer rebalancing, leading to unexpected stoppages of multiple Kafka clients in the same consumer group. Additionally, this issue may result in the duplication of uncommitted messages, even if Kafka consumers have not changed their partition assignments during rebalancing. If you are using automatic offset commits (enable.auto.commit=true), you don’t need to make any changes. If you are manually committing offsets (enable.auto.commit=false), and a RebalanceInProgressException occurs during the manual commit, change the consumer implementation to call poll() in the next loop to complete the consumer rebalancing process. For more information, see the CooperativeStickyAssignor article on the customer portal.