8.13. Multicast


概要

図8.9「Multicast パターン」 に記載されている Multicast パターンは、InOut メッセージ交換パターンと互換性のある、宛先パターンが固定された Recipient list のバリエーションです。これは Recipient list とは対照的に、InOnly 交換パターンとのみ互換性があります。

図8.9 Multicast パターン

Multicast パターン

カスタム集約ストラテジーを使用した Multicast

Multicast プロセッサーは、元のリクエストに対して複数の Out メッセージを受信しますが (各受信者から 1 つずつ)、呼び出し元は 1 つ のリプライを受け取るだけです。したがって、メッセージエクスチェンジのリプライの行程に固有のミスマッチがあり、この不一致を解消するためには、Multicast プロセッサーにカスタム 集約ストラテジー を提供する必要があります。集約ストラテジークラスは、すべての Out メッセージを単一のリプライメッセージに集約します。

出品者が複数の入札者に販売商品を提供する電子オークションサービスの例を考えてみましょう。各入札者は商品に対して入札し、出品者が自動的に最高額の入札を選択します。以下のように、multicast() DSL コマンドを使用して、オファーを固定の入札者リストに配布するロジックを実装できます。

from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()).
    to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

seller はエンドポイント cxf:bean:offer で表され、利用者はエンドポイント cxf:bean:Buyer1cxf:bean:Buyer2cxf:bean:Buyer3 によって表されます。様々な入札者からの入札を集約するために、Multicast プロセッサーは集約ストラテジー HighestBidAggregationStrategy を使用します以下のように、Java に HighestBidAggregationStrategy を実装することができます。

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;

public class HighestBidAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        float oldBid = oldExchange.getOut().getHeader("Bid", Float.class);
        float newBid = newExchange.getOut().getHeader("Bid", Float.class);
        return (newBid > oldBid) ? newExchange : oldExchange;
    }
}

入札者は、Bid という名前のヘッダーに入札価格を設定することが前提となります。カスタム集約ストラテジーの詳細は、「Aggregator」 を参照してください。

並列処理

デフォルトでは、Multicast プロセッサーは、受信者のエンドポイントを逐次呼び出します (to() コマンドに記載されている順序で)。場合によっては、許容されないほどのレイテンシーが発生することがあります。このような待ち時間を回避するために、parallelProcessing() 句を追加して並行処理を有効にするオプションがあります。たとえば、電子オークションの例で並行処理を有効にするには、以下のようにルートを定義します。

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .parallelProcessing()
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

Multicast プロセッサーは、各エンドポイントに 1 つのスレッドを割り当てるスレッドプールを使用して、入札者のエンドポイントを呼び出すようになりました。

入札者のエンドポイントを呼び出すスレッドプールのサイズをカスタマイズする場合は、executorService() メソッドを呼び出して独自のカスタム executor service を指定することができます。以下に例を示します。

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .executorService(MyExecutor)
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

MyExecutor は、java.util.concurrent.ExecutorService 型のインスタンスです。

エクスチェンジに InOUt パターンがある場合、リプライメッセージを集約するために集約ストラテジーが使用されます。デフォルトの集約ストラテジーは、最新のリプライメッセージを取り、それ以前のリプライメッセージを破棄します。たとえば、以下のルートでは、カスタムストラテジー MyAggregationStrategy を使用してエンドポイント direct:adirect:bdirect:c からのリプライを集約します。

from("direct:start")
  .multicast(new MyAggregationStrategy())
      .parallelProcessing()
      .timeout(500)
      .to("direct:a", "direct:b", "direct:c")
  .end()
  .to("mock:result");

XML 設定の例

以下の例は、XML で同様のルートを設定する方法を示しています。ルートは、カスタム集約ストラテジーとカスタム thread executor を使用しています。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    ">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="cxf:bean:offer"/>
      <multicast strategyRef="highestBidAggregationStrategy"
                 parallelProcessing="true"
                 threadPoolRef="myThreadExcutor">
         <to uri="cxf:bean:Buyer1"/>
         <to uri="cxf:bean:Buyer2"/>
         <to uri="cxf:bean:Buyer3"/>
      </multicast>
    </route>
  </camelContext>

  <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/>
  <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/>

</beans>

parallelProcessing 属性と threadPoolRef 属性は任意です。Multicast プロセッサーのスレッド動作をカスタマイズする場合にのみ設定する必要があります。

送信メッセージへのカスタム処理の適用

Multicast パターン は、ソース Exchange をコピーして、そのコピーをマルチキャストします。デフォルトでは、ルーターはソースメッセージのシャローコピーを作成します。シャローコピーでは、元のメッセージのヘッダーとペイロードは参照によってのみコピーされます。つまり、元のメッセージのコピーはそれらへリンクされます。マルチキャストメッセージのシャローコピーはリンクされているため、メッセージボディーが変更可能な場合は、カスタム処理を適用することができません。あるエンドポイントに送信されたコピーに適用するカスタム処理は、他のすべてのエンドポイントに送信されたコピーにも適用されます。

注記

multicast 構文では、multicast 句で process DSL コマンドを呼び出すことはできますが論理的に意味をなさず、onPrepare と同じ効果を持ちません (実際、このコンテキストでは process DSL コマンドは何の影響も与えません)。

メッセージの準備時にカスタムロジックを実行するための onPrepare の使用

エンドポイントに送信する前に、カスタム処理を各メッセージレプリカに適用する場合は、multicast 句で onPrepare DSL コマンドを呼び出すことができます。この onPrepare コマンドは、メッセージがシャローコピーされた 直後、かつメッセージがエンドポイントにディスパッチされる 直前 に、カスタムプロセッサーを挿入します。たとえば、以下のルートでは、direct:a に送信されたメッセージに対して CustomProc プロセッサーが呼び出され、direct:b に送信されたメッセージに対しても CustomProc プロセッサーが呼び出されます。

from("direct:start")
  .multicast().onPrepare(new CustomProc())
  .to("direct:a").to("direct:b");

onPrepare DSL コマンドの一般的なユースケースとして、メッセージの一部またはすべての要素のディープコピーを実行します。たとえば、以下の CustomProc プロセッサークラスは、メッセージボディーのディープコピーを実行します。メッセージボディーの型は BodyType であると想定され、ディープコピーはメソッド BodyType.deepCopy() によって実行されます。

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

onPrepare を使い、Exchange がマルチキャストされる前に実行するカスタムロジックを任意に実装することができます。

注記

イミュータブルなオブジェクトを設計することが推奨されます。

たとえば、この Animal クラスのようにミュータブルなメッセージボディーがあるとします。

public class Animal implements Serializable {

     private int id;
     private String name;

     public Animal() {
     }

     public Animal(int id, String name) {
         this.id = id;
         this.name = name;
     }

     public Animal deepClone() {
         Animal clone = new Animal();
         clone.setId(getId());
         clone.setName(getName());
         return clone;
     }

     public int getId() {
         return id;
     }

     public void setId(int id) {
         this.id = id;
     }

     public String getName() {
         return name;
     }

     public void setName(String name) {
         this.name = name;
     }

     @Override
     public String toString() {
         return id + " " + name;
     }
 }

次に、メッセージボディーをクローンするディープクローンプロセッサーを作成します。

public class AnimalDeepClonePrepare implements Processor {

     public void process(Exchange exchange) throws Exception {
         Animal body = exchange.getIn().getBody(Animal.class);

         // do a deep clone of the body which wont affect when doing multicasting
         Animal clone = body.deepClone();
         exchange.getIn().setBody(clone);
     }
 }

次に、以下のように onPrepare オプションを使用して Multicast ルートで AnimalDeepClonePrepare クラスを使用します。

from("direct:start")
     .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");

XML DSL を使用した同じ例

<camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- use on prepare with multicast -->
         <multicast onPrepareRef="animalDeepClonePrepare">
             <to uri="direct:a"/>
             <to uri="direct:b"/>
         </multicast>
     </route>

     <route>
         <from uri="direct:a"/>
         <process ref="processorA"/>
         <to uri="mock:a"/>
     </route>
     <route>
         <from uri="direct:b"/>
         <process ref="processorB"/>
         <to uri="mock:b"/>
     </route>
 </camelContext>

 <!-- the on prepare Processor which performs the deep cloning -->
 <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/>

 <!-- processors used for the last two routes, as part of unit test -->
 <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/>
 <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>

オプション

multicast DSL コマンドは、以下のオプションをサポートします。

名前

デフォルト値

説明

strategyRef

 

AggregationStrategy の参照は、受信者からの複数のリプライを Multicast からの単一の送信メッセージへ集約するために使用されます。デフォルトでは、Camel は最後のリプライを送信メッセージとして使用します。

strategyMethodName

 

POJO を AggregationStrategy として使用している場合に、使用するメソッド名を明示的に指定するために使用できます。

strategyMethodAllowNull

false

POJO を AggregationStrategy として使用している場合に、このオプションを使用することができます。false に設定すると、エンリッチするデータがない場合に aggregate メソッドは使用されません。true に設定すると、エンリッチするデータがない場合には、oldExchangenull 値が使用されます。

parallelProcessing

false

有効にすると、マルチキャストへのメッセージの送信が並列処理されます。呼び出し元スレッドは、すべてのメッセージが完全に処理されるまで待機してから続行することに注意してください。マルチキャストからの送信およびリプライ処理のみが並列に処理されます。

parallelAggregate

false

有効にすると、AggregationStrategyaggregate メソッドを同時に呼び出すことができます。これには、AggregationStrategy の実装がスレッドセーフである必要があることに注意してください。デフォルトでは、このオプションは false となっており、Camel が自動的に aggregate メソッドへの呼び出しを同期することを意味します。ただし、場合によっては、AggregationStrategy をスレッドセーフとして実装し、このオプションを true に設定することで、パフォーマンスを向上させることができます。

executorServiceRef

 

並列処理に使用するカスタムスレッドプールを参照します。このオプションを設定すると、並列処理は自動的に適用されるため、並列処理用オプションも有効にする必要はありません。

stopOnException

false

Camel 2.2: 例外発生時、すぐに継続処理を停止するかどうか。無効にすると、いずれかの失敗の有無に関わらず、Camel はメッセージをすべてのマルチキャストに送信します。AggregationStrategy クラス内で、例外処理を完全に制御することができます。

streaming

false

有効な場合、Camel はリプライを順不同で処理します。たとえば、返信順に処理します。無効な場合、Camel はマルチキャストと同じ順序でリプライを処理します。

timeout

 

Camel 2.5: 合計タイムアウト値をミリ秒単位で設定します。Multicast が指定の時間枠内のすべての応答を送信および処理できない場合、タイムアウトが発生し、Multicast から抜け出し続行します。TimeoutAwareAggregationStrategy を提供する場合、timeout メソッドが抜け出す前に呼び出されるため、注意してください。

onPrepareRef

 

camel 2.8: カスタムプロセッサーを参照して、各マルチキャストが受信する Exchange のコピーを準備します。これにより、必要に応じてメッセージのペイロードをディープクローンするなど、カスタムロジックを実行できます。

shareUnitOfWork

false

Camel 2.8: Unit of Work を共有すべきかどうか。詳細は、「Splitter」 で同じオプションを参照してください。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.