4.2. 基本消费者客户端配置


开发消费者客户端时,请配置以下内容:

  • 与 Kafka 集群的连接
  • 将从 Kafka 代理获取的字节数转换为客户端应用程序可以理解的消息键的反序列化器
  • 将从 Kafka 代理获取的字节数转换为客户端应用程序可理解的消息值的反序列化器

通常,您还会添加消费者组 ID,以将消费者与消费者组关联。消费者组是一种逻辑实体,用于将大型数据流的处理从一个或多个主题分发到并行消费者。消费者使用 group.id 分组,允许消息分散到成员中。在给定的消费者组中,每个主题分区都由一个消费者读取。单个消费者可以处理许多分区。要获得最大并行性,为每个分区创建一个消费者。如果消费者超过分区,一些消费者保持闲置状态,在出现故障时可以接管。

基本消费者客户端配置属性

client.id = my-consumer-id 
1

group.id = my-group-id 
2

bootstrap.servers = my-cluster-kafka-bootstrap:9092 
3

key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
4

value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
5
Copy to Clipboard Toggle word wrap

1
客户端的逻辑名称。
2
消费者的组 ID,以便可以加入特定的消费者组。
3
客户端的 bootstrap 地址可以进行到 Kafka 集群的初始连接。
4
deserializer 将从 Kafka 代理获取的字节数转换为消息密钥。
5
deserializer 将从 Kafka 代理获取的字节数转换为消息值。

直接将消费者客户端配置添加到代码中

Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Copy to Clipboard Toggle word wrap

KafkaConsumer 指定它接收的消息的字符串键和值类型。使用的序列化器必须能够将从 Kafka 接收的字节数转换为指定类型。

注意

每个消费者组必须具有唯一的 group.id。如果您重启具有相同 group.id 的消费者,它会在停止前从其离开的位置恢复其消息。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat