第41章 Consumer インターフェース
概要
本章では、Apache Camel コンポーネントの実装における必須ステップである Consumer インターフェースを実装する方法を説明します。
41.1. Consumer インターフェース
概要
org.apache.camel.Consumer タイプのインスタンスは、ルートのソースエンドポイントを表します。コンシューマーの実装方法は複数あります( 「コンシューマーパターンおよびスレッド」を参照)、このレベルの柔軟性は継承階層( 図41.1「コンシューマー継承階層」を参照)に反映されます。これには、コンシューマーを実装するさまざまなベースクラスが含まれます。
図41.1 コンシューマー継承階層
コンシューマーパラメーターの注入
スケジュールされたポーリングパターン( 「スケジュールされたポーリングパターン」を参照)に従うコンシューマーの場合、Apache Camel はパラメーターをコンシューマーインスタンスへインジェクトするサポートを提供します。たとえば、custom
プレフィックスで識別されるコンポーネントの以下のエンドポイント URI について考えてみましょう。
custom:destination?consumer.myConsumerParam
Apache Camel は、フォームのクエリーオプション consumer.\*
を自動的に注入するためのサポートを提供します。consumer.myConsumerParam
パラメーターには、以下のように Consumer 実装クラスで対応する setter メソッドと getter メソッドを定義する必要があります。
public class CustomConsumer extends ScheduledPollConsumer {
...
String getMyConsumerParam() { ... }
void setMyConsumerParam(String s) { ... }
...
}
getter および setter メソッドは、通常の Java Bean の命名規則に従います (プロパティー名の最初の文字を大文字にすることも含む)。
Consumer 実装で Bean メソッドを定義する他に、Endpoint. createConsumer()の実装で
「スケジュールされたポーリングエンドポイントの実装」を参照してください。例41.1「FileEndpoint createConsumer () 実装」 は、ファイルコンポーネントの configureConsumer()
メソッドを呼び出す必要もあります。FileEndpoint
クラスから取得した createConsumer()
メソッド実装の例を示しています。
例41.1 FileEndpoint createConsumer () 実装
... public class FileEndpoint extends ScheduledPollEndpoint { ... public Consumer createConsumer(Processor processor) throws Exception { Consumer result = new FileConsumer(this, processor); configureConsumer(result); return result; } ... }
ランタイム時に、コンシューマーパラメーターの注入は以下のようになります。
-
エンドポイントが作成されると、
DefaultComponent.createEndpoint(String uri)
のデフォルトの実装は URI を解析してコンシューマーパラメーターを抽出し、ScheduledPollEndpoint.configureProperties()
を呼び出してエンドポイントインスタンスに保存します。 -
createConsumer()
が呼び出されると、メソッド実装はconfigureConsumer()
を呼び出し、コンシューマーパラメーターを注入します (例41.1「FileEndpoint createConsumer () 実装」 を参照)。 -
configureConsumer()
メソッドは Java のリフレクションを使用して、consumer.
接頭辞を削除にした後に関連するオプションと一致する名前を持つ setter メソッドを呼び出します。
スケジュールされたポーリングパラメーター
スケジュールされたポーリングパターンに続くコンシューマーは、表41.1「スケジュールされたポーリングパラメーター」 に記載されているコンシューマーパラメーターを自動的にサポートします(エンドポイント URI でクエリーパラメーターとして表示される可能性があります)。
名前 | デフォルト | 説明 |
---|---|---|
|
| 最初のポーリングの前の遅延 (ミリ秒単位)。 |
|
|
|
|
|
|
イベント駆動型のコンシューマーとポーリングコンシューマー間の変換
Apache Camel は、イベント駆動型のコンシューマーとポーリングコンシューマー間の変換に使用できる 2 つの特殊なコンシューマー実装を提供します。以下の変換クラスが提供されます。
-
org.apache.camel.impl.EventDrivenPollingConsumer
: イベント駆動型コンシューマーをポーリングコンシューマーインスタンスに変換します。 -
org.apache.camel.impl.DefaultScheduledPollConsumer
: ポーリングコンシューマーをイベント駆動型コンシューマーインスタンスに変換します。
実際には、これらのクラスを使用して Endpoint タイプの実装タスクを単純化します。Endpoint インターフェースは、コンシューマーインスタンスを作成するための以下の 2 つのメソッドを定義します。
package org.apache.camel; public interface Endpoint { ... Consumer createConsumer(Processor processor) throws Exception; PollingConsumer createPollingConsumer() throws Exception; }
createConsumer()
はイベント駆動型のコンシューマーを返し、createPollingConsumer()
はポーリングコンシューマーを返します。これらのメソッドは 1 つのみ実装します。たとえば、コンシューマーのイベント駆動型パターンに従っている場合、create Consumer()
メソッドを実装し、単に例外を発生させる createPollingConsumer()
のメソッド実装を提供します。ただし、変換クラスを利用して Apache Camel はより有用なデフォルト実装を提供できます。
たとえば、イベント駆動のパターンに従ってコンシューマーを実装する場合は、DefaultEndpoint
を拡張し、createConsumer()
メソッドを実装してエンドポイントを実装します。createPollingConsumer()
の実装は、以下のように定義される DefaultEndpoint
から継承されます。
public PollingConsumer<E> createPollingConsumer() throws Exception { return new EventDrivenPollingConsumer<E>(this); }
EventDrivenPollingConsumer
コンストラクターはイベント駆動のコンシューマーへの参照 this
を取得し、効果的にラップし、ポーリングコンシューマーに変換します。変換を実装するには、EventDrivenPollingConsumer
インスタンスは受信イベントをバッファーし、receive()
、receive(long timeout)
、および receiveNoWait()
メソッドを介してオンデマンドで利用できるようにします。
同様に、ポーリングパターンに従ってコンシューマーを実装する場合は、DefaultPollingEndpoint
を拡張し、createPollingConsumer()
メソッドを実装してエンドポイントを実装します。この場合、createConsumer()
メソッドの実装は DefaultPollingEndpoint
から継承され、デフォルトの実装は DefaultScheduledPollConsumer
インスタンス (ポーリングコンシューマーをイベント駆動型のコンシューマーに変換) を返します。
ShutdownPrepared インターフェース
コンシューマークラスはオプションで org.apache.camel.spi.ShutdownPrepared
インターフェースを実装できます。これにより、カスタムコンシューマーエンドポイントがシャットダウン通知を受け取ることができます。
例41.2「ShutdownPrepared インターフェース」 は、ShutdownPrepared
インターフェースの定義を示しています。
例41.2 ShutdownPrepared インターフェース
package org.apache.camel.spi; public interface ShutdownPrepared { void prepareShutdown(boolean forced); }
ShutdownPrepared
インターフェースは以下のメソッドを定義します。
prepareShutdown
以下のように、1 または 2 フェーズでコンシューマーエンドポイントをシャットダウンするための通知を受信します。
-
正常なシャットダウン:
forced
引数にfalse
の値がある場合。リソースを正常にクリーンアップしようとします。たとえば、スレッドを正常に停止することによりクリーンアップします。 -
Forced shutdown-
forced
引数には、値true
があります。これは、シャットダウンがタイムアウトしたことを意味するため、リソースをより積極的にクリーンアップする必要があります。これは、プロセスが終了する前にリソースをクリーンアップする最後の契機となります。
-
正常なシャットダウン:
ShutdownAware インターフェース
コンシューマークラスはオプションで org.apache.camel.spi.ShutdownAware
インターフェースを実装できます。このインターフェースは、正常なシャットダウンメカニズムと対話し、コンシューマーがシャットダウンするための追加の時間を要求できるようにします。これは通常、内部キューに保留中のエクスチェンジを保存できる SEDA などのコンポーネントに必要です。通常、SEDA コンシューマーをシャットダウンする前にキューのすべてのエクスチェンジを処理します。
例41.3「ShutdownAware インターフェース」 は、ShutdownAware
インターフェースの定義を示しています。
例41.3 ShutdownAware インターフェース
// Java package org.apache.camel.spi; import org.apache.camel.ShutdownRunningTask; public interface ShutdownAware extends ShutdownPrepared { boolean deferShutdown(ShutdownRunningTask shutdownRunningTask); int getPendingExchangesSize(); }
ShutdownAware
インターフェースは以下のメソッドを定義します。
deferShutdown
コンシューマーのシャットダウンを遅延させる場合は、このメソッドから
true
を返します。shutdownRunningTask
引数はenum
で、以下のいずれかの値を取ることができます。-
ShutdownRunningTask.CompleteCurrentTaskOnly
: コンシューマーのスレッドプールによって現在処理されているエクスチェンジの処理を終了しますが、それ以上のエクスチェンジの処理は試みません。 -
ShutdownRunningTask.CompleteAllTasks
- 保留中のエクスチェンジすべてを処理します。たとえば、SEDA コンポーネントの場合、コンシューマーは受信キューからすべてのエクスチェンジを処理します。
-
getPendingExchangesSize
- コンシューマーによって処理されるエクスチェンジの数を示します。値をゼロにすると、処理が完了し、コンシューマーをシャットダウンできます。
ShutdownAware
メソッドを定義する方法の例は、例41.7「カスタムスレッド実装」 を参照してください。