66.7. 使用方法
66.7.1. consumer エラー処理 リンクのコピーリンクがクリップボードにコピーされました!
kafka consumer が kafka ブローカーからのメッセージをポーリングしている間、エラーが発生する可能性があります。このセクションでは、何が起こるか、何を設定できるかを説明します。
Kafka poll API を呼び出すときに、consumer が例外を出力する場合があります。たとえば、無効なデータやその他の多くの種類のエラーが原因でメッセージをデシリアライズできない場合です。これらのエラーは、再試行 可能 かどうかのいずれかである KafkaException の形式です。再試行できる例外 (RetriableException) は、再試行されます (その間にポーリングタイムアウトがあります)。他のすべての種類の例外は、pollOnError 設定に従って処理されます。この設定には次の値があります。
- DISCARD はメッセージを破棄し、次のメッセージのポーリングを続行します。
- ERROR_HANDLER は Camel のエラーハンドラーを使用して例外を処理し、その後、次のメッセージのポーリングを続行します。
- RECONNECT は consumer に再接続し、メッセージのポーリングを再試行します。
- RETRY を使用すると、consumer は同じメッセージのポーリングを再試行できます
- STOP は consumer を停止します (consumer がメッセージを再度消費できるようにする必要がある場合は、手動で開始/再起動する必要があります)。
デフォルトは ERROR_HANDLER で、Camel のエラーハンドラー (設定されている場合) が原因の例外を処理します。その後、次のメッセージのポーリングを続けます。この動作は、Camel コンポーネントが持つ bridgeErrorHandler オプションに似ています。
高度な制御を行う場青は、org.apache.camel.component.kafka.PollExceptionStrategy のカスタム実装をコンポーネントレベルで設定することで、各例外に対してどのストラテジーを使用するかを制御できます。
66.7.2. コンシューマーエラー処理 (advanced) リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Camel は例外を処理するために ERROR_HANDLER を使用してポーリングします。Camel による例外となるメッセージの処理方法は、設定の breakOnFirstError 属性を使用して変更できます。Camel は、次のメッセージをポーリングし続ける代わりに、例外の原因となったメッセージが再試行されるようにオフセットをコミットします。これは上記の RETRY ポーリングストラテジーに似ています。
KafkaComponent kafka = new KafkaComponent();
kafka.setBreakOnFirstError(true);
...
camelContext.addComponent("kafka", kafka);
以下の "Kafka コンシューマーでの手動コミットの使用" セクションで、設定済みの CommitManager に基づく breakOnFirstError の動作を確認することが推奨されます。