4.2. 기본 소비자 클라이언트 구성
소비자 클라이언트를 개발할 때 다음을 구성합니다.
- Kafka 클러스터에 대한 연결
- Kafka 브로커에서 가져온 바이트를 클라이언트 애플리케이션에서 이해할 수 있는 메시지 키로 변환하는 역직렬자
- Kafka 브로커에서 가져온 바이트를 클라이언트 애플리케이션에서 이해할 수 있는 메시지 값으로 변환하는 역직렬자
일반적으로 소비자 그룹 ID를 추가하여 소비자 그룹과 소비자 그룹을 연결합니다. 소비자 그룹은 하나 이상의 주제에서 병렬 소비자에 대규모 데이터 스트림 처리를 분산하기 위한 논리적 엔티티입니다. Consumer는 group.id
를 사용하여 그룹화되므로 메시지가 멤버 전체에 분산됩니다. 지정된 소비자 그룹에서 각 주제 파티션을 단일 소비자가 읽습니다. 단일 소비자는 여러 파티션을 처리할 수 있습니다. 최대 병렬 처리의 경우 각 파티션에 대해 하나의 소비자를 만듭니다. 파티션보다 많은 소비자가 있는 경우 일부 소비자는 유휴 상태로 유지되고 오류가 발생할 경우 이를 대신할 준비가 되어 있습니다.
기본 소비자 클라이언트 구성 속성
client.id = my-consumer-id 1 group.id = my-group-id 2 bootstrap.servers = my-cluster-kafka-bootstrap:9092 3 key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 4 value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 5
코드에 소비자 클라이언트 구성 직접 추가
Properties props = new Properties(); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
KafkaConsumer
는 수신하는 메시지에 대한 문자열 키 및 값 유형을 지정합니다. 사용된 serializers는 Kafka에서 수신한 바이트를 지정된 유형으로 변환할 수 있어야 합니다.
각 소비자 그룹에는 고유한 group.id
가 있어야 합니다. 동일한 group.id
를 사용하여 소비자를 다시 시작하면 중지되기 전에 해제된 위치에서 메시지 사용을 재개합니다.