34.10. 使用 Kafka 使用者手动提交
默认情况下,Kafka 使用者将使用自动提交,其中偏移将使用给定间隔自动提交。
如果要强制手动提交,您可以使用 Camel Exchange 中的 KafkaManualCommit
API,存储在消息标头中。这需要在 KafkaComponent
或端点上将 allowManualCommit
选项设置为 true
来打开手动提交,例如:
KafkaComponent kafka = new KafkaComponent(); kafka.setAllowManualCommit(true); ... camelContext.addComponent("kafka", kafka);
然后,您可以使用 Java 代码中的 KafkaManualCommit
,如 Camel Processor
:
public void process(Exchange exchange) { KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commit(); }
这将强制同步提交,该提交将在 Kafka 上确认提交,或者抛出异常失败。您可以通过配置带有 'DefaultKafkaManualAsyncCommitFactory' 实现的 KafkaManualCommitFactory
来使用异步提交。
然后,提交将使用 kafka 异步提交 api 在下一个消费者循环中进行。请注意,分区中的记录必须由唯一的线程处理并提交。如果没有,这会导致非一致的行为。这主要用于聚合的完成超时策略。
如果要使用 KafkaManualCommit
的自定义实现,您可以在 KafkaComponent
上配置自定义 KafkaManualCommitFactory
,以创建自定义实现的实例。