検索

第2章 ActiveMQ

download PDF

ActiveMQ コンポーネント

ActiveMQ コンポーネントを使用すると、メッセージを JMS Queue または Topic に送信できます。または Apache ActiveMQ を使用して JMS Queue またはトピックからメッセージを消費できます。

このコンポーネントは 169章JMS コンポーネント をベースとしており、送信に Spring の JmsTemplate を使用し、消費するために MessageListenerContainer を使用して宣言型トランザクションに Spring の JMS サポートを使用します。169章JMS コンポーネント コンポーネントのすべてのオプションは、このコンポーネントにも適用されます。

このコンポーネントを使用するには、クラスパスに activemq.jar または activemq-core.jarcamel-core.jarcamel-spring.jarcamel-jms.jar などの Apache Camel 依存関係が含まれるようにします。

トランザクションおよびキャッシュ

パフォーマンスに影響を与える可能 性があるため、JMS でトランザクションを使用している場合は、JMS ページのトランザクションとキャッシュレベル のセクション を参照してください。

URI 形式

activemq:[queue:|topic:]destinationName

destinationName は ActiveMQ のキューまたはトピック名です。デフォルトでは、destinationName はキュー名として解釈されます。たとえば、キュー FOO.BAR に接続するには、以下を使用します。

activemq:FOO.BAR

必要に応じて、オプションの queue: プレフィックスを含めることができます。

activemq:queue:FOO.BAR

トピックに接続するには、topic: プレフィックスを含める必要があります。たとえば、トピック Stocks.Prices に接続するには、以下を使用します。

activemq:topic:Stocks.Prices

オプション

これらのオプションはすべてこのコンポーネントに適用されるため、169章JMS コンポーネント コンポーネントのオプションを参照してください。

Camel on EAP デプロイメント

このコンポーネントは、Red Hat JBoss Enterprise Application Platform (JBoss EAP) コンテナー上で簡素化されたデプロイメントモデルを提供する Camel on EAP (Wildfly Camel) フレームワークによってサポートされます。

ActiveMQ Camel コンポーネントを設定して、埋め込みブローカーまたは外部ブローカーのいずれかと連携できます。JBoss EAP コンテナーにブローカーを組み込むには、「ActiveMQ Resource Adapter の設定」を参照してください。https://wildflyext.gitbooks.io/wildfly-camel/content/components/camel-activemq.html

接続ファクトリーの設定

以下の テストケース は、ActiveMQ への接続に使用される brokerURL を指定しながら activeMQComponent() メソッド を使用して CamelContext に ActiveMQComponent を追加する方法を説明します。

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

Spring XML を使用した接続ファクトリーの設定

ActiveMQComponent で ActiveMQ ブローカー URL を以下のように設定できます。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

接続プールの使用

Camel を使用して ActiveMQ ブローカーに送信する場合は、プールされた接続ファクトリーを使用して、JMS 接続、セッション、およびプロデューサーの効率的なプールを処理することが推奨されます。これは、ActiveMQ Spring Support ページに記載されています。

Maven を使用して AMQ プールを取得できます。

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.3.2</version>
    </dependency>

そして、以下のように activemq コンポーネントを設定します。

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注記

プールされた接続ファクトリーの init メソッドおよび destroy メソッドに注意してください。これは、接続プールが適切に開始し、シャットダウンする上で重要です。

その後、PooledConnectionFactory は、同時に使用される最大 8 つの接続を持つ接続プールを作成します。各接続は、多くのセッションで共有できます。maxActive という名前のオプションを使用して、接続ごとのセッションの最大数を設定できます。デフォルト値は 500 です。ActiveMQ 5.7 以降では、オプションの名前が maxActiveSessionPerConnection と命名されるように名前が変更されたため、目的を反映しました。concurrentConsumersmaxConnections よりも高い値に設定されていることに注意してください。これは、各コンシューマーがセッションを使用しているため、セッションが同じ接続を共有できるため、安全です。この例では、8 * 500 = 4000 アクティブなセッションを同時に指定できます。

ルートでの MessageListener POJO の呼び出し

ActiveMQ コンポーネントは、JMS MessageListener から プロセッサー へのヘルパー 型コンバーター も提供します。つまり、41章Bean コンポーネント コンポーネントはルート内で JMS MessageListener Bean を直接呼び出すことができます。

たとえば、以下のように JMS で MessageListener を作成できます。

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

次に、以下のようにルートで使用します。

from("file://foo/bar").
  bean(MyListener.class);

つまり、Apache Camel コンポーネントを再利用し、それらを JMS MessageListener POJO\ に簡単に統合できます。

ActiveMQ 宛先オプションの使用

ActiveMQ 5.6 から利用可能

"destination." プレフィックスを使用して、エンドポイント URI で Destination Options を設定できます。たとえば、コンシューマーを排他的としてマークし、その事前にフェッチサイズを 50 に設定するには、以下のように実行できます。

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

アドバイザリーメッセージの消費

ActiveMQ は、消費可能なトピックに配置される アドバイザリーメッセージ を生成できます。このようなメッセージは、低速なコンシューマーを検出したり、統計をビルドしたりする場合にアラートを送信するのに役立ちます(1 日あたりのメッセージ/生成の数など)。 以下の Spring DSL の例は、トピックからメッセージを読み取る方法を示しています。

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

キューでメッセージを使用する場合は、data/activemq フォルダーに以下のファイルが表示されるはずです。

advisoryConnection-20100312.txt advisoryProducer-20100312.txt

かつ、文字列が含まれる:

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

コンポーネント JAR の取得

この依存関係が必要です。

  • activemq-camel

ActiveMQ は、ActiveMQ プロジェクト でリリースされた 169章JMS コンポーネント コンポーネントの拡張機能です。

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-camel</artifactId>
  <version>5.6.0</version>
</dependency>
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.