第 41 章 消费者接口
摘要
本章论述了如何实施 Consumer 接口,这是 Apache Camel 组件的实现中的一个重要步骤。
41.1. Consumer 接口
概述
org.apache.camel.Consumer 类型的实例代表路由中的源端点。实现消费者的方法有几种(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性在继承层次结构中反映在继承层次结构中(请参阅 图 41.1 “消费者继承层次结构”),其中包括几个用于实现消费者的不同基础类。
图 41.1. 消费者继承层次结构
消费者参数注入
对于遵循调度轮询模式的消费者(请参阅 “调度的轮询模式”一节),Apache Camel 为将参数注入消费者实例提供支持。例如,对于由 自定义前缀
标识的组件,请考虑以下端点 URI:
custom:destination?consumer.myConsumerParam
Apache Camel 支持自动注入 consumer.\*
格式的查询选项。对于 consumer.myConsumerParam
参数,您需要在 Consumer 实现类中定义对应的 setter 和 getter 方法,如下所示:
public class CustomConsumer extends ScheduledPollConsumer {
...
String getMyConsumerParam() { ... }
void setMyConsumerParam(String s) { ... }
...
}
其中 getter 和 setter 方法遵循常见的 Java Bean 约定(包括大写属性名称的第一个字母)。
除了在 Consumer 实现中定义 bean 方法外,还必须记得在 Endpoint.createConsumer ()
实现中调用 configureConsumer ()
方法(请参阅 “调度的轮询端点实现”一节)。
例 41.1 “FileEndpoint createConsumer ()Implementation” 显示了一个 createConsumer ()
方法实现的示例,它取自文件组件的 FileEndpoint
类:
例 41.1. FileEndpoint createConsumer ()Implementation
... public class FileEndpoint extends ScheduledPollEndpoint { ... public Consumer createConsumer(Processor processor) throws Exception { Consumer result = new FileConsumer(this, processor); configureConsumer(result); return result; } ... }
在运行时,消费者参数注入的工作方式如下:
-
创建端点时,
DefaultComponent.createEndpoint (String uri)
的默认实现会解析 URI 以提取消费者参数,并通过调用ScheduledPollEndpoint.configureProperties ()
来将其存储在端点实例中。 -
当调用
createConsumer ()
时,方法实现调用configureConsumer ()
来注入消费者参数(请参阅 例 41.1 “FileEndpoint createConsumer ()Implementation”)。 -
configureConsumer ()
方法使用 Java 反映来调用在consumer.
前缀被剥离后名称与相关选项匹配的 setter 方法。
调度的轮询参数
遵循调度的轮询模式的消费者会自动支持 表 41.1 “调度的 Poll 参数” 中显示的消费者参数(可在端点 URI 中显示为查询选项)。
Name | default | 描述 |
---|---|---|
|
| 第一次轮询前的延迟(以毫秒为单位)。 |
|
|
取决于 |
|
|
如果为
如果为 |
在事件驱动和轮询用户间转换
Apache Camel 提供了两种特殊的消费者实施,可用于在事件驱动的消费者和轮询消费者之间转换。提供了以下转换类:
-
org.apache.camel.impl.EventDrivenPollingConsumer
swig-wagonConver 将事件驱动的消费者转换为轮询消费者实例。 -
org.apache.camel.impl.DefaultScheduledPollConsumer
swig-wagonConverts 将轮询消费者转换成事件驱动的消费者实例。
实际上,这些类用于简化实施 Endpoint 类型的任务。Endpoint 接口定义了创建消费者实例的以下两种方法:
package org.apache.camel; public interface Endpoint { ... Consumer createConsumer(Processor processor) throws Exception; PollingConsumer createPollingConsumer() throws Exception; }
createConsumer ()
返回事件驱动的消费者,并且 createPollingConsumer ()
返回轮询消费者。您仅实施这些方法。例如,如果您遵循消费者的事件驱动模式,您将实施 createConsumer ()
方法,以提供仅引发异常的 createPollingConsumer ()
的方法实现。但是,在转换类帮助下,Apache Camel 能够提供更有用的默认实施。
例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint
来实现端点,并实施 createConsumer ()
方法。createPollingConsumer ()
的实现继承自 DefaultEndpoint
,其定义如下:
public PollingConsumer<E> createPollingConsumer() throws Exception { return new EventDrivenPollingConsumer<E>(this); }
EventDrivenPollingConsumer
构造器会引用事件驱动的消费者,这会
有效地将其嵌套并转换为轮询消费者。要实现转换,EventDrivenPollingConsumer
实例缓冲传入的事件,并通过 receive ()
、接收 (长超时)和
方法按需提供它们。
receive
NoWait ()
类似地,如果您根据轮询模式实施消费者,您可以通过扩展 DefaultPollingEndpoint
并实施 createPollingConsumer ()
方法来实现端点。在本例中,createConsumer ()
方法的实现继承自 DefaultPollingEndpoint
,默认的实现会返回 DefaultScheduledPollConsumer
实例(它将轮询消费者转换为事件驱动的消费者)。
ShutdownPrepared interface
消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared
接口,它允许自定义消费者端点接收关闭通知。
例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared
接口的定义。
例 41.2. ShutdownPrepared Interface
package org.apache.camel.spi; public interface ShutdownPrepared { void prepareShutdown(boolean forced); }
ShutdownPrepared
接口定义以下方法:
prepareShutdown
接收通知以在一个或多个阶段关闭消费者端点,如下所示:
-
在强制参数的位置,恰当的关闭 是,其值为
false
。尝试安全地清理资源。例如,通过安全停止线程。
-
强制关闭 HEKETI -wagon,
其中
强制参数的值为true
。这意味着关闭已超时,因此您必须积极清理资源。这是在进程退出前清理资源的最后一个机会。
-
在强制参数的位置,恰当的关闭 是,其值为
ShutdownAware 接口
消费者类可以选择实施 org.apache.camel.spi.ShutdownAware
接口,后者与安全关闭机制交互,使消费者能够请求额外的时间关闭。这通常需要 SEDA 等组件,这些组件可能将待处理的交换存储在内部队列中。通常,您要在关闭 SEDA 使用者前处理队列中的所有交换。
例 41.3 “ShutdownAware Interface” 显示 ShutdownAware
接口的定义。
例 41.3. ShutdownAware Interface
// Java package org.apache.camel.spi; import org.apache.camel.ShutdownRunningTask; public interface ShutdownAware extends ShutdownPrepared { boolean deferShutdown(ShutdownRunningTask shutdownRunningTask); int getPendingExchangesSize(); }
ShutdownAware
接口定义以下方法:
deferShutdown
从此方法返回
true
,如果要延迟消费者的关闭。shutdownRunningTask
参数是一个enum
,它可以采用以下值之一:-
ShutdownRunningTask.CompleteCurrentTaskOnly
HEKETI-wagonfinish 处理当前由消费者线程池处理的交换,但不要尝试处理比此更多的交换。 -
ShutdownRunningTask.CompleteAllTasks
HEKETI-wagonprocess all of the pending Exchanges。例如,如果是 SEDA 组件,使用者将处理来自其传入队列的所有交换。
-
getPendingExchangesSize
- 表示消费者仍要处理的交换数。零值表示完成处理,并且可以关闭消费者。
有关如何定义 ShutdownAware
方法的示例,请参考 例 41.7 “自定义线程实施”。