2.2. 多个输入
概述
标准路由仅使用 Java DSL 中的 from (EndpointURL)
语法从单一端点获取其输入。但是,如果您需要为路由定义多个输入,该怎么办?Apache Camel 提供了多个替代方案来指定多个路由输入。采取的方法取决于您要相互独立处理交换,还是希望以某种方式合并来自不同输入的交换(在这种情况下,您应该使用 “内容增强模式”一节)。
多个独立输入
指定多个输入的最简单方法是使用 from ()
DSL 命令的多参数形式,例如:
from("URI1", "URI2", "URI3").to("DestinationUri");
或者您可以使用以下等同的语法:
from("URI1").from("URI2").from("URI3").to("DestinationUri");
在这两个示例中,来自每个输入端点 URI 1、URI2 和 URI3 的交换都会相互独立处理,并在单独的线程中独立处理。实际上,您可以将前面的路由视为与以下三个独立路由相等:
from("URI1").to("DestinationUri"); from("URI2").to("DestinationUri"); from("URI3").to("DestinationUri");
分段路由
例如,您可能希望合并来自两个不同的消息传递系统传入的消息,并使用同一路由处理它们。在大多数情况下,您可以通过将路由划分为不同的片段来处理多个输入,如 图 2.5 “使用分段路由处理多个输入” 所示。
图 2.5. 使用分段路由处理多个输入
路由的初始片段取来自一些外部队列的 input,例如: activemq:Nyse
和 activemq:Nasdaq
categories-busybox,将传入的交换发送到内部端点 InternalUrl。第二个路由片段合并了传入的交换,从内部端点获取它们,并将它们发送到目标队列 activemq:USTxn
。InternalUrl 是仅用于在路由器应用程序中使用的 端点的 URL。以下类型的端点适合内部使用:
这些端点的主要目的是,您可以将路由的不同片段连接在一起。它们都提供了一种将多个输入合并到单个路由的有效方法。
直接端点
直接组件提供将链接在一起的最简单机制。直接组件的事件模型是 同步的,因此后续路由片段在与第一个网段相同的线程中运行。直接 URL 的一般格式为 :EndpointID
,端点 ID EndpointID 只是标识端点实例的唯一字母数字字符串。
例如,如果要从两个消息队列获取输入,activemq:Nyse
和 activemq:Nasdaq
,并将它们合并到单个消息队列 activemq:USTxn
中,您可以通过定义以下路由集来完成此操作:
from("activemq:Nyse").to("direct:mergeTxns"); from("activemq:Nasdaq").to("direct:mergeTxns"); from("direct:mergeTxns").to("activemq:USTxn");
其中,前两个路由取消息队列( Nyse
和 Nasdaq
)的输入,并将它们发送到端点 direct:mergeTxns
。最后一个队列组合了前两个队列的输入,并将组合的消息队列发送到 activemq:USTxn
队列。
直接端点的实现的行为如下:每当交换到达生成者端点时(例如 :direct:mergeTxns")
),直接端点将交换直接传递给具有相同端点 ID 的所有消费者端点(例如,来自"direct:mergeTxns")
)。直接端点只能用于在同一 Java 虚拟机(JVM)实例中属于同一 CamelContext
的路由之间进行通信。
SEDA 端点
SEDA 组件提供了一种替代机制,用于将路由链接到一起。您可以用与直接组件类似的方法使用它,但它具有不同的底层事件和线程模型,如下所示:
- 处理 SEDA 端点 并 不同步。也就是说,当您将交换发送到 SEDA producer 端点时,控制会立即返回到路由中的前面的处理器。
-
SEDA 端点包含一个队列缓冲区(
java.util.concurrent.BlockingQueue
类型),它在下一个路由段处理前存储所有传入交换。 - 每个 SEDA 使用者端点创建一个线程池(默认大小为 5),以处理来自阻塞队列的交换对象。
- SEDA 组件支持 竞争消费者 模式,确保每个传入交换仅处理一次,即使有多个消费者附加到特定端点。
使用 SEDA 端点的一个主要优点是路由可以更快响应,指向内置的消费者线程池。库存事务示例可以重新写入使用 SEDA 端点,而不是直接端点,如下所示:
from("activemq:Nyse").to("seda:mergeTxns"); from("activemq:Nasdaq").to("seda:mergeTxns"); from("seda:mergeTxns").to("activemq:USTxn");
本示例和直接示例之间的主要区别在于,在使用 SEDA 时,第二个路由网段(从 seda:mergeTxns
到 activemq:USTxn
)由五个线程池处理。
SEDA 比单纯地将路由段粘贴更多。暂存事件驱动的架构(SEDA)包含用于构建更易管理的多线程应用程序的设计原则。Apache Camel 中 SEDA 组件的目的只是使您能够将此设计理念应用到您的应用。有关 SEDA 的详情,请参考 http://www.eecs.harvard.edu/~mdw/proj/seda/。
VM 端点
虚拟机组件与 SEDA 端点非常相似。唯一的区别是,而 SEDA 组件仅限于将路由段与同一 CamelContext
内的连接在一起,虚拟机组件使您能够将来自不同 Apache Camel 应用程序的路由链接在一起,只要它们在同一 Java 虚拟机中运行。
库存事务示例可以重新写入使用虚拟机端点而不是 SEDA 端点,如下所示:
from("activemq:Nyse").to("vm:mergeTxns"); from("activemq:Nasdaq").to("vm:mergeTxns");
在单独的路由器应用程序中(在同一 Java 虚拟机中运行),您可以定义路由的第二个片段,如下所示:
from("vm:mergeTxns").to("activemq:USTxn");
内容增强模式
内容增强模式定义了处理路由的多个输入的基本方式。当交换进入增强器处理器时,增强器会联系外部资源来检索信息,然后添加到原始消息中。在此模式中,外部资源实际上代表消息的第二个输入。
例如,假设您正在编写处理信用请求的应用。在处理信用请求前,您需要为客户分配信用评级的数据,其中 ratings 数据存储在 目录中的一个文件中( src/data/ratings
)。您可以使用 pollEnrich ()
模式和 GroupedExchangeAggregationStrategy
聚合策略将传入的信用请求与 ratings 文件中的数据合并,如下所示:
from("jms:queue:creditRequests") .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy()) .bean(new MergeCreditRequestAndRatings(), "merge") .to("jms:queue:reformattedRequests");
其中 GroupedExchangeAggregationStrategy
类是来自 org.apache.camel.processor.aggregate
软件包的标准聚合策略,将每个新交换添加到 java.util.List
实例,并将生成的列表存储在 Exchange.GROUPED_EXCHANGE
Exchange 属性中。在这种情况下,列表包含两个元素:原始交换(来自 creditRequests
JMS 队列);以及增强交换(来自文件端点)。
要访问分组的交换,您可以使用类似如下的代码:
public class MergeCreditRequestAndRatings { public void merge(Exchange ex) { // Obtain the grouped exchange List<Exchange> list = ex.getProperty(Exchange.GROUPED_EXCHANGE, List.class); // Get the exchanges from the grouped exchange Exchange originalEx = list.get(0); Exchange ratingsEx = list.get(1); // Merge the exchanges ... } }
此应用的另一种方法是将合并代码直接放入自定义聚合策略类的实施中。
有关内容增强模式的详情,请参考 第 10.1 节 “内容增强”。