8.13. 多播
概述
图 8.9 “多播模式” 中显示的 多播 模式是 接收者列表 与固定目的地模式的一种变体,它与 InOut 消息交换模式兼容。这与接收者列表不同,它只与 InOnly Exchange 模式兼容。
图 8.9. 多播模式
使用自定义聚合策略进行多播
虽然多播处理器会收到多个 Out 消息,以响应原始请求(每个收件人之一),原始调用者只预期收到一个回复。因此,消息交换的回复分支存在固有不匹配,并且为了克服这种不匹配,您必须为多播处理器提供自定义 聚合策略。聚合策略类负责将所有 Out 消息聚合到一个回复消息中。
考虑电子拍卖行服务的示例,销售者为购买者列表提供销售项目。买家都参与此项项目的投标,销售者会自动选择价格最高的标识。您可以使用 multicast ()
DSL 命令实施将优惠分发给固定购买器列表的逻辑,如下所示:
from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()). to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中,销售者由端点、cxf:bean:offer
表示,而买方由端点 cxf:bean:Buyer1
,cxf:bean:Buyer2
,cxf:bean:Buyer3
表示。要合并从不同购买者收到的 bid,多播处理器使用聚合策略 HighestBidAggregationStrategy
。您可以在 Java 中实现 HighestBidAggregationStrategy
,如下所示:
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Exchange; public class HighestBidAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { float oldBid = oldExchange.getOut().getHeader("Bid", Float.class); float newBid = newExchange.getOut().getHeader("Bid", Float.class); return (newBid > oldBid) ? newExchange : oldExchange; } }
假定买方将 bid price 插入名为 Bid
的标头中。有关自定义聚合策略的详情,请参考 第 8.5 节 “聚合器”。
并行处理
默认情况下,多播处理器在另一个阶段调用每个接收者端点(按 to ()
命令中列出的顺序)。在某些情况下,这会导致无法接受长延迟。为了避免这些长时间的延迟,您可以通过添加 parallelProcessing ()
子句来启用并行处理。例如,要在电子途径示例中启用并行处理,请按如下所示定义路由:
from("cxf:bean:offer") .multicast(new HighestBidAggregationStrategy()) .parallelProcessing() .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
现在,多播处理器调用购买器端点,使用每个端点有一个线程的线程池。
如果要自定义调用买方端点的线程池的大小,您可以调用 executorService ()
方法来指定您自己的自定义 executor 服务。例如:
from("cxf:bean:offer")
.multicast(new HighestBidAggregationStrategy())
.executorService(MyExecutor)
.to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中 MyExecutor 是 java.util.concurrent.ExecutorService 类型的实例。
当交换具有 InOut 模式时,使用聚合策略来聚合回复消息。默认聚合策略采用最新的回复消息,并丢弃之前的回复。例如,在以下路由中,自定义策略 MyAggregationStrategy
用于聚合来自端点、direct:a
、direct:b
和 direct:c
的回复:
from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing() .timeout(500) .to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
XML 配置示例
以下示例演示了如何在 XML 中配置类似的路由,该路由使用自定义聚合策略和自定义线程执行器:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxf:bean:offer"/> <multicast strategyRef="highestBidAggregationStrategy" parallelProcessing="true" threadPoolRef="myThreadExcutor"> <to uri="cxf:bean:Buyer1"/> <to uri="cxf:bean:Buyer2"/> <to uri="cxf:bean:Buyer3"/> </multicast> </route> </camelContext> <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/> <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/> </beans>
如果 parallelProcessing
属性和 threadPoolRef
属性都是可选的。只有在您要自定义多播处理器的线程行为时才需要设置它们。
将自定义处理应用到传出消息
多播模式 复制源 Exchange 和多播副本。默认情况下,路由器会复制源消息。在匹配项中,原始消息的标头和有效负载仅通过引用复制,以便原始消息的结果副本链接。由于应链接多播消息的副本,因此如果消息正文不可行,则无法应用自定义处理。应用到发送到一个端点的副本的自定义处理也应用于发送到其他端点的副本。
虽然 多播
语法允许您在 多播
子句中调用 process
DSL 命令,但这不意义意义,但与 onPrepare
(事实上来说,进程
DSL 命令无效)。
使用 onPrepare 在准备消息时执行自定义逻辑
如果要将自定义处理应用到每个消息副本,然后再发送到端点,您可以在 多播
句中调用 onPrepare
DSL 命令。onPrepare
命令仅在消息 被禁止后 插入自定义处理器,且仅在消息被分配到其端点 之前。例如,在以下路由中,在发送到 direct:a
的消息上调用
处理器也会在发送到 CustomProc
处理器,自定义Procdirect:b
的消息上调用。
from("direct:start") .multicast().onPrepare(new CustomProc()) .to("direct:a").to("direct:b");
onPrepare
DSL 命令的一个常见用例是执行对消息的某些或所有元素的深度副本。例如,以下 CustomProc
处理器类对消息正文执行深度副本,其中消息正文假定为 type、BrodyType,并且深度副本由方法
执行。
BodyType
.deepCopy ()
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
您可以使用 onPrepare
实施在 Exchange
多播之前您要执行的任何类型的自定义逻辑。
建议做法是为不可变对象设计。
例如,如果您有一个可变的消息正文,作为这个 Animal 类:
public class Animal implements Serializable { private int id; private String name; public Animal() { } public Animal(int id, String name) { this.id = id; this.name = name; } public Animal deepClone() { Animal clone = new Animal(); clone.setId(getId()); clone.setName(getName()); return clone; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id + " " + name; } }
然后,我们可以创建一个深度克隆消息正文的克隆处理器:
public class AnimalDeepClonePrepare implements Processor { public void process(Exchange exchange) throws Exception { Animal body = exchange.getIn().getBody(Animal.class); // do a deep clone of the body which wont affect when doing multicasting Animal clone = body.deepClone(); exchange.getIn().setBody(clone); } }
然后,我们可以使用 Prepare 选项在 多播路由 中使用 AnimalDeepClone Prepare
类,如下所示:
from("direct:start") .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");
和 XML DSL 中的相同示例
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- use on prepare with multicast --> <multicast onPrepareRef="animalDeepClonePrepare"> <to uri="direct:a"/> <to uri="direct:b"/> </multicast> </route> <route> <from uri="direct:a"/> <process ref="processorA"/> <to uri="mock:a"/> </route> <route> <from uri="direct:b"/> <process ref="processorB"/> <to uri="mock:b"/> </route> </camelContext> <!-- the on prepare Processor which performs the deep cloning --> <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/> <!-- processors used for the last two routes, as part of unit test --> <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/> <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
选项
multicast
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
| 引用 AggregationStrategy 用来将来自多播的回复编译到多播的单一传出消息。???默认情况下,Camel 将使用最后一个回复作为传出消息。 | |
|
当将 POJO 用作 | |
|
|
在使用 POJO 作为 |
|
| 如果启用,则会同时向多播发送消息。注意 caller 线程仍然会等待所有消息被完全处理,然后再继续。它只发送和处理来自多播的回复,同时进行。 |
|
|
如果启用,则 |
| 指的是用于并行处理的自定义线程池。请注意,如果您设置了这个选项,则并行处理会被自动简化,您不需要启用该选项。 | |
|
| Camel 2.2: 当出现异常时,是否停止立即处理处理。如果禁用,Camel 会将消息发送到所有多播,无论它们是否失败。您可以在 AggregationStrategy 类中处理异常,您可以在其中完全控制如何处理这种情况。 |
|
| 如果启用,Camel 将按照顺序发出的顺序处理回复的回复,如它们返回的顺序。如果禁用,Camel 会按照与多播相同的顺序处理回复。 |
|
Camel 2.5: 设置以毫秒为单位指定的总超时。如果 多播 无法发送和接收给定时间段内的所有回复,则超时触发器和 多播 会中断并继续。请注意,如果您提供 TimeoutAwareAggregationStrategy,则在中断前会调用 | |
| Camel 2.8: 请参阅自定义处理器,以准备每个多播的交换副本。这可让您执行任何自定义逻辑,如深度克隆消息有效负载(如果需要)。 | |
|
| Camel 2.8: 是否应该共享工作单元。如需了解更多详细信息,请参阅 第 8.4 节 “Splitter” 相同的选项。 |