第 6 章 开发 Kafka 客户端


使用首选编程语言创建 Kafka 客户端,并将其连接到 Apache Kafka 的 Streams。

要与 Kafka 集群交互,客户端应用程序需要能够生成和使用消息。要开发并配置基本 Kafka 客户端应用程序,至少您必须执行以下操作:

  • 设置配置以连接到 Kafka 集群
  • 使用制作者和消费者发送和接收消息

设置用于连接 Kafka 集群和使用制作者和消费者的基本配置是开发 Kafka 客户端时的第一步。之后,您可以扩展以改进客户端应用程序的输入、安全性、性能、错误处理和功能。

先决条件

您可以创建一个客户端属性文件,其中包含以下内容的属性值:

流程

  1. 为您的编程语言选择一个 Kafka 客户端库,如 Java、Python、.NET 等。对于 Apache Kafka,只支持由红帽构建的客户端库。目前,Apache Kafka 的流仅提供 Java 客户端库。
  2. 通过软件包管理器或从其源下载库来手动安装库。
  3. 在代码中为您的 Kafka 客户端导入所需的类和依赖项。
  4. 根据您要创建的客户端类型,创建 Kafka 使用者或制作者对象。

    客户端可以是 Kafka 使用者、生成者、流处理器和 admin。

  5. 提供配置属性以连接到 Kafka 集群,包括代理地址、端口和凭证(如果需要)。

    对于本地 Kafka 部署,您可能以 localhost:9092 等地址开始。但是,当使用由 Streams for Apache Kafka 管理的 Kafka 集群时,您可以使用 oc 命令从 Kafka 自定义资源状态获取 bootstrap 地址:

    Copy to Clipboard Toggle word wrap
    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[*].bootstrapServers}{"\n"}'

    此命令为 Kafka 集群上的客户端连接检索由监听程序公开的 bootstrap 地址。

  6. 使用 Kafka consumer 或 producer 对象订阅主题、生成信息或从 Kafka 集群检索信息。
  7. 请注意错误处理;在连接和与 Kafka 通信时,尤其是在高可用性和易于操作生产系统中非常重要。有效的错误处理是原型和生产级应用程序之间的关键区别,它只适用于 Kafka,还适用于任何强大的软件系统。

6.1. Kafka producer 应用程序示例

这个基于 Java 的 Kafka producer 应用程序是一个自包含应用程序的示例,它生成信息到 Kafka 主题。客户端使用 Kafka Producer API 异步发送消息,并有一些错误处理。

客户端实施 Callback 接口以进行消息处理。

要运行 Kafka producer 应用程序,您可以在 Producer 类中执行 方法。客户端使用 randomBytes 方法生成随机字节阵列作为消息有效负载。客户端生成信息到指定的 Kafka 主题,直到 NUM_MESSAGES 消息(示例配置中为50)被发送。producer 是 thread-safe,允许多个线程使用单个制作者实例。

Kafka 生成者实例设计为线程安全,允许多个线程共享单个制作者实例。

这个示例客户端提供了为特定用例构建更复杂的 Kafka 生成者的基本基础。您可以纳入额外的功能,例如 实施安全连接

先决条件

  • 在指定的 BOOTSTRAP_SERVERS上运行的 Kafka 代理
  • 生成消息的 Kafka 主题 TOPIC_NAME
  • 客户端依赖项

在实施 Kafka producer 应用程序前,您的项目必须包含所需的依赖项。对于基于 Java 的 Kafka 客户端,请包含 Kafka 客户端 JAR。此 JAR 文件包含构建和运行客户端所需的 Kafka 库。

有关如何将依赖项添加到 Maven 项目中的 pom.xml 文件中的详情,请参考 第 3.1 节 “在 Maven 项目中添加 Kafka 客户端依赖项”

配置

您可以通过 Producer 类中指定的以下常数来配置制作者应用程序:

BOOTSTRAP_SERVERS
连接到 Kafka 代理的地址和端口。
TOPIC_NAME
生成消息的 Kafka 主题的名称。
NUM_MESSAGES
在停止前要生成的消息数量。
MESSAGE_SIZE_BYTES
每个消息的大小(以字节为单位)。
PROCESSING_DELAY_MS
发送消息之间的延迟(毫秒)。这可以模拟消息处理时间,这对测试非常有用。

producer 应用程序示例

Copy to Clipboard Toggle word wrap
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class Producer implements Callback {
    private static final Random RND = new Random(0);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 50;
    private static final int MESSAGE_SIZE_BYTES = 100;
    private static final long PROCESSING_DELAY_MS = 1000L;

    protected AtomicLong messageCount = new AtomicLong(0);

    public static void main(String[] args) {
        new Producer().run();
    }

    public void run() {
        System.out.println("Running producer");
        try (var producer = createKafkaProducer()) {  
1

            byte[] value = randomBytes(MESSAGE_SIZE_BYTES); 
2

            while (messageCount.get() < NUM_MESSAGES) { 
3

                sleep(PROCESSING_DELAY_MS); 
4

                producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); 
5

                messageCount.incrementAndGet();
            }
        }
    }

    private KafkaProducer<Long, byte[]> createKafkaProducer() {
        Properties props = new Properties(); 
6

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 
7

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 
8

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); 
9

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new KafkaProducer<>(props);
    }

    private void sleep(long ms) { 
10

        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private byte[] randomBytes(int size) { 
11

        if (size <= 0) {
            throw new IllegalArgumentException("Record size must be greater than zero");
        }
        byte[] payload = new byte[size];
        for (int i = 0; i < payload.length; ++i) {
            payload[i] = (byte) (RND.nextInt(26) + 65);
        }
        return payload;
    }

    private boolean retriable(Exception e) { 
12

        if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) { 
13

        if (e != null) {
            System.err.println(e.getMessage());
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        } else {
            System.out.printf("Record sent to %s-%d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}

1
客户端使用 createKafkaProducer 方法创建一个 Kafka producer。producer 异步发送消息到 Kafka 主题。
2
字节数组用作发送到 Kafka 主题的每个消息的有效负载。
3
发送的最大消息数量由 NUM_MESSAGES 常量值决定。
4
消息率使用发送的每个消息之间的延迟控制。
5
制作者传递主题名称、消息计数值和消息值。
6
客户端使用提供的配置创建 KafkaProducer 实例。您可以使用属性文件或直接添加配置。有关基本配置的详情,请参考 第 4 章 配置客户端应用程序以连接到 Kafka 集群
7
与 Kafka 代理的连接。
8
使用随机生成的 UUID,生成者的唯一客户端 ID。不需要客户端 ID,但跟踪请求源非常有用。
9
用于处理键和值作为字节数组的适当序列化类。
10
在指定毫秒内引入消息发送进程的延时。如果负责发送消息的线程在暂停时中断,它会抛出 InterruptedException 错误。
11
创建特定大小随机字节阵列的方法,它充当发送到 Kafka 主题的每个消息的有效负载。这个方法会生成一个随机整数,并添加 65 来代表 ascii 代码中的大写字母(65 是 A, 66 为 B,以此类推)。ascii 代码存储为 payload 数组中的单个字节。如果有效负载大小不大于零,它会抛出 IllegalArgumentException
12
检查是否在异常后重试发送消息的方法。Kafka producer 会自动处理特定错误的重试,如连接错误。您可以自定义此方法以包含其他错误。返回 null 和指定例外,或返回没有实现 RetriableException 接口的例外。
13
Kafka 代理确认了消息后调用的方法。成功时,会输出一条消息,其中包含消息的主题、分区和偏移位置的详细信息。如果在发送消息时 ocurred 出错,则会显示错误消息。该方法检查异常,并根据它是致命错误还是非严重错误来采取适当的操作。如果错误是非严重的,则消息发送过程将继续。如果错误是致命的,将输出堆栈追踪,并且生成者终止。

错误处理

producer 应用程序发现的致命异常:

InterruptedException
当当前线程在暂停时中断时抛出错误。在停止或关闭制作者时,通常会中断。异常被重新增长为 RuntimeException,后者终止制作者。
IllegalArgumentException
当生成者收到无效的或不当参数时,抛出错误。例如,如果缺少主题,则会抛出异常。
UnsupportedOperationException
不支持操作或未实施方法时抛出错误。例如,如果尝试使用不受支持的制作者配置或调用 KafkaProducer 类不支持的方法,则会抛出异常。

producer 应用程序发现的非严重异常:

RetriableException
对于实现 Kafka 客户端库提供的 RetriableException 接口的异常抛出错误。

使用非严重错误时,生成者将继续发送消息。

注意

默认情况下,Kafka 使用 at-least-once 消息交付语义进行操作,这意味着在特定情况下,消息可能会多次发送,可能会导致重复。要避免这种风险,请考虑在 Kafka producer 中启用事务。事务可以更强地保证一次交付。另外,您可以使用 retries 配置属性来控制制作者将在放弃前重试发送消息的次数。此设置会影响在消息发送错误期间 retriable 方法可能会返回 true 的次数。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat, Inc.