26.10. 在 Kafka consumer 中使用手动提交
默认情况下,Kafka 使用者将使用自动提交,其中偏移将使用给定间隔自动在后台提交。
如果要强制手动提交,您可以使用来自 Camel Exchange 的 KafkaManualCommit API,存储在消息标头中。这需要通过在 KafkaComponent 或端点上将 allowManualCommit 设置为 true 来打开手动提交,例如:
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
然后,您可以使用来自 Java 代码的 KafkaManualCommit,如 Camel 处理器 :
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
这将强制一个同步提交,它将阻断,直到提交在 Kafka 上确认,或者抛出异常。您还可以使用异步提交,通过"Default KafkaManualAsyncCommit factory"实施方式配置 KafkaManualCommit factory。
然后,提交将在下一个消费者循环中使用 kafka 异步提交 api。请注意,分区中的记录必须由唯一线程处理和提交。如果没有,这会导致没有一致的行为。这主要可用于聚合的完成超时策略。
如果要使用 KafkaManualCommit 的自定义实现,那么您可以在 KafkaComponent 上配置自定义 KafkaManualCommit factory,以创建自定义实现的实例。