第2章 ActiveMQ
ActiveMQ コンポーネント
ActiveMQ コンポーネントを使用すると、メッセージを JMS キューまたはトピックに送信したり、Apache ActiveMQ を使用して JMS キューまたはトピックからメッセージを消費したりできます。
このコンポーネントは、JMS component をベースにしており、Spring の JMS サポートを宣言型トランザクションに使用し、Spring の JmsTemplate
を送信に、そして MessageListenerContainer
を消費に使用します。すべての JMS コンポーネントオプション は、ActiveMQ コンポーネントにも適用されます。
このコンポーネントを使用するには、クラスパスに activemq.jar
または activemq-core.jar
があり、camel-core.jar
、camel-spring.jar
、camel-jms.jar
などの Apache Camel 依存関係があることを確認してください。
JMS でトランザクションを使用している場合は、パフォーマンスに影響を与える可能性があるため、JMS ページの下の トランザクションとキャッシュレベル セクションを参照してください。
URI 形式
activemq:[queue:|topic:]destinationName
ここでは、destinationName は ActiveMQ キューまたはトピック名に置き換えます。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キューに接続するには、FOO.BAR
を次のように使用します。
activemq:FOO.BAR
必要に応じて、オプションの queue:
接頭辞を含めることができます。
activemq:queue:FOO.BAR
トピックに接続するには、topic:
接頭辞を含める必要があります。たとえば、トピック Stocks.Prices
に接続するには、次を使用します。
activemq:topic:Stocks.Prices
オプション
すべての JMS コンポーネントオプション は、ActiveMQ コンポーネントにも適用されます。
Camel on EAP デプロイメント
このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。
組み込みブローカーまたは外部ブローカーのいずれかで動作するように ActiveMQ Camel コンポーネントを設定できます。JBoss EAP コンテナーにブローカーを埋め込むには、EAP コンテナー設定ファイルで ActiveMQ リソースアダプターを設定します。詳細は、ActiveMQ リソースアダプターの設定 を参照してください。
接続ファクトリーの設定
次の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら、activeMQComponent()
メソッド を使用して、ActiveMQComponent を CamelContext に追加する方法を示しています。
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Spring XML を使用した接続ファクトリーの設定
次のように、ActiveMQComponent で ActiveMQ ブローカー URL を設定できます。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
接続プーリングの使用
Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、および producer を効率的にプールするように処理することが推奨されます。詳細は、ActiveMQ Spring Support を参照してください。
- Maven で AMQ プールを追加します。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.0.redhat-630516</version> </dependency>
- activemq コンポーネントを次のようにセットアップします。
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
プールされた接続ファクトリーの init メソッドと destroy メソッドに注目してください。これらの方法は、接続プールが適切に起動およびシャットダウンされるようにするために重要です。
PooledConnectionFactory
は、同時に使用される最大 8 つの接続を含む接続プールを作成します。各接続は、多くのセッションで共有できます。
接続ごとのセッションの最大数を設定するために使用できる maxActive
という名前のオプションがあります。デフォルト値は 500
です。
ActiveMQ 5.7 以降、目的が適切に反映されるようにオプションの名前が maxActiveSessionPerConnection
に変更されました。
concurrentConsumers
は maxConnections
よりも高い値に設定されることに注意してください。これは、各 consumer がセッションを使用することから許可され、セッションが同じ接続を共有できるため、これは機能します。この例では、8 * 500 = 4000 のアクティブなセッションを同時に持つことができます。
ルートでの MessageListener POJO の呼び出し
ActiveMQ コンポーネントは、JMS MessageListener から Processor へのヘルパー Type Converter も提供します。つまり、Bean コンポーネント は、任意のルート内で任意の JMS MessageListener Bean を直接呼び出すことができます。
以下のように JMS で MessageListener を作成できます。
例
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
次に、以下のようにルートで使用します。
例
from("file://foo/bar"). bean(MyListener.class);
つまり、任意の Apache Camel コンポーネントを再利用して、それらを JMS MessageListener
POJO に簡単に統合できます。
ActiveMQ 宛先オプションの使用
ActiveMQ 5.6 以降で利用可能
"destination." の接頭辞を使用して、エンドポイント uri で 宛先オプション を設定できます。たとえば、コンシューマーを排他的としてマークし、そのプリフェッチサイズを 50 に設定するには、以下の例のようにします。
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
アドバイザリーメッセージの使用
ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、遅いコンシューマーを検出した場合にアラートを送信したり、統計 (1 日に生成されるメッセージの数など) を作成したりするのに役立ちます。次の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。
例
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
キューでメッセージを消費すると、data/activemq フォルダーの下に次のファイルが表示されるはずです。
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
および含まれる文字列:
例
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
コンポーネント JAR の取得
以下の依存関係が必要です。
-
camel-activemq
ActiveMQ は、ActiveMQ プロジェクト でリリースされた JMS コンポーネント のエクステンションです。
<dependency> <groupId>org.fusesource</groupId> <artifactId>camel-activemq</artifactId> <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version> </dependency>