190.7. Kafka idempotent 리포지토리 사용


Camel 2.19에서 사용 가능

camel-kafka 라이브러리는 Kafka 주제 기반 idempotent 리포지토리를 제공합니다. 이 리포지토리는 Kafka 주제에서 모든 변경 사항을 idempotent 상태(추가/제거)에 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다.

사용된 주제는 idempotent 리포지토리 인스턴스별로 고유해야 합니다. 리포지토리가 모든 파티션에서 동시에 소비되므로 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 또한 주제의 복제 요인에 대한 요구 사항이 없습니다.

주제를 사용하는 각 리포지토리 인스턴스(예: 일반적으로 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10개의 Camel 프로세스 클러스터에서 각각 고유한 오프셋을 제어합니다.

시작 시 인스턴스가 주제를 구독하고 오프셋을 시작으로 되감므로 캐시를 최신 상태로 다시 빌드합니다. 1개의 poll of pollDurationMs 의 길이가 0 레코드를 반환할 때까지 캐시가 워밍된 것으로 간주되지 않습니다. 시작은 캐시가 워밍업 또는 30초가 될 때까지 완료되지 않으며 30초가 지나면 소비자가 주제 종료까지 멱등화되지 않은 상태가 될 수 있습니다.

KafkaIdempotentRepository 에는 다음과 같은 속성이 있습니다.

Expand
속성설명

주제

변경 사항을 브로드캐스트하는 데 사용할 Kafka 주제의 이름입니다. (필수)

bootstrapServers

내부 Kafka 프로듀서 및 소비자의 bootstrap.servers 속성입니다. consumerConfigproducerConfig 를 설정하지 않는 경우 이를 단기적으로 사용합니다. 사용하는 경우 이 구성 요소는 생산자 및 소비자에 합리적인 기본 구성을 적용합니다.

producerConfig

변경을 브로드캐스트하는 Kafka 프로듀서에서 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

consumerConfig

주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

maxCacheSize

가장 최근에 사용된 키 중 메모리에 저장해야 하는 키(기본값 1000)는 몇 개입니까.

pollDurationMs

Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어가 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 상대적인지에 영향을 미칩니다. 이 값의 기본값은 100ms입니다.
이 값을 명시적으로 설정하는 경우 원격 캐시 활동성과 이 리포지토리의 소비자와 Kafka 브로커 간 네트워크 트래픽 볼륨 사이에 장단점이 있는지 확인합니다. 캐시 워밍업 프로세스는 스트림이 현재 시점까지 사용되었음을 나타냅니다. 주제에서 메시지가 전송되는 비율에 대해 폴링 기간이 과도하게 길면 캐시가 워밍업할 수 없고 일관성 없는 상태로 작동할 가능성이 있습니다.

주제bootstrapServers 를 정의하여 리포지토리를 인스턴스화하거나 producerConfigconsumerConfig 속성 세트를 명시적으로 정의하여 SSL/ECDHEL과 같은 기능을 활성화할 수 있습니다.

사용하려면 CamelContext 에서 수동으로 또는 Spring/Blueprint의Bean으로 등록하여 Camel 레지스트리에 배치해야 합니다.

샘플 사용은 다음과 같습니다.

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

XML에서 다음을 수행합니다.

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동