Chapter 43. Producer and Consumer Templates
Abstract
The producer and consumer templates in Apache Camel are modelled after a feature of the Spring container API, whereby access to a resource is provided through a simplified, easy-to-use API known as a template. In the case of Apache Camel, the producer template and consumer template provide simplified interfaces for sending messages to and receiving messages from producer endpoints and consumer endpoints.
43.1. Using the Producer Template
43.1.1. Introduction to the Producer Template
Overview
The producer template supports a variety of different approaches to invoking producer endpoints. There are methods that support different formats for the request message (as an
Exchange
object, as a message body, as a message body with a single header setting, and so on) and there are methods to support both the synchronous and the asynchronous style of invocation. Overall, producer template methods can be grouped into the following categories:
Synchronous invocation
The methods for invoking endpoints synchronously have names of the form
sendSuffix()
and requestSuffix()
. For example, the methods for invoking an endpoint using either the default message exchange pattern (MEP) or an explicitly specified MEP are named send()
, sendBody()
, and sendBodyAndHeader()
(where these methods respectively send an Exchange
object, a message body, or a message body and header value). If you want to force the MEP to be InOut (request/reply semantics), you can call the request()
, requestBody()
, and requestBodyAndHeader()
methods instead.
The following example shows how to create a
ProducerTemplate
instance and use it to send a message body to the activemq:MyQueue
endpoint. The example also shows how to send a message body and header value using sendBodyAndHeader()
.
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue template.sendBody("activemq:MyQueue", "<hello>world!</hello>"); // Send with a body and header template.sendBodyAndHeader( "activemq:MyQueue", "<hello>world!</hello>", "CustomerRating", "Gold" );
Synchronous invocation with a processor
A special case of synchronous invocation is where you provide the
send()
method with a Processor
argument instead of an Exchange
argument. In this case, the producer template implicitly asks the specified endpoint to create an Exchange
instance (typically, but not always having the InOnly MEP by default). This default exchange is then passed to the processor, which initializes the contents of the exchange object.
The following example shows how to send an exchange initialized by the
MyProcessor
processor to the activemq:MyQueue
endpoint.
import org.apache.camel.ProducerTemplate import org.apache.camel.impl.DefaultProducerTemplate ... ProducerTemplate template = context.createProducerTemplate(); // Send to a specific queue, using a processor to initialize template.send("activemq:MyQueue", new MyProcessor());
The
MyProcessor
class is implemented as shown in the following example. In addition to setting the In message body (as shown here), you could also initialize message heades and exchange properties.
import org.apache.camel.Processor; import org.apache.camel.Exchange; ... public class MyProcessor implements Processor { public MyProcessor() { } public void process(Exchange ex) { ex.getIn().setBody("<hello>world!</hello>"); } }
Asynchronous invocation
The methods for invoking endpoints asynchronously have names of the form
asyncSendSuffix()
and asyncRequestSuffix()
. For example, the methods for invoking an endpoint using either the default message exchange pattern (MEP) or an explicitly specified MEP are named asyncSend()
and asyncSendBody()
(where these methods respectively send an Exchange
object or a message body). If you want to force the MEP to be InOut (request/reply semantics), you can call the asyncRequestBody()
, asyncRequestBodyAndHeader()
, and asyncRequestBodyAndHeaders()
methods instead.
The following example shows how to send an exchange asynchronously to the
direct:start
endpoint. The asyncSend()
method returns a java.util.concurrent.Future
object, which is used to retrieve the invocation result at a later time.
import java.util.concurrent.Future; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; ... Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncSend("direct:start", exchange); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the resulting exchange from the Future Exchange result = future.get();
The producer template also provides methods to send a message body asynchronously (for example, using
asyncSendBody()
or asyncRequestBody()
). In this case, you can use one of the following helper methods to extract the returned message body from the Future
object:
<T> T extractFutureBody(Future future, Class<T> type); <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;
The first version of the
extractFutureBody()
method blocks until the invocation completes and the reply message is available. The second version of the extractFutureBody()
method allows you to specify a timeout. Both methods have a type argument, type
, which casts the returned message body to the specified type using a built-in type converter.
The following example shows how to use the
asyncRequestBody()
method to send a message body to the direct:start
endpoint. The blocking extractFutureBody()
method is then used to retrieve the reply message body from the Future
object.
Future<Object> future = template.asyncRequestBody("direct:start", "Hello"); // You can do other things, whilst waiting for the invocation to complete ... // Now, retrieve the reply message body as a String type String result = template.extractFutureBody(future, String.class);
Asynchronous invocation with a callback
In the preceding asynchronous examples, the request message is dispatched in a sub-thread, while the reply is retrieved and processed by the main thread. The producer template also gives you the option, however, of processing replies in the sub-thread, using one of the
asyncCallback()
, asyncCallbackSendBody()
, or asyncCallbackRequestBody()
methods. In this case, you supply a callback object (of org.apache.camel.impl.SynchronizationAdapter
type), which automatically gets invoked in the sub-thread as soon as a reply message arrives.
The
Synchronization
callback interface is defined as follows:
package org.apache.camel.spi;
import org.apache.camel.Exchange;
public interface Synchronization {
void onComplete(Exchange exchange);
void onFailure(Exchange exchange);
}
Where the
onComplete()
method is called on receipt of a normal reply and the onFailure()
method is called on receipt of a fault message reply. Only one of these methods gets called back, so you must override both of them to ensure that all types of reply are processed.
The following example shows how to send an exchange to the
direct:start
endpoint, where the reply message is processed in the sub-thread by the SynchronizationAdapter
callback object.
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronizationAdapter; ... Exchange exchange = context.getEndpoint("direct:start").createExchange(); exchange.getIn().setBody("Hello"); Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { assertEquals("Hello World", exchange.getIn().getBody()); } });
Where the
SynchronizationAdapter
class is a default implementation of the Synchronization
interface, which you can override to provide your own definitions of the onComplete()
and onFailure()
callback methods.
You still have the option of accessing the reply from the main thread, because the
asyncCallback()
method also returns a Future
object—for example:
// Retrieve the reply from the main thread, specifying a timeout Exchange reply = future.get(10, TimeUnit.SECONDS);
43.1.2. Synchronous Send
Overview
The synchronous send methods are a collection of methods that you can use to invoke a producer endpoint, where the current thread blocks until the method invocation is complete and the reply (if any) has been received. These methods are compatible with any kind of message exchange protocol.
Send an exchange
The basic
send()
method is a general-purpose method that sends the contents of an Exchange
object to an endpoint, using the message exchange pattern (MEP) of the exchange. The return value is the exchange that you get after it has been processed by the producer endpoint (possibly containing an Out message, depending on the MEP).
There are three varieties of
send()
method for sending an exchange that let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object.
Exchange send(Exchange exchange); Exchange send(String endpointUri, Exchange exchange); Exchange send(Endpoint endpoint, Exchange exchange);
Send an exchange populated by a processor
A simple variation of the general
send()
method is to use a processor to populate a default exchange, instead of supplying the exchange object explicitly (see the section called “Synchronous invocation with a processor” for details).
The
send()
methods for sending an exchange populated by a processor let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object. In addition, you can optionally specify the exchange's MEP by supplying the pattern
argument, instead of accepting the default.
Exchange send(Processor processor); Exchange send(String endpointUri, Processor processor); Exchange send(Endpoint endpoint, Processor processor); Exchange send( String endpointUri, ExchangePattern pattern, Processor processor ); Exchange send( Endpoint endpoint, ExchangePattern pattern, Processor processor );
Send a message body
If you are only concerned with the contents of the message body that you want to send, you can use the
sendBody()
methods to provide the message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The
sendBody()
methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object. In addition, you can optionally specify the exchange's MEP by supplying the pattern
argument, instead of accepting the default. The methods without a pattern
argument return void
(even though the invocation might give rise to a reply in some cases); and the methods with a pattern
argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBody(Object body); void sendBody(String endpointUri, Object body); void sendBody(Endpoint endpoint, Object body); Object sendBody( String endpointUri, ExchangePattern pattern, Object body ); Object sendBody( Endpoint endpoint, ExchangePattern pattern, Object body );
Send a message body and header(s)
For testing purposes, it is often interesting to try out the effect of a single header setting and the
sendBodyAndHeader()
methods are useful for this kind of header testing. You supply the message body and header setting as arguments to sendBodyAndHeader()
and let the producer template take care of inserting the body and header setting into a default exchange object.
The
sendBodyAndHeader()
methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object. In addition, you can optionally specify the exchange's MEP by supplying the pattern
argument, instead of accepting the default. The methods without a pattern
argument return void
(even though the invocation might give rise to a reply in some cases); and the methods with a pattern
argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBodyAndHeader( Object body, String header, Object headerValue ); void sendBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); void sendBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); Object sendBodyAndHeader( String endpointUri, ExchangePattern pattern, Object body, String header, Object headerValue ); Object sendBodyAndHeader( Endpoint endpoint, ExchangePattern pattern, Object body, String header, Object headerValue );
The
sendBodyAndHeaders()
methods are similar to the sendBodyAndHeader()
methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
void sendBodyAndHeaders( Object body, Map<String, Object> headers ); void sendBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); void sendBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers ); Object sendBodyAndHeaders( Endpoint endpoint, ExchangePattern pattern, Object body, Map<String, Object> headers );
Send a message body and exchange property
You can try out the effect of setting a single exchange property using the
sendBodyAndProperty()
methods. You supply the message body and property setting as arguments to sendBodyAndProperty()
and let the producer template take care of inserting the body and exchange property into a default exchange object.
The
sendBodyAndProperty()
methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object. In addition, you can optionally specify the exchange's MEP by supplying the pattern
argument, instead of accepting the default. The methods without a pattern
argument return void
(even though the invocation might give rise to a reply in some cases); and the methods with a pattern
argument return either the body of the Out message (if there is one) or the body of the In message (otherwise).
void sendBodyAndProperty( Object body, String property, Object propertyValue ); void sendBodyAndProperty( String endpointUri, Object body, String property, Object propertyValue ); void sendBodyAndProperty( Endpoint endpoint, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( String endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue ); Object sendBodyAndProperty( Endpoint endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue );
43.1.3. Synchronous Request with InOut Pattern
Overview
The synchronous request methods are similar to the synchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use a synchronous request method, if you expect to receive a reply from the producer endpoint.
Request an exchange populated by a processor
The basic
request()
method is a general-purpose method that uses a processor to populate a default exchange and forces the message exchange pattern to be InOut (so that the invocation obeys request/reply semantics). The return value is the exchange that you get after it has been processed by the producer endpoint, where the Out message contains the reply message.
The
request()
methods for sending an exchange populated by a processor let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Exchange request(String endpointUri, Processor processor); Exchange request(Endpoint endpoint, Processor processor);
Request a message body
If you are only concerned with the contents of the message body in the request and in the reply, you can use the
requestBody()
methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The
requestBody()
methods let you specify the target endpoint in one of the following ways: as the default endpoint, as an endpoint URI, or as an Endpoint
object. The return value is the body of the reply message (Out message body), which can either be returned as plain Object
or converted to a specific type, T
, using the built-in type converters (see Section 40.3, “Built-In Type Converters”).
Object requestBody(Object body); <T> T requestBody(Object body, Class<T> type); Object requestBody( String endpointUri, Object body ); <T> T requestBody( String endpointUri, Object body, Class<T> type ); Object requestBody( Endpoint endpoint, Object body ); <T> T requestBody( Endpoint endpoint, Object body, Class<T> type );
Request a message body and header(s)
You can try out the effect of setting a single header value using the
requestBodyAndHeader()
methods. You supply the message body and header setting as arguments to requestBodyAndHeader()
and let the producer template take care of inserting the body and exchange property into a default exchange object.
The
requestBodyAndHeader()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object. The return value is the body of the reply message (Out message body), which can either be returned as plain Object
or converted to a specific type, T
, using the built-in type converters (see Section 40.3, “Built-In Type Converters”).
Object requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Object requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> T requestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
The
requestBodyAndHeaders()
methods are similar to the requestBodyAndHeader()
methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
Object requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Object requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> T requestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
43.1.4. Asynchronous Send
Overview
The producer template provides a variety of methods for invoking a producer endpoint asynchronously, so that the main thread does not block while waiting for the invocation to complete and the reply message can be retrieved at a later time. The asynchronous send methods described in this section are compatible with any kind of message exchange protocol.
Send an exchange
The basic
asyncSend()
method takes an Exchange
argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. The return value is a java.util.concurrent.Future
object, which is a ticket you can use to collect the reply message at a later time—for details of how to obtain the return value from the Future
object, see the section called “Asynchronous invocation”.
The following
asyncSend()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Exchange> asyncSend(String endpointUri, Exchange exchange); Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
Send an exchange populated by a processor
A simple variation of the general
asyncSend()
method is to use a processor to populate a default exchange, instead of supplying the exchange object explicitly.
The following
asyncSend()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Exchange> asyncSend(String endpointUri, Processor processor); Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
Send a message body
If you are only concerned with the contents of the message body that you want to send, you can use the
asyncSendBody()
methods to send a message body asynchronously and let the producer template take care of inserting the body into a default exchange object.
The
asyncSendBody()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Object> asyncSendBody(String endpointUri, Object body); Future<Object> asyncSendBody(Endpoint endpoint, Object body);
43.1.5. Asynchronous Request with InOut Pattern
Overview
The asynchronous request methods are similar to the asynchronous send methods, except that the request methods force the message exchange pattern to be InOut (conforming to request/reply semantics). Hence, it is generally convenient to use an asynchronous request method, if you expect to receive a reply from the producer endpoint.
Request a message body
If you are only concerned with the contents of the message body in the request and in the reply, you can use the
requestBody()
methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The
asyncRequestBody()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object. The return value that is retrievable from the Future
object is the body of the reply message (Out message body), which can be returned either as a plain Object
or converted to a specific type, T
, using a built-in type converter (see the section called “Asynchronous invocation”).
Future<Object> asyncRequestBody( String endpointUri, Object body ); <T> Future<T> asyncRequestBody( String endpointUri, Object body, Class<T> type ); Future<Object> asyncRequestBody( Endpoint endpoint, Object body ); <T> Future<T> asyncRequestBody( Endpoint endpoint, Object body, Class<T> type );
Request a message body and header(s)
You can try out the effect of setting a single header value using the
asyncRequestBodyAndHeader()
methods. You supply the message body and header setting as arguments to asyncRequestBodyAndHeader()
and let the producer template take care of inserting the body and exchange property into a default exchange object.
The
asyncRequestBodyAndHeader()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object. The return value that is retrievable from the Future
object is the body of the reply message (Out message body), which can be returned either as a plain Object
or converted to a specific type, T
, using a built-in type converter (see the section called “Asynchronous invocation”).
Future<Object> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( String endpointUri, Object body, String header, Object headerValue, Class<T> type ); Future<Object> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue ); <T> Future<T> asyncRequestBodyAndHeader( Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type );
The
asyncRequestBodyAndHeaders()
methods are similar to the asyncRequestBodyAndHeader()
methods, except that instead of supplying just a single header setting, these methods allow you to specify a complete hash map of header settings.
Future<Object> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( String endpointUri, Object body, Map<String, Object> headers, Class<T> type ); Future<Object> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers ); <T> Future<T> asyncRequestBodyAndHeaders( Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type );
43.1.6. Asynchronous Send with Callback
Overview
The producer template also provides the option of processing the reply message in the same sub-thread that is used to invoke the producer endpoint. In this case, you provide a callback object, which automatically gets invoked in the sub-thread as soon as the reply message is received. In other words, the asynchronous send with callback methods enable you to initiate an invocation in your main thread and then have all of the associated processing—invocation of the producer endpoint, waiting for a reply and processing the reply—occur asynchronously in a sub-thread.
Send an exchange
The basic
asyncCallback()
method takes an Exchange
argument and invokes an endpoint asynchronously, using the message exchange pattern (MEP) of the specified exchange. This method is similar to the asyncSend()
method for exchanges, except that it takes an additional org.apache.camel.spi.Synchronization
argument, which is a callback interface with two methods: onComplete()
and onFailure()
. For details of how to use the Synchronization
callback, see the section called “Asynchronous invocation with a callback”.
The following
asyncCallback()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Exchange> asyncCallback( String endpointUri, Exchange exchange, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Exchange exchange, Synchronization onCompletion );
Send an exchange populated by a processor
The
asyncCallback()
method for processors calls a processor to populate a default exchange and forces the message exchange pattern to be InOut (so that the invocation obeys request/reply semantics).
The following
asyncCallback()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Exchange> asyncCallback( String endpointUri, Processor processor, Synchronization onCompletion ); Future<Exchange> asyncCallback( Endpoint endpoint, Processor processor, Synchronization onCompletion );
Send a message body
If you are only concerned with the contents of the message body that you want to send, you can use the
asyncCallbackSendBody()
methods to send a message body asynchronously and let the producer template take care of inserting the body into a default exchange object.
The
asyncCallbackSendBody()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Object> asyncCallbackSendBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackSendBody( Endpoint endpoint, Object body, Synchronization onCompletion );
Request a message body
If you are only concerned with the contents of the message body in the request and in the reply, you can use the
asyncCallbackRequestBody()
methods to provide the request message body as an argument and let the producer template take care of inserting the body into a default exchange object.
The
asyncCallbackRequestBody()
methods let you specify the target endpoint in one of the following ways: as an endpoint URI, or as an Endpoint
object.
Future<Object> asyncCallbackRequestBody( String endpointUri, Object body, Synchronization onCompletion ); Future<Object> asyncCallbackRequestBody( Endpoint endpoint, Object body, Synchronization onCompletion );