50.2. Implementing the Consumer Interface
Alternative ways of implementing a consumer
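You can implement a consumer in one of the following ways:
- Event-driven consumer implementation
- Scheduled poll consumer implementation
- Polling consumer implementation
- Custom threading implementation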
Event-driven consumer implementation
In an event-driven consumer, processing is driven explicitly by external events. The events are received through an event-listener interface, where the listener interface is specific to the particular event source.
Example 50.4, “JMXConsumer Implementation” shows the implementation of the JMXConsumer class, which is taken from the Apache Camel JMX component implementation. The JMXConsumer class is an example of an event-driven consumer, which is implemented by inheriting from the org.apache.camel.impl.DefaultConsumer class. In the case of the JMXConsumer example, events are represented by calls on the NotificationListener.handleNotification() method, which is a standard way of receiving JMX events. To receive these JMX events, you must implement the NotificationListener interface and override the handleNotification() method, as shown in Example 50.4, “JMXConsumer Implementation”.
Example 50.4. JMXConsumer Implementation
package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;

import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { // 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { // 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { // 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); // 4
        } catch (Throwable e) {
            handleException(e); // 5
        }
    }
}
- 1: The JMXConsumer class follows the usual pattern for event-driven consumers by extending the DefaultConsumer class. Additionally, because this consumer is designed to receive events from JMX (which are represented by JMX notifications), it must implement the NotificationListener interface.
- 2: You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
- 3: The handleNotification() method (which is defined in NotificationListener) is automatically invoked by JMX whenever a JMX notification arrives. The body of this method should contain the code that performs the consumer's event processing. Because the handleNotification() call originates from the JMX layer, the consumer's threading model is implicitly controlled by the JMX layer, not by the JMXConsumer class.
  Note: The handleNotification() method is specific to the JMX example. When implementing your own event-driven consumer, you must identify an analogous event listener method to implement in your custom consumer.
- 4: This line of code combines two steps. First, the JMX notification object is converted into an exchange object, which is the generic representation of an event in Apache Camel. Then the newly created exchange object is passed to the next processor in the route (invoked synchronously).
- 5: The handleException() method is implemented by the DefaultConsumer base class. By default, it handles exceptions using the org.apache.camel.impl.LoggingExceptionHandler class.
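The event-driven pattern described in the callouts above can be tried out without Apache Camel on the classpath. The following is a minimal sketch that uses only the JDK's JMX classes. The names RouteStepSketch, EventDrivenSketchConsumer, and EventDrivenSketch are hypothetical stand-ins invented for this illustration (they are not Camel types), and a plain string stands in for the exchange object.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;

// Hypothetical stand-in for the next processor in the route (not Camel's Processor).
interface RouteStepSketch {
    void process(String body) throws Exception;
}

// Event-driven consumer: the event source decides when, and on which thread, it runs.
class EventDrivenSketchConsumer implements NotificationListener {
    private final RouteStepSketch processor;
    final List<Throwable> errors = new ArrayList<>();

    EventDrivenSketchConsumer(RouteStepSketch processor) {
        this.processor = processor;
    }

    @Override
    public void handleNotification(Notification notification, Object handback) {
        try {
            // Convert the event into the route's message form, then pass it on synchronously.
            processor.process(notification.getType() + ":" + notification.getMessage());
        } catch (Throwable e) {
            errors.add(e); // stand-in for the base class's exception handling
        }
    }
}

class EventDrivenSketch {
    // Fires two notifications and returns what the stand-in processor received.
    static List<String> runDemo() {
        List<String> received = new ArrayList<>();
        EventDrivenSketchConsumer consumer = new EventDrivenSketchConsumer(received::add);

        NotificationBroadcasterSupport source = new NotificationBroadcasterSupport();
        source.addNotificationListener(consumer, null, null);

        // By default the broadcaster invokes listeners on the thread that calls sendNotification().
        source.sendNotification(new Notification("demo.event", source, 1L, "first"));
        source.sendNotification(new Notification("demo.event", source, 2L, "second"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [demo.event:first, demo.event:second]
    }
}
```

Note that the consumer never starts a thread of its own in this sketch: as with the JMXConsumer class, the threading is determined entirely by whoever delivers the notification.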
Scheduled poll consumer implementation
In a scheduled poll consumer, polling events are automatically generated by a timer class, java.util.concurrent.ScheduledExecutorService. To receive the generated polling events, you must implement the ScheduledPollConsumer.poll() method (see Section 47.1.3, “Consumer Patterns and Threading”).
Example 50.5, “ScheduledPollConsumer Implementation” shows how to implement a consumer that follows the scheduled poll pattern by extending the ScheduledPollConsumer class.
Example 50.5. ScheduledPollConsumer Implementation
import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;

public class CustomConsumer extends ScheduledPollConsumer { // 1

    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint, Processor processor) { // 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { // 3
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange); // 4
    }

    @Override
    protected void doStart() throws Exception { // 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { // 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
- 1: Implement a scheduled poll consumer class, CustomConsumer, by extending the org.apache.camel.impl.ScheduledPollConsumer class.
- 2: You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, and a reference to the next processor in the chain, processor, as arguments.
- 3: Override the poll() method to receive the scheduled polling events. This is where you should put the code that retrieves and processes incoming events (represented by exchange objects).
- 4: In this example, the event is processed synchronously. If you want to process events asynchronously, you should use a reference to an asynchronous processor instead, by calling getAsyncProcessor(). For details of how to process events asynchronously, see Section 47.1.4, “Asynchronous Processing”.
- 5: (Optional) If you want some lines of code to execute as the consumer is starting up, override the doStart() method as shown.
- 6: (Optional) If you want some lines of code to execute as the consumer is stopping, override the doStop() method as shown.
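To make the timer-driven mechanism concrete, here is a minimal sketch of the scheduled poll pattern built directly on java.util.concurrent.ScheduledExecutorService. It does not use Camel: ScheduledPollSketchBase and ScheduledPollSketch are hypothetical names invented for this illustration, the start() and stop() methods only loosely correspond to the doStart() and doStop() hooks, and the 10 ms delay is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a scheduled poll base class (not Camel's ScheduledPollConsumer).
abstract class ScheduledPollSketchBase {
    private final long delayMillis;
    private ScheduledExecutorService scheduler;

    ScheduledPollSketchBase(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Subclasses put their retrieve-and-process logic here.
    protected abstract void poll() throws Exception;

    void start() {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                poll();
            } catch (Exception e) {
                // Catch here so that one failed poll does not cancel the later ones.
                System.err.println("poll failed: " + e);
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }
}

class ScheduledPollSketch {
    // Starts a consumer, waits for at least three polls, stops it, and returns the poll count.
    static int runDemo() {
        AtomicInteger polls = new AtomicInteger();
        CountDownLatch threePolls = new CountDownLatch(3);

        ScheduledPollSketchBase consumer = new ScheduledPollSketchBase(10) {
            @Override
            protected void poll() {
                polls.incrementAndGet();
                threePolls.countDown();
            }
        };

        consumer.start();
        try {
            threePolls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.stop();
        }
        return polls.get();
    }

    public static void main(String[] args) {
        System.out.println("polls observed: " + runDemo());
    }
}
```

The subclass only supplies poll(); the scheduling, and therefore the threading, lives entirely in the base class, which is the division of labor the scheduled poll pattern relies on.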
Polling consumer implementation
Example 50.6, “PollingConsumerSupport Implementation” outlines how to implement a consumer that follows the polling pattern by extending the PollingConsumerSupport class.
Example 50.6. PollingConsumerSupport Implementation
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class CustomConsumer extends PollingConsumerSupport { // 1

    private final CustomEndpoint endpoint;

    public CustomConsumer(CustomEndpoint endpoint) { // 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { // 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { // 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { // 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { // 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
- 1: Implement your polling consumer class, CustomConsumer, by extending the org.apache.camel.impl.PollingConsumerSupport class.
- 2: You must implement at least one constructor that takes a reference to the parent endpoint, endpoint, as an argument. A polling consumer does not need a reference to a processor instance.
- 3: The receiveNoWait() method should implement a non-blocking algorithm for retrieving an event (exchange object). If no event is available, it should return null.
- 4: The receive() method should implement a blocking algorithm for retrieving an event. This method can block indefinitely if events remain unavailable.
- 5: The receive(long timeout) method implements an algorithm that can block for as long as the specified timeout (typically specified in units of milliseconds).
- 6: If you want to insert code that executes while a consumer is starting up or shutting down, implement the doStart() method and the doStop() method, respectively.
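The three receive variants described above map naturally onto the operations of a java.util.concurrent.BlockingQueue. The following minimal sketch illustrates that mapping under the assumption that events arrive on an in-memory queue. PollingSketchConsumer and PollingSketch are hypothetical names invented for this illustration (they are not Camel types), and strings stand in for exchange objects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a polling consumer (not Camel's PollingConsumerSupport).
class PollingSketchConsumer {
    private final BlockingQueue<String> source;

    PollingSketchConsumer(BlockingQueue<String> source) {
        this.source = source;
    }

    // Non-blocking: returns null immediately when no event is available.
    String receiveNoWait() {
        return source.poll();
    }

    // Blocking: waits, possibly indefinitely, until an event is available.
    String receive() {
        try {
            return source.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Blocks for at most timeoutMillis, then returns null if nothing arrived.
    String receive(long timeoutMillis) {
        try {
            return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class PollingSketch {
    // Exercises all three variants and returns their results joined by commas.
    static String runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        PollingSketchConsumer consumer = new PollingSketchConsumer(queue);

        String empty = consumer.receiveNoWait(); // queue is empty, so null
        String timedOut = consumer.receive(20);  // still empty after 20 ms, so null
        queue.add("hello");
        String got = consumer.receive();         // returns at once because an event is waiting

        return empty + "," + timedOut + "," + got;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints null,null,hello
    }
}
```

Unlike the event-driven and scheduled poll patterns, nothing here pushes events into a processor: the caller decides when to ask for the next event, which is why a polling consumer does not need a processor reference.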
Custom threading implementation
If the standard consumer patterns are not suitable for your consumer implementation, you can implement the Consumer interface directly and write the threading code yourself. When writing the threading code, however, it is important that you comply with the standard Apache Camel threading model, as described in Section 2.8, “Threading Model”.
For example, the SEDA component from camel-core implements its own consumer threading, which is consistent with the Apache Camel threading model. Example 50.7, “Custom Threading Implementation” shows an outline of how the SedaConsumer class implements its threading.
Example 50.7. Custom Threading Implementation
package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { // 1
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { // 2
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { // 3
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); // 4
        for (int i = 0; i < poolSize; i++) { // 5
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { // 6
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); // 7
        executor = null;

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...

    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }
}
- 1: The SedaConsumer class is implemented by extending the org.apache.camel.impl.ServiceSupport class and implementing the Consumer, Runnable, and ShutdownAware interfaces.
- 2: Implement the Runnable.run() method to define what the consumer does while it is running in a thread. In this case, the consumer runs in a loop, polling the queue for new exchanges and then processing each exchange that it retrieves.
- 3: The doStart() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it starts up.
- 4: Instead of creating threads directly, you should create a thread pool using the ExecutorServiceStrategy object that is registered with the CamelContext. This is important, because it enables Apache Camel to implement centralized management of threads and support such features as graceful shutdown. For details, see Section 2.8, “Threading Model”.
- 5: Kick off the threads by calling the ExecutorService.execute() method poolSize times.
- 6: The doStop() method is inherited from ServiceSupport. You override this method in order to define what the consumer does when it shuts down.
- 7: Shut down the thread pool, which is represented by the executor instance.
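The overall shape of this custom threading approach (a Runnable consumer submitted to a fixed-size pool poolSize times, then shut down on stop) can be sketched with the JDK alone. In the following illustration, ThreadedSketchConsumer and CustomThreadingSketch are hypothetical names, strings stand in for exchanges, and the 50 ms poll timeout is an arbitrary choice. The sketch obtains its pool from Executors.newFixedThreadPool() only to stay self-contained. As the callouts explain, a real component should obtain the pool through the CamelContext instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for a consumer that manages its own threads (not Camel's SedaConsumer).
class ThreadedSketchConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final int poolSize;
    private final Consumer<String> processor;
    private volatile boolean running;
    private ExecutorService executor;

    ThreadedSketchConsumer(BlockingQueue<String> queue, int poolSize, Consumer<String> processor) {
        this.queue = queue;
        this.poolSize = poolSize;
        this.processor = processor;
    }

    @Override
    public void run() {
        // Each pool thread loops here: poll the queue, then process whatever was retrieved.
        while (running) {
            try {
                String exchange = queue.poll(50, TimeUnit.MILLISECONDS);
                if (exchange != null) {
                    processor.accept(exchange);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void start() {
        running = true;
        // Assumption for this sketch only: a plain JDK pool instead of one managed by Camel.
        executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++) {
            executor.execute(this);
        }
    }

    void stop() {
        running = false;
        if (executor != null) {
            executor.shutdownNow(); // interrupts threads that are blocked in poll()
            executor = null;
        }
    }
}

class CustomThreadingSketch {
    // Queues five messages, consumes them with two threads, and returns the processed count.
    static int runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("msg-" + i);
        }
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(5);

        ThreadedSketchConsumer consumer = new ThreadedSketchConsumer(queue, 2, body -> {
            processed.incrementAndGet();
            done.countDown();
        });
        consumer.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.stop();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo()); // prints processed: 5
    }
}
```

Polling with a timeout rather than blocking indefinitely lets each thread recheck the running flag, so the consumer can stop promptly even if an interrupt is missed.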