第 2 章 ActiveMQ
ActiveMQ Component
使用 ActiveMQ 组件,您可以将消息发送到 JMS Queue 或 Topic,或者使用 Apache ActiveMQ 从 JMS Queue 或 Topic 中使用消息。
此组件基于 JMS 组件,使用 Spring 的 JMS 对声明性事务的支持,使用 Spring 的 JmsTemplate
发送以及一个 MessageListenerContainer
以供消费。所有 JMS 组件选项 也适用于 ActiveMQ 组件。
要使用此组件,请确保您的类路径上具有 activemq.jar
或 activemq-core.jar
以及任何 Apache Camel 依赖项,如 camel-core.jar
、camel-spring.jar
和 camel-jms.jar
。
如果您使用带有 JMS 的事务,请参阅 JMS 页面上的下面的部分事务和缓存级别,因为它可能会影响性能。
URI 格式
activemq:[queue:|topic:]destinationName
其中 destinationName 是 ActiveMQ 队列或主题名称。默认情况下,targetName 被解释为队列名称。例如,要连接到队列 FOO.BAR
,请使用:
activemq:FOO.BAR
如果需要,您可以包含可选的 queue:
前缀:
activemq:queue:FOO.BAR
要连接到一个主题,您必须包含 topic:
前缀。例如,要连接到主题 Stocks.Prices
,请使用:
activemq:topic:Stocks.Prices
选项
所有 JMS 组件选项 也适用于 ActiveMQ 组件。
Camel on EAP 部署
此组件由 EAP (Wildfly Camel)框架上的 Camel 支持,该框架在 Red Hat JBoss Enterprise Application Platform (JBoss EAP)容器上提供了简化的部署模型。
您可以配置 ActiveMQ Camel 组件,以用于嵌入式代理或外部代理。要在 JBoss EAP 容器中嵌入代理,请在 EAP 容器配置文件中配置 ActiveMQ 资源适配器 以了解详细信息,请参阅 ActiveMQ 资源适配器配置。
配置连接工厂
以下 测试案例 演示了如何使用 activeMQComponent ()
方法将 ActiveMQComponent 添加到 CamelContext,同时指定用于连接 ActiveMQ 的 brokerURL。
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
使用 Spring XML 配置连接工厂
您可以在 ActiveMQComponent 上配置 ActiveMQ 代理 URL,如下所示
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
使用连接池
使用 Camel 发送到 ActiveMQ 代理时,最佳实践是使用池连接工厂来处理 JMS 连接、会话和生产者的有效池。如需更多信息,请参阅 ActiveMQ Spring 支持。
- 使用 Maven 添加 AMQ 池:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.0.redhat-630516</version> </dependency>
- 设置 activemq 组件,如下所示:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
注意池连接工厂的 init 和 destroy 方法。这些方法对于确保连接池已正确启动和关闭非常重要。
然后,PooledConnectionFactory
会创建最多 8 个连接同时使用的连接池。每个连接都可以由多个会话共享。
有一个名为 maxActive
的选项,可用于配置每个连接的最大会话数;默认值为 500
。
从 ActiveMQ 5.7 以后,选项已重命名为 maxActiveSessionPerConnection
,以更好地反映其目的。
请注意,concurrentConsumers
被设置为 maxConnections
的值。这是允许的,因为每个使用者都使用会话,并且会话可以共享相同的连接,因此可以正常工作。在本例中,我们可以同时有 8 个 * 500 = 4000 活跃会话。
在路由中调用 MessageListener POJO
ActiveMQ 组件还提供从 JMS MessageListener 到处理器的帮助程序 类型 Converter。https://camel.apache.org/manual/processor.html这意味着 Bean 组件 可以直接在任何路由内调用任何 JMS MessageListener bean。
您可以在 JMS 中创建 MessageListener,如下所示:
示例
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
然后,在路由中使用它,如下所示
示例
from("file://foo/bar"). bean(MyListener.class);
也就是说,您可以重复使用任何 Apache Camel 组件,并将其轻松集成到 JMS MessageListener
POJO\!
使用 ActiveMQ Destination 选项
从 ActiveMQ 5.6 开始提供
您可以使用 "destination." 前缀在 endpoint uri 中配置 Destination Options。例如,要将消费者标记为 exclusive,并将其预抓取大小设置为 50,您可以执行以下操作:.Example
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
消耗公告消息
ActiveMQ 可以生成 公告消息,这些消息 位于您可以使用的主题中。如果您检测到缓慢消费者或构建统计信息(每天生成的消息/数量等),这些消息可以帮助您发送警报。) 以下 Spring DSL 示例演示了如何从主题读取消息。
示例
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
如果您在队列中使用消息,您应该在 data/activemq 文件夹下看到以下文件:
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
包含字符串:
示例
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
获取组件 JAR
您需要这个依赖项:
-
camel-activemq
ActiveMQ 是随 ActiveMQ 项目 一起发布的 JMS 组件的 扩展。
<dependency> <groupId>org.fusesource</groupId> <artifactId>camel-activemq</artifactId> <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version> </dependency>