Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.第36章 Disruptor
Disruptor コンポーネント リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
Camel 2.12 以降で利用可能
中断: コンポーネントは、標準の SEDA コンポーネントと同様に非同期 SEDA 機能を提供しますが、標準の SEDA で使用される BlockingQueue の代わりに Disruptor を使用します。または、このコンポーネントで disruptor-vm: エンドポイントをサポートし、標準の 仮想マシン の代替手段を提供します。SEDA コンポーネントと同様に、中断のバッファー: エンドポイントは 単一 の CamelContext 内でのみ表示され、永続性またはリカバリーに対するサポートは提供されません。*disruptor-vm:* エンドポイントは、CamelContexts インスタンス間の通信もサポートします。そのため、このメカニズムを使用して Web アプリケーション全体で通信を行うことができます( camel-disruptor.jar が システム/ブート クラスパス上にある場合)。
SEDA または VM コンポーネントで Disruptor コンポーネントを使用することを選択する主な利点は、プロデューサーとマルチキャストされたコンシューマーまたは同時コンシューマーの間の競合が高いユースケースのパフォーマンスです。このような場合、スループットが大幅に増加し、レイテンシーが減少しています。競合のないシナリオのパフォーマンスは、SEDA および VM コンポーネントと類似しています。
Disruptor は、可能な限り SEDA および VM コンポーネントの動作を模倣して実装されます。それらの相違点は、以下のとおりです。
- 使用されるバッファーは、常にサイズでバインドされます(デフォルトの 1024 エクスチェンジ)。
- バッファーは常に禁止されるため、例外を出力する代わりにバッファーが満杯である間に Disruptor のデフォルト動作がブロックされます。このデフォルトの動作は コンポーネントに設定できます(オプションを参照)。
- Disruptor 予約は BrowsableEndpoint インターフェイスを実装しません。そのため、現在 Disruptor にあるエクスチェンジを取得できず、交換の量のみを取得できます。
- Disruptor では、コンシューマー(マルチキャストの有無)を静的に設定する必要があります。オンザフライでコンシューマーを追加または削除するには、Diruptor の保留中のすべてのエクスチェンジを完全にフラッシュする必要があります。
- 再設定の結果:Disruptor で送信されるデータは直接処理され、コンシューマーが 1 つ以上ある場合は 'gone' になり、結合後に新しいエクスチェンジのみを取得します。
- pollTimeout オプションは Disruptor コンポーネントではサポートされません。
- プロデューサーが完全な Disruptor をブロックすると、スレッド割り込みに応答しません。
Maven ユーザーは、このコンポーネントの
pom.xml
に以下の依存関係を追加する必要があります。
URI 形式 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
disruptor:someName[?options]
disruptor:someName[?options]
または
disruptor-vm:someName[?options]
disruptor-vm:someName[?options]
*someName* には、現在の CamelContext 内のエンドポイントを一意に識別する任意の文字列(または *disruptor-vm:* の場合のコンテキスト全体で)を指定できます。URI に、以下の形式でクエリーオプションを追加できます。
?option=value&option=value&...
?option=value&option=value&...
オプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
以下のオプションはすべて、*disruptor:* および * disruptor-vm: *コンポーネントの両方に対して有効です。
名前 | デフォルト | 説明 |
---|---|---|
size | 1024 | Disruptors リングバッファーの最大容量。2 の最も近い累乗に事実上増加します。注: このオプションを使用する場合、キュー名で最初のエンドポイントが作成され、サイズを決定するのは最小限です。すべてのエンドポイントが同じサイズを使用するようにするには、すべてのエンドポイントで size オプションを設定するか、作成される最初のエンドポイントを設定します。 |
bufferSize | コンポーネントのみ: Disruptors リングバッファーのデフォルトサイズ(保持可能なメッセージ数の容量)。このオプションは、サイズが使用されていない場合に使用されます。 | |
queueSize | コンポーネントのみ: SEDA コンポーネントとの最大互換性を維持するために <em>bufferSize</em> を指定する追加のオプション。 | |
concurrentConsumers | 1 | 同時スレッド処理エクスチェンジの数。 |
waitForTaskToComplete | IfReplyExpected | 非同期タスクが完了するまで呼び出し元が待機するかどうかを指定するオプション。Always、Never、または IfReplyExpected の 3 つのオプションがサポートされます。最初の 2 つの値は自己説明です。最後の値 IfReplyExpected は、メッセージが RequestReply-based の場合にのみ待機します。非同期メッセージングの詳細は、非同期 メッセージング を参照してください。 |
timeout | 30000 | プロデューサーが非同期タスクが完了するまで待機するタイムアウト(ミリ秒単位)。詳細は、waitForTaskToComplete および Async を参照してください。0 または負の値を使用して、タイムアウトを無効にすることができます。 |
defaultMultipleConsumers | コンポーネントのみ: multipleConsumers が指定されていない場合に使用されるこのコンポンサーによって作成されるエンドポイントに、複数のコンシューマーのデフォルト許可を設定できます。 | |
multipleConsumers
|
false
|
複数のコンシューマーを許可するかどうかを指定します。有効にすると、Publish-Subscribe メッセージングに Disruptor を使用できます。つまり、SEDA キューにメッセージを送信し、各コンシューマーがメッセージのコピーを受け取ることができます。有効にすると、すべてのコンシューマーエンドポイントでこのオプションを指定する必要があります。
|
limitConcurrentConsumers | true | concurrentConsumers 数を最大 500 に制限するかどうか。デフォルトでは、Disruptor エンドポイントがより大きな数値で設定されている場合、例外が発生します。このオプションをオフにすると、そのチェックを無効にできます。 |
blockWhenFull | true | メッセージを full Disruptor に送信するスレッドが、リングバッファーの容量が枯渇しなくなるまでブロックされるかどうか。デフォルトでは、呼び出し元のスレッドは、メッセージが受け入れられるまでブロックおよび待機します。このオプションを無効にすると、キューが満杯であることを示す例外が出力されます。 |
defaultBlockWhenFull | Component only: blockWhenFull が指定されていない場合に使用されるこのコンポントによって作成されたエンドポイントに対してリングバッファーが満杯になると、デフォルトのプロデューサー動作を設定できます。 | |
waitStrategy | Blocking | 新しいエクスチェンジが公開されるのを待つコンシューマースレッドによって使用されるストラテジーを定義します。許可されるオプションは、Blocking、Sleeping、BusySpin、および Yielding です。この件に関する詳細は、以下のセクションを参照してください。 |
defaultWaitStrategy | Component only: waitStrategy が指定されていない場合に使用されるこのコンポントによって作成されたエンドポイントにデフォルトの待機ストラテジーを設定できます。 | |
producerType | multi |
Disruptor で許可されるプロデューサーを定義します。許可されるオプションは、複数のプロデューサーと Single が、特定の最適化を有効にすることができることです(1 つのスレッドまたは他のスレッド上、または同期されている場合)。
|
待機ストラテジー リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
待機ストラテジーは、次のエクスチェンジの公開を待機しているコンシューマースレッドによって実行される待機の種類に影響します。以下のストラテジーを選択できます。
名前 | 説明 | アドバイス |
---|---|---|
Blocking | バリアで待機している Consumers に lock および condition 変数を使用するブロッキングストラテジー。 | このストラテジーは、CPU リソースほどスループットと低レイテンシーが重要ではない場合に使用できます。 |
スリープ状態 | 最初にスピンアップし、Thread.yield ()を使用し、最終的には OS と JVM の最小数で、コンシューマーがバリアで待機している間に許可されるスリープストラテジー。 | このストラテジーは、パフォーマンスと CPU リソース間の妥協です。レイテンシーの急増は、quiet 期間の後に発生する可能性があります。 |
BusySpin | バリアで待機している Consumers にビジースピンループを使用する busy Spin ストラテジー。 | このストラテジーは、CPU リソースを使用して、レイテンシージッターを引き起こす可能性のある syscall を回避します。スレッドを特定の CPU コアにバインドできる場合に最適です。 |
yielding | 初回の回転後にバリアで待機した Consumers に Thread.yield ()を使用するストラテジーを生成します。 | このストラテジーは、レイテンシーが大幅に急増することなく、パフォーマンスと CPU リソース間の妥協です。 |
リクエスト応答の使用 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
Disruptor コンポーネントは RequestReply の使用をサポートします。ここでは、呼び出し元は Async ルートが完了するまで待機します。以下に例を示します。
from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input"); from("disruptor:input").to("bean:processInput").to("bean:createResponse");
from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input");
from("disruptor:input").to("bean:processInput").to("bean:createResponse");
上記のルートでは、受信リクエストを受け入れるポート 9876 の TCP リスナーがあります。リクエストは disruptor:input バッファーにルーティングされます。RequestReply メッセージであるため、応答を待ちます。disruptor:input バッファーのコンシューマーが完了すると、応答が元のメッセージの応答にコピーされます。
同時コンシューマー リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Diruptor エンドポイントは単一のコンシューマースレッドを使用しますが、同時コンシューマースレッドを使用するように設定できます。そのため、スレッドプールの代わりに以下を使用できます。
from("disruptor:stageName?concurrentConsumers=5").process(...)
from("disruptor:stageName?concurrentConsumers=5").process(...)
2 つの違いは、スレッドプールは負荷に応じて動的に拡大/縮小する可能性がある一方で、同時コンシューマーの数は常に内部的に固定され、サポートされるため、パフォーマンスは高くなります。
スレッドプール リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
以下のような手順を実行して、スレッドプールを Disruptor エンドポイントに追加することに注意してください。
from("disruptor:stageName").thread(5).process(...)
from("disruptor:stageName").thread(5).process(...)
は、Diruptor を使用してパフォーマンスの一部を効果的に否定して、通常の BlockingQueue を追加して Disruptor と組み合わせて使用することで、優先できます。代わりに、concurrentConsumers オプションを使用して Disruptor エンドポイントでメッセージを処理するスレッド数を直接設定することが推奨されます。
例 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
以下のルートでは、Diruptor を使用してこの非同期キューにリクエストを送信し、別のスレッドでさらに処理するために fire-and-forget メッセージを送信でき、このスレッドの定数応答を元の呼び出し元に返します。
ここでは、Hello World メッセージを送信し、応答が OK であることが想定されます。
Object out = template.requestBody("direct:start", "Hello World"); assertEquals("OK", out);
Object out = template.requestBody("direct:start", "Hello World");
assertEquals("OK", out);
Hello World メッセージは、さらに処理するために、別のスレッドから Disruptor から取得されます。これはユニットテストからのものであるため、ユニットテストでアサーションを実行できるモックエンドポイントに送信されます。
multipleConsumers の使用 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
この例では、2 つのコンシューマーを定義し、Spring Bean として登録しました。
Disruptor foo エンドポイントで multipleConsumers=true を指定しているため、これらの 2 つ以上のコンシューマーは、種類の pub-sub スタイルのメッセージングとしてメッセージの独自のコピーを受け取ることができます。Bean はユニットテストの一部であるため、単にモックエンドポイントにメッセージを送信しますが、@Consume を使用して Disruptor から消費する方法に注意してください。
中断情報の抽出 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
必要な場合は、以下のように JMX を使用せずにバッファーサイズなどの情報を取得できます。
DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx"); int size = disruptor.getBufferSize();
DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx");
int size = disruptor.getBufferSize();