8.3. 接收者列表
概述
图 8.3 “接收者列表模式” 中显示的 接收者列表 是路由器类型,将每个传入消息发送到多个不同的目的地。此外,接收者列表通常要求在运行时计算接收者列表。
图 8.3. 接收者列表模式
带有固定目的地的接收者列表
最简单的接收者列表是: 预先修复目的地列表并提前已知的,交换模式为 InOnly。在这种情况下,您可以硬化将目的地列表放入 to ()
Java DSL 命令中。
Java DSL 示例
以下示例演示了如何将 InOnly Exchange 从消费者端点 queue:a
路由到固定目的地列表:
from("seda:a").to("seda:b", "seda:c", "seda:d");
XML 配置示例
以下示例演示了如何在 XML 中配置相同的路由:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <to uri="seda:b"/> <to uri="seda:c"/> <to uri="seda:d"/> </route> </camelContext>
在运行时计算的接收者列表
在大多数情况下,当您使用接收者列表模式时,应在运行时计算接收者列表。为此,可使用 recipientList ()
处理器,该处理器使用目的地列表作为其唯一参数。由于 Apache Camel 将类型转换器应用到列表参数,因此应当可以使用大多数标准 Java 列表类型(如集合、列表或数组)。有关类型转换器的详情,请参考 第 34.3 节 “built-In Type Converters”。
接收者收到 同一 交换实例的副本,Apache Camel 按顺序执行它们。
Java DSL 示例
以下示例演示了如何从名为 receiverListHeader
的消息标头中提取目的地列表,其中标头值是以逗号分隔的端点 URI 列表:
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));
在某些情况下,如果标头值是列表类型,您可以直接使用它作为 recipientList ()
的参数。例如:
from("seda:a").recipientList(header("recipientListHeader"));
但是,这个示例完全取决于底层组件如何解析这个特定标头。如果组件将标头解析为简单字符串,则此示例 将无法正常工作。标头必须解析为某种类型的 Java 列表。
XML 配置示例
以下示例演示了如何在 XML 中配置上述路由,其中标头值是以逗号分隔的端点 URI 列表:
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <recipientList delimiter=","> <header>recipientListHeader</header> </recipientList> </route> </camelContext>
并行发送到多个接收者
从 Camel 2.2 开始提供
接收者列表模式 支持 parallelProcessing
,这与 splitter 模式 中的对应 功能类似。使用并行处理功能将交换同时发送到多个接收者,例如:
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
在 Spring XML 中,并行处理功能是作为 recipientList
tag5-4-wagon 等属性实现的:
<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>
异常停止
从 Camel 2.2 开始提供
接收者列表 支持 stopOnException
功能,如果任何接收者失败,您可以使用它停止发送到任何进一步的接收者。
from("direct:a").recipientList(header("myHeader")).stopOnException();
在 Spring XML 中,其接收者列表标签上的属性。
在 Spring XML 中,停止在异常功能上作为属性实现,作为 recipientList
tag swig-wagon 等属性:
<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>
您可以在同一路由中组合 parallelProcessing
和 stopOnException
。
忽略无效的端点
从 Camel 2.3 开始提供
接收者列表模式 支持 ignoreInvalidEndpoints
选项,该选项可让接收者列表跳过无效的端点(路由 slips 模式 也支持这个选项)。例如:
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
在 Spring XML 中,您可以通过在 recipientList
标签上设置 ignoreInvalidEndpoints
属性来启用这个选项,如下所示
<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>
例如,myHeader
包含两个端点 direct:foo,xxx:bar
。第一个端点有效且正常工作。第二个无效,因此忽略。每当遇到无效的端点时,Apache Camel 会在 INFO
级别记录。
使用自定义 AggregationStrategy
从 Camel 2.2 开始提供
您可以使用带有 接收者列表模式 的自定义 AggregationStrategy
,这对于聚合列表中 来自接收者的回复很有用。默认情况下,Apache Camel 使用 UseLatestAggregationStrategy
聚合策略,仅保留最后收到的回复。对于更复杂的聚合策略,您可以定义您自己的 AggregationStrategy
接口 implementation,详情请参阅 第 8.5 节 “聚合器”。例如,要将自定义聚合策略 MyOwnAggregationStrategy
应用到回复消息,您可以定义 Java DSL 路由,如下所示:
from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");
在 Spring XML 中,您可以将自定义聚合策略指定为 receiverList
标签上的属性,如下所示:
<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
使用自定义线程池
从 Camel 2.2 开始提供
这只在使用 并行进程时才需要
。默认情况下,Camel 使用有 10 个线程的线程池。请注意,当我们稍后进行过度处理线程池管理和配置时(在 Camel 2.2 中),这可能会有变化。
您像使用自定义聚合策略一样配置它。
使用方法调用作为接收者列表
您可以使用 bean 集成来提供接收者,例如:
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
其中 MessageRouter
bean 定义如下:
public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }
bean 作为接收者列表
您可以通过在返回一组接收者列表的方法中添加 @RecipientList
注释,使 bean 充当接收者列表。例如:
public class MessageRouter { @RecipientList public String routeTo() { String queueList = "activemq:queue:test1,activemq:queue:test2"; return queueList; } }
在这种情况下,不要在 路由中包含 recipientList
DSL 命令。按如下方式定义路由:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
使用超时
从 Camel 2.5 开始提供
如果使用 parallelProcessing
,您可以以毫秒为单位配置总 超时值
。然后,Camel 将并行处理消息,直到超时到达为止。如果一个信息缓慢,可以继续处理。
在以下示例中,receiverlist
标头的值为 direct:a,direct:b,direct:c
,以便消息发送到三个接收者。我们有一个 250 毫秒的超时时间,这意味着在时间范围内只能完成最后两个消息。因此,聚合会生成字符串结果 BC
。
from("direct:start") .recipientList(header("recipients"), ",") .aggregationStrategy(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250) // use end to indicate end of recipientList clause .end() .to("mock:result"); from("direct:a").delay(500).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C"));
splitter
和 multicast
和 recipientList
也支持此 超时
功能。
默认情况下,如果没有调用 AggregationStrategy
,则默认超时。但是,您可以实施一个专用版本
// Java public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);
如果您真正需要,这可让您处理 AggregationStrategy
中的超时。
超时是 total,这意味着 X 时间后,Camel 将聚合在时间线内完成的消息。剩余部分将被取消。Camel 还将只针对导致 超时
的第一个索引在 TimeoutAwareAggregationStrategy
一次调用超时方法。
将自定义处理应用到传出消息
在 receiverList
发送消息到其中一个接收者端点之前,它会创建一个消息副本,这是原始邮件的粗略副本。在 Show 副本中,原始消息的标头和有效负载仅通过引用复制。每个新副本不包含它们自己的这些元素实例。因此,消息应该会链接,在将自定义处理路由到不同的端点时,您无法应用自定义处理。
如果要在副本发送到其端点之前对每个消息副本执行一些自定义处理,您可以在 recipientList
子句中调用 onPrepare
DSL 命令。onPrepare
命令仅在消息被压缩 后 插入自定义处理器,并在将消息分配给其端点 之前。例如,在以下路由中,每个接收者端点 的消息副本上调用 CustomProc
处理器:
from("direct:start") .recipientList().onPrepare(new CustomProc());
onPrepare
DSL 命令的常见用例是对消息的一些或所有元素执行深度副本。这样,每个消息副本都可以独立于其他副本进行修改。例如,以下 CustomProc
处理器类执行消息正文的深度副本,其中消息正文假定为类型为 BodyType
,而深度副本则由方法 BodyType.deepCopy ()
执行。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
选项
recipientList
DSL 命令支持以下选项:
Name | 默认值 | 描述 |
|
| 如果 Expression 返回多个端点,则使用分隔符。 |
| 是指用于将来自接收者的回复汇编为 第 8.3 节 “接收者列表” 的单一传出消息的 AggregationStrategy。默认情况下,Camel 将使用最后一个回复作为传出消息。 | |
|
当将 POJO 用作 | |
|
|
当将 POJO 用作 |
|
| Camel 2.2: 如果启用然后向接收方发送信息。请注意,调用者线程仍然会等待所有消息被完全处理,然后再继续。它仅发送和处理来自同时发生的接收方的回复。 |
|
|
如果启用,则 |
| Camel 2.2: 请参阅用于并行处理的自定义线程池。请注意,如果您设置了这个选项,则并行处理是自动的,您不必启用该选项。 | |
|
| Camel 2.2 : 在出现异常情况时,是否立即停止继续处理。如果禁用,则 Camel 会将消息发送到所有接收者,无论它们是否失败。您可以在 AggregationStrategy 类中处理异常,您可以完全控制如何处理它。 |
|
| Camel 2.3: 如果无法解析端点 uri,则应忽略它。否则 Camel 将抛出一个例外,表示端点 uri 无效。 |
|
| Camel 2.5: 如果启用,则 Camel 将按其返回的顺序处理回复,例如:如果禁用,Camel 将以与指定的 Expression 相同的顺序处理回复。 |
|
Camel 2.5: 设置在 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 无法发送和处理给定时间段内的所有回复,则超时触发器和 第 8.3 节 “接收者列表” 中断并继续。请注意,如果您提供了一个 AggregationStrategy,则在中断前调用 | |
| Camel 2.8: 请参阅自定义处理器来准备每个接收者将收到的 Exchange 副本。这可让您执行任何自定义逻辑,如在需要时深度获取消息有效负载。 | |
|
| Camel 2.8: 是否应共享工作单元。详情请查看 第 8.4 节 “Splitter” 中的同一选项。 |
|
| Camel 2.13.1/2.12.4 : 允许为 ProducerCache 配置缓存大小,该缓存生成者可在路由 slip 中重复使用。默认情况下,将使用默认缓存大小为 0。将值设为 -1 允许将缓存全部关闭。 |
在 Recipient 列表中使用交换模式
默认情况下,Recipient List 使用当前的交换模式。但是,在有些情况下,您可以使用不同的交换模式向接收者发送消息。
例如,您可能有一个作为 InOnly
路由启动的路由。现在,如果要使用带有接收者列表的 InOut
Exchange 模式,则需要直接在接收者端点中配置交换模式。
以下示例演示了新文件将以 InOnly 开头,然后路由到接收者列表的路由。如果要将 InOut 与 ActiveMQ (JMS)端点搭配使用,您需要使用 exchangePattern 等于 InOut 选项来指定此端点。但是,响应形成 JMS 请求或回复将持续路由,因此响应将作为 outbox 目录中的文件存储在 中。
from("file:inbox") // the exchange pattern is InOnly initially when using a file route .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") .to("file:outbox");
InOut
Exchange 模式必须在超时期间获得响应。但是,如果响应没有修复,它会失败。