8.15. scatter-Gather


scatter-Gather

scatter-gather 模式 (如 图 8.11 “scatter-Gather Pattern” 所示)可让您将信息路由到多个动态指定的接收方,并将响应重新整理回单个消息。

图 8.11. scatter-Gather Pattern

广播聚合

动态 scatter-gather 示例

以下示例概述了从多个不同供应商获得最佳引用的应用程序。这个示例使用动态 第 8.3 节 “接收者列表” 来请求来自所有供应商和 第 8.5 节 “聚合器” 的引用,以选择出所有响应的最佳引用。此应用程序的路由定义如下:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

在第一个路由中,第 8.3 节 “接收者列表” 查看 listOfVendors 标头来获取接收者列表。因此,向此应用发送消息的客户端需要向消息中添加 listOfVendors 标头。例 8.1 “消息传递客户端示例” 显示消息传递客户端的一些示例代码,这些代码将相关的标头数据添加到传出消息。

例 8.1. 消息传递客户端示例

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);

该消息将分发到以下端点: bean:vendor1、 bean:vendor2bean:vendor3。这些 Bean 都由以下类实现:

public class MyVendor {
    private int beerPrice;

    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;

    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }

    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}

bean 实例、vendor1、 vendor2vendor3 使用 Spring XML 语法进行实例化,如下所示:

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>1</value>
  </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>2</value>
  </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>3</value>
  </constructor-arg>
</bean>

每个 bean 都使用不同价格进行初始化(传递给构造器参数)。当消息发送到每个 bean 端点时,它会到达 MyVendor.getQuote 方法。这个方法进行简单的检查来查看此引用请求是否为 beer,然后设置交换上的价格,以便稍后进行检索。使用 POJO Producing 将消息转发到下一步(请参阅 @Produce 注释)。

下一步,我们希望从所有供应商处获取 beer quotes,并找出哪个是最佳(即最低)。为此,我们使用带有自定义聚合策略的 第 8.5 节 “聚合器”第 8.5 节 “聚合器” 需要识别与当前引用相关的消息,这通过根据 quoteRequestId 标头的值(传递至 correlationExpression)来修正消息。如 例 8.1 “消息传递客户端示例” 所示,关联 ID 设置为 quoteRequest-1 ( 关联 ID 应该是唯一的)。要选择集合中的最低引用,您可以使用自定义聚合策略,如下所示:

public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

静态 scatter-gather 示例

您可以使用静态 第 8.3 节 “接收者列表” 在 scatter-gather 应用程序中明确指定接收者。以下示例显示了您要用来实现静态 scatter-gather 场景的路由:

from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");

from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.