第 4 章 配置客户端应用程序以连接到 Kafka 集群
要连接到 Kafka 集群,客户端应用程序必须使用标识代理并启用连接的最小属性集进行配置。另外,您需要添加一个 serializer/deserializer 机制,来将信息转换为 Kafka 使用的字节数组格式。在开发消费者客户端时,您首先向 Kafka 集群添加初始连接,该连接用于发现所有可用的代理。建立连接时,您可以开始消耗来自 Kafka 主题的信息,或向它们生成信息。
虽然不需要,但建议使用唯一的客户端 ID,以便您可以在日志和指标集合中对客户端进行身份。
您可以在属性文件中配置属性。使用属性文件意味着您可以在不编译代码的情况下修改配置。
例如,您可以使用以下代码在 Java 客户端中载入属性:
将配置属性加载到客户端中
Properties props = new Properties(); try (InputStream propStream = Files.newInputStream(Paths.get(filename))) { props.load(propStream); }
Properties props = new Properties();
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
您还可以使用直接将属性添加到配置对象中的代码中。例如,您可以将 setProperty ()
方法用于 Java 客户端应用程序。当您只配置少量属性时,直接添加属性是有用的选项。
4.1. 基本制作者客户端配置 复制链接链接已复制到粘贴板!
开发制作者客户端时,配置以下内容:
- 与 Kafka 集群的连接
- 将消息密钥转换为 Kafka 代理的字节的序列化器
- 将消息值转换为 Kafka 代理的字节的序列化器
如果要发送和存储压缩消息,您可能还会添加压缩类型。
基本制作者客户端配置属性
client.id = my-producer-id bootstrap.servers = my-cluster-kafka-bootstrap:9092 key.serializer = org.apache.kafka.common.serialization.StringSerializer value.serializer = org.apache.kafka.common.serialization.StringSerializer
client.id = my-producer-id
bootstrap.servers = my-cluster-kafka-bootstrap:9092
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
直接将制作者客户端配置添加到代码中
KafkaProducer
为它发送的消息指定字符串键和值类型。使用的序列化器必须能够将指定类型的键和值转换为字节,然后才能将它们发送到 Kafka。