26.9. Kafka 멱등 리포지토리 사용


camel-kafka 라이브러리는 Kafka 주제 기반 멱등 리포지토리를 제공합니다.

이 리포지토리는 Kafka 주제의 멱등 상태(추가/제거)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다. 사용된 항목은 멱등 리포지토리 인스턴스별로 고유해야 합니다.

이 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리가 모든 파티션에서 동시에 사용되므로. 또한 주제의 복제 요소에 대한 요구 사항이 없습니다.

주제를 사용하는 각 리포지토리 인스턴스(예: 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10 Camel 프로세스의 클러스터에서 각각 자체 오프셋을 제어합니다.

시작 시 인스턴스에서 주제를 구독하고 처음 오프셋을 다시 시도하여 캐시를 최신 상태로 다시 빌드합니다. 이 캐시는 pollDurationM 의 하나의 폴링이 0 레코드를 반환할 때까지 워밍업으로 간주되지 않습니다. 시작은 캐시가 웜화되거나 30초가 될 때까지 완료되지 않습니다. 후자의 경우 소비자가 주제가 끝날 때까지 멱등 리포지토리가 일관성 없는 상태가 될 수 있습니다.

고유성 검사에 사용되는 헤더의 형식을 염두에 두십시오. 기본적으로 Strings를 데이터 유형으로 사용합니다. 기본 숫자 형식을 사용하는 경우 헤더를 적절하게 역직렬화해야 합니다. 아래 샘플에서 예를 확인하십시오.

KafkaIdempotentRepository 에는 다음 속성이 있습니다.

Expand
속성설명

topic

변경 사항을 브로드캐스트하는 데 사용할 Kafka 항목의 이름입니다. (필수)

bootstrapServers

내부 Kafka 생산자 및 소비자의 bootstrap.servers 속성입니다. consumerConfigproducerConfig 를 설정하지 않는 경우 이를 단축으로 사용합니다. 사용하는 경우 이 구성 요소는 생산자 및 소비자에 적절한 기본 구성을 적용합니다.

producerConfig

변경 사항을 브로드캐스트하는 Kafka 생산자에서 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

consumerConfig

주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. bootstrapServers 를 재정의하므로 Kafka bootstrap.servers 속성 자체를 정의해야 합니다.

maxCacheSize

메모리에 저장해야 하는 가장 최근에 사용된 키 수(기본값 1000)입니다.

pollDurationMs

Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어 뒤에는 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 관련된 정도에 영향을 미칩니다. 기본값은 100ms입니다.
이 값을 명시적으로 설정하는 경우 원격 캐시 liveness와 이 리포지토리의 소비자와 Kafka 브로커 간의 네트워크 트래픽 볼륨 사이에 장단점이 있는지 확인합니다. 캐시 웜 업 프로세스는 또한 아무것도 가져오지 않는 하나의 폴링이 있는 곳에 따라 달라집니다 - 이는 스트림이 현재 지점까지 사용되었음을 나타냅니다. 주제에서 메시지가 전송되는 비율에 대해 폴링 기간이 과도하게 길면 캐시가 워밍되지 않고 피어와 비교하여 일관성 없는 상태로 작동할 가능성이 있습니다.

주제bootstrapServers 를 정의하여 리포지토리를 인스턴스화할 수 있습니다. 또는 producerConfigconsumerConfig 속성 세트를 명시적으로 정의하여 SSL/SASL과 같은 기능을 활성화할 수 있습니다. 이 리포지토리를 사용하려면 수동으로 또는 Spring/Blueprint의 빈으로 등록하여 Camel 레지스트리에 배치해야 합니다.

샘플 사용법은 다음과 같습니다.

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

XML에서 다음을 수행합니다.

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>

숫자 식별자와 idempotency를 사용할 때 선택할 수 있는 세 가지 대안이 있습니다. 첫 번째는 org.apache.camel.component.kafka.serde.KafkaSerdeHelper 에서 정적 메서드 numericHeader 메서드를 사용하여 사용자를 위한 변환을 수행하는 것입니다.

from("direct:performInsert")
    .idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

또는 경로 URL을 통해 구성된 사용자 지정 serializer를 사용하여 변환을 수행할 수도 있습니다.

public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);

    @Override
    public Object deserialize(String key, byte[] value) {
        if (key.equals("id")) {
            BigInteger bi = new BigInteger(value);

            return String.valueOf(bi.longValue());
        } else {
            return super.deserialize(key, value);
        }
    }
}

마지막으로 프로세서에서 이를 수행할 수도 있습니다.

from(from).routeId("foo")
    .process(exchange -> {
        byte[] id = exchange.getIn().getHeader("id", byte[].class);

        BigInteger bi = new BigInteger(id);
        exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
    })
    .idempotentConsumer(header("id"))
    .messageIdRepositoryRef("kafkaIdempotentRepository")
    .to(to);
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동