第2章 ActiveMQ
ActiveMQ コンポーネント
ActiveMQ コンポーネントを使用すると、メッセージを JMS キューまたはトピックに送信するか、Apache ActiveMQ を使用して JMS キューまたはトピックからメッセージを消費できます。
このコンポーネントは、177章JMS コンポーネント をベースにしており、Spring の JMS サポートを宣言型トランザクションに、また Spring の JmsTemplate
を送信に、MessageListenerContainer
を消費に使用します。177章JMS コンポーネント のコンポーネントのオプションはすべて、このコンポーネントに該当します。
このコンポーネントを使用するには、クラスパスに activemq.jar
または activemq-core.jar
があり、camel-core.jar
、camel-spring.jar
、camel-jms.jar
などの Apache Camel 依存関係があることを確認してください。
JMS でトランザクションを使用している場合は、パフォーマンスに影響を与える可能性があるため、JMS ページの下の トランザクションとキャッシュレベル セクションを参照してください。
URI 形式
activemq:[queue:|topic:]destinationName
ここでは、destinationName は ActiveMQ キューまたはトピック名に置き換えます。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キューに接続するには、FOO.BAR
を次のように使用します。
activemq:FOO.BAR
必要に応じて、オプションの queue:
接頭辞を含めることができます。
activemq:queue:FOO.BAR
トピックに接続するには、topic:
接頭辞を含める必要があります。たとえば、トピック Stocks.Prices
に接続するには、次を使用します。
activemq:topic:Stocks.Prices
オプション
これらのオプションはすべてこのオプションに該当するので、177章JMS コンポーネント コンポーネントのオプションを参照してください。
Camel on EAP デプロイメント
このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。
組み込みブローカーまたは外部ブローカーのいずれかで動作するように ActiveMQ Camel コンポーネントを設定できます。JBoss EAP コンテナーにブローカーを埋め込むには、EAP コンテナー設定ファイルで ActiveMQ リソースアダプターを設定します。詳細は、ActiveMQ リソースアダプターの設定 を参照してください。
接続ファクトリーの設定
次の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら、activeMQComponent ()
メソッド を使用して ActiveMQComponent を CamelContext に追加する方法を示しています。
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Spring XML を使用した接続ファクトリーの設定
次のように、ActiveMQComponent で ActiveMQ ブローカー URL を設定できます。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
接続プーリングの使用
Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、およびプロデューサーを効率的にプールするように処理することを推奨します。これは ActiveMQ Spring Support ページに記載されています。
Maven を使用して、Jencks AMQ プールを取得できます。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.3.2</version> </dependency>
次に、activemq コンポーネントを次のように設定します。
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
プールされた接続ファクトリーの init メソッドと destroy メソッドに注目してください。これは、接続プールが適切に開始および終了されるようにするために重要です。
PooledConnectionFactory
は、同時に使用される最大 8 つの接続を含む接続プールを作成します。各接続は、多くのセッションで共有できます。接続ごとのセッションの最大数を設定するために使用できる maxActive
という名前のオプションがあります。デフォルト値は 500
です。ActiveMQ 5.7 以降、このオプションはその目的をさらに反映するように名前が変更され、maxActiveSessionPerConnection
という名前になりました。concurrentConsumers
が maxConnections
よりも高い値に設定されていることに注意してください。各コンシューマーがセッションを使用していて、セッションが同じ接続を共有できるので、これは問題ありません。この例では、同時に 8 * 500 = 4000 のアクティブなセッションを指定できます。
ルートでの MessageListener POJO の呼び出し
ActiveMQ コンポーネントは、JMS MessageListener から Processor へのヘルパー Type Converter も提供します。これは、43章Bean コンポーネント コンポーネントは、任意のルート内で任意の JMS MessageListener Bean を直接呼び出すことができます。
たとえば、次のように JMS で MessageListener を作成できます。
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
次に、以下のようにルートで使用します。
from("file://foo/bar"). bean(MyListener.class);
つまり、任意の Apache Camel コンポーネントを再利用して、それらを JMS MessageListener
POJO に簡単に統合できます。
ActiveMQ 宛先オプションの使用
ActiveMQ 5.6 以降で利用可能
"destination." の接頭辞を使用して、エンドポイント uri で 宛先オプション を設定できます。たとえば、コンシューマーを排他的とマークし、そのプリフェッチサイズを 50 に設定するには、次のようにします。
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
アドバイザリーメッセージの使用
ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、遅いコンシューマーを検出した場合にアラートを送信したり、統計 (1 日に生成されるメッセージの数など) を作成したりするのに役立ちます。 次の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
キューでメッセージを消費すると、data/activemq フォルダーの下に次のファイルが表示されます。
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
および含まれる文字列:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
コンポーネント JAR の取得
以下の依存関係が必要です。
-
activemq-camel
ActiveMQ は、ActiveMQ プロジェクト でリリースされた 177章JMS コンポーネント コンポーネントの拡張です。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.6.0</version> </dependency>