搜索

34.10. 使用 Kafka 使用者手动提交

download PDF

默认情况下,Kafka 使用者将使用自动提交,其中偏移将使用给定间隔自动提交。

如果要强制手动提交,您可以使用 Camel Exchange 中的 KafkaManualCommit API,存储在消息标头中。这需要在 KafkaComponent 或端点上将 allowManualCommit 选项设置为 true 来打开手动提交,例如:

KafkaComponent kafka = new KafkaComponent();
kafka.setAllowManualCommit(true);
...
camelContext.addComponent("kafka", kafka);

然后,您可以使用 Java 代码中的 KafkaManualCommit,如 Camel Processor

public void process(Exchange exchange) {
    KafkaManualCommit manual =
        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commit();
}

这将强制同步提交,该提交将在 Kafka 上确认提交,或者抛出异常失败。您可以通过配置带有 'DefaultKafkaManualAsyncCommitFactory' 实现的 KafkaManualCommitFactory 来使用异步提交。

然后,提交将使用 kafka 异步提交 api 在下一个消费者循环中进行。请注意,分区中的记录必须由唯一的线程处理并提交。如果没有,这会导致非一致的行为。这主要用于聚合的完成超时策略。

如果要使用 KafkaManualCommit 的自定义实现,您可以在 KafkaComponent 上配置自定义 KafkaManualCommitFactory,以创建自定义实现的实例。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.