搜索

34.9. 使用 Kafka 幂等存储库

download PDF

camel-kafka 库提供了一个基于 Kafka 主题的幂等存储库。

此存储库在 Kafka 主题中将所有更改广播到幂等状态(add/remove),并通过事件源为每个存储库的进程实例填充本地内存缓存。使用的主题必须为每个幂等存储库实例是唯一的。

机制没有任何有关主题分区数量的要求;因为存储库同时从所有分区消耗。它还没有任何有关主题复制因素的要求。

使用主题(例如,并行运行的不同机器上)的每个存储库实例控制其自身的消费者组,因此在使用相同主题的 10 个 Camel 进程集群中,控制其自身偏移。

在启动时,实例订阅该主题,并将偏移回开始,将缓存重建为最新状态。在一次轮询 轮询持续时间M 前,缓存不会被认为是 0 个记录。在缓存热上或 30 秒发生之前,启动才会完成;如果后者发生幂等存储库可能处于不一致的状态,直到消费者捕获到主题的末尾为止。

请注意用于唯一检查的标头格式。默认情况下,它使用 Strings 作为数据类型。使用原语数字格式时,必须相应地对标头进行反序列化。例如,请检查以下示例。

KafkaIdempotentRepository 具有以下属性:

属性描述

topic

用于广播更改的 Kafka 主题的名称。(必需)

bootstrapServers

内部 Kafka 生成者和消费者上的 bootstrap.servers 属性。如果没有设置 consumerConfigproducerConfig,则将其用作缩写。如果使用,此组件将为生成者和消费者应用可配置默认配置。

producerConfig

设置 Kafka producer 将用于广播更改的属性。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

consumerConfig

设置 Kafka 消费者将使用的属性,该消费者从主题填充缓存。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

maxCacheSize

最近使用的密钥应存储在内存中(默认值 1000)。

pollDurationMs

Kafka 消费者的轮询持续时间。本地缓存会立即更新。这个值会影响从主题更新其缓存的其他对等点后如何相对于发送缓存操作消息的幂等消费者实例。默认值为 100 ms。
如果明确设置这个值,请注意,远程缓存存活度和此存储库消费者和 Kafka 代理之间的网络流量卷之间有一个权衡。缓存温过程还依赖于一个轮询,它获取任何内容 - 这表示该流已消耗到当前点。如果轮询持续时间对于主题上发送消息的速率过长,则缓存可能无法热热,并且会相对于对等点运行处于不一致的状态,直到捕获为止。

可以通过定义 主题和 bootstrapServers 来实例化存储库,或者明确定义 producerConfigconsumerConfig 属性集来启用 SSL/SASL 等功能。要使用,此存储库必须手动放置在 Camel 注册表中,也可以作为 bean 在 Spring/Blueprint 中进行注册,因为它为 CamelContext 感知。

用法示例如下:

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

在 XML 中:

<!-- simple -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo"
  class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>

在使用带有数字标识符的 idempotency 时,可以选择 3 个替代方案。第一个是使用来自 org.apache.camel.component.kafka.serde.KafkaSerdeHelper 的静态方法 number Header 方法为您执行转换:

from("direct:performInsert")
    .idempotentConsumer(numericHeader("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

另外,也可以使用通过路由 URL 配置的自定义序列化程序来执行转换:

public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomHeaderDeserializer.class);

    @Override
    public Object deserialize(String key, byte[] value) {
        if (key.equals("id")) {
            BigInteger bi = new BigInteger(value);

            return String.valueOf(bi.longValue());
        } else {
            return super.deserialize(key, value);
        }
    }
}

最后,也可以在处理器中执行此操作:

from(from).routeId("foo")
    .process(exchange -> {
        byte[] id = exchange.getIn().getHeader("id", byte[].class);

        BigInteger bi = new BigInteger(id);
        exchange.getIn().setHeader("id", String.valueOf(bi.longValue()));
    })
    .idempotentConsumer(header("id"))
    .messageIdRepositoryRef("kafkaIdempotentRepository")
    .to(to);
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.