8.3. 受信者リスト
概要
図8.3「Recipient List パターン」 に示されている Recipient List は、各受信メッセージを複数の異なる宛先に送信するルーターの一種です。また、Recipient List は通常、実行時に受信者リスト (Recipient List) を演算する必要があります。
図8.3 Recipient List パターン
宛先が固定された Recipient List
最もシンプルな Recipient List は、宛先リストが固定され、事前に認識され、交換パターンは InOnly であるものです。この場合、宛先リストを to()
Java DSL コマンドへハードワイヤーすることができます。
ここで示す例は、宛先が固定された Recepient List のため、InOnly 交換パターン (Pipes and Filters パターン と似ています) で のみ 動作します。Out メッセージを使った交換パターンの Recipient List を作成する場合は、代わりに Multicast パターン を使用してください。
Java DSL の例
以下の例は、コンシューマーエンドポイント queue:a
から、InOnly エクスチェンジを固定された宛先リストにルーティングする方法を示しています。
from("seda:a").to("seda:b", "seda:c", "seda:d");
XML 設定の例
以下の例は、XML で同じルートを設定する方法を示しています。
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <to uri="seda:b"/> <to uri="seda:c"/> <to uri="seda:d"/> </route> </camelContext>
実行時に演算された Recipient List
Recipient List パターンを使用するほとんどの場合で、宛先リストを実行時に演算する必要があります。これを行うには、recipientList()
プロセッサーを使用します。このプロセッサーは、唯一の引数として宛先リストを取ります。Apache Camel はリスト引数に型コンバーターを適用するため、ほとんどの標準的な Java リスト型 (例: コレクション、リスト、配列など) を使用できます。型コンバーターの詳細については 「組み込み型コンバーター」 を参照してください。
受信者は 同じ エクスチェンジインスタンスのコピーを受け取り、Apache Camel はそれらを順次実行します。
Java DSL の例
以下の例は、recipientListHeader
という名前のメッセージヘッダーから宛先リストを抽出する方法を示しています。ヘッダーの値は、コンマ区切りのエンドポイント URI のリストになります。
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));
header の値がリスト型である場合、値を直接 recipientList()
の引数に使うことができます。以下に例を示します。
from("seda:a").recipientList(header("recipientListHeader"));
ただし、この例では、基礎となるコンポーネントがこの特定のヘッダーをどのように解析するかに完全に依存しています。コンポーネントがヘッダーを単純な文字列として解析する場合、この例は動作し ません。ヘッダーは、何らかの型の Java リストに変換する必要があります。
XML 設定の例
以下の例では、XML で前述のルートを定義する方法を示しています。ヘッダー値は、コンマ区切りのエンドポイント URI リストです。
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <recipientList delimiter=","> <header>recipientListHeader</header> </recipientList> </route> </camelContext>
複数の受信者に並列で送信
Camel 2.2 で利用可能
Recipient List パターン は、parallelProcessing
をサポートしています。これは、Splitter パターン の機能に似ています。並列処理機能を使用して、複数の受信者に並列でエクスチェンジを送信します。以下に例を示します:
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
Spring XML では、並列処理機能は recipientList
タグの属性として実装されています。以下に例を示します。
<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>
例外時に停止
Camel 2.2 で利用可能
Recipient List は、stopOnException
機能をサポートします。これを使用すると、受信者が失敗した場合にそれ以降の受信者への送信を停止することができます。
from("direct:a").recipientList(header("myHeader")).stopOnException();
Spring XML では、Recipient List タグの属性です。
Spring XML では、例外時の停止機能は recipientList
タグの属性として実装されます。以下に例を示します。
<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>
同じルートで parallelProcessing
と stopOnException
を組み合わせることができます。
無効なエンドポイントの無視
Camel 2.3 の時点で利用可能
Recipient List パターン は ignoreInvalidEndpoints
オプションをサポートします。これにより、Recipient List が無効なエンドポイントをスキップできます (Routing Slips パターン も、このオプションをサポートしています)。以下に例を示します。
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
Spring XML では、以下のように recipientList
タグに ignoreInvalidEndpoints
属性を設定することで、このオプションを有効にすることができます。
<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>
myHeader
に direct:foo,xxx:bar
の 2 つのエンドポイントが含まれるケースについて考えてみましょう。最初のエンドポイントは有効であり、動作します。2 つ目は無効であるため、無視されます。無効なエンドポイントに遭遇するたびに、Apache Camel ログが INFO
レベルで記録されます。
カスタム AggregationStrategy の使用
Camel 2.2 で利用可能
Recipient List パターン でカスタム AggregationStrategy
を使用できます。これは、リスト内の受信者からのリプライを集計する場合に便利です。Apache Camel はデフォルトで UseLatestAggregationStrategy
集約ストラテジーを使用して、最後に受信したリプライのみを保持します。より高度な集約ストラテジーについては、AggregationStrategy
インターフェイスを独自に実装し定義できます。詳細については 「Aggregator」 を参照してください。たとえば、集約ストラテジー MyOwnAggregationStrategy
をリプライメッセージに適用するには、以下のように Java DSL ルートを定義します。
from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");
Spring XML では、以下のようにカスタムの集約ストラテジーを recipientList
タグの属性として指定できます。
<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
カスタムスレッドプールの使用
Camel 2.2 で利用可能
これは、parallelProcessing
を使用する場合にのみ必要です。デフォルトでは、Camel は 10 個のスレッドを持つスレッドプールを使用します。今後スレッドプールの管理と設定内容をメンテナーンスする際に、変更される可能性がありますのでご注意ください (予定どおりであれば Camel 2.2) 。
カスタム集約ストラテジーを使用するのと同じように設定します。
メソッド呼び出しの Recipient List としての使用
受信者を生成するために、Bean を使用できます。以下に例を示します。
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
MessageRouter
Bean は以下のように定義されます。
public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }
Recipient List としての Bean
Bean を Recipient List として動作させるには、@RecipientList
アノテーションを Recipient List を返すメソッドに付与します。以下に例を示します。
public class MessageRouter { @RecipientList public String routeTo() { String queueList = "activemq:queue:test1,activemq:queue:test2"; return queueList; } }
この場合、ルートに recipientList
DSL コマンドを含め ない でください。以下のようにルートを定義します。
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
タイムアウトの使用
Camel 2.5 で利用可能
parallelProcessing
を使用する場合は、合計 timeout
値をミリ秒単位で設定できます。Camel はタイムアウトに達するまでメッセージを並行して処理します。これにより、1 つのメッセージが遅い場合でも処理を継続できます。
以下の例では、recipientlist
ヘッダーの値が direct:a,direct:b,direct:c
であるため、メッセージは 3 人の受信者に送信されます。250 ミリ秒のタイムアウトがあるので、最後の 2 つのメッセージだけが、タイムフレーム内で完了することができます。そのため、集約すると BC
という文字列の結果が得られます。
from("direct:start") .recipientList(header("recipients"), ",") .aggregationStrategy(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250) // use end to indicate end of recipientList clause .end() .to("mock:result"); from("direct:a").delay(500).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C"));
この timeout
機能は splitter
のほか、multicast
および recipientList
の両方でもサポートされています。
デフォルトでは、タイムアウトが発生した場合、AggregationStrategy
は呼び出されません。ただし、特殊なバージョンを実装することができます。
// Java public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);
これにより、必要であれば、AggregationStrategy
でタイムアウトに対応することができます。
タイムアウトは合計です。つまり、X 時間後に Camel は期限内に完了したメッセージのみを集約します。残りの分はキャンセルされます。また Camel は、タイムアウトの原因となった最初のインデックスに対して、TimeoutAwareAggregationStrategy
の timeout
メソッドを 1 度だけ呼び出します。
送信メッセージへのカスタム処理の適用
recipientList
がメッセージを Recipient List のいずれかに送信する前に、元のメッセージのシャローコピーであるメッセージレプリカを作成します。シャローコピーでは、元のメッセージのヘッダーおよびペイロードは参照によってのみコピーされます。各新規コピーには、それらの要素独自のインスタンスが含まれていません。その結果、メッセージのシャローコピーがリンクされ、異なるエンドポイントにルーティングする際にカスタム処理を適用することはできません。
レプリカがエンドポイントに送信される前に、各メッセージレプリカに対して何らかのカスタム処理を行う場合は、recipientList
句で onPrepare
DSL コマンドを呼び出すことができます。この onPrepare
コマンドは、メッセージがシャローコピーされた 直後、かつメッセージがエンドポイントにディスパッチされる 直前 に、カスタムプロセッサーを挿入します。たとえば、以下のルートでは、CustomProc
プロセッサーが 各 recipient エンドポイント 用のメッセージレプリカで呼び出されます。
from("direct:start") .recipientList().onPrepare(new CustomProc());
onPrepare
DSL コマンドの一般的なユースケースとして、メッセージの一部またはすべての要素のディープコピーを実行します。これにより、各メッセージのレプリカは他のレプリカとは独立して変更することができます。たとえば、以下の CustomProc
プロセッサークラスは、メッセージボディーのディープコピーを実行します。メッセージボディーの型は BodyType
であると想定され、ディープコピーはメソッド BodyType.deepCopy()
によって実行されます。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
オプション
recipientList
DSL コマンドは以下のオプションをサポートします。
名前 | デフォルト値 | 説明 |
|
| 式が複数のエンドポイントを返した場合に使用される区切り文字。 |
| AggregationStrategy を参照し、各受信者からのリプライを集約して 「受信者リスト」 からの唯一となる送信メッセージを生成します。デフォルトでは、Camel は最後のリプライを送信メッセージとして使用します。 | |
|
POJO を | |
|
|
POJO を |
|
| Camel 2.2: 有効にすると、受信者へのメッセージ送信が並列処理されます。呼び出し元スレッドは、すべてのメッセージが完全に処理されるまで待機してから続行することに注意してください。受信者への送信と、受信者からのリプライ処理のみが並列で処理されます。 |
|
|
有効にすると、 |
| Camel 2.2: 並列処理に使用するカスタムスレッドプールを参照します。このオプションを設定すると、並列処理は自動的に適用されるため、 並列処理用オプションも有効にする必要はありません。 | |
|
| Camel 2.2: 例外発生時、すぐに継続処理を停止するかどうか。無効にすると、いずれかの失敗の有無に関わらず、Camel はメッセージをすべての受信者に送信します。AggregationStragegy クラス内で、例外処理を完全に制御することができます。 |
|
| Camel 2.3: エンドポイント URI が解決できない場合、無視されます。false の場合は、Camel はエンドポイント URI が有効ではないことを示す例外を出力します。 |
|
| Camel 2.5: 有効な場合、Camel は返信を順不同で処理します (例: 戻って来た順) 。無効な場合、Camel は指定された式と同じ順序で返信を処理します。 |
|
camel 2.5: 合計タイムアウト値をミリ秒単位で設定します。「受信者リスト」 が指定された時間枠内のすべての応答を送信および処理できない場合、タイムアウトが発生し、「受信者リスト」 から抜け出し続行します。AggregationStrategy を提供した場合、 | |
| camel 2.8: カスタムプロセッサーを参照して、各受信者が受け取るエクスチェンジのコピーを準備します。これにより、必要に応じてメッセージのペイロードをディープクローンするなど、カスタムロジックを実行できます。 | |
|
| Camel 2.8: Unit of Work を共有すべきかどうか。詳細は、「Splitter」 で同じオプションを参照してください。 |
|
| Camel 2.13.1/2.12.4: Routing Slip で再使用されるプロデューサーをキャッシュする ProducerCache のキャッシュサイズを設定できます。デフォルトのキャッシュサイズ 0 を使用します。値を -1 に設定すると、キャッシュをすべて無効にすることができます。 |
Recipient List で交換パターンを使用
デフォルトでは、Recipient List は既存の交換パターンを使用します。稀ではありますが、別の交換パターンを使用して受信者にメッセージを送信するケースがある可能性があります。
たとえば、InOnly
ルートとして開始するルートがあるとします。Recipient List で InOut
交換パターンを使用する場合、受信者用エンドポイントで直接交換パターンを設定する必要があります。
以下の例は、新規ファイルが InOnly として開始され、Recipient List へルーティングされるルートを示しています。ActiveMQ(JMS) エンドポイントで InOut を使用する場合、exchangePattern=InOut オプションを指定する必要があります。ただし、JMS リクエストやリプライをルーティングし続けるため、レスポンスは outbox ディレクトリー内にファイルとして保存されます。
from("file:inbox") // the exchange pattern is InOnly initially when using a file route .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") .to("file:outbox");
InOut
交換パターンは、タイムアウト時にレスポンスを受け取る必要があります。ただし、レスポンスを受信できない場合は失敗します。