第 41 章 消费者接口
摘要
本章论述了如何实施 Apache Camel 组件,这是实施 Apache Camel 组件的基本步骤。
41.1. 消费者接口 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
org.apache.camel.Consumer 类型的实例代表路由中的源端点。实施消费者的方法有几种不同(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性反映了在继承层次(请参阅 图 41.1 “消费者继承层次结构”)中,其中包括用于实施消费者的多个不同基础类。
图 41.1. 消费者继承层次结构
consumer 参数注入 复制链接链接已复制到粘贴板!
对于遵循调度的轮询模式的消费者(请参阅 “调度的轮询模式”一节),Apache Camel 提供将参数注入消费者实例中的支持。例如,考虑 自定义
前缀标识的组件以下端点 URI:
custom:destination?consumer.myConsumerParam
custom:destination?consumer.myConsumerParam
Apache Camel 支持以 使用者格式自动注入查询选项。\*
.对于 consumer.myConsumerParam
参数,您需要在 Consumer 实现类上定义对应的 setter 和 getter 方法,如下所示:
在 getter 和 setter 方法遵循常规的 Java bean 惯例(包括大写属性名称的第一个字母)。
除了在消费者实现中定义 bean 方法,您还必须记得在 Endpoint.createConsumer ()
的实现中调用 configureConsumer ()
方法(请参阅 “调度的轮询端点实现”一节)。
例 41.1 “FileEndpoint createConsumer() Implementation” 显示来自文件组件的 FileEndpoint
类的 createConsumer ()
方法实现的示例:
例 41.1. FileEndpoint createConsumer() Implementation
在运行时,消费者参数注入的工作方式如下:
-
创建端点时,默认的
DefaultComponent.createEndpoint (String uri)
会解析 URI 以提取消费者参数,并通过调用ScheduledPollEndpoint.configureProperties ()
将端点存储在端点实例中。 -
当调用
createConsumer ()
时,方法实现调用configureConsumer ()
来注入使用者参数(请参阅 例 41.1 “FileEndpoint createConsumer() Implementation”)。 -
configureConsumer ()
方法使用 Java reflection 调用与使用者前缀之后匹配的集合方法。
前缀被剥离。
调度的轮询参数 复制链接链接已复制到粘贴板!
遵循调度的轮询模式的使用者会自动支持 表 41.1 “调度的轮询参数” 中显示的使用者参数(它可以显示为端点 URI 中的查询选项)。
Name | default | 描述 |
---|---|---|
|
| 在第一次轮询前以毫秒为单位。 |
|
|
取决于 |
|
|
如果为
如果为 |
在事件驱动的和轮询消费者间转换 复制链接链接已复制到粘贴板!
Apache Camel 提供了两种特殊的使用者实施,可用于在事件驱动的消费者和轮询消费者之间进行转换。提供了以下转换类:
-
org.apache.camel.impl.EventDrivenPollingConsumer
记录将事件驱动的消费者引入轮询消费者实例中。 -
org.apache.camel.impl.DefaultScheduledPollConsumer
fde-scannerConverer 轮询消费者到一个事件驱动的消费者实例中。
在实践中,这些类用于简化实施端点类型的任务。Endpoint 接口定义了以下两种方法来创建使用者实例:
createConsumer ()
返回事件驱动的使用者,并创建PollingConsumer ()
返回轮询使用者。您只需实施其中一种方法。例如,如果您遵循消费者的事件驱动模式,您要实施 createConsumer ()
方法来提供方法实施,以创建仅引发异常的轮询Consumer ()
。但是,在转换类的帮助下,Apache Camel 能够提供更有用的默认实施。
例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint
并实施 createConsumer ()
方法来实施端点。创建PollingConsumer ()
的实现继承自 DefaultEndpoint
,其定义如下:
public PollingConsumer<E> createPollingConsumer() throws Exception { return new EventDrivenPollingConsumer<E>(this); }
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
EventDrivenPolsum
er 构造器使用对事件驱动的消费者的引用,此
有效地换行并将其转换为轮询消费者。要实施转换,EventDrivenPollingConsumer
实例缓冲传入的事件,并通过 receive
()、接收 (长超时)
以及 receiveNoWait ()
方法按需提供。
类似地,如果您根据轮询模式实施您的使用者,您通过扩展 DefaultPollingEndpoint
并实施 creation PollingConsumer ()
方法来实施端点。在本例中,createConsumer ()
方法的实现从 DefaultPollingEndpoint
继承,默认的实施会返回 DefaultScheduledPollConsumer
实例(它将轮询使用者转换为事件驱动的使用者)。
ShutdownPrepared 接口 复制链接链接已复制到粘贴板!
消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared
接口,使您的自定义消费者端点能够接收关闭通知。
例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared
接口的定义。
例 41.2. ShutdownPrepared Interface
ShutdownPrepared
接口定义了以下方法:
prepareShutdown
接收在一个或多个阶段中关闭消费者端点的通知,如下所示:
-
正常关闭 abrt-where,
强制
参数的值为false
。尝试正常清理资源。例如,通过正常停止线程。 -
强制关闭 TOKEN -where
强制
参数的值为true
。这意味着关闭已超时,因此您必须更加积极地清理资源。这是在进程退出前清理资源的最后几率。
-
正常关闭 abrt-where,
ShutdownAware Interface 复制链接链接已复制到粘贴板!
消费者类可以选择实施 org.apache.camel.spi.ShutdownAware
接口,该界面与安全关闭机制交互,使消费者能够寻求额外的时间来关闭。这通常需要组件(如 SEDA),这样可将待处理的交换存储在内部队列中。通常,您希望在关闭 SEDA 消费者之前处理队列中的所有交换。
例 41.3 “ShutdownAware Interface” 显示 ShutdownAware
接口的定义。
例 41.3. ShutdownAware Interface
ShutdownAware
接口定义了以下方法:
deferShutdown
如果要延迟关闭消费者,则从此方法返回
true
。shutdownRunningTask
参数是一个enum
,可采用以下值之一:-
ShutdownRunningTask.CompleteCurrentTaskOnly
的 方式运行处理当前由消费者的线程池处理的交换,但不会尝试处理比这更多的交换。 -
ShutdownRunningTask.CompleteAllTasks
例如,在 SEDA 组件的情况下,使用者将处理来自其传入队列的所有交换。
-
getPendingExchangesSize
- 表示使用者仍然处理多少个交换。零值表示处理已完成,消费者可以关闭。
有关如何定义 ShutdownAware
方法的示例,请参考 例 41.7 “自定义线程实现”。