此存储库在 Kafka 主题中将所有更改广播到幂等状态(add/remove),并通过事件源为每个存储库的进程实例填充本地内存缓存。使用的主题必须为每个幂等存储库实例是唯一的。
使用主题(例如,并行运行的不同机器上)的每个存储库实例控制其自身的消费者组,因此在使用相同主题的 10 个 Camel 进程集群中,控制其自身偏移。
请注意用于唯一检查的标头格式。默认情况下,它使用 Strings 作为数据类型。使用原语数字格式时,必须相应地对标头进行反序列化。例如,请检查以下示例。
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to(to);
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to(to);
Copy to Clipboard
Copied!
Toggle word wrap
Toggle overflow