11.7. Durable Subscriber
概要
図11.6「Durable Subscriber パターン」 に示されているように、永続サブスクライバー は、特定の 「Publish-Subscribe Channel」 チャネルで送信されたすべてのメッセージを受信したいコンシューマーです。これには、コンシューマーがメッセージングシステムから切断されている間に送信されたメッセージも含まれます。そのためには、メッセージングシステムが切断されたコンシューマーに対して後でリプレイするためにメッセージを保存しておく必要があります。また、コンシューマー側には永続サブスクリプションの確立が必要であることを示すメカニズムも必要です。通常、パブリッシュサブスクライブチャネル (またはトピック) には、永続サブスクライバーと非永続サブスクライバーの両方を設定できます。それぞれ以下のように動作します。
- 非永続サブスクライバー - 接続 と 切断 の 2 つの状態を持つことができます。非永続サブスクライバーがトピックに接続されている間は、トピックのすべてのメッセージをリアルタイムで受信します。しかし、非永続サブスクライバーが切断されている間は、トピックに送信されたメッセージを一切受信しません。
- 永続的サブスクライバー - 接続 と 非アクティブ の 2 つの状態を持つことができます。非アクティブ状態とは、永続サブスクライバーはトピックから切断されているものの、その間に到達したメッセージを受信するつもりであることを意味します。永続サブスクライバーがトピックに再接続すると、非アクティブ時に送信されたすべてのメッセージがリプレイされます。
図11.6 Durable Subscriber パターン
JMS 永続サブスクライバー
JMS コンポーネントは Durable Subscriber パターンを実装しています。JMS エンドポイントに永続サブスクリプションを設定するには、この接続を識別するための クライアント ID と永続サブスクライバーを識別するための 永続サブスクリプション名 を指定する必要があります。たとえば、以下のルートは、クライアント ID が conn01
で、永続サブスクリプション名が John.Doe
の JMS トピック news
に永続サブスクリプションを設定します。
from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("cxf:bean:newsprocessor");
ActiveMQ エンドポイントを使用して永続サブスクリプションを設定することもできます。
from("activemq:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("cxf:bean:newsprocessor");
受信メッセージを同時に処理したい場合は、SEDA エンドポイントを使用して、以下のように複数の並列セグメントにルートを分散させることができます。
from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("seda:fanout"); from("seda:fanout").to("cxf:bean:newsproc01"); from("seda:fanout").to("cxf:bean:newsproc02"); from("seda:fanout").to("cxf:bean:newsproc03");
SEDA コンポーネントは Competing Consumers パターンをサポートするため、各メッセージは一度だけ処理されます。
代替例
別の方法として、「Message Dispatcher」 または 「Content-Based Router」 を永続サブスクライバーの File または JPA コンポーネントと組み合わせ、非永続サブスクライバーの SEDA のようなコンポーネントと組み合わせる方法もあります。
以下は、JMS トピックへの永続サブスクライバーを作成する簡単な例です。
Fluent Builder (流れるようなビルダー) の使用
from("direct:start").to("activemq:topic:foo"); from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1"); from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");
<route> <from uri="direct:start"/> <to uri="activemq:topic:foo"/> </route> <route> <from uri="activemq:topic:foo?clientId=1&durableSubscriptionName=bar1"/> <to uri="mock:result1"/> </route> <route> <from uri="activemq:topic:foo?clientId=2&durableSubscriptionName=bar2"/> <to uri="mock:result2"/> </route>
以下は、JMS 永続サブスクライバーのもう 1 つの例ですが、今回は 仮想トピック を使用します(AMQ で永続サブスクリプションで推奨)。
Fluent Builder (流れるようなビルダー) の使用
from("direct:start").to("activemq:topic:VirtualTopic.foo"); from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1"); from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");
<route> <from uri="direct:start"/> <to uri="activemq:topic:VirtualTopic.foo"/> </route> <route> <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/> <to uri="mock:result1"/> </route> <route> <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/> <to uri="mock:result2"/> </route>