41.2. 实施 Consumer 接口
实施消费者的替代方法 复制链接链接已复制到粘贴板!
您可以使用以下方法之一实现消费者:
事件驱动的消费者实施 复制链接链接已复制到粘贴板!
在事件驱动的消费者中,处理由外部事件显式驱动。事件通过 event-listener 接口接收,其中监听程序接口特定于特定事件源。
例 41.4 “JMXConsumer 实现” 显示 JMXConsumer 类的实施,该类取自 Apache Camel JMX 组件实施。JMXConsumer 类是一个事件驱动的消费者的示例,它通过从 org.apache.camel.impl.DefaultConsumer 类继承来实现。对于 JMXConsumer 示例,事件由 NotificationListener.handleNotification () 方法上的调用来表示,这是接收 JMX 事件的标准方法。要接收这些 JMX 事件,需要实现 NotificationListener 接口并覆盖 handleNotification () 方法,如 例 41.4 “JMXConsumer 实现” 所示。
例 41.4. JMXConsumer 实现
package org.apache.camel.component.jmx;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
public class JMXConsumer extends DefaultConsumer implements NotificationListener {
JMXEndpoint jmxEndpoint;
public JMXConsumer(JMXEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.jmxEndpoint = endpoint;
}
public void handleNotification(Notification notification, Object handback) {
try {
getProcessor().process(jmxEndpoint.createExchange(notification));
} catch (Throwable e) {
handleException(e);
}
}
}
- 1
JMXConsumer模式通过扩展DefaultConsumer类,遵循事件驱动的消费者的常规模式。此外,由于此使用者设计为从 JMX 通知(由 JMX 通知表示)接收事件,因此需要实施NotificationListener接口。- 2
- 您必须实施对父端点、
端点以及对链中下一个处理器的引用,作为参数的一个构造器。 - 3
- 当 JMX 通知到达时,
handleNotification ()方法(在NotificationListener中定义) 会自动由 JMX 调用。此方法的正文应包含执行消费者事件处理的代码。由于handleNotification ()调用源自 JMX 层,因此消费者的线程模型由 JMX 层隐式控制,而不是JMXConsumer类。 - 4
- 这行代码组合了两个步骤。首先,JMX 通知对象转换为交换对象,这是 Apache Camel 中事件的通用表示。然后,新创建的交换对象传递到路由中的下一个处理器(以同步方式)。
- 5
handleException ()方法由DefaultConsumer基本类实现。默认情况下,它使用org.apache.camel.impl.LoggingExceptionHandler类来处理异常。
handleNotification () 方法特定于 JMX 示例。在实施您自己的事件驱动的消费者时,您必须识别类似事件监听程序方法,以便在自定义消费者中实施。
调度的轮询消费者实现 复制链接链接已复制到粘贴板!
在调度的轮询消费者中,轮询事件由计时器类 java.util.concurrent.ScheduledExecutorService 自动生成。要接收生成的轮询事件,您必须实现 ScheduledPollConsumer.poll () 方法(请参阅 第 38.1.3 节 “消费者模式和线程”)。
例 41.5 “ScheduledPollConsumer Implementation” 演示了如何实现遵循调度的轮询模式的消费者,该模式通过扩展 ScheduledPollConsumer 类来实现。
例 41.5. ScheduledPollConsumer Implementation
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
public class pass:quotes[CustomConsumer] extends ScheduledPollConsumer {
private final pass:quotes[CustomEndpoint] endpoint;
public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
protected void poll() throws Exception {
Exchange exchange = /* Receive exchange object ... */;
// Example of a synchronous processor.
getProcessor().process(exchange);
}
@Override
protected void doStart() throws Exception {
// Pre-Start:
// Place code here to execute just before start of processing.
super.doStart();
// Post-Start:
// Place code here to execute just after start of processing.
}
@Override
protected void doStop() throws Exception {
// Pre-Stop:
// Place code here to execute just before processing stops.
super.doStop();
// Post-Stop:
// Place code here to execute just after processing stops.
}
}
- 1
- 通过扩展
org.apache.camel.impl.ScheduledPollConsumer类,实施调度的轮询消费者类 CustomConsumer。 - 2
- 您必须实施对父端点、
端点以及对链中下一个处理器的引用,作为参数的一个构造器。 - 3
- 覆盖
poll ()方法,以接收调度的轮询事件。这是您应该放置检索和处理传入事件(由交换对象代表)的代码。 - 4
- 在本例中,事件会被同步处理。如果要异步处理事件,您应该通过调用
getAsyncProcessor ()来使用对异步处理器的引用。有关如何异步处理事件的详情,请参考 第 38.1.4 节 “异步处理”。 - 5
- (可选) 如果您希望某些行代码在启动使用者时执行,请覆盖
doStart ()方法,如下所示。 - 6
- (可选) 如果您希望某些代码行执行,因为消费者正在停止,请覆盖
doStop ()方法,如下所示。
轮询消费者实施 复制链接链接已复制到粘贴板!
例 41.6 “PollingConsumerSupport Implementation” 概述了如何通过扩展 PollingConsumerSupport 类来实现一个遵循轮询模式的消费者。
例 41.6. PollingConsumerSupport Implementation
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;
public class pass:quotes[CustomConsumer] extends PollingConsumerSupport {
private final pass:quotes[CustomEndpoint] endpoint;
public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
public Exchange receiveNoWait() {
Exchange exchange = /* Obtain an exchange object. */;
// Further processing ...
return exchange;
}
public Exchange receive() {
// Blocking poll ...
}
public Exchange receive(long timeout) {
// Poll with timeout ...
}
protected void doStart() throws Exception {
// Code to execute whilst starting up.
}
protected void doStop() throws Exception {
// Code to execute whilst shutting down.
}
}
- 1
- 通过扩展
org.apache.camel.impl.PollingConsumerSupport类,实施您的轮询消费者类 CustomConsumer。 - 2
- 您必须至少实施对父端点端点端点 的引用的一个构造器,作为参数。
轮询消费者不需要对处理器实例的引用。 - 3
receiveNoWait ()方法应该实施用于检索事件(exchange 对象)的非阻塞算法。如果没有可用的事件,它应返回null。- 4
receive ()方法应该实施用于检索事件的阻塞算法。如果事件仍不可用,此方法可以无限期地阻止。- 5
receive (长超时)方法实施一种算法,只要指定的超时(通常以毫秒为单位)指定。- 6
- 如果要在消费者启动或关机期间插入执行的代码,请分别实施
doStart ()方法和doStop ()方法。
自定义线程实现 复制链接链接已复制到粘贴板!
如果标准的消费者模式不适用于您的消费者实施,您可以直接实施 Consumer 接口,并自行编写线程代码。当编写线程代码时,您必须遵守标准的 Apache Camel 线程模型,如 第 2.8 节 “线程模型” 所述。
例如,camel-core 中的 SEDA 组件实施自己的消费者线程,这与 Apache Camel 线程模型一致。例 41.7 “自定义线程实施” 显示 SedaConsumer 类如何实现其线程的概要。
例 41.7. 自定义线程实施
package org.apache.camel.component.seda;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Consumer for the SEDA component.
*
* @version $Revision: 922485 $
*/
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware {
private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint endpoint;
private Processor processor;
private ExecutorService executor;
...
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
this.processor = processor;
}
...
public void run() {
BlockingQueue<Exchange> queue = endpoint.getQueue();
// Poll the queue and process exchanges
...
}
...
protected void doStart() throws Exception {
int poolSize = endpoint.getConcurrentConsumers();
executor = endpoint.getCamelContext().getExecutorServiceStrategy()
.newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
for (int i = 0; i < poolSize; i++) {
executor.execute(this);
}
endpoint.onStarted(this);
}
protected void doStop() throws Exception {
endpoint.onStopped(this);
// must shutdown executor on stop to avoid overhead of having them running
endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor);
if (multicast != null) {
ServiceHelper.stopServices(multicast);
}
}
...
//----------
// Implementation of ShutdownAware interface
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// deny stopping on shutdown as we want seda consumers to run in case some other queues
// depend on this consumer to run, so it can complete its exchanges
return true;
}
public int getPendingExchangesSize() {
// number of pending messages on the queue
return endpoint.getQueue().size();
}
}
- 1
SedaConsumer类通过扩展org.apache.camel.impl.ServiceSupport类来实现,并实施Consumer、Runnable、和ShutdownAware接口。- 2
- 实施
Runnable.run ()方法,以定义在线程中运行的使用者的作用。在这种情况下,消费者在循环中运行,轮询新交换的队列,然后在队列的后方部分处理交换。 - 3
doStart ()方法继承自ServiceSupport。您可以覆盖此方法,以定义消费者启动时的作用。- 4
- 您应该使用通过
CamelContext注册的ExecutorServiceStrategy对象创建一个线程池,而不是直接创建线程。这很重要,因为它可让 Apache Camel 实现线程的集中管理和支持,如安全关闭。详情请查看 第 2.8 节 “线程模型”。 - 5
- 通过调用
ExecutorService.execute ()方法poolSize时间来启动线程。 - 6
doStop ()方法继承自ServiceSupport。您可以覆盖此方法,以定义消费者在关闭时的作用。- 7
- 关闭线程池,由
executor实例表示。