189.7. 使用 Kafka consumer 的手动提交
可作为 Camel 2.21 可用
默认情况下,Kafka 使用者将使用自动提交,其中偏移会使用给定间隔在后台自动提交。
如果要强制进行手动提交,您可以使用 Camel Exchange 中的 KafkaManualCommit
API,存储在消息标头中。这需要通过在 KafkaComponent
或 端点上将选项 allowManualCommit
设置为 true
来打开手动提交,例如:
KafkaComponent kafka = new KafkaComponent(); kafka.setAllowManualCommit(true); ... camelContext.addComponent("kafka", kafka);
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
然后,您可以使用来自 Java 代码的 KafkaManualCommit
,如 Camel Processor
:
public void process(Exchange exchange) { KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commitSync(); }
public void process(Exchange exchange) {
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
这将强制一个同步提交,该提交将阻止,直到在 Kafka 上确认提交被确认,或者是否引发异常。
如果要使用 KafkaManualCommit
的自定义实现,那么您可以在创建自定义实施的 KafkaComponent
上配置自定义 KafkaManualCommitFactory
。