Ce contenu n'est pas disponible dans la langue sélectionnée.
Chapter 2. ActiveMQ
ActiveMQ Component
Using the ActiveMQ component, you can send messages to a JMS Queue or Topic or consume messages from a JMS Queue or Topic using Apache ActiveMQ.
This component is based on JMS component and uses Spring’s JMS support for declarative transactions, using Spring’s JmsTemplate
for sending and a MessageListenerContainer
for consuming. All JMS component options also apply to the ActiveMQ Component.
To use this component, make sure you have the activemq.jar
or activemq-core.jar
on your classpath along with any Apache Camel dependencies such as camel-core.jar
, camel-spring.jar
and camel-jms.jar
.
See section Transactions and Cache Levels below on JMS page if you are using transactions with JMS as it can impact performance.
URI format
activemq:[queue:|topic:]destinationName
Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue, FOO.BAR
, use:
activemq:FOO.BAR
You can include the optional queue:
prefix, if you prefer:
activemq:queue:FOO.BAR
To connect to a topic, you must include the topic:
prefix. For example, to connect to the topic, Stocks.Prices
, use:
activemq:topic:Stocks.Prices
Options
All JMS component options also apply to the ActiveMQ Component.
Camel on EAP deployment
This component is supported by the Camel on EAP (Wildfly Camel) framework, which offers a simplified deployment model on the Red Hat JBoss Enterprise Application Platform (JBoss EAP) container.
You can configure the ActiveMQ Camel component to work either with an embedded broker or an external broker. To embed a broker in the JBoss EAP container, configure the ActiveMQ Resource Adapter in the EAP container configuration file — for details, see ActiveMQ Resource Adapter Configuration.
Configuring the Connection Factory
The following test case shows how to add an ActiveMQComponent to the CamelContext using the activeMQComponent()
method while specifying the brokerURL used to connect to ActiveMQ.
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Configuring the Connection Factory using Spring XML
You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <camelContext xmlns="http://camel.apache.org/schema/spring"> </camelContext> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="brokerURL" value="tcp://somehost:61616"/> </bean> </beans>
Using connection pooling
When sending to an ActiveMQ broker using Camel it is best practice to use a pooled connection factory to handle efficient pooling of JMS connections, sessions, and producers. See ActiveMQ Spring Support for more information.
- Add the AMQ pool with Maven:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.0.redhat-630516</version> </dependency>
- Set up the activemq component as follows:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> </bean>
Notice the init and destroy methods on the pooled connection factory. These methods are important to ensure the connection pool is properly started and shut down.
The PooledConnectionFactory
will then create a connection pool with up to 8 connections in use at the same time. Each connection can be shared by many sessions.
There is an option named maxActive
you can use to configure the maximum number of sessions per connection; the default value is 500
.
From ActiveMQ 5.7 onwards, the option has been renamed to maxActiveSessionPerConnection
to reflect its purpose better.
Note that the concurrentConsumers
is set to a higher value than maxConnections
is. This is permitted, as each consumer is using a session, and as a session can share the same connection, this will work. In this example, we can have 8 * 500 = 4000 active sessions simultaneously.
Invoking MessageListener POJOs in a route
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component can invoke any JMS MessageListener bean directly inside any route.
You can create a MessageListener in JMS as follows:
Example
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
Then use it in your route as follows
Example
from("file://foo/bar"). bean(MyListener.class);
That is, you can reuse any of the Apache Camel components and easily integrate them into your JMS MessageListener
POJO\!
Using ActiveMQ Destination Options
Available as of ActiveMQ 5.6
You can configure the Destination Options in the endpoint uri, using the "destination." prefix. For example, to mark a consumer as exclusive, and set its prefetch size to 50, you can do as follows: .Example
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file://src/test/data?noop=true"/> <to uri="activemq:queue:foo"/> </route> <route> <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. --> <from uri="activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50"/> <to uri="mock:results"/> </route> </camelContext>
Consuming Advisory Messages
ActiveMQ can generate Advisory messages, which are put in topics you can consume. Such messages can help you send alerts in case you detect slow consumers or to build statistics (number of messages/produced per day, etc.) The following Spring DSL example shows you how to read messages from a topic.
Example
<route> <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" /> <convertBodyTo type="java.lang.String"/> <transform> <simple>${in.body} </simple> </transform> <to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" /> </route>
If you consume a message on a queue, you should see the following files under the data/activemq folder :
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
and containing string:
Example
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140 -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles- 3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50, clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true}, redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles- 3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true, droppable = false}
Getting Component JAR
You need this dependency:
-
camel-activemq
ActiveMQ is an extension of the JMS component released with the ActiveMQ project.
<dependency> <groupId>org.fusesource</groupId> <artifactId>camel-activemq</artifactId> <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version> </dependency>