1.5. プロセッサー
概要
ルーターが単にコンシューマーエンドポイントをプロデューサーエンドポイントに接続するだけでなく、より複雑なことを実行できるようにするため、プロセッサー をルートに追加することができます。プロセッサーは、ルーティングルールに挿入して、ルールを通過するメッセージの任意処理を実行するコマンドです。Apache Camel は、表1.1「Apache Camel プロセッサー」 に示されるように、さまざまなプロセッサーを提供します。
Java DSL | XML DSL | 説明 |
---|---|---|
|
| 「Aggregator」: 複数の受信エクスチェンジを単一のエクスチェンジに統合するアグリゲーターを作成します。 |
|
| アスペクト指向プログラミング (AOP) を使用して、指定されたサブルートの前後で作業を行います。 |
|
| Java オブジェクト (または Bean) でメソッドを呼び出して、現在のエクスチェンジを処理します。「Bean インテグレーション」 を参照してください。 |
|
|
「Content-Based Router」: |
|
| In メッセージボディーを、指定された型に変換します。 |
|
| 「Delayer」: ルートの後続部分へのエクスチェンジの伝搬を遅延します。 |
|
|
|
| 該当なし | 現在のコマンドブロックを終了します。 |
|
| 「Content Enricher」: 現在のエクスチェンジと、指定された プロデューサー エンドポイント URI から要求されたデータを統合します。 |
|
| 「Message Filter」: 述語式を使用して受信エクスチェンジをフィルタリングします。 |
|
| 「Idempotent Consumer」: 重複メッセージを抑制するためのストラテジーを実装します。 |
|
| 特定のルートノードで継承されたエラーハンドラーを無効にするために使用できるブール値オプション (Java DSL でサブ句として定義され、XML DSL の属性として定義) 。 |
|
| 現在のエクスチェンジの MEP を InOnly (引数がない場合) に設定するか、指定されたエンドポイントへエクスチェンジを InOnly として送信します。 |
|
| 現在のエクスチェンジの MEP を InOut (引数がない場合) に設定するか、指定されたエンドポイントへエクスチェンジを InOut として送信します。 |
|
| 「Load Balancer」: エンドポイントのコレクションに対する負荷分散を実装します。 |
|
| コンソールにメッセージを記録します。 |
|
| 「Loop」: 各エクスチェンジをルートの後半部分に繰り返し再送信します。 |
|
|
(トランザクション) 現在のトランザクションをロールバックオンリーにマークします (例外は発生しません)。XML DSL では、このオプションは |
|
|
(トランザクション) 1 つ以上のトランザクションがこのスレッドに関連付けられてから一時停止されている場合、このコマンドは最新のトランザクションをロールバックオンリーにマークします (例外は発生しません)。XML DSL では、このオプションは |
|
| 指定されたデータフォーマットを使用して低レベルまたはバイナリーフォーマットに変換し、特定のトランスポートプロトコルで送信できるようにします。 |
|
| 「Multicast」: 現在のエクスチェンジを複数の宛先にマルチキャストし、各宛先がエクスチェンジの独自のコピーを取得します。 |
|
|
メインルートの完了後に実行されるサブルート (Java DSL |
|
|
指定された例外が発生するたびに実行されるサブルート (Java DSL |
|
| 「パイプとフィルター」: エクスチェンジを一連のエンドポイントに送信します。この場合、1 つのエンドポイントの出力が次のエンドポイントの入力になります。「パイプライン処理」 も参照してください。 |
|
| 現在のルートにポリシーを適用します(現時点ではトランザクションポリシーにのみ使用されます)。Apache Karaf Transaction Guide を参照してください。 |
|
| 「Content Enricher」: 現在のエクスチェンジと、指定された コンシューマー エンドポイント URI からポーリングされたデータを統合します。 |
|
| 現在のエクスチェンジでカスタムプロセッサーを実行します。「カスタムプロセッサー」 および パートIII「高度な Camel プログラミング」 を参照してください。 |
|
| 「受信者リスト」: エクスチェンジを、実行時に算出される受信者のリストに送信します(例: ヘッダーの内容に基づく)。 |
|
| 指定したヘッダーをエクスチェンジの In メッセージから削除します。 |
|
|
指定したパターンに一致するヘッダーをエクスチェンジの In メッセージから削除します。パターンにはフォームを持たせることができます。 |
|
| エクスチェンジから、指定したエクスチェンジプロパティーを削除します。 |
|
|
指定したパターンに一致するプロパティーをエクスチェンジから削除します。カンマで区切られた 1 つ以上の文字列のリストを引数として取ります。最初の文字列はパターンです (上記の |
|
| 「Resequencer」: 指定されたコンパレーター操作に基づいて受信エクスチェンジの順序を変更します。バッチ モードと ストリーム モードをサポートします。 |
|
| (トランザクション) 現在のトランザクションをロールバックオンリーにマークします (デフォルトでは例外も発生)。『 Apache Karaf Transaction Guide 』を参照してください。 |
|
| 「Routing Slip」: Slip ヘッダーから抽出されるエンドポイント URI の一覧に基づいて、動的に構築されたパイプラインを使用してエクスチェンジをルーティングします。 |
|
| サンプリングスロットラーを作成し、ルート上のトラフィックからエクスチェンジのサンプルを抽出できるようにします。 |
|
| エクスチェンジの In メッセージのメッセージボディーを設定します。 |
|
| 現在のエクスチェンジの MEP を指定された値に設定します。「メッセージ交換パターン」 を参照してください。 |
|
| エクスチェンジの In メッセージに指定したヘッダーを設定します。 |
|
| エクスチェンジの Out メッセージに指定したヘッダーを設定します。 |
|
| 指定したエクスチェンジプロパティーを設定します。 |
|
| In メッセージボディーの内容を並べ替えます (カスタムコンパレーターをオプションで指定できます) 。 |
|
| 「Splitter」: 現在のエクスチェンジを一連のエクスチェンジに分割します。この場合、分割された各エクスチェンジには元のメッセージボディーのフラグメントが含まれます。 |
|
| 現在のエクスチェンジのルーティングを停止し、完了したとマークします。 |
|
| ルートの後続部分を並列に処理するためにスレッドプールを作成します。 |
|
| 「Throttler」: フローレートを指定のレベルに制限します(1 秒あたりの交換数)。 |
|
| 指定された Java 例外をスローします。 |
|
| エクスチェンジを 1 つ以上のエンドポイントに送信します。「パイプライン処理」 を参照してください。 |
| 該当なし |
文字列フォーマットを使用して、エクスチェンジをエンドポイントに送信します。つまり、エンドポイント URI 文字列は C 言語の |
|
| ルートの後続部分を囲む Spring トランザクションスコープを作成します。『 Apache Karaf Transaction Guide 』を参照してください。 |
|
| 「メッセージトランスレーター」: In メッセージヘッダーを Out メッセージヘッダーにコピーし、Out メッセージのボディーを指定の値に設定します。 |
|
| 指定したデータフォーマットを使用して、In メッセージボディーを低レベルまたはバイナリー形式から高レベル形式に変換します。 |
|
|
述語式を取り、現在のメッセージが有効かどうかを検証します。述語が |
|
|
「Wire Tap」: |
サンプルプロセッサー
ルートでプロセッサーを使用する方法をある程度理解するには、以下の例を参照してください。
Choice
choice()
プロセッサーは、受信メッセージを別のプロデューサーエンドポイントにルーティングするために使用される条件文です。代替となる各プロデューサーエンドポイントの前には、述語の引数を取る when()
メソッドがあります。述語が true の場合、後続のターゲットが選択されます。そうでない場合、ルール内の次の when()
メソッドに処理が進みます。たとえば、以下の choice()
プロセッサーは Predicate1 および Predicate2 の値に応じて、受信メッセージを Target1、Target2、または Target3 のいずれかに転送します。
from("SourceURL") .choice() .when(Predicate1).to("Target1") .when(Predicate2).to("Target2") .otherwise().to("Target3");
または、Spring XML で同等のものを記述すると、このようになります。
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <choice> <when> <!-- First predicate --> <simple>header.foo = 'bar'</simple> <to uri="Target1"/> </when> <when> <!-- Second predicate --> <simple>header.foo = 'manchu'</simple> <to uri="Target2"/> </when> <otherwise> <to uri="Target3"/> </otherwise> </choice> </route> </camelContext>
Java DSL には、endChoice()
コマンドを使用する必要がある可能性がある特殊なケースがあります。標準の Apache Camel プロセッサーの中には、特殊なサブ句を使用して追加のパラメーターを指定でき、通常は end()
コマンドで終了する追加レベルのネストを効果的に展開します。たとえば、ロードバランサー句を loadBalance().roundRobin().to("mock:foo").to("mock:bar").end()
として指定できます。これにより、mock:foo
と mock:bar
エンドポイント間でメッセージの負荷分散が行われます。ただし、ロードバランサー句が choice の条件に組み込まれている場合は、以下のように endChoice()
コマンドを使用して句を終了する必要があります。
from("direct:start") .choice() .when(bodyAs(String.class).contains("Camel")) .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice() .otherwise() .to("mock:result");
フィルター
filter()
プロセッサーを使用すると、必要のないメッセージがプロデューサーエンドポイントに到達しないようにすることができます。述語の引数を 1 つ取ります。述語が true の場合、メッセージエクスチェンジはプロデューサーに対して許可されます。述語が false の場合、メッセージエクスチェンジはブロックされます。たとえば、以下のフィルターは、受信メッセージに bar
の値を持つヘッダー foo
が含まれない限り、メッセージエクスチェンジをブロックします。
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
または、Spring XML で同等のものを記述すると、このようになります。
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <filter> <simple>header.foo = 'bar'</simple> <to uri="TargetURL"/> </filter> </route> </camelContext>
Throttler
throttle()
プロセッサーは、プロデューサーエンドポイントがオーバーロードしないようにします。スロットラーは、1 秒間に通過できるメッセージの数を制限することで機能します。受信メッセージが指定されたレートを超える場合、スロットラーは超過したメッセージをバッファーに蓄積し、ゆっくりとプロデューサーエンドポイントに送信します。たとえば、スループットのレートを毎秒 100 メッセージに制限するには、以下のルールを定義します。
from("SourceURL").throttle(100).to("TargetURL");
または、Spring XML で同等のものを記述すると、このようになります。
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="SourceURL"/> <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000"> <to uri="TargetURL"/> </throttle> </route> </camelContext>
カスタムプロセッサー
ここに記載されている標準プロセッサーがいずれも必要な機能を提供しない場合は、いつでも独自のカスタムプロセッサーを定義できます。カスタムプロセッサーを作成するには、org.apache.camel.Processor
インターフェースを実装し、process()
メソッドを上書きするクラスを定義します。以下のカスタムプロセッサー MyProcessor
は、受信メッセージから、ヘッダー foo
を削除します。
例1.3 カスタムプロセッサークラスの実装
public class MyProcessor implements org.apache.camel.Processor { public void process(org.apache.camel.Exchange exchange) { inMessage = exchange.getIn(); if (inMessage != null) { inMessage.removeHeader("foo"); } } };
カスタムプロセッサーをルータールールに挿入するには、process()
メソッドを呼び出します。このメソッドは、ルールにプロセッサを挿入するための一般的なメカニズムを提供します。たとえば、以下のルールは、例1.3「カスタムプロセッサークラスの実装」 で定義されたプロセッサーを呼び出します。
org.apache.camel.Processor myProc = new MyProcessor(); from("SourceURL").process(myProc).to("TargetURL");