8.13. 多播
概述
图 8.9 “多播模式” 中显示的 多播 模式是带有固定目的地模式 的接收者列表 的变化,它与 InOut 消息交换模式兼容。这与接收者列表相反,其仅与 InOnly Exchange 模式兼容。
图 8.9. 多播模式
使用自定义聚合策略的多播
多播处理器在响应原始请求时接收多个 Out 消息(每个接收方一个),而原始调用者仅预期接收 单个 回复。因此,消息交换的回复分支存在固有的不匹配,并且为了克服这种不匹配,必须为多播处理器提供自定义 聚合策略。聚合策略类负责将所有 Out 消息聚合到一个回复消息。
例如,电子广告服务的示例,销售者为购买者提供销售项目,以购买者为购买者列表提供项目。买家各自置于项目投标中,销售者会自动选择具有最高价格的奖金。您可以使用 multicast ()
DSL 命令实现将所提供的服务分发给固定购买列表的逻辑,如下所示:
from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()). to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中,销售者由端点 cxf:bean:offer
表示,而买方由端点 cxf:bean:Buyer1
表示,cxf:bean:Buyer2
,cxf:bean:Buyer3
。为了整合不同买方收到的投标,多播处理器使用聚合策略 HighestBidAggregationStrategy
。您可以在 Java 中实施 HighestBidAggregationStrategy
,如下所示:
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Exchange; public class HighestBidAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { float oldBid = oldExchange.getOut().getHeader("Bid", Float.class); float newBid = newExchange.getOut().getHeader("Bid", Float.class); return (newBid > oldBid) ? newExchange : oldExchange; } }
假设买方将标价插入到名为 Bid
的标头中。有关自定义聚合策略的详情,请参考 第 8.5 节 “聚合器”。
并行处理
默认情况下,多播处理器会在另一个后调用每个接收者端点(按 to ()
命令中列出的顺序)。在某些情况下,这可能导致不可接受的延迟。为避免这些较长的延迟时间,您可以通过添加 parallelProcessing ()
子句来启用并行处理。例如,要在电子调整示例中启用并行处理,请按如下所示定义路由:
from("cxf:bean:offer") .multicast(new HighestBidAggregationStrategy()) .parallelProcessing() .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中多播处理器现在调用 buyer 端点,它使用一个线程池,每个端点都有一个线程。
如果要自定义调用购买端点的线程池的大小,您可以调用 executorService ()
方法来指定您自己的自定义 executor 服务。例如:
from("cxf:bean:offer")
.multicast(new HighestBidAggregationStrategy())
.executorService(MyExecutor)
.to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中 MyExecutor 是 java.util.concurrent.ExecutorService 类型的实例。
当交换具有 InOut 模式时,使用聚合策略来聚合回复消息。默认聚合策略采用最新的回复信息,并丢弃早期回复。例如,在以下路由中,自定义策略 MyAggregationStrategy
用于聚合来自端点的回复、direct:a
、direct:b
和 direct:c
:
from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing() .timeout(500) .to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
XML 配置示例
以下示例演示了如何在 XML 中配置类似的路由,其中路由使用自定义聚合策略和自定义线程执行器:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxf:bean:offer"/> <multicast strategyRef="highestBidAggregationStrategy" parallelProcessing="true" threadPoolRef="myThreadExcutor"> <to uri="cxf:bean:Buyer1"/> <to uri="cxf:bean:Buyer2"/> <to uri="cxf:bean:Buyer3"/> </multicast> </route> </camelContext> <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/> <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/> </beans>
其中 parallelProcessing
属性和 threadPoolRef
属性都是可选的。只有在您要自定义多播处理器的线程行为时,才需要设置它们。
将自定义处理应用到传出消息
多播模式 复制源交换,并多播副本。默认情况下,路由器制作源消息的粗略副本。在 Show 副本中,原始消息的标头和有效负载仅通过引用复制,因此原始消息生成的副本被链接。因为应链接多播消息的复制,因此如果消息正文是可变的,您将无法应用自定义处理。您应用到发送到一个端点的副本的自定义处理也会应用到发送到所有其他端点的副本。
虽然 多播
语法允许您在 multicast
子句中调用 process
DSL 命令,但这并不 具有与 onPrepare
相同的效果(实际上,进程
DSL 命令无效)。
在准备信息时,使用 onPrepare 执行自定义逻辑
如果要在将自定义处理发送到其端点之前对每个消息副本应用自定义处理,您可以在 multicast
子句中调用 onPrepare
DSL 命令。onPrepare
命令仅在消息被压缩 后 插入自定义处理器,并在将消息分配给其端点 之前。例如,在以下路由中,在发送到 direct:a
的消息上调用
处理器,而在发送到 CustomProc
direct:b
的消息上也会调用 CustomProc 处理器。
from("direct:start") .multicast().onPrepare(new CustomProc()) .to("direct:a").to("direct:b");
onPrepare
DSL 命令的常见用例是对消息的一些或所有元素执行深度副本。例如,以下 CustomProc
处理器类执行消息正文的深度副本,其中消息正文假定为类型为 BodyType
,而深度副本则由方法 BodyType.deepCopy ()
执行。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
您可以使用 onPrepare
实现在 交换
多播前要执行的任何类型的自定义逻辑。
建议为不可变对象设计。
例如,如果您有一个 mutable 消息正文,作为这个 Animal 类:
public class Animal implements Serializable { private int id; private String name; public Animal() { } public Animal(int id, String name) { this.id = id; this.name = name; } public Animal deepClone() { Animal clone = new Animal(); clone.setId(getId()); clone.setName(getName()); return clone; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id + " " + name; } }
然后,我们可以创建一个深度克隆处理器来克隆消息正文:
public class AnimalDeepClonePrepare implements Processor { public void process(Exchange exchange) throws Exception { Animal body = exchange.getIn().getBody(Animal.class); // do a deep clone of the body which wont affect when doing multicasting Animal clone = body.deepClone(); exchange.getIn().setBody(clone); } }
然后,我们可以使用 onPrepare
选项在 multicast 路由中使用 AnimalDeepClonePrepare 类,如下所示:
from("direct:start") .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");
和 XML DSL 中的相同示例
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- use on prepare with multicast --> <multicast onPrepareRef="animalDeepClonePrepare"> <to uri="direct:a"/> <to uri="direct:b"/> </multicast> </route> <route> <from uri="direct:a"/> <process ref="processorA"/> <to uri="mock:a"/> </route> <route> <from uri="direct:b"/> <process ref="processorB"/> <to uri="mock:b"/> </route> </camelContext> <!-- the on prepare Processor which performs the deep cloning --> <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/> <!-- processors used for the last two routes, as part of unit test --> <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/> <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
选项
multicast
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
| 用于将多播中的回复汇编为来自多播的 AggregationStrategy,再到来自 多播 的单一传出消息。默认情况下,Camel 将使用最后一个回复作为传出消息。 | |
|
当将 POJO 用作 | |
|
|
当将 POJO 用作 |
|
| 如果启用,将消息同时发送到多播。请注意,调用者线程仍然会等待所有消息被完全处理,然后再继续。它只会发送和处理同时发生的多播的回复。 |
|
|
如果启用,则 |
| 指的是用于并行处理的自定义线程池。请注意,如果您设置了这个选项,则并行处理是自动的,您不必启用该选项。 | |
|
| Camel 2.2 : 在出现异常情况时,是否立即停止继续处理。如果禁用,则 Camel 会将消息发送到所有多播,无论它们是否失败。您可以在 AggregationStrategy 类中处理异常,您可以完全控制如何处理它。 |
|
| 如果启用,Camel 将按其返回的顺序处理回复,例如:如果禁用,Camel 将以与多播相同的顺序处理回复。 |
|
Camel 2.5: 设置以秒为单位指定的总超时。如果 多播 无法发送和处理给定时间段内的所有回复,则超时触发器 和多播 中断并继续。请注意,如果您提供了一个 TimeoutAwareAggregationStrategy,则在中断前调用 | |
| Camel 2.8: 请参阅自定义处理器准备每个多播将收到的 Exchange 副本。这可让您执行任何自定义逻辑,如在需要时深度获取消息有效负载。 | |
|
| Camel 2.8: 是否应共享工作单元。详情请查看 第 8.4 节 “Splitter” 中的同一选项。 |