第2章 ActiveMQ


ActiveMQ コンポーネント

ActiveMQ コンポーネントを使用すると、メッセージを JMS キューまたはトピックに送信したり、Apache ActiveMQ を使用して JMS キューまたはトピックからメッセージを消費したりできます。

このコンポーネントは、JMS component をベースにしており、Spring の JMS サポートを宣言型トランザクションに使用し、Spring の JmsTemplate を送信に、そして MessageListenerContainer を消費に使用します。すべての JMS コンポーネントオプション は、ActiveMQ コンポーネントにも適用されます。

このコンポーネントを使用するには、クラスパスに activemq.jar または activemq-core.jar があり、camel-core.jarcamel-spring.jarcamel-jms.jar などの Apache Camel 依存関係があることを確認してください。

トランザクションとキャッシング

JMS でトランザクションを使用している場合は、パフォーマンスに影響を与える可能性があるため、JMS ページの下の トランザクションとキャッシュレベル セクションを参照してください。

URI 形式

activemq:[queue:|topic:]destinationName

ここでは、destinationName は ActiveMQ キューまたはトピック名に置き換えます。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キューに接続するには、FOO.BAR を次のように使用します。

activemq:FOO.BAR

必要に応じて、オプションの queue: 接頭辞を含めることができます。

activemq:queue:FOO.BAR

トピックに接続するには、topic: 接頭辞を含める必要があります。たとえば、トピック Stocks.Prices に接続するには、次を使用します。

activemq:topic:Stocks.Prices

オプション

すべての JMS コンポーネントオプション は、ActiveMQ コンポーネントにも適用されます。

Camel on EAP デプロイメント

このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。

組み込みブローカーまたは外部ブローカーのいずれかで動作するように ActiveMQ Camel コンポーネントを設定できます。JBoss EAP コンテナーにブローカーを埋め込むには、EAP コンテナー設定ファイルで ActiveMQ リソースアダプターを設定します。詳細は、ActiveMQ リソースアダプターの設定 を参照してください。

接続ファクトリーの設定

次の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら、activeMQComponent() メソッド を使用して、ActiveMQComponent を CamelContext に追加する方法を示しています。

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

Spring XML を使用した接続ファクトリーの設定

次のように、ActiveMQComponent で ActiveMQ ブローカー URL を設定できます。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

接続プーリングの使用

Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、および producer を効率的にプールするように処理することが推奨されます。詳細は、ActiveMQ Spring Support を参照してください。

  1. Maven で AMQ プールを追加します。
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.11.0.redhat-630516</version>
    </dependency>
  1. activemq コンポーネントを次のようにセットアップします。
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注記

プールされた接続ファクトリーの init メソッドと destroy メソッドに注目してください。これらの方法は、接続プールが適切に起動およびシャットダウンされるようにするために重要です。

PooledConnectionFactory は、同時に使用される最大 8 つの接続を含む接続プールを作成します。各接続は、多くのセッションで共有できます。

接続ごとのセッションの最大数を設定するために使用できる maxActive という名前のオプションがあります。デフォルト値は 500 です。

ActiveMQ 5.7 以降、目的が適切に反映されるようにオプションの名前が maxActiveSessionPerConnection に変更されました。

concurrentConsumersmaxConnections よりも高い値に設定されることに注意してください。これは、各 consumer がセッションを使用することから許可され、セッションが同じ接続を共有できるため、これは機能します。この例では、8 * 500 = 4000 のアクティブなセッションを同時に持つことができます。

ルートでの MessageListener POJO の呼び出し

ActiveMQ コンポーネントは、JMS MessageListener から Processor へのヘルパー Type Converter も提供します。つまり、Bean コンポーネント は、任意のルート内で任意の JMS MessageListener Bean を直接呼び出すことができます。

以下のように JMS で MessageListener を作成できます。

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

次に、以下のようにルートで使用します。

from("file://foo/bar").
  bean(MyListener.class);

つまり、任意の Apache Camel コンポーネントを再利用して、それらを JMS MessageListener POJO に簡単に統合できます。

ActiveMQ 宛先オプションの使用

ActiveMQ 5.6 以降で利用可能

"destination." の接頭辞を使用して、エンドポイント uri で 宛先オプション を設定できます。たとえば、コンシューマーを排他的としてマークし、そのプリフェッチサイズを 50 に設定するには、以下の例のようにします。

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

アドバイザリーメッセージの使用

ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、遅いコンシューマーを検出した場合にアラートを送信したり、統計 (1 日に生成されるメッセージの数など) を作成したりするのに役立ちます。次の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

キューでメッセージを消費すると、data/activemq フォルダーの下に次のファイルが表示されるはずです。

advisoryConnection-20100312.txt

advisoryProducer-20100312.txt

および含まれる文字列:

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

コンポーネント JAR の取得

以下の依存関係が必要です。

  • camel-activemq

ActiveMQ は、ActiveMQ プロジェクト でリリースされた JMS コンポーネント のエクステンションです。

<dependency>
    <groupId>org.fusesource</groupId>
    <artifactId>camel-activemq</artifactId>
    <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version>
</dependency>
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.