8.6. Resequencer
概要
図8.7「Resequencer パターン」 に示されている Resequencer パターンを使用すると、シーケンス式に従ってメッセージを再配列できます。シーケンス式の値が低いメッセージはバッチの先頭に移動し、値が高いメッセージは後ろに移動します。
図8.7 Resequencer パターン
Apache Camel は、2 つの再配列アルゴリズムをサポートします。
- Batch resequencing: メッセージをバッチで収集し、メッセージをソートして出力に送信します。
- Stream resequencing: メッセージ間のギャップの検出に基づいて、(継続的な) メッセージ ストリームを再順序付けします。
デフォルトでは、Resequencer は重複メッセージをサポートしておらず、同じメッセージ式のメッセージが到達した場合は、最後のメッセージのみを保持します。ただし、バッチモードでは、Resequencer で重複を許可することができます。
Batch resequencing
Batch resequencing アルゴリズムは、デフォルトで有効になっています。たとえば、TimeStamp
ヘッダーに含まれるタイムスタンプの値に基づいて受信メッセージのバッチを再配列するには、Java DSL で以下のルートを定義することができます。
from("direct:start").resequence(header("TimeStamp")).to("mock:result");
デフォルトでは、最大 100 メッセージ (デフォルトの バッチサイズ) までとし、1000 ミリ秒 (デフォルトの バッチタイムアウト) のインターバルで到着するすべての受信メッセージを収集することによって、バッチを取得します。バッチタイムアウトおよびバッチサイズの値をカスタマイズするには、BatchResequencerConfig
インスタンスが唯一の引数として使用される batch()
DSL コマンドを追加します。たとえば、バッチが最大 300 メッセージまでの、4000 ミリ秒インターバルでメッセージを収集するように前述のルートを変更するには、以下のように Java DSL ルートを定義することができます。
import org.apache.camel.model.config.BatchResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result"); } };
XML 設定を使用して Batch resequencer パターンを指定することもできます。以下の例は、バッチサイズが 300 で、バッチタイムアウトが 4000 ミリ秒の Batch resequencer を定義しています。
<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <resequence> <!-- batch-config can be omitted for default (batch) resequencer settings --> <batch-config batchSize="300" batchTimeout="4000" /> <simple>header.TimeStamp</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
バッチオプション
表8.2「Batch Resequencer オプション」 は、バッチモードでのみ使用できるオプションを示しています。
Java DSL | XML DSL | デフォルト | 説明 |
---|---|---|---|
|
|
|
|
|
|
|
|
たとえば、JMSPriority
に基づいて JMS キューからのメッセージを再配列する場合は、以下のように allowDuplicates
および reverse
オプションを組み合わせる必要があります。
from("jms:queue:foo") // sort by JMSPriority by allowing duplicates (message can have same JMSPriority) // and use reverse ordering so 9 is first output (most important), and 0 is last // use batch mode and fire every 3th second .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse() .to("mock:result");
Stream resequencing
Stream resequencing アルゴリズムを有効にするには、resequence()
DSL コマンドに stream()
を追加する必要があります。たとえば、seqnum
ヘッダーのシーケンス番号の値に基づいて受信メッセージを再配列するには、以下のように DSL ルートを定義することができます。
from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
Stream-prosessing resequencer アルゴリズムは、固定のバッチサイズではなく、メッセージストリーム内のギャップ検出に基づいています。ギャップ検出はタイムアウトと組み合わせることで、シーケンスのメッセージ数 (バッチサイズ) を事前に把握する必要がなくなります。メッセージには、先行および後継がわかる一意のシーケンス番号が含まれている必要があります。たとえば、シーケンス番号 3
を持つメッセージには、シーケンス番号 2
が含まれる先行メッセージと、シーケンス番号 4
を持つ後継メッセージがあります。メッセージのシーケンス 2,3,5
は、3
の後継がないため、ギャップがあります。したがって、Resequencer は、メッセージ 4
が到着するまで (またはタイムアウトが発生するまで)、メッセージ 5
を保持する必要があります。
デフォルトでは、Stream Resequencer はタイムアウトは 1000 ミリ秒、最大メッセージ容量は 100 で設定されます。ストリームのタイムアウトおよびメッセージ容量をカスタマイズするには、StreamResequencerConfig
オブジェクトを引数として stream()
に渡します。たとえば、メッセージ容量が 5000 でタイムアウトが 4000 ミリ秒の Stream resequencer を設定するには、以下のようにルートを定義することができます。
// Java import org.apache.camel.model.config.StreamResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("seqnum")). stream(new StreamResequencerConfig(5000, 4000L)). to("mock:result"); } };
メッセージストリーム内の連続するメッセージ (つまり、連続するシーケンス番号を持つメッセージ) 間の最大遅延時間が分かっている場合は、Resequencer の timeout パラメーターに、この値を設定する必要があります。この場合、ストリーム内のすべてのメッセージが正しい順序で次のプロセッサーに送信されることを保証することができます。シーケンス外となる時間差よりもタイムアウト値が小さいほど、Resequencer が未配列のメッセージを配信する可能性が高くなります。大きなタイムアウト値は、充分に高い容量値でサポートされるべきであり、ここでは容量パラメーターを使用して、Resequencer のメモリーが枯渇するのを防いでいます。
long
以外の型でシーケンス番号を使用する場合は、以下のようにカスタム comparator を定義する必要があります。
// Java ExpressionResultComparator<Exchange> comparator = new MyComparator(); StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator); from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");
XML 設定を使用して Stream resequencer パターンを指定することもできます。以下の例は、メッセージ容量が 5000 で、タイムアウトが 4000 ミリ秒の Stream resequencer を定義します。
<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <resequence> <stream-config capacity="5000" timeout="4000"/> <simple>header.seqnum</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
無効なエクスチェンジの無視
Resequencer EIP は、受信エクスチェンジが有効でない場合に CamelExchangeException
例外をスローします。これは、シーケンス式が何らかの理由で評価できない場合 (ヘッダーが見つからない場合など) が該当します 。ignoreInvalidExchanges
オプションを使用して、これらの例外を無視することができます。つまり、Resequencer は無効なエクスチェンジをスキップします。
from("direct:start")
.resequence(header("seqno")).batch().timeout(1000)
// ignore invalid exchanges (they are discarded)
.ignoreInvalidExchanges()
.to("mock:result");
古いメッセージを拒否
この rejectOld
オプションを使用すると、メッセージの再配列に使用されるメカニズムに関係なく、メッセージが未配列のまま送信されるのを防ぐことができます。rejectOld
オプションを有効にすると、受信メッセージが最後に配信されたメッセージよりも 古い (現在の comparator によって定義されている) 場合に、Resequencer は受信メッセージを拒否します (MessageRejectedException
例外をスローします)。
from("direct:start")
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
.to("mock:result");