8.3. Recipient List
Overview
A recipient list, shown in Figure 8.3, “Recipient List Pattern”, is a type of router that sends each incoming message to multiple different destinations. In addition, a recipient list typically requires that the list of recipients be calculated at run time.
Figure 8.3. Recipient List Pattern
Recipient list with fixed destinations
The simplest kind of recipient list is where the list of destinations is fixed and known in advance, and the exchange pattern is InOnly. In this case, you can hardwire the list of destinations into the
to()
Java DSL command.
Note
Java DSL example
The following example shows how to route an InOnly exchange from a consumer endpoint,
queue:a
, to a fixed list of destinations:
from("seda:a").to("seda:b", "seda:c", "seda:d");
XML configuration example
The following example shows how to configure the same route in XML:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <to uri="seda:b"/> <to uri="seda:c"/> <to uri="seda:d"/> </route> </camelContext>
Recipient list calculated at run time
In most cases, when you use the recipient list pattern, the list of recipients should be calculated at runtime. To do this use the
recipientList()
processor, which takes a list of destinations as its sole argument. Because Apache Camel applies a type converter to the list argument, it should be possible to use most standard Java list types (for example, a collection, a list, or an array). For more details about type converters, see Section 42.3, “Built-In Type Converters”.
The recipients receive a copy of the same exchange instance and Apache Camel executes them sequentially.
Java DSL example
The following example shows how to extract the list of destinations from a message header called
recipientListHeader
, where the header value is a comma-separated list of endpoint URIs:
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));
In some cases, if the header value is a list type, you might be able to use it directly as the argument to
recipientList()
. For example:
from("seda:a").recipientList(header("recipientListHeader"));
However, this example is entirely dependent on how the underlying component parses this particular header. If the component parses the header as a simple string, this example will not work. The header must be parsed into some type of Java list.
XML configuration example
The following example shows how to configure the preceding route in XML, where the header value is a comma-separated list of endpoint URIs:
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <recipientList delimiter=","> <header>recipientListHeader</header> </recipientList> </route> </camelContext>
Sending to multiple recipients in parallel
Available as of Camel 2.2
The Recipient List supports
parallelProcessing
, which is similar to the corresponding feature in Splitter. Use the parallel processing feature to send the exchange to multiple recipients concurrently—for example:
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
In Spring XML, the parallel processing feature is implemented as an attribute on the
recipientList
tag—for example:
<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>
Stop on exception
Available as of Camel 2.2
The Recipient List supports the
stopOnException
feature, which you can use to stop sending to any further recipients, if any recipient fails.
from("direct:a").recipientList(header("myHeader")).stopOnException();
And in Spring XML its an attribute on the recipient list tag.
In Spring XML, the stop on exception feature is implemented as an attribute on the
recipientList
tag—for example:
<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>
Note
You can combine
parallelProcessing
and stopOnException
in the same route.
Ignore invalid endpoints
Available as of Camel 2.3
The Recipient List supports the
ignoreInvalidEndpoints
option, which enables the recipient list to skip invalid endpoints (Routing Slip also supports this option). For example:
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
And in Spring XML, you can enable this option by setting the
ignoreInvalidEndpoints
attribute on the recipientList
tag, as follows
<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>
Consider the case where
myHeader
contains the two endpoints, direct:foo,xxx:bar
. The first endpoint is valid and works. The second is invalid and, therefore, ignored. Apache Camel logs at INFO
level whenever an invalid endpoint is encountered.
Using custom AggregationStrategy
Available as of Camel 2.2
You can use a custom
AggregationStrategy
with the Recipient List, which is useful for aggregating replies from the recipients in the list. By default, Apache Camel uses the UseLatestAggregationStrategy
aggregation strategy, which keeps just the last received reply. For a more sophisticated aggregation strategy, you can define your own implementation of the AggregationStrategy
interface—see Aggregator EIP for details. For example, to apply the custom aggregation strategy, MyOwnAggregationStrategy
, to the reply messages, you can define a Java DSL route as follows:
from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");
In Spring XML, you can specify the custom aggregation strategy as an attribute on the
recipientList
tag, as follows:
<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
Using custom thread pool
Available as of Camel 2.2
This is only needed when you use
parallelProcessing
. By default Camel uses a thread pool with 10 threads. Notice this is subject to change when we overhaul thread pool management and configuration later (hopefully in Camel 2.2).
You configure this just as you would with the custom aggregation strategy.
Using method call as recipient list
You can use a Bean to provide the recipients, for example:
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
Where the
MessageRouter
bean is defined as follows:
public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }
Bean as recipient list
You can make a bean behave as a recipient list by adding the
@RecipientList
annotation to a methods that returns a list of recipients. For example:
public class MessageRouter { @RecipientList public String routeTo() { String queueList = "activemq:queue:test1,activemq:queue:test2"; return queueList; } }
In this case, do not include the
recipientList
DSL command in the route. Define the route as follows:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
Using timeout
Available as of Camel 2.5
If you use
parallelProcessing
, you can configure a total timeout
value in milliseconds. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message is slow.
In the example below, the
recipientlist
header has the value, direct:a,direct:b,direct:c
, so that the message is sent to three recipients. We have a timeout of 250 milliseconds, which means only the last two messages can be completed within the timeframe. The aggregation therefore yields the string result, BC
.
from("direct:start") .recipientList(header("recipients"), ",") .aggregationStrategy(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250) // use end to indicate end of recipientList clause .end() .to("mock:result"); from("direct:a").delay(500).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C"));
Note
This
timeout
feature is also supported by splitter
and both multicast
and recipientList
.
By default if a timeout occurs the
AggregationStrategy
is not invoked. However you can implement a specialized version
// Java public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);
This allows you to deal with the timeout in the
AggregationStrategy
if you really need to.
Timeout is total
The timeout is total, which means that after X time, Camel will aggregate the messages which has completed within the timeframe. The remainders will be cancelled. Camel will also only invoke the
timeout
method in the TimeoutAwareAggregationStrategy
once, for the first index which caused the timeout.
Apply custom processing to the outgoing messages
Before
recipientList
sends a message to one of the recipient endpoints, it creates a message replica, which is a shallow copy of the original message. If you want to perform some custom processing on each message replica before the replica is sent to its endpoint, you can invoke the onPrepare
DSL command in the recipientList
clause. The onPrepare
command inserts a custom processor just after the message has been shallow-copied and just before the message is dispatched to its endpoint. For example, in the following route, the CustomProc
processor is invoked on the message replica for each recipient endpoint:
from("direct:start") .recipientList().onPrepare(new CustomProc());
A common use case for the
onPrepare
DSL command is to perform a deep copy of some or all elements of a message. This allows each message replica to be modified independently of the others. For example, the following CustomProc
processor class performs a deep copy of the message body, where the message body is presumed to be of type, BodyType
, and the deep copy is performed by the method, BodyType.deepCopy()
.
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
Options
The
recipientList
DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
delimiter
|
,
|
Delimiter used if the Expression returned multiple endpoints. |
strategyRef
|
Refers to an AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the Recipient List. By default Camel will use the last reply as the outgoing message. | |
strategyMethodName
|
This option can be used to explicitly specify the method name to use, when using POJOs as the AggregationStrategy .
|
|
strategyMethodAllowNull
|
false
|
This option can be used, when using POJOs as the AggregationStrategy . If false , the aggregate method is not used, when there is no data to enrich. If true , null values are used for the oldExchange , when there is no data to enrich.
|
parallelProcessing
|
false
|
Camel 2.2: If enables then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed, before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. |
parallelAggregate
|
false
|
If enabled, the aggregate method on AggregationStrategy can be called concurrently. Note that this requires the implementation of AggregationStrategy to be thread-safe. By default, this option is false , which means that Camel automatically synchronizes calls to the aggregate method. In some use-cases, however, you can improve performance by implementing AggregationStrategy as thread-safe and setting this option to true .
|
executorServiceRef
|
Camel 2.2: Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. | |
stopOnException
|
false
|
Camel 2.2: Whether or not to stop continue processing immediately when an exception occurred. If disable, then Camel will send the message to all recipients regardless if one of them failed. You can deal with exceptions in the AggregationStrategy class where you have full control how to handle that. |
ignoreInvalidEndpoints
|
false
|
Camel 2.3: If an endpoint uri could not be resolved, should it be ignored. Otherwise Camel will thrown an exception stating the endpoint uri is not valid. |
streaming
|
false
|
Camel 2.5: If enabled then Camel will process replies out-of-order, eg in the order they come back. If disabled, Camel will process replies in the same order as the Expression specified. |
timeout
|
Camel 2.5: Sets a total timeout specified in millis. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
|
|
onPrepareRef
|
Camel 2.8: Refers to a custom Processor to prepare the copy of the Exchange each recipient will receive. This allows you to do any custom logic, such as deep-cloning the message payload if that's needed etc. | |
shareUnitOfWork
|
false
|
Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details. |
cacheSize
|
0
|
Camel 2.13.1/2.12.4: Allows to configure the cache size for the ProducerCache which caches producers for reuse in the routing slip. Will by default use the default cache size which is 0. Setting the value to -1 allows to turn off the cache all together. |
Using Exchange Pattern in Recipient List
By default, the Recipient List uses the current exchange pattern. However, there may be few cases where you can send a message to a recipient using a different exchange pattern.
For example, you may have a route that initiates as a InOnly route. Now, If you want to use InOut exchange pattern with a recipient list, you need to configure the exchange pattern directly in the recipient endpoints.
The following example illustrates the route where the new files will start as InOnly and then route to a recipient list. If you want to use InOut with the ActiveMQ (JMS) endpoint, you need to specify this using the exchangePattern equals to InOut option. However, the response form the JMS request or reply will then be continued routed, and thus the response is stored in as a file in the outbox directory.
from("file:inbox") // the exchange pattern is InOnly initially when using a file route .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") .to("file:outbox");
Note
The
InOut
exchange pattern must get a response during the timeout. However, it fails if the response is not recieved.