Chapter 2. ActiveMQ
ActiveMQ Component Copy linkLink copied to clipboard!
Using the ActiveMQ component, you can send messages to a JMS Queue or Topic or consume messages from a JMS Queue or Topic using Apache ActiveMQ.
This component is based on JMS component and uses Spring’s JMS support for declarative transactions, using Spring’s JmsTemplate
for sending and a MessageListenerContainer
for consuming. All JMS component options also apply to the ActiveMQ Component.
To use this component, make sure you have the activemq.jar
or activemq-core.jar
on your classpath along with any Apache Camel dependencies such as camel-core.jar
, camel-spring.jar
and camel-jms.jar
.
See section Transactions and Cache Levels below on JMS page if you are using transactions with JMS as it can impact performance.
URI format Copy linkLink copied to clipboard!
activemq:[queue:|topic:]destinationName
activemq:[queue:|topic:]destinationName
Where destinationName is an ActiveMQ queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue, FOO.BAR
, use:
activemq:FOO.BAR
activemq:FOO.BAR
You can include the optional queue:
prefix, if you prefer:
activemq:queue:FOO.BAR
activemq:queue:FOO.BAR
To connect to a topic, you must include the topic:
prefix. For example, to connect to the topic, Stocks.Prices
, use:
activemq:topic:Stocks.Prices
activemq:topic:Stocks.Prices
Options Copy linkLink copied to clipboard!
All JMS component options also apply to the ActiveMQ Component.
Camel on EAP deployment Copy linkLink copied to clipboard!
This component is supported by the Camel on EAP (Wildfly Camel) framework, which offers a simplified deployment model on the Red Hat JBoss Enterprise Application Platform (JBoss EAP) container.
You can configure the ActiveMQ Camel component to work either with an embedded broker or an external broker. To embed a broker in the JBoss EAP container, configure the ActiveMQ Resource Adapter in the EAP container configuration file — for details, see ActiveMQ Resource Adapter Configuration.
Configuring the Connection Factory Copy linkLink copied to clipboard!
The following test case shows how to add an ActiveMQComponent to the CamelContext using the activeMQComponent()
method while specifying the brokerURL used to connect to ActiveMQ.
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
Configuring the Connection Factory using Spring XML Copy linkLink copied to clipboard!
You can configure the ActiveMQ broker URL on the ActiveMQComponent as follows
Using connection pooling Copy linkLink copied to clipboard!
When sending to an ActiveMQ broker using Camel it is best practice to use a pooled connection factory to handle efficient pooling of JMS connections, sessions, and producers. See ActiveMQ Spring Support for more information.
- Add the AMQ pool with Maven:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.0.redhat-630516</version> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.11.0.redhat-630516</version>
</dependency>
- Set up the activemq component as follows:
Notice the init and destroy methods on the pooled connection factory. These methods are important to ensure the connection pool is properly started and shut down.
The PooledConnectionFactory
will then create a connection pool with up to 8 connections in use at the same time. Each connection can be shared by many sessions.
There is an option named maxActive
you can use to configure the maximum number of sessions per connection; the default value is 500
.
From ActiveMQ 5.7 onwards, the option has been renamed to maxActiveSessionPerConnection
to reflect its purpose better.
Note that the concurrentConsumers
is set to a higher value than maxConnections
is. This is permitted, as each consumer is using a session, and as a session can share the same connection, this will work. In this example, we can have 8 * 500 = 4000 active sessions simultaneously.
Invoking MessageListener POJOs in a route Copy linkLink copied to clipboard!
The ActiveMQ component also provides a helper Type Converter from a JMS MessageListener to a Processor. This means that the Bean component can invoke any JMS MessageListener bean directly inside any route.
You can create a MessageListener in JMS as follows:
Example
public class MyListener implements MessageListener { public void onMessage(Message jmsMessage) { // ... } }
public class MyListener implements MessageListener {
public void onMessage(Message jmsMessage) {
// ...
}
}
Then use it in your route as follows
Example
from("file://foo/bar"). bean(MyListener.class);
from("file://foo/bar").
bean(MyListener.class);
That is, you can reuse any of the Apache Camel components and easily integrate them into your JMS MessageListener
POJO\!
Using ActiveMQ Destination Options Copy linkLink copied to clipboard!
Available as of ActiveMQ 5.6
You can configure the Destination Options in the endpoint uri, using the "destination." prefix. For example, to mark a consumer as exclusive, and set its prefetch size to 50, you can do as follows: .Example
Consuming Advisory Messages Copy linkLink copied to clipboard!
ActiveMQ can generate Advisory messages, which are put in topics you can consume. Such messages can help you send alerts in case you detect slow consumers or to build statistics (number of messages/produced per day, etc.) The following Spring DSL example shows you how to read messages from a topic.
Example
If you consume a message on a queue, you should see the following files under the data/activemq folder :
advisoryConnection-20100312.txt advisoryProducer-20100312.txt
advisoryConnection-20100312.txt
advisoryProducer-20100312.txt
and containing string:
Example
Getting Component JAR Copy linkLink copied to clipboard!
You need this dependency:
-
camel-activemq
ActiveMQ is an extension of the JMS component released with the ActiveMQ project.
<dependency> <groupId>org.fusesource</groupId> <artifactId>camel-activemq</artifactId> <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version> </dependency>
<dependency>
<groupId>org.fusesource</groupId>
<artifactId>camel-activemq</artifactId>
<version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version>
</dependency>