Chapter 6. Messaging Channels
Abstract
Messaging channels provide the plumbing for a messaging application. This chapter describes the different kinds of messaging channels available in a messaging system, and the roles that they play.
6.1. Point-to-Point Channel
Overview
A point-to-point channel, shown in Figure 6.1, “Point to Point Channel Pattern”, is a message channel that guarantees that only one receiver consumes any given message. This is in contrast to a publish-subscribe channel, which allows multiple receivers to consume the same message. As with a publish-subscribe channel, it is possible for multiple receivers to subscribe to the same point-to-point channel. If more than one receiver competes to consume a message, however, it is up to the message channel to ensure that only one receiver actually consumes the message.
Figure 6.1. Point to Point Channel Pattern
Components that support point-to-point channel
The following Apache Camel components support the point-to-point channel pattern:
JMS
In JMS, a point-to-point channel is represented by a queue. For example, you can specify the endpoint URI for a JMS queue called Foo.Bar as follows:
jms:queue:Foo.Bar
The qualifier, queue:, is optional, because the JMS component creates a queue endpoint by default. Therefore, you can also specify the following equivalent endpoint URI:
jms:Foo.Bar
See Jms in the Apache Camel Component Reference Guide for more details.
ActiveMQ
In ActiveMQ, a point-to-point channel is represented by a queue. For example, you can specify the endpoint URI for an ActiveMQ queue called Foo.Bar as follows:
activemq:queue:Foo.Bar
See ActiveMQ in the Apache Camel Component Reference Guide for more details.
SEDA
The Apache Camel Staged Event-Driven Architecture (SEDA) component is implemented using a blocking queue. Use the SEDA component if you want to create a lightweight point-to-point channel that is internal to the Apache Camel application. For example, you can specify an endpoint URI for a SEDA queue called SedaQueue as follows:
seda:SedaQueue
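To illustrate why a blocking queue gives point-to-point semantics, here is a minimal plain-Java sketch. It is not Camel code: the class CompetingConsumers and its drain method are hypothetical names, and the two receivers are polled in turn rather than on separate threads so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not part of Apache Camel.
final class CompetingConsumers {

    // Two receivers take turns polling the same queue. Because poll()
    // removes the element it returns, no message reaches both receivers.
    static List<List<String>> drain(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> receiverA = new ArrayList<>();
        List<String> receiverB = new ArrayList<>();
        boolean turnA = true;
        String message;
        while ((message = queue.poll()) != null) {
            (turnA ? receiverA : receiverB).add(message);
            turnA = !turnA;
        }
        return List.of(receiverA, receiverB);
    }

    public static void main(String[] args) {
        // Each message appears in exactly one of the two lists.
        System.out.println(drain(List.of("m1", "m2", "m3")));
    }
}
```

Whichever receiver polls first gets the message, and the other never sees it, which is the behavior a point-to-point channel promises.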
JPA
The Java Persistence API (JPA) is an EJB 3 persistence standard that is used to write entity beans out to a database. See JPA in the Apache Camel Component Reference Guide for more details.
XMPP
The XMPP (Jabber) component supports the point-to-point channel pattern when it is used in the person-to-person mode of communication. See XMPP in the Apache Camel Component Reference Guide for more details.
6.2. Publish-Subscribe Channel
Overview
A publish-subscribe channel, shown in Figure 6.2, “Publish Subscribe Channel Pattern”, is a message channel (see Section 5.2, “Message Channel”) that enables multiple subscribers to consume any given message. This is in contrast with a point-to-point channel (see Section 6.1, “Point-to-Point Channel”). Publish-subscribe channels are frequently used as a means of broadcasting events or notifications to multiple subscribers.
Figure 6.2. Publish Subscribe Channel Pattern
Components that support publish-subscribe channel
The following Apache Camel components support the publish-subscribe channel pattern:
JMS
In JMS, a publish-subscribe channel is represented by a topic. For example, you can specify the endpoint URI for a JMS topic called StockQuotes as follows:
jms:topic:StockQuotes
See Jms in the Apache Camel Component Reference Guide for more details.
ActiveMQ
In ActiveMQ, a publish-subscribe channel is represented by a topic. For example, you can specify the endpoint URI for an ActiveMQ topic called StockQuotes as follows:
activemq:topic:StockQuotes
See ActiveMQ in the Apache Camel Component Reference Guide for more details.
XMPP
The XMPP (Jabber) component supports the publish-subscribe channel pattern when it is used in the group communication mode. See Xmpp in the Apache Camel Component Reference Guide for more details.
Static subscription lists
If you prefer, you can also implement publish-subscribe logic within the Apache Camel application itself. A simple approach is to define a static subscription list, where the target endpoints are all explicitly listed at the end of the route. However, this approach is not as flexible as a JMS or ActiveMQ topic.
Java DSL example
The following Java DSL example shows how to simulate a publish-subscribe channel with a single publisher, seda:a, and three subscribers, seda:b, seda:c, and seda:d:
from("seda:a").to("seda:b", "seda:c", "seda:d");
This only works for the InOnly message exchange pattern.
XML configuration example
The following example shows how to configure the same route in XML:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>
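The idea of a static subscription list can also be sketched in plain Java, independently of Camel. In the following illustration, the class StaticSubscriptionList and its methods are hypothetical; the subscriber names simply reuse the endpoint URIs from the route above as map keys.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical illustration; not part of Apache Camel.
final class StaticSubscriptionList {
    // One private queue per subscriber, fixed when the channel is created.
    private final Map<String, Queue<String>> subscribers = new LinkedHashMap<>();

    StaticSubscriptionList(List<String> names) {
        for (String name : names) {
            subscribers.put(name, new ArrayDeque<>());
        }
    }

    // Every subscriber receives its own copy of the published message.
    void publish(String message) {
        for (Queue<String> queue : subscribers.values()) {
            queue.add(message);
        }
    }

    Queue<String> queueOf(String name) {
        return subscribers.get(name);
    }

    public static void main(String[] args) {
        StaticSubscriptionList channel =
                new StaticSubscriptionList(List.of("seda:b", "seda:c", "seda:d"));
        channel.publish("quote");
        System.out.println(channel.queueOf("seda:c"));
    }
}
```

Because the list of subscribers is fixed at construction time, adding a subscriber means changing the code, which is the inflexibility noted above in comparison with a JMS or ActiveMQ topic.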
6.3. Dead Letter Channel
Overview
The dead letter channel pattern, shown in Figure 6.3, “Dead Letter Channel Pattern”, describes the actions to take when the messaging system fails to deliver a message to the intended recipient. This includes such features as retrying delivery and, if delivery ultimately fails, sending the message to a dead letter channel, which archives the undelivered messages.
Figure 6.3. Dead Letter Channel Pattern
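Before looking at the Camel syntax, the pattern itself can be sketched in plain Java. This is only an illustration under simplifying assumptions (no delays, and a failure is any RuntimeException); the class DeadLetterSketch and its members are hypothetical names and do not correspond to Camel classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration; not part of Apache Camel.
final class DeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>();
    int attempts = 0;

    // Makes the initial delivery attempt plus up to maximumRedeliveries
    // further attempts. Returns true on success; otherwise the message is
    // archived in deadLetters and false is returned.
    boolean deliver(String message, Consumer<String> destination, int maximumRedeliveries) {
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            attempts++;
            try {
                destination.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Delivery failed; loop around for the next attempt, if any.
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        DeadLetterSketch channel = new DeadLetterSketch();
        channel.deliver("order-1", m -> { throw new IllegalStateException("down"); }, 2);
        System.out.println(channel.attempts + " attempts, dead letters: " + channel.deadLetters);
    }
}
```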
Creating a dead letter channel in Java DSL
The following example shows how to create a dead letter channel using Java DSL:
errorHandler(deadLetterChannel("seda:errors"));
from("seda:a").to("seda:b");
Where the errorHandler() method is a Java DSL interceptor, which implies that all of the routes defined in the current route builder are affected by this setting. The deadLetterChannel() method is a Java DSL command that creates a new dead letter channel with the specified destination endpoint, seda:errors.
The errorHandler() interceptor provides a catch-all mechanism for handling all error types. If you want to apply a more fine-grained approach to exception handling, you can use onException clauses instead (see the section called “onException clause”).
XML DSL example
You can define a dead letter channel in the XML DSL, as follows:
<route errorHandlerRef="myDeadLetterErrorHandler">
  ...
</route>

<bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
  <property name="deadLetterUri" value="jms:queue:dead"/>
  <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
</bean>

<bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="3"/>
  <property name="redeliveryDelay" value="5000"/>
</bean>
Redelivery policy
Normally, you do not send a message straight to the dead letter channel when a delivery attempt fails. Instead, you re-attempt delivery up to some maximum limit, and only after all redelivery attempts have failed do you send the message to the dead letter channel. To customize message redelivery, you can configure the dead letter channel to have a redelivery policy. For example, to specify a maximum of two redelivery attempts, and to apply an exponential backoff algorithm to the time delay between delivery attempts, you can configure the dead letter channel as follows:
errorHandler(deadLetterChannel("seda:errors").maximumRedeliveries(2).useExponentialBackOff());
from("seda:a").to("seda:b");
Where you set the redelivery options on the dead letter channel by invoking the relevant methods in a chain (each method in the chain returns a reference to the current RedeliveryPolicy object). Table 6.1, “Redelivery Policy Settings” summarizes the methods that you can use to set redelivery policies.
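Method Signature | Default | Description |
---|---|---|
allowRedeliveryWhileStopping(boolean) | true | Controls whether redelivery is attempted during graceful shutdown or while a route is stopping. A delivery that is already in progress when stopping is initiated will not be interrupted. |
backOffMultiplier(double) |  | If exponential backoff is enabled, let m be the backoff multiplier and let d be the initial delay. The sequence of redelivery attempts is then timed as follows: d, m*d, m*m*d, m*m*m*d, ... |
collisionAvoidancePercent(double) |  | If collision avoidance is enabled, let p be the collision avoidance percent. The collision avoidance policy then adjusts the next delay by a random amount of up to plus or minus p% of its current value. |
deadLetterHandleNewException(boolean) |  | Camel 2.15: Specifies whether or not to handle an exception that occurs while processing a message in the dead letter channel. If true, the new exception is handled and ignored, so that the dead letter channel is guaranteed to complete. If false, the new exception is not handled and is propagated. |
delayPattern(String) | None | Apache Camel 2.0: See the section called “Redeliver delay pattern”. |
disableRedelivery() |  | Apache Camel 2.0: Disables the redelivery feature. To enable redelivery, set maximumRedeliveries() to a positive integer value. |
handled(boolean) |  | Apache Camel 2.0: If true, the current exception is cleared when the message is moved to the dead letter channel; if false, the exception is propagated back to the client. |
initialRedeliveryDelay(long) |  | Specifies the delay (in milliseconds) before attempting the first redelivery. |
logNewException(boolean) |  | Specifies whether to log at WARN level, when an exception is raised in the dead letter channel. |
logStackTrace(boolean) |  | Apache Camel 2.0: If true, the JVM stack trace is included in the error logs. |
maximumRedeliveries(int) |  | Apache Camel 2.0: Maximum number of redelivery attempts. |
maximumRedeliveryDelay(long) |  | Apache Camel 2.0: When using an exponential backoff strategy (see useExponentialBackOff()), it is theoretically possible for the redelivery delay to increase without limit. This option imposes an upper limit on the redelivery delay (in milliseconds). |
onRedelivery(Processor) | None | Apache Camel 2.0: Configures a processor that gets called before every redelivery attempt. |
redeliveryDelay(long) |  | Apache Camel 2.0: Specifies the delay (in milliseconds) between redelivery attempts. Apache Camel 2.16.0: The default redelivery delay is one second. |
retriesExhaustedLogLevel(LoggingLevel) |  | Apache Camel 2.0: Specifies the logging level at which to log delivery failure (specified as an org.apache.camel.LoggingLevel constant). |
retryAttemptedLogLevel(LoggingLevel) |  | Apache Camel 2.0: Specifies the logging level at which to log redelivery attempts (specified as an org.apache.camel.LoggingLevel constant). |
useCollisionAvoidance() |  | Enables collision avoidance, which adds some randomization to the backoff timings to reduce contention probability. |
useOriginalMessage() |  | Apache Camel 2.0: If this feature is enabled, the message sent to the dead letter channel is a copy of the original message exchange, as it existed at the beginning of the route (in the from() node). |
useExponentialBackOff() |  | Enables exponential backoff. |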
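The exponential backoff timing described in the table (d, m*d, m*m*d, ...) can be worked through with a small plain-Java sketch. This is not Camel's implementation: the class BackoffSchedule, the cap parameter maxDelay, and the percent-based jitter in withJitter are assumptions made for illustration of how an upper limit and a randomized adjustment could interact with the sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration; not part of Apache Camel.
final class BackoffSchedule {

    // Returns the delays d, m*d, m*m*d, ... for the given number of
    // redeliveries, with each delay capped at maxDelay (milliseconds).
    static List<Long> delays(long initialDelay, double multiplier, long maxDelay, int redeliveries) {
        List<Long> result = new ArrayList<>();
        double delay = initialDelay;
        for (int i = 0; i < redeliveries; i++) {
            result.add(Math.min(Math.round(delay), maxDelay));
            delay *= multiplier;
        }
        return result;
    }

    // Assumed form of collision avoidance: shift the delay by a random
    // amount of up to plus or minus the given percentage.
    static long withJitter(long delay, double percent, Random random) {
        double factor = (random.nextDouble() * 2 - 1) * percent / 100.0;
        return Math.round(delay + delay * factor);
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 60000, 4));
        System.out.println(withJitter(1000, 15.0, new Random()));
    }
}
```

With an initial delay of 1000 ms and a multiplier of 2, the first four delays double each time; lowering the cap below the largest value flattens the tail of the sequence.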
Redelivery headers
If Apache Camel attempts to redeliver a message, it automatically sets the headers described in Table 6.2, “Dead Letter Redelivery Headers” on the In message.
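Header Name | Type | Description |
---|---|---|
CamelRedeliveryCounter | Integer | Apache Camel 2.0: Counts the number of unsuccessful delivery attempts. This value is also set in Exchange.REDELIVERY_COUNTER. |
CamelRedelivered | Boolean | Apache Camel 2.0: True, if one or more redelivery attempts have been made. This value is also set in Exchange.REDELIVERED. |
CamelRedeliveryMaxCounter | Integer | Apache Camel 2.6: Holds the maximum redelivery setting (also set in the Exchange.REDELIVERY_MAX_COUNTER exchange property). |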
Redelivery exchange properties
If Apache Camel attempts to redeliver a message, it automatically sets the exchange properties described in Table 6.3, “Redelivery Exchange Properties”.
Exchange Property Name | Type | Description |
---|---|---|
Exchange.FAILURE_ROUTE_ID | String | Provides the route ID of the route that failed. The literal name of this property is CamelFailureRouteId. |
Using the original message
Available as of Apache Camel 2.0
Because an exchange object is subject to modification as it passes through the route, the exchange that is current when an exception is raised is not necessarily the copy that you would want to store in the dead letter channel. In many cases, it is preferable to log the message that arrived at the start of the route, before it was subject to any kind of transformation by the route. For example, consider the following route:
from("jms:queue:order:input")
    .to("bean:validateOrder")
    .to("bean:transformOrder")
    .to("bean:handleOrder");
The preceding route listens for incoming JMS messages and then processes the messages using the sequence of beans: validateOrder, transformOrder, and handleOrder. But when an error occurs, we do not know which state the message is in. Did the error happen before the transformOrder bean or after? We can ensure that the original message from jms:queue:order:input is logged to the dead letter channel by enabling the useOriginalMessage option as follows:
// will use original body
errorHandler(deadLetterChannel("jms:queue:dead")
    .useOriginalMessage().maximumRedeliveries(5).redeliveryDelay(5000));
Redeliver delay pattern
Available as of Apache Camel 2.0
The delayPattern option is used to specify delays for particular ranges of the redelivery count. The delay pattern has the following syntax: limit1:delay1;limit2:delay2;limit3:delay3;…, where each delayN is applied to redeliveries in the range limitN <= redeliveryCount < limitN+1.
For example, consider the pattern, 5:1000;10:5000;20:20000, which defines three groups and results in the following redelivery delays:
- Attempt number 1..4 = 0 milliseconds (as the first group starts with 5).
- Attempt number 5..9 = 1000 milliseconds (the first group).
- Attempt number 10..19 = 5000 milliseconds (the second group).
- Attempt number 20.. = 20000 milliseconds (the last group).
You can start a group with limit 1 to define a starting delay. For example, 1:1000;5:5000 results in the following redelivery delays:
- Attempt number 1..4 = 1000 milliseconds (the first group).
- Attempt number 5.. = 5000 milliseconds (the last group).
There is no requirement that the next delay should be higher than the previous and you can use any delay value you like. For example, the delay pattern, 1:5000;3:1000, starts with a 5 second delay and then reduces the delay to 1 second.
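The way a delay pattern maps a redelivery count to a delay can be sketched in plain Java as follows. This is an illustration of the rule described above, not Camel's own parser; the class DelayPattern and the method delayFor are hypothetical names, and the sketch assumes the groups are listed in ascending order of limit.

```java
// Hypothetical illustration; not part of Apache Camel.
final class DelayPattern {

    // Returns the delay (in milliseconds) for the given redelivery count.
    // The delay of the last group whose limit is <= redeliveryCount wins;
    // counts below the first limit get a delay of 0.
    static long delayFor(String pattern, int redeliveryCount) {
        long delay = 0L;
        for (String group : pattern.split(";")) {
            String[] parts = group.split(":");
            int limit = Integer.parseInt(parts[0].trim());
            long groupDelay = Long.parseLong(parts[1].trim());
            if (redeliveryCount >= limit) {
                delay = groupDelay;
            } else {
                break; // groups are assumed to be in ascending order
            }
        }
        return delay;
    }

    public static void main(String[] args) {
        String pattern = "5:1000;10:5000;20:20000";
        for (int count : new int[] {1, 5, 10, 20}) {
            System.out.println("attempt " + count + " -> " + delayFor(pattern, count) + " ms");
        }
    }
}
```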
Which endpoint failed?
When Apache Camel routes messages, it updates an Exchange property that contains the last endpoint the Exchange was sent to. Hence, you can obtain the URI for the current exchange’s most recent destination using the following code:
// Java
String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);
Where Exchange.TO_ENDPOINT is a string constant equal to CamelToEndpoint. This property is updated whenever Camel sends a message to any endpoint.
If an error occurs during routing and the exchange is moved into the dead letter queue, Apache Camel will additionally set a property named CamelFailureEndpoint, which identifies the last destination the exchange was sent to before the error occurred. Hence, you can access the failure endpoint from within a dead letter queue using the following code:
// Java
String failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);
Where Exchange.FAILURE_ENDPOINT is a string constant equal to CamelFailureEndpoint.
These properties remain set in the current exchange, even if the failure occurs after the given destination endpoint has finished processing. For example, consider the following route:
from("activemq:queue:foo")
    .to("http://someserver/somepath")
    .beanRef("foo");
Now suppose that a failure happens in the foo bean. In this case, the Exchange.TO_ENDPOINT property and the Exchange.FAILURE_ENDPOINT property still contain the value http://someserver/somepath.
onRedelivery processor
When a dead letter channel is performing redeliveries, it is possible to configure a Processor that is executed just before every redelivery attempt. This can be used for situations where you need to alter the message before it is redelivered.
For example, the following dead letter channel is configured to call the MyRedeliverProcessor before redelivering exchanges:
// we configure our Dead Letter Channel to invoke
// MyRedeliverProcessor before a redelivery is
// attempted. This allows us to alter the message before
// it is redelivered
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5)
    .onRedelivery(new MyRedeliverProcessor())
    // setting delay to zero is just to make unit testing faster
    .redeliveryDelay(0L));
Where the MyRedeliverProcessor processor is implemented as follows:
// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
public class MyRedeliverProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        // the message is being redelivered so we can alter it
        // we just append the redelivery counter to the body
        // you can of course do all kind of stuff instead
        String body = exchange.getIn().getBody(String.class);
        int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
        exchange.getIn().setBody(body + count);
        // the maximum redelivery was set to 5
        int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
        assertEquals(5, max);
    }
}
Control redelivery during shutdown or stopping
If you stop a route or initiate graceful shutdown, the default behavior of the error handler is to continue attempting redelivery. Because this is typically not the desired behavior, you have the option of disabling redelivery during shutdown or stopping, by setting the allowRedeliveryWhileStopping option to false, as shown in the following example:
errorHandler(deadLetterChannel("jms:queue:dead")
.allowRedeliveryWhileStopping(false)
.maximumRedeliveries(20)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.INFO));
The allowRedeliveryWhileStopping option is true by default, for backwards compatibility reasons. During aggressive shutdown, however, redelivery is always suppressed, irrespective of this option setting (for example, after graceful shutdown has timed out).
Using onExceptionOccurred Processor
The dead letter channel supports the onExceptionOccurred processor, which allows custom processing of a message after an exception occurs. You can also use it for custom logging. Any new exceptions thrown from the onExceptionOccurred processor are logged at WARN level and ignored, so that they do not override the existing exception.
The difference between the onRedelivery processor and the onExceptionOccurred processor is that the onRedelivery processor runs immediately before a redelivery attempt, which is not immediately after the exception occurs. For example, if you configure the error handler to wait five seconds between redelivery attempts, the onRedelivery processor is invoked five seconds after the exception occurs.
The following example shows how to perform custom logging when an exception occurs, by configuring onExceptionOccurred to use a custom processor.
errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(5000).onExceptionOccurred(myProcessor));
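The relative timing of the two processors can be made concrete with a small plain-Java trace. This sketch only models the ordering described above and is not Camel code; the class CallbackOrder, the method trace, and the event names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of Apache Camel.
final class CallbackOrder {

    // Records the order of events for a delivery that fails
    // failuresBeforeSuccess times before succeeding, with at most
    // maximumRedeliveries redelivery attempts.
    static List<String> trace(int failuresBeforeSuccess, int maximumRedeliveries) {
        List<String> events = new ArrayList<>();
        for (int attempt = 0; attempt <= maximumRedeliveries; attempt++) {
            if (attempt > 0) {
                events.add("delay");          // wait for the redelivery delay
                events.add("onRedelivery");   // runs just before the retry
            }
            events.add("attempt");
            if (attempt >= failuresBeforeSuccess) {
                events.add("success");
                return events;
            }
            events.add("onExceptionOccurred"); // runs right after the failure
        }
        events.add("deadLetter");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(1, 3));
    }
}
```

In the trace, onExceptionOccurred always directly follows a failed attempt, whereas onRedelivery is separated from the failure by the delay.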
onException clause
Instead of using the errorHandler() interceptor in your route builder, you can define a series of onException() clauses that define different redelivery policies and different dead letter channels for various exception types. For example, to define distinct behavior for each of the NullPointerException, IOException, and Exception types, you can define the following rules in your route builder using Java DSL:
onException(NullPointerException.class)
    .maximumRedeliveries(1)
    .setHeader("messageInfo", constant("Oh dear! An NPE."))
    .to("mock:npe_error");

onException(IOException.class)
    .initialRedeliveryDelay(5000L)
    .maximumRedeliveries(3)
    .backOffMultiplier(1.0)
    .useExponentialBackOff()
    .setHeader("messageInfo", constant("Oh dear! Some kind of I/O exception."))
    .to("mock:io_error");

onException(Exception.class)
    .initialRedeliveryDelay(1000L)
    .maximumRedeliveries(2)
    .setHeader("messageInfo", constant("Oh dear! An exception."))
    .to("mock:error");

from("seda:a").to("seda:b");
Where the redelivery options are specified by chaining the redelivery policy methods (as listed in Table 6.1, “Redelivery Policy Settings”), and you specify the dead letter channel’s endpoint using the to() DSL command. You can also call other Java DSL commands in the onException() clauses. For example, the preceding example calls setHeader() to record some error details in a message header named messageInfo.
In this example, the NullPointerException and the IOException exception types are configured specially. All other exception types are handled by the generic Exception exception interceptor. By default, Apache Camel applies the exception interceptor that most closely matches the thrown exception. If it fails to find an exact match, it tries to match the closest base type, and so on. Finally, if no other interceptor matches, the interceptor for the Exception type matches all remaining exceptions.
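The closest-match rule can be sketched in plain Java by walking up the class hierarchy of the thrown exception. This is a simplified illustration that does not attempt to reproduce every detail of Camel's matching; the class ExceptionMatcher and its methods are hypothetical names, and the endpoint URIs are taken from the example above.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not part of Apache Camel.
final class ExceptionMatcher {
    private final Map<Class<? extends Throwable>, String> clauses = new LinkedHashMap<>();

    // Registers a dead letter endpoint for an exception type.
    ExceptionMatcher on(Class<? extends Throwable> type, String deadLetterUri) {
        clauses.put(type, deadLetterUri);
        return this;
    }

    // Tries the exact class first, then each superclass in turn, and
    // returns the first registered endpoint (or null if none matches).
    String match(Throwable thrown) {
        for (Class<?> c = thrown.getClass(); c != null; c = c.getSuperclass()) {
            String uri = clauses.get(c);
            if (uri != null) {
                return uri;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        ExceptionMatcher matcher = new ExceptionMatcher()
                .on(NullPointerException.class, "mock:npe_error")
                .on(IOException.class, "mock:io_error")
                .on(Exception.class, "mock:error");
        // FileNotFoundException has no clause of its own, so the closest
        // base type, IOException, is used.
        System.out.println(matcher.match(new FileNotFoundException("missing")));
    }
}
```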
OnPrepareFailure
Before you pass the exchange to the dead letter queue, you can use the onPrepareFailure option to allow a custom processor to prepare the exchange. It enables you to add information about the exchange, such as the cause of exchange failure. For example, the following processor adds a header with the exception message.
public class MyPrepareProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        exchange.getIn().setHeader("FailedBecause", cause.getMessage());
    }
}
You can configure the error handler to use the processor as follows.
errorHandler(deadLetterChannel("jms:dead").onPrepareFailure(new MyPrepareProcessor()));
The onPrepareFailure option is also available using the default error handler. In XML, you can set it using the onPrepareFailureRef attribute, as follows:
<bean id="myPrepare" class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/>

<errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="jms:dead" onPrepareFailureRef="myPrepare"/>
6.4. Guaranteed Delivery
Overview
Guaranteed delivery means that once a message is placed into a message channel, the messaging system guarantees that the message will reach its destination, even if parts of the application should fail. In general, messaging systems implement the guaranteed delivery pattern, shown in Figure 6.4, “Guaranteed Delivery Pattern”, by writing messages to persistent storage before attempting to deliver them to their destination.
Figure 6.4. Guaranteed Delivery Pattern
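The store-before-forward idea behind guaranteed delivery can be sketched in plain Java using the file system as the persistent store. This is a minimal illustration, not how any particular broker implements persistence; the class PersistentOutbox, its methods, and the .msg file suffix are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration; not part of Apache Camel or ActiveMQ.
final class PersistentOutbox {
    private final Path dir;

    PersistentOutbox(Path dir) {
        this.dir = dir;
    }

    static Path newTempDir() {
        try {
            return Files.createTempDirectory("outbox");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 1: write the message to disk before any delivery attempt.
    void store(String id, String body) {
        try {
            Files.write(dir.resolve(id + ".msg"), body.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: remove the stored copy only after the destination confirms receipt.
    void acknowledge(String id) {
        try {
            Files.deleteIfExists(dir.resolve(id + ".msg"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // After a restart, anything still on disk was never acknowledged
    // and must be delivered again.
    List<String> pending() {
        try (Stream<Path> files = Files.list(dir)) {
            return files.map(p -> p.getFileName().toString())
                    .filter(n -> n.endsWith(".msg"))
                    .map(n -> n.substring(0, n.length() - ".msg".length()))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = newTempDir();
        PersistentOutbox outbox = new PersistentOutbox(dir);
        outbox.store("a", "hello");
        outbox.store("b", "world");
        outbox.acknowledge("a");
        // A new instance over the same directory simulates a restart.
        System.out.println(new PersistentOutbox(dir).pending());
    }
}
```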
Components that support guaranteed delivery
The following Apache Camel components support the guaranteed delivery pattern:
- JMS
- ActiveMQ
- ActiveMQ Journal
- File Component in the Apache Camel Component Reference Guide
JMS
In JMS, the deliveryPersistent query option indicates whether or not persistent storage of messages is enabled. Usually it is unnecessary to set this option, because the default behavior is to enable persistent delivery. To configure all the details of guaranteed delivery, it is necessary to set configuration options on the JMS provider. These details vary, depending on what JMS provider you are using. For example, MQSeries, TibCo, BEA, Sonic, and others all provide various qualities of service to support guaranteed delivery.
See JMS in the Apache Camel Component Reference Guide for more details.
ActiveMQ
In ActiveMQ, message persistence is enabled by default. From version 5 onwards, ActiveMQ uses the AMQ message store as the default persistence mechanism. There are several different approaches you can use to enable message persistence in ActiveMQ.
The simplest option (different from Figure 6.4, “Guaranteed Delivery Pattern”) is to enable persistence in a central broker and then connect to that broker using a reliable protocol. After a message has been sent to the central broker, delivery to consumers is guaranteed. For example, in the Apache Camel configuration file, META-INF/spring/camel-context.xml, you can configure the ActiveMQ component to connect to the central broker using the OpenWire/TCP protocol as follows:
<beans ... >
  ...
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>
  ...
</beans>
If you prefer to implement an architecture where messages are stored locally before being sent to a remote endpoint (similar to Figure 6.4, “Guaranteed Delivery Pattern”), you do this by instantiating an embedded broker in your Apache Camel application. A simple way to achieve this is to use the ActiveMQ Peer-to-Peer protocol, which implicitly creates an embedded broker to communicate with other peer endpoints. For example, in the camel-context.xml configuration file, you can configure the ActiveMQ component to connect to all of the peers in the group GroupA, as follows:
<beans ... >
  ...
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="peer://GroupA/broker1"/>
  </bean>
  ...
</beans>
Where broker1 is the broker name of the embedded broker (other peers in the group should use different broker names). One limiting feature of the Peer-to-Peer protocol is that it relies on IP multicast to locate the other peers in its group. This makes it unsuitable for use in wide area networks (and in some local area networks that do not have IP multicast enabled).
A more flexible way to create an embedded broker in the ActiveMQ component is to exploit ActiveMQ’s VM protocol, which connects to an embedded broker instance. If a broker of the required name does not already exist, the VM protocol automatically creates one. You can use this mechanism to create an embedded broker with custom configuration. For example:
<beans ... >
  ...
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://broker1?brokerConfig=xbean:activemq.xml"/>
  </bean>
  ...
</beans>
Where activemq.xml is an ActiveMQ file which configures the embedded broker instance. Within the ActiveMQ configuration file, you can choose to enable one of the following persistence mechanisms:
- AMQ persistence(the default) — A fast and reliable message store that is native to ActiveMQ. For details, see amqPersistenceAdapter and AMQ Message Store.
- JDBC persistence — Uses JDBC to store messages in any JDBC-compatible database. For details, see jdbcPersistenceAdapter and ActiveMQ Persistence.
- Journal persistence — A fast persistence mechanism that stores messages in a rolling log file. For details, see journalPersistenceAdapter and ActiveMQ Persistence.
- Kaha persistence — A persistence mechanism developed specifically for ActiveMQ. For details, see kahaPersistenceAdapter and ActiveMQ Persistence.
See ActiveMQ in the Apache Camel Component Reference Guide for more details.
ActiveMQ Journal
The ActiveMQ Journal component is optimized for a special use case where multiple, concurrent producers write messages to queues, but there is only one active consumer. Messages are stored in rolling log files and concurrent writes are aggregated to boost efficiency.
6.5. Message Bus
Overview
Message bus refers to a messaging architecture, shown in Figure 6.5, “Message Bus Pattern”, that enables you to connect diverse applications running on diverse computing platforms. In effect, Apache Camel and its components constitute a message bus.
Figure 6.5. Message Bus Pattern
The following features of the message bus pattern are reflected in Apache Camel:
- Common communication infrastructure — The router itself provides the core of the common communication infrastructure in Apache Camel. However, in contrast to some message bus architectures, Apache Camel provides a heterogeneous infrastructure: messages can be sent into the bus using a wide variety of different transports and using a wide variety of different message formats.
- Adapters — Where necessary, Apache Camel can translate message formats and propagate messages using different transports. In effect, Apache Camel is capable of behaving like an adapter, so that external applications can hook into the message bus without refactoring their messaging protocols.
In some cases, it is also possible to integrate an adapter directly into an external application. For example, if you develop an application using Apache CXF, where the service is implemented using JAX-WS and JAXB mappings, it is possible to bind a variety of different transports to the service. These transport bindings function as adapters.