51.2. Implementing the Producer Interface
Alternative ways of implementing a producer
You can implement a producer in one of the following ways:
How to implement a synchronous producer
Example 51.4, “DefaultProducer Implementation” outlines how to implement a synchronous producer. In this case, call to
Producer.process()
blocks until a reply is received.
Example 51.4. DefaultProducer Implementation
import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultProducer; public class CustomProducer extends DefaultProducer { 1 public CustomProducer(Endpoint endpoint) { 2 super(endpoint); // Perform other initialization tasks... } public void process(Exchange exchange) throws Exception { 3 // Process exchange synchronously. // ... } }
- 1
- Implement a custom synchronous producer class, CustomProducer, by extending the
org.apache.camel.impl.DefaultProducer
class. - 2
- Implement a constructor that takes a reference to the parent endpoint.
- 3
- The
process()
method implementation represents the core of the producer code. The implementation of theprocess()
method is entirely dependent on the type of component that you are implementing. In outline, theprocess()
method is normally implemented as follows:- If the exchange contains an In message, and if this is consistent with the specified exchange pattern, then send the In message to the designated endpoint.
- If the exchange pattern anticipates the receipt of an Out message, then wait until the Out message has been received. This typically causes the
process()
method to block for a significant length of time. - When a reply is received, call
exchange.setOut()
to attach the reply to the exchange object. If the reply contains a fault message, set the fault flag on the Out message usingMessage.setFault(true)
.
How to implement an asynchronous producer
Example 51.5, “CollectionProducer Implementation” outlines how to implement an asynchronous producer. In this case, you must implement both a synchronous
process()
method and an asynchronous process()
method (which takes an additional AsyncCallback
argument).
Example 51.5. CollectionProducer Implementation
import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultProducer; public class CustomProducer extends DefaultProducer implements AsyncProcessor { 1 public CustomProducer(Endpoint endpoint) { 2 super(endpoint); // ... } public void process(Exchange exchange) throws Exception { 3 // Process exchange synchronously. // ... } public boolean process(Exchange exchange, AsyncCallback callback) { 4 // Process exchange asynchronously. CustomProducerTask task = new CustomProducerTask(exchange, callback); // Process 'task' in a separate thread... // ... return false; 5 } } public class CustomProducerTask implements Runnable { 6 private Exchange exchange; private AsyncCallback callback; public CustomProducerTask(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; this.callback = callback; } public void run() { 7 // Process exchange. // ... callback.done(false); } }
- 1
- Implement a custom asynchronous producer class, CustomProducer, by extending the
org.apache.camel.impl.DefaultProducer
class, and implementing theAsyncProcessor
interface. - 2
- Implement a constructor that takes a reference to the parent endpoint.
- 3
- Implement the synchronous
process()
method. - 4
- Implement the asynchronous
process()
method. You can implement the asynchronous method in several ways. The approach shown here is to create ajava.lang.Runnable
instance,task
, that represents the code that runs in a sub-thread. You then use the Java threading API to run the task in a sub-thread (for example, by creating a new thread or by allocating the task to an existing thread pool). - 5
- Normally, you return
false
from the asynchronousprocess()
method, to indicate that the exchange was processed asynchronously. - 6
- The CustomProducer
Task
class encapsulates the processing code that runs in a sub-thread. This class must store a copy of theExchange
object,exchange
, and theAsyncCallback
object,callback
, as private member variables. - 7
- The
run()
method contains the code that sends the In message to the producer endpoint and waits to receive the reply, if any. After receiving the reply (Out message or Fault message) and inserting it into the exchange object, you must callcallback.done()
to notify the caller that processing is complete.