8.15. scatter-Gather
scatter-Gather
如 图 8.11 “scatter-Gather Pattern” 所示,scatter-gather 模式 可让您将消息路由到多个动态指定的接收者,并将响应重新整合回单个消息。
图 8.11. scatter-Gather Pattern
动态 scatter-gather 示例
下例概述了从多个不同供应商获取最佳引用的应用程序。示例使用动态 第 8.3 节 “接收者列表” 来请求来自所有供应商和 第 8.5 节 “聚合器” 的报价,以选择所有响应中的最佳引用。此应用程序的路由定义如下:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <recipientList> <header>listOfVendors</header> </recipientList> </route> <route> <from uri="seda:quoteAggregator"/> <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000"> <correlationExpression> <header>quoteRequestId</header> </correlationExpression> <to uri="mock:result"/> </aggregate> </route> </camelContext>
在第一个路由中,第 8.3 节 “接收者列表” 会查看 listOfVendors
标头来获取接收者列表。因此,向此应用发送消息的客户端需要向消息添加一个 listOfVendors
标头。例 8.1 “消息传递客户端示例” 显示消息传递客户端中的一些示例代码,它将相关标头数据添加到传出消息中。
例 8.1. 消息传递客户端示例
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3"); headers.put("quoteRequestId", "quoteRequest-1"); template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);
消息将分发到以下端点: bean:vendor1
,bean:vendor2
, 和 bean:vendor3
。这些 Bean 都由以下类实现:
public class MyVendor { private int beerPrice; @Produce(uri = "seda:quoteAggregator") private ProducerTemplate quoteAggregator; public MyVendor(int beerPrice) { this.beerPrice = beerPrice; } public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception { if ("beer".equals(item)) { exchange.getIn().setBody(beerPrice); quoteAggregator.send(exchange); } else { throw new Exception("No quote available for " + item); } } }
bean 实例、vendor1、
vendor2
和 vendor3
使用 Spring XML 语法进行实例化,如下所示:
<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/> <bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>1</value> </constructor-arg> </bean> <bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>2</value> </constructor-arg> </bean> <bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor"> <constructor-arg> <value>3</value> </constructor-arg> </bean>
每个 bean 都使用不同的 beer (传递至构造器参数)的不同价格进行初始化。当消息发送到每个 bean 端点时,它会到达 MyVendor.getQuote
方法。此方法通过简单的检查来查看此报价请求是否是 beer,然后稍后的步骤对交换进行检索。使用 POJO Producing 将消息转发到下一步(请参阅 @Produce 注释)。
下一步,我们想要从所有供应商中取胜者标语,并找出哪个最能(即最低)。为此,我们使用带有自定义聚合策略的 第 8.5 节 “聚合器”。第 8.5 节 “聚合器” 需要识别与当前引用相关的信息,这些信息会根据 quoteRequestId 标头的值进行,该消息根据 quoteRequestId
标头的值进行(传递至 correlationExpression
)。如 例 8.1 “消息传递客户端示例” 中所示,关联 ID 设置为 quoteRequest-1 (
关联 ID 应是唯一的)。要从集合中选择最低报价,您可以使用如下自定义聚合策略:
public class LowestQuoteAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // the first time we only have the new exchange if (oldExchange == null) { return newExchange; } if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) { return oldExchange; } else { return newExchange; } } }
静态 scatter-gather 示例
您可以使用静态 第 8.3 节 “接收者列表” 在 scatter-gather 应用程序中明确指定接收者。以下示例显示了用于实现静态 scatter-gather 场景的路由:
from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3"); from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator"); from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator"); from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator"); from("seda:quoteAggregator") .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")