Chapter 42. Producer Interface
Abstract
This chapter describes how to implement the Producer interface, which is an essential step in the implementation of an Apache Camel component.
42.1. The Producer Interface
Overview
An instance of the org.apache.camel.Producer type represents a target endpoint in a route. The role of the producer is to send requests (In messages) to a specific physical endpoint and to receive the corresponding response (Out or Fault message). A Producer object is essentially a special kind of Processor that appears at the end of a processor chain (equivalent to a route). Figure 42.1, “Producer Inheritance Hierarchy” shows the inheritance hierarchy for producers.
Figure 42.1. Producer Inheritance Hierarchy
The Producer interface
Example 42.1, “Producer Interface” shows the definition of the org.apache.camel.Producer interface.
Example 42.1. Producer Interface
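    package org.apache.camel;

    public interface Producer extends Processor, Service, IsSingleton {

        // Returns the parent endpoint that created this producer.
        Endpoint getEndpoint();

        // Factory methods for exchanges, analogous to those on Endpoint.
        Exchange createExchange();

        Exchange createExchange(ExchangePattern pattern);

        Exchange createExchange(Exchange exchange);
    }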
Producer methods
The Producer interface defines the following methods:
- process() (inherited from Processor): The most important method. A producer is essentially a special type of processor that sends a request to an endpoint, instead of forwarding the exchange object to another processor. By overriding the process() method, you define how the producer sends and receives messages to and from the relevant endpoint.
- getEndpoint(): Returns a reference to the parent endpoint instance.
- createExchange(): These overloaded methods are analogous to the corresponding methods defined in the Endpoint interface. Normally, these methods delegate to the corresponding methods defined on the parent Endpoint instance (this is what the DefaultEndpoint class does by default). Occasionally, you might need to override these methods.
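The relationship between these methods can be sketched as follows. This is a minimal illustration only: the nested Exchange, Endpoint, and LoggingProducer types are simplified stand-ins invented for this sketch, not the real org.apache.camel types, and the "send" step merely records the message instead of contacting a physical endpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration; these are NOT the org.apache.camel types.
final class ProducerMethodsSketch {

    /** Stand-in exchange that carries only an In body and an Out body. */
    static final class Exchange {
        Object in;
        Object out;
    }

    /** Stand-in for the parent endpoint, which knows how to create exchanges. */
    static final class Endpoint {
        final String uri;

        Endpoint(String uri) {
            this.uri = uri;
        }

        Exchange createExchange() {
            return new Exchange();
        }
    }

    /** Hypothetical producer exposing the three kinds of method listed above. */
    static final class LoggingProducer {
        private final Endpoint endpoint;
        final List<String> sent = new ArrayList<>();

        LoggingProducer(Endpoint endpoint) {
            this.endpoint = endpoint;
        }

        // Returns the parent endpoint instance.
        Endpoint getEndpoint() {
            return endpoint;
        }

        // Delegates exchange creation to the parent endpoint.
        Exchange createExchange() {
            return endpoint.createExchange();
        }

        // "Sends" the In body to the endpoint; this sketch only records it.
        void process(Exchange exchange) {
            sent.add(endpoint.uri + " <- " + exchange.in);
        }
    }

    public static void main(String[] args) {
        LoggingProducer producer = new LoggingProducer(new Endpoint("custom:demo"));
        Exchange exchange = producer.createExchange();
        exchange.in = "hello";
        producer.process(exchange);
        System.out.println(producer.sent); // prints [custom:demo <- hello]
    }
}
```

In a real component you would extend DefaultProducer instead, which already supplies getEndpoint() and the createExchange() overloads, leaving only process() to implement.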
Asynchronous processing
Processing an exchange object in a producer, which usually involves sending a message to a remote destination and waiting for a reply, can block for a significant length of time. If you want to avoid blocking the current thread, you can implement the producer as an asynchronous processor. The asynchronous processing pattern decouples the preceding processor from the producer, so that the process() method returns without delay. See Section 38.1.4, “Asynchronous Processing”.
When implementing a producer, you can support the asynchronous processing model by implementing the org.apache.camel.AsyncProcessor interface. On its own, this is not enough to ensure that the asynchronous processing model will be used: it is also necessary for the preceding processor in the chain to call the asynchronous version of the process() method. The definition of the AsyncProcessor interface is shown in Example 42.2, “AsyncProcessor Interface”.
Example 42.2. AsyncProcessor Interface
    package org.apache.camel;

    public interface AsyncProcessor extends Processor {
        boolean process(Exchange exchange, AsyncCallback callback);
    }
The asynchronous version of the process() method takes an extra argument, callback, of org.apache.camel.AsyncCallback type. The corresponding AsyncCallback interface is defined as shown in Example 42.3, “AsyncCallback Interface”.
Example 42.3. AsyncCallback Interface
    package org.apache.camel;

    public interface AsyncCallback {
        void done(boolean doneSynchronously);
    }
The caller of AsyncProcessor.process() must provide an implementation of AsyncCallback to receive the notification that processing has finished. The AsyncCallback.done() method takes a boolean argument that indicates whether the processing was performed synchronously or not. Normally, the flag would be false, to indicate asynchronous processing. In some cases, however, it can make sense for the producer not to process asynchronously (in spite of being asked to do so). For example, if the producer knows that the processing of the exchange will complete rapidly, it could optimise the processing by doing it synchronously. In this case, the doneSynchronously flag should be set to true.
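The callback contract described above can be sketched as follows. The nested types are simplified stand-ins, not the real Camel interfaces, and the "fewer than 8 characters means cheap" rule in EchoProducer is an arbitrary assumption made only so that both code paths can be demonstrated. The sketch also assumes that the boolean returned by process() mirrors the flag passed to done().

```java
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for illustration; these are NOT the org.apache.camel types.
final class AsyncContractSketch {

    interface AsyncCallback {
        void done(boolean doneSynchronously);
    }

    static final class Exchange {
        String in;
        String out;
    }

    /** Hypothetical producer: short requests complete inline, longer ones on a new thread. */
    static final class EchoProducer {
        boolean process(Exchange exchange, AsyncCallback callback) {
            if (exchange.in.length() < 8) { // assumed "cheap" case
                exchange.out = exchange.in.toUpperCase(Locale.ROOT);
                callback.done(true);        // finished on the caller's thread
                return true;
            }
            new Thread(() -> {
                exchange.out = exchange.in.toUpperCase(Locale.ROOT);
                callback.done(false);       // finished on a different thread
            }).start();
            return false;
        }
    }

    /** Caller side: supplies the callback and waits until it has been invoked. */
    static String send(EchoProducer producer, String body) {
        Exchange exchange = new Exchange();
        exchange.in = body;
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] flag = new boolean[1];
        boolean returned = producer.process(exchange, doneSynchronously -> {
            flag[0] = doneSynchronously;
            finished.countDown();
        });
        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was never invoked");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return exchange.out + " sync=" + (returned && flag[0]);
    }

    public static void main(String[] args) {
        EchoProducer producer = new EchoProducer();
        System.out.println(send(producer, "ping"));             // prints PING sync=true
        System.out.println(send(producer, "a longer request")); // prints A LONGER REQUEST sync=false
    }
}
```

Note that the callback is invoked on both paths: the caller relies on done() as the single signal that the exchange is complete, whichever thread did the work.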
ExchangeHelper class
When implementing a producer, you might find it helpful to call some of the methods in the org.apache.camel.util.ExchangeHelper utility class. For full details of the ExchangeHelper class, see Section 35.4, “The ExchangeHelper Class”.
42.2. Implementing the Producer Interface
Alternative ways of implementing a producer
You can implement a producer in one of the following ways:
- How to implement a synchronous producer
- How to implement an asynchronous producer
How to implement a synchronous producer
Example 42.4, “DefaultProducer Implementation” outlines how to implement a synchronous producer. In this case, a call to Producer.process() blocks until a reply is received.
Example 42.4. DefaultProducer Implementation
    import org.apache.camel.Endpoint;
    import org.apache.camel.Exchange;
    import org.apache.camel.Producer;
    import org.apache.camel.impl.DefaultProducer;

    public class CustomProducer extends DefaultProducer { // (1)

        public CustomProducer(Endpoint endpoint) { // (2)
            super(endpoint);
            // Perform other initialization tasks...
        }

        public void process(Exchange exchange) throws Exception { // (3)
            // Process exchange synchronously.
            // ...
        }
    }
- (1) Implement a custom synchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class.
- (2) Implement a constructor that takes a reference to the parent endpoint.
- (3) The process() method implementation represents the core of the producer code. The implementation of the process() method is entirely dependent on the type of component that you are implementing.
In outline, the process() method is normally implemented as follows:
- If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.
- If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the process() method to block for a significant length of time.
- When a reply is received, call exchange.setOut() to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message using Message.setFault(true).
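The outline above can be sketched as follows. The nested types are simplified stand-ins rather than the real Camel classes, the Transport interface is a hypothetical blocking client invented for this sketch, and the "FAULT:" reply prefix is an assumed wire convention used only to show where the fault flag would be set.

```java
// Simplified stand-ins for illustration; these are NOT the org.apache.camel types.
final class SyncProcessSketch {

    enum ExchangePattern { InOnly, InOut }

    static final class Message {
        final Object body;
        boolean fault;

        Message(Object body) {
            this.body = body;
        }

        void setFault(boolean fault) {
            this.fault = fault;
        }
    }

    static final class Exchange {
        final ExchangePattern pattern;
        final Message in;
        Message out;

        Exchange(ExchangePattern pattern, Object inBody) {
            this.pattern = pattern;
            this.in = new Message(inBody);
        }

        void setOut(Message out) {
            this.out = out;
        }
    }

    /** Hypothetical blocking transport: returns the reply, or null when none is awaited. */
    interface Transport {
        String send(String request, boolean awaitReply);
    }

    static final class CustomProducer {
        private final Transport transport;

        CustomProducer(Transport transport) {
            this.transport = transport;
        }

        void process(Exchange exchange) {
            boolean awaitReply = exchange.pattern == ExchangePattern.InOut;
            // Steps 1 and 2: send the In message and, for InOut, block until the reply arrives.
            String reply = transport.send(String.valueOf(exchange.in.body), awaitReply);
            if (!awaitReply) {
                return; // InOnly: no Out message is expected
            }
            // Step 3: attach the reply, marking it as a fault where appropriate.
            Message out = new Message(reply);
            if (reply.startsWith("FAULT:")) { // assumed convention for this sketch
                out.setFault(true);
            }
            exchange.setOut(out);
        }
    }

    public static void main(String[] args) {
        CustomProducer producer =
                new CustomProducer((request, awaitReply) -> awaitReply ? "echo:" + request : null);
        Exchange exchange = new Exchange(ExchangePattern.InOut, "ping");
        producer.process(exchange);
        System.out.println(exchange.out.body + " fault=" + exchange.out.fault); // prints echo:ping fault=false
    }
}
```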
How to implement an asynchronous producer
Example 42.5, “CollectionProducer Implementation” outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous process() method and an asynchronous process() method (which takes an additional AsyncCallback argument).
Example 42.5. CollectionProducer Implementation
    // CustomProducer.java
    import org.apache.camel.AsyncCallback;
    import org.apache.camel.AsyncProcessor;
    import org.apache.camel.Endpoint;
    import org.apache.camel.Exchange;
    import org.apache.camel.Producer;
    import org.apache.camel.impl.DefaultProducer;

    public class CustomProducer extends DefaultProducer implements AsyncProcessor { // (1)

        public CustomProducer(Endpoint endpoint) { // (2)
            super(endpoint);
            // ...
        }

        public void process(Exchange exchange) throws Exception { // (3)
            // Process exchange synchronously.
            // ...
        }

        public boolean process(Exchange exchange, AsyncCallback callback) { // (4)
            // Process exchange asynchronously.
            CustomProducerTask task = new CustomProducerTask(exchange, callback);
            // Process 'task' in a separate thread...
            // ...
            return false; // (5)
        }
    }

    // CustomProducerTask.java (a public class must live in its own source file)
    public class CustomProducerTask implements Runnable { // (6)
        private Exchange exchange;
        private AsyncCallback callback;

        public CustomProducerTask(Exchange exchange, AsyncCallback callback) {
            this.exchange = exchange;
            this.callback = callback;
        }

        public void run() { // (7)
            // Process exchange.
            // ...
            callback.done(false);
        }
    }
- (1) Implement a custom asynchronous producer class, CustomProducer, by extending the org.apache.camel.impl.DefaultProducer class, and implementing the AsyncProcessor interface.
- (2) Implement a constructor that takes a reference to the parent endpoint.
- (3) Implement the synchronous process() method.
- (4) Implement the asynchronous process() method. You can implement the asynchronous method in several ways. The approach shown here is to create a java.lang.Runnable instance, task, that represents the code that runs in a sub-thread. You then use the Java threading API to run the task in a sub-thread (for example, by creating a new thread or by allocating the task to an existing thread pool).
- (5) Normally, you return false from the asynchronous process() method, to indicate that the exchange was processed asynchronously.
- (6) The CustomProducerTask class encapsulates the processing code that runs in a sub-thread. This class must store a copy of the Exchange object, exchange, and the AsyncCallback object, callback, as private member variables.
- (7) The run() method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must call callback.done() to notify the caller that processing is complete.
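One possible way to run the task on an existing thread pool is sketched below, using java.util.concurrent.ExecutorService. The nested Exchange and AsyncCallback types are simplified stand-ins rather than the real Camel types, the "reply to ..." string stands in for a remote call, and implementing the synchronous process() by waiting on the asynchronous one is a design choice of this sketch, not something the example above prescribes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-ins for illustration; these are NOT the org.apache.camel types.
final class ThreadPoolProducerSketch {

    interface AsyncCallback {
        void done(boolean doneSynchronously);
    }

    static final class Exchange {
        volatile Object in;
        volatile Object out;
    }

    /** Work that runs on a pool thread. */
    static final class CustomProducerTask implements Runnable {
        private final Exchange exchange;
        private final AsyncCallback callback;

        CustomProducerTask(Exchange exchange, AsyncCallback callback) {
            this.exchange = exchange;
            this.callback = callback;
        }

        @Override
        public void run() {
            try {
                // Stand-in for sending the In message and waiting for the reply.
                exchange.out = "reply to " + exchange.in;
            } finally {
                callback.done(false); // always notify the caller, even on failure
            }
        }
    }

    static final class CustomProducer {
        private final ExecutorService pool;

        CustomProducer(ExecutorService pool) {
            this.pool = pool;
        }

        // Asynchronous variant: hand the task to the pool and return at once.
        boolean process(Exchange exchange, AsyncCallback callback) {
            pool.execute(new CustomProducerTask(exchange, callback));
            return false;
        }

        // Synchronous variant: reuse the asynchronous path and block until done.
        void process(Exchange exchange) {
            CountDownLatch finished = new CountDownLatch(1);
            process(exchange, doneSynchronously -> finished.countDown());
            try {
                finished.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for reply", e);
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Exchange exchange = new Exchange();
            exchange.in = "order-1";
            new CustomProducer(pool).process(exchange);
            System.out.println(exchange.out); // prints reply to order-1
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling callback.done() from a finally block is worth copying into a real task: if the callback is skipped when the remote call throws, the caller has no way to learn that processing has ended.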