65.9. 在 Kafka 使用者中使用手动提交
默认情况下,Kafka 使用者将使用自动提交,其中偏移将使用给定间隔在后台自动提交。
如果要强制手动提交,您可以使用 Camel Exchange 中的 KafkaManualCommit API,存储在消息标头中。这要求通过将 KafkaComponent 或端点上的 allowManualCommit 设置为 true 来打开手动提交,例如:
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);
然后,您可以使用 Java 代码(如 Camel 处理器)的 KafkaManualCommit :
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
这将强制进行同步提交,它将阻止到提交在 Kafka 上确认,或者抛出异常。您还可以通过将 KafkaManualCommitFactory 配置为 'DefaultKafkaManualAsyncCommitFactory'implementation 来使用异步提交。
然后,提交将使用 kafka 异步提交 api 在下一个消费者循环中进行。请注意,分区中的记录必须通过唯一的线程处理和提交。如果没有,这可能会导致不一致的行为。这对聚合的完成超时策略大体很有用。
如果要使用 KafkaManualCommit 的自定义实现,您可以在 KafkaComponent 中配置自定义 KafkaManualCommitFactory,用于创建自定义实现实例。
当消费者配置为使用手动提交并且特定的 CommitManager 时,了解这些如何影响 breakOnFirstError 的行为非常重要。
当 CommitManager 保留为默认的 NoopCommitManager 时,breakOnFirstError 不会自动提交偏移,以便重试带有错误的消息。消费者必须使用 KafkaManualCommit 在路由中管理它。
当 CommitManager 更改为同步或异步管理器时,breakOnFirstError 将自动提交偏移,以便重试带有错误的消息。此消息将持续重试,直到可以正常处理且没有错误。
- 注意
-
分区中的记录必须通过与消费者相同的线程处理和提交。这意味着 DSL 中的某些 EIP、sync 或 concurrent 操作可能会导致提交失败。在这种情况下,提交事务的 tyring 将导致 Kafka 客户端抛出
java.util.ConcurrentModificationException异常,消息KafkaConsumer 不适用于多线程访问。要防止这种情况,请重新设计您的路由以避免这些操作。 - 这对聚合的完成超时策略大体很有用。
-
分区中的记录必须通过与消费者相同的线程处理和提交。这意味着 DSL 中的某些 EIP、sync 或 concurrent 操作可能会导致提交失败。在这种情况下,提交事务的 tyring 将导致 Kafka 客户端抛出