Chapter 9. Message Transformation
Abstract
The message transformation patterns describe how to modify the contents of messages for various purposes.
9.1. Content Enricher
Overview
The content enricher pattern describes a scenario where the message destination requires more data than is present in the original message. In this case, you would use a content enricher to pull in the extra data from an external resource.
Figure 9.1. Content Enricher Pattern
Models of content enrichment
Apache Camel supports two kinds of content enricher, as follows:
- enrich(): obtains additional data from the resource by sending a copy of the current exchange to a producer endpoint and then using the data from the resulting reply (the exchange created by the enricher is always an InOut exchange).
- pollEnrich(): obtains the additional data by polling a consumer endpoint. Effectively, the consumer endpoint from the main route and the consumer endpoint in pollEnrich() are coupled, such that exchanges incoming on the main route trigger a poll of the pollEnrich() endpoint.
Note
The content enricher supports dynamic endpoint URIs with both enrich and pollEnrich. You can compute the URI using an expression, which lets you use values from the current exchange. For example, you can poll a file whose name is computed from data in the exchange. This change breaks the XML DSL, but migration is straightforward. The Java DSL stays backwards compatible.
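To make the idea of a computed endpoint URI concrete, the following plain-Java sketch builds a file endpoint URI from a header value. It is an illustration only: DynamicUriSketch and fileEndpointFor are hypothetical names, not Camel classes, and the directory and the orderId header are borrowed from the JMS example later in this section. In a real route, the enricher would evaluate an expression against the exchange to produce the URI.

```java
import java.util.Map;

/** Hypothetical helper, not part of Camel: computes an endpoint URI from message headers. */
final class DynamicUriSketch {

    private DynamicUriSketch() { }

    /**
     * Builds the file endpoint URI for the order identified by the "orderId" header.
     *
     * @param headers headers of the current message (a stand-in for the exchange's headers)
     * @return a file endpoint URI whose fileName option is taken from the header
     */
    static String fileEndpointFor(Map<String, ?> headers) {
        Object orderId = headers.get("orderId");
        if (orderId == null) {
            // Without the header there is no file name to poll for.
            throw new IllegalArgumentException("header 'orderId' is missing");
        }
        return "file://order/data/additional?fileName=" + orderId;
    }
}
```

For a header map that contains orderId=42, the sketch yields file://order/data/additional?fileName=42.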
Content enrichment using enrich()
AggregationStrategy aggregationStrategy = ...

from("direct:start")
    .enrich("direct:resource", aggregationStrategy)
    .to("direct:result");

from("direct:resource")
...
The content enricher (enrich) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). An aggregation strategy combines the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter corresponds to the resource exchange. The results from the resource endpoint are stored in the resource exchange's Out message. Here is a sample template for implementing your own aggregation strategy class:
public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}
Using this template, the original exchange can have any exchange pattern. The resource exchange created by the enricher is always an InOut exchange.
Spring XML Enrich Example
The preceding example can also be implemented in Spring XML:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />
Default aggregation strategy
The aggregation strategy is optional. If you do not provide it, Apache Camel will use the body obtained from the resource by default. For example:
from("direct:start")
    .enrich("direct:resource")
    .to("direct:result");
In the preceding route, the message sent to the direct:result endpoint contains the output from the direct:resource endpoint, because this example does not use any custom aggregation.
In XML DSL, just omit the strategyRef attribute, as follows:
<route>
  <from uri="direct:start"/>
  <enrich uri="direct:resource"/>
  <to uri="direct:result"/>
</route>
Enrich Options
The enrich DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
uri | | The endpoint uri for the external service to enrich from. You must use either uri or ref. |
ref | | Refers to the endpoint for the external service to enrich from. You must use either uri or ref. |
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. |
aggregateOnException | | Refers to the Aggregate method. The aggregateOnException enables you to deal with exceptions. For example, you can suppress the exception or set a custom message. |
shareUnitOfWork | false | Camel 2.16: Shares the unit of work with the parent and the resource exchange. However, by default, Enrich does not share the unit of work between the parent exchange and the resource exchange. Also, the resource exchange has its own individual unit of work. |
Content enrich using pollEnrich
The pollEnrich command treats the resource endpoint as a consumer. Instead of sending an exchange to the resource endpoint, it polls the endpoint. By default, the poll returns immediately if no exchange is available from the resource endpoint. For example, the following route reads a file whose name is extracted from the header of an incoming JMS message:
from("activemq:queue:order")
    .pollEnrich("file://order/data/additional?fileName=orderId")
    .to("bean:processOrder");
And if you want to wait at most 20 seconds for the file to be ready, you can use a timeout as follows:
from("activemq:queue:order")
    .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
    .to("bean:processOrder");
You can also specify an aggregation strategy for pollEnrich, as follows:
.pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)
Note
The resource exchange passed to the aggregation strategy's aggregate() method might be null if the poll times out before an exchange is received.
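Because a timed-out poll yields a null resource exchange, an aggregation strategy used with pollEnrich typically needs a null guard before it reads the resource. The sketch below shows only that guard, using plain strings as stand-ins for the message bodies. NullSafeMerge and its merge method are hypothetical names, not Camel API. In a real AggregationStrategy the null check would be made on the resource Exchange argument itself, and falling back to the original body is just one possible policy.

```java
/** Hypothetical helper, not part of Camel: models the body-merging step of an aggregation strategy. */
final class NullSafeMerge {

    private NullSafeMerge() { }

    /**
     * @param originalBody body of the original message
     * @param resourceBody body obtained from the polled resource, or null if the poll timed out
     * @return the merged body, or the original body unchanged when no resource data arrived
     */
    static String merge(String originalBody, String resourceBody) {
        if (resourceBody == null) {
            // Poll timed out: nothing to enrich with, so keep the original content.
            return originalBody;
        }
        // Assumed merge policy for this sketch: append the resource data on a new line.
        return originalBody + "\n" + resourceBody;
    }
}
```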
Data from current Exchange not used
pollEnrich does not access any data from the current Exchange, so when polling it cannot use any of the existing headers you may have set on the Exchange. For example, you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file. Instead, you must set the filename in the endpoint URI, which can itself be computed from an expression as described in the note on dynamic endpoint URIs above.
Polling methods used by pollEnrich()
In general, the pollEnrich() enricher polls the consumer endpoint using one of the following polling methods:
- receiveNoWait() (used by default)
- receive()
- receive(long timeout)
The pollEnrich() command's timeout argument (specified in milliseconds) determines which method gets called, as follows:
- If the timeout is 0 or not specified, receiveNoWait is called.
- If the timeout is negative, receive is called.
- Otherwise, receive(timeout) is called.
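The selection rules above can be sketched in plain Java. This example is an analogy rather than Camel code: a java.util.concurrent.BlockingQueue stands in for the polled endpoint, and the hypothetical TimeoutPolling.poll helper maps the timeout onto the queue's non-blocking, blocking, and timed retrieval methods in the same way the list maps it onto receiveNoWait(), receive(), and receive(timeout).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Camel: mirrors the pollEnrich timeout rules on a BlockingQueue. */
final class TimeoutPolling {

    private TimeoutPolling() { }

    /**
     * Retrieves the next item from the source according to the timeout rules.
     *
     * @param source        queue standing in for the polled consumer endpoint
     * @param timeoutMillis 0 = do not wait, negative = wait indefinitely, positive = wait that long
     * @return the next item, or null if none arrived in time or the thread was interrupted
     */
    static <T> T poll(BlockingQueue<T> source, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                // Analogous to receiveNoWait(): return immediately, possibly with null.
                return source.poll();
            }
            if (timeoutMillis < 0) {
                // Analogous to receive(): block until an item is available.
                return source.take();
            }
            // Analogous to receive(timeout): wait at most the given time.
            return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers and report "no data".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A null result here corresponds to the case where no exchange was available, which is why an aggregation strategy used with pollEnrich may be handed a null resource exchange.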
pollEnrich example
In this example we enrich the message by loading the content from the file named inbox/data.txt.
from("direct:start")
    .pollEnrich("file:inbox?fileName=data.txt")
    .to("direct:result");
And in XML DSL you do:
<route>
  <from uri="direct:start"/>
  <pollEnrich uri="file:inbox?fileName=data.txt"/>
  <to uri="direct:result"/>
</route>
If there is no file, the message is empty. You can use a timeout either to wait (potentially forever) until a file exists, or to wait for a limited period. For example, to wait up to 5 seconds:
<route>
  <from uri="direct:start"/>
  <pollEnrich uri="file:inbox?fileName=data.txt" timeout="5000"/>
  <to uri="direct:result"/>
</route>
PollEnrich Options
The pollEnrich DSL command supports the following options:
Name | Default Value | Description |
---|---|---|
uri | | The endpoint uri for the external service to enrich from. You must use either uri or ref. |
ref | | Refers to the endpoint for the external service to enrich from. You must use either uri or ref. |
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. |
timeout | 0 | Timeout in milliseconds to use when polling from the external service. See Polling methods used by pollEnrich() for important details about the timeout. |