2.2. 多个输入
概述
标准路由使用来自单个端点的输入,使用 Java DSL 中的 from (EndpointURL)
语法。但是,如果您需要为路由定义多个输入,该怎么办?Apache Camel 提供了几个替代方案来为路由指定多个输入。进行的方法取决于您希望交换相互独立处理,还是希望以某种方式合并不同输入的交换(在这种情况下,您应该使用 “内容增强模式”一节)。
多个独立输入
指定多个输入的最简单方法是使用 from ()
DSL 命令的多参数形式,例如:
from("URI1", "URI2", "URI3").to("DestinationUri");
或者,您可以使用以下等效语法:
from("URI1").from("URI2").from("URI3").to("DestinationUri");
在这两个示例中,从每个输入端点、URI1、URI 2 和 URI3 交换都是相互独立处理的,在单独的线程中。实际上,您可以将上述路由视为等同于以下三个独立的路由:
from("URI1").to("DestinationUri"); from("URI2").to("DestinationUri"); from("URI3").to("DestinationUri");
分段路由
例如,您可能希望合并来自两个不同的消息传递系统传入的消息,并使用同一路由处理它们。在大多数情况下,您可以通过将路由划分为网段来处理多个输入,如 图 2.5 “使用分段路由处理多个输入” 所示。
图 2.5. 使用分段路由处理多个输入
路由的初始片段取来自一些外部队列的 inputs (例如: activemq:Nyse
和 activemq:Nasdaq
HEKETI-wagon),并将传入的交换发送到内部端点 InternalUrl。第二个路由片段合并了传入的交换,将其从内部端点中获取,并将它们发送到目标队列 activemq:USTxn
。InternalUrl 是端点的 URL,仅用于 在路由器 应用程序中使用。以下类型的端点适合内部使用:
这些端点的主要目的是使您能够将不同的部分绑定到路由的不同部分。它们都提供了一种将多个输入合并为一个路由的有效方法。
直接端点
直接组件提供将连接在一起的路由的最简单机制。直接组件的事件模型是 同步的,因此后续的路由片段在与第一个网段相同的线程中运行。直接 URL 的一般格式是 direct:EndpointID
,其中端点 ID EndpointID 只是用于标识端点实例的唯一字母数字字符字符串。
例如,如果要从两个消息队列( activemq:Nyse
和 activemq:Nasdaq
)获取输入,并将它们合并到单个消息队列 activemq:USTxn
中,您可以通过定义以下一组路由来完成此操作:
from("activemq:Nyse").to("direct:mergeTxns"); from("activemq:Nasdaq").to("direct:mergeTxns"); from("direct:mergeTxns").to("activemq:USTxn");
其中前两个路由从消息队列( Nyse
和 Nasdaq
)获取输入,并将它们发送到端点,即 direct:mergeTxns
。最后队列组合了来自前两个队列的输入,并将组合消息流发送到 activemq:USTxn
队列。
直接端点的实现的行为如下:每当交换到达生成者端点时(例如,to ("direct:mergeTxns")
),直接端点将交换直接传递给具有相同端点 ID (例如,from ("direct:mergeTxns")
)的所有消费者端点。直接端点只能用于在属于同一 Java 虚拟机(JVM)实例中同一 CamelContext
的路由之间进行通信。
SEDA 端点
SEDA 组件提供了链接在一起路由的备选机制。您可以像直接组件一样使用它,但它有不同的底层事件和线程模型,如下所示:
- 对 SEDA 端点的处理 不是 同步的。也就是说,当您将交换发送到 SEDA 生成端点时,控制会立即返回到路由中的前一个处理器。
-
SEDA 端点包含一个队列缓冲区(
java.util.concurrent.BlockingQueue
类型),它在由下一个路由段处理之前存储所有传入交换。 - 每个 SEDA 使用者端点都会创建一个线程池(默认大小为 5),以处理来自阻塞队列的交换对象。
- SEDA 组件支持 竞争消费者 模式,这样可保证仅处理一次传入的交换,即使将多个消费者附加到特定端点。
使用 SEDA 端点的一个主要优点是路由可以更快响应,并影响内置消费者线程池。库存事务示例可以重新编写为使用 SEDA 端点,而不是直接端点,如下所示:
from("activemq:Nyse").to("seda:mergeTxns"); from("activemq:Nasdaq").to("seda:mergeTxns"); from("seda:mergeTxns").to("activemq:USTxn");
这个示例和直接示例的主要区别在于,使用 SEDA 时,第二个路由片段(从 seda:mergeTxns
到 activemq:USTxn
)由五个线程池处理。
SEDA 比简单地将路由段粘贴到一起。分阶段的事件驱动架构(SEDA)包括一个设计原则,用于构建更易于管理的多线程应用程序。Apache Camel 中的 SEDA 组件的目的是使您能够将此设计理念应用到您的应用程序。有关 SEDA 的详情,请参考 http://www.eecs.harvard.edu/~mdw/proj/seda/。
VM 端点
虚拟机组件与 SEDA 端点非常相似。唯一的区别是,当 SEDA 组件从同一 CamelContext
内链接在一起的路由片段时,虚拟机组件可让您将不同 Apache Camel 应用程序的路由连接在一起,只要它们在同一个 Java 虚拟机中运行。
库存事务示例可以重新编写为使用虚拟机端点而不是 SEDA 端点,如下所示:
from("activemq:Nyse").to("vm:mergeTxns"); from("activemq:Nasdaq").to("vm:mergeTxns");
在单独的路由器应用程序中(在同一 Java 虚拟机中运行),您可以定义路由的第二个片段,如下所示:
from("vm:mergeTxns").to("activemq:USTxn");
内容增强模式
内容增强模式定义了处理对路由的多个输入的根本不同方法。当交换进入增强处理器时,增强器会联系外部资源以检索信息,然后添加到原始消息中。在这种模式中,外部资源有效地代表对消息的第二个输入。
例如,假设您正在编写处理信用请求的应用。在处理信用请求之前,您需要使用为客户分配信用评级的数据对其进行补充,其中评级数据存储在目录 src/data/ratings
的文件中。您可以使用 pollEnrich ()
模式和 GroupedExchangeAggregationStrategy
聚合策略将传入的信用卡请求与 ratings 文件中的数据合并,如下所示:
from("jms:queue:creditRequests") .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy()) .bean(new MergeCreditRequestAndRatings(), "merge") .to("jms:queue:reformattedRequests");
其中 GroupedExchangeAggregationStrategy
类是来自 org.apache.camel.processor.aggregate
软件包的标准聚合策略,将每个新交换添加到 java.util.List
实例,并将生成的列表存储在 Exchange.GROUPED_EXCHANGE
Exchange 属性中。在本例中,列表中包含两个元素:原始交换(来自 creditRequests
JMS 队列);以及增强器交换(从文件端点)。
要访问分组的交换,您可以使用类似如下的代码:
public class MergeCreditRequestAndRatings { public void merge(Exchange ex) { // Obtain the grouped exchange List<Exchange> list = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class); // Get the exchanges from the grouped exchange Exchange originalEx = list.get(0); Exchange ratingsEx = list.get(1); // Merge the exchanges ... } }
此应用的替代方法是将合并代码直接放入自定义聚合策略类的实现中。
有关内容增强模式的详情,请参考 第 10.1 节 “内容增强”。