第 41 章 消费者接口


摘要

本章论述了如何实施 Consumer 接口,这是 Apache Camel 组件的实现中的一个重要步骤。

41.1. Consumer 接口

概述

org.apache.camel.Consumer 类型的实例代表路由中的源端点。实现消费者的方法有几种(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性在继承层次结构中反映在继承层次结构中(请参阅 图 41.1 “消费者继承层次结构”),其中包括几个用于实现消费者的不同基础类。

图 41.1. 消费者继承层次结构

消费者参数注入

对于遵循调度轮询模式的消费者(请参阅 “调度的轮询模式”一节),Apache Camel 为将参数注入消费者实例提供支持。例如,对于由 自定义前缀 标识的组件,请考虑以下端点 URI:

custom:destination?consumer.myConsumerParam
Copy to Clipboard Toggle word wrap

Apache Camel 支持自动注入 consumer.\* 格式的查询选项。对于 consumer.myConsumerParam 参数,您需要在 Consumer 实现类中定义对应的 setter 和 getter 方法,如下所示:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}
Copy to Clipboard Toggle word wrap

其中 getter 和 setter 方法遵循常见的 Java Bean 约定(包括大写属性名称的第一个字母)。

除了在 Consumer 实现中定义 bean 方法外,还必须记得在 Endpoint.createConsumer () 实现中调用 configureConsumer () 方法(请参阅 “调度的轮询端点实现”一节)。

例 41.1 “FileEndpoint createConsumer ()Implementation” 显示了一个 createConsumer () 方法实现的示例,它取自文件组件的 FileEndpoint 类:

例 41.1. FileEndpoint createConsumer ()Implementation

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
    }
Copy to Clipboard Toggle word wrap

在运行时,消费者参数注入的工作方式如下:

  1. 创建端点时,DefaultComponent.createEndpoint (String uri) 的默认实现会解析 URI 以提取消费者参数,并通过调用 ScheduledPollEndpoint.configureProperties () 来将其存储在端点实例中。
  2. 当调用 createConsumer () 时,方法实现调用 configureConsumer () 来注入消费者参数(请参阅 例 41.1 “FileEndpoint createConsumer ()Implementation”)。
  3. configureConsumer () 方法使用 Java 反映来调用在 consumer. 前缀被剥离后名称与相关选项匹配的 setter 方法。

调度的轮询参数

遵循调度的轮询模式的消费者会自动支持 表 41.1 “调度的 Poll 参数” 中显示的消费者参数(可在端点 URI 中显示为查询选项)。

Expand
表 41.1. 调度的 Poll 参数
Namedefault描述

initialDelay

1000

第一次轮询前的延迟(以毫秒为单位)。

delay

500

取决于 useFixedDelay 标记的值(时间单位为毫秒)。

useFixedDelay

false

如果为 false,则 delay 参数将解释为轮询周期。轮询将在 initialDelay、 initialDelay +delayinitialDelay+2\*delay 等发生。

如果为 true,则 delay 参数将解释为上一个执行和下一次执行之间经过的时间。轮询将在 initialDelay、 initialDelay +[ProcessingTime]+delay 等发生。其中 ProcessingTime 是处理当前线程中交换对象所需的时间。

在事件驱动和轮询用户间转换

Apache Camel 提供了两种特殊的消费者实施,可用于在事件驱动的消费者和轮询消费者之间转换。提供了以下转换类:

  • org.apache.camel.impl.EventDrivenPollingConsumer swig-wagonConver 将事件驱动的消费者转换为轮询消费者实例。
  • org.apache.camel.impl.DefaultScheduledPollConsumer swig-wagonConverts 将轮询消费者转换成事件驱动的消费者实例。

实际上,这些类用于简化实施 Endpoint 类型的任务。Endpoint 接口定义了创建消费者实例的以下两种方法:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}
Copy to Clipboard Toggle word wrap

createConsumer () 返回事件驱动的消费者,并且 createPollingConsumer () 返回轮询消费者。您仅实施这些方法。例如,如果您遵循消费者的事件驱动模式,您将实施 createConsumer () 方法,以提供仅引发异常的 createPollingConsumer () 的方法实现。但是,在转换类帮助下,Apache Camel 能够提供更有用的默认实施。

例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint 来实现端点,并实施 createConsumer () 方法。createPollingConsumer () 的实现继承自 DefaultEndpoint,其定义如下:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}
Copy to Clipboard Toggle word wrap

EventDrivenPollingConsumer 构造器会引用事件驱动的消费者,这会 有效地将其嵌套并转换为轮询消费者。要实现转换,EventDrivenPollingConsumer 实例缓冲传入的事件,并通过 receive ()、接收 (长超时)和 receive NoWait () 方法按需提供它们。

类似地,如果您根据轮询模式实施消费者,您可以通过扩展 DefaultPollingEndpoint 并实施 createPollingConsumer () 方法来实现端点。在本例中,createConsumer () 方法的实现继承自 DefaultPollingEndpoint,默认的实现会返回 DefaultScheduledPollConsumer 实例(它将轮询消费者转换为事件驱动的消费者)。

ShutdownPrepared interface

消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,它允许自定义消费者端点接收关闭通知。

例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。

例 41.2. ShutdownPrepared Interface

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}
Copy to Clipboard Toggle word wrap

ShutdownPrepared 接口定义以下方法:

prepareShutdown

接收通知以在一个或多个阶段关闭消费者端点,如下所示:

  1. 在强制参数的位置,恰当的关闭 是,其值为 false尝试安全地清理资源。例如,通过安全停止线程。
  2. 强制关闭 HEKETI -wagon,其中 强制参数的值为 true。这意味着关闭已超时,因此您必须积极清理资源。这是在进程退出前清理资源的最后一个机会。

ShutdownAware 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,后者与安全关闭机制交互,使消费者能够请求额外的时间关闭。这通常需要 SEDA 等组件,这些组件可能将待处理的交换存储在内部队列中。通常,您要在关闭 SEDA 使用者前处理队列中的所有交换。

例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。

例 41.3. ShutdownAware Interface

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}
Copy to Clipboard Toggle word wrap

ShutdownAware 接口定义以下方法:

deferShutdown

从此方法返回 true,如果要延迟消费者的关闭。shutdownRunningTask 参数是一个 enum,它可以采用以下值之一:

  • ShutdownRunningTask.CompleteCurrentTaskOnly HEKETI-wagonfinish 处理当前由消费者线程池处理的交换,但不要尝试处理比此更多的交换。
  • ShutdownRunningTask.CompleteAllTasks HEKETI-wagonprocess all of the pending Exchanges。例如,如果是 SEDA 组件,使用者将处理来自其传入队列的所有交换。
getPendingExchangesSize
表示消费者仍要处理的交换数。零值表示完成处理,并且可以关闭消费者。

有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实施”

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat