camel-kafka 라이브러리는 Kafka 주제 기반 멱등 리포지토리를 제공합니다. 이 리포지토리는 Kafka 주제의 멱등 상태(추가/제거)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다.
사용된 항목은 멱등 리포지토리 인스턴스별로 고유해야 합니다. 이 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리가 모든 파티션에서 동시에 사용되므로. 또한 주제의 복제 요소에 대한 요구 사항이 없습니다.
주제를 사용하는 각 리포지토리 인스턴스(예: 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10 Camel 프로세스의 클러스터에서 각각 자체 오프셋을 제어합니다.
시작 시 인스턴스에서 주제를 구독하고 처음 오프셋을 다시 시도하여 캐시를 최신 상태로 다시 빌드합니다. 이 캐시는 pollDurationM 의 하나의 폴링이 0 레코드를 반환할 때까지 워밍업으로 간주되지 않습니다. 시작은 캐시가 웜화되거나 30초가 될 때까지 완료되지 않습니다. 후자의 경우 소비자가 주제가 끝날 때까지 멱등 리포지토리가 일관성 없는 상태가 될 수 있습니다.
KafkaIdempotentRepository 에는 다음 속성이 있습니다.
Expand
속성
설명
topic
변경 사항을 브로드캐스트하는 데 사용할 Kafka 항목의 이름입니다. (필수)
bootstrapServers
내부 Kafka 생산자 및 소비자의 bootstrap.servers 속성입니다. consumerConfig 및 producerConfig 를 설정하지 않는 경우 이를 단축으로 사용합니다. 사용하는 경우 이 구성 요소는 생산자 및 소비자에 적절한 기본 구성을 적용합니다.
producerConfig
변경 사항을 브로드캐스트하는 Kafka 생산자에서 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.
consumerConfig
주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.
maxCacheSize
메모리에 저장해야 하는 가장 최근에 사용된 키 수(기본값 1000)입니다.
pollDurationMs
Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어 뒤에는 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 관련된 정도에 영향을 미칩니다. 기본값은 100ms입니다. 이 값을 명시적으로 설정하는 경우 원격 캐시 liveness와 이 리포지토리의 소비자와 Kafka 브로커 간의 네트워크 트래픽 볼륨 사이에 장단점이 있는지 확인합니다. 캐시 웜 업 프로세스는 또한 아무것도 가져오지 않는 하나의 폴링이 있는 곳에 따라 달라집니다 - 이는 스트림이 현재 지점까지 사용되었음을 나타냅니다. 주제에서 메시지가 전송되는 비율에 대해 폴링 기간이 과도하게 길면 캐시가 워밍되지 않고 피어와 비교하여 일관성 없는 상태로 작동할 가능성이 있습니다.
주제 및 bootstrapServers 를 정의하여 리포지토리를 인스턴스화할 수 있습니다. 또는 producerConfig 및 consumerConfig 속성 세트를 명시적으로 정의하여 SSL/SASL과 같은 기능을 활성화할 수 있습니다.
이 리포지토리를 사용하려면 수동으로 또는 Spring/Blueprint의 빈으로 등록하여 Camel 레지스트리에 배치해야 합니다.
샘플 사용법은 다음과 같습니다.
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
Copy to ClipboardCopied!Toggle word wrapToggle overflow