第38章 コンポーネントの実装
概要
本章では、Apache Camel コンポーネントを実装するための概要を示します。
38.1. コンポーネントのアーキテクチャー
38.1.1. コンポーネントのファクトリーパターン
概要
Apache Camel コンポーネントは、ファクトリーパターンを介して相互に関連するクラスセットで構成されます。主なエントリーポイントは Component
コンポーネントオブジェクト自体です (org.apache.camel.Component
タイプのインスタンス)。Coponnet
オブジェクトは、Endpoint
オブジェクトを作成するためのファクトリーとして使用することができます。エンドポイントオブジェクトは、Consumer
、Producer
、および Exchange
オブジェクトを作成するためのファクトリーとして機能します。これらの関係は で要約されています。 図38.1「コンポーネントファクトリーパターン」
図38.1 コンポーネントファクトリーパターン
コンポーネント
コンポーネント実装はエンドポイントファクトリーです。コンポーネント実装の主なタスクは、Component.createEndpoint()
メソッドを実装することです。このメソッドは、オンデマンドで新しいエンドポイントを作成します。
各種類のコンポーネントは、エンドポイント URI に表示される コンポーネント接頭辞 に関連付ける必要があります。たとえば、ファイルコンポーネントは、通常 file://tmp/messages/input などのエンドポイント URI で使用できる file 接頭辞に関連付けられます。Apache Camel に新しいコンポーネントをインストールする場合、特定のコンポーネント接頭辞とコンポーネントを実装するクラス名の関連付けを定義する必要があります。
エンドポイント
各エンドポイントインスタンスは特定のエンドポイント URI をカプセル化します。Apache Camel が新しいエンドポイント URI に遭遇するたびに、新しいエンドポイントインスタンスが作成されます。エンドポイントオブジェクトは、コンシューマーエンドポイントおよびプロデューサーエンドポイントを作成するためのファクトリーでもあります。
エンドポイントは org.apache.camel.Endpoint インターフェースを実装する必要があります。Endpoint インターフェースは、以下のファクトリーメソッドを定義します。
-
createConsumer()
およびcreatePollingConsumer()
コンシューマーエンドポイントを作成します。コンシューマーエンドポイントは、ルートの最初のソースエンドポイントを表します。 -
createProducer()
: ルートの最後にターゲットエンドポイントを表すプロデューサーエンドポイントを作成します。 -
createExchange()
: エクスチェンジオブジェクトを作成します。これにより、ルート上のメッセージをカプセル化します。
コンシューマー
コンシューマーエンドポイントはリクエストを 消費 します。これらはルートの先頭に現れ、受信したリクエストおよび応答のディスパッチを行うコードをカプセル化します。サービス指向の考え方から、コンシューマーが サービス を表します。
コンシューマーは org.apache.camel.Consumer インターフェースを実装する必要があります。コンシューマーの実装時には、フォローできるさまざまなパターンがあります。これらのパターンは、「コンシューマーパターンおよびスレッド」 で説明されています。
プロデューサー
プロデューサーエンドポイントはリクエストを 生成 します。これらはルートの最後に常に表示され、リクエスト送信のディスパッチおよび応答を受信するコードをカプセル化します。サービス指向の考え方から、プロデューサーはサービスコンシューマーを表します。
プロデューサーは、org.apache.camel.Producer
インターフェースを実装する必要があります。オプションでプロデューサーを実装し、非同期処理をサポートすることができます。詳細は、「非同期処理」 を参照してください。
エクスチェンジ
エクスチェンジオブジェクトは関連するメッセージセットをカプセル化します。たとえば、メッセージエクスチェンジの 1 つは、要求メッセージとその関連する応答で構成される同期呼び出しです。
エクスチェンジは org.apache.camel.Exchange インターフェースを実装する必要があります。デフォルトの実装である DefaultExchange
は、多くのコンポーネントの実装には十分です。ただし、エクスチェンジと追加のデータを関連付ける場合や、エクスチェンジに追加の処理を行う場合、エクスチェンジの実装をカスタマイズすると便利です。
メッセージ
Exchange
オブジェクトには、2 つの異なるメッセージスロットがあります。
- in メッセージ: 現在のメッセージを保持します。
- out メッセージ: リプライメッセージを一時的に保持します。
すべてのメッセージタイプは同じ Java オブジェクト org.apache.camel.Message
によって表されます。デフォルト実装のメッセージ DefaultMessage
は通常、カスタマイズする必要はありません。
38.1.2. ルートでのコンポーネントの使用
概要
Apache Camel ルートは基本的に org.apache.camel.Processor タイプのプロセッサーのパイプラインです。メッセージは、process()
メソッドを呼び出してノードからノードに渡されるエクスチェンジオブジェクト process()
でカプセル化されます。プロセッサーパイプラインのアーキテクチャーは、図38.2「ルートのコンシューマーおよびプロデューサーインスタンス」 で説明されています。
図38.2 ルートのコンシューマーおよびプロデューサーインスタンス
ソースエンドポイント
ルートの最初には、org.apache.camel.Consumer
オブジェクトで表現されるソースエンドポイントがあります。ソースエンドポイントは、受信したリクエストメッセージを受け入れ、応答をディスパッチします。ルートを構築する際、Apache Camel は、「コンポーネントのファクトリーパターン」 で説明されているように、エンドポイント URI のコンポーネントプレフィックスに基づいて、適切な Consumer
タイプを作成します。
プロセッサー
パイプラインの各中間ノードは、プロセッサーオブジェクトによって表されます (org.apache.camel.Processor インターフェースの実装) 。標準のプロセッサー (たとえば filter
、throttler
、または delayer
) を挿入したり、独自のカスタムプロセッサー実装を挿入したりできます。
ターゲットエンドポイント
ルートの最後には、org.apache.camel.Producer
オブジェクトで表現されるターゲットエンドポイントがあります。プロセッサーパイプラインの最後にあるため、プロデューサーはプロセッサーオブジェクトでもあります (org.apache.camel.Processor インターフェースを実装します)。ターゲットエンドポイントは、リクエストメッセージを送信し、応答を受信します。ルートを作成するとき、Apache Camel はエンドポイント URI からコンポーネント接頭辞に基づいて適切な Producer
タイプを作成します。
38.1.3. コンシューマーパターンおよびスレッド
概要
コンシューマーの実装に使用されるパターンは、受信エクスチェンジの処理に使用されるスレッドモデルを決定します。コンシューマーは、以下のパターンのいずれかを使用して実装できます。
- イベント駆動型のパターン: コンシューマーは外部のスレッドによって実行されます。
- スケジュールされたポーリングパターン: コンシューマーは専用のスレッドプールによって実行されます。
- ポーリングパターン: スレッドモデルは未定義のままです。
イベント駆動型のパターン
イベント駆動型のパターンでは、アプリケーションの別の部分 (通常はサードパーティーライブラリー) がコンシューマーによって実装されたメソッドを呼び出すと、受信したリクエストの処理が開始されます。イベント駆動型のコンシューマーの適切な例として、イベントが JMX ライブラリーによって開始される Apache Camel JMX コンポーネントがあります。JMX ライブラリーは、handleNotification()
メソッドを呼び出してリクエスト処理を開始します。詳細は 例41.4「JMXConsumer 実装」 を参照してください。
図38.3「イベント駆動型コンシューマー」 は、イベント駆動型のコンシューマーパターンの概要を示しています。この例では、notify()
メソッドへの呼び出しによって処理がトリガーされることを前提としています。
図38.3 イベント駆動型コンシューマー
イベント駆動型のコンシューマーは、以下のように受信リクエストを処理します。
コンシューマーは受信イベントを受信するメソッドを実装する必要があります (図38.3「イベント駆動型コンシューマー」 では
notify()
メソッドで表されます)。通常、notify()
を呼び出すスレッドはアプリケーションの別の部分であるため、コンシューマーのスレッドポリシーは外部で実行されます。たとえば、JMX コンシューマーの実装では、コンシューマーは JMX から通知を受け取る
NotificationListener.handleNotification()
メソッドを実装します。コンシューマー処理を駆動するスレッドは JMX レイヤー内に作成されます。-
notify()
メソッドの本文では、コンシューマーが最初に受信イベントをエクスチェンジオブジェクトE
に変換し、ルートの次のプロセッサーをprocess()
で呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。
スケジュールされたポーリングパターン
スケジュールされたポーリングパターンでは、リクエストが到達したかどうかを定期的にチェックして、コンシューマーは受信リクエストを取得します。リクエストの確認は、java.util.concurrent ライブラリーによって提供される標準パターンで スケジュールされたエグゼキューターサービス である組み込みタイマークラスによって自動的にスケジュールされます。スケジュールされたエグゼキューターサービスは、特定のタスクを時間間隔で実行し、タスクインスタンスの実行に使用されるスレッドのプールも管理します。
図38.4「スケジュールされたポーリングコンシューマー」 は、スケジュールされたポーリングコンシューマーパターンの概要を示しています。
図38.4 スケジュールされたポーリングコンシューマー
スケジュールされたポーリングコンシューマーは、以下のようにリクエストを処理します。
-
スケジュールされたエグゼキューターサービスには、利用できるスレッドプールがあり、コンシューマーの処理を開始できます。スケジュール設定した時間間隔が経過すると、スケジュール済みエグゼキューターサービスはプールから空きスレッドの取得を試みます (デフォルトではプールには 5 つのスレッドがあります)。空きスレッドが利用可能な場合、そのスレッドを使用してコンシューマーで
poll()
メソッドを呼び出します。 -
コンシューマーの
poll()
メソッドは、受信したリクエストの処理をトリガーすることを目的としています。poll()
メソッドの本文では、コンシューマーは受信メッセージの取得を試行します。リクエストがない場合、poll()
メソッドを即座に返します。 -
要求メッセージが利用可能な場合、コンシューマーはこれをエクスチェンジオブジェクトに挿入し、ルート内の次のプロセッサーで
process()
の呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。
ポーリングパターン
ポーリングパターンでは、サードパーティーがコンシューマーのポーリングメソッドの 1 つを呼び出すと、受信したリクエストの処理が開始されます。
-
receive()
-
receiveNoWait()
-
receive(long timeout)
ポーリングメソッドの呼び出しを開始するための正確なメカニズムを定義することは、コンポーネントの実装の責務です。このメカニズムはポーリングパターンで指定されません。
図38.5「Polling Consumer」 は、ポーリングコンシューマーパターンの概要を示しています。
図38.5 Polling Consumer
ポーリングを行うコンシューマーは、以下のようにリクエストを処理します。
- 受信リクエストの処理は、コンシューマーのポーリングメソッドの 1 つが呼び出されるたびに開始されます。これらのポーリングメソッドを呼び出すメカニズムは実装で定義されます。
receive()
メソッドのボディーでは、コンシューマーは受信したリクエストメッセージの取得を試みます。現在利用できるメッセージがない場合、動作は、呼び出された受信メソッドによって異なります。-
receiveNoWait()
が即座に返されます。 -
receive(long timeout)
は、指定されたタイムアウト間隔で待機します。[2] 返す前 -
receive()
は、メッセージが受信されるまで待機します。
-
-
要求メッセージが利用可能な場合、コンシューマーはこれをエクスチェンジオブジェクトに挿入し、ルート内の次のプロセッサーで
process()
の呼び出しを行い、エクスチェンジオブジェクトを引数として渡します。
38.1.4. 非同期処理
概要
プロデューサーエンドポイントは通常、エクスチェンジの処理時に 同期 パターンに従います。上記のプロセッサーがパイプラインでプロデューサーの process()
を呼び出すと、応答を受け取るまで process()
メソッドはブロックされます。この場合、プロセッサーのスレッドは、リクエストの送信および応答の受信サイクルが完了するまでブロックされます。
ただし、プロセッサーのスレッドがすぐにリリースされ、process()
呼び出しがブロックされ ない ようにするため、前のプロセッサーをプロデューサーから切り離す方が望ましい場合があります。この場合、asynchronous パターンを使用してプロデューサーを実装する必要があります。これにより、前のプロセッサーでは、非ブロッキングバージョンの process()
メソッドを呼び出すオプションが提供されます。
異なる実装オプションの概要を示すために、本セクションでは、プロデューサーエンドポイントを実装する同期パターンと非同期パターンの両方を説明します。
同期プロデューサー
図38.6「同期プロデューサー」 は、プロデューサーがエクスチェンジの処理が終了するまで前述のプロセッサーがブロックする同期プロデューサーの概要を示しています。
図38.6 同期プロデューサー
同期プロデューサーは以下のようにエクスチェンジを処理します。
-
パイプラインの前のプロセッサーは、プロデューサー上の同期
process()
メソッドを呼び出して、同期処理を開始します。同期process()
メソッドは単一のエクスチェンジ引数を取ります。 -
process()
メソッドのボディー部で、プロデューサーはリクエスト (In メッセージ) をエンドポイントに送信します。 -
交換パターンで必要な場合、プロデューサーは応答 (Out メッセージ) がエンドポイントから到達するまで待機します。このステップにより、
process()
メソッドが無限にブロックされる可能性があります。ただし、交換パターンが応答を強制しない場合、process()
メソッドはリクエストの送信直後に返すことができます。 -
process()
メソッドが返されると、エクスチェンジオブジェクトには同期呼び出しからの応答が含まれます (Out メッセージメッセージ)。
非同期プロデューサー
図38.7「非同期プロデューサー」 は、プロデューサーがサブスレッドでエクスチェンジを処理し、前のプロセッサーが長時間ブロックされない非同期プロデューサーの概要を示しています。
図38.7 非同期プロデューサー
非同期プロデューサーは以下のようにエクスチェンジを処理します。
-
プロセッサーが非同期
process()
メソッドを呼び出す前に、非同期コールバック オブジェクトを作成する必要があります。これはルートの戻り部分でエクスチェンジを処理する役割を果たします。非同期コールバックでは、プロセッサーは AsyncCallback インターフェースから継承されるクラスを実装する必要があります。 プロセッサーは、プロデューサー上の非同期
process()
メソッドを呼び出して非同期処理を開始します。非同期process()
メソッドは、2 つの引数を取ります。- エクスチェンジオブジェクト
- 同期コールバックオブジェクト
-
process()
メソッドのボディー部で、プロデューサーは処理コードをカプセル化するRunnable
オブジェクトを作成します。その後、プロデューサーはこのRunnable
オブジェクトの実行をサブスレッドに委任します。 -
非同期
process()
メソッドが返されるため、プロセッサーのスレッドが解放されますエクスチェンジの処理は個別のサブスレッドで続行されます。 -
Runnable
オブジェクトは In メッセージをエンドポイントに送信します。 -
交換パターンで必要な場合、
Runnable
オブジェクトはエンドポイントから応答 (Out または Fault メッセージ) が到達するのを待機します。Runnable
オブジェクトは応答を受け取るまでブロックされます。 -
応答が到達すると、
Runnable
オブジェクトは応答 (Out メッセージ) をエクスチェンジオブジェクトに挿入し、非同期コールバックオブジェクトのdone()
を呼び出します。次に、非同期コールバックはリプライメッセージを処理します (サブスレッドで実行されます)。