第41章 Consumer インターフェース
概要
本章では、Apache Camel コンポーネントの実装における必須ステップである Consumer インターフェースを実装する方法を説明します。
41.1. Consumer インターフェース リンクのコピーリンクがクリップボードにコピーされました!
概要 リンクのコピーリンクがクリップボードにコピーされました!
org.apache.camel.Consumer タイプのインスタンスは、ルートのソースエンドポイントを表します。コンシューマーの実装方法は複数あります( 「コンシューマーパターンおよびスレッド」を参照)、このレベルの柔軟性は継承階層( 図41.1「コンシューマー継承階層」を参照)に反映されます。これには、コンシューマーを実装するさまざまなベースクラスが含まれます。
図41.1 コンシューマー継承階層
コンシューマーパラメーターの注入 リンクのコピーリンクがクリップボードにコピーされました!
スケジュールされたポーリングパターン( 「スケジュールされたポーリングパターン」を参照)に従うコンシューマーの場合、Apache Camel はパラメーターをコンシューマーインスタンスへインジェクトするサポートを提供します。たとえば、custom プレフィックスで識別されるコンポーネントの以下のエンドポイント URI について考えてみましょう。
custom:destination?consumer.myConsumerParam
custom:destination?consumer.myConsumerParam
Apache Camel は、フォームのクエリーオプション consumer.\* を自動的に注入するためのサポートを提供します。consumer.myConsumerParam パラメーターには、以下のように Consumer 実装クラスで対応する setter メソッドと getter メソッドを定義する必要があります。
getter および setter メソッドは、通常の Java Bean の命名規則に従います (プロパティー名の最初の文字を大文字にすることも含む)。
Consumer 実装で Bean メソッドを定義する他に、Endpoint. createConsumer()の実装で 「スケジュールされたポーリングエンドポイントの実装」を参照してください。例41.1「FileEndpoint createConsumer () 実装」 は、ファイルコンポーネントの configureConsumer() メソッドを呼び出す必要もあります。FileEndpoint クラスから取得した createConsumer() メソッド実装の例を示しています。
例41.1 FileEndpoint createConsumer () 実装
ランタイム時に、コンシューマーパラメーターの注入は以下のようになります。
-
エンドポイントが作成されると、
DefaultComponent.createEndpoint(String uri)のデフォルトの実装は URI を解析してコンシューマーパラメーターを抽出し、ScheduledPollEndpoint.configureProperties()を呼び出してエンドポイントインスタンスに保存します。 -
createConsumer()が呼び出されると、メソッド実装はconfigureConsumer()を呼び出し、コンシューマーパラメーターを注入します (例41.1「FileEndpoint createConsumer () 実装」 を参照)。 -
configureConsumer()メソッドは Java のリフレクションを使用して、consumer.接頭辞を削除にした後に関連するオプションと一致する名前を持つ setter メソッドを呼び出します。
スケジュールされたポーリングパラメーター リンクのコピーリンクがクリップボードにコピーされました!
スケジュールされたポーリングパターンに続くコンシューマーは、表41.1「スケジュールされたポーリングパラメーター」 に記載されているコンシューマーパラメーターを自動的にサポートします(エンドポイント URI でクエリーパラメーターとして表示される可能性があります)。
| 名前 | デフォルト | 説明 |
|---|---|---|
|
|
| 最初のポーリングの前の遅延 (ミリ秒単位)。 |
|
|
|
|
|
|
|
|
イベント駆動型のコンシューマーとポーリングコンシューマー間の変換 リンクのコピーリンクがクリップボードにコピーされました!
Apache Camel は、イベント駆動型のコンシューマーとポーリングコンシューマー間の変換に使用できる 2 つの特殊なコンシューマー実装を提供します。以下の変換クラスが提供されます。
-
org.apache.camel.impl.EventDrivenPollingConsumer: イベント駆動型コンシューマーをポーリングコンシューマーインスタンスに変換します。 -
org.apache.camel.impl.DefaultScheduledPollConsumer: ポーリングコンシューマーをイベント駆動型コンシューマーインスタンスに変換します。
実際には、これらのクラスを使用して Endpoint タイプの実装タスクを単純化します。Endpoint インターフェースは、コンシューマーインスタンスを作成するための以下の 2 つのメソッドを定義します。
createConsumer() はイベント駆動型のコンシューマーを返し、createPollingConsumer() はポーリングコンシューマーを返します。これらのメソッドは 1 つのみ実装します。たとえば、コンシューマーのイベント駆動型パターンに従っている場合、create Consumer() メソッドを実装し、単に例外を発生させる createPollingConsumer() のメソッド実装を提供します。ただし、変換クラスを利用して Apache Camel はより有用なデフォルト実装を提供できます。
たとえば、イベント駆動のパターンに従ってコンシューマーを実装する場合は、DefaultEndpoint を拡張し、createConsumer() メソッドを実装してエンドポイントを実装します。createPollingConsumer() の実装は、以下のように定義される DefaultEndpoint から継承されます。
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
EventDrivenPollingConsumer コンストラクターはイベント駆動のコンシューマーへの参照 this を取得し、効果的にラップし、ポーリングコンシューマーに変換します。変換を実装するには、EventDrivenPollingConsumer インスタンスは受信イベントをバッファーし、receive()、receive(long timeout)、および receiveNoWait() メソッドを介してオンデマンドで利用できるようにします。
同様に、ポーリングパターンに従ってコンシューマーを実装する場合は、DefaultPollingEndpoint を拡張し、createPollingConsumer() メソッドを実装してエンドポイントを実装します。この場合、createConsumer() メソッドの実装は DefaultPollingEndpoint から継承され、デフォルトの実装は DefaultScheduledPollConsumer インスタンス (ポーリングコンシューマーをイベント駆動型のコンシューマーに変換) を返します。
ShutdownPrepared インターフェース リンクのコピーリンクがクリップボードにコピーされました!
コンシューマークラスはオプションで org.apache.camel.spi.ShutdownPrepared インターフェースを実装できます。これにより、カスタムコンシューマーエンドポイントがシャットダウン通知を受け取ることができます。
例41.2「ShutdownPrepared インターフェース」 は、ShutdownPrepared インターフェースの定義を示しています。
例41.2 ShutdownPrepared インターフェース
ShutdownPrepared インターフェースは以下のメソッドを定義します。
prepareShutdown以下のように、1 または 2 フェーズでコンシューマーエンドポイントをシャットダウンするための通知を受信します。
-
正常なシャットダウン:
forced引数にfalseの値がある場合。リソースを正常にクリーンアップしようとします。たとえば、スレッドを正常に停止することによりクリーンアップします。 -
Forced shutdown-
forced引数には、値trueがあります。これは、シャットダウンがタイムアウトしたことを意味するため、リソースをより積極的にクリーンアップする必要があります。これは、プロセスが終了する前にリソースをクリーンアップする最後の契機となります。
-
正常なシャットダウン:
ShutdownAware インターフェース リンクのコピーリンクがクリップボードにコピーされました!
コンシューマークラスはオプションで org.apache.camel.spi.ShutdownAware インターフェースを実装できます。このインターフェースは、正常なシャットダウンメカニズムと対話し、コンシューマーがシャットダウンするための追加の時間を要求できるようにします。これは通常、内部キューに保留中のエクスチェンジを保存できる SEDA などのコンポーネントに必要です。通常、SEDA コンシューマーをシャットダウンする前にキューのすべてのエクスチェンジを処理します。
例41.3「ShutdownAware インターフェース」 は、ShutdownAware インターフェースの定義を示しています。
例41.3 ShutdownAware インターフェース
ShutdownAware インターフェースは以下のメソッドを定義します。
deferShutdownコンシューマーのシャットダウンを遅延させる場合は、このメソッドから
trueを返します。shutdownRunningTask引数はenumで、以下のいずれかの値を取ることができます。-
ShutdownRunningTask.CompleteCurrentTaskOnly: コンシューマーのスレッドプールによって現在処理されているエクスチェンジの処理を終了しますが、それ以上のエクスチェンジの処理は試みません。 -
ShutdownRunningTask.CompleteAllTasks- 保留中のエクスチェンジすべてを処理します。たとえば、SEDA コンポーネントの場合、コンシューマーは受信キューからすべてのエクスチェンジを処理します。
-
getPendingExchangesSize- コンシューマーによって処理されるエクスチェンジの数を示します。値をゼロにすると、処理が完了し、コンシューマーをシャットダウンできます。
ShutdownAware メソッドを定義する方法の例は、例41.7「カスタムスレッド実装」 を参照してください。