191.8. 在 Kafka 使用者中使用手动提交
从 Camel 2.21 开始提供
默认情况下,Kafka 使用者将使用自动提交,其中偏移将使用给定间隔在后台自动提交。
如果要强制手动提交,您可以使用 Camel Exchange 中的 KafkaManualCommit API,存储在消息标头中。这要求通过将 KafkaComponent 或端点上的 allowManualCommit 设置为 true 来打开手动提交,例如:
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
然后,您可以使用 Java 代码(如 Camel 处理器)的 KafkaManualCommit :
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
这将强制进行同步提交,它将阻止到提交在 Kafka 上确认,或者抛出异常。
如果要使用 KafkaManualCommit 的自定义实现,您可以在 KafkaComponent 中配置自定义 KafkaManualCommitFactory,用于创建自定义实现实例。