第 41 章 消费者接口


摘要

本章论述了如何实施 Apache Camel 组件,这是实施 Apache Camel 组件的基本步骤。

41.1. 消费者接口

概述

org.apache.camel.Consumer 类型的实例代表路由中的源端点。实施消费者的方法有几种不同(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性反映了在继承层次(请参阅 图 41.1 “消费者继承层次结构”)中,其中包括用于实施消费者的多个不同基础类。

图 41.1. 消费者继承层次结构

consumer 参数注入

对于遵循调度的轮询模式的消费者(请参阅 “调度的轮询模式”一节),Apache Camel 提供将参数注入消费者实例中的支持。例如,考虑 自定义 前缀标识的组件以下端点 URI:

custom:destination?consumer.myConsumerParam
Copy to Clipboard Toggle word wrap

Apache Camel 支持以 使用者格式自动注入查询选项。\*.对于 consumer.myConsumerParam 参数,您需要在 Consumer 实现类上定义对应的 setter 和 getter 方法,如下所示:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}
Copy to Clipboard Toggle word wrap

在 getter 和 setter 方法遵循常规的 Java bean 惯例(包括大写属性名称的第一个字母)。

除了在消费者实现中定义 bean 方法,您还必须记得在 Endpoint.createConsumer () 的实现中调用 configureConsumer () 方法(请参阅 “调度的轮询端点实现”一节)。

例 41.1 “FileEndpoint createConsumer() Implementation” 显示来自文件组件的 FileEndpoint 类的 createConsumer () 方法实现的示例:

例 41.1. FileEndpoint createConsumer() Implementation

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
    }
Copy to Clipboard Toggle word wrap

在运行时,消费者参数注入的工作方式如下:

  1. 创建端点时,默认的 DefaultComponent.createEndpoint (String uri) 会解析 URI 以提取消费者参数,并通过调用 ScheduledPollEndpoint.configureProperties () 将端点存储在端点实例中。
  2. 当调用 createConsumer () 时,方法实现调用 configureConsumer () 来注入使用者参数(请参阅 例 41.1 “FileEndpoint createConsumer() Implementation”)。
  3. configureConsumer () 方法使用 Java reflection 调用与 使用者前缀之后匹配的集合方法。 前缀被剥离。

调度的轮询参数

遵循调度的轮询模式的使用者会自动支持 表 41.1 “调度的轮询参数” 中显示的使用者参数(它可以显示为端点 URI 中的查询选项)。

Expand
表 41.1. 调度的轮询参数
Namedefault描述

initialDelay

1000

在第一次轮询前以毫秒为单位。

delay

500

取决于 useFixedDelay 标记的值(时间单位为毫秒)。

useFixedDelay

false

如果为 false,则 延迟 参数解析为轮询周期。轮询将在 initialDelayinitialDelay+delayinitialDelay+2\*delay 等等进行。

如果为 true,则 delay 参数将解释为上一执行和下一次执行之间的时间。轮询将发生在 initialDelay,initialDelay+[ProcessingTime]+delay 等等。这里的 ProcessingTime 是处理当前线程中交换对象的时间。

在事件驱动的和轮询消费者间转换

Apache Camel 提供了两种特殊的使用者实施,可用于在事件驱动的消费者和轮询消费者之间进行转换。提供了以下转换类:

  • org.apache.camel.impl.EventDrivenPollingConsumer 记录将事件驱动的消费者引入轮询消费者实例中。
  • org.apache.camel.impl.DefaultScheduledPollConsumer fde-scannerConverer 轮询消费者到一个事件驱动的消费者实例中。

在实践中,这些类用于简化实施端点类型的任务。Endpoint 接口定义了以下两种方法来创建使用者实例:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}
Copy to Clipboard Toggle word wrap

createConsumer () 返回事件驱动的使用者,并创建PollingConsumer () 返回轮询使用者。您只需实施其中一种方法。例如,如果您遵循消费者的事件驱动模式,您要实施 createConsumer () 方法来提供方法实施,以创建仅引发异常的轮询Consumer ()。但是,在转换类的帮助下,Apache Camel 能够提供更有用的默认实施。

例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint 并实施 createConsumer () 方法来实施端点。创建PollingConsumer () 的实现继承自 DefaultEndpoint,其定义如下:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}
Copy to Clipboard Toggle word wrap

EventDrivenPolsum er 构造器使用对事件驱动的消费者的引用, 有效地换行并将其转换为轮询消费者。要实施转换,EventDrivenPollingConsumer 实例缓冲传入的事件,并通过 receive ()、接收 (长超时) 以及 receiveNoWait () 方法按需提供。

类似地,如果您根据轮询模式实施您的使用者,您通过扩展 DefaultPollingEndpoint 并实施 creation PollingConsumer () 方法来实施端点。在本例中,createConsumer () 方法的实现从 DefaultPollingEndpoint 继承,默认的实施会返回 DefaultScheduledPollConsumer 实例(它将轮询使用者转换为事件驱动的使用者)。

ShutdownPrepared 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,使您的自定义消费者端点能够接收关闭通知。

例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。

例 41.2. ShutdownPrepared Interface

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}
Copy to Clipboard Toggle word wrap

ShutdownPrepared 接口定义了以下方法:

prepareShutdown

接收在一个或多个阶段中关闭消费者端点的通知,如下所示:

  1. 正常关闭 abrt-where,强制 参数的值为 false。尝试正常清理资源。例如,通过正常停止线程。
  2. 强制关闭 TOKEN -where 强制 参数的值为 true。这意味着关闭已超时,因此您必须更加积极地清理资源。这是在进程退出前清理资源的最后几率。

ShutdownAware Interface

消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,该界面与安全关闭机制交互,使消费者能够寻求额外的时间来关闭。这通常需要组件(如 SEDA),这样可将待处理的交换存储在内部队列中。通常,您希望在关闭 SEDA 消费者之前处理队列中的所有交换。

例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。

例 41.3. ShutdownAware Interface

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}
Copy to Clipboard Toggle word wrap

ShutdownAware 接口定义了以下方法:

deferShutdown

如果要延迟关闭消费者,则从此方法返回 trueshutdownRunningTask 参数是一个 enum,可采用以下值之一:

  • ShutdownRunningTask.CompleteCurrentTaskOnly 的 方式运行处理当前由消费者的线程池处理的交换,但不会尝试处理比这更多的交换。
  • ShutdownRunningTask.CompleteAllTasks 例如,在 SEDA 组件的情况下,使用者将处理来自其传入队列的所有交换。
getPendingExchangesSize
表示使用者仍然处理多少个交换。零值表示处理已完成,消费者可以关闭。

有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实现”

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat