8.13. Multicast
概要
図8.9「Multicast パターン」 に記載されている Multicast パターンは、InOut メッセージ交換パターンと互換性のある、宛先パターンが固定された Recipient list のバリエーションです。これは Recipient list とは対照的に、InOnly 交換パターンとのみ互換性があります。
図8.9 Multicast パターン
カスタム集約ストラテジーを使用した Multicast
Multicast プロセッサーは、元のリクエストに対して複数の Out メッセージを受信しますが (各受信者から 1 つずつ)、呼び出し元は 1 つ のリプライを受け取るだけです。したがって、メッセージエクスチェンジのリプライの行程に固有のミスマッチがあり、この不一致を解消するためには、Multicast プロセッサーにカスタム 集約ストラテジー を提供する必要があります。集約ストラテジークラスは、すべての Out メッセージを単一のリプライメッセージに集約します。
出品者が複数の入札者に販売商品を提供する電子オークションサービスの例を考えてみましょう。各入札者は商品に対して入札し、出品者が自動的に最高額の入札を選択します。以下のように、multicast()
DSL コマンドを使用して、オファーを固定の入札者リストに配布するロジックを実装できます。
from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()). to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
seller はエンドポイント cxf:bean:offer
で表され、利用者はエンドポイント cxf:bean:Buyer1
、cxf:bean:Buyer2
、cxf:bean:Buyer3
によって表されます。様々な入札者からの入札を集約するために、Multicast プロセッサーは集約ストラテジー HighestBidAggregationStrategy
を使用します以下のように、Java に HighestBidAggregationStrategy
を実装することができます。
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Exchange; public class HighestBidAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { float oldBid = oldExchange.getOut().getHeader("Bid", Float.class); float newBid = newExchange.getOut().getHeader("Bid", Float.class); return (newBid > oldBid) ? newExchange : oldExchange; } }
入札者は、Bid
という名前のヘッダーに入札価格を設定することが前提となります。カスタム集約ストラテジーの詳細は、「Aggregator」 を参照してください。
並列処理
デフォルトでは、Multicast プロセッサーは、受信者のエンドポイントを逐次呼び出します (to()
コマンドに記載されている順序で)。場合によっては、許容されないほどのレイテンシーが発生することがあります。このような待ち時間を回避するために、parallelProcessing()
句を追加して並行処理を有効にするオプションがあります。たとえば、電子オークションの例で並行処理を有効にするには、以下のようにルートを定義します。
from("cxf:bean:offer") .multicast(new HighestBidAggregationStrategy()) .parallelProcessing() .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
Multicast プロセッサーは、各エンドポイントに 1 つのスレッドを割り当てるスレッドプールを使用して、入札者のエンドポイントを呼び出すようになりました。
入札者のエンドポイントを呼び出すスレッドプールのサイズをカスタマイズする場合は、executorService()
メソッドを呼び出して独自のカスタム executor service を指定することができます。以下に例を示します。
from("cxf:bean:offer")
.multicast(new HighestBidAggregationStrategy())
.executorService(MyExecutor)
.to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
MyExecutor は、java.util.concurrent.ExecutorService 型のインスタンスです。
エクスチェンジに InOUt パターンがある場合、リプライメッセージを集約するために集約ストラテジーが使用されます。デフォルトの集約ストラテジーは、最新のリプライメッセージを取り、それ以前のリプライメッセージを破棄します。たとえば、以下のルートでは、カスタムストラテジー MyAggregationStrategy
を使用してエンドポイント direct:a
、direct:b
、direct:c
からのリプライを集約します。
from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing() .timeout(500) .to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
XML 設定の例
以下の例は、XML で同様のルートを設定する方法を示しています。ルートは、カスタム集約ストラテジーとカスタム thread executor を使用しています。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxf:bean:offer"/> <multicast strategyRef="highestBidAggregationStrategy" parallelProcessing="true" threadPoolRef="myThreadExcutor"> <to uri="cxf:bean:Buyer1"/> <to uri="cxf:bean:Buyer2"/> <to uri="cxf:bean:Buyer3"/> </multicast> </route> </camelContext> <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/> <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/> </beans>
parallelProcessing
属性と threadPoolRef
属性は任意です。Multicast プロセッサーのスレッド動作をカスタマイズする場合にのみ設定する必要があります。
送信メッセージへのカスタム処理の適用
Multicast パターン は、ソース Exchange をコピーして、そのコピーをマルチキャストします。デフォルトでは、ルーターはソースメッセージのシャローコピーを作成します。シャローコピーでは、元のメッセージのヘッダーとペイロードは参照によってのみコピーされます。つまり、元のメッセージのコピーはそれらへリンクされます。マルチキャストメッセージのシャローコピーはリンクされているため、メッセージボディーが変更可能な場合は、カスタム処理を適用することができません。あるエンドポイントに送信されたコピーに適用するカスタム処理は、他のすべてのエンドポイントに送信されたコピーにも適用されます。
multicast
構文では、multicast
句で process
DSL コマンドを呼び出すことはできますが論理的に意味をなさず、onPrepare
と同じ効果を持ちません (実際、このコンテキストでは process
DSL コマンドは何の影響も与えません)。
メッセージの準備時にカスタムロジックを実行するための onPrepare の使用
エンドポイントに送信する前に、カスタム処理を各メッセージレプリカに適用する場合は、multicast
句で onPrepare
DSL コマンドを呼び出すことができます。この onPrepare
コマンドは、メッセージがシャローコピーされた 直後、かつメッセージがエンドポイントにディスパッチされる 直前 に、カスタムプロセッサーを挿入します。たとえば、以下のルートでは、direct:a
に送信されたメッセージに対して CustomProc
プロセッサーが呼び出され、direct:b
に送信されたメッセージに対しても CustomProc
プロセッサーが呼び出されます。
from("direct:start") .multicast().onPrepare(new CustomProc()) .to("direct:a").to("direct:b");
onPrepare
DSL コマンドの一般的なユースケースとして、メッセージの一部またはすべての要素のディープコピーを実行します。たとえば、以下の CustomProc
プロセッサークラスは、メッセージボディーのディープコピーを実行します。メッセージボディーの型は BodyType
であると想定され、ディープコピーはメソッド BodyType.deepCopy()
によって実行されます。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
onPrepare
を使い、Exchange
がマルチキャストされる前に実行するカスタムロジックを任意に実装することができます。
イミュータブルなオブジェクトを設計することが推奨されます。
たとえば、この Animal クラスのようにミュータブルなメッセージボディーがあるとします。
public class Animal implements Serializable { private int id; private String name; public Animal() { } public Animal(int id, String name) { this.id = id; this.name = name; } public Animal deepClone() { Animal clone = new Animal(); clone.setId(getId()); clone.setName(getName()); return clone; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id + " " + name; } }
次に、メッセージボディーをクローンするディープクローンプロセッサーを作成します。
public class AnimalDeepClonePrepare implements Processor { public void process(Exchange exchange) throws Exception { Animal body = exchange.getIn().getBody(Animal.class); // do a deep clone of the body which wont affect when doing multicasting Animal clone = body.deepClone(); exchange.getIn().setBody(clone); } }
次に、以下のように onPrepare
オプションを使用して Multicast ルートで AnimalDeepClonePrepare クラスを使用します。
from("direct:start") .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");
XML DSL を使用した同じ例
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- use on prepare with multicast --> <multicast onPrepareRef="animalDeepClonePrepare"> <to uri="direct:a"/> <to uri="direct:b"/> </multicast> </route> <route> <from uri="direct:a"/> <process ref="processorA"/> <to uri="mock:a"/> </route> <route> <from uri="direct:b"/> <process ref="processorB"/> <to uri="mock:b"/> </route> </camelContext> <!-- the on prepare Processor which performs the deep cloning --> <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/> <!-- processors used for the last two routes, as part of unit test --> <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/> <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
オプション
multicast
DSL コマンドは、以下のオプションをサポートします。
名前 | デフォルト値 | 説明 |
| AggregationStrategy の参照は、受信者からの複数のリプライを Multicast からの単一の送信メッセージへ集約するために使用されます。デフォルトでは、Camel は最後のリプライを送信メッセージとして使用します。 | |
|
POJO を | |
|
|
POJO を |
|
| 有効にすると、マルチキャストへのメッセージの送信が並列処理されます。呼び出し元スレッドは、すべてのメッセージが完全に処理されるまで待機してから続行することに注意してください。マルチキャストからの送信およびリプライ処理のみが並列に処理されます。 |
|
|
有効にすると、 |
| 並列処理に使用するカスタムスレッドプールを参照します。このオプションを設定すると、並列処理は自動的に適用されるため、並列処理用オプションも有効にする必要はありません。 | |
|
| Camel 2.2: 例外発生時、すぐに継続処理を停止するかどうか。無効にすると、いずれかの失敗の有無に関わらず、Camel はメッセージをすべてのマルチキャストに送信します。AggregationStrategy クラス内で、例外処理を完全に制御することができます。 |
|
| 有効な場合、Camel はリプライを順不同で処理します。たとえば、返信順に処理します。無効な場合、Camel はマルチキャストと同じ順序でリプライを処理します。 |
|
Camel 2.5: 合計タイムアウト値をミリ秒単位で設定します。Multicast が指定の時間枠内のすべての応答を送信および処理できない場合、タイムアウトが発生し、Multicast から抜け出し続行します。TimeoutAwareAggregationStrategy を提供する場合、 | |
| camel 2.8: カスタムプロセッサーを参照して、各マルチキャストが受信する Exchange のコピーを準備します。これにより、必要に応じてメッセージのペイロードをディープクローンするなど、カスタムロジックを実行できます。 | |
|
| Camel 2.8: Unit of Work を共有すべきかどうか。詳細は、「Splitter」 で同じオプションを参照してください。 |