191.7. 使用 Kafka idempotent 软件仓库


Camel 2.19 中可用

camel-kafka 库提供基于 Kafka 主题的幂等存储库。此存储库将广播到 Kafka 主题中的幂等状态(添加/删除)的所有更改,并通过事件源为每个存储库的进程实例填充本地内存缓存。

使用的主题每个幂等存储库实例必须是唯一的。机制对主题分区的数量没有任何要求,因为存储库同时消耗所有分区。它还对主题的复制因素没有任何要求。

使用主题的每个存储库实例(例如,通常在并行运行的不同机器上)控制其自己的消费者组,因此使用相同主题的 10 个 Camel 进程集群中都会控制自己的偏移量。

在启动时,实例订阅了主题,并将偏移回开始,将缓存重建到最新的状态。缓存不会被视为温设置,直到一个轮询 pollDurationMs (以长度为 0 条)返回 0 记录。在缓存已温启动或 30 秒结束前,启动不会完成;如果后者发生幂等存储库可能处于不一致状态,直到消费者捕获到主题的末尾为止。

KafkaIdempotentRepository 具有以下属性:

Expand
属性描述

topic

用于广播更改的 Kafka 主题的名称(必需)

bootstrapServers

内部 Kafka producer 和消费者上的 bootstrap.servers 属性。如果没有设置 consumerConfigproducerConfig,则使用此选项作为简写。如果使用,此组件将应用生产者和消费者的默认配置。

producerConfig

设置用于广播更改的 Kafka producer 使用的属性。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

consumerConfig

设置 Kafka 使用者将使用的属性,从主题填充缓存。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

maxCacheSize

最近使用多少密钥应存储在内存中(默认值 1000)。

pollDurationMs

Kafka 使用者的轮询持续时间。本地缓存会立即更新。这个值将影响从主题更新其缓存的其他对等点的时长,相对于发送缓存操作消息的幂等消费者实例。默认值为 100 ms。
如果明确设置这个值,请注意,远程缓存存活度和此存储库的消费者和 Kafka 代理之间的网络流量卷之间存在权衡。缓存温进程还取决于有一个无法获取的轮询 - 这表示流已消耗至当前点。如果轮询持续时间太长,对于在主题上发送消息的速率过长,则存在缓存可能无法被温的,并且将在其对等点相对处于不一致的状态进行操作。

可以通过定义 主题和 bootstrapServers、或 producerConfigconsumerConfig 属性集来实例化存储库,以启用 SSL/SASL 等功能。

要使用,此存储库必须放在 Camel registry 中,也可以手动或通过注册作为 Spring/Blueprint 中的 bean,因为它是 CamelContext aware。

示例用法如下:

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()
Copy to Clipboard Toggle word wrap

在 XML 中:

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>
Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat