第10章 Message Transformation
概要
Message Transformation パターンは、さまざまな目的のためにメッセージの内容を変更する方法を表しています。
10.1. Content Enricher
概要
Content Enricher パターンは、メッセージの宛先が元のメッセージに存在する以上のデータを必要とするシナリオを表しています。この場合、メッセージ変換、ルーティングロジック内の任意のプロセッサー、または Content Enricher メソッドを使用して外部リソースから追加データをプルします。
図10.1 Content Enricher パターン
コンテンツ補完の代替方法
Apache Camel は、コンテンツを補完するための複数の方法をサポートします。
- ルーティングロジックの任意のプロセッサーを使用したメッセージ変換
-
enrich()
メソッドは、現在のエクスチェンジのコピーを プロデューサー エンドポイントに送信し、生成された応答のデータを使用して、リソースから追加のデータを取得します。Enricher によって作成されるエクスチェンジは、常に InOut エクスチェンジです。 -
pollEnrich()
メソッドは、データの コンシューマー エンドポイントをポーリングして追加のデータを取得します。実質的に、メインルートからのコンシューマーエンドポイントとpollEnrich()
操作中のコンシューマーエンドポイントは結合されます。つまり、ルートの初期コンシューマーの受信メッセージが、ポーリングするコンシューマーのpollEnrich()
をトリガーします。
enrich()
および pollEnrich()
メソッドは動的エンドポイント URI をサポートします。URI を取得するには、現在のエクスチェンジから値を取得できる式を指定します。たとえば、データエクスチェンジから計算される名前でファイルをポーリングできます。この動作は Camel 2.16 で導入されました。この変更により XML DSL を使わず、容易に移行ができます。Java DSL は後方互換性を維持します。
メッセージ変換およびプロセッサーを使用したコンテンツの補完
Camel は、IDE 上でタイプセーフなコード補完をしながら、ルーティングおよび仲介ルールを作成できる Fluent Builder (流れるようなビルダー) を提供します。これにより、スマートな補完を提供し、リファクタリングを安全に行うことができます。分散システムをテストする場合、特定システムが利用可能または書き込みされるまで、システムの他の部分をテストするために、システムの特定部分のスタブを作成しなければならないような要件が一般的です。これを実行する 1 つの方法として、何らかの テンプレート システムを使用して、ほとんどの静的なボディーを持つ動的メッセージを生成することで、要求への応答を生成します。テンプレートを使用する別の方法として、ある宛先からメッセージを受信し、Velocity や XQuery などを使用して変換してから、別の宛先に送信する方法もあります。以下の例は、InOnly
(一方向) メッセージに対する例になります。
from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm"). to("activemq:Another.Queue");
InOut (request-reply) メッセージングを使用して ActiveMQ の My.Queue
キューでリクエストを処理するとします。JMSReplyTo
の宛先に送信されるテンプレートが生成した応答が必要です。以下の例は、これらを行う方法を示しています。
from("activemq:My.Queue"). to("velocity:com/acme/MyResponse.vm");
以下の例は、DSL を使用してメッセージのボディーを変換する方法を示しています。
from("direct:start").setBody(body().append(" World!")).to("mock:result");
以下の例は、明示的な Java コードを使用してプロセッサーを追加します。
from("direct:start").process(new Processor() { public void process(Exchange exchange) { Message in = exchange.getIn(); in.setBody(in.getBody(String.class) + " World!"); } }).to("mock:result");
次の例では、変換機能として機能する Bean を有効化するために Bean 統合を使用しています。
from("activemq:My.Queue"). beanRef("myBeanName", "myMethodName"). to("activemq:Another.Queue");
以下の例は Spring XML 実装を示しています。
<route> <from uri="activemq:Input"/> <bean ref="myBeanName" method="doTransform"/> <to uri="activemq:Output"/> </route>/>
Enrich() メソッドを使用したコンテンツの補完
AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...
Content Enricher (enrich
) は、(元のエクスチェンジ に含まれる) 受信メッセージを補完するために、リソースエンドポイント から追加のデータを取得します。集約ストラテジーは、元のエクスチェンジと リソースエクスチェンジ を組み合わせたものです。AggregationStrategy.aggregate(Exchange, Exchange)
メソッドの最初のパラメーターは元のエクスチェンジに対応し、2 番目のパラメーターはリソースエクスチェンジに対応します。リソースエンドポイントの結果は、リソースエクスチェンジの Out メッセージに保存されます。以下は、独自の集約ストラテジークラスを実装するためのテンプレートの例です。
public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getOut().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }
このテンプレートを使用すると、元のエクスチェンジがあらゆる交換パターンを持つことができます。Enricher によって作成されるリソースエクスチェンジは、常に InOut エクスチェンジです。
Spring XML 補完の例
上記の例は Spring XML にも実装できます。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich strategyRef="aggregationStrategy"> <constant>direct:resource</constant> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />
コンテンツの補完時のデフォルト集約ストラテジー
集約ストラテジーは任意です。提供しない場合、Apache Camel はデフォルトでリソースから取得したボディーを使用します。以下に例を示します。
from("direct:start") .enrich("direct:resource") .to("direct:result");
上記のルートでは、direct:result
エンドポイントに送信されたメッセージには direct:resource
からの出力が含まれます。これは、この例ではカスタムの集約を使用していないためです。
XML DSL では、以下のように strategyRef
属性を省略します。
<route> <from uri="direct:start"/> <enrich uri="direct:resource"/> <to uri="direct:result"/> </route>
Enrich() メソッドでサポートされるオプション
enrich
DSL コマンドは、以下のオプションをサポートします。
名前 | デフォルト値 | 説明 |
| なし | Camel 2.16 以降では、このオプションは必須です。Enrich from を使用するよう外部サービスの URI を設定する式を指定します。Simple 式言語、Constant 式言語、または現在のエクスチェンジの値から動的に URI を計算できるその他の言語を使用できます。 |
|
これらのオプションは削除されています。代わりに | |
|
補完のための外部サービスのエンドポイントを参照します。 | |
|
外部サービスからの応答を 1 つの送信メッセージにマージするために使用される AggregationStrategy を参照します。デフォルトでは、Camel は外部からの応答を送信メッセージとして使用します。POJO を | |
|
POJO を | |
| false |
デフォルトの動作では、補完するデータがない場合、集約メソッドは使用されません。このオプションが true の場合、補完のデータがなく、POJO を |
| false |
デフォルトの動作では、リソースから補完するデータの取得中に例外が発生すると、集約メソッドは 使用されません。このオプションを |
| false | Camel 2.16 以降、補完操作は親エクスチェンジとリソースエクスチェンジの間で Unit of Work を共有しないのがデフォルトの動作になります。これは、リソースエクスチェンジに個別の Unit of Work があることを意味します。詳細は、Splitter パターンについてのドキュメントを参照してください。 |
|
|
Camel 2.16 以降では、このオプションを指定し、補完の操作でプロデューサーを再利用するためにプロデューサーをキャッシュすることができる |
| false | Camel 2.16 以降、このオプションは解決できないエンドポイント URI を無視するかどうかを示します。デフォルトの動作では、無効なエンドポイント URI を特定する例外が Camel によって出力されます。 |
Enrich() メソッド使用時の集約ストラテジーの指定
enrich()
メソッドは、リソースエンドポイントから追加のデータを取得し、元のエクスチェンジに含まれる受信メッセージを強化します。集約ストラテジーを使用して、元のエクスチェンジとリソースエクスチェンジを組み合わせることができます。AggregationStrategy.aggregate(Exchange, Exchange)
メソッドの最初のパラメーターは、元のエクスチェンジに対応します。2 番目のパラメーターは、リソースのエクスチェンジに対応します。リソースエンドポイントの結果は、リソースエクスチェンジの Out
メッセージに保存されます。以下に例を示します。
AggregationStrategy aggregationStrategy = ... from("direct:start") .enrich("direct:resource", aggregationStrategy) .to("direct:result"); from("direct:resource") ...
以下のコードは、集約ストラテジーを実装するためのテンプレートです。このテンプレートを使用する実装では、元のエクスチェンジはメッセージエクスチェンジパターンになります。Enricher によって作成されるリソースエクスチェンジは、常に InOut メッセージエクスチェンジパターンです。
public class ExampleAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange original, Exchange resource) { Object originalBody = original.getIn().getBody(); Object resourceResponse = resource.getIn().getBody(); Object mergeResult = ... // combine original body and resource response if (original.getPattern().isOutCapable()) { original.getOut().setBody(mergeResult); } else { original.getIn().setBody(mergeResult); } return original; } }
以下の例は、Spring XML DSL を使用して集約ストラテジーを実装する方法を示しています。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich strategyRef="aggregationStrategy"> <constant>direct:resource</constant> </enrich> <to uri="direct:result"/> </route> <route> <from uri="direct:resource"/> ... </route> </camelContext> <bean id="aggregationStrategy" class="..." />
Enrich() での動的 URI の使用
Camel 2.16 以降、enrich()
および pollEnrich()
メソッドは、現在のエクスチェンジからの情報に基づいて計算される動的 URI の使用をサポートします。たとえば、orderId
キーのあるヘッダーが HTTP URL のコンテンツパスの一部として使用される HTTP エンドポイントから補完するには、以下のような操作を行うことができます。
from("direct:start") .enrich().simple("http:myserver/${header.orderId}/order") .to("direct:result");
以下は、XML DSL と同じ例です。
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <enrich> <simple>http:myserver/${header.orderId}/order</simple> </enrich> <to uri="direct:result"/> </route>
pollEnrich() メソッドを使用したコンテンツの補完
この pollEnrich
コマンドは、リソースのエンドポイントを コンシューマー として扱います。エクスチェンジをリソースエンドポイントに送信する代わりに、エンドポイントを ポーリング します。デフォルトでは、リソースエンドポイントからエクスチェンジがない場合は、ポーリングはすぐに返します。たとえば、以下のルートは、受信 JMS メッセージのヘッダーから抽出される名前のファイルを読み取ります。
from("activemq:queue:order") .pollEnrich("file://order/data/additional?fileName=orderId") .to("bean:processOrder");
ファイルが準備できるまで待機する時間を制限できます。以下の例は、20 秒の最大待機時間を示しています。
from("activemq:queue:order") .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds .to("bean:processOrder");
pollEnrich()
の集計ストラテジーを指定することもできます。以下に例を示します。
.pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)
pollEnrich()
メソッドは、consumer.bridgeErrorHandler=true
で設定されたコンシューマーをサポートします。これにより、ポーリングからの例外がルートエラーハンドラーに伝播され、たとえばポーリングを再試行できます。
consumer.bridgeErrorHandler=true
のサポートが Camel 2.18 で新たに追加されました。この動作は Camel 2.17 ではサポートされません。
集計ストラテジーの aggregate()
メソッドに渡されるリソースエクスチェンジは、エクスチェンジを受け取る前にポーリングがタイムアウトした場合は null
の可能性があります。
pollEnrich() によって使用されるポーリングメソッド
pollEnrich()
メソッドは、以下のポーリングメソッドのいずれかを呼び出して、コンシューマーエンドポイントをポーリングします。
-
receiveNoWait()
(これはデフォルトです。) -
receive()
-
receive(long timeout)
pollEnrich()
コマンドのタイムアウト引数 (ミリ秒単位) は、以下のように呼び出すメソッドを決定します。
-
タイムアウトが
0
または指定されていない場合、pollEnrich()
はreceiveNoWait
を呼び出します。 -
タイムアウトが負の値の場合、
pollEnrich()
はreceive
を呼び出します。 -
それ以外の場合は、
pollEnrich()
はreceive(timeout)
を呼び出します。
データがない場合、集約ストラテジーの newExchange
は null になります。
pollEnrich() メソッドの使用例
以下の例は、inbox/data.txt
ファイルからコンテンツを読み込むことによるメッセージの補完を示しています。
from("direct:start") .pollEnrich("file:inbox?fileName=data.txt") .to("direct:result");
以下は、XML DSL と同じ例です。
<route> <from uri="direct:start"/> <pollEnrich> <constant>file:inbox?fileName=data.txt"</constant> </pollEnrich> <to uri="direct:result"/> </route>
指定したファイルが存在しない場合は、メッセージは空になります。ファイルが存在するまで待機するタイムアウトを指定するか、特定の時間まで待機できます。以下の例では、コマンドは 5 秒未満待機します。
<route> <from uri="direct:start"/> <pollEnrich timeout="5000"> <constant>file:inbox?fileName=data.txt"</constant> </pollEnrich> <to uri="direct:result"/> </route>
pollEnrich() での動的 URI の使用
Camel 2.16 以降、enrich()
および pollEnrich()
メソッドは、現在のエクスチェンジからの情報に基づいて計算される動的 URI の使用をサポートします。たとえば、ヘッダーを使用して SEDA キュー名を示すエンドポイントから Enrich をポーリングするには、以下を行います。
from("direct:start") .pollEnrich().simple("seda:${header.name}") .to("direct:result");
以下は、XML DSL と同じ例です。
<route> <from uri="direct:start"/> <pollEnrich> <simple>seda${header.name}</simple> </pollEnrich> <to uri="direct:result"/> </route>
pollEnrich() メソッドでサポートされるオプション
pollEnrich
DSL コマンドは、以下のオプションをサポートします。
名前 | デフォルト値 | 説明 |
| なし | Camel 2.16 以降では、このオプションは必須です。Enrich from を使用するよう外部サービスの URI を設定する式を指定します。Simple 式言語、Constant 式言語、または現在のエクスチェンジの値から動的に URI を計算できるその他の言語を使用できます。 |
|
これらのオプションは削除されています。代わりに | |
|
補完のための外部サービスのエンドポイントを参照します。 | |
|
外部サービスからの応答を 1 つの送信メッセージにマージするために使用される AggregationStrategy を参照します。デフォルトでは、Camel は外部からの応答を送信メッセージとして使用します。POJO を | |
|
POJO を | |
| false |
デフォルトの動作では、補完するデータがない場合、集約メソッドは使用されません。このオプションが true の場合、補完のデータがなく、POJO を |
|
|
外部サービスからのポーリング時に応答を待つ最大時間 (ミリ秒単位)。デフォルトの動作では、 |
| false |
デフォルトの動作では、リソースから補完するデータの取得中に例外が発生すると、集約メソッドは 使用されません。このオプションを |
|
|
このオプションを指定し、 |
| false | 解決できないエンドポイント URI を無視するかどうかを示します。デフォルトの動作では、無効なエンドポイント URI を特定する例外が Camel によって出力されます。 |