Chapter 6. Developing a Kafka client
Create a Kafka client in your preferred programming language and connect it to AMQ Streams.
To interact with a Kafka cluster, client applications need to be able to produce and consume messages. To develop and configure a basic Kafka client application, as a minimum, you need to do the following:
- Set up configuration to connect to a Kafka cluster
- Use producers and consumers to send and receive messages
Setting up the basic configuration for connecting to a Kafka cluster and using producers and consumers is the first step in developing a Kafka client. After that, you can expand into improving the inputs, security, performance, error handling, and functionality of the client application.
Prerequisites
You have created a client properties file that contains the property values needed to connect to the Kafka cluster, such as the bootstrap server address and any required security configuration.
Procedure
- Choose a Kafka client library for your programming language, such as Java, Python, or .NET.
- Install the library, either through a package manager or manually by downloading the library from its source.
- Import the necessary classes and dependencies for your Kafka client in your code.
- Create a Kafka consumer or producer object, depending on the type of client you want to create. A single client can do both.
- Provide the configuration properties to connect to the Kafka cluster, including the broker address, port, and credentials if necessary (a sample properties file is shown after this procedure).
- Use the Kafka consumer or producer object to subscribe to topics, produce messages, or retrieve messages from the Kafka cluster.
- Handle any errors that may occur during the connection or communication with AMQ Streams.
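For reference, a minimal consumer.properties file for the examples below might look like the following. The broker address shown here is a placeholder; replace it with the bootstrap address of your cluster and add any security properties (for example, TLS or SASL settings) that your cluster requires. A producer.properties file is the same except that it uses serializer classes instead of deserializers.
Example consumer.properties file (placeholder values)
bootstrap.servers=my-cluster-kafka-bootstrap:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer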
Here is an example in Java using the Apache Kafka client library to create a Kafka consumer and connect it to a Kafka cluster to start reading from a specified topic.
Example consumer client
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AMQStreamsConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Load the connection and deserializer settings from the client properties file
        try (InputStream input = new FileInputStream("consumer.properties")) {
            properties.load(input);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic to read from
        consumer.subscribe(Arrays.asList("my-topic"));

        // Poll the cluster in a loop and print each received message
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}
If a consumer has a group.id, it must subscribe to one or more topics. The broker needs to know which topics the consumer is interested in so that it can assign partitions to the consumer.
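The polling loop in the consumer example runs indefinitely and never closes the consumer. As a minimal sketch of a cleaner shutdown, reusing the consumer and topic from the example above, you can call wakeup() from a JVM shutdown hook so that poll() exits and the consumer leaves the group cleanly. This fragment also needs imports for java.time.Duration and org.apache.kafka.common.errors.WakeupException.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();        // causes a blocked poll() call to throw WakeupException
    try {
        mainThread.join();    // wait for the main thread to finish closing the consumer
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: " + record.value());
        }
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do
} finally {
    consumer.close();         // commits offsets (if enabled) and leaves the consumer group
}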
Here is an example in Java using the Apache Kafka client library to create a Kafka producer and connect it to a Kafka cluster to start producing messages to a specified topic.
Example producer client
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AMQStreamsProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Load the connection and serializer settings from the client properties file
        try (InputStream input = new FileInputStream("producer.properties")) {
            properties.load(input);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Create the producer with the loaded configuration
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Build the record to send: topic, message key, and message value
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message-key", "message-value");
        producer.send(record);
        producer.close();
    }
}
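The send() call in this example is asynchronous; if you neither check the Future it returns nor pass a callback, a delivery failure goes unnoticed. As a minimal sketch of one way to surface errors, reusing the producer and record from the example above, you can replace the plain send(record) call with a callback version:
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The send failed; log, retry, or alert depending on your error-handling strategy
        exception.printStackTrace();
    } else {
        System.out.println("Delivered to " + metadata.topic() + "-" + metadata.partition()
                + " at offset " + metadata.offset());
    }
});
producer.flush();   // block until buffered records have been sent
producer.close();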