8.13. 多播
概述
多播 模式在 图 8.9 “多播模式” 中显示,它是带有固定目的地模式 的接收者列表 的一种变体,它与 InOut 消息交换模式兼容。这与接收者列表不同,它只与 InOnly Exchange 模式兼容。
图 8.9. 多播模式
使用自定义聚合策略的多播
多播处理器接收多个 Out 消息,以响应原始请求(来自每个接收方的一个请求),原始的调用者仅希望收到一个回复。因此,消息交换的回复图存在固有不匹配,并且要克服这种不匹配,您必须为多播处理器提供自定义 聚合策略。聚合策略类负责将所有 Out 消息聚合到一个回复消息中。
考虑电子 auction 服务示例,其中销售者提供销售商的一个项目,供购买者列表销售。买家对参与项目而言,销售者自动选择具有最高价格的投标。您可以使用 multicast()
DSL 命令实施将提供的项分发到固定购买者列表,如下所示:
from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()). to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
这里的 seller 由端点 cxf:bean:offer
代表,而 buyers 由端点、cxf:bean:Buyer1
、cxf:bean:Buyer2
、cxf:bean:bean:Buyer3
表示。为了整合从各种买方接收的 bid,多播处理器使用聚合策略 HighestBidAggregationStrategy
。您可以在 Java 中实施 HighestBidAggregationStrategy
,如下所示:
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Exchange; public class HighestBidAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { float oldBid = oldExchange.getOut().getHeader("Bid", Float.class); float newBid = newExchange.getOut().getHeader("Bid", Float.class); return (newBid > oldBid) ? newExchange : oldExchange; } }
假设购买者将 bid 价格插入到名为 Bid
的标头中。有关自定义聚合策略的详情,请参阅 第 8.5 节 “聚合器”。
并行处理
默认情况下,多播处理器在另一个接收方(根据 to()
命令中列出的顺序)调用每个接收者端点。在某些情况下,这可能导致不接受长延迟。为了避免这些长延迟时间,您可以选择通过添加 parallelProcessing()
子句来启用并行处理。例如,要在电子 auction 示例中启用并行处理,请按如下所示定义路由:
from("cxf:bean:offer") .multicast(new HighestBidAggregationStrategy()) .parallelProcessing() .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
在多播处理器现在调用购买器端点时,使用每个端点有一个线程的线程池。
如果要自定义调用购买器端点的线程池的大小,可以调用 executorService()
方法来指定您自己的自定义执行者服务。例如:
from("cxf:bean:offer")
.multicast(new HighestBidAggregationStrategy())
.executorService(MyExecutor)
.to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中 MyExecutor 是 java.util.concurrent.ExecutorService 类型的实例。
当交换具有 InOut 模式时,会使用聚合策略来聚合回复消息。默认聚合策略采用最新的回复消息,并丢弃早期的回复。例如,以下路由(自定义策略 MyAggregationStrategy
)用于聚合来自端点的回复、direct:a
、direct:b
和 direct:c
:
from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing() .timeout(500) .to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
XML 配置示例
以下示例演示了如何在 XML 中配置类似路由,路由会使用自定义聚合策略和自定义线程执行器:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxf:bean:offer"/> <multicast strategyRef="highestBidAggregationStrategy" parallelProcessing="true" threadPoolRef="myThreadExcutor"> <to uri="cxf:bean:Buyer1"/> <to uri="cxf:bean:Buyer2"/> <to uri="cxf:bean:Buyer3"/> </multicast> </route> </camelContext> <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/> <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/> </beans>
当 parallelProcessing
属性和 threadPoolRef
属性都是可选的。只有想自定义多播处理器的线程行为时,才需要设置它们。
将自定义处理应用到传出消息
多播模式 复制源 Exchange 和多播。默认情况下,路由器制作源消息的绝对副本。在应该复制中,原始消息的标头和有效负载只通过引用复制,以便生成原始消息的副本。由于多播消息的绝对副本已链接,因此如果消息正文器是可变的,则无法应用自定义处理。适用于发送到一个端点的副本的自定义处理也应用于发送到每个端点的副本。
虽然 多播
语法允许您在 multicast
子句中调用
DSL 命令,但这并不有意义 process
上的意义,而且它 没有与 Prepare
(实际上,在本例中为 )相同的效果。
在准备消息时使用 onPrepare 执行自定义逻辑
如果要在将其发送到端点前将自定义处理应用到每个消息副本,您可以在 multicast
子句中调用 Prepare
DSL 命令。onPrepare
命令只在消息被放送到其端点 前 插入自定义处理器。例如,在以下路由中调用 CustomProc
处理器,消息发送到 direct:a
,自定义Proc
处理器也会在发送到 direct:b
的消息上调用。
from("direct:start") .multicast().onPrepare(new CustomProc()) .to("direct:a").to("direct:b");
onPrepare
DSL 命令的常见用例是对消息的部分或所有元素进行深入副本。例如,以下 CustomProc
处理器类执行消息正文的深层副本,其中消息正文假定为 type、Bdy Type
,而深度副本则由方法、Bdy Type.deepCopy()
执行。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
您可以使用 onPrepare
来实现在 Exchange
多播前要执行的任何自定义逻辑。
建议为不可变对象设计。
例如,如果您有 mutable 消息正文,作为这个 Animal 类:
public class Animal implements Serializable { private int id; private String name; public Animal() { } public Animal(int id, String name) { this.id = id; this.name = name; } public Animal deepClone() { Animal clone = new Animal(); clone.setId(getId()); clone.setName(getName()); return clone; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id + " " + name; } }
然后,我们可以创建一个深层克隆消息正文:
public class AnimalDeepClonePrepare implements Processor { public void process(Exchange exchange) throws Exception { Animal body = exchange.getIn().getBody(Animal.class); // do a deep clone of the body which wont affect when doing multicasting Animal clone = body.deepClone(); exchange.getIn().setBody(clone); } }
然后,我们可以使用 onPrepare
选项在 多播路由 中使用 AnimalDeepClonePrepare 类,如下所示:
from("direct:start") .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");
和 XML DSL 中的相同示例
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- use on prepare with multicast --> <multicast onPrepareRef="animalDeepClonePrepare"> <to uri="direct:a"/> <to uri="direct:b"/> </multicast> </route> <route> <from uri="direct:a"/> <process ref="processorA"/> <to uri="mock:a"/> </route> <route> <from uri="direct:b"/> <process ref="processorB"/> <to uri="mock:b"/> </route> </camelContext> <!-- the on prepare Processor which performs the deep cloning --> <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/> <!-- processors used for the last two routes, as part of unit test --> <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/> <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
选项
多播
DSL 命令支持以下选项:
名称 | 默认值 | 描述 |
| 指的是 AggregationStrategy,用于将多播的回复组合成来自多播的单一传出消息。???默认情况下,Camel 将使用最后的回复作为传出消息。 | |
|
这个选项可用于明确指定要使用的方法名称,当 OVAs 用作 | |
|
|
当将 POJOs 用作 |
|
| 如果启用,则会同时将消息发送到多播。请注意,调用者线程仍会等待所有消息都完全处理,然后再继续。它只发送和处理来自同时发生的多播的回复。 |
|
|
如果启用,则 |
| 指的是用于并行处理的自定义线程池。请注意,如果您设定了这个选项,则并行处理会被自动表示,您也不必启用该选项。 | |
|
| Camel 2.2: 出现异常时是否立即停止持续处理。如果禁用,则 Camel 会将消息发送到所有多播,无论它们之一是否失败。您可在完全控制如何处理它的 AggregationStrategy 类中处理异常。 |
|
| 如果启用,Camel 将按照其返回的顺序处理查询的查询。如果禁用,Camel 将按照与多播一样处理回复。 |
|
Camel 2.5: 设置以毫秒为单位指定的总超时。如果 多播 无法发送和处理给定时间段内的所有回复,则超时触发器和 多播 中断并继续。请注意,如果您提供 TimeoutAwareAggregationStrategy,则会在中断前调用 | |
| Camel 2.8: 请参阅自定义处理器准备每个多播的副本。这可让您进行任何自定义逻辑,如 deep-cloning(如果需要)信息有效负载。 | |
|
| Camel 2.8: 是否应共享工作单元。详情请查看 第 8.4 节 “Splitter” 上的相同选项。 |