Chapter 35. Implementing a Processor
Abstract
Apache Camel allows you to implement a custom processor. You can then insert the custom processor into a route to perform operations on exchange objects as they pass through the route.
35.1. Processing Model
Pipelining model
The pipelining model describes the way in which processors are arranged in Section 5.4, “Pipes and Filters”. Pipelining is the most common way to process a sequence of endpoints (a producer endpoint is just a special type of processor). When the processors are arranged in this way, the exchange’s In and Out messages are processed as shown in Figure 35.1, “Pipelining Model”.
Figure 35.1. Pipelining Model
The processors in the pipeline look like services, where the In message is analogous to a request, and the Out message is analogous to a reply. In fact, in a realistic pipeline, the nodes in the pipeline are often implemented by Web service endpoints, such as the CXF component.
For example, Example 35.1, “Java DSL Pipeline” shows a Java DSL pipeline constructed from a sequence of two processors, ProcessorA and ProcessorB, and a producer endpoint, TargetURI.
Example 35.1. Java DSL Pipeline
from(SourceURI).pipeline(ProcessorA, ProcessorB, TargetURI);
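The hand-off between pipeline steps can be sketched in plain Java. This is a minimal illustration and does not use the Apache Camel API: SketchExchange, SketchProcessor, and SketchPipeline are hypothetical stand-ins, and the rule that an unset Out message causes the In message to be carried forward is an assumption of the sketch.

```java
import java.util.List;

// Hypothetical stand-in for an exchange: one In body and an optional Out body.
class SketchExchange {
    String in;
    String out;

    SketchExchange(String in) {
        this.in = in;
    }
}

// Hypothetical stand-in for a processor; unlike Camel's Processor,
// it declares no checked exception, to keep the sketch short.
interface SketchProcessor {
    void process(SketchExchange exchange);
}

class SketchPipeline {
    // Runs the steps in order and returns the body produced by the last step.
    static String run(String body, List<SketchProcessor> steps) {
        String current = body;
        for (SketchProcessor step : steps) {
            SketchExchange exchange = new SketchExchange(current);
            step.process(exchange);
            // The Out message of this step becomes the In message of the next.
            // If the step set no Out message, its In message is carried forward.
            current = (exchange.out != null) ? exchange.out : exchange.in;
        }
        return current;
    }
}
```

Each step sees the previous step's reply as its own request, which is the request/reply analogy described above.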
35.2. Implementing a Simple Processor
Overview
This section describes how to implement a simple processor that executes message processing logic before delegating the exchange to the next processor in the route.
Processor interface
Simple processors are created by implementing the org.apache.camel.Processor interface. As shown in Example 35.2, “Processor Interface”, the interface defines a single method, process(), which processes an exchange object.
Example 35.2. Processor Interface
package org.apache.camel;

public interface Processor {
    void process(Exchange exchange) throws Exception;
}
Implementing the Processor interface
To create a simple processor, you must implement the Processor interface and provide the logic for the process() method. Example 35.3, “Simple Processor Implementation” shows the outline of a simple processor implementation.
Example 35.3. Simple Processor Implementation
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange exchange) throws Exception {
        // Insert code that gets executed *before* delegating
        // to the next processor in the chain.
        ...
    }
}
All of the code in the process() method is executed before the exchange object is delegated to the next processor in the chain.
For examples of how to access the message body and header values inside a simple processor, see Section 35.3, “Accessing Message Content”.
Inserting the simple processor into a route
Use the process() DSL command to insert a simple processor into a route. Create an instance of your custom processor and then pass this instance as an argument to the process() method, as follows:
org.apache.camel.Processor myProc = new MyProcessor();
from("SourceURL").process(myProc).to("TargetURL");
35.3. Accessing Message Content
Accessing message headers
Message headers typically contain the most useful message content from the perspective of a router, because headers are often intended to be processed in a router service. To access header data, you must first get the message from the exchange object (for example, using Exchange.getIn()), and then use the Message interface to retrieve the individual headers (for example, using Message.getHeader()).
Example 35.4, “Accessing an Authorization Header” shows an example of a custom processor that accesses the value of a header named Authorization. This example uses the ExchangeHelper.getMandatoryHeader() method, which eliminates the need to test for a null header value.
Example 35.4. Accessing an Authorization Header
import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;

public class MyProcessor implements Processor {
    // getMandatoryHeader() declares NoSuchHeaderException,
    // so process() declares throws Exception.
    public void process(Exchange exchange) throws Exception {
        String auth = ExchangeHelper.getMandatoryHeader(
            exchange,
            "Authorization",
            String.class
        );
        // process the authorization string...
        // ...
    }
}
For full details of the Message interface, see Section 34.2, “Messages”.
Accessing the message body
You can also access the message body. For example, to append a string to the end of the In message, you can use the processor shown in Example 35.5, “Accessing the Message Body”.
Example 35.5. Accessing the Message Body
import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;

public class MyProcessor implements Processor {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}
Accessing message attachments
You can access a message’s attachments using either the Message.getAttachment() method or the Message.getAttachments() method. See Example 34.2, “Message Interface” for more details.
35.4. The ExchangeHelper Class
Overview
The org.apache.camel.util.ExchangeHelper class is an Apache Camel utility class that provides methods that are useful when implementing a processor.
Resolve an endpoint
The static resolveEndpoint() method is one of the most useful methods in the ExchangeHelper class. You use it inside a processor to create new Endpoint instances on the fly.
Example 35.6. The resolveEndpoint() Method
public final class ExchangeHelper {
    ...
    @SuppressWarnings({"unchecked" })
    public static Endpoint resolveEndpoint(Exchange exchange, Object value)
        throws NoSuchEndpointException { ... }
    ...
}
The first argument to resolveEndpoint() is an exchange instance, and the second argument is usually an endpoint URI string. Example 35.7, “Creating a File Endpoint” shows how to create a new file endpoint from an exchange instance, exchange.
Example 35.7. Creating a File Endpoint
Endpoint file_endp = ExchangeHelper.resolveEndpoint(exchange, "file://tmp/messages/in.xml");
Wrapping the exchange accessors
The ExchangeHelper class provides several static methods of the form getMandatoryBeanProperty(), which wrap the corresponding getBeanProperty() methods on the Exchange class. The difference between them is that the original getBeanProperty() accessors return null if the corresponding property is unavailable, whereas the getMandatoryBeanProperty() wrapper methods throw a Java exception. The following wrapper methods are implemented in the ExchangeHelper class:
public final class ExchangeHelper {
    ...
    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type)
        throws NoSuchPropertyException { ... }

    public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type)
        throws NoSuchHeaderException { ... }

    public static Object getMandatoryInBody(Exchange exchange)
        throws InvalidPayloadException { ... }

    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type)
        throws InvalidPayloadException { ... }

    public static Object getMandatoryOutBody(Exchange exchange)
        throws InvalidPayloadException { ... }

    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type)
        throws InvalidPayloadException { ... }
    ...
}
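The difference between a plain accessor and its mandatory wrapper can be illustrated with a small, self-contained sketch. It is not the ExchangeHelper source: HeaderAccessSketch is a hypothetical class that works on a plain Map, it checks only the runtime type instead of performing type conversion, and it throws an unchecked IllegalStateException where the real wrappers declare checked exceptions such as NoSuchHeaderException.

```java
import java.util.Map;

// Hypothetical stand-in for the wrapper pattern; not the ExchangeHelper source.
class HeaderAccessSketch {
    // Plain accessor: returns null when the header is absent or has another type.
    static <T> T getHeader(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        return type.isInstance(value) ? type.cast(value) : null;
    }

    // Mandatory wrapper: delegates to the plain accessor and throws instead of
    // returning null, so callers do not need their own null check.
    static <T> T getMandatoryHeader(Map<String, Object> headers, String name, Class<T> type) {
        T value = getHeader(headers, name, type);
        if (value == null) {
            throw new IllegalStateException(
                "No '" + name + "' header available of type " + type.getName());
        }
        return value;
    }
}
```

With the mandatory form, a missing header surfaces as an exception at the point of access rather than as a null value later in the processing logic.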
Testing the exchange pattern
Several different exchange patterns are compatible with holding an In message, and several are compatible with holding an Out message. To provide a quick way of checking whether an exchange object is capable of holding an In message or an Out message, the ExchangeHelper class provides the following methods:
public final class ExchangeHelper {
    ...
    public static boolean isInCapable(Exchange exchange) { ... }

    public static boolean isOutCapable(Exchange exchange) { ... }
    ...
}
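A typical use of such a check is to decide whether a reply should be produced at all. The following sketch shows the idea under stated assumptions: SketchPattern is a hypothetical, reduced enum (the real set of exchange patterns is larger), and ReplySketch.replyFor() is an illustrative helper, not part of Apache Camel.

```java
// Hypothetical, reduced set of exchange patterns used only for this sketch.
enum SketchPattern {
    IN_ONLY(true, false),
    IN_OUT(true, true),
    OUT_ONLY(false, true);

    private final boolean inCapable;
    private final boolean outCapable;

    SketchPattern(boolean inCapable, boolean outCapable) {
        this.inCapable = inCapable;
        this.outCapable = outCapable;
    }

    // True if an exchange with this pattern can hold an In message.
    boolean isInCapable() {
        return inCapable;
    }

    // True if an exchange with this pattern can hold an Out message.
    boolean isOutCapable() {
        return outCapable;
    }
}

class ReplySketch {
    // Builds a reply body only when the pattern can hold an Out message;
    // returns null for one-way patterns.
    static String replyFor(SketchPattern pattern, String request) {
        return pattern.isOutCapable() ? "reply to " + request : null;
    }
}
```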
Get the In message’s MIME content type
If you want to find out the MIME content type of the exchange’s In message, you can access it by calling the ExchangeHelper.getContentType(exchange) method. To implement this, the ExchangeHelper object looks up the value of the In message’s Content-Type header (this method relies on the underlying component to populate the header value).