第6章 Camel コンポーネント


本章では、サポートされる Camel コンポーネントについて詳しく説明します。

6.1. camel-activemq

Camel ActiveMQ インテグレーションは activemq コンポーネントによって提供されます。

コンポーネントは、埋め込みまたは外部ブローカーと連携するように設定できます。Wildfly / EAP コンテナー管理接続プールと XA-Transaction サポートの場合、ActiveMQ リソースアダプター はコンテナー設定ファイルに設定できます。

6.1.1. JBoss EAP ActiveMQ リソースアダプターの設定

ActiveMQ リソースアダプターの rar ファイル をダウンロードします。以下の手順で、ActiveMQ リソースアダプターを設定する方法の概要を説明します。

  1. JBoss EAP インスタンスを停止します。
  2. リソースアダプターをダウンロードし、該当する JBoss EAP デプロイメントディレクトリーにコピーします。スタンドアロンモードの場合:

    cp activemq-rar-5.11.0.redhat-630396.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
  3. ActiveMQ アダプターの JBoss EAP リソースアダプターサブシステムを設定します。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
     <resource-adapters>
         <resource-adapter id="activemq-rar.rar">
             <archive>
                 activemq-rar.rar
             </archive>
             <transaction-support>XATransaction</transaction-support>
             <config-property name="UseInboundSession">
                 false
             </config-property>
             <config-property name="Password">
                 defaultPassword
             </config-property>
             <config-property name="UserName">
                 defaultUser
             </config-property>
             <config-property name="ServerUrl">
                 tcp://localhost:61616?jms.rmIdFromConnectionId=true
             </config-property>
             <connection-definitions>
                 <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                     <xa-pool>
                         <min-pool-size>1</min-pool-size>
                         <max-pool-size>20</max-pool-size>
                         <prefill>false</prefill>
                         <is-same-rm-override>false</is-same-rm-override>
                     </xa-pool>
                 </connection-definition>
             </connection-definitions>
             <admin-objects>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBQueue
                     </config-property>
                 </admin-object>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBTopic
                     </config-property>
                 </admin-object>
             </admin-objects>
         </resource-adapter>
     </resource-adapters>
 </subsystem>

リソースアダプターアーカイブのファイル名が activemq-rar.rar と異なる場合は、前述の設定のアーカイブ要素の内容を変更し、アーカイブファイルの名前に一致させる必要があります。

外部ブローカーの有効なユーザーのクレデンシャルに一致するように、UserName および Password 設定プロパティーの値を選択する必要があります。

外部ブローカーによって公開される実際のホスト名およびポートに一致するように ServerUrl 設定プロパティーの値を変更する必要がある場合があります。

4) JBoss EAP を起動します。すべてが正しく設定されていれば、JBoss EAP の server.log に以下のようなメッセージが記録されるはずです。

13:16:08,412 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`

6.1.2. Camel ルートの設定

以下の ActiveMQ プロデューサーおよびコンシューマーの例では、ActiveMQ 埋め込みブローカーと 'vm' トランスポートを使用します (したがって、外部 ActiveMQ ブローカーが必要なくなりました)。

この例では、CDI を camel-cdi コンポーネントと併用します。JMS ConnectionFactory インスタンスは、JNDI ルックアップを介して Camel RouteBuilder に注入されます。

6.1.2.1. ActiveMQ プロデューサー

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
    .log("JMS Message sent");
  }
}

メッセージが WildFlyCamelQueue 宛先に追加されるたびに、ログメッセージがコンソールに出力されます。メッセージが実際にキューに配置されていることを確認するには、Camel on EAP サブシステムによって提供される ../features/hawtio.md[Hawtio console,window=_blank] を使用できます。

activemq キューのブラウズ

6.1.2.2. ActiveMQ コンシューマー

ActiveMQ メッセージを消費するため、Camel RouteBuilder 実装はプロデューサーの例と似ています。

ActiveMQ エンドポイントが WildFlyCamelQueue 宛先からのメッセージを消費すると、内容がコンソールに記録されます。

@Override
public void configure() throws Exception {
  from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
  .to("log:jms?showAll=true");
}

6.1.2.3. ActiveMQ トランザクション

6.1.2.3.1. ActiveMQ リソースアダプターの設定

XA トランザクションのサポート、接続プールなどを利用するには、ActiveMQ リソースアダプターが必要です。

以下の XML スニペットは、リソースアダプターが JBoss EAP サーバーの XML 設定内でどのように設定されるかを示しています。ServerURL は組み込みブローカーを使用するように設定されていることに注意してください。接続ファクトリーは JNDI 名 java:/ActiveMQConnectionFactory にバインドされます。これは、以下の RouteBuilder の例で検索されます。

最後に、'queue1' と 'queue2' という名前の 2 つのキューが設定されます。

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
  <resource-adapters>
    <resource-adapter id="activemq-rar.rar">
      ...
      <admin-objects>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool">
          <config-property name="PhysicalName">queue1</config-property>
        </admin-object>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool">
          <config-property name="PhysicalName">queue2</config-property>
        </admin-object>
      </admin-objects>
    </resource-adapter>
  </resource-adapters>
</subsystem>

6.1.2.4. トランザクションマネージャー

camel-activemq コンポーネントには、タイプ org.springframework.transaction.PlatformTransactionManager のトランザクションマネージャーが必要です。そのため、最初にこの要件を満たす JtaTransactionManager を拡張する Bean を作成します。Bean には @Named アノテーションが付けられており、Camel Bean レジストリー内に Bean を登録できることに注意してください。また、JBoss EAP トランザクションマネージャーとユーザートランザクションインスタンスは CDI を使用して注入されることにも注意してください。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}

6.1.2.5. トランザクションポリシー

次に、使用するトランザクションポリシーを宣言する必要があります。ここでも、@Named アノテーションを使用して Bean を Camel で利用可能にします。また、トランザクションマネージャーも、必要なトランザクションポリシーで TransactionTemplate を作成できるように注入されます。このインスタンスの PROPAGATION_REQUIRED

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}

6.1.2.6. ルートビルダー

これで、Camel RouteBuilder クラスを設定し、Camel ActiveMQ コンポーネントに必要な依存関係を注入できるようになりました。リソースアダプター設定に設定した ActiveMQ 接続ファクトリーは、以前に設定したトランザクションマネージャーと共に注入されます。

この例の RouteBuilder では、queue1 から任意のメッセージが消費されるたびに、メッセージは queue2 という名前の別の JMS キューにルーティングされます。queue2 から消費されるメッセージにより、rollback() DSL メソッドを使用して JMS トランザクションがロールバックされます。これにより、元のメッセージがデッドレターキュー (DLQ) に配置されます。

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/ActiveMQConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Inject
  private CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
    activeMQComponent.setTransacted(false);
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransactionManager(transactionManager);

    getContext().addComponent("activemq", activeMQComponent);

      errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ")
      .useOriginalMessage()
      .maximumRedeliveries(0)
      .redeliveryDelay(1000));

    from("activemq:queue:queue1F")
      .transacted("PROPAGATION_REQUIRED")
      .to("activemq:queue:queue2");

    from("activemq:queue:queue2")
      .to("log:end")
      .rollback();
  }
}

6.1.3. セキュリティー

JMS のセキュリティー セクションを参照してください。

6.1.4. GitHub のコード例

camel-activemq アプリケーション のサンプルは GitHub で利用できます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.