Chapter 6. Developing a Kafka client


Create a Kafka client in your preferred programming language and connect it to AMQ Streams.

To interact with a Kafka cluster, client applications need to be able to produce and consume messages. To develop and configure a basic Kafka client application, at a minimum, you need to do the following:

  • Set up configuration to connect to a Kafka cluster
  • Use producers and consumers to send and receive messages

Setting up the basic configuration for connecting to a Kafka cluster and using producers and consumers is the first step in developing a Kafka client. After that, you can improve the client application's input handling, security, performance, error handling, and functionality.

Prerequisites

You have created a client properties file that contains the configuration for connecting to the Kafka cluster, such as the bootstrap server address, the serializer or deserializer settings, and any required credentials.
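
For reference, a minimal consumer.properties file might look like the following. The bootstrap address and group ID shown here are placeholders for your environment, and a producer properties file would use key.serializer and value.serializer settings instead of deserializers.

Example client properties file

bootstrap.servers=my-cluster-kafka-bootstrap:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer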

Procedure

  1. Choose a Kafka client library for your programming language, such as Java, Python, or .NET.
  2. Install the library, either through a package manager or manually by downloading the library from its source.
  3. Import the necessary classes and dependencies for your Kafka client in your code.
  4. Create a Kafka consumer or producer object, depending on the type of client you want to create.

    You can have a client that does both.

  5. Provide the configuration properties to connect to the Kafka cluster, including the broker address, port, and credentials if necessary.
  6. Use the Kafka consumer or producer object to subscribe to topics, produce messages, or retrieve messages from the Kafka cluster.
  7. Handle any errors that may occur during the connection or communication with AMQ Streams.

Here is an example in Java using the Apache Kafka client library to create a Kafka consumer and connect it to a Kafka cluster to start reading from a specified topic.

Example consumer client

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Properties;

public class AMQStreamsConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("consumer.properties")) { // 1
            properties.load(input);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("my-topic")); // 2

        while (true) { // 3
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

1. The consumer client configuration is read and loaded from the consumer.properties file.
2. The subscribe() method subscribes to a topic called my-topic. You can add other topics to the list.
3. The consumer polls for new messages from Kafka and retrieves message batches for processing.
Note

If a consumer has a group.id, it must subscribe to one or more topics. The broker needs to know which topics the consumer is interested in so that it can assign partitions to the consumer.
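
The example polls in an infinite loop and never closes the consumer. In practice, you typically want the consumer to leave its group and release resources when the application stops. The following sketch assumes the consumer is created as in the example above and replaces its polling loop; it uses a shutdown hook to interrupt the blocking poll() call.

// Requires import org.apache.kafka.common.errors.WakeupException.
// Interrupt the blocking poll() when the JVM shuts down.
// In a real application the hook should also wait for the main thread to finish.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: " + record.value());
        }
    }
} catch (WakeupException e) {
    // Expected when wakeup() is called during shutdown; nothing to do.
} finally {
    consumer.close(); // leave the consumer group and release resources
}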

Here is an example in Java using the Apache Kafka client library to create a Kafka producer and connect it to a Kafka cluster to start producing messages to a specified topic.

Example producer client

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AMQStreamsProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("producer.properties")) { // 1
            properties.load(input);
        } catch (IOException e) {
            e.printStackTrace();
        }

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // 2

        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message-key", "message-value"); // 3
        producer.send(record);

        producer.close();
    }
}

1. The producer client configuration is read and loaded from the producer.properties file.
2. The client creates a producer using the properties.
3. A message is sent to a topic called my-topic.
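
The send() call in the example is asynchronous and discards the result. To handle delivery failures, as mentioned in the procedure, you can pass a callback to send(). The following sketch assumes the same producer and record objects as in the example above.

// Pass a callback to send() to be notified when the broker acknowledges
// the record or the send fails.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed; log or retry according to your error-handling policy.
        exception.printStackTrace();
    } else {
        System.out.println("Sent to " + metadata.topic()
                + ", partition " + metadata.partition()
                + ", offset " + metadata.offset());
    }
});

producer.flush(); // optionally block until all buffered records have been sent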