第 4 章 配置客户端应用程序以连接到 Kafka 集群
要连接到 Kafka 集群,必须将客户端应用程序配置为一组用来识别代理并启用连接的最小属性。另外,您需要添加一个序列化器/反序列化机制,将信息转换为 Kafka 使用的字节数组格式或移出。在开发消费者客户端时,您首先将初始连接添加到 Kafka 集群,用于发现所有可用的代理。建立连接后,您可以开始使用 Kafka 主题的信息或向它们生成信息。
虽然不需要,但建议使用唯一的客户端 ID,以便您可以在日志和指标集合中标识您的客户端。
您可以在属性文件中配置属性。使用属性文件意味着您可以修改配置,而无需重新编译代码。
例如,您可以使用以下代码在 Java 客户端中载入属性:
将配置属性加载到客户端中
Properties properties = new Properties(); InsetPropertyStream insetPropertyStream = new FileInsetPropertyStream("config.properties"); properties.load(insetPropertyStream); KafkaProducer<String, String> consumer = new KafkaProducer<>(properties);
您还可以直接使用将属性直接添加到配置对象中的代码中。例如,您可以将 setProperty ()
方法用于 Java 客户端应用程序。当您只有少量配置属性时,直接添加属性是一个有用的选项。
4.1. 基本制作者客户端配置
在开发制作者客户端时,请配置以下内容:
- 与 Kafka 集群的连接
- 将 Kafka 代理的消息密钥转换为字节的序列化器
- 将 Kafka 代理的消息值转换为字节的序列化器
如果您想要发送和存储压缩消息,您可能还会添加压缩类型。
基本制作者客户端配置属性
client.id = my-producer-id 1 bootstrap.servers = my-cluster-kafka-bootstrap:9092 2 key.serializer = org.apache.kafka.common.serialization.StringSerializer 3 value.serializer = org.apache.kafka.common.serialization.StringSerializer 4
将制作者客户端配置直接添加到代码中
Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
KafkaProducer
指定它发送的消息的字符串键和值类型。在将键和值从指定类型转换为字节前,必须使用的序列化程序将它们转换为 Kafka。