第 6 章 Camel 组件
本章有关支持的 camel 组件的详情
6.1. camel-activemq
Camel ActiveMQ 集成由 activemq 组件提供。
组件可以配置为使用嵌入式或外部代理。对于 Wildfly / EAP 容器管理的连接池和 XA-Transaction 支持,可以将 ActiveMQ 资源适配器 配置到容器配置文件中。
6.1.1. JBoss EAP ActiveMQ 资源适配器配置
下载 ActiveMQ 资源适配器 rar 文件。下列步骤概述了配置 ActiveMQ 资源适配器。
- 停止您的 JBoss EAP 实例。
下载资源适配器并复制到相关的 JBoss EAP 部署目录。对于独立模式:
cp activemq-rar-5.11.1.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
- 为 ActiveMQ 适配器配置 JBoss EAP 资源适配器子系统。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0"> <resource-adapters> <resource-adapter id="activemq-rar.rar"> <archive> activemq-rar.rar </archive> <transaction-support>XATransaction</transaction-support> <config-property name="UseInboundSession"> false </config-property> <config-property name="Password"> defaultPassword </config-property> <config-property name="UserName"> defaultUser </config-property> <config-property name="ServerUrl"> tcp://localhost:61616?jms.rmIdFromConnectionId=true </config-property> <connection-definitions> <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory"> <xa-pool> <min-pool-size>1</min-pool-size> <max-pool-size>20</max-pool-size> <prefill>false</prefill> <is-same-rm-override>false</is-same-rm-override> </xa-pool> </connection-definition> </connection-definitions> <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue"> <config-property name="PhysicalName"> HELLOWORLDMDBQueue </config-property> </admin-object> <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic"> <config-property name="PhysicalName"> HELLOWORLDMDBTopic </config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem>
如果您的资源适配器存档文件名与 activemq-rar.rar.rar 不同,您必须更改上述配置中的 archive 元素的内容,以匹配您的存档文件名称。
必须选择 UserName 和 Password 配置属性的值,以匹配外部代理中有效用户的凭证。
您可能需要更改 ServerUrl 配置 属性的值,以匹配外部代理公开的实际主机名和端口。
4) 启动 JBoss EAP。如果一切配置正确,您应该在 JBoss EAP server.log 中查看消息,如下所示:
13:16:08,412 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`
6.1.2. Camel 路由配置
以下 ActiveMQ producer 和消费者示例使用 ActiveMQ 嵌入式代理和"vm"传输(避免对外部 ActiveMQ 代理的需求)。
示例将 CDI 与 camel-cdi 组件一起使用。JMS ConnectionFactory 实例通过 JNDI 查找注入到 Camel RouteBuilder 中。
6.1.2.1. ActiveMQ Producer
@Startup @ApplicationScoped @ContextName("activemq-camel-context") public class ActiveMQRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("timer://sendJMSMessage?fixedRate=true&period=10000") .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>")) .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost") .log("JMS Message sent"); } }
每次将消息添加到 WildFlyCamelQueue 目的地时,都会向控制台输出一条日志消息。要验证消息实际上被放入队列中,您可以使用 Camel on EAP 子系统提供的 ../features/hawtio.md[Hawtio 控制台,window=_blank]。
6.1.2.2. ActiveMQ Consumer
要使用 ActiveMQ 消息,Camel RouteBuilder 实施与制作者示例类似。
当 ActiveMQ 端点消耗来自 WildFlyCamelQueue 目的地的消息时,内容会记录到控制台。
@Override public void configure() throws Exception { from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost") .to("log:jms?showAll=true"); }
6.1.2.3. ActiveMQ Transactions
6.1.2.3.1. ActiveMQ 资源适配器配置
需要 ActiveMQ 资源适配器来利用 XA 事务支持、连接池等。
以下 XML 片段显示了如何在 JBoss EAP 服务器 XML 配置中配置资源适配器。请注意,ServerURL
设置为使用嵌入式代理。连接工厂绑定到 JNDI 名称 java:/ActiveMQConnectionFactory
。这将在下面的 RouteBuilder 示例中查找。
最后,两个队列配置为"queue1"和"queue2"。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0"> <resource-adapters> <resource-adapter id="activemq-rar.rar"> ... <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool"> <config-property name="PhysicalName">queue1</config-property> </admin-object> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool"> <config-property name="PhysicalName">queue2</config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem>
6.1.2.4. 事务管理器
camel-activemq 组件需要类型为 org.springframework.transaction.PlatformTransactionManager
的事务管理器。因此,您可以先创建一个可满足此要求的 bean 扩展 JtaTransactionManager
。请注意,bean 被标注为 @Named
,以允许在 Camel bean registry 中注册 bean。另请注意,JBoss EAP 事务管理器和用户事务实例使用 CDI 注入。
@Named("transactionManager") public class CdiTransactionManager extends JtaTransactionManager { @Resource(mappedName = "java:/TransactionManager") private TransactionManager transactionManager; @Resource private UserTransaction userTransaction; @PostConstruct public void initTransactionManager() { setTransactionManager(transactionManager); setUserTransaction(userTransaction); } }
6.1.2.5. 事务策略
接下来,您需要声明要使用的事务策略。同样,使用 @Named
注释使 bean 可用于 Camel。事务管理器也会注入,以便使用所需的事务策略创建 TransactionTemplate
。此实例中 PROPAGATION_REQUIRED
。
@Named("PROPAGATION_REQUIRED") public class CdiRequiredPolicy extends SpringTransactionPolicy { @Inject public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) { super(new TransactionTemplate(cdiTransactionManager, new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))); } }
6.1.2.6. 路由构建器
现在,您可以配置 Camel RouteBuilder 类,并注入 Camel ActiveMQ 组件所需的依赖项。在资源适配器配置上配置的 ActiveMQ 连接工厂会与您之前配置的事务管理器一起注入。
在本例中,当任何消息都从 queue1 使用时,它们都会路由到另一个名为 queue2 的 JMS 队列。从 queue2 使用的消息会导致 JMS 事务通过 rollback ()DSL 方法回滚。这将使原始消息放置在死信队列(DLQ)上。
@Startup @ApplicationScoped @ContextName("activemq-camel-context") public class ActiveMQRouteBuilder extends RouteBuilder { @Resource(mappedName = "java:/ActiveMQConnectionFactory") private ConnectionFactory connectionFactory; @Inject private CdiTransactionManager transactionManager; @Override public void configure() throws Exception { ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent(); activeMQComponent.setTransacted(false); activeMQComponent.setConnectionFactory(connectionFactory); activeMQComponent.setTransactionManager(transactionManager); getContext().addComponent("activemq", activeMQComponent); errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ") .useOriginalMessage() .maximumRedeliveries(0) .redeliveryDelay(1000)); from("activemq:queue:queue1F") .transacted("PROPAGATION_REQUIRED") .to("activemq:queue:queue2"); from("activemq:queue:queue2") .to("log:end") .rollback(); } }
6.1.3. 安全性
请参阅 JMS 安全性部分。
6.1.4. GitHub 上的代码示例
GitHub 上提供了一个 camel-activemq 应用 示例。