第 4 章 配置客户端应用程序以连接到 Kafka 集群


要连接到 Kafka 集群,必须将客户端应用程序配置为一组用来识别代理并启用连接的最小属性。另外,您需要添加一个序列化器/反序列化机制,将信息转换为 Kafka 使用的字节数组格式或移出。在开发消费者客户端时,您首先将初始连接添加到 Kafka 集群,用于发现所有可用的代理。建立连接后,您可以开始使用 Kafka 主题的信息或向它们生成信息。

虽然不需要,但建议使用唯一的客户端 ID,以便您可以在日志和指标集合中标识您的客户端。

您可以在属性文件中配置属性。使用属性文件意味着您可以修改配置,而无需重新编译代码。

例如,您可以使用以下代码在 Java 客户端中载入属性:

将配置属性加载到客户端中

Properties properties = new Properties();
InsetPropertyStream insetPropertyStream = new FileInsetPropertyStream("config.properties");
properties.load(insetPropertyStream);
KafkaProducer<String, String> consumer = new KafkaProducer<>(properties);

您还可以直接使用将属性直接添加到配置对象中的代码中。例如,您可以将 setProperty () 方法用于 Java 客户端应用程序。当您只有少量配置属性时,直接添加属性是一个有用的选项。

4.1. 基本制作者客户端配置

在开发制作者客户端时,请配置以下内容:

  • 与 Kafka 集群的连接
  • 将 Kafka 代理的消息密钥转换为字节的序列化器
  • 将 Kafka 代理的消息值转换为字节的序列化器

如果您想要发送和存储压缩消息,您可能还会添加压缩类型。

基本制作者客户端配置属性

client.id = my-producer-id 1
bootstrap.servers = my-cluster-kafka-bootstrap:9092 2
key.serializer = org.apache.kafka.common.serialization.StringSerializer 3
value.serializer = org.apache.kafka.common.serialization.StringSerializer 4

1
客户端的逻辑名称。
2
客户端的 bootstrap 地址,以便客户端能够进行到 Kafka 集群的初始连接。
3
在发送到 Kafka 代理前,序列化器将消息密钥转换为字节。
4
在发送到 Kafka 代理前,序列化器将消息值转换为字节。

将制作者客户端配置直接添加到代码中

Properties props = new Properties();
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

KafkaProducer 指定它发送的消息的字符串键和值类型。在将键和值从指定类型转换为字节前,必须使用的序列化程序将它们转换为 Kafka。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.