第2章 ActiveMQ
ActiveMQ コンポーネント
ActiveMQ コンポーネントを使用すると、メッセージを JMS Queue または Topic に送信できます。または Apache ActiveMQ を使用して JMS Queue またはトピックからメッセージを消費できます。
このコンポーネントは 169章JMS コンポーネント をベースとしており、送信に Spring の JmsTemplate
を使用し、消費するために MessageListenerContainer
を使用して宣言型トランザクションに Spring の JMS サポートを使用します。169章JMS コンポーネント コンポーネントのすべてのオプションは、このコンポーネントにも適用されます。
このコンポーネントを使用するには、クラスパスに activemq.jar
または activemq-core.jar
と camel-core.jar
、camel-spring.jar
、camel-jms.jar
などの Apache Camel 依存関係が含まれるようにします。
パフォーマンスに影響を与える可能 性があるため、JMS でトランザクションを使用している場合は、JMS ページのトランザクションとキャッシュレベル のセクション を参照してください。
URI 形式
activemq:[queue:|topic:]destinationName
destinationName は ActiveMQ のキューまたはトピック名です。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キュー FOO.BAR
に接続するには、以下を使用します。
activemq:FOO.BAR
必要に応じて、オプションの queue:
プレフィックスを含めることができます。
activemq:queue:FOO.BAR
トピックに接続するには、topic:
プレフィックスを含める必要があります。たとえば、トピック Stocks.Prices
に接続するには、以下を使用します。
activemq:topic:Stocks.Prices
オプション
これらのオプションはすべてこのコンポーネントに適用されるため、169章JMS コンポーネント コンポーネントのオプションを参照してください。
Camel on EAP デプロイメント
このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。
ActiveMQ Camel コンポーネントを設定して、埋め込みブローカーまたは外部ブローカーのいずれかと連携できます。JBoss EAP コンテナーにブローカーを組み込むには、「ActiveMQ Resource Adapter の設定」を参照してください。https://wildflyext.gitbooks.io/wildfly-camel/content/components/camel-activemq.html
接続ファクトリーの設定
以下の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら activeMQComponent()
メソッド を使用して CamelContext に ActiveMQComponent を追加する方法を説明します。
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Spring XML を使用した接続ファクトリーの設定
ActiveMQComponent で ActiveMQ ブローカー URL を以下のように設定できます。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
接続プールの使用
Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、およびプロデューサーの効率的なプールを処理することが推奨されます。これは、ActiveMQ Spring Support ページに記載されています。
Maven を使用して AMQ プールを取得できます。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.3.2</version> </dependency>
そして、以下のように activemq コンポーネントを設定します。
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
プールされた接続ファクトリーの init メソッドおよび destroy メソッドに注意してください。これは、接続プールが適切に開始し、シャットダウンする上で重要です。
その後、PooledConnectionFactory
は、同時に使用される最大 8 つの接続を持つ接続プールを作成します。各接続は、多くのセッションで共有できます。maxActive
という名前のオプションを使用して、接続ごとのセッションの最大数を設定できます。デフォルト値は 500
です。ActiveMQ 5.7 以降では、オプションの名前が maxActiveSessionPerConnection
と命名されるように名前が変更されたため、目的を反映しました。concurrentConsumers
が maxConnections
よりも高い値に設定されていることに注意してください。これは、各コンシューマーがセッションを使用しているため、セッションが同じ接続を共有できるため、安全です。この例では、8 * 500 = 4000 アクティブなセッションを同時に指定できます。
ルートでの MessageListener POJO の呼び出し
ActiveMQ コンポーネントは、JMS MessageListener から プロセッサー へのヘルパー 型コンバーター も提供します。つまり、41章Bean コンポーネント コンポーネントはルート内で JMS MessageListener Bean を直接呼び出すことができます。
たとえば、以下のように JMS で MessageListener を作成できます。
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
次に、以下のようにルートで使用します。
from("file://foo/bar"). bean(MyListener.class);
つまり、Apache Camel コンポーネントを再利用し、それらを JMS MessageListener
POJO\ に簡単に統合できます。
ActiveMQ 宛先オプションの使用
ActiveMQ 5.6 から利用可能
"destination." プレフィックスを使用して、エンドポイント URI で Destination Options を設定できます。たとえば、コンシューマーを排他的としてマークし、その事前にフェッチサイズを 50 に設定するには、以下のように実行できます。
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
アドバイザリーメッセージの消費
ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、低速なコンシューマーを検出したり、統計をビルドしたりする場合にアラートを送信するのに役立ちます(1 日あたりのメッセージ/生成の数など)。 以下の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
キューでメッセージを使用する場合は、data/activemq フォルダーに以下のファイルが表示されるはずです。
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
かつ、文字列が含まれる:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
コンポーネント JAR の取得
この依存関係が必要です。
-
activemq-camel
ActiveMQ は、ActiveMQ プロジェクト でリリースされた 169章JMS コンポーネント コンポーネントの拡張機能です。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-camel</artifactId> <version>5.6.0</version> </dependency>