第36章 Disruptor


Disruptor コンポーネント

Camel 2.12 以降で利用可能
中断: コンポーネントは、標準の SEDA コンポーネントと同様に非同期 SEDA 機能を提供しますが、標準の SEDA で使用される BlockingQueue の代わりに Disruptor を使用します。または、このコンポーネントで disruptor-vm: エンドポイントをサポートし、標準の 仮想マシン の代替手段を提供します。SEDA コンポーネントと同様に、中断のバッファー: エンドポイントは 単一CamelContext 内でのみ表示され、永続性またはリカバリーに対するサポートは提供されません。*disruptor-vm:* エンドポイントは、CamelContexts インスタンス間の通信もサポートします。そのため、このメカニズムを使用して Web アプリケーション全体で通信を行うことができます( camel-disruptor.jarシステム/ブート クラスパス上にある場合)。
SEDA または VM コンポーネントで Disruptor コンポーネントを使用することを選択する主な利点は、プロデューサーとマルチキャストされたコンシューマーまたは同時コンシューマーの間の競合が高いユースケースのパフォーマンスです。このような場合、スループットが大幅に増加し、レイテンシーが減少しています。競合のないシナリオのパフォーマンスは、SEDA および VM コンポーネントと類似しています。
Disruptor は、可能な限り SEDA および VM コンポーネントの動作を模倣して実装されます。それらの相違点は、以下のとおりです。
  • 使用されるバッファーは、常にサイズでバインドされます(デフォルトの 1024 エクスチェンジ)。
  • バッファーは常に禁止されるため、例外を出力する代わりにバッファーが満杯である間に Disruptor のデフォルト動作がブロックされます。このデフォルトの動作は コンポーネントに設定できます(オプションを参照)。
  • Disruptor 予約は BrowsableEndpoint インターフェイスを実装しません。そのため、現在 Disruptor にあるエクスチェンジを取得できず、交換の量のみを取得できます。
  • Disruptor では、コンシューマー(マルチキャストの有無)を静的に設定する必要があります。オンザフライでコンシューマーを追加または削除するには、Diruptor の保留中のすべてのエクスチェンジを完全にフラッシュする必要があります。
  • 再設定の結果:Disruptor で送信されるデータは直接処理され、コンシューマーが 1 つ以上ある場合は 'gone' になり、結合後に新しいエクスチェンジのみを取得します。
  • pollTimeout オプションは Disruptor コンポーネントではサポートされません。
  • プロデューサーが完全な Disruptor をブロックすると、スレッド割り込みに応答しません。
Maven ユーザーは、このコンポーネントの pom.xml に以下の依存関係を追加する必要があります。
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-disruptor</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Copy to Clipboard Toggle word wrap

URI 形式

 disruptor:someName[?options]
Copy to Clipboard Toggle word wrap
または
 disruptor-vm:someName[?options]
Copy to Clipboard Toggle word wrap
*someName* には、現在の CamelContext 内のエンドポイントを一意に識別する任意の文字列(または *disruptor-vm:* の場合のコンテキスト全体で)を指定できます。URI に、以下の形式でクエリーオプションを追加できます。
  ?option=value&option=value&...
Copy to Clipboard Toggle word wrap

オプション

以下のオプションはすべて、*disruptor:* および * disruptor-vm: *コンポーネントの両方に対して有効です。
Expand
名前 デフォルト 説明
size 1024 Disruptors リングバッファーの最大容量。2 の最も近い累乗に事実上増加します。注: このオプションを使用する場合、キュー名で最初のエンドポイントが作成され、サイズを決定するのは最小限です。すべてのエンドポイントが同じサイズを使用するようにするには、すべてのエンドポイントで size オプションを設定するか、作成される最初のエンドポイントを設定します。
bufferSize コンポーネントのみ: Disruptors リングバッファーのデフォルトサイズ(保持可能なメッセージ数の容量)。このオプションは、サイズが使用されていない場合に使用されます。
queueSize コンポーネントのみ: SEDA コンポーネントとの最大互換性を維持するために <em>bufferSize</em> を指定する追加のオプション。
concurrentConsumers 1 同時スレッド処理エクスチェンジの数。
waitForTaskToComplete IfReplyExpected 非同期タスクが完了するまで呼び出し元が待機するかどうかを指定するオプション。AlwaysNever、または IfReplyExpected の 3 つのオプションがサポートされます。最初の 2 つの値は自己説明です。最後の値 IfReplyExpected は、メッセージが RequestReply-based の場合にのみ待機します。非同期メッセージングの詳細は、非同期 メッセージング を参照してください。
timeout 30000 プロデューサーが非同期タスクが完了するまで待機するタイムアウト(ミリ秒単位)。詳細は、waitForTaskToComplete および Async を参照してください。0 または負の値を使用して、タイムアウトを無効にすることができます。
defaultMultipleConsumers コンポーネントのみ: multipleConsumers が指定されていない場合に使用されるこのコンポンサーによって作成されるエンドポイントに、複数のコンシューマーのデフォルト許可を設定できます。
multipleConsumers
false
複数のコンシューマーを許可するかどうかを指定します。有効にすると、Publish-Subscribe メッセージングに Disruptor を使用できます。つまり、SEDA キューにメッセージを送信し、各コンシューマーがメッセージのコピーを受け取ることができます。有効にすると、すべてのコンシューマーエンドポイントでこのオプションを指定する必要があります。
limitConcurrentConsumers true concurrentConsumers 数を最大 500 に制限するかどうか。デフォルトでは、Disruptor エンドポイントがより大きな数値で設定されている場合、例外が発生します。このオプションをオフにすると、そのチェックを無効にできます。
blockWhenFull true メッセージを full Disruptor に送信するスレッドが、リングバッファーの容量が枯渇しなくなるまでブロックされるかどうか。デフォルトでは、呼び出し元のスレッドは、メッセージが受け入れられるまでブロックおよび待機します。このオプションを無効にすると、キューが満杯であることを示す例外が出力されます。
defaultBlockWhenFull Component only: blockWhenFull が指定されていない場合に使用されるこのコンポントによって作成されたエンドポイントに対してリングバッファーが満杯になると、デフォルトのプロデューサー動作を設定できます。
waitStrategy Blocking 新しいエクスチェンジが公開されるのを待つコンシューマースレッドによって使用されるストラテジーを定義します。許可されるオプションは、BlockingSleepingBusySpin、および Yielding です。この件に関する詳細は、以下のセクションを参照してください。
defaultWaitStrategy Component only: waitStrategy が指定されていない場合に使用されるこのコンポントによって作成されたエンドポイントにデフォルトの待機ストラテジーを設定できます。
producerType multi
Disruptor で許可されるプロデューサーを定義します。許可されるオプションは、複数のプロデューサーと Single が、特定の最適化を有効にすることができることです(1 つのスレッドまたは他のスレッド上、または同期されている場合)。

待機ストラテジー

待機ストラテジーは、次のエクスチェンジの公開を待機しているコンシューマースレッドによって実行される待機の種類に影響します。以下のストラテジーを選択できます。
Expand
名前 説明 アドバイス
Blocking バリアで待機している Consumers に lock および condition 変数を使用するブロッキングストラテジー。 このストラテジーは、CPU リソースほどスループットと低レイテンシーが重要ではない場合に使用できます。
スリープ状態 最初にスピンアップし、Thread.yield ()を使用し、最終的には OS と JVM の最小数で、コンシューマーがバリアで待機している間に許可されるスリープストラテジー。 このストラテジーは、パフォーマンスと CPU リソース間の妥協です。レイテンシーの急増は、quiet 期間の後に発生する可能性があります。
BusySpin バリアで待機している Consumers にビジースピンループを使用する busy Spin ストラテジー。 このストラテジーは、CPU リソースを使用して、レイテンシージッターを引き起こす可能性のある syscall を回避します。スレッドを特定の CPU コアにバインドできる場合に最適です。
yielding 初回の回転後にバリアで待機した Consumers に Thread.yield ()を使用するストラテジーを生成します。 このストラテジーは、レイテンシーが大幅に急増することなく、パフォーマンスと CPU リソース間の妥協です。

リクエスト応答の使用

Disruptor コンポーネントは RequestReply の使用をサポートします。ここでは、呼び出し元は Async ルートが完了するまで待機します。以下に例を示します。
from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input");
from("disruptor:input").to("bean:processInput").to("bean:createResponse");
Copy to Clipboard Toggle word wrap
上記のルートでは、受信リクエストを受け入れるポート 9876 の TCP リスナーがあります。リクエストは disruptor:input バッファーにルーティングされます。RequestReply メッセージであるため、応答を待ちます。disruptor:input バッファーのコンシューマーが完了すると、応答が元のメッセージの応答にコピーされます。

同時コンシューマー

デフォルトでは、Diruptor エンドポイントは単一のコンシューマースレッドを使用しますが、同時コンシューマースレッドを使用するように設定できます。そのため、スレッドプールの代わりに以下を使用できます。
from("disruptor:stageName?concurrentConsumers=5").process(...)
Copy to Clipboard Toggle word wrap
2 つの違いは、スレッドプールは負荷に応じて動的に拡大/縮小する可能性がある一方で、同時コンシューマーの数は常に内部的に固定され、サポートされるため、パフォーマンスは高くなります。

スレッドプール

以下のような手順を実行して、スレッドプールを Disruptor エンドポイントに追加することに注意してください。
from("disruptor:stageName").thread(5).process(...)
Copy to Clipboard Toggle word wrap
は、Diruptor を使用してパフォーマンスの一部を効果的に否定して、通常の BlockingQueue を追加して Disruptor と組み合わせて使用することで、優先できます。代わりに、concurrentConsumers オプションを使用して Disruptor エンドポイントでメッセージを処理するスレッド数を直接設定することが推奨されます。

以下のルートでは、Diruptor を使用してこの非同期キューにリクエストを送信し、別のスレッドでさらに処理するために fire-and-forget メッセージを送信でき、このスレッドの定数応答を元の呼び出し元に返します。
public void configure() throws Exception {
    from("direct:start")
        // send it to the disruptor that is async
        .to("disruptor:next")
        // return a constant response
        .transform(constant("OK"));

    from("disruptor:next").to("mock:result");
}
Copy to Clipboard Toggle word wrap
ここでは、Hello World メッセージを送信し、応答が OK であることが想定されます。
Object out = template.requestBody("direct:start", "Hello World");
assertEquals("OK", out);
Copy to Clipboard Toggle word wrap
Hello World メッセージは、さらに処理するために、別のスレッドから Disruptor から取得されます。これはユニットテストからのものであるため、ユニットテストでアサーションを実行できるモックエンドポイントに送信されます。

multipleConsumers の使用

この例では、2 つのコンシューマーを定義し、Spring Bean として登録しました。
<!-- define the consumers as spring beans -->
<bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/>

<bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <!-- define a shared endpoint which the consumers can refer to instead of using url -->
    <endpoint id="foo" uri="disruptor:foo?multipleConsumers=true"/>
</camelContext>
Copy to Clipboard Toggle word wrap
Disruptor foo エンドポイントで multipleConsumers=true を指定しているため、これらの 2 つ以上のコンシューマーは、種類の pub-sub スタイルのメッセージングとしてメッセージの独自のコピーを受け取ることができます。Bean はユニットテストの一部であるため、単にモックエンドポイントにメッセージを送信しますが、@Consume を使用して Disruptor から消費する方法に注意してください。
public class FooEventConsumer {

    @EndpointInject(uri = "mock:result")
    private ProducerTemplate destination;

    @Consume(ref = "foo")
    public void doSomething(String body) {
        destination.sendBody("foo" + body);
    }

}
Copy to Clipboard Toggle word wrap

中断情報の抽出

必要な場合は、以下のように JMX を使用せずにバッファーサイズなどの情報を取得できます。
DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx");
int size = disruptor.getBufferSize();
Copy to Clipboard Toggle word wrap
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat