Chapter 6. Developing a Kafka client


Create a Kafka client in your preferred programming language and connect it to AMQ Streams.

To interact with a Kafka cluster, client applications need to be able to produce and consume messages. To develop and configure a basic Kafka client application, as a minimum, you need to do the following:

  • Set up configuration to connect to a Kafka cluster
  • Use producers and consumers to send and receive messages

Setting up the basic configuration for connecting to a Kafka cluster and using producers and consumers is the first step in developing a Kafka client. From there, you can extend the client application to improve its input handling, security, performance, and error handling.

Prerequisites

You have created a client properties file that contains the property values needed to connect to the Kafka cluster.
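For example, a minimal consumer properties file might look like the following. The values shown are placeholders; substitute the bootstrap address and group ID for your own cluster.

```properties
# Address of the Kafka bootstrap servers (placeholder value)
bootstrap.servers=my-cluster-kafka-bootstrap:9092

# Consumer group this client joins (placeholder value)
group.id=my-group

# Deserializers for record keys and values
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```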

Procedure

  1. Choose a Kafka client library for your programming language, such as Java, Python, or .NET.
  2. Install the library, either through a package manager or manually by downloading the library from its source.
  3. Import the necessary classes and dependencies for your Kafka client in your code.
  4. Create a Kafka consumer or producer object, depending on the type of client you want to create.

    You can have a client that does both.

  5. Provide the configuration properties to connect to the Kafka cluster, including the broker address, port, and credentials if necessary.
  6. Use the Kafka consumer or producer object to subscribe to topics, produce messages, or retrieve messages from the Kafka cluster.
  7. Handle any errors that may occur during the connection or communication with AMQ Streams.

Here is an example in Java using the Apache Kafka client library to create a Kafka consumer and connect it to a Kafka cluster to start reading from a specified topic.

Example consumer client

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class AMQStreamsConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("consumer.properties")) { 1
            properties.load(input);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("my-topic")); 2

        while (true) { 3
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

1
The consumer client configuration is read and loaded from the consumer.properties file.
2
The subscribe() method subscribes to a topic called my-topic. You can add other topics to the list.
3
The consumer polls for new messages from Kafka and retrieves message batches for processing.
Note

If a consumer has a group.id, it must subscribe to one or more topics. The broker needs to know which topics the consumer is interested in so that it can assign partitions to the consumer.
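Conversely, a consumer that does not join a consumer group can bypass subscribe() and attach directly to specific partitions with assign(). A minimal sketch, using the consumer from the example above (the topic name and partition number are illustrative):

```java
// Read partition 0 of my-topic directly, without group management.
// Requires imports for org.apache.kafka.common.TopicPartition
// and java.util.Collections.
consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
```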

Here is an example in Java using the Apache Kafka client library to create a Kafka producer and connect it to a Kafka cluster to start producing messages to a specified topic.

Example producer client

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AMQStreamsProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("producer.properties")) { 1
            properties.load(input);
        } catch (IOException e) {
            e.printStackTrace();
        }

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties); 2

        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message-key", "message-value"); 3
        producer.send(record);

        producer.close();
    }
}

1
The producer client configuration is read and loaded from the producer.properties file.
2
The client creates a producer using the properties.
3
A message is sent to a topic called my-topic.
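Step 7 of the procedure covers error handling. One common approach for a producer, sketched here against the example above, is to pass a callback to send(); the callback is invoked when the broker acknowledges the record or when the send fails.

```java
// The callback receives either metadata for the acknowledged record
// or a non-null exception if the send failed.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.println("Sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});
```

Because send() is asynchronous, the callback avoids blocking the producer while still surfacing delivery failures.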