8.5. 聚合器


概述

通过 图 8.5 “聚合器模式” 所示的 聚合器 模式,您可以将相关消息批量合并到一个消息中。

图 8.5. 聚合器模式

聚合器模式

要控制聚合器的行为,Apache Camel 允许您指定 企业集成模式 中描述的属性,如下所示:

  • correlation 表达式 spite确定哪些消息应聚合在一起。关联表达式评估在每个传入消息上,以生成 关联密钥。具有相同相关性键的传入消息被分组到同一批处理中。例如,如果要将 所有传入 信息聚合到单个消息,您可以使用恒定表达式。
  • 完成消息批次后,ness 条件 只有 TOKEN 确定。您可以将它指定为简单大小限制,或者更普遍,您可以在批处理完成时指定标志的 predicate 条件。
  • 聚合算法 TOKEN-sandboxed 将消息交换器用于单一消息交换器,用于将单个关联密钥交换到单个消息交换中。

例如,一个库存市场数据系统,每秒接收 30,000 个消息。如果您的 GUI 工具无法应对此类更新率,您可能希望降低消息流。只需选择最新的报价并丢弃旧价格,即可将传入股票报价聚合在一起。(如果您愿意捕获一些历史记录,可以应用 delta 处理算法。)

注意

现在,聚合器使用包含更多信息的 ManagedAggregateProcessorMBean 中列出 JMX。它允许您使用聚合控制器来控制它。

聚合器的工作方式

图 8.6 “聚合器实施” 显示聚合器如何工作的概述,假设它带有与 A、B、C 或 D 等关联键(如 A、B、C 或 D)的交换流。

图 8.6. 聚合器实施

消息路由 02

图 8.6 “聚合器实施” 中显示的交换流处理如下:

  1. correlator 负责根据关联密钥对交换的排序负责。对于每个传入交换,会评估关联表达式,从而获得关联密钥。例如,对于 图 8.6 “聚合器实施” 中显示的交换,关联键评估为 A。
  2. 聚合策略 负责将交换与相同关联密钥进行合并。当新的交换器发布时,聚合器会在聚合存储库中查找对应的 聚合交换、A',并将其与新的交换合并。

    在完成特定的聚合周期之前,传入交换随对应的聚合交换一起聚合。聚合周期持续持续,直到其中一个完成机制终止。

    注意

    从 Camel 2.16,新的 XSLT 聚合策略允许您将两个消息与 XSLT 文件合并。您可以从 toolbox 访问 AggregationStrategies.xslt () 文件。

  3. 如果在聚合器上指定了完成 predicate,则会测试聚合交换来确定是否准备好发送到路由中的下一个处理器。处理按如下方式继续:

    • 如果完成,则聚合交换由路由的后一部分进行处理。此模型有两种替代模型: 同步 (默认),这会导致调用线程块或 异步 (如果启用了并行处理),其中聚合交换被提交到 executor 线程池(如 图 8.6 “聚合器实施”所示)。
    • 如果没有完成,聚合交换会重新保存到聚合存储库。
  4. 与同步完成测试并行,可以通过启用 completionTimeout 选项或 completionInterval 选项来启用异步完成测试。这些完成测试在单独的线程中运行,每当满足完成测试时,对应的交换都会标记为完成,并被路由的后一部分(根据并行处理启用或异步处理)进行处理。
  5. 如果启用了并行处理,则线程池负责处理路由后面的部分中交换。默认情况下,这个线程池包含十个线程,但您可以选择自定义池(“线程选项”一节)。

Java DSL 示例

以下示例使用 UseLatestAggregationStrategy 聚合策略使用同一 StockSymbol 标头值来聚合交换。对于给定的 StockSymbol 值,如果自收到了最后的关联密钥交换了三秒以上,则聚合交换被视为完成并发送到 模拟 端点。

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

XML DSL 示例

以下示例演示了如何在 XML 中配置相同的路由:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy"
                   completionTimeout="3000">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

指定关联表达式

在 Java DSL 中,关联表达式始终作为参数传递到 aggregate () DSL 命令。您不限于此处使用简单表达式语言。您可以使用任何表达式语言或脚本语言(如 XPath、XQuery、SQL 等)指定关联表达式。

对于 exampe,要使用 XPath 表达式来关联交换,您可以使用以下 Java DSL 路由:

from("direct:start")
    .aggregate(xpath("/stockQuote/@symbol"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

如果无法对特定传入的交换进行评估,则聚合器默认会引发 CamelExchangeException。您可以通过设置 ignoreInvalidCorrelationKeys 选项来抑制此异常。例如,在 Java DSL 中:

from(...).aggregate(...).ignoreInvalidCorrelationKeys()

在 XML DSL 中,您可以设置 ignoreInvalidCorrelationKeys 选项作为属性,如下所示:

<aggregate strategyRef="aggregatorStrategy"
           ignoreInvalidCorrelationKeys="true"
           ...>
    ...
</aggregate>

指定聚合策略

在 Java DSL 中,您可以将聚合策略作为第二参数传递给 aggregate () DSL 命令,或使用 aggregationStrategy () 子句指定它。例如,您可以按照如下所示使用 aggregationStrategy () 子句:

from("direct:start")
    .aggregate(header("id"))
        .aggregationStrategy(new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

Apache Camel 提供以下基本的聚合策略(其中类属于 org.apache.camel.processor.aggregate Java 软件包):

UseLatestAggregationStrategy
返回给定关联密钥的最后一个交换,并用这个密钥丢弃所有之前的交换。例如,此策略可用于调整库存交换的馈送,因为您希望知道特定库存符号的最新价格。
UseOriginalAggregationStrategy
返回给定关联密钥的第一个交换,并用此密钥丢弃所有之后的交换。您必须在使用此策略前调用 UseOriginalAggregationStrategy.setOriginal () 来设置第一个交换。
GroupedExchangeAggregationStrategy
将给定关联密钥 的所有 交换连接到列表中,此交换存储在 Exchange.GROUPED_EXCHANGE Exchange 属性中。请参阅 “分组的交换”一节

实施自定义聚合策略

如果要应用不同的聚合策略,您可以实现以下聚合策略基本接口之一:

org.apache.camel.processor.aggregate.AggregationStrategy
基本聚合策略接口。
org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy

如果您希望实现在聚合周期超时时接收通知,实施此接口。超时 通知方法有以下签名:

void timeout(Exchange oldExchange, int index, int total, long timeout)
org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy

如果您希望实现在聚合周期正常完成时接收通知,实施此接口。通知方法有以下签名:

void onCompletion(Exchange exchange)

例如,以下代码显示了两个不同的自定义聚合策略,StringAggregationStrategyArrayListAggregationStrategy:

 //simply combines Exchange String body values using '' as a delimiter
 class StringAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         if (oldExchange == null) {
             return newExchange;
         }

         String oldBody = oldExchange.getIn().getBody(String.class);
         String newBody = newExchange.getIn().getBody(String.class);
         oldExchange.getIn().setBody(oldBody + "" + newBody);
         return oldExchange;
     }
 }

 //simply combines Exchange body values into an ArrayList<Object>
 class ArrayListAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
 	    Object newBody = newExchange.getIn().getBody();
     	ArrayList<Object> list = null;
         if (oldExchange == null) {
 		    list = new ArrayList<Object>();
 		    list.add(newBody);
 		    newExchange.getIn().setBody(list);
 		    return newExchange;
         } else {
 	        list = oldExchange.getIn().getBody(ArrayList.class);
 	    	list.add(newBody);
 		    return oldExchange;
 	    }
     }
 }
注意

从 Apache Camel 2.0 开始,也调用 AggregationStrategy.aggregate () 回调方法,用于非常首先的交换。在第一个调用 aggregate 方法时,oldExchange 参数为 nullnewExchange 参数则包含第一个传入的交换。

要使用自定义策略类 ArrayListAggregationStrategy 来聚合信息,请定义类似以下的路由:

from("direct:start")
    .aggregate(header("StockSymbol"), new ArrayListAggregationStrategy())
    .completionTimeout(3000)
    .to("mock:result");

您还可以使用 XML 中的自定义聚合策略配置路由,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               completionTimeout="3000">
      <correlationExpression>
        <simple>header.StockSymbol</simple>
      </correlationExpression>
      <to uri="mock:aggregated"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="com.my_package_name.ArrayListAggregationStrategy"/>

控制自定义聚合策略的生命周期

您可以实施自定义聚合策略,以便其生命周期与控制它的企业集成模式的生命周期一致。这有助于确保聚合策略可以正常关闭。

要实施具有生命周期支持的聚合策略,您必须实施 org.apache.camel.Service 接口(在 AggregationStrategy 接口之外),并提供 start ()stop () 生命周期方法的实施。例如,以下代码示例显示了一个带有生命周期支持的聚合策略概述:

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Service;
import java.lang.Exception;
...
class MyAggStrategyWithLifecycleControl
       implements AggregationStrategy, Service {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Implementation not shown...
        ...
    }

    public void start() throws Exception {
        // Actions to perform when the enclosing EIP starts up
        ...
    }

    public void stop() throws Exception {
        // Actions to perform when the enclosing EIP is stopping
        ...
    }
}

交换属性

每个聚合交换中都会设置以下属性:

标头类型描述聚合交换属性

Exchange.AGGREGATED_SIZE

int

在这个交换中聚合的交换总数。

Exchange.AGGREGATED_COMPLETED_BY

字符串

表示负责完成聚合交换的机制。可能的值有: predicatesizetimeoutintervalconsumer

以下属性是在被 SQL 组件聚合仓库重新提供的交换上设置(请参阅 “持久聚合存储库”一节):

标头类型描述 Redelivered Exchange Properties

Exchange.REDELIVERY_COUNTER

int

当前重新传送尝试的序列号(从 1开始)。

指定完成条件

至少需要指定一个 完成条件,决定聚合交换何时离开聚合器并继续进行路由上的下一个节点。可以指定以下完成条件:

completionPredicate
在每个交换被聚合后评估谓词以确定完整性。值 true 表示聚合交换已完成。另外,您还可以定义一个实现 Predicate 接口的自定义 AggregationStrategy,在这种情况下,AggregationStrategy 将用作完成 predicate。
completionSize
在聚合指定数量的传入交换后完成聚合交换。
completionTimeout

(与 completionInterval兼容) 完成聚合交换(如果没有在指定超时内聚合交换)。

换句话说,超时机制会跟踪 每个 关联键值的超时。时钟在使用特定密钥值进行最新交换后开始采用。如果指定超时中 没有收到 具有相同密钥值的另一个交换,则对应的聚合交换将标记为完成,并发送到路由上的下一个节点。

completionInterval

(与 completionTimeout兼容) 在每次经过的时间间隔(指定长度)后,完成所有 未完成的聚合交换。

每个聚合交换 没有 量身定时间隔。这种机制强制同时完成所有未完成的聚合交换。因此,在某些情况下,此机制可以在启动聚合交换后立即完成聚合交换。

completionFromBatchConsumer
当与支持 批处理消费者 机制的消费者端点结合使用时,此完成选项将根据从消费者端点接收的信息,此完成选项会自动查出当前一批交换。请参阅 “批处理消费者”一节
forceCompletionOnStop
启用此选项后,它会强制在当前路由上下文停止时完成所有未完成的聚合交换。

前面的完成条件可以任意组合,但 completionTimeoutcompletionInterval 条件除外,这些条件无法同时启用。当条件结合使用时,常规规则是触发的第一个完成条件是有效的完成条件。

指定完成 predicate

您可以指定任意 predicate 表达式,决定何时完成聚合交换。评估 predicate 表达式的方法有两种:

  • 在最新的聚合交换 occasionally这是默认行为。
  • 在启用 eagerCheckCompletion 选项时,选择最新的传入交换 cephfs-指代此行为。

例如,如果您想要在每次收到 ALERT 消息时终止 stock quotes 流(根据最新传入交换中的 MsgType 标头的值所示),您可以定义类似如下的路由:

from("direct:start")
    .aggregate(
      header("id"),
      new UseLatestAggregationStrategy()
    )
        .completionPredicate(
          header("MsgType").isEqualTo("ALERT")
         )
        .eagerCheckCompletion()
    .to("mock:result");

以下示例演示了如何使用 XML 配置相同的路由:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               eagerCheckCompletion="true">
      <correlationExpression>
          <simple>header.StockSymbol</simple>
      </correlationExpression>
      <completionPredicate>
          <simple>$MsgType = 'ALERT'</simple>
      </completionPredicate>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

指定动态完成超时

可以指定 动态完成超时,其中每个传入交换都会重新计算超时值。例如,若要从每个传入交换的 timeout 标头设置超时值,您可以按照如下所示定义路由:

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionTimeout(header("timeout"))
    .to("mock:aggregated");

您可以在 XML DSL 中配置相同的路由,如下所示:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionTimeout>
                <header>timeout</header>
            </completionTimeout>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
注意

如果动态值为 null0, 您也可以添加固定超时值,并且 Apache Camel 将回退到使用这个值。

指定动态完成大小

可以指定 动态完成大小,其中为每个传入交换重新计算完成大小。例如,若要从每个传入交换的 mySize 标头设置完成大小,您可以按照如下所示定义路由:

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionSize(header("mySize"))
    .to("mock:aggregated");

使用 Spring XML 的相同示例:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionSize>
                <header>mySize</header>
            </completionSize>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
注意

如果动态值为 null0, 您也可以添加固定大小值,并且 Apache Camel 回退以使用这个值。

强制从 AggregationStrategy 完成单个组

如果您实施了自定义 AggregationStrategy 类,有一个机制来强制完成当前消息组,方法是在从 AggregationStrategy.aggregate () 方法返回的交换时将 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP Exchange 属性设置为 true。这个机制 仅影响 当前的组:其他消息组(具有不同的关联 ID) 不会被 强制完成。这种机制会覆盖任何其他完成机制,如 predicate、大小、超时等。

例如,如果消息正文大小大于 5,则以下示例 AggregationStrategy 类将完成当前的组:

// Java
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String body = oldExchange.getIn().getBody(String.class) + "+"
            + newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(body);
        if (body.length() >= 5) {
            oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }
        return oldExchange;
    }
}

强制完成带有特殊消息的所有组

通过向路由发送带有特殊标头的消息,可以强制完成所有未完成的聚合消息。您可以使用两个替代的标头设置来强制完成:

Exchange.AGGREGATION_COMPLETE_ALL_GROUPS
设置为 true,以强制完成当前的聚合周期。此消息仅作为信号运作,不包含在 任何聚合周期中。在处理此信号消息后,消息的内容将被丢弃。
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE
设置为 true,以强制完成当前的聚合周期。这个消息 包含在 当前的聚合周期中。

使用 AggregateController

org.apache.camel.processor.aggregate.AggregateController 可让您使用 Java 或 JMX API 在运行时控制聚合。这可用于强制完成一组交换,或者查询当前的运行时统计数据。

如果没有配置自定义,则聚合器提供了一个默认实现,您可以使用 getAggregateController () 方法访问。但是,使用 aggregateController 在路由中配置控制器很容易。

private AggregateController controller = new DefaultAggregateController();

from("direct:start")
   .aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator")
      .aggregateController(controller)
      .to("mock:aggregated");

另外,您可以使用 AggregateController上的 API 来强制完成。例如,使用密钥 foo 完成组

int groups = controller.forceCompletionOfGroup("foo");

数字返回可以是已完成的组数。以下是一个 API 以完成所有组:

 int groups = controller.forceCompletionOfAllGroups();

强制唯一关联密钥

在一些聚合场景中,您可能需要强制关联密钥对于每批交换是唯一的条件。换句话说,当特定关联键的聚合交换完成时,您希望确保不允许与该关联密钥进一步的聚合交换。例如,如果路由的后面的部分应该与唯一关联键值交换,您可能希望强制实施此条件。

根据配置完成条件的方式,可能会有多个使用特定关联密钥生成的聚合交换的风险。例如,虽然您可能会定义一个完成 predicate,它设计为等待所有与特定相关性密钥交换的交换被接收,但您可能还会定义一个完成超时,这可以在与该密钥的所有交换到达之前触发。在这种情况下,相关的交换可能会增加 第二个 聚合交换,具有相同相关性的键值。

对于这样的情形,您可以通过设置 closeCorrelationKeyOnCompletion 选项,将聚合器配置为禁止聚合与之前关联密钥值的聚合交换。为了禁止重复的关联键值,聚合器需要记录之前的关联键值。此缓存的大小(缓存的关联键的数量)指定为 closeCorrelationKeyOnCompletion () DSL 命令的参数。要指定无限大小的缓存,您可以传递值零或负整数。例如,要指定 10000 个键值的缓存大小:

from("direct:start")
    .aggregate(header("UniqueBatchID"), new MyConcatenateStrategy())
        .completionSize(header("mySize"))
        .closeCorrelationKeyOnCompletion(10000)
    .to("mock:aggregated");

如果聚合交换使用重复的关联键值完成,则聚合器会抛出一个 ClosedCorrelationKeyException 异常。

使用 Simple 表达式进行流处理

您可以使用 Simple 语言表达式作为令牌,在 streaming 模式中使用 tokenizeXML 子命令。使用简单语言表达式将启用对动态令牌的支持。

例如,要使用 Java 将一系列按标签用户角色分离的名称分隔,您可以使用 令牌化XML bean 和简单语言令牌 将该文件拆分为 名称 元素。

public void testTokenizeXMLPairSimple() throws Exception {
        Expression exp = TokenizeLanguage.tokenizeXML("${header.foo}", null);

获取由 < person> 划分的名称的 输入字符串,并将 &lt ;person& gt; 设置为令牌。

        exchange.getIn().setHeader("foo", "<person>");
        exchange.getIn().setBody("<persons><person>James</person><person>Claus</person><person>Jonathan</person><person>Hadrian</person></persons>");

列出从输入中分离的名称。

        List<?> names = exp.evaluate(exchange, List.class);
        assertEquals(4, names.size());

        assertEquals("<person>James</person>", names.get(0));
        assertEquals("<person>Claus</person>", names.get(1));
        assertEquals("<person>Jonathan</person>", names.get(2));
        assertEquals("<person>Hadrian</person>", names.get(3));
    }

分组的交换

您可以将传出批处理中的所有聚合交换合并为一个 org.apache.camel.impl.GroupedExchange holder 类。要启用分组的交换,请指定 groupExchanges () 选项,如以下 Java DSL 路由中所示:

from("direct:start")
    .aggregate(header("StockSymbol"))
        .completionTimeout(3000)
        .groupExchanges()
    .to("mock:result");

发送到 mock:result 的分组交换列表包含消息正文中聚合交换的列表。以下代码行显示了后续处理器如何以列表的形式访问所分组交换的内容:

// Java
List<Exchange> grouped = ex.getIn().getBody(List.class);
注意

当您启用分组的交换功能时,不得 配置聚合策略(分组交换功能本身是一个聚合策略)。

注意

从传出交换上的属性访问分组交换的旧方法现已弃用,并将在以后的发行版本中删除。

批处理消费者

聚合器可以与 批处理消费者模式一同工作,以汇总批处理消费者 报告的消息总数(批处理消费者端点设置 CamelBatchSize、CamelBatchIndex、CamelBatchIndex、CamelBatchIndex 的属性)和 CamelBatchComplete 属性。例如,要聚合由文件消费者端点找到的所有文件,您可以使用类似如下的路由:

from("file://inbox")
    .aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy())
    .completionFromBatchConsumer()
    .to("bean:processOrder");

目前,以下端点支持批处理使用者机制:file、FTP、邮件、iBatis 和 JPA。

持久聚合存储库

默认聚合器仅使用内存中的 AggregationRepository。如果要永久存储待处理的聚合交换,您可以使用 SQL 组件 作为持久聚合存储库。SQL 组件包含一个 JdbcAggregationRepository,它可即时保留聚合的信息,并确保您不会丢失任何信息。

成功处理交换时,当存储库上调用 确认方法时,它将标记为完成。这意味着,如果同一交换再次失败,它将重试,直到成功为止。

添加对 camel-sql 的依赖

要使用 SQL 组件,您必须在项目中包含对 camel-sql 的依赖项。例如,如果您使用 Maven pom.xml 文件:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

创建聚合数据库表

您必须创建单独的聚合和已完成的数据库表以实现持久性。例如,以下查询为名为 my_aggregation_repo 的数据库创建表:

CREATE TABLE my_aggregation_repo (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_pk PRIMARY KEY (id)
);

CREATE TABLE my_aggregation_repo_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_completed_pk PRIMARY KEY (id)
);
}

配置聚合存储库

您还必须在框架 XML 文件中配置聚合存储库(如 Spring 或 Blueprint):

<bean id="my_repo"
    class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
    <property name="repositoryName" value="my_aggregation_repo"/>
    <property name="transactionManager" ref="my_tx_manager"/>
    <property name="dataSource" ref="my_data_source"/>
    ...
</bean>

repositoryName事务管理器dataSource 属性是必需的。有关持久聚合存储库配置选项的详情,请参阅 Apache Camel 组件参考指南中的 SQL 组件

线程选项

图 8.6 “聚合器实施” 所示,聚合器与路由的后一部分分离,该交换发送到路由的后部分,由专用线程池处理。默认情况下,这个池仅包含一个线程。如果要指定有多个线程的池,启用 parallelProcessing 选项,如下所示:

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
        .parallelProcessing()
    .to("mock:aggregated");

默认情况下,这会创建一个具有 10 个 worker 线程的池。

如果要对创建的线程池进行更多控制,请使用 executorService 选项指定自定义 java.util.concurrent.ExecutorService 实例(在这种情况下,不需要启用 并行处理 选项)。

聚合到列表

常见的聚合方案涉及将一系列传入的消息正文整合到 List 对象中。为便于这种情况,Apache Camel 提供了 AbstractListAggregationStrategy 抽象类,您可以快速扩展以针对此案例创建聚合策略。传入类型消息 T 聚合到完成的交换中,消息正文类型为 List<T>

例如,要将一系列 Integer 消息正文聚合到一个 List<Integer& gt; 对象中,您可以使用如下定义的聚合策略:

import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
...
/**
 * Strategy to aggregate integers into a List<Integer>.
 */
public final class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> {
 
    @Override
    public Integer getValue(Exchange exchange) {
        // the message body contains a number, so just return that as-is
        return exchange.getIn().getBody(Integer.class);
    }
}

聚合器选项

聚合器支持以下选项:

表 8.1. 聚合器选项
选项默认值描述

correlationExpression

 

评估用于聚合的关联密钥的强制表达式。具有相同关联密钥的 Exchange 聚合在一起。如果无法评估关联密钥,则将抛出异常。您可以使用 ignoreBadCorrelationKeys 选项禁用它。

aggregationStrategy

 

强制 AggregationStrategy,用于将传入的 Exchange 与已合并的交换合并。首次调用 oldExchange 参数为空 。在随后的调用中,oldExchange 包含合并的交换,newExchange 是新传入的 Exchange。从 Camel 2.9.2 开始,策略可以是 TimeoutAwareAggregationStrategy 实现,它支持超时回调。从 Camel 2.16 开始,策略也可以是 PreCompletionAwareAggregationStrategy 实施。它在预完成模式下运行完成检查。

strategyRef

 

在 Registry 中查找 AggregationStrategy 的引用。

completionSize

 

聚合完成前聚合的消息数。这个选项可以被设置为固定值或使用表达式,允许您动态评估大小 - 将以结果使用 Integer。如果两个都被设置为 null0, 则 Camel 将回退为使用固定值。

completionTimeout

 

mill 表示聚合交换在完成前应处于不活跃的时间。这个选项可以被设置为固定值或使用表达式,允许您动态评估超时 - 因此将使用 Long。如果两个都被设置为 null0, 则 Camel 将回退为使用固定值。您不能将这个选项与 completionInterval 一起使用,只能同时使用这两者之一。

completionInterval

 

在 millis 中重复此操作,聚合器将完成所有当前的聚合交换。Camel 有一个后台任务,会在每个期间触发。您不能将这个选项与 completionTimeout 一起使用,只能使用其中之一。

completionPredicate

 

指定 predicate (来自 org.apache.camel.Predicate 类型),它在聚合交换完成时信号。另外,您还可以定义一个实现 Predicate 接口的自定义 AggregationStrategy,在这种情况下,AggregationStrategy 将用作完成 predicate。

completionFromBatchConsumer

false

如果交换来自 Batch Consumer,则这个选项。然后在启用 第 8.5 节 “聚合器” 时,将使用消息标题 CamelBatchSize 中由 Batch Consumer 决定的批处理大小。请参阅 Batch Consumer 的更多详细信息。这可用于聚合给定轮询中查看 文件 端点的所有文件。

eagerCheckCompletion

false

在收到新的传入交换时,是否预先检查完成。这个选项会影响 completionPredicate 选项的行为,因为交换会相应地传递更改。当为 Predicate 中传递的 Exchange false 时,聚合的 Exchange 是聚合的 Exchange,这意味着您可以存储来自 AggregationStrategy 的聚合交换中的任何信息供 Predicate 使用。真正在 Predicate 中传递的 Exchange 是 进入 的 Exchange,这意味着您可以从传入的 Exchange 访问数据。

forceCompletionOnStop

false

如果为 true,在当前路由上下文停止时完成所有聚合交换。

groupExchanges

false

如果启用,Camel 会将所有聚合的 Exchange 分组到一个合并的 org.apache.camel.impl.GroupedExchange 拥有者类,其中包含所有聚合的 Exchanges。因此,只有一个 Exchange 才会从聚合器中发送出一个 Exchange。可用于将许多传入 Exchange 组合为单个输出交换,而无需自己对自定义 AggregationStrategy 进行编码。

ignoreInvalidCorrelationKeys

false

是否忽略无法被评估为值的关联键。默认情况下,Camel 将抛出一个例外项,但您可以启用这个选项并忽略这种情况。

closeCorrelationKeyOnCompletion

 

是否应该 接受 交换。您可以启用此功能,表明是否已完成关联密钥,然后与同一关联密钥的任何新交换都会被拒绝。然后 Camel 将抛出一个 封闭的CorrelationKeyException 异常。使用此选项时,传递一个 整数,它是 LRUCache 的数字,这样可保留最后的、封闭的关联键数。您可以传递 0 或一个负值来指示未绑定的缓存。通过使用数字,如果您使用的是不同相关性键的日志,请确保缓存不会增长太大。

discardOnCompletionTimeout

false

Camel 2.5: 应丢弃因为超时而完成的交换器。如果启用,则当超时发生聚合的消息时,不会 发出但丢弃(断开连接)。

aggregationRepository

 

允许您自己实施 org.apache.camel.spi.AggregationRepository,可跟踪当前航班聚合交换的跟踪。Camel 默认使用基于内存的实现。

aggregationRepositoryRef

 

在 registry 中查找 聚合Repository 的引用。

parallelProcessing

false

当聚合完成后,它们会从聚合器中发送。这个选项指明 Camel 是否应使用具有多个线程的线程池来并发。如果没有指定自定义线程池,Camel 会创建一个包含 10 个并发线程的默认池。

executorService

 

如果使用 parallelProcessing,您可以指定要使用的自定义线程池。事实上,如果您不使用 并行处理 此自定义线程池来发送聚合交换的并行处理。

executorServiceRef

 

在 Registry 中查找 executorService 的引用

timeoutCheckerExecutorService

 

如果使用一个 completionTimeoutcompletionTimeoutExpressioncompletionInterval 选项之一,则会创建一个后台线程来检查每个聚合器的完成。设置这个选项,以提供要使用的自定义线程池,而不是为每个聚合器创建新线程。

timeoutCheckerExecutorServiceRef

 

在注册表中查找 timeoutCheckerExecutorService 的引用。

completeAllOnStop

 

当您停止聚合器时,此选项可以从聚合存储库完成所有待处理的交换。

optimisticLocking

false

打开光纤锁定,该锁定可与聚合存储库结合使用。

optimisticLockRetryPolicy

 

为光转锁定配置重试策略。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.