8.13. 多播
概述
图 8.9 “多播模式” 中显示的 多播 模式是 接收者列表 与固定目标模式的一种变体,与 InOut 消息交换模式兼容。这与接收者列表相反,它只与 InOnly Exchange 模式兼容。
图 8.9. 多播模式
使用自定义聚合策略进行多播
多播处理器接收多个 Out 消息来响应原始请求(每个接收方一个),原始调用者仅预期接收 单个 回复。因此,消息交换的回复中固有不匹配,为了克服这种不匹配,您必须为多播处理器提供自定义 聚合策略。聚合策略类负责将所有 Out 消息聚合到一个回复消息中。
考虑电子模拟服务的示例,其中销售者为购买者列表提供销售项目。买方各自处于项目的投标上,销售者会自动选择价格最高的生物。您可以使用 multicast ()
DSL 命令实现将提供的发布给固定购买者列表的逻辑,如下所示:
from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()). to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中,销售者由端点 cxf:bean:offer
表示,购买者由端点 cxf:bean:Buyer1、
表示。要整合从各种买方接收的 bids,多播处理器使用聚合策略 cxf:
bean:Buyer2、cxf:
bean:Buyer3HighestBidAggregationStrategy
。您可以在 Java 中实施 HighestBidAggregationStrategy
,如下所示:
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Exchange; public class HighestBidAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { float oldBid = oldExchange.getOut().getHeader("Bid", Float.class); float newBid = newExchange.getOut().getHeader("Bid", Float.class); return (newBid > oldBid) ? newExchange : oldExchange; } }
假设买方将 bid 价格插入到名为 Bid
的标头中。有关自定义聚合策略的详情,请参阅 第 8.5 节 “聚合器”。
并行处理
默认情况下,多播处理器在另一个后调用每个接收者端点(按 to ()
命令中列出的顺序)。在某些情况下,这可能会导致不必要的长时间延迟。为了避免这些长时间延迟时间,您可以选择通过添加 parallelProcessing ()
子句来启用并行处理。例如,要在电子示例中启用并行处理,请按如下所示定义路由:
from("cxf:bean:offer") .multicast(new HighestBidAggregationStrategy()) .parallelProcessing() .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
现在,多播处理器使用为每个端点有一个线程的线程池调用购买者端点。
如果要自定义调用购买器端点的线程池的大小,您可以调用 executorService ()
方法来指定您自己的自定义 executor 服务。例如:
from("cxf:bean:offer")
.multicast(new HighestBidAggregationStrategy())
.executorService(MyExecutor)
.to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");
其中 MyExecutor 是 java.util.concurrent.ExecutorService 类型的实例。
当交换具有 InOut 模式时,使用聚合策略来聚合回复消息。默认聚合策略采用最新的回复消息,并丢弃早期的回复。例如,在以下路由中,自定义策略 MyAggregationStrategy
用于聚合来自端点、直接:a、
和 direct:
bdirect:c
的回复:
from("direct:start") .multicast(new MyAggregationStrategy()) .parallelProcessing() .timeout(500) .to("direct:a", "direct:b", "direct:c") .end() .to("mock:result");
XML 配置示例
以下示例演示了如何在 XML 中配置类似的路由,其中路由使用自定义聚合策略和自定义线程 executor:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="cxf:bean:offer"/> <multicast strategyRef="highestBidAggregationStrategy" parallelProcessing="true" threadPoolRef="myThreadExcutor"> <to uri="cxf:bean:Buyer1"/> <to uri="cxf:bean:Buyer2"/> <to uri="cxf:bean:Buyer3"/> </multicast> </route> </camelContext> <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/> <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/> </beans>
其中 parallelProcessing
属性和 threadPoolRef
属性都是可选的。只有在您要自定义多播处理器的线程行为时,才需要设置它们。
将自定义处理应用到传出消息
多播模式 复制源交换和多播副本。默认情况下,路由器会发出源消息的粗略副本。在 shallow copy 中,原始消息的标头和有效负载仅通过参考来复制,以便原始消息生成的副本链接。因为应该会链接多播消息的副本,因此如果消息正文是可变的,则无法应用自定义处理。应用到发送到一个端点的副本的自定义处理也适用于发送到所有其他端点的副本。
虽然多播语法允许您在
子句中调用 multicast
process
DSL 命令,但这并不完全完全有意义,但它与 onPrepare
的影响(实际上,在这种上下文中,进程
DSL 命令没有影响)。
在准备消息时,使用 onPrepare 来执行自定义逻辑
如果要在发送到其端点前将自定义处理应用到每个消息副本,您可以在 multicast
子句中调用 onPrepare
DSL 命令。onPrepare
命令会在消息被授权 后 插入一个自定义处理器,只需在消息 被分配给 其端点前插入一个自定义处理器。例如,在以下路由中,在发送到 direct:a
的消息上调用 CustomProc
处理器,同时在发送到 direct:b
的消息上调用 CustomProc
处理器。
from("direct:start") .multicast().onPrepare(new CustomProc()) .to("direct:a").to("direct:b");
onPrepare
DSL 命令的常见用例是执行消息的一些或所有元素的深度副本。例如,以下 CustomProc
处理器类执行消息正文的深度副本,其中消息正文假定为 type, BodyType
,而 deep copy 由方法 BodyType.deepCopy ()
执行。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
您可以使用 onPrepare
来实现在 Exchange
多播前要执行的任何自定义逻辑。
建议为不可变对象设计。
例如,如果您有一个 mutable 消息正文作为这个 Animal 类:
public class Animal implements Serializable { private int id; private String name; public Animal() { } public Animal(int id, String name) { this.id = id; this.name = name; } public Animal deepClone() { Animal clone = new Animal(); clone.setId(getId()); clone.setName(getName()); return clone; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id + " " + name; } }
然后,我们可以创建一个深度克隆处理器来克隆消息正文:
public class AnimalDeepClonePrepare implements Processor { public void process(Exchange exchange) throws Exception { Animal body = exchange.getIn().getBody(Animal.class); // do a deep clone of the body which wont affect when doing multicasting Animal clone = body.deepClone(); exchange.getIn().setBody(clone); } }
然后,我们可以使用 onPrepare
选项在 多播 路由中使用 AnimalDeepClonePrepare 类,如下所示:
from("direct:start") .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");
以及 XML DSL 中的相同示例
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <!-- use on prepare with multicast --> <multicast onPrepareRef="animalDeepClonePrepare"> <to uri="direct:a"/> <to uri="direct:b"/> </multicast> </route> <route> <from uri="direct:a"/> <process ref="processorA"/> <to uri="mock:a"/> </route> <route> <from uri="direct:b"/> <process ref="processorB"/> <to uri="mock:b"/> </route> </camelContext> <!-- the on prepare Processor which performs the deep cloning --> <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/> <!-- processors used for the last two routes, as part of unit test --> <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/> <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>
选项
multicast
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
| 指的是 AggregationStrategy 用于将来自多播的回复汇编为来自多播的单个传出消息。???默认情况下,Camel 将使用最后一个回复作为传出消息。 | |
|
这个选项可用于在将 POJO 用作 | |
|
|
当使用 POJO 作为 |
|
| 如果启用,则会同时向多播发送信息。请注意,调用者线程仍然会等到所有消息都已完全处理,然后再继续。它只会从同时发生的多播发送和处理回复。 |
|
|
如果启用,则 |
| 指的是用于并行处理的自定义线程池。请注意,如果您设定了这个选项,则并行处理会被自动指示,您不必启用该选项。 | |
|
| Camel 2.2: 发生异常时是否立即停止处理。如果禁用,则 Camel 会将消息发送到所有多播,无论其中之一是否失败。您可以在 AggregationStrategy 类中处理异常,您可以完全控制如何处理这种情况。 |
|
| 如果启用,Camel 将按照其返回的顺序处理没有顺序的回复,例如。如果禁用,Camel 将以与多播相同的顺序处理回复。 |
|
Camel 2.5: 以毫秒为单位设置指定的总超时。如果 多播 无法发送和处理给定时间线内的所有回复,则超时触发器 和多播 中断并继续。请注意,如果您提供 TimeoutAwareAggregationStrategy,则在中断前会调用 | |
| Camel 2.8: 请参阅自定义处理器,以准备每个多播将接收的交换副本。这可让您执行任何自定义逻辑,如 deep-cloning the message payload (如果需要的话)。 | |
|
| Camel 2.8: 是否应共享工作单元。如需了解更多详细信息,请参阅 第 8.4 节 “Splitter” 中的同一选项。 |