65.14. 批处理 Consumer


要使用 Camel 的 Kafka 批处理使用者,应用程序必须将配置 批处理 设置为 true

接收的记录存储在管道中使用的交换的列表中。因此,可以通过提交列表中的最后一个交换,一次单独提交每个记录或整个批处理。

批处理的大小由选项 maxPollRecords 控制。

为了避免阻塞太长,请等待整个记录集填充批处理,可以使用 pollTimeoutMs 选项为轮询设置超时。在这种情况下,批处理可能包含比 maxPollRecords 中设置的消息小。

65.14.1. 自动提交

默认情况下,Camel 在使用批处理时使用自动提交。在这种情况下,Camel 会在应用程序成功处理后自动提交记录。

如果出现故障,则不会处理记录。以下代码提供了此方法的示例:

public void configure() {
    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
        for (Object obj : exchanges) {
            if (obj instanceof Exchange exchange) {
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}

65.14.1.1. 使用自动提交处理错误

在使用自动提交时,如果处理失败时,Camel 不会提交记录。因此,有可能会多次重新处理记录的风险。

建议您实施适当的错误处理机制和模式(如死信队列),以防止失败的记录阻止处理进度。

以下代码提供了处理自动提交错误的示例:

public void configure() {
    /*
     We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
     production scenario, applications should probably send these records to a separate topic or fix the condition
     that lead to the failure
     */
    onException(IllegalArgumentException.class).process(exchange -> {
        LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
        LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
    }).continued(true);

    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange.
        int i = 0;
        for (Object o : exchanges) {
            if (o instanceof Exchange exchange) {
                i++;
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));

                if (i == 4) {
                    throw new IllegalArgumentException("Failed to process record");
                }
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}

65.14.2. 手动提交

使用手动提交的批处理时,它最多由应用提交记录,并处理潜在无效记录的结果。

以下代码提供了此方法的示例:

public void configure() {
    from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
    .process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        /*
        Every exchange in that list should contain a reference to the manual commit object. We use the reference
        for the last exchange in the list to commit the whole batch
         */
        final Object tmp = exchanges.getLast();
        if (tmp instanceof Exchange exchange) {
            KafkaManualCommit manual =
                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            LOG.debug("Performing manual commit");
            manual.commit();
            LOG.debug("Done performing manual commit");
        }
    });
}

65.14.3. 处理较长的轮询超时

在某些情况下,应用程序可能希望轮询过程需要较长的超时(请参阅: pollTimeoutMs)。

要正确这样做,首先确保具有大于轮询超时的最大轮询间隔(请参阅: maxPollIntervalMs)。

然后,增加关闭超时,以确保提交、关闭和其他 Kafka 操作不会立即中止。例如:

public void configure() {
    // Note that this can be configured in other ways
    getCamelContext().getShutdownStrategy().setTimeout(10000);

    // route setup ...
}
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部