26.9. 使用 Kafka 幂等仓库
camel-kafka
库提供了一个基于 Kafka 主题的幂等存储库。
此软件仓库将所有更改存储在 Kafka 主题中,并通过事件源为每个存储库的进程实例填充本地内存缓存。使用的主题必须是每个幂等存储库实例的唯一。
机制对主题分区的数量没有任何要求,因为存储库会同时消耗所有分区。它还对主题的复制工厂没有任何要求。
每个使用主题的仓库实例(例如,通常在并行运行的不同机器上)控制自己的消费者组,因此在使用相同主题的 10 个 Camel 进程集群中会控制自己的误差。
在启动时,实例订阅了主题,并修改开始的偏移,将缓存重建为最新状态。在有长度的 pollDurationMs
轮询时,缓存不会被视为备份,从而返回 0 记录。在缓存得到备份或 30 秒发生前,启动将不会完成;如果后者发生幂等存储库可能处于不一致的状态,直到其消费者赶上该主题的末尾为止。
请注意,用于唯一性检查的标头格式。默认情况下,它使用 Strings 作为数据类型。使用原语格式时,标头必须相应地反序列化。请参阅以下示例来了解示例。
KafkaIdempotentRepository
具有以下属性:
属性 | 描述 |
---|---|
| 用于广播更改的 Kafka 主题的名称。(必需) |
|
内部 Kafka producer 和 consumer 上的 |
|
设置广播更改的 Kafka 制作者将使用的属性。覆盖 |
|
设置 Kafka 用户将使用的属性,用于从主题填充缓存。覆盖 |
| 最近使用的密钥应保存在内存中(默认 1000)。 |
|
Kafka 消费者的轮询持续时间。本地缓存立即更新。这个值会影响将缓存从主题更新其缓存与发送 cache 操作消息的幂等消费者实例的其他对等使用者实例。默认值为 100 ms。 |
可以通过定义 主题
和 bootstrapServers
或 producer
属性集来实例化存储库,以启用 SSL/SASL 等功能。要使用,该存储库必须放在 Camel 注册表中(手动或通过注册作为 bean 在 Spring/Blueprint 中),因为它是 Config
CamelContext
aware。
用法示例如下:
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); SimpleRegistry registry = new SimpleRegistry(); registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext CamelContext context = new CamelContext(registry); // later in RouteBuilder... from("direct:performInsert") .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo") // once-only insert into database .end()
在 XML 中:
<!-- simple --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="bootstrapServers" value="localhost:9091"/> </bean> <!-- complex --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="maxCacheSize" value="10000"/> <property name="consumerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> <property name="producerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> </bean>
在使用带有数字标识符的幂等性时,可以选择 3 个选择。第一个方法是使用来自 org.apache.camel.component.kafka.serde.KafkaSerde.KafkaSerdeHelper
的静态方法来为您执行转换:
from("direct:performInsert") .idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo") // once-only insert into database .end()
另外,也可以使用通过路由 URL 配置的自定义序列化器来执行转换:
public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer { private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class); @Override public Object deserialize(String key, byte[] value) { if (key.equals("id")) { BigInteger bi = new BigInteger(value); return String.valueOf(bi.longValue()); } else { return super.deserialize(key, value); } } }
最后,也可以在处理器中执行此操作:
from(from).routeId("foo") .process(exchange -> { byte[] id = exchange.getIn().getHeader("id", byte[].class); BigInteger bi = new BigInteger(id); exchange.getIn().setHeader("id", String.valueOf(bi.longValue())); }) .idempotentConsumer(header("id")) .messageIdRepositoryRef("kafkaIdempotentRepository") .to(to);