307.6. 高度な使用上の注意
307.6.1. Plugable 接続リソース管理
SJMS は、組み込みの接続プールを通じて JMS 接続
リソース管理を提供します。これにより、サードパーティーの API プーリングロジックに依存する必要がなくなります。ただし、J2EE または OSGi コンテナーによって提供されるものなど、外部接続リソースマネージャーを使用する必要がある場合があります。このため、SJMS は、内部 SJMS 接続プール機能をオーバーライドするために使用できるインターフェイスを提供します。これは、ConnectionResource
インターフェイスを通じて実現されます。
ConnectionResource
は、SJMS コンポーネントに 接続
プールを提供するために使用される契約であり、必要に応じて接続を借用および返却するためのメソッドを提供します。ユーザーは、SJMS を外部接続プーリングマネージャーと統合する必要がある場合に使用する必要があります。
ただし、標準の ConnectionFactory
プロバイダーの場合は、SJMS で提供される ConnectionFactoryResource
実装をそのまま使用するか、このコンポーネント用に最適化して拡張することを推奨します。
以下は、ActiveMQ PooledConnectionFactory
でプラグ可能な ConnectionResource を使用する例です。
public class AMQConnectionResource implements ConnectionResource { private PooledConnectionFactory pcf; public AMQConnectionResource(String connectString, int maxConnections) { super(); pcf = new PooledConnectionFactory(connectString); pcf.setMaxConnections(maxConnections); pcf.start(); } public void stop() { pcf.stop(); } @Override public Connection borrowConnection() throws Exception { Connection answer = pcf.createConnection(); answer.start(); return answer; } @Override public Connection borrowConnection(long timeout) throws Exception { // SNIPPED... } @Override public void returnConnection(Connection connection) throws Exception { // Do nothing since there isn't a way to return a Connection // to the instance of PooledConnectionFactory log.info("Connection returned"); } }
次に、ConnectionResource
を SjmsComponent
に渡します。
CamelContext camelContext = new DefaultCamelContext(); AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1); SjmsComponent component = new SjmsComponent(); component.setConnectionResource(pool); camelContext.addComponent("sjms", component);
完全な使用例を確認するには、ConnectionResourceIT
を参照してください。
307.6.2. バッチメッセージのサポート
SjmsProducer は、List
をカプセル化する Exchange を作成することにより、メッセージのコレクションの発行をサポートします。この SjmsProducer は、List の内容を繰り返し処理し、各メッセージを個別に公開します。
メッセージのバッチを生成するときに、各メッセージに固有のヘッダーを設定する必要がある場合は、SJMS BatchMessage
クラスを使用できます。SjmsProducer が BatchMessage
リストに遭遇すると、各 BatchMessage
を反復処理し、含まれているペイロードとヘッダーを公開します。
以下は、BatchMessage クラスの使用例です。まず、BatchMessage
のリストを作成します:
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); }
次に、リストを公開します。
template.sendBody("sjms:queue:batch.queue", messages);
307.6.3. カスタマイズ可能なトランザクションコミットストラテジー (ローカル JMS トランザクションのみ)
SJMS は、TransactionCommitStrategy
インターフェイスを使用して、カスタムでプラグ可能なトランザクションストラテジーを作成する手段を開発者に提供します。これにより、ユーザーは、SessionTransactionSynchronization が
セッションをいつコミットするかを決定するために使用する一連の固有の状況を定義できます。その使用例は、次のセクションで詳しく説明する BatchTransactionCommitStrategy
です。
307.6.4. トランザクションバッチのコンシューマーとプロデューサー
SJMS コンポーネントは、プロデューサーエンドポイントとコンシューマーエンドポイントの両方でローカル JMS トランザクションのバッチ処理をサポートするように設計されています。ただし、それらがそれぞれでどのように処理されるかは非常に異なります。
SJMS コンシューマーエンドポイントは、X メッセージを関連するセッションでコミットする前に処理する単純な実装です。コンシューマーでバッチ処理されたトランザクションを有効にするには、まず、transacted
パラメーターを true に設定してトランザクションを有効にし、次に transactionBatchCount
を追加して 0 より大きい任意の値に設定します。たとえば、次の設定では、10 メッセージごとにセッションがコミットされます。
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
コンシューマーエンドポイントでのバッチの処理中に例外が発生した場合、セッションロールバックが呼び出され、メッセージが次に使用可能なコンシューマーに再配信されます。関連するセッションの BatchTransactionCommitStrategy
のカウンターも 0 にリセットされます。JMSRedelivered ヘッダーが true に設定されたメッセージを監視するために、バッチメッセージのプロセッサーにフックを配置することは、ユーザーの責任です。これは、メッセージがある時点でロールバックされたこと、および処理が成功したことを確認する必要があることを示しています。
トランザクション処理されたバッチコンシューマーには、セッションで開いているトランザクションをコミットする前にメッセージ間でデフォルトの時間 (5000 ミリ秒) 待機する内部タイマーのインスタンスも含まれます。デフォルト値の 5000 ミリ秒 (最小 1000 ミリ秒) は、ほとんどのユースケースに適していますが、さらに調整が必要な場合は、単に transactionBatchTimeout
パラメーターを設定してください。
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
受け入れられる最小値は 1000 ミリ秒です。これは、コンテキスト切り替えの量が不要なパフォーマンスへの影響をもたらし、メリットが得られない可能性があるためです。
ただし、プロデューサーエンドポイントの処理方法は大きく異なります。プロデューサでは、各メッセージが宛先に配信された後、Exchange が閉じられ、そのメッセージへの参照がなくなります。再配信可能なすべてのメッセージを利用できるようにするには、BatchMessage をパブリッシュしているプロデューサーエンドポイントでトランザクションを有効にするだけです。トランザクションは、バッチリスト内のすべてのメッセージを含む交換の最後にコミットされます。追加の設定は必要ありません。以下に例を示します。
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); }
トランザクションを有効にしてリストを公開します。
template.sendBody("sjms:queue:batch.queue?transacted=true", messages);