Chapter 44. Implementing a Component
Abstract
This chapter provides a general overview of the approaches can be used to implement a Apache Camel component.
44.1. Component Architecture
44.1.1. Factory Patterns for a Component
Overview
A Apache Camel component consists of a set of classes that are related to each other through a factory pattern. The primary entry point to a component is the
Component
object itself (an instance of org.apache.camel.Component
type). You can use the Component
object as a factory to create Endpoint
objects, which in turn act as factories for creating Consumer
, Producer
, and Exchange
objects. These relationships are summarized in Figure 44.1, “Component Factory Patterns”
Figure 44.1. Component Factory Patterns
Component
A component implementation is an endpoint factory. The main task of a component implementor is to implement the
Component.createEndpoint()
method, which is responsible for creating new endpoints on demand.
Each kind of component must be associated with a component prefix that appears in an endpoint URI. For example, the file component is usually associated with the file prefix, which can be used in an endpoint URI like file://tmp/messages/input. When you install a new component in Apache Camel, you must define the association between a particular component prefix and the name of the class that implements the component.
Endpoint
Each endpoint instance encapsulates a particular endpoint URI. Every time Apache Camel encounters a new endpoint URI, it creates a new endpoint instance. An endpoint object is also a factory for creating consumer endpoints and producer endpoints.
Endpoints must implement the
org.apache.camel.Endpoint
interface. The Endpoint
interface defines the following factory methods:
createConsumer()
andcreatePollingConsumer()
—Creates a consumer endpoint, which represents the source endpoint at the beginning of a route.createProducer()
—Creates a producer endpoint, which represents the target endpoint at the end of a route.createExchange()
—Creates an exchange object, which encapsulates the messages passed up and down the route.
Consumer
Consumer endpoints consume requests. They always appear at the start of a route and they encapsulate the code responsible for receiving incoming requests and dispatching outgoing replies. From a service-oriented prospective a consumer represents a service.
Consumers must implement the
org.apache.camel.Consumer
interface. There are a number of different patterns you can follow when implementing a consumer. These patterns are described in Section 44.1.3, “Consumer Patterns and Threading”.
Producer
Producer endpoints produce requests. They always appears at the end of a route and they encapsulate the code responsible for dispatching outgoing requests and receiving incoming replies. From a service-oriented prospective a producer represents a service consumer.
Producers must implement the
org.apache.camel.Producer
interface. You can optionally implement the producer to support an asynchronous style of processing. See Section 44.1.4, “Asynchronous Processing” for details.
Exchange
Exchange objects encapsulate a related set of messages. For example, one kind of message exchange is a synchronous invocation, which consists of a request message and its related reply.
Exchanges must implement the
org.apache.camel.Exchange
interface. The default implementation, DefaultExchange
, is sufficient for many component implementations. However, if you want to associated extra data with the exchanges or have the exchanges preform additional processing, it can be useful to customize the exchange implementation.
Message
There are two different message slots in an
Exchange
object:
- In message—holds the current message.
- Out message—temporarily holds a reply message.
All of the message types are represented by the same Java object,
org.apache.camel.Message
. It is not always necessary to customize the message implementation—the default implementation, DefaultMessage
, is usually adequate.
44.1.2. Using a Component in a Route
Overview
A Apache Camel route is essentially a pipeline of processors, of
org.apache.camel.Processor
type. Messages are encapsulated in an exchange object, E
, which gets passed from node to node by invoking the process()
method. The architecture of the processor pipeline is illustrated in Figure 44.2, “Consumer and Producer Instances in a Route”.
Figure 44.2. Consumer and Producer Instances in a Route
Source endpoint
At the start of the route, you have the source endpoint, which is represented by an
org.apache.camel.Consumer
object. The source endpoint is responsible for accepting incoming request messages and dispatching replies. When constructing the route, Apache Camel creates the appropriate Consumer
type based on the component prefix from the endpoint URI, as described in Section 44.1.1, “Factory Patterns for a Component”.
Processors
Each intermediate node in the pipeline is represented by a processor object (implementing the
org.apache.camel.Processor
interface). You can insert either standard processors (for example, filter
, throttler
, or delayer
) or insert your own custom processor implementations.
Target endpoint
At the end of the route is the target endpoint, which is represented by an
org.apache.camel.Producer
object. Because it comes at the end of a processor pipeline, the producer is also a processor object (implementing the org.apache.camel.Processor
interface). The target endpoint is responsible for sending outgoing request messages and receiving incoming replies. When constructing the route, Apache Camel creates the appropriate Producer
type based on the component prefix from the endpoint URI.
44.1.3. Consumer Patterns and Threading
Overview
The pattern used to implement the consumer determines the threading model used in processing the incoming exchanges. Consumers can be implemented using one of the following patterns:
- Event-driven pattern—The consumer is driven by an external thread.
- Scheduled poll pattern—The consumer is driven by a dedicated thread pool.
- Polling pattern—The threading model is left undefined.
Event-driven pattern
In the event-driven pattern, the processing of an incoming request is initiated when another part of the application (typically a third-party library) calls a method implemented by the consumer. A good example of an event-driven consumer is the Apache Camel JMX component, where events are initiated by the JMX library. The JMX library calls the
handleNotification()
method to initiate request processing—see Example 47.4, “JMXConsumer Implementation” for details.
Figure 44.3, “Event-Driven Consumer” shows an outline of the event-driven consumer pattern. In this example, it is assumed that processing is triggered by a call to the
notify()
method.
Figure 44.3. Event-Driven Consumer
The event-driven consumer processes incoming requests as follows:
- The consumer must implement a method to receive the incoming event (in Figure 44.3, “Event-Driven Consumer” this is represented by the
notify()
method). The thread that callsnotify()
is normally a separate part of the application, so the consumer's threading policy is externally driven.For example, in the case of the JMX consumer implementation, the consumer implements theNotificationListener.handleNotification()
method to receive notifications from JMX. The threads that drive the consumer processing are created within the JMX layer. - In the body of the
notify()
method, the consumer first converts the incoming event into an exchange object,E
, and then callsprocess()
on the next processor in the route, passing the exchange object as its argument.
Scheduled poll pattern
In the scheduled poll pattern, the consumer retrieves incoming requests by checking at regular time intervals whether or not a request has arrived. Checking for requests is scheduled automatically by a built-in timer class, the scheduled executor service, which is a standard pattern provided by the java.util.concurrent library. The scheduled executor service executes a particular task at timed intervals and it also manages a pool of threads, which are used to run the task instances.
Figure 44.4, “Scheduled Poll Consumer” shows an outline of the scheduled poll consumer pattern.
Figure 44.4. Scheduled Poll Consumer
The scheduled poll consumer processes incoming requests as follows:
- The scheduled executor service has a pool of threads at its disposal, that can be used to initiate consumer processing. After each scheduled time interval has elapsed, the scheduled executor service attempts to grab a free thread from its pool (there are five threads in the pool by default). If a free thread is available, it uses that thread to call the
poll()
method on the consumer. - The consumer's
poll()
method is intended to trigger processing of an incoming request. In the body of thepoll()
method, the consumer attempts to retrieve an incoming message. If no request is available, thepoll()
method returns immediately. - If a request message is available, the consumer inserts it into an exchange object and then calls
process()
on the next processor in the route, passing the exchange object as its argument.
Polling pattern
In the polling pattern, processing of an incoming request is initiated when a third-party calls one of the consumer's polling methods:
receive()
receiveNoWait()
receive(long timeout)
It is up to the component implementation to define the precise mechanism for initiating calls on the polling methods. This mechanism is not specified by the polling pattern.
Figure 44.5, “Polling Consumer” shows an outline of the polling consumer pattern.
Figure 44.5. Polling Consumer
The polling consumer processes incoming requests as follows:
- Processing of an incoming request is initiated whenever one of the consumer's polling methods is called. The mechanism for calling these polling methods is implementation defined.
- In the body of the
receive()
method, the consumer attempts to retrieve an incoming request message. If no message is currently available, the behavior depends on which receive method was called.receiveNoWait()
returns immediatelyreceive(long timeout)
waits for the specified timeout interval[3] before returningreceive()
waits until a message is received
- If a request message is available, the consumer inserts it into an exchange object and then calls
process()
on the next processor in the route, passing the exchange object as its argument.
44.1.4. Asynchronous Processing
Overview
Producer endpoints normally follow a synchronous pattern when processing an exchange. When the preceding processor in a pipeline calls
process()
on a producer, the process()
method blocks until a reply is received. In this case, the processor's thread remains blocked until the producer has completed the cycle of sending the request and receiving the reply.
Sometimes, however, you might prefer to decouple the preceding processor from the producer, so that the processor's thread is released immediately and the
process()
call does not block. In this case, you should implement the producer using an asynchronous pattern, which gives the preceding processor the option of invoking a non-blocking version of the process()
method.
To give you an overview of the different implementation options, this section describes both the synchronous and the asynchronous patterns for implementing a producer endpoint.
Synchronous producer
Figure 44.6, “Synchronous Producer” shows an outline of a synchronous producer, where the preceding processor blocks until the producer has finished processing the exchange.
Figure 44.6. Synchronous Producer
The synchronous producer processes an exchange as follows:
- The preceding processor in the pipeline calls the synchronous
process()
method on the producer to initiate synchronous processing. The synchronousprocess()
method takes a single exchange argument. - In the body of the
process()
method, the producer sends the request (In message) to the endpoint. - If required by the exchange pattern, the producer waits for the reply (Out message) to arrive from the endpoint. This step can cause the
process()
method to block indefinitely. However, if the exchange pattern does not mandate a reply, theprocess()
method can return immediately after sending the request. - When the
process()
method returns, the exchange object contains the reply from the synchronous call (an Out message message).
Asynchronous producer
Figure 44.7, “Asynchronous Producer” shows an outline of an asynchronous producer, where the producer processes the exchange in a sub-thread, and the preceding processor is not blocked for any significant length of time.
Figure 44.7. Asynchronous Producer
The asynchronous producer processes an exchange as follows:
- Before the processor can call the asynchronous
process()
method, it must create an asynchronous callback object, which is responsible for processing the exchange on the return portion of the route. For the asynchronous callback, the processor must implement a class that inherits from theAsyncCallback
interface. - The processor calls the asynchronous
process()
method on the producer to initiate asynchronous processing. The asynchronousprocess()
method takes two arguments:- an exchange object
- a synchronous callback object
- In the body of the
process()
method, the producer creates aRunnable
object that encapsulates the processing code. The producer then delegates the execution of thisRunnable
object to a sub-thread. - The asynchronous
process()
method returns, thereby freeing up the processor's thread. The exchange processing continues in a separate sub-thread. - The
Runnable
object sends the In message to the endpoint. - If required by the exchange pattern, the
Runnable
object waits for the reply (Out or Fault message) to arrive from the endpoint. TheRunnable
object remains blocked until the reply is received. - After the reply arrives, the
Runnable
object inserts the reply (Out message) into the exchange object and then callsdone()
on the asynchronous callback object. The asynchronous callback is then responsible for processing the reply message (executed in the sub-thread).