1.5. Processors
Overview
To make a route do more than simply connect a consumer endpoint to a producer endpoint, you can add processors to it. A processor is a command that you insert into a routing rule to perform arbitrary processing on the messages that flow through the rule. Apache Camel provides a wide variety of processors, as shown in Table 1.1, “Apache Camel Processors”.
Table 1.1. Apache Camel Processors

Java DSL | XML DSL | Description |
---|---|---|
aggregate() | aggregate | Aggregator EIP: Creates an aggregator, which combines multiple incoming exchanges into a single exchange. |
aop() | aop | Uses Aspect Oriented Programming (AOP) to do work before and after a specified sub-route. See Section 2.8, “Aspect Oriented Programming”. |
bean(), beanRef() | bean | Processes the current exchange by invoking a method on a Java object (or bean). See Section 2.4, “Bean Integration”. |
choice() | choice | Content Based Router EIP: Selects a particular sub-route based on the exchange content, using when and otherwise clauses. |
convertBodyTo() | convertBodyTo | Converts the In message body to the specified type. |
delay() | delay | Delayer EIP: Delays the propagation of the exchange to the latter part of the route. |
doTry() | doTry | Creates a try/catch block for handling exceptions, using doCatch, doFinally, and end clauses. |
end() | N/A | Ends the current command block. |
enrich(), enrichRef() | enrich | Content Enricher EIP: Combines the current exchange with data requested from a specified producer endpoint URI. |
filter() | filter | Message Filter EIP: Uses a predicate expression to filter incoming exchanges. |
idempotentConsumer() | idempotentConsumer | Idempotent Consumer EIP: Implements a strategy to suppress duplicate messages. |
inheritErrorHandler() | @inheritErrorHandler | Boolean option that can be used to disable the inherited error handler on a particular route node (defined as a sub-clause in the Java DSL and as an attribute in the XML DSL). |
inOnly() | inOnly | Either sets the current exchange's MEP to InOnly (if no arguments) or sends the exchange as an InOnly to the specified endpoint(s). |
inOut() | inOut | Either sets the current exchange's MEP to InOut (if no arguments) or sends the exchange as an InOut to the specified endpoint(s). |
loadBalance() | loadBalance | Load Balancer EIP: Implements load balancing over a collection of endpoints. |
log() | log | Logs a message to the console. |
loop() | loop | Loop EIP: Repeatedly resends each exchange to the latter part of the route. |
markRollbackOnly() | @markRollbackOnly | (Transactions) Marks the current transaction for rollback only (no exception is raised). In the XML DSL, this option is set as a boolean attribute on the rollback element. See "Transaction Guide". |
markRollbackOnlyLast() | @markRollbackOnlyLast | (Transactions) If one or more transactions have previously been associated with this thread and then suspended, this command marks the latest transaction for rollback only (no exception is raised). In the XML DSL, this option is set as a boolean attribute on the rollback element. See "Transaction Guide". |
marshal() | marshal | Transforms the In message body into a low-level or binary format using the specified data format, in preparation for sending over a particular transport protocol. |
multicast() | multicast | Multicast EIP: Multicasts the current exchange to multiple destinations, where each destination gets its own copy of the exchange. |
onCompletion() | onCompletion | Defines a sub-route (terminated by end() in the Java DSL) that gets executed after the main route has completed. For conditional execution, use the onWhen sub-clause. Can also be defined on its own line (not in a route). |
onException() | onException | Defines a sub-route (terminated by end() in the Java DSL) that gets executed whenever the specified exception occurs. Usually defined on its own line (not in a route). |
pipeline() | pipeline | Pipes and Filters EIP: Sends the exchange to a series of endpoints, where the output of one endpoint becomes the input of the next endpoint. See also Section 2.1, “Pipeline Processing”. |
policy() | policy | Applies a policy to the current route (currently only used for transactional policies; see "Transaction Guide"). |
pollEnrich(), pollEnrichRef() | pollEnrich | Content Enricher EIP: Combines the current exchange with data polled from a specified consumer endpoint URI. |
process(), processRef() | process | Executes a custom processor on the current exchange. See the section called “Custom processor” and Part IV, “Programming EIP Components”. |
recipientList() | recipientList | Recipient List EIP: Sends the exchange to a list of recipients that is calculated at runtime (for example, based on the contents of a header). |
removeHeader() | removeHeader | Removes the specified header from the exchange's In message. |
removeHeaders() | removeHeaders | Removes the headers matching the specified pattern from the exchange's In message. The pattern can have the form prefix*, in which case it matches every name starting with prefix; otherwise, it is interpreted as a regular expression. |
removeProperty() | removeProperty | Removes the specified exchange property from the exchange. |
resequence() | resequence | Resequencer EIP: Re-orders incoming exchanges on the basis of a specified comparator operation. Supports a batch mode and a stream mode. |
rollback() | rollback | (Transactions) Marks the current transaction for rollback only (also raising an exception, by default). See "Transaction Guide". |
routingSlip() | routingSlip | Routing Slip EIP: Routes the exchange through a pipeline that is constructed dynamically, based on the list of endpoint URIs extracted from a slip header. |
sample() | sample | Creates a sampling throttler, allowing you to extract a sample of exchanges from the traffic on a route. |
setBody() | setBody | Sets the message body of the exchange's In message. |
setExchangePattern() | setExchangePattern | Sets the current exchange's MEP to the specified value. See the section called “Message exchange patterns”. |
setHeader() | setHeader | Sets the specified header in the exchange's In message. |
setOutHeader() | setOutHeader | Sets the specified header in the exchange's Out message. |
setProperty() | setProperty | Sets the specified exchange property. |
sort() | sort | Sorts the contents of the In message body (where a custom comparator can optionally be specified). |
split() | split | Splitter EIP: Splits the current exchange into a sequence of exchanges, where each split exchange contains a fragment of the original message body. |
stop() | stop | Stops routing the current exchange and marks it as completed. |
threads() | threads | Creates a thread pool for concurrent processing of the latter part of the route. |
throttle() | throttle | Throttler EIP: Limits the flow rate to the specified level (exchanges per second). |
throwException() | throwException | Throws the specified Java exception. |
to() | to | Sends the exchange to one or more endpoints. See Section 2.1, “Pipeline Processing”. |
toF() | N/A | Sends the exchange to an endpoint, using string formatting. That is, the endpoint URI string can embed substitutions in the style of the C printf() function. |
transacted() | transacted | Creates a Spring transaction scope that encloses the latter part of the route. See "Transaction Guide". |
transform() | transform | Message Translator EIP: Copies the In message headers to the Out message headers and sets the Out message body to the specified value. |
unmarshal() | unmarshal | Transforms the In message body from a low-level or binary format to a high-level format, using the specified data format. |
validate() | validate | Takes a predicate expression to test whether the current message is valid. If the predicate returns false, throws a PredicateValidationException exception. |
wireTap() | wireTap | Wire Tap EIP: Sends a copy of the current exchange to the specified wire tap URI, using the ExchangePattern.InOnly MEP. |
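The removeHeaders() entry in the table describes a two-mode pattern match: a trailing * means "starts with", and anything else is treated as a regular expression. As a rough illustration only, the following plain-Java sketch models that rule on an ordinary map of headers. The class RemoveHeadersSketch and its methods are hypothetical names invented for this sketch, not part of Apache Camel, and Camel's own matching logic may differ in detail.

```java
import java.util.Map;
import java.util.regex.Pattern;

/** Plain-Java model of the removeHeaders() pattern rule. Hypothetical; not Camel API. */
final class RemoveHeadersSketch {

    /** Returns true if the header name matches the pattern as described in the table. */
    static boolean matches(String name, String pattern) {
        if (pattern.endsWith("*")) {
            // "prefix*" form: match every name that starts with the prefix.
            String prefix = pattern.substring(0, pattern.length() - 1);
            return name.startsWith(prefix);
        }
        // Otherwise, treat the whole pattern as a regular expression.
        return Pattern.matches(pattern, name);
    }

    /** Removes every entry whose key matches the pattern (the map must be mutable). */
    static void removeHeaders(Map<String, Object> headers, String pattern) {
        headers.keySet().removeIf(name -> matches(name, pattern));
    }
}
```

Under these assumptions, the pattern Camel* removes CamelFileName and CamelFileLength but leaves a header named foo in place, while the pattern f.o is treated as a regular expression and removes foo.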
Some sample processors
To get some idea of how to use processors in a route, see the following examples:
Choice
The choice() processor is a conditional statement that routes incoming messages to alternative producer endpoints. Each alternative producer endpoint is preceded by a when() method, which takes a predicate argument. If the predicate is true, the following target is selected; otherwise, processing proceeds to the next when() method in the rule. For example, the following choice() processor directs incoming messages to Target1, Target2, or Target3, depending on the values of Predicate1 and Predicate2:
from("SourceURL")
    .choice()
        .when(Predicate1).to("Target1")
        .when(Predicate2).to("Target2")
        .otherwise().to("Target3");
Or equivalently in Spring XML:
<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <choice>
      <when>
        <!-- First predicate -->
        <simple>${header.foo} == 'bar'</simple>
        <to uri="Target1"/>
      </when>
      <when>
        <!-- Second predicate -->
        <simple>${header.foo} == 'manchu'</simple>
        <to uri="Target2"/>
      </when>
      <otherwise>
        <to uri="Target3"/>
      </otherwise>
    </choice>
  </route>
</camelContext>
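The selection rule behind choice() is "first matching when() wins, otherwise fall through to otherwise()". The following plain-Java sketch models only that rule, using a map of header values in place of a real exchange. The class ChoiceSketch is a hypothetical name invented for this illustration; it is not Apache Camel API and does not reflect how Camel implements the processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Plain-Java model of choice()/when()/otherwise() selection. Hypothetical; not Camel API. */
final class ChoiceSketch {
    // Predicates and their targets, kept in the order in which when() was called.
    private final List<Predicate<Map<String, String>>> predicates = new ArrayList<>();
    private final List<String> targets = new ArrayList<>();
    private String otherwiseTarget; // null if no otherwise() clause was given

    ChoiceSketch when(Predicate<Map<String, String>> predicate, String target) {
        predicates.add(predicate);
        targets.add(target);
        return this;
    }

    ChoiceSketch otherwise(String target) {
        this.otherwiseTarget = target;
        return this;
    }

    /** Returns the first target whose predicate is true, else the otherwise target (may be null). */
    String select(Map<String, String> headers) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(headers)) {
                return targets.get(i);
            }
        }
        return otherwiseTarget;
    }
}
```

With predicates that test whether the foo header equals bar or manchu, a message whose foo header is bar selects Target1, one whose foo header is manchu selects Target2, and any other message selects Target3.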
In the Java DSL, there is a special case where you might need to use the endChoice() command. Some of the standard Apache Camel processors enable you to specify extra parameters using special sub-clauses, effectively opening an extra level of nesting, which is usually terminated by the end() command. For example, you could specify a load balancer clause as loadBalance().roundRobin().to("mock:foo").to("mock:bar").end(), which load balances messages between the mock:foo and mock:bar endpoints. If the load balancer clause is embedded in a choice condition, however, you must terminate the clause using the endChoice() command, as follows:
from("direct:start")
    .choice()
        .when(bodyAs(String.class).contains("Camel"))
            .loadBalance().roundRobin().to("mock:foo").to("mock:bar").endChoice()
        .otherwise()
            .to("mock:result");
Filter
The filter() processor can be used to prevent uninteresting messages from reaching the producer endpoint. It takes a single predicate argument: if the predicate is true, the message exchange is allowed through to the producer; if the predicate is false, the message exchange is blocked. For example, the following filter blocks a message exchange unless the incoming message contains a header, foo, with a value equal to bar:
from("SourceURL").filter(header("foo").isEqualTo("bar")).to("TargetURL");
Or equivalently in Spring XML:
<camelContext id="filterRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <filter>
      <simple>${header.foo} == 'bar'</simple>
      <to uri="TargetURL"/>
    </filter>
  </route>
</camelContext>
Throttler
The throttle() processor ensures that a producer endpoint does not get overloaded. The throttler works by limiting the number of messages that can pass through per second. If the incoming messages exceed the specified rate, the throttler accumulates excess messages in a buffer and transmits them more slowly to the producer endpoint. For example, to limit the rate of throughput to 100 messages per second, you can define the following rule:
from("SourceURL").throttle(100).to("TargetURL");
Or equivalently in Spring XML:
<camelContext id="throttleRoute" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="SourceURL"/>
    <throttle maximumRequestsPerPeriod="100" timePeriodMillis="1000">
      <to uri="TargetURL"/>
    </throttle>
  </route>
</camelContext>
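To picture how buffering excess messages slows delivery, the following plain-Java sketch computes when each message would be released, given its arrival time, so that no more than a fixed number of messages pass in any one period. It is a simplified timing model under stated assumptions (messages stay in arrival order, and times are in milliseconds); the class ThrottleSketch and its method are hypothetical names, and this is not how Apache Camel's throttler is implemented.

```java
/** Plain-Java timing model of a throttler. Hypothetical; not Camel API. */
final class ThrottleSketch {

    /**
     * Returns the release time of each message.
     * Assumes arrivalsMillis is sorted in ascending order and maxPerPeriod is at least 1.
     */
    static long[] releaseTimes(long[] arrivalsMillis, int maxPerPeriod, long periodMillis) {
        long[] release = new long[arrivalsMillis.length];
        for (int i = 0; i < arrivalsMillis.length; i++) {
            // A message cannot be released before it arrives.
            long t = arrivalsMillis[i];
            if (i > 0) {
                // Preserve arrival order: never overtake the previous message.
                t = Math.max(t, release[i - 1]);
            }
            if (i >= maxPerPeriod) {
                // Rate limit: wait a full period after the message maxPerPeriod places earlier.
                t = Math.max(t, release[i - maxPerPeriod] + periodMillis);
            }
            release[i] = t;
        }
        return release;
    }
}
```

For five messages that all arrive at time 0 with a limit of 2 messages per 1000 ms, the sketch yields release times of 0, 0, 1000, 1000, and 2000 ms. Messages that arrive more slowly than the limit are released as soon as they arrive.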
Custom processor
If none of the standard processors described here provides the functionality you need, you can define your own custom processor. To create a custom processor, define a class that implements the org.apache.camel.Processor interface and its process() method. The following custom processor, MyProcessor, removes the header named foo from incoming messages:
Example 1.3. Implementing a Custom Processor Class
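import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class MyProcessor implements Processor {
    // Removes the header named "foo" from the In message.
    public void process(Exchange exchange) {
        Message inMessage = exchange.getIn();
        if (inMessage != null) {
            inMessage.removeHeader("foo");
        }
    }
}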
To insert the custom processor into a router rule, invoke the process() method, which provides a generic mechanism for inserting processors into rules. For example, the following rule invokes the processor defined in Example 1.3, “Implementing a Custom Processor Class”:
org.apache.camel.Processor myProc = new MyProcessor();
from("SourceURL").process(myProc).to("TargetURL");