8.15. Scatter-Gather
Scatter-Gather
図8.11「Scatter-Gather パターン」 にあるように、scatter-gather パターン を使用すると、メッセージを動的に指定された複数の受信者にルーティングし、応答を 1 つのメッセージに再集約できます。
図8.11 Scatter-Gather パターン
動的なスキャッター/ギャザーの例
以下の例は、複数の異なるベンダーから最も良いビールの見積もりを取得するアプリケーションの概要を説明しています。この例では、動的な 「受信者リスト」 を使用してすべてのベンダーおよび 「Aggregator」 から引用を要求し、すべての応答の中から最適な見積もりを選出します。このアプリケーションのルートは、以下のように定義されます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <recipientList> <header>listOfVendors</header> </recipientList> </route> <route> <from uri="seda:quoteAggregator"/> <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000"> <correlationExpression> <header>quoteRequestId</header> </correlationExpression> <to uri="mock:result"/> </aggregate> </route> </camelContext>
最初のルートでは、「受信者リスト」 は listOfVendors
ヘッダーを確認して受信者リストを取得します。したがって、このアプリケーションにメッセージを送信するクライアントは listOfVendors
ヘッダーをメッセージに追加する必要があります。例8.1「メッセージングクライアントの例」 は、関連するヘッダーデータを送信メッセージに追加するメッセージングクライアントからのサンプルコードを示しています。
例8.1 メッセージングクライアントの例
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3"); headers.put("quoteRequestId", "quoteRequest-1"); template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);
メッセージは bean:vendor1
、bean:vendor2
、および bean:vendor3
のエンドポイントに分散されます。これらの Bean はすべて以下のクラスによって実装されます。
public class MyVendor { private int beerPrice; @Produce(uri = "seda:quoteAggregator") private ProducerTemplate quoteAggregator; public MyVendor(int beerPrice) { this.beerPrice = beerPrice; } public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception { if ("beer".equals(item)) { exchange.getIn().setBody(beerPrice); quoteAggregator.send(exchange); } else { throw new Exception("No quote available for " + item); } } }
Bean インスタンス、vendor1
、vendor2
、および vendor3
は、以下のように Spring XML 構文を使用してインスタンス化されます。
<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/> <bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>1</value> </constructor-arg> </bean> <bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>2</value> </constructor-arg> </bean> <bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>3</value> </constructor-arg> </bean>
各 Bean は、それぞれ異なるビール価格で初期化されます (コンストラクター引数に渡されます)。メッセージが各 Bean エンドポイントに送信されると、MyVendor.getQuote
メソッドに到達します。このメソッドは、この見積もり要求がビールに対してであるかどうかを確認する簡単なチェックを実行し、それから後のステップで取得できるようにエクスチェンジにビールの価格を設定します。メッセージは POJO 生成 を使用して次のステップに転送されます (@Produce アノテーションを参照)。
次のステップでは、すべてのベンダーからのビールの見積もりを受け取り、どのベンダーの見積もりが最良か (つまり最も低いか) を調べます。そのためには、カスタムの集約ストラテジーと共に 「Aggregator」 を使用します。「Aggregator」 は、どのメッセージが現在の見積もりに関連するものかを識別する必要があります。これは、quoteRequestId
ヘッダー (correlationExpression
に渡された) の値に基づいてメッセージを関連付けることによって行われます。例8.1「メッセージングクライアントの例」 にあるように、相関 ID は quoteRequest-1
に設定されています (相関 ID は一意である必要があります)。見積もりの集合の中から最も低いものを選別するには、以下のようなカスタム集約ストラテジーを使用します。
public class LowestQuoteAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // the first time we only have the new exchange if (oldExchange == null) { return newExchange; } if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) { return oldExchange; } else { return newExchange; } } }
静的なスキャッター/ギャザーの例
静的 「受信者リスト」 を使用して、scatter-gather アプリケーションで受信者を明示的に指定できます。以下の例は、静的なスキャッター/ギャザーのシナリオを実装するために使用するルートを示しています。
from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3"); from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator"); from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator"); from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator"); from("seda:quoteAggregator") .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")