搜索

第 41 章 消费者接口

download PDF

摘要

本章论述了如何实施使用者接口,这是实施 Apache Camel 组件中的基本步骤。

41.1. Consumer 接口

概述

org.apache.camel.Consumer 类型的实例代表路由中的源端点。实施消费者的方式有多种不同方法(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性在继承分级(查看 图 41.1 “消费者继承层次结构”)中有所反映,其中包括多个不同的基本类来实施消费者。

图 41.1. 消费者继承层次结构

消费者继承层次结构

consumer 参数注入

对于遵循调度轮询模式的用户(请参阅 “调度的轮询模式”一节),Apache Camel 为将参数注入消费者实例提供支持。例如,考虑由 自定义 前缀标识的组件的以下端点 URI:

custom:destination?consumer.myConsumerParam

Apache Camel 支持自动注入 使用者形式的查询选项。\*。对于 consumer.myConsumerParam 参数,您需要在 Consumer 实施类上定义对应的 setter 和 getter 方法,如下所示:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}

如果 getter 和 setter 方法遵循常见的 Java Bean 规范(包括属性名的第一个字母的大写)。

除了在消费者实施中定义 bean 方法外,还必须记住在 Endpoint.createConsumer() 实施中调用 configureConsumer() 方法(请参阅 “调度轮询端点实现”一节

例 41.1 “FileEndpoint createConsumer()实施” 显示了一个 createConsumer() 方法实现示例,它取自文件组件中的 FileEndpoint 类:

例 41.1. FileEndpoint createConsumer()实施

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
    }

在运行时,使用者参数注入功能如下:

  1. 创建端点时,DefaultComponent.createEndpoint(String uri) 的默认实现会解析 URI 以提取消费者参数,并通过调用 ScheduledPollEndpoint.configureProperties() 来把它们存储在端点实例中。
  2. 调用 createConsumer() 时,方法实施调用 configureConsumer() 以注入消费者参数(请参阅 例 41.1 “FileEndpoint createConsumer()实施”)。
  3. configureConsumer() 方法使用 Java 反应来调用名称与 消费者后的相关选项匹配的 setter 方法。 前缀被剥离。

调度的轮询参数

按照调度的轮询模式的用户自动支持 表 41.1 “调度的轮询参数” 中显示的消费者(它们可能显示为端点 URI 中的查询选项)。

表 41.1. 调度的轮询参数
名称default描述

initialDelay

1000

延迟(以毫秒为单位)到第一次轮询前。

delay

500

取决于 useFixedDelay 标志的值(时间单位为毫秒)。

useFixedDelay

false

如果为 false,则 delay 参数被解释为轮询周期。轮询将在 initialDelayinitialDelay+delayinitialDelay+2\*delay 等等。

如果为 true,则 delay 参数将解释为上一执行与下一次执行之间所经过的时间。轮询将在 initialDelayinitialDelay+[ProcessingTime]+delay 等等。其中 ProcessingTime 是处理当前线程中的交换对象所需时间。

在事件驱动和轮询消费者间转换

Apache Camel 提供了两种特殊的消费者实施,可用于在事件驱动的消费者和轮询消费者之间转换和发送。提供了以下转换类:

  • org.apache.camel.impl.EventDrivenPollingConsumer 进行在轮询消费者实例中一个事件驱动的消费者。
  • org.apache.camel.impl.DefaultScheduledPollConsumer abrt-abrtConverts一个轮询消费者到事件驱动的消费者实例中。

在实践中,这些类用于简化实施端点类型的任务。Endpoint 接口定义了以下两种方法来创建消费者实例:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}

createConsumer() 返回事件驱动的消费者,创建PollingConsumer() 返回轮询消费者。您只能实施这些方法。例如,如果您遵循消费者的事件驱动的模式,则实施 createConsumer() 方法,为创建 PollingConsumer() 提供方法实施,以简单地引发异常。但是,通过转换类,Apache Camel 可以提供更有用的默认实施。

例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint 并实施 createConsumer() 方法来实施端点。createPollingConsumer() 的实现从 DefaultEndpoint 继承,其定义如下:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}

EventDrivenPollingConsumer 构造器引用事件驱动的消费者, 实际上将其嵌套,并将其转换为轮询消费者。要实现转换,EventDrivenPollingConsumer 实例缓冲传入的事件,并通过 receive (长超时)和 receive NoWait() 方法根据需要提供它们。

类似地,如果您按照轮询模式实施消费者,通过扩展 DefaultPollingEndpoint 和实施 createPollingConsumer() 方法来实施端点。在本例中,createConsumer() 方法的实现从 DefaultPollingEndpoint 继承,默认的实现会返回一个 DefaultScheduledPollConsumer 实例(将轮询消费者转换为事件驱动的消费者)。

ShutdownPrepared 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,它使您的自定义消费者端点能够接收关闭通知。

例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。

例 41.2. ShutdownPrepared Interface

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}

ShutdownPrepared 接口定义了以下方法:

prepareShutdown

接收通知以一个或两个阶段关闭消费者端点,如下所示:

  1. 正常关闭 10.10.10.2 - thewherewhere the parameter 的值为 false。尝试正常清理资源。例如,通过正常停止线程。
  2. 强制关闭 InventoryService-West where the force argument 的值为 true。这意味着关闭超时,因此您必须更加积极地清理资源。这是在进程退出前清理资源的最后一个机会。

ShutdownAware 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,该界面与安全关闭机制交互,让消费者要求额外的时间关机。这通常需要这个组件,比如 SEDA,它们可能会使存储在内部队列中的待处理交换。通常,您希望在关闭 SEDA 消费者之前处理队列中的所有交换。

例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。

例 41.3. ShutdownAware Interface

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}

ShutdownAware 接口定义以下方法:

deferShutdown

如果您想要延迟用户关闭,请从此方法返回 trueshutdownRunningTask 参数是一个 enum,可取以下值之一:

  • ShutdownRunningTask.CompleteCurrentTaskOnly abrt-abrtfinish 处理目前由消费者的线程池处理的交换,但不试图处理超过该交换的交换。
  • ShutdownRunningTask.CompleteAllTasks abrt- 时间 处理所有待处理的交换。例如,如果 SEDA 组件,使用者将处理来自其传入队列的所有交换。
getPendingExchangesSize
指明消费者用于处理多少交换。零值表示完成处理,可关闭消费者。

有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实施”

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.