4.2. 基本消费者客户端配置
开发消费者客户端时,请配置以下内容:
- 与 Kafka 集群的连接
- 将从 Kafka 代理获取的字节数转换为客户端应用程序可以理解的消息键的反序列化器
- 将从 Kafka 代理获取的字节数转换为客户端应用程序可理解的消息值的反序列化器
通常,您还会添加消费者组 ID,以将消费者与消费者组关联。消费者组是一种逻辑实体,用于将大型数据流的处理从一个或多个主题分发到并行消费者。消费者使用 group.id
分组,允许消息分散到成员中。在给定的消费者组中,每个主题分区都由一个消费者读取。单个消费者可以处理许多分区。要获得最大并行性,为每个分区创建一个消费者。如果消费者超过分区,一些消费者保持闲置状态,在出现故障时可以接管。
基本消费者客户端配置属性
client.id = my-consumer-id group.id = my-group-id bootstrap.servers = my-cluster-kafka-bootstrap:9092 key.deserializer = org.apache.kafka.common.serialization.StringDeserializer value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
client.id = my-consumer-id
group.id = my-group-id
bootstrap.servers = my-cluster-kafka-bootstrap:9092
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
直接将消费者客户端配置添加到代码中
Properties props = new Properties(); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
KafkaConsumer
指定它接收的消息的字符串键和值类型。使用的序列化器必须能够将从 Kafka 接收的字节数转换为指定类型。
注意
每个消费者组必须具有唯一的 group.id
。如果您重启具有相同 group.id
的消费者,它会在停止前从其离开的位置恢复其消息。