27.9. 使用 Kafka 幂等存储库
camel-kafka 库提供了一个基于 Kafka 主题的幂等存储库。
此软件仓库将广播对 Kafka 主题中的幂等状态(add/remove)的所有更改,并通过事件源为每个存储库的进程实例填充本地内存缓存。使用的主题必须为每个幂等存储库实例唯一。
机制对主题分区的数量没有任何要求;因为存储库会同时消耗所有分区。它还对主题的复制因子有任何要求。
每个使用主题的仓库实例(例如,通常并行运行的不同机器上)控制自己的消费者组,因此在 10 个 Camel 进程集群中,每个主题都控制自己的偏移量。
在启动时,实例订阅该主题,并将偏移重新构建到起始位置,将缓存重新构建到最新的状态。在轮转 DurationM 长度的轮询返回 0 记录之前,该缓存不会被考虑。在缓存结束前,启动不会完成,或由 30 秒为止;如果后者发生幂等存储库,则启动可能处于不一致的状态,直到其消费者捕获到主题的末尾。
请注意用于唯一检查的标头格式。默认情况下,它使用 String 作为数据类型。使用原语数字格式时,标头必须相应地反序列化。检查以下示例。
KafkaIdempotentRepository 具有以下属性:
| 属性 | 描述 |
|---|---|
|
| 用于广播更改的 Kafka 主题的名称。(必需) |
|
|
内部 Kafka 制作者和消费者上的 |
|
|
设置由 Kafka producer 用于广播更改的属性。覆盖 |
|
|
设置 Kafka 使用者将使用的属性,从主题填充缓存。覆盖 |
|
| 最近使用的密钥应存储在内存中数量(默认值 1000)。 |
|
|
Kafka 消费者的轮询持续时间。本地缓存会立即更新。这个值将影响从主题更新其缓存的其他对等点的对等点与发送缓存操作消息的幂等消费者实例。默认值为 100 ms。 |
可以通过定义 主题和 bootstrapServers 或 producerConfig 和 consumerConfig 属性集来实例化存储库,以启用 SSL/SASL 等功能。要使用此存储库,必须手动将此存储库放在 Camel 注册表中,或者通过注册为 Spring/Blueprint 中的 bean,因为它是 CamelContext aware。
示例用法如下:
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
在 XML 中:
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
使用带有数字标识符的 idempotency 时需要 3 种替代方案。第一个方法是使用 org.apache.camel.component.kafka.serde.KafkaSerdeHelper 中的静态方法 数字标头 方法为您执行转换:
from("direct:performInsert")
.idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()
另外,也可以使用通过路由 URL 配置的自定义序列化器来执行转换:
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);
@Override
public Object deserialize(String key, byte[] value) {
if (key.equals("id")) {
BigInteger bi = new BigInteger(value);
return String.valueOf(bi.longValue());
} else {
return super.deserialize(key, value);
}
}
}
最后,也可以在处理器中这样做:
from(from).routeId("foo")
.process(exchange -> {
byte[] id = exchange.getIn().getHeader("id", byte[].class);
BigInteger bi = new BigInteger(id);
exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
})
.idempotentConsumer(header("id"))
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to(to);