11.4. Competing Consumers
概要
図11.3「Competing Consumers パターン」 に示される Competing Consumers パターンは、複数のコンシューマーが同じキューからメッセージを引き出すことを可能にしつつ、各メッセージはたった一度だけ消費される ことを保証します。このパターンを使用すると、逐次的なメッセージ処理を並行的なメッセージ処理に置き換えることができます (結果として応答レイテンシーの減少をもたらします) 。
図11.3 Competing Consumers パターン
以下のコンポーネントは、Competing Consumers パターンの例になります。
JMS ベースの競合コンシューマー
通常の JMS キューは、各メッセージがたった一度だけ消費されることを暗黙的に保証しています。そのため、JMS キューは自動的に Competing Consumers パターンをサポートします。たとえば、以下のように、JMS キュー HighVolumeQ
からメッセージを引き出す 3 つの Competing Consumers を定義できます。
from("jms:HighVolumeQ").to("cxf:bean:replica01"); from("jms:HighVolumeQ").to("cxf:bean:replica02"); from("jms:HighVolumeQ").to("cxf:bean:replica03");
ここでは、CXF (Web サービス) エンドポイント replica01
、replica02
、および replica03
は、HighVolumeQ
キューからのメッセージを並行して処理します。
もう 1 つの方法として、JMS クエリーオプション concurrentConsumers
を設定して、Competing Consumers のスレッドプールを作成することもできます。たとえば、以下のルートは、指定されたキューからメッセージを取得する 3 つの競合スレッドのプールを作成します。
from("jms:HighVolumeQ?concurrentConsumers=3").to("cxf:bean:replica01");
concurrentConsumers
オプションは、以下のように XML DSL においても指定することができます。
<route> <from uri="jms:HighVolumeQ?concurrentConsumers=3"/> <to uri="cxf:bean:replica01"/> </route>
JMS トピックは Competing Consumers パターンをサポート しません。定義上、JMS トピックは、同じメッセージの複数のコピーを異なるコンシューマーに送信することを目的としています。したがって、Competing Consumers パターンとは互換性がありません。
SEDA ベースの競合コンシューマー
SEDA コンポーネントの目的は、計算を複数のステージに分割することで並行処理を単純化することです。SEDA エンドポイントは本質的に、インメモリーのブロッキングキュー (java.util.concurrent.BlockingQueue
で実装された) をカプセル化します。そのため、SEDA エンドポイントを使用してルートを複数のステージに分割し、各ステージでは複数のスレッドを使用することもできます。たとえば、以下のように 2 つのステージで設定される SEDA ルートを定義できます。
// Stage 1: Read messages from file system. from("file://var/messages").to("seda:fanout"); // Stage 2: Perform concurrent processing (3 threads). from("seda:fanout").to("cxf:bean:replica01"); from("seda:fanout").to("cxf:bean:replica02"); from("seda:fanout").to("cxf:bean:replica03");
最初のステージには、ファイルエンドポイント file://var/messages
からのメッセージを消費し、それらを SEDA エンドポイント seda:fanout
にルーティングする単一のスレッドが含まれています。2 番目のステージには、3 つのスレッドが含まれます。エクスチェンジを cxf:bean:replica01
にルーティングするスレッド、エクスチェンジを cxf:bean:replica02
にルーティングするスレッド、そしてエクスチェンジを cxf:bean:replica03
にルーティングするスレッドです。これら 3 つのスレッドは、ブロッキングキューを使用して実装された SEDA エンドポイントから、エクスチェンジインスタンスを取得するために競合します。ブロッキングキューはロックを使用して一度に複数のスレッドがキューにアクセスするのを防ぐため、エクスチェンジインスタンスは一度だけ消費されることが保証されます。
SEDA エンドポイントと thread()
によって作成されたスレッドプールとの違いについては、Apache Camel コンポーネントリファレンスガイド の SEDA コンポーネント を参照してください。