65.14. バッチ処理コンシューマー


Camel で Kafka バッチ処理コンシューマーを使用するには、アプリケーションで batching の設定を true にする必要があります。

受信したレコードは、パイプラインで使用されるエクスチェンジのリストに保存されます。そのため、リストの最後のエクスチェンジをコミットすることで、すべてのレコードを個別にコミットすることも、バッチ全体をまとめてコミットすることもできます。

バッチのサイズはオプション maxPollRecords で制御されます。

レコードセット全体がバッチを埋め尽くすまで待機することで長時間ブロックすることを回避するには、pollTimeoutMs オプションを使用してポーリングのタイムアウトを設定します。この場合、バッチに含まれるメッセージの数は maxPollRecords で設定された数よりも少ない可能性があります。

65.14.1. 自動コミット

デフォルトでは、Camel はバッチ処理の使用時に自動コミットを使用します。この場合、Camel はアプリケーションによってレコードが正常に処理された後、自動的にレコードをコミットします。

失敗した場合、レコードは処理されません。以下のコードは、このアプローチの例を示しています。

public void configure() {
    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
        for (Object obj : exchanges) {
            if (obj instanceof Exchange exchange) {
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}

65.14.1.1. 自動コミットエラーの処理

自動コミットを使用する場合、処理に失敗すると Camel はレコードをコミットしません。このため、レコードが複数回再処理されるリスクがあります。

失敗したレコードによって処理の進行が妨げられないように、適切なエラー処理メカニズムとパターン (dead-letter キューなど) を実装することが推奨されます。

以下のコードは、自動コミットエラーの処理例を示しています。

public void configure() {
    /*
     We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
     production scenario, applications should probably send these records to a separate topic or fix the condition
     that lead to the failure
     */
    onException(IllegalArgumentException.class).process(exchange -> {
        LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
        LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
    }).continued(true);

    from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        // The records from the batch are stored in a list of exchanges in the original exchange.
        int i = 0;
        for (Object o : exchanges) {
            if (o instanceof Exchange exchange) {
                i++;
                LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));

                if (i == 4) {
                    throw new IllegalArgumentException("Failed to process record");
                }
            }
        }
    }).to(KafkaTestUtil.MOCK_RESULT);
}

65.14.2. 手動コミット

手動コミットによるバッチ処理を使用する場合、アプリケーションがレコードをコミットし、無効になっている可能性があるレコードの結果を処理します。

以下のコードは、このアプローチの例を示しています。

public void configure() {
    from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
    .process(e -> {
        // The received records are stored as exchanges in a list. This gets the list of those exchanges
        final List<?> exchanges = e.getMessage().getBody(List.class);

        // Ensure we are actually receiving what we are asking for
        if (exchanges == null || exchanges.isEmpty()) {
            return;
        }

        /*
        Every exchange in that list should contain a reference to the manual commit object. We use the reference
        for the last exchange in the list to commit the whole batch
         */
        final Object tmp = exchanges.getLast();
        if (tmp instanceof Exchange exchange) {
            KafkaManualCommit manual =
                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            LOG.debug("Performing manual commit");
            manual.commit();
            LOG.debug("Done performing manual commit");
        }
    });
}

65.14.3. 長いポーリングタイムアウトへの対処

場合によっては、アプリケーションでポーリングプロセスのタイムアウトを長く設定するひた右葉があることもあります (pollTimeoutMs を参照)。

これを適切に行うには、まず、最大ポーリング間隔がポーリングタイムアウトよりも大きいことを確認します (maxPollIntervalMs を参照)。

次に、シャットダウンタイムアウトを増やして、コミット、クローズ、およびその他の Kafka 操作が突然中止されないようにします。以下に例を示します。

public void configure() {
    // Note that this can be configured in other ways
    getCamelContext().getShutdownStrategy().setTimeout(10000);

    // route setup ...
}
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2026 Red Hat
トップに戻る