第150章 SJMS


SJMS コンポーネント

Camel 2.11 から利用可能
Simple JMS Component (SJMS)は、JMS クライアントの作成と設定に関してよく知られているベストプラクティスを使用する Camel で使用するための JMS クライアントです。SJMS には、Camel 向けに明示的に作成された新しい JMS クライアント API が含まれており、軽量で回復力のあるサードパーティーのメッセージング実装を排除します。これに含まれる機能を以下に示します。
  • 標準キューとトピックサポート(Durable & Non-Durable)
  • InOnly & InOut MEPのサポート
  • 非同期プロデューサーおよびコンシューマー処理
  • 内部 JMS トランザクションサポート
その他の主な機能は以下のとおりです。
  • プラグイン可能な接続リソース管理
  • セッション、コンシューマー、プロデューサープーリングおよびキャッシング管理
  • バッチコンシューマーおよびプロデューサー
  • トランザクションバッチコンシューマーおよびプロデューサー
  • カスタマイズ可能なトランザクションのコミットストラテジーのサポート(ローカル JMS トランザクションのみ)
SJMS の S 理由
s は Simple および Standard および Springless の略です。また、camel-jms はすでに取得されています。:-)
警告
これは、複雑な JMS メッセージングにおける非常に新しいコンポーネントです。そのため、このコンポーネントは継続的に開発および強化されています。Spring JMS をベースとする従来の JMS コンポーネントは、強化され、広範囲にテストされました。
Maven ユーザーは、このコンポーネントの pom.xml に以下の依存関係を追加する必要があります。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-sjms</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
Copy to Clipboard Toggle word wrap

URI 形式

sjms:[queue:|topic:]destinationName[?options]
Copy to Clipboard Toggle word wrap
destinationName は JMS キューまたはトピック名です。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キューに接続するには、FOO.BAR を次のように使用します。
sjms:FOO.BAR
Copy to Clipboard Toggle word wrap
必要に応じて、オプションの queue: 接頭辞を含めることができます。
sjms:queue:FOO.BAR
Copy to Clipboard Toggle word wrap
トピックに接続するには、topic: 接頭辞を含める 必要があり ます。たとえば、Stocks.Prices トピックに接続するには、以下を使用します。
sjms:topic:Stocks.Prices
Copy to Clipboard Toggle word wrap
?option=value&option=value&.. という形式を使用して、URI にクエリーオプションを追加します。

コンポーネントのオプションおよび設定

SJMS コンポーネントは以下の設定オプションをサポートします。
Expand
オプション 必須 デフォルト値 説明
connectionCount 1 このコンポーネントで起動したエンドポイントが使用できる接続の最大数
connectionFactory (/) null SjmsComponent を有効にするには ConnectionFactory が必要です。これは直接設定するか、ConnectionResource の一部として設定できます。
connectionResource null ConnectionResource は、ConnectionFactory のカスタマイズおよびコンテナー制御を可能にするインターフェイスです。詳細は、プラグイン可能な接続リソース管理 を参照してください。
headerFilterStrategy DefaultJmsKeyFormatStrategy
jmsKeyFormatStrategy DefaultJmsKeyFormatStrategy Camel 2.16: JMS キーをエンコードおよびデコードするための Pluggable ストラテジー。JMS 仕様に準拠できるようにします。Camel は、追加設定なしで defaultpassthrough の 2 つの実装を提供します。default ストラテジーは、ドットとハイフン(. および -)を安全にマーシャリングします。passthrough ストラテジーはキーをそのまま残します。JMS ヘッダーキーに不正な文字が含まれているかどうかは問題にならない JMS ブローカーに使用できます。org.apache.camel.component.jms.JmsKeyFormatStrategy の独自の実装を提供し、# 表記を使用して参照できます。
transactionCommitStrategy null
DestinationCreationStrategy DefaultDestinationCreationStrategy Camel 2.15.0: SJMS コンポーネントでカスタム DestinationCreationStrategy を設定できる ようにします。
messageCreatedStrategy Camel 2.16: Camel が JMS メッセージを送信するときに Camel が javax.jms.Message オブジェクトの新規インスタンスを作成する際に呼び出される指定の MessageCreatedStrategy を使用します。
以下は、必要な ConnectionFactory プロバイダーを使用して SjmsComponent を設定する方法の例になります。デフォルトで単一の接続を作成し、component 内部プーリング API を使用してこれを保存し、スレッドセーフな方法でセッション作成要求に対応できるようにします。
SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
getContext().addComponent("sjms", component);
Copy to Clipboard Toggle word wrap
永続サブスクリプションのサポートに必要な SjmsComponent の場合、デフォルトの ConnectionFactoryResource インスタンスを上書きし、clientId プロパティーを設定します。
ConnectionFactoryResource connectionResource = new ConnectionFactoryResource();
connectionResource.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
connectionResource.setClientId("myclient-id");

SjmsComponent component = new SjmsComponent();
component.setConnectionResource(connectionResource);
component.setMaxConnections(1);
Copy to Clipboard Toggle word wrap

プロデューサー設定オプション

SjmsProducer エンドポイントは以下のプロパティーをサポートします。
Expand
オプション デフォルト値 説明
acknowledgementMode AUTO_ACKNOWLEDGE SESSION_TRANSACTEDAUTO_ACKNOWLEDGE または DUPS_OK_ACKNOWLEDGE のいずれかである JMS の確認名。現時点では、CLIENT_ACKNOWLEDGE はサポートされません。
consumerCount 1 InOut のみ。応答コンシューマー用の MessageListener インスタンスの数を定義します。
exchangePattern InOnly Producers メッセージ交換パターンを設定します。
namedReplyTo null InOut のみ。応答の宛先への名前付き応答を指定します。
永続 true メッセージを永続化を有効にして配信するかどうか。
producerCount 1 MessageProducer インスタンスの数を定義します。
responseTimeOut 5000 InOut のみ。InOut Producer が応答を待つ時間を指定します。
同期 true Endpoint が同期または非同期処理を使用するかどうかを設定します。
transacted false エンドポイントが JMS セッショントランザクションを使用する場合。
ttl -1 デフォルトでは無効にされています。Message time を live ヘッダーに設定します。
prefillPool true Camel 2.14: 起動時にプロデューサー接続プールを事前に入力するか、または必要に応じてコネクションの遅延を作成するかどうか。
allowNullBody true Camel 2.15.1: ボディーのないメッセージの送信を許可するかどうか。false とメッセージボディーが null の場合は、JMSException が出力されます。
mapJmsMessage true Camel 2.16: Camel が受信した JMS メッセージを javax.jms.TextMessageString などの適切なペイロードタイプに自動的にマップするかどうかを指定します。
messageCreatedStrategy Camel 2.16: Camel が JMS メッセージを送信するときに Camel が javax.jms.Message オブジェクトの新規インスタンスを作成する際に呼び出される指定の MessageCreatedStrategy を使用します。
jmsKeyFormatStrategy Camel 2.16: JMS キーをエンコードおよびデコードするための Pluggable ストラテジー。JMS 仕様に準拠できるようにします。Camel は、追加設定なしで defaultpassthrough の 2 つの実装を提供します。default ストラテジーは、ドットとハイフン(. および -)を安全にマーシャリングします。passthrough ストラテジーはキーをそのまま残します。JMS ヘッダーキーに不正な文字が含まれているかどうかは問題にならない JMS ブローカーに使用できます。org.apache.camel.component.jms.JmsKeyFormatStrategy の独自の実装を提供し、# 表記を使用して参照できます。
includeAllJMSXProperties Camel 2.16: JMS から Camel Message にマッピングするときに、すべての JMSXxxx プロパティーを含めるかどうか。これを true に設定すると、JMSXAppIDJMSXUserID などのプロパティーが含まれます。注記:カスタムの headerFilterStrategy を使用している場合、このオプションは適用されません。

プロデューサーの使用

InOnly Producer -(デフォルト)

InOnly Producer は SJMS Producer Endpoint のデフォルトの動作です。
from("direct:start")
.to("sjms:queue:bar");
Copy to Clipboard Toggle word wrap

InOut プロデューサー

InOut 動作を有効にするには、URI に exchangePattern 属性を追加します。デフォルトでは、コンシューマーごとに専用の TemporaryQueue を使用します。
from("direct:start")
.to("sjms:queue:bar?exchangePattern=InOut");
Copy to Clipboard Toggle word wrap
namedReplyTo を指定できますが、これによりモニターポイントが改善されます。
from("direct:start")
.to("sjms:queue:bar?exchangePattern=InOut&namedReplyTo=my.reply.to.queue");
Copy to Clipboard Toggle word wrap

コンシューマー設定オプション

SjmsConsumer エンドポイントは以下のプロパティーをサポートします。
Expand
オプション デフォルト値 説明
acknowledgementMode AUTO_ACKNOWLEDGE TRANSACTEDAUTO_ACKNOWLEDGE または DUPS_OK_ACKNOWLEDGE のいずれかである JMS の確認名。現時点では、CLIENT_ACKNOWLEDGE はサポートされません。
consumerCount 1 MessageListener インスタンスの数を定義します。
durableSubscriptionId null 永続サブスクリプションに必要です。
exchangePattern InOnly Consumers メッセージ交換パターンを設定します。
messageSelector null メッセージセレクターを設定します。
同期 true Endpoint が同期または非同期処理を使用するかどうかを設定します。
transacted false エンドポイントが JMS セッショントランザクションを使用する場合。
transactionBatchCount 1 ローカル JMS トランザクションをコミットする前に処理するエクスチェンジの数。トランザクション プロパティーも true に設定する必要があります。そうしないと、このプロパティーは無視されます。
transactionBatchTimeout 5000 消費済み内容をコミットする前にトランザクションがメッセージ間で開いたままになる時間。最小値は 1000ms です。
ttl \-1 デフォルトでは無効にされています。Message time を live ヘッダーに設定します。

コンシューマーの使用

InOnly Consumer: (デフォルト)

InOnly Consumer は、SJMS コンシューマーエンドポイントのデフォルトのエクスチェンジ動作です。
from("sjms:queue:bar")
.to("mock:result");
Copy to Clipboard Toggle word wrap

InOut コンシューマー

InOut 動作を有効にするには、URI に exchangePattern 属性を追加します。
from("sjms:queue:in.out.test?exchangePattern=InOut")
.transform(constant("Bye Camel"));
Copy to Clipboard Toggle word wrap

高度な使用方法に関する注意点

プラグイン可能な接続リソース管理

SJMS は、ビルトイン 接続 プールを介して JMS 接続リソース管理を提供します。これにより、サードパーティーの API プーリングロジックに依存する必要がなくなります。ただし、J2EE や OSGi コンテナーなど、外部 Connection リソースマネージャーの使用が必要になる場合があります。この SJMS では、内部 SJMS 接続プール機能を上書きするために使用できるインターフェイスを提供します。これは ConnectionResource インターフェイスを介して実行されます。
ConnectionResource は、Connection pool を SJMS コンポーネントに提供するために使用されるコントラクトとして、必要に応じて接続を借用して返すメソッドを提供します。ユーザーは、SJMS を外部接続プールマネージャーと統合する必要がある場合に使用する必要があります。
標準の ConnectionFactory プロバイダーの場合は、このコンポーネントに対して最適化されているように SJMS で提供される ConnectionFactoryResource 実装を使用するか、または拡張することが推奨されます。
以下は、ActiveMQ PooledConnectionFactory でプラグ可能な ConnectionResource を使用する例です。
public class AMQConnectionResource implements ConnectionResource {
private PooledConnectionFactory pcf;

public AMQConnectionResource(String connectString, int maxConnections) {
super();
pcf = new PooledConnectionFactory(connectString);
pcf.setMaxConnections(maxConnections);
pcf.start();
}

public void stop() {
pcf.stop();
}

@Override
public Connection borrowConnection() throws Exception {
Connection answer = pcf.createConnection();
answer.start();
return answer;
}

@Override
public Connection borrowConnection(long timeout) throws Exception {
// SNIPPED...
}

@Override
public void returnConnection(Connection connection) throws Exception {
// Do nothing since there isn't a way to return a Connection
// to the instance of PooledConnectionFactory
log.info("Connection returned");
}
}
Copy to Clipboard Toggle word wrap
次に、ConnectionResource を SjmsComponent に渡します。
CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(pool);
camelContext.addComponent("sjms", component);
Copy to Clipboard Toggle word wrap
その使用例をすべて確認するには、ConnectionResourceIT を参照してください。

セッション、コンシューマー、プロデューサープーリングおよびキャッシング管理

近日中にお待ちください ...

バッチメッセージのサポート

SjmsProducer は、List をカプセル化するエクスチェンジを作成して、メッセージのコレクションを公開します。この SjmsProducer は List の内容を繰り返し処理し、各メッセージを個別に公開します。
メッセージのバッチを生成する場合、各メッセージに固有のヘッダーを設定する必要があります。SJMS BatchMessage クラスを使用できます。SjmsProducer が BatchMessage List に遭遇すると、各 BatchMessage を繰り返し処理し、含まれるペイロードとヘッダーを公開します。
以下は BatchMessage クラスの使用例です。まず、BatchMessages の一覧を作成します。
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
String body = "Hello World " + i;
BatchMessage<String> message = new BatchMessage<String>(body, null);
messages.add(message);
}
Copy to Clipboard Toggle word wrap
次に、一覧を公開します。
template.sendBody("sjms:queue:batch.queue", messages);
Copy to Clipboard Toggle word wrap
SJMS は、開発者に TransactionCommitStrategy インターフェイスを使用してカスタムおよびプラグイン可能なトランザクションストラテジーを作成する手段を提供します。これにより、セッションをコミットするタイミングを決定するために SessionTransactionSynchronization が使用する一意の一連の状況を定義できます。その使用例は BatchTransactionCommitStrategy です。これについては次のセクションで説明します。

トランザクションバッチコンシューマーおよびプロデューサー

SjmsComponent は、Producer エンドポイントと Consumer エンドポイントの両方でローカル JMS トランザクションのバッチ処理をサポートするように設計されています。ただし、それぞれの処理方法は非常に異なります。
SjmsConsumer エンドポイントは、関連付けられた Session でコミットする前に X メッセージを処理する straitforward 実装です。コンシューマーでバッチ処理されたトランザクションを有効にするには、最初に transacted パラメーターを true に設定し、transactionBatchCount を追加し、これを 0 より大きい値に設定します。たとえば、以下の設定は 10 メッセージごとに Session をコミットします。
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
Copy to Clipboard Toggle word wrap
コンシューマーエンドポイントでバッチの処理中に例外が発生すると、Session ロールバックが呼び出され、メッセージが次に利用可能なコンシューマーに再配信されます。カウンターは、関連付けられた Session の BatchTransactionCommitStrategy に対しても 0 にリセットされます。ユーザーがフックをバッチメッセージのプロセッサーに配置し、JMSRedelivered ヘッダーが true に設定されたメッセージを監視するのはユーザーの責任です。これは、メッセージが特定の時点でロールバックされ、成功した処理の検証が発生することを示しています。
トランザクションバッチコンシューマーは、セッションで開かれたトランザクションをコミットする前に、メッセージ間のデフォルトの時間(5000ms)を待機する内部タイマーのインスタンスも伝送します。デフォルト値の 5000ms (最低 1000ms)はほとんどのユースケースに適していますが、さらなるチューニングが必要な場合は transactionBatchTimeout パラメーターを設定します。
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
Copy to Clipboard Toggle word wrap
許可される最小値は 1000ms です。コンテキストの切り替え量は、利点を得られることなく不要なパフォーマンスへの影響を引き起こす可能性があるためです。
ただし、プロデューサーエンドポイントははるかに異なる方法で処理されます。各メッセージが宛先に配信された後、プロデューサーでは Exchange が閉じられ、そのメッセージへの参照はなくなります。すべてのメッセージを再配信で利用可能にするには、BatchMessages を公開する Producer Endpoint でトランザクションを有効にするだけです。トランザクションは、バッチリストのすべてのメッセージが含まれるエクスチェンジの最後にコミットします。追加設定は必要ありません。以下に例を示します。
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
String body = "Hello World " + i;
BatchMessage<String> message = new BatchMessage<String>(body, null);
messages.add(message);
}
Copy to Clipboard Toggle word wrap
トランザクションを有効にして List を公開します。
template.sendBody("sjms:queue:batch.queue?transacted=true", messages);
Copy to Clipboard Toggle word wrap

追記

メッセージヘッダーの形式

SJMS コンポーネントは、Camel JMS Component で使用されるのと同じヘッダーフォーマットストラテジーを使用します。このプラグイン可能なストラテジーにより、ネットワーク経由で送信されるメッセージが JMS Message 仕様に準拠するようになります。
exchange.in.header の場合、ヘッダーキーに以下のルールが適用されます。
JMS または JMSX で始まるキーは予約されます。exchange.in.headers キーはリテラルで、すべて有効な Java 識別子である必要があります(キー名のドットは使用しないでください)。Camel は、JMS メッセージを使用するときにドットとハイフンを後継に置き換えます。
  • は、Camel がメッセージを消費するときに DOT と逆の置換に置き換えられます。
  • は、Camel がメッセージを消費するときに HYPHEN に置き換えられ、逆の置換です。鍵のフォーマットに独自のカスタムストラテジーを使用できるオプション jmsKeyFormatStrategy も参照してください。
exchange.in.header の場合、以下のルールがヘッダー値に適用されます。

メッセージの内容

ネットワーク経由でコンテンツを提供するには、配信されるメッセージのボディーが JMS Message Specification に準拠することを確認する必要があります。そのため、生成されるすべてはプリミティブまたはカウンターオブジェクト(Integer、Long、Charracter など)である必要があります。型、String、CharSequence、Date、BigDecimal、BigInteger はすべて、その toString ()表現に変換されます。その他のタイプはすべて破棄されます。

クラスタリング

クラスター環境で SJMS で InOut を使用する場合は、TemporaryQueue 宛先を使用するか、InOut プロデューサーエンドポイントごとに宛先への一意の名前付き応答を使用する必要があります。メッセージ相関は、ブローカーのメッセージセレクターではなく、エンドポイントによって処理されます。InOut Producer Endpoint は、Message JMSCorrelationID によってキャッシュされた Java Concurrency Exchangers を使用します。これにより、対象のコンシューマーによって生成される順序ですべてのメッセージが宛先から消費されるため、ブローカーのオーバーヘッドを削減しつつ優れたパフォーマンスが向上します。
現在、相関ストラテジーは JMSCorrelationId を使用することです。InOut Consumer はこのストラテジーを使用します。また、含まれる JMSReplyTo 宛先へのすべての応答メッセージにも、リクエストから JMSCorrelationId がコピーされるようにします。

トランザクションサポート

現在、SJMS は内部 JMS トランザクションの使用のみをサポートします。Camel Transaction Processor または Java Transaction API (JTA)はサポートされません。

Springless Mean I can't use Spring?

まったくありません。以下は、Spring DSL を使用した SJMS コンポーネントの例になります。
<route id="inout.named.reply.to.producer.route">
  <from uri="direct:invoke.named.reply.to.queue" />
  <to uri="sjms:queue:named.reply.to.queue?namedReplyTo=my.response.queue&xchangePattern=InOut" />
</route>
Copy to Clipboard Toggle word wrap
Springless は、Spring JMS API の依存関係から離れることを指します。新しい JMS クライアント API は、SJMS をゼロから開発しています。
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat