Chapter 4. Configuring client applications for connecting to a Kafka cluster


To connect to a Kafka cluster, a client application must be configured with a minimum set of properties that identify the brokers and enable a connection. Additionally, you need to add a serializer/deserializer mechanism to convert messages into or out of the byte array format used by Kafka. When developing a client application, you begin by adding an initial connection to your Kafka cluster, which is used to discover all available brokers. After you have established a connection, you can begin consuming messages from Kafka topics or producing messages to them.

Although not required, a unique client ID is recommended so that you can identify your clients in logs and metrics collection.

You can configure the properties in a properties file. Using a properties file means you can modify the configuration without recompiling the code.

For example, you can load the properties in a Java client using the following code:

Loading configuration properties into a client

Properties properties = new Properties();
try (InputStream inputStream = new FileInputStream("config.properties")) {
    properties.load(inputStream);
}
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

You can also add the properties directly to the code in a configuration object. For example, you can use the setProperty() method for a Java client application. Adding properties directly is a useful option when you only have a small number of properties to configure.

4.1. Basic producer client configuration

When you develop a producer client, configure the following:

  • A connection to your Kafka cluster
  • A serializer to transform message keys into bytes for the Kafka broker
  • A serializer to transform message values into bytes for the Kafka broker

You might also add a compression type if you want to send and store compressed messages.
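For example, the following property enables gzip compression of message batches on the producer; supported values are gzip, snappy, lz4, and zstd:

```properties
# Compress message batches before sending them to the broker
compression.type = gzip
```

Compression trades some producer CPU time for smaller network transfers and less storage on the broker.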

Basic producer client configuration properties

client.id = my-producer-id 1
bootstrap.servers = my-cluster-kafka-bootstrap:9092 2
key.serializer = org.apache.kafka.common.serialization.StringSerializer 3
value.serializer = org.apache.kafka.common.serialization.StringSerializer 4

1
The logical name for the client.
2
Bootstrap address for the client to be able to make an initial connection to the Kafka cluster.
3
Serializer to transform message keys into bytes before being sent to the Kafka broker.
4
Serializer to transform message values into bytes before being sent to the Kafka broker.

Adding producer client configuration directly to the code

Properties props = new Properties();
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

The KafkaProducer specifies string key and value types for the messages it sends. The serializers used must be able to convert the keys and values from the specified types into bytes before sending them to Kafka.
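StringSerializer, for example, encodes strings as UTF-8 byte arrays by default. Conceptually, the conversion amounts to the following minimal sketch in plain Java (not the Kafka API itself):

```java
import java.nio.charset.StandardCharsets;

public class SerializationSketch {
    public static void main(String[] args) {
        // StringSerializer converts a message value to bytes using UTF-8 by default
        String value = "my-message-value";
        byte[] encoded = value.getBytes(StandardCharsets.UTF_8);

        // StringDeserializer performs the reverse conversion on the consumer side
        String decoded = new String(encoded, StandardCharsets.UTF_8);
        System.out.println(decoded);
    }
}
```

Other serializers, such as those for Avro or JSON, follow the same contract: the producer converts typed keys and values into bytes, and the matching deserializer on the consumer side restores them.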

4.2. Basic consumer client configuration

When you develop a consumer client, configure the following:

  • A connection to your Kafka cluster
  • A deserializer to transform the bytes fetched from the Kafka broker into message keys that can be understood by the client application
  • A deserializer to transform the bytes fetched from the Kafka broker into message values that can be understood by the client application

Typically, you also add a consumer group ID to associate the consumer with a consumer group. A consumer group is a logical entity for distributing the processing of a large data stream from one or more topics to parallel consumers. Consumers are grouped using a group.id, allowing messages to be spread across the members. In a given consumer group, each topic partition is read by a single consumer. A single consumer can handle many partitions. For maximum parallelism, create one consumer for each partition. If there are more consumers than partitions, some consumers remain idle, ready to take over in case of failure.
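The way partitions spread across a group can be illustrated with a simplified round-robin assignment. This is a sketch only; Kafka's actual partition assignors are more sophisticated, but the constraint that each partition goes to exactly one consumer in the group is the same:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Simplified round-robin: partition p goes to consumer (p % consumerCount).
    // Each partition is read by exactly one consumer in the group.
    static Map<String, List<Integer>> assign(int partitionCount, int consumerCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get("consumer-" + (p % consumerCount)).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions over 4 consumers: every consumer gets at least one partition
        System.out.println(assign(6, 4));
        // 2 partitions over 3 consumers: consumer-2 is assigned nothing and stays idle
        System.out.println(assign(2, 3));
    }
}
```

The second case shows why adding more consumers than partitions yields no extra parallelism: the surplus consumers sit idle until a partition is freed by a failure or rebalance.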

Basic consumer client configuration properties

client.id = my-consumer-id 1
group.id = my-group-id 2
bootstrap.servers = my-cluster-kafka-bootstrap:9092 3
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 4
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 5

1
The logical name for the client.
2
A group ID for the consumer to be able to join a specific consumer group.
3
Bootstrap address for the client to be able to make an initial connection to the Kafka cluster.
4
Deserializer to transform the bytes fetched from the Kafka broker into message keys.
5
Deserializer to transform the bytes fetched from the Kafka broker into message values.

Adding consumer client configuration directly to the code

Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

The KafkaConsumer specifies string key and value types for the messages it receives. The deserializers used must be able to convert the bytes received from Kafka into the specified types.

Note

Each consumer group must have a unique group.id. If you restart a consumer with the same group.id, it resumes consuming messages from where it left off before it was stopped.
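If a consumer starts with a group.id that has no committed offsets, the auto.offset.reset property determines where it begins reading; for example:

```properties
# Start from the beginning of each partition when no committed offset exists
auto.offset.reset = earliest
```

With earliest, the consumer reads each partition from the beginning; with latest (the default), it reads only messages produced after it joined.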
