第 2 章 ActiveMQ
ActiveMQ 组件
利用 ActiveMQ 组件,您可以发送消息到 JMS Queue 或 Topic,或者使用 Apache ActiveMQ 来使用来自 JMS Queue 或 Topic 的消息。
此组件基于 JMS 组件,使用 Spring 的 JMS 支持进行声明性事务,使用 Spring 的 JmsTemplate
发送和 MessageListenerContainer
以供使用。所有 JMS 组件选项 也适用于 ActiveMQ 组件。
要使用此组件,请确保在 classpath 上具有 activemq.jar
或 activemq-core.jar
,以及任何 Apache Camel 依赖项,如 camel-core.jar
、camel-spring.jar
和 camel-jms.jar
。
如果使用 JMS 的事务,请参阅 JMS 页面上的 Transactions 和 Cache Levels 部分,因为它可能会影响性能。
URI 格式
activemq:[queue:|topic:]destinationName
其中 destinationName 是 ActiveMQ 队列或主题名称。默认情况下,destinationName 解释为队列名称。例如,要连接到队列 FOO.BAR
,请使用:
activemq:FOO.BAR
如果需要,您可以包含可选的 queue:
前缀:
activemq:queue:FOO.BAR
要连接到主题,您必须包含 topic:
前缀。例如,要连接到主题 Stocks.Prices
,请使用:
activemq:topic:Stocks.Prices
选项
所有 JMS 组件选项 也适用于 ActiveMQ 组件。
Camel on EAP 部署
此组件由 EAP 上的 Camel (Wildfly Camel)框架支持,该框架在 Red Hat JBoss Enterprise Application Platform (JBoss EAP)容器上提供了简化的部署模型。
您可以配置 ActiveMQ Camel 组件,以用于嵌入式代理或外部代理。要在 JBoss EAP 容器中嵌入代理,请在 EAP 容器配置文件 configuring ActiveMQ 资源适配器中配置 ActiveMQ 资源适配器,详细信息请参阅 ActiveMQ 资源适配器配置。
配置连接工厂
以下 测试案例 演示了如何在指定用于连接 ActiveMQ 的 brokerURL 时,使用 activeMQComponent ()
方法将 ActiveMQComponent 添加到 CamelContext 中。
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
使用 Spring XML 配置连接工厂
您可以在 ActiveMQComponent 上配置 ActiveMQ 代理 URL,如下所示
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
使用连接池
使用 Camel 发送到 ActiveMQ 代理时,最好使用池化连接工厂来处理 JMS 连接、会话和生产者的有效池。如需更多信息,请参阅 ActiveMQ Spring 支持。
- 使用 Maven 添加 AMQ 池:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.0.redhat-630516</version> </dependency>
- 设置 activemq 组件,如下所示:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
注意池连接工厂的 init 和 destroy 方法。这些方法是确保正确启动和关闭连接池。
然后,pooled ConnectionFactory
将创建一个同时使用最多 8 个连接的连接池。每个连接可由多个会话共享。
有一个名为 maxActive
的选项,可用于配置每个连接的最大会话数;默认值为 500
。
自 ActiveMQ 5.7 起,选项已重命名为 maxActiveSessionPerConnection
,以更好地反映其目的。
请注意,concurrentConsumers
被设置为高于 maxConnections
的值。允许这样做,因为每个消费者都使用会话,因为会话可以共享相同的连接,所以这可以正常工作。在这个示例中,我们可以同时有 8 * 500 = 4000 活跃的会话。
在路由中调用 MessageListener POJO
ActiveMQ 组件还提供了从 JMS MessageListener 到处理器的帮助 类型转换器。https://camel.apache.org/manual/processor.html这意味着 Bean 组件 可以直接在任何路由内调用任何 JMS MessageListener bean。
您可以在 JMS 中创建 MessageListener,如下所示:
Example
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
然后,在您的路由中使用它,如下所示
Example
from("file://foo/bar"). bean(MyListener.class);
也就是说,您可以重复使用任何 Apache Camel 组件,并将它们轻松集成到 JMS MessageListener
POJO\!
使用 ActiveMQ Destination Options
从 ActiveMQ 5.6 开始提供
您可以使用 "destination." 前缀在端点 uri 中配置 Destination Options。例如,要将消费者标记为 exclusive,并将其预抓取大小设置为 50,您可以执行以下操作: .Example
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
消耗公告消息
ActiveMQ 可以生成 公告消息,被放入您可以使用的主题中。当您检测到较慢的用户或构建统计信息时,这些消息可帮助您发送警报(每天生成的消息/数量等)以下 Spring DSL 示例演示了如何从主题读取消息。
Example
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
如果您在队列中使用消息,则应在 data/activemq 文件夹下看到以下文件:
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
包含字符串:
Example
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
获取组件 JAR
您需要这个依赖项:
-
camel-activemq
ActiveMQ 是与 ActiveMQ 项目 一起发布的 JMS 组件的 扩展。
<dependency> <groupId>org.fusesource</groupId> <artifactId>camel-activemq</artifactId> <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version> </dependency>