検索

8.6. Resequencer

download PDF

概要

図8.7「Resequencer パターン」 に示されている Resequencer パターンを使用すると、シーケンス式に応じてメッセージを再配列できます。シーケンス式の値が低いメッセージはバッチの先頭に移動し、値が高いメッセージは後ろに移動します。

図8.7 Resequencer パターン

Resequencer パターン

Apache Camel は、2 つの再配列アルゴリズムをサポートします。

  • Batch resequencing: メッセージをバッチで収集し、メッセージをソートして出力に送信します。
  • Stream resequencing: メッセージ間のギャップの検出に基づいて、(継続的な) メッセージ ストリームを再順序付けします。

デフォルトでは、Resequencer は重複メッセージをサポートしておらず、同じメッセージ式のメッセージが到達した場合は、最後のメッセージのみを保持します。ただし、バッチモードでは、Resequencer で重複を許可することができます。

Batch resequencing

Batch resequencing アルゴリズムは、デフォルトで有効になっています。たとえば、TimeStamp ヘッダーに含まれるタイムスタンプの値に基づいて受信メッセージのバッチを再配列するには、Java DSL で以下のルートを定義することができます。

from("direct:start").resequence(header("TimeStamp")).to("mock:result");

デフォルトでは、最大 100 メッセージ (デフォルトの バッチサイズ) までとし、1000 ミリ秒 (デフォルトの バッチタイムアウト) のインターバルで到着するすべての受信メッセージを収集することによって、バッチを取得します。バッチタイムアウトおよびバッチサイズの値をカスタマイズするには、BatchResequencerConfig インスタンスが唯一の引数として使用される batch() DSL コマンドを追加します。たとえば、バッチが最大 300 メッセージまでの、4000 ミリ秒インターバルでメッセージを収集するように前述のルートを変更するには、以下のように Java DSL ルートを定義することができます。

import org.apache.camel.model.config.BatchResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result");
    }
};

XML 設定を使用して Batch resequencer パターンを指定することもできます。以下の例は、バッチサイズが 300 で、バッチタイムアウトが 4000 ミリ秒の Batch resequencer を定義しています。

<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <!--
        batch-config can be omitted for default (batch) resequencer settings
      -->
      <batch-config batchSize="300" batchTimeout="4000" />
      <simple>header.TimeStamp</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

バッチオプション

表8.2「Batch Resequencer オプション」 は、バッチモードでのみ使用できるオプションを示しています。

表8.2 Batch Resequencer オプション
Java DSLXML DSLデフォルト説明

allowDuplicates()

batch-config/@allowDuplicates

false

true の場合、バッチは重複したメッセージを破棄しません (重複 は、メッセージ式が同じ値に評価されることを意味します)。

reverse()

batch-config/@reverse

false

true の場合、メッセージを逆順で配列します (メッセージ式に適用されるデフォルトの順序は、String.compareTo () で定義されている Java の文字列語彙順に基づいています)。

たとえば、JMSPriority に基づいて JMS キューからのメッセージを再配列する場合は、以下のように allowDuplicates および reverse オプションを組み合わせる必要があります。

from("jms:queue:foo")
        // sort by JMSPriority by allowing duplicates (message can have same JMSPriority)
        // and use reverse ordering so 9 is first output (most important), and 0 is last
        // use batch mode and fire every 3th second
        .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
        .to("mock:result");

Stream resequencing

Stream resequencing アルゴリズムを有効にするには、resequence() DSL コマンドに stream() を追加する必要があります。たとえば、seqnum ヘッダーのシーケンス番号の値に基づいて受信メッセージを再配列するには、以下のように DSL ルートを定義することができます。

from("direct:start").resequence(header("seqnum")).stream().to("mock:result");

Stream-prosessing resequencer アルゴリズムは、固定のバッチサイズではなく、メッセージストリーム内のギャップ検出に基づいています。ギャップ検出はタイムアウトと組み合わせることで、シーケンスのメッセージ数 (バッチサイズ) を事前に把握する必要がなくなります。メッセージには、先行および後継がわかる一意のシーケンス番号が含まれている必要があります。たとえば、シーケンス番号 3 を持つメッセージには、シーケンス番号 2 が含まれる先行メッセージと、シーケンス番号 4 を持つ後継メッセージがあります。メッセージのシーケンス 2,3,5 は、3 の後継がないため、ギャップがあります。したがって、Resequencer は、メッセージ 4 が到着するまで (またはタイムアウトが発生するまで)、メッセージ 5 を保持する必要があります。

デフォルトでは、Stream Resequencer はタイムアウトは 1000 ミリ秒、最大メッセージ容量は 100 で設定されます。ストリームのタイムアウトおよびメッセージ容量をカスタマイズするには、StreamResequencerConfig オブジェクトを引数として stream() に渡します。たとえば、メッセージ容量が 5000 でタイムアウトが 4000 ミリ秒の Stream resequencer を設定するには、以下のようにルートを定義することができます。

// Java
import org.apache.camel.model.config.StreamResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("seqnum")).
            stream(new StreamResequencerConfig(5000, 4000L)).
            to("mock:result");
    }
};

メッセージストリーム内の連続するメッセージ (つまり、連続するシーケンス番号を持つメッセージ) 間の最大遅延時間が分かっている場合は、Resequencer の timeout パラメーターに、この値を設定する必要があります。この場合、ストリーム内のすべてのメッセージが正しい順序で次のプロセッサーに送信されることを保証することができます。シーケンス外となる時間差よりもタイムアウト値が小さいほど、Resequencer が未配列のメッセージを配信する可能性が高くなります。大きなタイムアウト値は、充分に高い容量値でサポートされるべきであり、ここでは容量パラメーターを使用して、Resequencer のメモリーが枯渇するのを防いでいます。

long 以外の型でシーケンス番号を使用する場合は、以下のようにカスタム comparator を定義する必要があります。

// Java
ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator);
from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");

XML 設定を使用して Stream resequencer パターンを指定することもできます。以下の例は、メッセージ容量が 5000 で、タイムアウトが 4000 ミリ秒の Stream resequencer を定義します。

<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <stream-config capacity="5000" timeout="4000"/>
      <simple>header.seqnum</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

無効なエクスチェンジの無視

Resequencer EIP は、受信エクスチェンジが有効でない場合に CamelExchangeException 例外を出力します。これは、シーケンス式が何らかの理由で評価できない場合 (ヘッダーが見つからない場合など) が該当します 。ignoreInvalidExchanges オプションを使用して、これらの例外を無視することができます。つまり、Resequencer は無効なエクスチェンジをスキップします。

from("direct:start")
  .resequence(header("seqno")).batch().timeout(1000)
    // ignore invalid exchanges (they are discarded)
    .ignoreInvalidExchanges()
  .to("mock:result");

古いメッセージを拒否

この rejectOld オプションを使用すると、メッセージの再配列に使用されるメカニズムに関係なく、メッセージが未配列のまま送信されるのを防ぐことができます。rejectOld オプションを有効にすると、受信メッセージが最後に配信されたメッセージよりも 古い (現在の comparator によって定義されている) 場合に、Resequencer は受信メッセージを拒否します (MessageRejectedException 例外を出力します)。

from("direct:start")
    .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
    .to("mock:result");
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.