191.7. Kafka idempotent 리포지토리 사용
Camel 2.19에서 사용 가능
camel-kafka 라이브러리는 Kafka 주제 기반 idempotent 리포지토리를 제공합니다. 이 리포지토리는 Kafka 주제의 idempotent 상태(add/remove)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다.
사용되는 주제는 멱등 리포지토리 인스턴스당 고유해야 합니다. 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리는 모든 파티션에서 동시에 사용합니다. 또한 주제의 복제 요소에 대한 요구 사항이 없습니다.
주제를 사용하는 각 리포지토리 인스턴스(예: 일반적으로 병렬로 실행되는 다른 시스템에 있는)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10 Camel 프로세스 클러스터에서 각각 고유한 오프셋을 제어합니다.
시작 시 인스턴스는 항목을 구독하고 오프셋을 시작으로 되감고 캐시를 최신 상태로 다시 빌드합니다. 캐시는 pollDurationMs 의 한 번 폴링이 0 레코드를 반환될 때까지 온난 상태로 간주되지 않습니다. 시작은 캐시가 워밍업되거나 30초가 될 때까지 완료되지 않습니다. 후자가 발생하는 경우 멱등 리포지토리가 주제가 끝날 때까지 일관성 없는 상태가 될 수 있습니다.
KafkaIdempotentRepository 에는 다음과 같은 속성이 있습니다.
| 속성 | 설명 |
|---|---|
|
| 변경 사항을 브로드캐스트하는 데 사용할 Kafka 주제의 이름입니다. (필수) |
|
|
내부 Kafka 생산자 및 소비자의 |
|
|
브로드캐스트하는 Kafka 프로듀서에서 사용할 속성을 설정합니다. |
|
|
주제에서 캐시를 채우 Kafka 소비자가 사용할 속성을 설정합니다. |
|
| 가장 최근에 사용된 키의 수를 메모리에 저장해야 합니까(기본값 1000). |
|
|
Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 캐시 작업 메시지를 보낸 idempotent 소비자 인스턴스와 상대적인 주제에서 캐시를 업데이트하는 다른 피어의 거리에 영향을 미칩니다. 기본값은 100 ms입니다. |
리포지토리는 주제 및 bootstrapServers 를 정의하여 인스턴스화하거나, SSL/SASL과 같은 기능을 활성화하도록 producerConfig 및 consumerConfig 속성 세트를 명시적으로 정의할 수 있습니다.
사용하려면 CamelContext 를 인식하므로 이 리포지토리를 수동으로 또는 Spring/Blueprint에서 빈으로 등록하여 Camel 레지스트리에 배치해야 합니다.
샘플 사용은 다음과 같습니다.
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
XML에서 다음을 수행합니다.
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>