65.14. バッチ処理コンシューマー
Camel で Kafka バッチ処理コンシューマーを使用するには、アプリケーションで batching の設定を true にする必要があります。
受信したレコードは、パイプラインで使用されるエクスチェンジのリストに保存されます。そのため、リストの最後のエクスチェンジをコミットすることで、すべてのレコードを個別にコミットすることも、バッチ全体をまとめてコミットすることもできます。
バッチのサイズはオプション maxPollRecords で制御されます。
レコードセット全体がバッチを埋め尽くすまで待機することで長時間ブロックすることを回避するには、pollTimeoutMs オプションを使用してポーリングのタイムアウトを設定します。この場合、バッチに含まれるメッセージの数は maxPollRecords で設定された数よりも少ない可能性があります。
65.14.1. 自動コミット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Camel はバッチ処理の使用時に自動コミットを使用します。この場合、Camel はアプリケーションによってレコードが正常に処理された後、自動的にレコードをコミットします。
失敗した場合、レコードは処理されません。以下のコードは、このアプローチの例を示しています。
public void configure() {
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List<?> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
for (Object obj : exchanges) {
if (obj instanceof Exchange exchange) {
LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
}
}
}).to(KafkaTestUtil.MOCK_RESULT);
}
65.14.1.1. 自動コミットエラーの処理 リンクのコピーリンクがクリップボードにコピーされました!
自動コミットを使用する場合、処理に失敗すると Camel はレコードをコミットしません。このため、レコードが複数回再処理されるリスクがあります。
失敗したレコードによって処理の進行が妨げられないように、適切なエラー処理メカニズムとパターン (dead-letter キューなど) を実装することが推奨されます。
以下のコードは、自動コミットエラーの処理例を示しています。
public void configure() {
/*
We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. In a
production scenario, applications should probably send these records to a separate topic or fix the condition
that lead to the failure
*/
onException(IllegalArgumentException.class).process(exchange -> {
LOG.warn("Failed to process batch {}", exchange.getMessage().getBody());
LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage());
}).continued(true);
from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List<?> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
// The records from the batch are stored in a list of exchanges in the original exchange.
int i = 0;
for (Object o : exchanges) {
if (o instanceof Exchange exchange) {
i++;
LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class));
if (i == 4) {
throw new IllegalArgumentException("Failed to process record");
}
}
}
}).to(KafkaTestUtil.MOCK_RESULT);
}
65.14.2. 手動コミット リンクのコピーリンクがクリップボードにコピーされました!
手動コミットによるバッチ処理を使用する場合、アプリケーションがレコードをコミットし、無効になっている可能性があるレコードの結果を処理します。
以下のコードは、このアプローチの例を示しています。
public void configure() {
from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
.process(e -> {
// The received records are stored as exchanges in a list. This gets the list of those exchanges
final List<?> exchanges = e.getMessage().getBody(List.class);
// Ensure we are actually receiving what we are asking for
if (exchanges == null || exchanges.isEmpty()) {
return;
}
/*
Every exchange in that list should contain a reference to the manual commit object. We use the reference
for the last exchange in the list to commit the whole batch
*/
final Object tmp = exchanges.getLast();
if (tmp instanceof Exchange exchange) {
KafkaManualCommit manual =
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
LOG.debug("Performing manual commit");
manual.commit();
LOG.debug("Done performing manual commit");
}
});
}
65.14.3. 長いポーリングタイムアウトへの対処 リンクのコピーリンクがクリップボードにコピーされました!
場合によっては、アプリケーションでポーリングプロセスのタイムアウトを長く設定するひた右葉があることもあります (pollTimeoutMs を参照)。
これを適切に行うには、まず、最大ポーリング間隔がポーリングタイムアウトよりも大きいことを確認します (maxPollIntervalMs を参照)。
次に、シャットダウンタイムアウトを増やして、コミット、クローズ、およびその他の Kafka 操作が突然中止されないようにします。以下に例を示します。
public void configure() {
// Note that this can be configured in other ways
getCamelContext().getShutdownStrategy().setTimeout(10000);
// route setup ...
}