8.3. 接收者列表


概述

接收者列表 (如 图 8.3 “接收者列表模式” )是将每个传入消息发送到多个不同目的地的路由器类型。另外,接收者列表通常要求在运行时计算接收者列表。

图 8.3. 接收者列表模式

接收者列表模式

带有固定目的地的接收者列表

最简单的接收者列表是目标列表已修复且提前已知的,交换模式是 InOnly。在这种情况下,您可以将目的地列表分成 to () Java DSL 命令。

注意

这里针对带有固定目的地的接收者列表的示例 只适用于 InOnly Exchange 模式(与管道 和过滤器模式类似)。如果要为带有 Out 消息的交换模式创建接收者列表,请使用 多播 模式。

Java DSL 示例

以下示例演示了如何将来自消费者端点 队列:aInOnly 交换路由到目的地的固定列表:

from("seda:a").to("seda:b", "seda:c", "seda:d");

XML 配置示例

以下示例演示了如何在 XML 中配置相同的路由:

<camelContext id="buildStaticRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <to uri="seda:b"/>
    <to uri="seda:c"/>
    <to uri="seda:d"/>
  </route>
</camelContext>

在运行时计算的接收者列表

在大多数情况下,当使用接收者列表模式时,应在运行时计算接收方列表。为此,可使用 接收者List () 处理器,它取目的地列表作为其唯一参数。因为 Apache Camel 将类型转换器应用于 list 参数,因此应该可以使用大多数标准 Java 列表类型(如集合、列表或数组)。有关类型转换器的详情,请参考 第 34.3 节 “内置类型 Converters”

接收者收到 同一 交换实例的副本,并且 Apache Camel 按顺序执行它们。

Java DSL 示例

以下示例演示了如何从名为 recipientListHeader 的消息标头中提取目的地列表,其中标头值是一个以逗号分隔的端点 URI 列表:

from("direct:a").recipientList(header("recipientListHeader").tokenize(","));

在某些情况下,如果标头值是一个列表类型,您可以直接使用它作为 接收者List () 的参数。例如:

from("seda:a").recipientList(header("recipientListHeader"));

但是,本例完全依赖于底层组件如何解析这个特定标头。如果组件以简单字符串的形式解析标头,本例 将无法工作。标头必须解析到某些类型的 Java 列表中。

XML 配置示例

以下示例演示了如何在 XML 中配置上述路由,其中 标头值是一个以逗号分隔的端点 URI 列表:

<camelContext id="buildDynamicRecipientList" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <recipientList delimiter=",">
      <header>recipientListHeader</header>
    </recipientList>
  </route>
</camelContext>

并行发送到多个接收者

可作为 Camel 2.2 提供

接收者列表模式 支持 并行处理,这与 拆分器模式 中的对应 功能类似。使用并行处理功能将交换发送到同时到达多个接收方,例如:

from("direct:a").recipientList(header("myHeader")).parallelProcessing();

在 Spring XML 中,并行处理功能是以 接收者List tag 的 attribute 进行实现的,例如:

<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>

在例外时停止

可作为 Camel 2.2 提供

接收者列表 支持 stopOnException 功能,您可以在任何接收者失败时停止发送到任何进一步收件人。

from("direct:a").recipientList(header("myHeader")).stopOnException();

在 Spring XML 中,它在接收者列表标签中有一个属性。

在 Spring XML 中,停止异常功能是以 接收者List tag 的 attribute 进行实现的,例如:

<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
注意

您可以在同一路由中 组合并行处理stopOnException

忽略无效端点

可作为 Camel 2.3 提供

接收者列表模式 支持 ignoreInvalidEndpoints 选项,该选项可让接收者列表跳过无效的端点(路由 slips 模式 也支持这个选项)。例如:

from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

在 Spring XML 中,您可以通过在 recipientList 标签上设置 ignoreInvalidEndpoints 属性来启用这个选项,如下所示

<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>

考虑 myHeader 包含两个端点( direct:foo,xxx:bar )的大小写。第一个端点有效并可工作。第二个操作无效,因此忽略。每当遇到无效的端点时,会位于 INFO 级别的 Apache Camel 日志。

使用自定义 AggregationStrategy

可作为 Camel 2.2 提供

您可以将自定义的 AggregationStrategy接收者列表模式 搭配使用,这对于聚合来自列表中的接收方的回复非常有用。默认情况下,Apache Camel 使用 UseLatestAggregationStrategy 聚合策略,该策略只保留上次收到的回复。对于更复杂的聚合策略,您可以自行定义 AggregationStrategy interfaceTOKEN-确保 第 8.5 节 “聚合器” 的实现。详情请参阅。例如,要将自定义聚合策略 MyOwnAggregationStrategy 应用到回复消息,您可以按照如下所示定义 Java DSL 路由:

from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");

在 Spring XML 中,您可以将自定义聚合策略指定为 recipientList 标签上的属性,如下所示:

<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>

<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>

使用自定义线程池

可作为 Camel 2.2 提供

这只在您使用 并行处理时才需要。默认情况下,Camel 使用了 10 个线程的线程池。请注意,当我们彻底掌握的线程池管理和以后配置时,这可能会改变(在 Camel 2.2 中)。

就像使用自定义聚合策略一样配置它。

使用方法调用作为接收者列表

您可以使用 bean 集成来提供接收者,例如:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

MessageRouter bean 的定义位置:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

Bean 作为接收者列表

您可以通过将 @RecipientList 注释添加到返回接收者列表的方法,使 bean 作为接收者列表的行为。例如:

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueList = "activemq:queue:test1,activemq:queue:test2";
        return queueList;
    }
}

在本例中,不要在 路由中包含 接收者List DSL 命令。按照如下所示定义路由:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

使用超时

可作为 Camel 2.5 提供

如果使用 parallelProcessing,可以用毫秒来配置一个总 超时值。Camel 随后会并行处理消息,直到超时达到为止。如果一条消息慢,则可以继续处理。

在以下示例中,接收者列表标头具有值 direct:a,direct:b,direct:c,因此消息会发送至三个收件人。我们有 250 毫秒的超时,这意味着在时间段内只能完成最后两个消息。因此,聚合会产生字符串 BC

from("direct:start")
    .recipientList(header("recipients"), ",")
    .aggregationStrategy(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250)
    // use end to indicate end of recipientList clause
    .end()
    .to("mock:result");

from("direct:a").delay(500).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));
注意

splittermulticastrecipientList 支持这个 超时 功能。

默认情况下,如果没有调用 AggregationStrategy,则默认会发生超时。但是,您可以实现一个特殊的版本

// Java
public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);

如果您真正需要,这可以处理 AggregationStrategy 中的超时时间。

超时为总计

超时时间总计,这意味着 Camel 在 X 时间后,Camel 将聚合了在时间段内完成的消息。剩余部分将被取消。对于导致超时的第一个索引,Camel 还将只调用 TimeoutAwareAggregationStrategy 中的 timeout 方法。

将自定义处理应用到传出消息

接收者List 将消息发送到接收者端点之一之前,它会创建一个消息副本,即原始消息的应复制。在匹配项中,原始消息的标头和有效负载仅通过引用来复制。每个新副本不包含这些元素自己的实例。因此,当消息被链接,您应该在将自定义处理路由到不同的端点时,您无法应用自定义处理。

如果要在将副本发送到端点之前对每条消息副本执行一些自定义处理,您可以在 接收者List 子句中调用 onPrepare DSL 命令。onPrepare 命令仅在消息 被禁止后 插入自定义处理器,且仅在消息被分配到其端点 之前。例如,在以下路由中,会在每个接收者端点 的消息副本上调用 CustomProc 处理器:

from("direct:start")
  .recipientList().onPrepare(new CustomProc());

onPrepare DSL 命令的一个常见用例是执行对消息的某些或所有元素的深度副本。这允许独立于其他消息副本修改每个消息副本。例如,以下 CustomProc 处理器类对消息正文执行深度副本,其中消息正文假定为 type、BrodyType,并且深度副本由方法 BodyType .deepCopy () 执行。

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

选项

接收者List DSL 命令支持以下选项:

Name

默认值

描述

delimiter

,

表达式返回多个端点时使用的分隔符。

strategyRef

 

引用 AggregationStrategy 用来将来自接收方的回复编译到来自 第 8.3 节 “接收者列表” 的单一传出消息。默认情况下,Camel 将使用最后一个回复作为传出消息。

strategyMethodName

 

当将 POJO 用作 AggregationStrategy 时,可以利用此选项来显式指定要使用的方法名称。

strategyMethodAllowNull

false

在使用 POJO 作为 AggregationStrategy 时,可以使用这个选项。如果为 false,则不使用聚合方法,如果没有数据要丰富。如果为 true,则当没有数据要丰富时,会使用null 值作为 oldExchange

parallelProcessing

false

Camel 2.2: 如果启用,则允许将消息同时发送到收件人。注意 caller 线程仍然会等待所有消息被完全处理,然后再继续。它只发送和处理来自收件人的回复,同时进行。

parallelAggregate

false

如果启用,则 AggregationStrategy 上的聚合方法可同时调用。请注意,这需要 AggregationStrategy 实现为 thread-safe。默认情况下,这个选项为 false,这意味着 Camel 会自动同步聚合方法 的调用。但是,在一些用例中,您可以通过将 AggregationStrategy 设置为 thread-safe 来提高性能,并将此选项设置为 true

executorServiceRef

 

Camel 2.2: 引用用于并行处理的自定义线程池。请注意,如果您设置了这个选项,则并行处理会被自动简化,您不需要启用该选项。

stopOnException

false

Camel 2.2: 当出现异常时,是否停止立即处理处理。如果禁用,Camel 会将消息发送到所有收件人,无论它们是否失败。您可以在 AggregationStrategy 类中处理异常,您可以在其中完全控制如何处理这种情况。

ignoreInvalidEndpoints

false

Camel 2.3: 如果端点 uri 无法解析,则忽略它。否则 Camel 将抛出异常,说明端点 uri 无效。

streaming

false

Camel 2.5: 如果启用,Camel 将按照顺序顺序进行回复,例如按它们返回的顺序。如果禁用,Camel 会按照与指定的表达式相同的顺序处理回复。

timeout

 

Camel 2.5: 设置 millis 中指定的总超时。如果 第 8.3 节 “接收者列表” 无法发送并处理给定时间段内的所有回复,则超时触发器 第 8.3 节 “接收者列表” 会中断并继续。请注意,如果您提供 AggregationStrategy,则在中断前会调用 超时 方法。

onPrepareRef

 

Camel 2.8: 请参阅自定义处理器,准备每个收件人的 Exchange 的副本。这可让您执行任何自定义逻辑,如深度克隆消息有效负载(如果需要)。

shareUnitOfWork

false

Camel 2.8: 是否应该共享工作单元。如需了解更多详细信息,请参阅 第 8.4 节 “Splitter” 相同的选项。

cacheSize

0

Camel 2.13.1/2.12.4: 允许配置 ProducerCache 的缓存大小,缓存生产者以便在路由 slip 中重复使用。默认将使用默认缓存大小 0。将值设为 -1 允许将缓存完全关闭。

在 Recipient 列表中使用 Exchange Pattern

默认情况下,Recipient List 使用当前的交换模式。但是,在有些情况下,您可以使用不同的交换模式向接收者发送信息。

例如,您可能有一个作为 InOnly 路由启动的路由。现在,如果要将 InOut Exchange 模式与接收者列表搭配使用,则需要直接在接收者端点中配置交换模式。

以下示例说明了新文件将作为 InOnly 启动的路由,然后路由到接收者列表。如果您想将 InOut 与 ActiveMQ (JMS)端点搭配使用,则需要使用与 InOut 选项的 exchangePattern 来指定它。但是,对 JMS 请求或回复形成的响应将被持续路由,因此响应作为 outbox 目录中的文件存储在 中。

from("file:inbox")
  // the exchange pattern is InOnly initially when using a file route
  .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
  .to("file:outbox");
注意

InOut Exchange 模式必须在超时期间获得响应。但是,如果响应不是被接收,则会失败。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.