190.7. Kafka idempotent 리포지토리 사용
Camel 2.19에서 사용 가능
camel-kafka 라이브러리는 Kafka 주제 기반 idempotent 리포지토리를 제공합니다. 이 리포지토리는 Kafka 주제에서 모든 변경 사항을 idempotent 상태(추가/제거)에 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다.
사용된 주제는 idempotent 리포지토리 인스턴스별로 고유해야 합니다. 리포지토리가 모든 파티션에서 동시에 소비되므로 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 또한 주제의 복제 요인에 대한 요구 사항이 없습니다.
주제를 사용하는 각 리포지토리 인스턴스(예: 일반적으로 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10개의 Camel 프로세스 클러스터에서 각각 고유한 오프셋을 제어합니다.
시작 시 인스턴스가 주제를 구독하고 오프셋을 시작으로 되감므로 캐시를 최신 상태로 다시 빌드합니다. 1개의 poll of pollDurationMs 의 길이가 0 레코드를 반환할 때까지 캐시가 워밍된 것으로 간주되지 않습니다. 시작은 캐시가 워밍업 또는 30초가 될 때까지 완료되지 않으며 30초가 지나면 소비자가 주제 종료까지 멱등화되지 않은 상태가 될 수 있습니다.
KafkaIdempotentRepository 에는 다음과 같은 속성이 있습니다.
| 속성 | 설명 |
|---|---|
|
| 변경 사항을 브로드캐스트하는 데 사용할 Kafka 주제의 이름입니다. (필수) |
|
|
내부 Kafka 프로듀서 및 소비자의 |
|
|
변경을 브로드캐스트하는 Kafka 프로듀서에서 사용할 속성을 설정합니다. |
|
|
주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. |
|
| 가장 최근에 사용된 키 중 메모리에 저장해야 하는 키(기본값 1000)는 몇 개입니까. |
|
|
Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어가 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 상대적인지에 영향을 미칩니다. 이 값의 기본값은 100ms입니다. |
주제 및 bootstrapServers 를 정의하여 리포지토리를 인스턴스화하거나 producerConfig 및 consumerConfig 속성 세트를 명시적으로 정의하여 SSL/ECDHEL과 같은 기능을 활성화할 수 있습니다.
사용하려면 CamelContext 에서 수동으로 또는 Spring/Blueprint의Bean으로 등록하여 Camel 레지스트리에 배치해야 합니다.
샘플 사용은 다음과 같습니다.
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
XML에서 다음을 수행합니다.
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>