26.9. Kafka 멱등 리포지토리 사용
camel-kafka 라이브러리는 Kafka 주제 기반 멱등 리포지토리를 제공합니다.
이 리포지토리는 Kafka 주제의 멱등 상태(추가/제거)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다. 사용된 항목은 멱등 리포지토리 인스턴스별로 고유해야 합니다.
이 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리가 모든 파티션에서 동시에 사용되므로. 또한 주제의 복제 요소에 대한 요구 사항이 없습니다.
주제를 사용하는 각 리포지토리 인스턴스(예: 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10 Camel 프로세스의 클러스터에서 각각 자체 오프셋을 제어합니다.
시작 시 인스턴스에서 주제를 구독하고 처음 오프셋을 다시 시도하여 캐시를 최신 상태로 다시 빌드합니다. 이 캐시는 pollDurationM 의 하나의 폴링이 0 레코드를 반환할 때까지 워밍업으로 간주되지 않습니다. 시작은 캐시가 웜화되거나 30초가 될 때까지 완료되지 않습니다. 후자의 경우 소비자가 주제가 끝날 때까지 멱등 리포지토리가 일관성 없는 상태가 될 수 있습니다.
고유성 검사에 사용되는 헤더의 형식을 염두에 두십시오. 기본적으로 Strings를 데이터 유형으로 사용합니다. 기본 숫자 형식을 사용하는 경우 헤더를 적절하게 역직렬화해야 합니다. 아래 샘플에서 예를 확인하십시오.
KafkaIdempotentRepository 에는 다음 속성이 있습니다.
| 속성 | 설명 |
|---|---|
|
| 변경 사항을 브로드캐스트하는 데 사용할 Kafka 항목의 이름입니다. (필수) |
|
|
내부 Kafka 생산자 및 소비자의 |
|
|
변경 사항을 브로드캐스트하는 Kafka 생산자에서 사용할 속성을 설정합니다. |
|
|
주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. |
|
| 메모리에 저장해야 하는 가장 최근에 사용된 키 수(기본값 1000)입니다. |
|
|
Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어 뒤에는 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 관련된 정도에 영향을 미칩니다. 기본값은 100ms입니다. |
주제 및 bootstrapServers 를 정의하여 리포지토리를 인스턴스화할 수 있습니다. 또는 producerConfig 및 consumerConfig 속성 세트를 명시적으로 정의하여 SSL/SASL과 같은 기능을 활성화할 수 있습니다. 이 리포지토리를 사용하려면 수동으로 또는 Spring/Blueprint의 빈으로 등록하여 Camel 레지스트리에 배치해야 합니다.
샘플 사용법은 다음과 같습니다.
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
XML에서 다음을 수행합니다.
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
숫자 식별자와 idempotency를 사용할 때 선택할 수 있는 세 가지 대안이 있습니다. 첫 번째는 org.apache.camel.component.kafka.serde.KafkaSerdeHelper 에서 정적 메서드 numericHeader 메서드를 사용하여 사용자를 위한 변환을 수행하는 것입니다.
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
또는 경로 URL을 통해 구성된 사용자 지정 serializer를 사용하여 변환을 수행할 수도 있습니다.
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
마지막으로 프로세서에서 이를 수행할 수도 있습니다.
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to(to);