第 41 章 消费者接口
摘要
本章论述了如何实现 Consumer 接口,这是 Apache Camel 组件实施中的基本步骤。
41.1. Consumer Interface 复制链接链接已复制到粘贴板!
概述 复制链接链接已复制到粘贴板!
org.apache.camel.Consumer 类型的实例代表路由中的源端点。实现消费者的方法有多种(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性反映在继承层次结构中(请参阅 图 41.1 “消费者继承层次结构”),其中包括几个不同的基本类来实现消费者。
图 41.1. 消费者继承层次结构
消费者参数注入 复制链接链接已复制到粘贴板!
对于遵循调度的轮询模式(请参阅 “调度的轮询模式”一节)的用户,Apache Camel 提供了将参数注入消费者实例的支持。例如,考虑由 自定义前缀 标识的组件的以下端点 URI:
custom:destination?consumer.myConsumerParam
custom:destination?consumer.myConsumerParam
Apache Camel 支持自动注入表单 consumer.\* 的查询选项。对于 consumer.myConsumerParam 参数,您需要在 Consumer 实现类上定义对应的 setter 和 getter 方法,如下所示:
getter 和 setter 方法遵循常见的 Java bean 惯例(包括属性名称的第一个字母大写)。
除了在 Consumer 实现中定义 bean 方法外,还必须记得在 Endpoint.createConsumer () 实现中调用 configureConsumer () 方法(请参阅 “调度的轮询端点实施”一节)。
例 41.1 “FileEndpoint createConsumer ()实现” 显示从文件组件中的 FileEndpoint 类的 createConsumer () 方法实现的示例:
例 41.1. FileEndpoint createConsumer ()实现
在运行时,消费者参数注入可以正常工作:
-
创建端点时,
DefaultComponent.createEndpoint (String uri)的默认实现会解析 URI 来提取消费者参数,并通过调用ScheduledPollEndpoint.configureProperties ()将它们存储在端点实例中。 -
当调用
createConsumer ()时,方法实现调用configureConsumer ()以注入消费者参数(请参阅 例 41.1 “FileEndpoint createConsumer ()实现”)。 -
configureConsumer ()方法使用 Java 反映调用 setter 方法,其名称与消费者。前缀被剥离后与相关选项匹配。
调度的轮询参数 复制链接链接已复制到粘贴板!
遵循调度的轮询模式的消费者自动支持 表 41.1 “调度的 Poll 参数” 中显示的消费者参数(它可以显示为端点 URI 中的查询选项)。
| 名称 | default | 描述 |
|---|---|---|
|
|
| 在第一次轮询前延迟(毫秒)。 |
|
|
|
取决于 |
|
|
|
如果为
如果为 |
在事件驱动的和轮询用户间进行转换 复制链接链接已复制到粘贴板!
Apache Camel 提供了两种特殊的消费者实施,可用于回滚事件驱动的消费者和轮询消费者。提供了以下转换类:
-
org.apache.camel.impl.EventDrivenPollingConsumercriu-PROFILEConverts 是一个事件驱动的消费者到轮询消费者实例中。 -
org.apache.camel.impl.DefaultScheduledPollConsumercriu-wagonConverts 一个轮询消费者到事件驱动的消费者实例中。
在实践中,这些类用于简化实施 Endpoint 类型的任务。Endpoint 接口定义了以下两种方法来创建消费者实例:
createConsumer () 返回事件驱动的消费者,createPollingConsumer () 返回轮询消费者。您只实施这些方法。例如,如果您遵循消费者的事件驱动模式,您可以实施 createConsumer () 方法,以便为 createPollingConsumer () 提供方法实现,该方法只是引发异常。但是,在转换类的帮助下,Apache Camel 可以提供更有用的默认实现。
例如,如果要根据事件驱动的模式实施使用者,您可以通过扩展 DefaultEndpoint 并实施 createConsumer () 方法来实现端点。createPollingConsumer () 的实现继承自 DefaultEndpoint,其定义如下:
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
EventDrivenPollingConsumer 构造器获取对事件驱动的消费者的引用,这 有效嵌套它并将其转换为轮询消费者。要实现转换,EventDrivenPollingConsumer 实例会缓冲传入的事件,并通过 receive ()、接收 (长超时)和 方法按需提供它们。
receive NoWait ()
类似地,如果您根据轮询模式实施使用者,您可以通过扩展 DefaultPollingEndpoint 并实施 createPollingConsumer () 方法来实现端点。在这种情况下,createConsumer () 方法的实现继承自 DefaultPollingEndpoint,默认的实现会返回 DefaultScheduledPollConsumer 实例(将轮询消费者转换为事件驱动的消费者)。
ShutdownPrepared 接口 复制链接链接已复制到粘贴板!
消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,这使得您的自定义消费者端点能够接收关闭通知。
例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。
例 41.2. ShutdownPrepared Interface
ShutdownPrepared 接口定义了以下方法:
prepareShutdown接收通知以在一个或多个阶段关闭消费者端点,如下所示:
-
恰当的关闭 mvapich-busybox
where强制参数具有值false。尝试正常清理资源。例如,通过正常停止线程。 -
强制关闭 mvapich -wagonwhere
强制参数具有值true。这意味着关闭已超时,因此您必须更积极地清理资源。这是在进程退出前清除资源的最后一个机会。
-
恰当的关闭 mvapich-busybox
ShutdownAware 接口 复制链接链接已复制到粘贴板!
消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,该接口与安全关闭机制交互,使消费者能够向消费者要求额外时间关闭。对于 SEDA 等组件,这通常需要此项,这些交换存储在内部队列中。通常,您希望在关闭 SEDA 使用者之前处理队列中的所有交换。
例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。
例 41.3. ShutdownAware Interface
ShutdownAware 接口定义了以下方法:
deferShutdown如果要延迟关闭消费者,从此方法返回
true。shutdownRunningTask参数是一个枚举,它可以是以下值之一:-
ShutdownRunningTask.CompleteCurrentTaskOnlyPROFILE-PROFILEfinish 处理当前由消费者线程池处理的交换,但不试图处理任何比该交换更多的交换。 -
ShutdownRunningTask.CompleteAllTasksPROFILE-PROFILEprocesss all pending Exchanges。例如,如果是 SEDA 组件,消费者将处理来自其传入队列的所有交换。
-
getPendingExchangesSize- 指明消费者保留处理多少个交换。零值表示处理已完成,并可关闭消费者。
有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实施”。