41.2. 实施消费者接口


实施消费者的替代方法

您可以使用以下方法之一实现使用者:

事件驱动的消费者实施

在事件驱动的消费者中,外部事件明确驱动处理。事件通过 event-listener 接口接收,其中监听器接口特定于特定事件源。

例 41.4 “JMXConsumer 实施” 显示 JMXConsumer 类的实施,该类取自 Apache Camel JMX 组件实施。JMXConsumer 类是一个事件驱动的使用者示例,它通过继承 org.apache.camel.impl.DefaultConsumer 类来实现。对于 JMXConsumer 示例,事件通过 NotificationListener.handleNotification () 方法上的调用来表示,这是接收 JMX 事件的一种标准方式。要接收这些 JMX 事件,需要实现 NotificationListener 接口并覆盖 handleNotification () 方法,如 例 41.4 “JMXConsumer 实施” 所示。

例 41.4. JMXConsumer 实施

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); 4
        } catch (Throwable e) {
            handleException(e); 5
        }
    }
}
1
JMXConsumer 模式通过扩展 DefaultConsumer 类来遵循事件驱动的用户的常规模式。此外,由于此消费者设计用于接收来自 JMX (由 JMX 通知表示)的事件,因此实施 NotificationListener 接口是必需的。
2
您必须至少实施一个构造器,该构造器将父 端点、端点 的引用以及到链中的下一个 处理器 引用作为参数。
3
每当 JMX 通知到达时,handleNotification () 方法(在 NotificationListener中定义)将自动被 JMX 调用。这个方法的正文应包含执行消费者事件处理的代码。由于 handleNotification () 调用源自 JMX 层,因此消费者的线程模型由 JMX 层隐式控制,而不是由 JMXConsumer 类控制。
4
此代码行中包含了两个步骤:首先,JMX 通知对象转换为交换对象,这是 Apache Camel 中事件的通用表示。然后,新创建的交换对象将传递到路由中的下一个处理器(同步)。
5
handleException () 方法由 DefaultConsumer 基本类实施。默认情况下,它使用 org.apache.camel.impl.LoggingExceptionHandler 类来处理异常。
注意

handleNotification () 方法特定于 JMX 示例。实施您自己的事件驱动消费者时,您必须识别在自定义消费者中实施的一种类类事件监听程序方法。

调度的轮询消费者实施

在调度的轮询消费者中,轮询事件由计时器类( java.util.concurrent.ScheduledExecutorService )自动生成。要接收生成的轮询事件,您必须实施 ScheduledPollConsumer.poll () 方法(请参阅 第 38.1.3 节 “消费者模式和线程”)。

例 41.5 “ScheduledPollConsumer 实施” 介绍如何实施遵循调度轮询模式的使用者,该模式通过扩展 ScheduledPollConsumer 类来实施。

例 41.5. ScheduledPollConsumer 实施

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class pass:quotes[CustomConsumer] extends ScheduledPollConsumer { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { 3
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange); 4
    }

    @Override
    protected void doStart() throws Exception { 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
1
通过扩展 org.apache.camel.impl.ScheduledPollConsumer 类,实施计划的 poll consumer 类 CustomConsumer
2
您必须至少实施一个构造器,该构造器将父 端点、端点 的引用以及到链中的下一个 处理器 引用作为参数。
3
覆盖 poll () 方法,以接收计划的轮询事件。这是您应该放置检索和处理传入事件(通过交换对象代表)的代码。
4
在本例中,事件会同步处理。如果想要异步处理事件,则需要通过调用 getAsyncProcessor () 来改用对异步处理器的引用。有关如何异步处理事件的详情,请参考 第 38.1.4 节 “异步处理”
5
(可选) 如果您希望某些行代码在使用者启动时执行,请按如下所示覆盖 doStart () 方法。
6
(可选) 如果您希望某些行代码以使用者的形式执行,请按如下所示覆盖 doStop () 方法。

轮询消费者实施

例 41.6 “PollingConsumerSupport 的实现” 概述了如何实施遵循轮询模式的使用者,它通过扩展 轮询ConsumerSupport 类来实施。

例 41.6. PollingConsumerSupport 的实现

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class pass:quotes[CustomConsumer] extends PollingConsumerSupport { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint) { 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
1
通过扩展 org.apache.camel.impl.PollingConsumerSupport 类来实施您的轮询消费者类 CustomConsumer
2
您必须至少实施一个构造器,它作为参数获取到父端点 端点端点。轮询使用者不需要引用处理器实例。
3
receiveNoWait () 方法应实施非阻塞算法来获取事件(交换对象)。如果没有事件可用,它应该返回 null
4
receive () 方法应实施用于检索事件的阻塞算法。如果事件不可用,此方法可以无限期地阻止。
5
receive (long timeout) 方法实施一个可以阻止的算法,只要指定的超时(通常以毫秒为单位指定)。
6
如果要插入在使用者启动或关闭时执行的代码,请分别实施 doStart () 方法和 doStop () 方法。

自定义线程实现

如果标准消费者模式不适用于您的消费者实施,您可以直接实施 Consumer 接口并自行编写线程代码。但是,在编写线程代码时,务必要遵守标准 Apache Camel 线程模型,如 第 2.8 节 “线程模型” 所述。

例如,来自 camel-core 的 SEDA 组件实施了自己的消费者线程,这与 Apache Camel 线程模型一致。例 41.7 “自定义线程实现” 显示了 SedaConsumer 类如何实施其线程的概要。

例 41.7. 自定义线程实现

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { 1
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { 2
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { 3
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 4
        for (int i = 0; i < poolSize; i++) { 5
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { 6
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); 7

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...
    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }

}
1
SedaConsumer 类通过扩展 org.apache.camel.impl.ServiceSupport 类来实施,并实施 使用者RunnableShutdownAware 接口。
2
实施 Runnable.run () 方法,以定义使用者在线程中运行的时间。在这种情况下,消费者在循环中运行,轮询新交换的队列,然后在队列的后方处理交换。
3
doStart () 方法继承自 ServiceSupport。您可以覆盖此方法来定义消费者启动时的功能。
4
您应当使用使用 CamelContext 注册的 ExecutorServiceStrategy 对象来创建线程池,而不是直接创建线程。这很重要,因为它使 Apache Camel 能够集中管理线程,并且支持安全关闭等功能。详情请查看 第 2.8 节 “线程模型”
5
通过调用 ExecutorService.execute () 方法 池Size 来启动 线程。
6
doStop () 方法从 ServiceSupport 继承。您可以覆盖此方法,以定义消费者在关闭时执行的操作。
7
关闭线程池,它由 executor 实例表示。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.