34.10. Kafka 소비자에서 수동 커밋 사용
기본적으로 Kafka 소비자는 지정된 간격을 사용하여 백그라운드에서 오프셋이 자동으로 커밋됩니다.
수동 커밋을 강제 적용하려면 메시지 헤더에 저장된 Camel 교환에서 KafkaManualCommit API를 사용할 수 있습니다. 이렇게 하려면 KafkaComponent 또는 끝점에서 allowManualCommit 옵션을 true 로 설정하거나 끝점에서 수동 커밋을 설정해야 합니다. 예를 들면 다음과 같습니다.
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
그런 다음 Camel Processor 와 같은 Java 코드의 KafkaManualCommit 을 사용할 수 있습니다.
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
이렇게 하면 Kafka에서 커밋이 승인되거나 예외가 발생할 때까지 차단되는 동기 커밋이 강제 적용됩니다. 'DefaultKafkaManualAsyncCommitFactory' 구현으로 KafkaManualCommitFactory 를 구성하여 비동기 커밋을 사용할 수 있습니다.
그러면 kafka 비동기 커밋 API를 사용하여 다음 소비자 루프에서 커밋이 수행됩니다. 파티션의 레코드를 처리하여 고유한 스레드에 의해 커밋해야 합니다. 그렇지 않으면 일관성 없는 동작이 발생할 수 있습니다. 이는 집계의 완료 시간 초과 전략에 주로 유용합니다.
KafkaManualCommit 의 사용자 지정 구현을 사용하려면 사용자 지정 구현의 인스턴스를 생성하는 KafkaComponent 에 사용자 지정 KafkaManualCommitFactory 를 구성할 수 있습니다.