此内容没有您所选择的语言版本。

8.13. Scatter-Gather


Scatter-Gather

The scatter-gather pattern, as shown in Figure 8.12, “Scatter-Gather Pattern”, enables you to route messages to a number of dynamically specified recipients and re-aggregate the responses back into a single message.

Figure 8.12. Scatter-Gather Pattern

Scatter-Gather Pattern

Dynamic scatter-gather example

The following example outlines an application that gets the best quote for beer from several different vendors. The examples uses a dynamic Recipient List to request a quote from all vendors and an Aggregator to pick the best quote out of all the responses. The routes for this application are defined as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <recipientList>
      <header>listOfVendors</header>
    </recipientList>
  </route>
  <route>
    <from uri="seda:quoteAggregator"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000">
      <correlationExpression>
        <header>quoteRequestId</header>
      </correlationExpression>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>
In the first route, the Recipient List looks at the listOfVendors header to obtain the list of recipients. Hence, the client that sends messages to this application needs to add a listOfVendors header to the message. Example 8.1, “Messaging Client Sample” shows some sample code from a messaging client that adds the relevant header data to outgoing messages.

Example 8.1. Messaging Client Sample

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3");
headers.put("quoteRequestId", "quoteRequest-1");
template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers);
The message would be distributed to the following endpoints: bean:vendor1, bean:vendor2, and bean:vendor3. These beans are all implemented by the following class:
public class MyVendor {
    private int beerPrice;
    
    @Produce(uri = "seda:quoteAggregator")
    private ProducerTemplate quoteAggregator;
            
    public MyVendor(int beerPrice) {
        this.beerPrice = beerPrice;
    }
        
    public void getQuote(@XPath("/quote_request/@item") String item, Exchange exchange) throws Exception {
        if ("beer".equals(item)) {
            exchange.getIn().setBody(beerPrice);
            quoteAggregator.send(exchange);
        } else {
            throw new Exception("No quote available for " + item);
        }
    }
}
The bean instances, vendor1, vendor2, and vendor3, are instantiated using Spring XML syntax, as follows:
<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/>

<bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>1</value>
  </constructor-arg>
</bean>

<bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>2</value>
  </constructor-arg>
</bean>

<bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor">
  <constructor-arg>
    <value>3</value>
  </constructor-arg>
</bean>
Each bean is initialized with a different price for beer (passed to the constructor argument). When a message is sent to each bean endpoint, it arrives at the MyVendor.getQuote method. This method does a simple check to see whether this quote request is for beer and then sets the price of beer on the exchange for retrieval at a later step. The message is forwarded to the next step using POJO Producing (see the @Produce annotation).
At the next step, we want to take the beer quotes from all vendors and find out which one was the best (that is, the lowest). For this, we use an Aggregator with a custom aggregation strategy. The Aggregator needs to identify which messages are relevant to the current quote, which is done by correlating messages based on the value of the quoteRequestId header (passed to the correlationExpression). As shown in Example 8.1, “Messaging Client Sample”, the correlation ID is set to quoteRequest-1 (the correlation ID should be unique). To pick the lowest quote out of the set, you can use a custom aggregation strategy like the following:
public class LowestQuoteAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
            return oldExchange;
        } else {
            return newExchange;
        }
    }
}

Static scatter-gather example

You can specify the recipients explicitly in the scatter-gather application by employing a static Recipient List. The following example shows the routes you would use to implement a static scatter-gather scenario:
from("direct:start").multicast().to("seda:vendor1", "seda:vendor2", "seda:vendor3");

from("seda:vendor1").to("bean:vendor1").to("seda:quoteAggregator");
from("seda:vendor2").to("bean:vendor2").to("seda:quoteAggregator");
from("seda:vendor3").to("bean:vendor3").to("seda:quoteAggregator");

from("seda:quoteAggregator")
    .aggregate(header("quoteRequestId"), new LowestQuoteAggregationStrategy()).to("mock:result")
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.