Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.Ce contenu n'est pas disponible dans la langue sélectionnée.
Chapter 47. Implementing a Component
Abstract
47.1. Component Architecture Copier lienLien copié sur presse-papiers!
47.1.1. Factory Patterns for a Component Copier lienLien copié sur presse-papiers!
Overview Copier lienLien copié sur presse-papiers!
Component object itself (an instance of org.apache.camel.Component type). You can use the Component object as a factory to create Endpoint objects, which in turn act as factories for creating Consumer, Producer, and Exchange objects. These relationships are summarized in Figure 47.1, “Component Factory Patterns”
Figure 47.1. Component Factory Patterns
Component Copier lienLien copié sur presse-papiers!
Component.createEndpoint() method, which is responsible for creating new endpoints on demand.
Endpoint Copier lienLien copié sur presse-papiers!
org.apache.camel.Endpoint interface. The Endpoint interface defines the following factory methods:
createConsumer()andcreatePollingConsumer()—Creates a consumer endpoint, which represents the source endpoint at the beginning of a route.createProducer()—Creates a producer endpoint, which represents the target endpoint at the end of a route.createExchange()—Creates an exchange object, which encapsulates the messages passed up and down the route.
Consumer Copier lienLien copié sur presse-papiers!
org.apache.camel.Consumer interface. There are a number of different patterns you can follow when implementing a consumer. These patterns are described in Section 47.1.3, “Consumer Patterns and Threading”.
Producer Copier lienLien copié sur presse-papiers!
org.apache.camel.Producer interface. You can optionally implement the producer to support an asynchronous style of processing. See Section 47.1.4, “Asynchronous Processing” for details.
Exchange Copier lienLien copié sur presse-papiers!
org.apache.camel.Exchange interface. The default implementation, DefaultExchange, is sufficient for many component implementations. However, if you want to associated extra data with the exchanges or have the exchanges preform additional processing, it can be useful to customize the exchange implementation.
Message Copier lienLien copié sur presse-papiers!
Exchange object:
- In message—holds the current message.
- Out message—temporarily holds a reply message.
org.apache.camel.Message. It is not always necessary to customize the message implementation—the default implementation, DefaultMessage, is usually adequate.
47.1.2. Using a Component in a Route Copier lienLien copié sur presse-papiers!
Overview Copier lienLien copié sur presse-papiers!
org.apache.camel.Processor type. Messages are encapsulated in an exchange object, E, which gets passed from node to node by invoking the process() method. The architecture of the processor pipeline is illustrated in Figure 47.2, “Consumer and Producer Instances in a Route”.
Figure 47.2. Consumer and Producer Instances in a Route
Source endpoint Copier lienLien copié sur presse-papiers!
org.apache.camel.Consumer object. The source endpoint is responsible for accepting incoming request messages and dispatching replies. When constructing the route, Apache Camel creates the appropriate Consumer type based on the component prefix from the endpoint URI, as described in Section 47.1.1, “Factory Patterns for a Component”.
Processors Copier lienLien copié sur presse-papiers!
org.apache.camel.Processor interface). You can insert either standard processors (for example, filter, throttler, or delayer) or insert your own custom processor implementations.
Target endpoint Copier lienLien copié sur presse-papiers!
org.apache.camel.Producer object. Because it comes at the end of a processor pipeline, the producer is also a processor object (implementing the org.apache.camel.Processor interface). The target endpoint is responsible for sending outgoing request messages and receiving incoming replies. When constructing the route, Apache Camel creates the appropriate Producer type based on the component prefix from the endpoint URI.
47.1.3. Consumer Patterns and Threading Copier lienLien copié sur presse-papiers!
Overview Copier lienLien copié sur presse-papiers!
- Event-driven pattern—The consumer is driven by an external thread.
- Scheduled poll pattern—The consumer is driven by a dedicated thread pool.
- Polling pattern—The threading model is left undefined.
Event-driven pattern Copier lienLien copié sur presse-papiers!
handleNotification() method to initiate request processing—see Example 50.4, “JMXConsumer Implementation” for details.
notify() method.
Figure 47.3. Event-Driven Consumer
- The consumer must implement a method to receive the incoming event (in Figure 47.3, “Event-Driven Consumer” this is represented by the
notify()method). The thread that callsnotify()is normally a separate part of the application, so the consumer's threading policy is externally driven.For example, in the case of the JMX consumer implementation, the consumer implements theNotificationListener.handleNotification()method to receive notifications from JMX. The threads that drive the consumer processing are created within the JMX layer. - In the body of the
notify()method, the consumer first converts the incoming event into an exchange object,E, and then callsprocess()on the next processor in the route, passing the exchange object as its argument.
Scheduled poll pattern Copier lienLien copié sur presse-papiers!
Figure 47.4. Scheduled Poll Consumer
- The scheduled executor service has a pool of threads at its disposal, that can be used to initiate consumer processing. After each scheduled time interval has elapsed, the scheduled executor service attempts to grab a free thread from its pool (there are five threads in the pool by default). If a free thread is available, it uses that thread to call the
poll()method on the consumer. - The consumer's
poll()method is intended to trigger processing of an incoming request. In the body of thepoll()method, the consumer attempts to retrieve an incoming message. If no request is available, thepoll()method returns immediately. - If a request message is available, the consumer inserts it into an exchange object and then calls
process()on the next processor in the route, passing the exchange object as its argument.
Polling pattern Copier lienLien copié sur presse-papiers!
receive()receiveNoWait()receive(long timeout)
Figure 47.5. Polling Consumer
- Processing of an incoming request is initiated whenever one of the consumer's polling methods is called. The mechanism for calling these polling methods is implementation defined.
- In the body of the
receive()method, the consumer attempts to retrieve an incoming request message. If no message is currently available, the behavior depends on which receive method was called.receiveNoWait()returns immediatelyreceive(long timeout)waits for the specified timeout interval[3] before returningreceive()waits until a message is received
- If a request message is available, the consumer inserts it into an exchange object and then calls
process()on the next processor in the route, passing the exchange object as its argument.
47.1.4. Asynchronous Processing Copier lienLien copié sur presse-papiers!
Overview Copier lienLien copié sur presse-papiers!
process() on a producer, the process() method blocks until a reply is received. In this case, the processor's thread remains blocked until the producer has completed the cycle of sending the request and receiving the reply.
process() call does not block. In this case, you should implement the producer using an asynchronous pattern, which gives the preceding processor the option of invoking a non-blocking version of the process() method.
Synchronous producer Copier lienLien copié sur presse-papiers!
Figure 47.6. Synchronous Producer
- The preceding processor in the pipeline calls the synchronous
process()method on the producer to initiate synchronous processing. The synchronousprocess()method takes a single exchange argument. - In the body of the
process()method, the producer sends the request (In message) to the endpoint. - If required by the exchange pattern, the producer waits for the reply (Out message) to arrive from the endpoint. This step can cause the
process()method to block indefinitely. However, if the exchange pattern does not mandate a reply, theprocess()method can return immediately after sending the request. - When the
process()method returns, the exchange object contains the reply from the synchronous call (an Out message message).
Asynchronous producer Copier lienLien copié sur presse-papiers!
Figure 47.7. Asynchronous Producer
- Before the processor can call the asynchronous
process()method, it must create an asynchronous callback object, which is responsible for processing the exchange on the return portion of the route. For the asynchronous callback, the processor must implement a class that inherits from theAsyncCallbackinterface. - The processor calls the asynchronous
process()method on the producer to initiate asynchronous processing. The asynchronousprocess()method takes two arguments:- an exchange object
- a synchronous callback object
- In the body of the
process()method, the producer creates aRunnableobject that encapsulates the processing code. The producer then delegates the execution of thisRunnableobject to a sub-thread. - The asynchronous
process()method returns, thereby freeing up the processor's thread. The exchange processing continues in a separate sub-thread. - The
Runnableobject sends the In message to the endpoint. - If required by the exchange pattern, the
Runnableobject waits for the reply (Out or Fault message) to arrive from the endpoint. TheRunnableobject remains blocked until the reply is received. - After the reply arrives, the
Runnableobject inserts the reply (Out message) into the exchange object and then callsdone()on the asynchronous callback object. The asynchronous callback is then responsible for processing the reply message (executed in the sub-thread).