8.3. 接收者列表
概述
一个 接收者列表 (在 图 8.3 “接收者列表模式” 中显示的是路由器类型),将每个传入的信息发送到多个不同的目的地。此外,接收者列表通常要求在运行时计算接收者列表。
图 8.3. 接收者列表模式
带有固定目的地的接收者列表
最简单的接收者列表是预先固定并已知的目的地列表,而交换模式是 Only。在这种情况下,您可以硬编码到 to()
Java DSL 命令中的目的地列表。
Java DSL 示例
以下示例演示了如何将来自消费者端点 queue:a
的 InOnly exchange 路由到固定目的地列表:
from("seda:a").to("seda:b", "seda:c", "seda:d");
XML 配置示例
以下示例演示了如何在 XML 中配置相同的路由:
<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <to uri="seda:b"/> <to uri="seda:c"/> <to uri="seda:d"/> </route> </camelContext>
运行时计算的接收者列表
在大多数情况下,当您使用接收者列表模式时,应在运行时计算接收者列表。为此,可使用 recipientList()
处理器,它将目的地列表视为其唯一参数。由于 Apache Camel 将类型转换器应用到列表参数,因此应该使用大多数标准 Java 列表类型(如集合、列表或数组)。有关类型转换器的详情,请参考 第 34.3 节 “内置(In Type Converters)”。
接收者收到 同一 交换实例的副本,而 Apache Camel 会按顺序执行它们。
Java DSL 示例
以下示例演示了如何从名为 recipientListHeader
的消息标头提取目的地列表,其中标头值是以逗号分隔的端点 URI 列表:
from("direct:a").recipientList(header("recipientListHeader").tokenize(","));
在某些情况下,如果标头值是一个列表类型,您可以将其直接用作 接收者List()
的参数。例如:
from("seda:a").recipientList(header("recipientListHeader"));
但是,这个示例完全依赖于底层组件如何解析这个特定标头。如果组件将标头解析为简单字符串,则该示例 将无法工作。标头必须解析到某种类型的 Java 列表。
XML 配置示例
以下示例演示了如何在 XML 中配置前面的路由,其中标头值是一个用逗号分开的端点 URI 列表:
<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="seda:a"/> <recipientList delimiter=","> <header>recipientListHeader</header> </recipientList> </route> </camelContext>
并行发送到多个接收者
可作为 Camel 2.2 提供
接收者列表模式 支持 parallelProcessing
,这与 splitter 模式中的相应功能类似。使用并行处理功能将交换发送到多个接收方,同时为 example.
from("direct:a").recipientList(header("myHeader")).parallelProcessing();
在 Spring XML 中,并行处理功能作为 接收者List
tag>_<-abrtfor 示例上的属性实现:
<route> <from uri="direct:a"/> <recipientList parallelProcessing="true"> <header>myHeader</header> </recipientList> </route>
异常停止
可作为 Camel 2.2 提供
接收者列表 支持 stopOnException
功能,如果任何接收者失败,您可以使用它停止发送到任何进一步的接收者。
from("direct:a").recipientList(header("myHeader")).stopOnException();
以及在 Spring XML 中,其接收者列表标签上的属性。
在 Spring XML 中,异常功能的停止是作为 接收者List
tag>_<-abrtfor 示例的属性实现:
<route> <from uri="direct:a"/> <recipientList stopOnException="true"> <header>myHeader</header> </recipientList> </route>
您可以在同一 路由中组合并行处理
和停止OnException
。
忽略无效的端点
从 Camel 2.3 开始可用
接收者列表模式 支持 ignoreInvalidEndpoints
选项,它允许接收者列表跳过无效的端点(路由 slips 模式 也支持此选项)。例如:
from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();
在 Spring XML 中,您可以通过在 recipientList
标签上设置 ignoreInvalidEndpoints
属性来启用这个选项,如下所示
<route> <from uri="direct:a"/> <recipientList ignoreInvalidEndpoints="true"> <header>myHeader</header> </recipientList> </route>
请考虑 myHeader
包含两个端点的情况,direct:foo,xxx:bar
。第一个端点有效且可以正常工作。第二个是无效,因此忽略。当遇到无效的端点时,Apache Camel 都会在 INFO
级别记录。
使用自定义 AggregationStrategy
可作为 Camel 2.2 提供
您可以使用带有 接收者列表模式 的自定义 AggregationStrategy
,这对于从列表中接收方发出回复很有用。默认情况下,Apache Camel 使用 UseLatestAggregationStrategy
聚合策略,以仅保留上次收到的回复。对于更复杂的聚合策略,您可以自行定义 AggregationStrategy
接口的实施。有关详细信息,请参阅 第 8.5 节 “聚合器”。例如,若要将自定义聚合策略 MyOwnAggregationStrategy
应用到答复消息,您可以按照如下所示定义 Java DSL 路由:
from("direct:a") .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) .to("direct:b");
在 Spring XML 中,您可以将自定义聚合策略指定为 接收者List
标签的属性,如下所示:
<route> <from uri="direct:a"/> <recipientList strategyRef="myStrategy"> <header>myHeader</header> </recipientList> <to uri="direct:b"/> </route> <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
使用自定义线程池
可作为 Camel 2.2 提供
这只在使用 并行处理时才需要
。默认情况下,Camel 使用有 10 个线程的线程池。请注意,在我们稍后进行线程管理和配置时(在 Camel 2.2 中可能发生)时,这可能会改变。
您像使用自定义聚合策略一样配置它。
使用方法调用作为接收者列表
您可以使用 bean 集成来提供接收者,例如:
from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");
其中,MessageRouter
bean 的定义如下:
public class MessageRouter { public String routeTo() { String queueName = "activemq:queue:test2"; return queueName; } }
bean 作为接收者列表
您可以通过将 @RecipientList
注解添加到返回接收者列表的方法,使 bean 的行为作为接收者列表。例如:
public class MessageRouter { @RecipientList public String routeTo() { String queueList = "activemq:queue:test1,activemq:queue:test2"; return queueList; } }
在这种情况下,不要在 路由中包含 recipientList
DSL 命令。定义路由,如下所示:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
使用超时
可作为 Camel 2.5 提供。
如果使用 并行处理
,则可以以毫秒为单位配置总 超时值
。然后,Camel 将并行处理消息,直到超时达到为止。这可让您在一条信息较慢时继续处理。
在以下示例中,recipientlist
标头的值为 direct: a、direct:b、direct:c
,因此该邮件会发送到三个接收方。我们有 250毫秒的超时,这意味着在时间段内只有最后两条消息即可完成。因此,聚合会产生字符串结果 BC
。
from("direct:start") .recipientList(header("recipients"), ",") .aggregationStrategy(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) .parallelProcessing().timeout(250) // use end to indicate end of recipientList clause .end() .to("mock:result"); from("direct:a").delay(500).to("mock:A").setBody(constant("A")); from("direct:b").to("mock:B").setBody(constant("B")); from("direct:c").to("mock:C").setBody(constant("C"));
这种 超时
功能也由 splitter
和 receiver
List 支持
。
默认情况下,如果超时发生 AggregationStrategy
不会被调用。但是,您可以实施专用版本
// Java public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { /** * A timeout occurred * * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) * @param index the index * @param total the total * @param timeout the timeout value in millis */ void timeout(Exchange oldExchange, int index, int total, long timeout);
这允许您在 AggregationStrategy
中处理超时(如果您确实需要)。
超时为总计,这意味着在 X 时间后,Camel 将聚合在时间段内完成的消息。剩余部分将被取消。Camel 还将只调用 TimeoutAwareAggregationStrategy
中的 timeout
方法,用于第一个导致超时的索引。
将自定义处理应用到传出消息
在 recipientList
会向接收者端点发送一条信息前,它会创建一个消息副本,这是原始消息的绝对副本。在应该复制中,原始消息的标头和有效负载仅通过引用来复制。每个新副本不包含这些元素自己的实例。因此,消息的绝对副本会链接,当将其路由到不同的端点时,您无法应用自定义处理。
如果要在副本发送到端点前对每个消息副本执行一些自定义处理,您可以在 recipientList
子句中调用 onPrepare
DSL 命令。onPrepare
命令只在消息被放送到其端点 前 插入自定义处理器。例如,在以下路由中,为每个接收者端点 在消息副本上调用 CustomProc
处理器:
from("direct:start") .recipientList().onPrepare(new CustomProc());
onPrepare
DSL 命令的常见用例是对消息的部分或所有元素进行深入副本。这允许独立于其他消息副本修改每个消息副本。例如,以下 CustomProc
处理器类执行消息正文的深层副本,其中消息正文假定为 type、Bdy Type
,而深度副本则由方法、Bdy Type.deepCopy()
执行。
// Java import org.apache.camel.*; ... public class CustomProc implements Processor { public void process(Exchange exchange) throws Exception { BodyType body = exchange.getIn().getBody(BodyType.class); // Make a _deep_ copy of of the body object BodyType clone = BodyType.deepCopy(); exchange.getIn().setBody(clone); // Headers and attachments have already been // shallow-copied. If you need deep copies, // add some more code here. } }
选项
recipientList
DSL 命令支持以下选项:
名称 | 默认值 | 描述 |
|
| 表达式返回的多个端点时使用分隔符。 |
| 指的是 AggregationStrategy,用于将接收方的回复组合成来自 第 8.3 节 “接收者列表” 的单个传出消息。默认情况下,Camel 将使用最后的回复作为传出消息。 | |
|
这个选项可用于明确指定要使用的方法名称,当 OVAs 用作 | |
|
|
当将 POJOs 用作 |
|
| Camel 2.2: 如果启用然后同时向接收者发送信息。请注意,调用者线程仍会等待所有消息都完全处理,然后再继续。它只发送和处理来自同时发生的收件人的回复。 |
|
|
如果启用,则 |
| Camel 2.2: 请参阅自定义线程池,以用于并行处理。请注意,如果您设定了这个选项,则并行处理会被自动表示,您也不必启用该选项。 | |
|
| Camel 2.2: 出现异常时是否立即停止持续处理。如果禁用,则 Camel 会将信息发送到所有收件人,无论它们是否失败。您可在完全控制如何处理它的 AggregationStrategy 类中处理异常。 |
|
| Camel 2.3: 如果无法解析端点 uri,它会被忽略。否则,Camel 会抛出一个异常,说明 endpoint uri 无效。 |
|
| Camel 2.5: 如果启用,Camel 将处理顺序的回复,例如他们返回的顺序。如果禁用,Camel 将按照与指定的表达式相同的顺序进行回复。 |
|
Camel 2.5: 设置 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 没有可以在指定时间段内发送和处理所有回复,则超时触发器和 第 8.3 节 “接收者列表” 会中断并继续。请注意,如果您提供 AggregationStrategy,则会在中断前调用 | |
| Camel 2.8: 请参阅自定义处理器准备每个接收方的 Exchange 副本。这可让您进行任何自定义逻辑,如 deep-cloning(如果需要)信息有效负载。 | |
|
| Camel 2.8: 是否应共享工作单元。详情请查看 第 8.4 节 “Splitter” 上的相同选项。 |
|
| Camel 2.13.1/2.12.4: 允许配置 ProducerCache 的缓存大小,该缓存制作者在路由 slip 中重复使用。默认情况下,将使用默认的缓存大小,即 0。将值设为 -1 允许将缓存全部关闭。 |
在 Recipient 列表中使用 Exchange Pattern
默认情况下,Recipient List 使用当前的交换模式。但是,在有些情况下,您可以使用不同的交换模式向接收者发送信息。
例如,您可能有一个作为 InOnly
路由启动的路由。现在,如果您要通过接收者列表使用 InOut
Exchange 模式,则需要直接在接收者端点中配置交换模式。
以下示例演示了新文件将作为 InOnly 开头,然后路由到接收者列表的路由。如果要将 InOut 与 ActiveMQ(JMS)端点搭配使用,则需要使用 exchangePattern 等于 InOut 选项指定该端点。但是,响应形式随后将继续路由 JMS 请求或回复,因此响应将作为文件存储在 outbox 目录中。
from("file:inbox") // the exchange pattern is InOnly initially when using a file route .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") .to("file:outbox");
InOut
Exchange 模式必须在超时时间内获得响应。但是,如果响应未接收,则会失败。