29.9. Kafka idempotent 리포지토리 사용


camel-kafka 라이브러리는 Kafka 주제 기반 idempotent 리포지토리를 제공합니다.

이 리포지토리는 Kafka 주제에서 idempotent 상태(add/remove)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 캐시를 채웁니다. 사용된 주제는 idempotent 리포지토리 인스턴스별로 고유해야 합니다.

이 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리가 모든 파티션에서 동시에 사용되기 때문입니다. 또한 주제의 복제 요인에 대한 요구 사항은 없습니다.

주제를 사용하는 각 리포지토리 인스턴스(예: 일반적으로 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10개의 Camel 프로세스 클러스터에서 각각의 오프셋을 제어합니다.

시작 시 인스턴스는 주제를 구독하고 오프셋을 시작으로 되풀이하여 캐시를 최신 상태로 다시 빌드합니다. 이 캐시는 하나의 pollDurationMs in length에서 0 레코드를 반환할 때까지 온 상태로 간주되지 않습니다. 시작은 캐시가 워밍업되거나 30초가 될 때까지 완료되지 않습니다. 후자가 멱등 저장소가 발생하면 소비자가 주제의 마지막까지 오를 때까지 일관되지 않은 상태가 될 수 있습니다.

고유성 검사에 사용되는 헤더의 형식에 유의하십시오. 기본적으로 Strings를 데이터 형식으로 사용합니다. 기본 숫자 형식을 사용하는 경우 헤더를 적절하게 역직렬화해야 합니다. 예제를 보려면 아래 샘플을 확인하십시오.

KafkaIdempotentRepository 에는 다음과 같은 속성이 있습니다.

Expand
속성설명

주제

변경 사항을 브로드캐스트하는 데 사용할 Kafka 주제의 이름입니다. (필수)

bootstrapServers

내부 Kafka 생산자 및 소비자의 bootstrap.servers 속성입니다. consumerConfigproducerConfig 를 설정하지 않는 경우 이 값을 단축으로 사용합니다. 이 구성 요소를 사용하면 프로듀서 및 소비자에게 합리적인 기본 구성을 적용합니다.

producerConfig

변경 사항을 브로드캐스트하는 Kafka 생산자가 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

consumerConfig

주제의 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

maxCacheSize

가장 최근에 사용된 키의 메모리(기본값 1000)는 몇 개입니까.

pollDurationMs

Kafka 소비자의 폴링 기간입니다. 로컬 캐시가 즉시 업데이트됩니다. 이 값은 캐시를 업데이트하는 다른 피어 뒤에서 캐시가 캐시 작업 메시지를 보낸 idempotent Consumer 인스턴스와 얼마나 멀리 떨어져 있는지에 영향을 미칩니다. 기본값은 100ms입니다.
이 값을 명시적으로 설정하는 경우 원격 캐시 활성 상태와 이 리포지토리의 소비자와 Kafka 브로커 간의 네트워크 트래픽 볼륨 간에 절충이 있음을 유의하십시오. 캐시 워밍업 프로세스는 또한 아무 것도 가져오는 하나의 폴링이 있는지에 따라 달라집니다 - 이는 스트림이 현재 시점까지 소비되었음을 나타냅니다. 주제에서 메시지를 보내는 비율에 대해 폴링 기간이 과도하게 길면 캐시를 워밍할 수 없으며 추적할 때까지 해당 피어에 대해 일관되지 않은 상태에서 작동할 가능성이 있습니다.

주제bootstrapServers 를 정의하여 리포지토리를 인스턴스화하거나 producerConfigconsumerConfig 속성 세트를 명시적으로 정의하여 SSL/SASL과 같은 기능을 활성화할 수 있습니다. 사용하려면 CamelContext 를 인식하므로 이 리포지토리를 Spring/Blueprint에서 수동으로 또는 등록한 Camel 레지스트리에 배치해야 합니다.

샘플 사용법은 다음과 같습니다.

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()
Copy to Clipboard Toggle word wrap

XML에서:

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>
Copy to Clipboard Toggle word wrap

숫자 식별자와 함께 idempotency를 사용할 때 선택할 수 있는 대안은 세 가지가 있습니다. 첫 번째는 org.apache.camel.component.kafka.serde.KafkaSer dedeer의 정적 메서드 numericHeader 메서드를 사용하여 변환을 수행하는 것입니다.

from("direct:performInsert")
    .idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()
Copy to Clipboard Toggle word wrap

또는 경로 URL을 통해 구성된 사용자 정의 serializer를 사용하여 변환을 수행할 수 있습니다.

public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);

    @Override
    public Object deserialize(String key, byte[] value) {
        if (key.equals("id")) {
            BigInteger bi = new BigInteger(value);

            return String.valueOf(bi.longValue());
        } else {
            return super.deserialize(key, value);
        }
    }
}
Copy to Clipboard Toggle word wrap

프로세서에서 수행할 수도 있습니다.You can also do so in a processor:

from(from).routeId("foo")
    .process(exchange -> {
        byte[] id = exchange.getIn().getHeader("id", byte[].class);

        BigInteger bi = new BigInteger(id);
        exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
    })
    .idempotentConsumer(header("id"))
    .messageIdRepositoryRef("kafkaIdempotentRepository")
    .to(to);
Copy to Clipboard Toggle word wrap
맨 위로 이동
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2025 Red Hat