第 41 章 消费者接口


摘要

本章论述了如何实现 Consumer 接口,这是 Apache Camel 组件实施中的基本步骤。

41.1. Consumer Interface

概述

org.apache.camel.Consumer 类型的实例代表路由中的源端点。实现消费者的方法有多种(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性反映在继承层次结构中(请参阅 图 41.1 “消费者继承层次结构”),其中包括几个不同的基本类来实现消费者。

图 41.1. 消费者继承层次结构

消费者参数注入

对于遵循调度的轮询模式(请参阅 “调度的轮询模式”一节)的用户,Apache Camel 提供了将参数注入消费者实例的支持。例如,考虑由 自定义前缀 标识的组件的以下端点 URI:

custom:destination?consumer.myConsumerParam
Copy to Clipboard Toggle word wrap

Apache Camel 支持自动注入表单 consumer.\* 的查询选项。对于 consumer.myConsumerParam 参数,您需要在 Consumer 实现类上定义对应的 setter 和 getter 方法,如下所示:

public class CustomConsumer extends ScheduledPollConsumer {
    ...
    String getMyConsumerParam() { ... }
    void setMyConsumerParam(String s) { ... }
    ...
}
Copy to Clipboard Toggle word wrap

getter 和 setter 方法遵循常见的 Java bean 惯例(包括属性名称的第一个字母大写)。

除了在 Consumer 实现中定义 bean 方法外,您还必须记得在 Endpoint.createConsumer () 实施中调用 configureConsumer () 方法。请参阅 “调度的轮询端点实施”一节)。例 41.1 “FileEndpoint createConsumer ()实现” 显示从文件组件中的 FileEndpoint 类的 createConsumer () 方法实现的示例:

例 41.1. FileEndpoint createConsumer ()实现

...
public class FileEndpoint extends ScheduledPollEndpoint {
    ...
    public Consumer createConsumer(Processor processor) throws Exception {
        Consumer result = new FileConsumer(this, processor);
        configureConsumer(result);
        return result;
    }
    ...
    }
Copy to Clipboard Toggle word wrap

在运行时,消费者参数注入可以正常工作:

  1. 创建端点时,DefaultComponent.createEndpoint (String uri) 的默认实现会解析 URI 来提取消费者参数,并通过调用 ScheduledPollEndpoint.configureProperties () 将它们存储在端点实例中。
  2. 当调用 createConsumer () 时,方法实现调用 configureConsumer () 以注入消费者参数(请参阅 例 41.1 “FileEndpoint createConsumer ()实现”)。
  3. configureConsumer () 方法使用 Java 反映调用 setter 方法,其名称与 消费者。 前缀被剥离后与相关选项匹配。

调度的轮询参数

遵循调度的轮询模式的消费者自动支持 表 41.1 “调度的 Poll 参数” 中显示的消费者参数(它可以显示为端点 URI 中的查询选项)。

Expand
表 41.1. 调度的 Poll 参数
Namedefault描述

initialDelay

1000

在第一次轮询前延迟(毫秒)。

delay

500

取决于 useFixedDelay 标记的值(时间单位为毫秒)。

useFixedDelay

false

如果为 false,则 delay 参数被解释为轮询周期。轮询将在 initialDelay,initialDelay+delay,initialDelay+2\*delay 处发生,以此类推。

如果为 true,则 delay 参数解释为在上一个执行和下一个执行之间经过的时间。轮询将在 initialDelay、 initialDelay +[ProcessingTime]+delay 处发生,以此类推。其中 ProcessingTime 是处理当前线程中交换对象所需的时间。

在事件驱动的和轮询用户间进行转换

Apache Camel 提供了两种特殊的消费者实施,可用于回滚事件驱动的消费者和轮询消费者。提供了以下转换类:

  • org.apache.camel.impl.EventDrivenPollingConsumer criu-PROFILEConverts 是一个事件驱动的消费者到轮询消费者实例中。
  • org.apache.camel.impl.DefaultScheduledPollConsumer criu-wagonConverts 一个轮询消费者到事件驱动的消费者实例中。

在实践中,这些类用于简化实施 Endpoint 类型的任务。Endpoint 接口定义了以下两种方法来创建消费者实例:

package org.apache.camel;

public interface Endpoint {
    ...
    Consumer createConsumer(Processor processor) throws Exception;
    PollingConsumer createPollingConsumer() throws Exception;
}
Copy to Clipboard Toggle word wrap

createConsumer () 返回事件驱动的消费者,createPollingConsumer () 返回轮询消费者。您只实施这些方法。例如,如果您遵循消费者的事件驱动模式,您可以实施 createConsumer () 方法为 createPollingConsumer () 提供了一个方法实现,它只是引发异常。但是,在转换类的帮助下,Apache Camel 可以提供更有用的默认实现。

例如,如果要根据事件驱动的模式实施使用者,您可以通过扩展 DefaultEndpoint 并实施 createConsumer () 方法来实现端点。createPollingConsumer () 的实现继承自 DefaultEndpoint,其定义如下:

public PollingConsumer<E> createPollingConsumer() throws Exception {
    return new EventDrivenPollingConsumer<E>(this);
}
Copy to Clipboard Toggle word wrap

EventDrivenPollingConsumer 构造器获取对事件驱动的消费者的引用, 有效嵌套它并将其转换为轮询消费者。要实现转换,EventDrivenPollingConsumer 实例会缓冲传入的事件,并通过 receive ()、接收 (长超时)和 receive NoWait () 方法按需提供它们。

类似地,如果您根据轮询模式实施使用者,您可以通过扩展 DefaultPollingEndpoint 并实施 createPollingConsumer () 方法来实现端点。在这种情况下,createConsumer () 方法的实现继承自 DefaultPollingEndpoint,默认的实现会返回 DefaultScheduledPollConsumer 实例(将轮询消费者转换为事件驱动的消费者)。

ShutdownPrepared 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,这使得您的自定义消费者端点能够接收关闭通知。

例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。

例 41.2. ShutdownPrepared Interface

package org.apache.camel.spi;

public interface ShutdownPrepared {

    void prepareShutdown(boolean forced);

}
Copy to Clipboard Toggle word wrap

ShutdownPrepared 接口定义了以下方法:

prepareShutdown

接收通知以在一个或多个阶段关闭消费者端点,如下所示:

  1. 恰当的关闭 mvapich-busybox where 强制参数具有值 false。尝试正常清理资源。例如,通过正常停止线程。
  2. 强制关闭 mvapich -wagonwhere 强制参数具有值 true。这意味着关闭已超时,因此您必须更积极地清理资源。这是在进程退出前清除资源的最后一个机会。

ShutdownAware 接口

消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,该接口与安全关闭机制交互,使消费者能够向消费者要求额外时间关闭。对于 SEDA 等组件,这通常需要此项,这些交换存储在内部队列中。通常,您希望在关闭 SEDA 使用者之前处理队列中的所有交换。

例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。

例 41.3. ShutdownAware Interface

// Java
package org.apache.camel.spi;

import org.apache.camel.ShutdownRunningTask;

public interface ShutdownAware extends ShutdownPrepared {

    boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);

    int getPendingExchangesSize();
}
Copy to Clipboard Toggle word wrap

ShutdownAware 接口定义了以下方法:

deferShutdown

如果要延迟关闭消费者,从此方法返回 trueshutdownRunningTask 参数是一个 枚举,它可以是以下值之一:

  • ShutdownRunningTask.CompleteCurrentTaskOnly PROFILE-PROFILEfinish 处理当前由消费者线程池处理的交换,但不试图处理任何比该交换更多的交换。
  • ShutdownRunningTask.CompleteAllTasks PROFILE-PROFILEprocesss all pending Exchanges。例如,如果是 SEDA 组件,消费者将处理来自其传入队列的所有交换。
getPendingExchangesSize
指明消费者保留处理多少个交换。零值表示处理已完成,并可关闭消费者。

有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实施”

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat