8.15. Scatter-Gather


Scatter-Gather

図8.11「Scatter-Gather パターン」 にあるように、scatter-gather パターン を使用すると、メッセージを動的に指定された複数の受信者にルーティングし、応答を 1 つのメッセージに再集約できます。

図8.11 Scatter-Gather パターン

ブロードキャストアグリゲート

動的なスキャッター/ギャザーの例

以下の例は、複数の異なるベンダーから最も良いビールの見積もりを取得するアプリケーションの概要を説明しています。この例では、動的な 「受信者リスト」 を使用してすべてのベンダーおよび 「Aggregator」 から引用を要求し、すべての応答の中から最適な見積もりを選出します。このアプリケーションのルートは、以下のように定義されます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

最初のルートでは、「受信者リスト」listOfVendors ヘッダーを確認して受信者リストを取得します。したがって、このアプリケーションにメッセージを送信するクライアントは listOfVendors ヘッダーをメッセージに追加する必要があります。例8.1「メッセージングクライアントの例」 は、関連するヘッダーデータを送信メッセージに追加するメッセージングクライアントからのサンプルコードを示しています。

例8.1 メッセージングクライアントの例

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);

メッセージは bean:vendor1bean:vendor2、および bean:vendor3 のエンドポイントに分散されます。これらの Bean はすべて以下のクラスによって実装されます。

public class MyVendor {
    private int beerPrice;

    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;

    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }

    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}

Bean インスタンス、vendor1vendor2、および vendor3 は、以下のように Spring XML 構文を使用してインスタンス化されます。

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>1</value>
  </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>2</value>
  </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>3</value>
  </constructor-arg>
</bean>

各 Bean は、それぞれ異なるビール価格で初期化されます (コンストラクター引数に渡されます)。メッセージが各 Bean エンドポイントに送信されると、MyVendor.getQuote メソッドに到達します。このメソッドは、この見積もり要求がビールに対してであるかどうかを確認する簡単なチェックを実行し、それから後のステップで取得できるようにエクスチェンジにビールの価格を設定します。メッセージは POJO 生成 を使用して次のステップに転送されます (@Produce アノテーションを参照)。

次のステップでは、すべてのベンダーからのビールの見積もりを受け取り、どのベンダーの見積もりが最良か (つまり最も低いか) を調べます。そのためには、カスタムの集約ストラテジーと共に 「Aggregator」 を使用します。「Aggregator」 は、どのメッセージが現在の見積もりに関連するものかを識別する必要があります。これは、quoteRequestId ヘッダー (correlationExpression に渡された) の値に基づいてメッセージを関連付けることによって行われます。例8.1「メッセージングクライアントの例」 にあるように、相関 ID は quoteRequest-1 に設定されています (相関 ID は一意である必要があります)。見積もりの集合の中から最も低いものを選別するには、以下のようなカスタム集約ストラテジーを使用します。

public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

静的なスキャッター/ギャザーの例

静的 「受信者リスト」 を使用して、scatter-gather アプリケーションで受信者を明示的に指定できます。以下の例は、静的なスキャッター/ギャザーのシナリオを実装するために使用するルートを示しています。

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");

from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.