Chapter 2. ActiveMQ
ActiveMQ Component
Using the ActiveMQ component, you can send messages to a JMS Queue or Topic or consume messages from a JMS Queue or Topic using Apache ActiveMQ.
This component is based on the JMS component and uses Spring’s JMS support for declarative transactions, using Spring’s JmsTemplate for sending and a MessageListenerContainer for consuming. All JMS component options also apply to the ActiveMQ component.
To use this component, make sure you have the activemq.jar or activemq-core.jar on your classpath along with any Apache Camel dependencies such as camel-core.jar, camel-spring.jar and camel-jms.jar.
If you are using transactions with JMS, see the Transactions and Cache Levels section on the JMS page, because the cache level can affect performance.
URI format
activemq:[queue:|topic:]destinationName
Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue, FOO.BAR, use:
activemq:FOO.BAR
You can include the optional queue: prefix, if you prefer:
activemq:queue:FOO.BAR
To connect to a topic, you must include the topic: prefix. For example, to connect to the topic, Stocks.Prices, use:
activemq:topic:Stocks.Prices
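The defaulting rule above can be sketched in plain Java. This is only an illustration of how the three URI forms are interpreted, not how Camel parses endpoint URIs internally. The class ActiveMqDestinationUri and its fields are hypothetical names introduced for this example, and query parameters are not handled.

```java
// Hypothetical helper for illustration only; Camel performs its own URI parsing.
final class ActiveMqDestinationUri {
    final String type; // "queue" or "topic"
    final String name; // destination name, for example FOO.BAR

    private ActiveMqDestinationUri(String type, String name) {
        this.type = type;
        this.name = name;
    }

    /** Interprets activemq:[queue:|topic:]destinationName, defaulting to a queue. */
    static ActiveMqDestinationUri parse(String uri) {
        String scheme = "activemq:";
        if (!uri.startsWith(scheme)) {
            throw new IllegalArgumentException("Not an activemq URI: " + uri);
        }
        String rest = uri.substring(scheme.length());
        if (rest.startsWith("topic:")) {
            return new ActiveMqDestinationUri("topic", rest.substring("topic:".length()));
        }
        if (rest.startsWith("queue:")) {
            // The queue: prefix is optional, so strip it when present.
            rest = rest.substring("queue:".length());
        }
        return new ActiveMqDestinationUri("queue", rest);
    }

    public static void main(String[] args) {
        String[] uris = {"activemq:FOO.BAR", "activemq:queue:FOO.BAR", "activemq:topic:Stocks.Prices"};
        for (String uri : uris) {
            ActiveMqDestinationUri d = parse(uri);
            System.out.println(uri + " -> " + d.type + " " + d.name);
        }
    }
}
```

The first two URIs resolve to the same queue, FOO.BAR. Only the third, which carries the topic: prefix, resolves to a topic.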
Options
All JMS component options also apply to the ActiveMQ Component.
Camel on EAP deployment
This component is supported by the Camel on EAP (Wildfly Camel) framework, which offers a simplified deployment model on the Red Hat JBoss Enterprise Application Platform (JBoss EAP) container.
You can configure the ActiveMQ Camel component to work either with an embedded broker or an external broker. To embed a broker in the JBoss EAP container, configure the ActiveMQ Resource Adapter in the EAP container configuration file. For details, see ActiveMQ Resource Adapter Configuration.
Configuring the Connection Factory
The following test case shows how to add an ActiveMQComponent to the CamelContext using the activeMQComponent() method while specifying the brokerURL used to connect to ActiveMQ.
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Configuring the Connection Factory using Spring XML
You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>
</beans>
Using connection pooling
When sending to an ActiveMQ broker using Camel, it is best practice to use a pooled connection factory, which pools JMS connections, sessions, and producers efficiently. See ActiveMQ Spring Support for more information.
- Add the AMQ pool with Maven:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.11.0.redhat-630516</version>
</dependency>
- Set up the activemq component as follows:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
  <property name="maxConnections" value="8" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory"/>
  <property name="concurrentConsumers" value="10"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>
Notice the init and destroy methods on the pooled connection factory. These methods are important to ensure the connection pool is properly started and shut down.
The PooledConnectionFactory will then create a connection pool with up to 8 connections in use at the same time. Each connection can be shared by many sessions.
There is an option named maxActive you can use to configure the maximum number of sessions per connection; the default value is 500.
From ActiveMQ 5.7 onwards, the option has been renamed to maxActiveSessionPerConnection to reflect its purpose better.
Note that concurrentConsumers is set to a higher value than maxConnections. This is permitted because each consumer uses its own session, and multiple sessions can share the same connection. In this example, up to 8 * 500 = 4000 sessions can be active simultaneously.
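The sizing arithmetic can be checked with a few lines of plain Java. This is a minimal sketch that uses only the numbers from the configuration above. The class PoolSizing and its method names are hypothetical and are not part of the ActiveMQ or Camel APIs.

```java
// Hypothetical helper for illustration only; it does not talk to a broker.
final class PoolSizing {
    /** Upper bound on simultaneously active sessions across the whole pool. */
    static int maxActiveSessions(int maxConnections, int maxSessionsPerConnection) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(maxConnections, maxSessionsPerConnection);
    }

    /** True when every concurrent consumer can obtain its own session. */
    static boolean fits(int concurrentConsumers, int maxConnections, int maxSessionsPerConnection) {
        return concurrentConsumers <= maxActiveSessions(maxConnections, maxSessionsPerConnection);
    }

    public static void main(String[] args) {
        int maxConnections = 8;             // from the pooledConnectionFactory bean
        int maxSessionsPerConnection = 500; // default stated in the text
        int concurrentConsumers = 10;       // from the jmsConfig bean

        System.out.println(maxActiveSessions(maxConnections, maxSessionsPerConnection)); // prints 4000
        System.out.println(fits(concurrentConsumers, maxConnections, maxSessionsPerConnection)); // prints true
    }
}
```

With these values, 10 concurrent consumers need 10 sessions, well below the 4000 session ceiling, even though only 8 connections exist.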
Invoking MessageListener POJOs in a route
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component can invoke any JMS MessageListener bean directly inside any route.
You can create a MessageListener in JMS as follows:
Example
import javax.jms.Message;
import javax.jms.MessageListener;

public class MyListener implements MessageListener {
    public void onMessage(Message jmsMessage) {
        // ...
    }
}
Then use it in your route as follows:
Example
from("file://foo/bar")
    .bean(MyListener.class);
That is, you can reuse any of the Apache Camel components and easily integrate them into your JMS MessageListener POJO!
Using ActiveMQ Destination Options
Available as of ActiveMQ 5.6
You can configure the Destination Options in the endpoint URI, using the "destination." prefix. For example, to mark a consumer as exclusive and set its prefetch size to 50, you can do as follows:
Example
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>
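When the endpoint URI is assembled in Java rather than written by hand, the "destination." prefix can be applied mechanically. The following is a minimal sketch in plain Java. The class DestinationOptions and its method are hypothetical, introduced only for this example, and the option values are assumed not to need URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not part of the Camel or ActiveMQ APIs.
final class DestinationOptions {
    /** Builds activemq:<destination>?destination.<key>=<value>&... in insertion order. */
    static String endpointUri(String destination, Map<String, String> options) {
        String base = "activemq:" + destination;
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // Every ActiveMQ destination option needs the "destination." prefix.
            query.add("destination." + option.getKey() + "=" + option.getValue());
        }
        return base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("consumer.exclusive", "true");
        options.put("consumer.prefetchSize", "50");
        // prints activemq:foo?destination.consumer.exclusive=true&destination.consumer.prefetchSize=50
        System.out.println(endpointUri("foo", options));
    }
}
```

In a Java string the separator is a plain ampersand. Inside an XML attribute, as in the Spring example, the same character has to be written as the entity &amp;.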
Consuming Advisory Messages
ActiveMQ can generate Advisory messages, which are put in topics that you can consume. Such messages can help you send alerts when you detect slow consumers, or build statistics (number of messages produced per day, and so on). The following Spring DSL example shows how to read messages from a topic.
Example
<route>
  <from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
  <convertBodyTo type="java.lang.String"/>
  <transform>
    <simple>${in.body} </simple>
  </transform>
  <to uri="file://data/activemq/?fileExist=Append&amp;fileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>
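The file name in the route above embeds the expression ${date:now:yyyyMMdd}, which Camel resolves to the current date formatted with the yyyyMMdd pattern. The plain Java sketch below reproduces that naming for a fixed date, to show what the resulting file name looks like. The class AdvisoryFileName is hypothetical, and it uses java.time rather than Camel's own formatting code, on the assumption that both give the same result for this simple pattern.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; Camel evaluates the expression itself at runtime.
final class AdvisoryFileName {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Returns <prefix>-<yyyyMMdd>.txt for the given date. */
    static String forDate(String prefix, LocalDate date) {
        return prefix + "-" + DAY.format(date) + ".txt";
    }

    public static void main(String[] args) {
        // prints advisoryConnection-20100312.txt
        System.out.println(forDate("advisoryConnection", LocalDate.of(2010, 3, 12)));
    }
}
```

Because the date is part of the name and fileExist=Append is set, all advisory messages received on the same day are appended to one file, and a new file starts each day.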
If you consume a message on a queue, you should see the following files under the data/activemq folder:
advisoryConnection-20100312.txt
advisoryProducer-20100312.txt
Each file contains strings such as the following:
Example
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
-1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
{commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
droppable = false}
Getting Component JAR
You need this dependency:
- camel-activemq
ActiveMQ is an extension of the JMS component released with the ActiveMQ project.
<dependency>
  <groupId>org.fusesource</groupId>
  <artifactId>camel-activemq</artifactId>
  <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version>
</dependency>