第6章 Camel コンポーネント
本章では、サポートされる Camel コンポーネントについて詳しく説明します。
6.1. camel-activemq
Camel ActiveMQ インテグレーションは activemq コンポーネントによって提供されます。
コンポーネントは、埋め込みまたは外部ブローカーと連携するように設定できます。Wildfly / EAP コンテナー管理接続プールと XA-Transaction サポートの場合、ActiveMQ リソースアダプター はコンテナー設定ファイルに設定できます。
6.1.1. JBoss EAP ActiveMQ リソースアダプターの設定
ActiveMQ リソースアダプターの rar ファイル をダウンロードします。以下の手順で、ActiveMQ リソースアダプターを設定する方法の概要を説明します。
- JBoss EAP インスタンスを停止します。
リソースアダプターをダウンロードし、該当する JBoss EAP デプロイメントディレクトリーにコピーします。スタンドアロンモードの場合:
cp activemq-rar-5.11.1.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
- ActiveMQ アダプターの JBoss EAP リソースアダプターサブシステムを設定します。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0"> <resource-adapters> <resource-adapter id="activemq-rar.rar"> <archive> activemq-rar.rar </archive> <transaction-support>XATransaction</transaction-support> <config-property name="UseInboundSession"> false </config-property> <config-property name="Password"> defaultPassword </config-property> <config-property name="UserName"> defaultUser </config-property> <config-property name="ServerUrl"> tcp://localhost:61616?jms.rmIdFromConnectionId=true </config-property> <connection-definitions> <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory"> <xa-pool> <min-pool-size>1</min-pool-size> <max-pool-size>20</max-pool-size> <prefill>false</prefill> <is-same-rm-override>false</is-same-rm-override> </xa-pool> </connection-definition> </connection-definitions> <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue"> <config-property name="PhysicalName"> HELLOWORLDMDBQueue </config-property> </admin-object> <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic"> <config-property name="PhysicalName"> HELLOWORLDMDBTopic </config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem>
リソースアダプターアーカイブのファイル名が activemq-rar.rar と異なる場合は、前述の設定のアーカイブ要素の内容を変更し、アーカイブファイルの名前に一致させる必要があります。
外部ブローカーの有効なユーザーのクレデンシャルに一致するように、UserName および Password 設定プロパティーの値を選択する必要があります。
外部ブローカーによって公開される実際のホスト名およびポートに一致するように ServerUrl 設定プロパティーの値を変更する必要がある場合があります。
4) JBoss EAP を起動します。すべてが正しく設定されていれば、JBoss EAP の server.log に以下のようなメッセージが記録されるはずです。
13:16:08,412 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`
6.1.2. Camel ルートの設定
以下の ActiveMQ プロデューサーおよびコンシューマーの例では、ActiveMQ 埋め込みブローカーと 'vm' トランスポートを使用します (したがって、外部 ActiveMQ ブローカーが必要なくなりました)。
この例では、CDI を camel-cdi コンポーネントと併用します。JMS ConnectionFactory インスタンスは、JNDI ルックアップを介して Camel RouteBuilder に注入されます。
6.1.2.1. ActiveMQ プロデューサー
@Startup @ApplicationScoped @ContextName("activemq-camel-context") public class ActiveMQRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("timer://sendJMSMessage?fixedRate=true&period=10000") .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>")) .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost") .log("JMS Message sent"); } }
メッセージが WildFlyCamelQueue 宛先に追加されるたびに、ログメッセージがコンソールに出力されます。メッセージが実際にキューに配置されていることを確認するには、Camel on EAP サブシステムによって提供される ../features/hawtio.md[Hawtio console,window=_blank] を使用できます。
6.1.2.2. ActiveMQ コンシューマー
ActiveMQ メッセージを消費するため、Camel RouteBuilder 実装はプロデューサーの例と似ています。
ActiveMQ エンドポイントが WildFlyCamelQueue 宛先からのメッセージを消費すると、内容がコンソールに記録されます。
@Override public void configure() throws Exception { from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost") .to("log:jms?showAll=true"); }
6.1.2.3. ActiveMQ トランザクション
6.1.2.3.1. ActiveMQ リソースアダプターの設定
XA トランザクションのサポート、接続プールなどを利用するには、ActiveMQ リソースアダプターが必要です。
以下の XML スニペットは、リソースアダプターが JBoss EAP サーバーの XML 設定内でどのように設定されるかを示しています。ServerURL
は組み込みブローカーを使用するように設定されていることに注意してください。接続ファクトリーは JNDI 名 java:/ActiveMQConnectionFactory
にバインドされます。これは、以下の RouteBuilder の例で検索されます。
最後に、'queue1' と 'queue2' という名前の 2 つのキューが設定されます。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0"> <resource-adapters> <resource-adapter id="activemq-rar.rar"> ... <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool"> <config-property name="PhysicalName">queue1</config-property> </admin-object> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool"> <config-property name="PhysicalName">queue2</config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem>
6.1.2.4. トランザクションマネージャー
camel-activemq コンポーネントには、タイプ org.springframework.transaction.PlatformTransactionManager
のトランザクションマネージャーが必要です。そのため、最初にこの要件を満たす JtaTransactionManager
を拡張する Bean を作成します。Bean には @Named
アノテーションが付けられており、Camel Bean レジストリー内に Bean を登録できることに注意してください。また、JBoss EAP トランザクションマネージャーとユーザートランザクションインスタンスは CDI を使用して注入されることにも注意してください。
@Named("transactionManager") public class CdiTransactionManager extends JtaTransactionManager { @Resource(mappedName = "java:/TransactionManager") private TransactionManager transactionManager; @Resource private UserTransaction userTransaction; @PostConstruct public void initTransactionManager() { setTransactionManager(transactionManager); setUserTransaction(userTransaction); } }
6.1.2.5. トランザクションポリシー
次に、使用するトランザクションポリシーを宣言する必要があります。ここでも、@Named
アノテーションを使用して Bean を Camel で利用可能にします。また、トランザクションマネージャーも、必要なトランザクションポリシーで TransactionTemplate
を作成できるように注入されます。このインスタンスの PROPAGATION_REQUIRED
@Named("PROPAGATION_REQUIRED") public class CdiRequiredPolicy extends SpringTransactionPolicy { @Inject public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) { super(new TransactionTemplate(cdiTransactionManager, new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))); } }
6.1.2.6. ルートビルダー
これで、Camel RouteBuilder クラスを設定し、Camel ActiveMQ コンポーネントに必要な依存関係を注入できるようになりました。リソースアダプター設定に設定した ActiveMQ 接続ファクトリーは、以前に設定したトランザクションマネージャーと共に注入されます。
この例の RouteBuilder では、queue1 から任意のメッセージが消費されるたびに、メッセージは queue2 という名前の別の JMS キューにルーティングされます。queue2 から消費されるメッセージにより、rollback() DSL メソッドを使用して JMS トランザクションがロールバックされます。これにより、元のメッセージがデッドレターキュー (DLQ) に配置されます。
@Startup @ApplicationScoped @ContextName("activemq-camel-context") public class ActiveMQRouteBuilder extends RouteBuilder { @Resource(mappedName = "java:/ActiveMQConnectionFactory") private ConnectionFactory connectionFactory; @Inject private CdiTransactionManager transactionManager; @Override public void configure() throws Exception { ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent(); activeMQComponent.setTransacted(false); activeMQComponent.setConnectionFactory(connectionFactory); activeMQComponent.setTransactionManager(transactionManager); getContext().addComponent("activemq", activeMQComponent); errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ") .useOriginalMessage() .maximumRedeliveries(0) .redeliveryDelay(1000)); from("activemq:queue:queue1F") .transacted("PROPAGATION_REQUIRED") .to("activemq:queue:queue2"); from("activemq:queue:queue2") .to("log:end") .rollback(); } }
6.1.3. セキュリティー
JMS のセキュリティー セクションを参照してください。
6.1.4. GitHub のコード例
camel-activemq アプリケーション のサンプルは GitHub で利用できます。