41.2. 实施消费者接口
实施消费者的替代方法
您可以使用以下方法之一实现使用者:
事件驱动的消费者实施
在事件驱动的消费者中,外部事件明确驱动处理。事件通过 event-listener 接口接收,其中监听器接口特定于特定事件源。
例 41.4 “JMXConsumer 实施” 显示 JMXConsumer
类的实施,该类取自 Apache Camel JMX 组件实施。JMXConsumer
类是一个事件驱动的使用者示例,它通过继承 org.apache.camel.impl.DefaultConsumer
类来实现。对于 JMXConsumer
示例,事件通过 NotificationListener.handleNotification ()
方法上的调用来表示,这是接收 JMX 事件的一种标准方式。要接收这些 JMX 事件,需要实现 NotificationListener 接口并覆盖 handleNotification ()
方法,如 例 41.4 “JMXConsumer 实施” 所示。
例 41.4. JMXConsumer 实施
package org.apache.camel.component.jmx; import javax.management.Notification; import javax.management.NotificationListener; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1 JMXEndpoint jmxEndpoint; public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2 super(endpoint, processor); this.jmxEndpoint = endpoint; } public void handleNotification(Notification notification, Object handback) { 3 try { getProcessor().process(jmxEndpoint.createExchange(notification)); 4 } catch (Throwable e) { handleException(e); 5 } } }
- 1
JMXConsumer
模式通过扩展DefaultConsumer
类来遵循事件驱动的用户的常规模式。此外,由于此消费者设计用于接收来自 JMX (由 JMX 通知表示)的事件,因此实施NotificationListener
接口是必需的。- 2
- 您必须至少实施一个构造器,该构造器将父
端点、端点
的引用以及到链中的下一个处理器
引用作为参数。 - 3
- 每当 JMX 通知到达时,
handleNotification ()
方法(在NotificationListener
中定义)将自动被 JMX 调用。这个方法的正文应包含执行消费者事件处理的代码。由于handleNotification ()
调用源自 JMX 层,因此消费者的线程模型由 JMX 层隐式控制,而不是由JMXConsumer
类控制。 - 4
- 此代码行中包含了两个步骤:首先,JMX 通知对象转换为交换对象,这是 Apache Camel 中事件的通用表示。然后,新创建的交换对象将传递到路由中的下一个处理器(同步)。
- 5
handleException ()
方法由DefaultConsumer
基本类实施。默认情况下,它使用org.apache.camel.impl.LoggingExceptionHandler
类来处理异常。
handleNotification ()
方法特定于 JMX 示例。实施您自己的事件驱动消费者时,您必须识别在自定义消费者中实施的一种类类事件监听程序方法。
调度的轮询消费者实施
在调度的轮询消费者中,轮询事件由计时器类( java.util.concurrent.ScheduledExecutorService
)自动生成。要接收生成的轮询事件,您必须实施 ScheduledPollConsumer.poll ()
方法(请参阅 第 38.1.3 节 “消费者模式和线程”)。
例 41.5 “ScheduledPollConsumer 实施” 介绍如何实施遵循调度轮询模式的使用者,该模式通过扩展 ScheduledPollConsumer
类来实施。
例 41.5. ScheduledPollConsumer 实施
import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledPollConsumer; public class pass:quotes[CustomConsumer] extends ScheduledPollConsumer { 1 private final pass:quotes[CustomEndpoint] endpoint; public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint, Processor processor) { 2 super(endpoint, processor); this.endpoint = endpoint; } protected void poll() throws Exception { 3 Exchange exchange = /* Receive exchange object ... */; // Example of a synchronous processor. getProcessor().process(exchange); 4 } @Override protected void doStart() throws Exception { 5 // Pre-Start: // Place code here to execute just before start of processing. super.doStart(); // Post-Start: // Place code here to execute just after start of processing. } @Override protected void doStop() throws Exception { 6 // Pre-Stop: // Place code here to execute just before processing stops. super.doStop(); // Post-Stop: // Place code here to execute just after processing stops. } }
- 1
- 通过扩展
org.apache.camel.impl.ScheduledPollConsumer
类,实施计划的 poll consumer 类 CustomConsumer。 - 2
- 您必须至少实施一个构造器,该构造器将父
端点、端点
的引用以及到链中的下一个处理器
引用作为参数。 - 3
- 覆盖
poll ()
方法,以接收计划的轮询事件。这是您应该放置检索和处理传入事件(通过交换对象代表)的代码。 - 4
- 在本例中,事件会同步处理。如果想要异步处理事件,则需要通过调用
getAsyncProcessor ()
来改用对异步处理器的引用。有关如何异步处理事件的详情,请参考 第 38.1.4 节 “异步处理”。 - 5
- (可选) 如果您希望某些行代码在使用者启动时执行,请按如下所示覆盖
doStart ()
方法。 - 6
- (可选) 如果您希望某些行代码以使用者的形式执行,请按如下所示覆盖
doStop ()
方法。
轮询消费者实施
例 41.6 “PollingConsumerSupport 的实现” 概述了如何实施遵循轮询模式的使用者,它通过扩展 轮询ConsumerSupport
类来实施。
例 41.6. PollingConsumerSupport 的实现
import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.PollingConsumerSupport; public class pass:quotes[CustomConsumer] extends PollingConsumerSupport { 1 private final pass:quotes[CustomEndpoint] endpoint; public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint) { 2 super(endpoint); this.endpoint = endpoint; } public Exchange receiveNoWait() { 3 Exchange exchange = /* Obtain an exchange object. */; // Further processing ... return exchange; } public Exchange receive() { 4 // Blocking poll ... } public Exchange receive(long timeout) { 5 // Poll with timeout ... } protected void doStart() throws Exception { 6 // Code to execute whilst starting up. } protected void doStop() throws Exception { // Code to execute whilst shutting down. } }
- 1
- 通过扩展
org.apache.camel.impl.PollingConsumerSupport
类来实施您的轮询消费者类 CustomConsumer。 - 2
- 您必须至少实施一个构造器,它作为参数获取到父端点
端点端点
。轮询使用者不需要引用处理器实例。 - 3
receiveNoWait ()
方法应实施非阻塞算法来获取事件(交换对象)。如果没有事件可用,它应该返回null
。- 4
receive ()
方法应实施用于检索事件的阻塞算法。如果事件不可用,此方法可以无限期地阻止。- 5
receive (long timeout)
方法实施一个可以阻止的算法,只要指定的超时(通常以毫秒为单位指定)。- 6
- 如果要插入在使用者启动或关闭时执行的代码,请分别实施
doStart ()
方法和doStop ()
方法。
自定义线程实现
如果标准消费者模式不适用于您的消费者实施,您可以直接实施 Consumer
接口并自行编写线程代码。但是,在编写线程代码时,务必要遵守标准 Apache Camel 线程模型,如 第 2.8 节 “线程模型” 所述。
例如,来自 camel-core
的 SEDA 组件实施了自己的消费者线程,这与 Apache Camel 线程模型一致。例 41.7 “自定义线程实现” 显示了 SedaConsumer
类如何实施其线程的概要。
例 41.7. 自定义线程实现
package org.apache.camel.component.seda; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.util.ServiceHelper; ... import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * A Consumer for the SEDA component. * * @version $Revision: 922485 $ */ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { 1 private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class); private SedaEndpoint endpoint; private Processor processor; private ExecutorService executor; ... public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; this.processor = processor; } ... public void run() { 2 BlockingQueue<Exchange> queue = endpoint.getQueue(); // Poll the queue and process exchanges ... } ... protected void doStart() throws Exception { 3 int poolSize = endpoint.getConcurrentConsumers(); executor = endpoint.getCamelContext().getExecutorServiceStrategy() .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 4 for (int i = 0; i < poolSize; i++) { 5 executor.execute(this); } endpoint.onStarted(this); } protected void doStop() throws Exception { 6 endpoint.onStopped(this); // must shutdown executor on stop to avoid overhead of having them running endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); 7 if (multicast != null) { ServiceHelper.stopServices(multicast); } } ... //---------- // Implementation of ShutdownAware interface public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { // deny stopping on shutdown as we want seda consumers to run in case some other queues // depend on this consumer to run, so it can complete its exchanges return true; } public int getPendingExchangesSize() { // number of pending messages on the queue return endpoint.getQueue().size(); } }
- 1
SedaConsumer
类通过扩展org.apache.camel.impl.ServiceSupport
类来实施,并实施使用者
、Runnable
和ShutdownAware
接口。- 2
- 实施
Runnable.run ()
方法,以定义使用者在线程中运行的时间。在这种情况下,消费者在循环中运行,轮询新交换的队列,然后在队列的后方处理交换。 - 3
doStart ()
方法继承自ServiceSupport
。您可以覆盖此方法来定义消费者启动时的功能。- 4
- 您应当使用使用
CamelContext
注册的ExecutorServiceStrategy
对象来创建线程池,而不是直接创建线程。这很重要,因为它使 Apache Camel 能够集中管理线程,并且支持安全关闭等功能。详情请查看 第 2.8 节 “线程模型”。 - 5
- 通过调用
ExecutorService.execute ()
方法池Size 来启动
线程。 - 6
doStop ()
方法从ServiceSupport
继承。您可以覆盖此方法,以定义消费者在关闭时执行的操作。- 7
- 关闭线程池,它由
executor
实例表示。