8.5. 聚合器
概述
通过 图 8.5 “聚合器模式” 中显示的 聚合器 模式,您可以将相关消息的批处理合并到单个消息中。
图 8.5. 聚合器模式
要控制聚合器的行为,Apache Camel 允许您指定 Enterprise Integration Patterns 中描述的属性,如下所示:
- correlation 表达式 criu- iwl 决定哪些消息应聚合在一起。每个传入消息上评估了关联表达式,以生成 关联密钥。然后,具有相同关联键的传入消息被分组到同一批处理中。例如,如果要将 所有传入 的消息聚合到一个消息中,您可以使用恒定表达式。
- 当消息批处理完成后,completeness condition requiredness condition Determines.您可以将它指定为一个简单的大小限制,或者通常可以指定批处理完成后标记的 predicate 条件。
- 聚合算法 criu-wagon 将单个关联密钥的消息交换组合到单个消息交换中。
例如,假设一个库存市场数据系统每秒接收 30,000 个消息。如果您的 GUI 工具无法应对此类大规模更新率,您可能希望减慢消息流。传入的库存引号可以通过选择最新的引号并丢弃较旧的价格而一起聚合。(如果您想要捕获一些历史记录,您可以应用 delta 处理算法。)
聚合器现在使用包含更多信息的 ManagedAggregateProcessorMBean 在 JMX 中加入。它允许您使用聚合控制器来控制它。
聚合器的工作方式
图 8.6 “聚合器实施” 显示了聚合器的工作方式的概述,假设它是具有关联键(如 A、B、C 或 D)的交换流。
图 8.6. 聚合器实施
图 8.6 “聚合器实施” 中显示的交换流按如下处理:
- correlator 负责根据关联密钥对交换进行排序。对于每个传入的交换,评估关联表达式,生成关联密钥。例如,对于 图 8.6 “聚合器实施” 中显示的交换,关联键评估为 A。
聚合策略 负责合并具有相同关联密钥的交换。当一个新交换时,会进入 A,聚合器会在 聚合存储库中查找对应的聚合交换 A,并将其与新交换合并。
在特定的聚合周期完成前,传入的交换会与相应的聚合交换持续聚合。聚合周期持续到由其中一个完成机制终止为止。
注意从 Camel 2.16,新的 XSLT Aggregation 策略允许您将两个消息与 XSLT 文件合并。您可以从 toolbox 访问
AggregationStrategies.xslt ()
文件。如果在聚合器上指定了 completion predicate,则会测试聚合交换,以确定它是否准备好发送到路由中的下一个处理器。处理继续如下:
- 如果完成,则聚合交换由路由的后部分处理。有两种替代模型: 同步 (默认),这会导致调用线程阻止或 异步 (如果启用了并行处理),其中聚合交换将提交到 executor 线程池(如 图 8.6 “聚合器实施”所示)。
- 如果没有完成,聚合交换会保存回聚合存储库。
-
与同步完成测试并行,可以通过启用
completionTimeout
选项或completionInterval
选项来启用异步完成测试。这些完成测试在单独的线程中运行,并在满足完成测试时,对应的交换被标记为完成,并开始由路由的后部分处理(根据并行处理是启用并行处理还是异步处理)。 - 如果启用了并行处理,则线程池负责处理路由后者中的交换。默认情况下,这个线程池包含十个线程,但您可以选择自定义池(“线程选项”一节)。
Java DSL 示例
以下示例使用 UseLatestAggregationStrategy
聚合策略来聚合具有相同 StockSymbol
标头值的交换。对于给定 StockSymbol
值,如果收到该关联键的最后三秒以上,则聚合交换被视为完成并发送到 模拟
端点。
from("direct:start") .aggregate(header("id"), new UseLatestAggregationStrategy()) .completionTimeout(3000) .to("mock:aggregated");
XML DSL 示例
以下示例演示了如何在 XML 中配置相同的路由:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000"> <correlationExpression> <simple>header.StockSymbol</simple> </correlationExpression> <to uri="mock:aggregated"/> </aggregate> </route> </camelContext> <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
指定关联表达式
在 Java DSL 中,关联表达式始终作为第一个参数传递给 aggregate ()
DSL 命令。您不仅限于在此处使用简单表达式语言。您可以使用任何表达式语言或脚本语言(如 XPath、XQuery、SQL 等)指定关联表达式。
对于考试,要使用 XPath 表达式关联交换,您可以使用以下 Java DSL 路由:
from("direct:start") .aggregate(xpath("/stockQuote/@symbol"), new UseLatestAggregationStrategy()) .completionTimeout(3000) .to("mock:aggregated");
如果无法在特定传入的交换上评估关联表达式,则聚合器会默认引发 CamelExchangeException
。您可以通过设置 ignoreInvalidCorrelationKeys
选项来阻止此异常。例如,在 Java DSL 中:
from(...).aggregate(...).ignoreInvalidCorrelationKeys()
在 XML DSL 中,您可以将 ignoreInvalidCorrelationKeys
选项设置为属性,如下所示:
<aggregate strategyRef="aggregatorStrategy" ignoreInvalidCorrelationKeys="true" ...> ... </aggregate>
指定聚合策略
在 Java DSL 中,您可以将聚合策略作为第二个参数传递给 aggregate ()
DSL 命令,或使用 aggregate Strategy ()
子句指定它。例如,您可以使用 aggregationStrategy ()
子句,如下所示:
from("direct:start") .aggregate(header("id")) .aggregationStrategy(new UseLatestAggregationStrategy()) .completionTimeout(3000) .to("mock:aggregated");
Apache Camel 提供以下基本聚合策略(类属于 org.apache.camel.processor.aggregate
Java 软件包):
UseLatestAggregationStrategy
- 返回给定关联密钥的最后一个交换,从而丢弃所有之前与此密钥的交换。例如,此策略对于从股票交易中节流源很有用,您只想知道特定库存符号的最新价格。
UseOriginalAggregationStrategy
-
返回给定关联密钥的第一个交换,丢弃所有之后与此密钥的交换。您必须通过调用
UseOriginalAggregationStrategy.setOriginal ()
来设置第一个交换,然后才能使用此策略。 GroupedExchangeAggregationStrategy
-
将给定关联密钥 的所有 交换连接到列表中,该列表存储在
Exchange.GROUPED_EXCHANGE
Exchange 属性中。请参阅 “分组交换”一节。
实现自定义聚合策略
如果要应用不同的聚合策略,您可以实现以下聚合策略基本接口之一:
org.apache.camel.processor.aggregate.AggregationStrategy
- 基本聚合策略接口。
org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy
如果您希望实施在聚合周期超时时收到通知,请实施此接口。
超时
通知方法有以下签名:void timeout(Exchange oldExchange, int index, int total, long timeout)
org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy
如果您希望实施在聚合周期正常完成时收到通知,请实施此接口。通知方法有以下签名:
void onCompletion(Exchange exchange)
例如,以下代码显示了两个不同的自定义聚合策略,即 StringAggregationStrategy
和 ArrayListAggregationStrategy
::
//simply combines Exchange String body values using '' as a delimiter class StringAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String oldBody = oldExchange.getIn().getBody(String.class); String newBody = newExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(oldBody + "" + newBody); return oldExchange; } } //simply combines Exchange body values into an ArrayList<Object> class ArrayListAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(); ArrayList<Object> list = null; if (oldExchange == null) { list = new ArrayList<Object>(); list.add(newBody); newExchange.getIn().setBody(list); return newExchange; } else { list = oldExchange.getIn().getBody(ArrayList.class); list.add(newBody); return oldExchange; } } }
从 Apache Camel 2.0 开始,为非常第一个交换调用 AggregationStrategy.aggregate ()
回调方法。在聚合方法第一次调用中,oldExchange
参数为 null
,newExchange
参数包含第一个传入交换。
要使用自定义策略类 ArrayListAggregationStrategy
来聚合信息,请定义类似如下的路由:
from("direct:start") .aggregate(header("StockSymbol"), new ArrayListAggregationStrategy()) .completionTimeout(3000) .to("mock:result");
您还可以使用 XML 中的自定义聚合策略配置路由,如下所示:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000"> <correlationExpression> <simple>header.StockSymbol</simple> </correlationExpression> <to uri="mock:aggregated"/> </aggregate> </route> </camelContext> <bean id="aggregatorStrategy" class="com.my_package_name.ArrayListAggregationStrategy"/>
控制自定义聚合策略的生命周期
您可以实施自定义聚合策略,以便其生命周期与控制它的企业集成模式的生命周期一致。这可用于确保聚合策略可以安全关闭。
要使用生命周期支持实施聚合策略,您必须实现 org.apache.camel.Service
接口(除 AggregationStrategy
接口之外),并提供 start ()
和 stop ()
生命周期方法的实现。例如,以下代码示例显示了具有生命周期支持的聚合策略概述:
// Java import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.Service; import java.lang.Exception; ... class MyAggStrategyWithLifecycleControl implements AggregationStrategy, Service { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // Implementation not shown... ... } public void start() throws Exception { // Actions to perform when the enclosing EIP starts up ... } public void stop() throws Exception { // Actions to perform when the enclosing EIP is stopping ... } }
Exchange 属性
每个聚合交换上设置以下属性:
标头 | 类型 | 描述 Aggregated Exchange Properties |
---|---|---|
|
| 聚合到此交换的交换总数。 |
|
|
指明负责完成聚合交换的机制。可能的值有: |
以下属性由 SQL 组件聚合存储库红色设置(请参阅 “持久性聚合存储库”一节):
标头 | 类型 | 描述 Redelivered Exchange Properties |
---|---|---|
|
|
当前重新发送尝试的序列号(从 |
指定完成条件
必须至少 指定一个完成条件,这决定了聚合交换何时离开聚合器,并继续路由上的下一个节点。可以指定以下完成条件:
completionPredicate
-
聚合每个交换后评估 predicate,以确定完整性。值
true
表示聚合交换已完成。另外,您还可以定义实现Predicate
接口的自定义AggregationStrategy
而不是设置这个选项,在这种情况下,AggregationStrategy
将用作 completion predicate。 completionSize
- 在聚合指定数量的传入交换后完成聚合交换。
completionTimeout
(与
completionInterval
兼容) 如果指定的超时内没有聚合交换,则完成聚合交换。换句话说,超时机制会跟踪 每个 关联键值的超时时间。时钟在收到特定键值的最新交换后开始循环。如果没有在指定的超时时间内收到具有相同键值的另一个交换,则对应的聚合交换被标记为 complete,并发送到路由上的下一个节点。
completionInterval
(与
completionTimeout
兼容) 在每次时间间隔后(指定长度) 完成所有 未完成的聚合交换。没有 为每个聚合交换量身定制时间间隔。这种机制强制完成所有未完成的聚合交换。因此,在某些情况下,此机制可以在启动聚合后立即完成聚合交换。
completionFromBatchConsumer
- 当与支持 批处理消费者 机制的消费者端点结合使用时,此完成选项会根据它从消费者端点接收的信息,在当前批处理完成后自动找出出的。请参阅 “批处理消费者”一节。
forceCompletionOnStop
- 启用此选项后,它会在当前路由上下文停止时强制完成所有未完成的聚合交换。
前面的完成条件可以任意组合使用,但 completionTimeout
和 completionInterval
条件除外,它们不能同时启用。当条件组合使用时,常规规则是要触发的第一个完成条件是有效的完成条件。
指定 completion predicate
您可以指定一个任意 predicate 表达式,来确定聚合交换完成后。评估 predicate 表达式的方法有两种:
- 在最新的聚合交换上 ,是默认的行为。
-
在 最新的传入交换 iwl-MIRROR this behavior 上 ,当您启用
eagerCheckCompletion
选项时,会选择此行为。
例如,如果要在每次收到 ALERT
消息时终止库存引号流(由最新传入的交换中的 MsgType
标头值表示),您可以定义一个类似如下的路由:
from("direct:start") .aggregate( header("id"), new UseLatestAggregationStrategy() ) .completionPredicate( header("MsgType").isEqualTo("ALERT") ) .eagerCheckCompletion() .to("mock:result");
以下示例演示了如何使用 XML 配置相同的路由:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <aggregate strategyRef="aggregatorStrategy" eagerCheckCompletion="true"> <correlationExpression> <simple>header.StockSymbol</simple> </correlationExpression> <completionPredicate> <simple>$MsgType = 'ALERT'</simple> </completionPredicate> <to uri="mock:result"/> </aggregate> </route> </camelContext> <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
指定动态完成超时
可以指定 动态完成超时,其中为每个传入的交换重新计算超时值。例如,若要从每个传入交换中的 timeout
标头设置超时值,您可以定义路由,如下所示:
from("direct:start") .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy()) .completionTimeout(header("timeout")) .to("mock:aggregated");
您可以在 XML DSL 中配置相同的路由,如下所示:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <aggregate strategyRef="aggregatorStrategy"> <correlationExpression> <simple>header.StockSymbol</simple> </correlationExpression> <completionTimeout> <header>timeout</header> </completionTimeout> <to uri="mock:aggregated"/> </aggregate> </route> </camelContext> <bean id="aggregatorStrategy" class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
如果动态值为 null
或 0,
您也可以添加固定的超时值,Apache Camel 将回退到使用这个值。
指定动态完成大小
可以指定 动态完成大小,每个传入的交换都会重新计算完成大小。例如,若要从每个传入交换中的 mySize
标头设置完成大小,您可以定义路由,如下所示:
from("direct:start") .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy()) .completionSize(header("mySize")) .to("mock:aggregated");
使用 Spring XML 的同一示例:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <aggregate strategyRef="aggregatorStrategy"> <correlationExpression> <simple>header.StockSymbol</simple> </correlationExpression> <completionSize> <header>mySize</header> </completionSize> <to uri="mock:aggregated"/> </aggregate> </route> </camelContext> <bean id="aggregatorStrategy" class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
如果动态值为 null
或 0,
您也可以添加固定的大小值,Apache Camel 将回退到使用这个值。
在 AggregationStrategy 中强制完成单个组
如果您实施自定义 AggregationStrategy
类,可以通过将 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP
交换属性设置为 true
来强制完成当前消息组。这个机制 只会影响 当前组:其他消息组(具有不同关联 ID) 不会被 强制完成。此机制会覆盖任何其他完成机制,如 predicate、大小、超时等。
例如,如果消息正文大小大于 5,则以下示例 AggregationStrategy
类完成当前的组:
// Java public final class MyCompletionStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class) + "+" + newExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body); if (body.length() >= 5) { oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true); } return oldExchange; } }
强制完成带有特殊消息的所有组
通过将带有特殊标头的消息发送到路由,可以强制完成所有未完成的聚合消息。您可以使用两个替代的标头设置来强制完成:
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS
-
设置为
true
,以强制完成当前聚合周期。此消息仅充当信号,不包含在 任何聚合周期中。处理此信号消息后,消息的内容将被丢弃。 Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE
-
设置为
true
,以强制完成当前聚合周期。此消息 包含在 当前的聚合周期中。
使用 AggregateController
org.apache.camel.processor.aggregate.AggregateController 可让您在运行时使用 Java 或 JMX API 控制聚合。这可用于强制完成一组交换,或查询当前的运行时统计信息。
如果没有配置自定义,聚合器提供了一个默认实现,您可以使用 getAggregateController ()
方法访问它。但是,使用 aggregateController 在路由中配置控制器。
private AggregateController controller = new DefaultAggregateController(); from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator") .aggregateController(controller) .to("mock:aggregated");
另外,您可以使用 AggregateController
上的 API 来强制完成。例如,使用键 foo 完成组
int groups = controller.forceCompletionOfGroup("foo");
返回的数量将是完成的组数量。以下是完成所有组的 API:
int groups = controller.forceCompletionOfAllGroups();
强制唯一关联键
在一些聚合场景中,您可能想要强制执行相关的键对于每个批处理都是唯一的的条件。换句话说,当特定关联密钥的聚合交换完成时,您要确保不允许进一步聚合与该关联键的交换。例如,如果路由的后者部分需要处理具有唯一关联键值的交换,您可能希望强制执行此条件。
根据配置完成条件的方式,使用特定关联密钥生成多个聚合交换的风险。例如,虽然您可以定义一个 completion predicate,它旨在 等待所有 与特定关联键的交换都被接收,但您也可以定义完成超时,该超时可以在所有使用该键的交换之前触发。在这种情况下,较晚的交换可能会提高与具有相同关联键值 的第二个 聚合交换。
在这种情况下,您可以通过设置 closeCorrelationKeyOnCompletion
选项,将聚合器配置为阻止之前相关的键值的聚合交换。为了抑制重复关联键值,聚合器需要在缓存中记录以前的键值。此缓存的大小(缓存的关联密钥数)被指定为 closeCorrelationKeyOnCompletion ()
DSL 命令的参数。要指定无限大小的缓存,您可以传递值零或负整数。例如,指定 10000
键值的缓存大小:
from("direct:start") .aggregate(header("UniqueBatchID"), new MyConcatenateStrategy()) .completionSize(header("mySize")) .closeCorrelationKeyOnCompletion(10000) .to("mock:aggregated");
如果聚合交换以重复的关联键值完成,则聚合器会引发 ClosedCorrelationKeyException
异常。
使用简单表达式进行基于流的处理
您可以在流传输模式中使用简单语言表达式作为令牌以及 tokenizeXML
子命令。使用简单语言表达式将启用对动态令牌的支持。
例如,要使用 Java 分割由标签 人员
减少的名称序列,您可以使用令牌化 XML
bean 和简单语言令牌将文件分成 name
元素。
public void testTokenizeXMLPairSimple() throws Exception { Expression exp = TokenizeLanguage.tokenizeXML("${header.foo}", null);
获取由 < person
> 分离的名称的输入字符串,并将 < ;person&
gt; 设置为令牌。
exchange.getIn().setHeader("foo", "<person>"); exchange.getIn().setBody("<persons><person>James</person><person>Claus</person><person>Jonathan</person><person>Hadrian</person></persons>");
列出从输入中分离的名称。
List<?> names = exp.evaluate(exchange, List.class); assertEquals(4, names.size()); assertEquals("<person>James</person>", names.get(0)); assertEquals("<person>Claus</person>", names.get(1)); assertEquals("<person>Jonathan</person>", names.get(2)); assertEquals("<person>Hadrian</person>", names.get(3)); }
分组交换
您可以将传出批处理中的所有聚合交换组合成单个 org.apache.camel.impl.GroupedExchange
holder 类。要启用分组交换,请指定 groupExchanges ()
选项,如以下 Java DSL 路由所示:
from("direct:start") .aggregate(header("StockSymbol")) .completionTimeout(3000) .groupExchanges() .to("mock:result");
发送到 mock:result
的分组交换列表包含消息正文中聚合交换的列表。以下行显示后续处理器如何以列表的形式访问分组交换的内容:
// Java List<Exchange> grouped = ex.getIn().getBody(List.class);
当您启用分组交换功能时,不得 配置聚合策略(分组的交换功能本身是一个聚合策略)。
从传出交换上的属性访问分组交换的旧方法现已弃用,并将在以后的发行版本中删除。
批处理消费者
聚合器可以与 批处理消费者 模式协同工作,以聚合批处理消费者报告的消息总数(批处理消费者端点设置 CamelBatchSize
、CamelBatchIndex
、CamelBatchComplete
属性在传入交换上)。例如,要聚合文件消费者端点找到的所有文件,您可以使用如下路由:
from("file://inbox") .aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy()) .completionFromBatchConsumer() .to("bean:processOrder");
目前,以下端点支持批处理消费者机制:file、FTP、Mail、iBatis 和 JPA。
持久性聚合存储库
默认聚合器仅使用内存 聚合存储库
。如果要永久存储待处理的聚合交换,您可以使用 SQL 组件作为 持久聚合存储库。SQL 组件包含一个 JdbcAggregationRepository
,它会持续聚合消息,并确保您不会丢失任何消息。
成功处理交换后,在存储库上调用 confirm
方法时,它标记为 complete。这意味着,如果同一交换再次失败,它将被重试,直到成功为止。
添加对 camel-sql 的依赖
要使用 SQL 组件,您必须在项目中包含对 camel-sql
的依赖项。例如,如果您使用 Maven pom.xml
文件:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-sql</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
创建聚合数据库表
您必须创建单独的聚合和已完成的数据库表,以实现持久性。例如,以下查询会为名为 my_aggregation_repo
的数据库创建表:
CREATE TABLE my_aggregation_repo ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE my_aggregation_repo_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) ); }
配置聚合存储库
您还必须在框架 XML 文件中配置聚合存储库(如 Spring 或 Blueprint):
<bean id="my_repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="my_aggregation_repo"/> <property name="transactionManager" ref="my_tx_manager"/> <property name="dataSource" ref="my_data_source"/> ... </bean>
需要 repositoryName
、transactionManager
和 dataSource
属性。有关持久聚合存储库的更多详细信息,请参阅 Apache Camel 组件参考指南 中的 SQL 组件。
线程选项
如 图 8.6 “聚合器实施” 所示,聚合器与路由的后者部分分离,其中发送到路由的交换由专用线程池处理。默认情况下,这个池仅包含一个线程。如果要指定多个线程的池,请启用 parallelProcessing
选项,如下所示:
from("direct:start") .aggregate(header("id"), new UseLatestAggregationStrategy()) .completionTimeout(3000) .parallelProcessing() .to("mock:aggregated");
默认情况下,这会创建一个具有 10 个 worker 线程的池。
如果要对创建的线程池进行更多控制,请使用 executorService
选项指定自定义 java.util.concurrent.ExecutorService
实例(在这种情况下,不需要启用 parallelProcessing
选项)。
聚合到列表
常见的聚合场景涉及将一系列传入的消息聚合到 List
对象。为方便这种情况,Apache Camel 提供了 AbstractListAggregationStrategy
抽象类,您可以快速扩展为本例创建聚合策略。传入消息正文( T
)被聚合到完成的交换中,消息正文为 List<T>
。
例如,要将一系列 Integer
消息正文聚合到一个 List<Integer&
gt; 对象中,您可以使用定义的聚合策略:
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy; ... /** * Strategy to aggregate integers into a List<Integer>. */ public final class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> { @Override public Integer getValue(Exchange exchange) { // the message body contains a number, so just return that as-is return exchange.getIn().getBody(Integer.class); } }
聚合器选项
聚合器支持以下选项:
选项 | 默认 | 描述 |
---|---|---|
|
评估用于聚合的关联键的强制表达式。具有相同关联键的 Exchange 聚合在一起。如果无法评估关联密钥,则会抛出 Exception。您可以使用 | |
|
强制 | |
|
在 Registry 中查找 | |
|
聚合完成前聚合的消息数量。这个选项可以设置为固定值,或使用一个表达式来动态评估大小 - 将因此使用 | |
|
中间时间,聚合交换应在完成前不活跃。这个选项可以设置为固定值,或使用允许您动态评估超时的 Expression 进行设置 - 将使用 | |
| 在 millis 中重复周期,聚合器将完成所有当前的聚合交换。Camel 有一个后台任务,每个期间都会触发。您不能将这个选项与 completionTimeout 一同使用,只能使用其中一个选项。 | |
|
指定 predicate ( | |
|
|
这个选项是交换来自 Batch Consumer。然后,当启用 第 8.5 节 “聚合器” 时,使用由消息标头 |
|
|
在收到新的传入的交换时,是否会被强制检查是否有完成。这个选项会影响 |
|
|
如果为 |
|
|
如果启用,Camel 会将所有聚合的交换分组到一个组合的 |
|
| 是否要忽略无法评估为值的关联键。默认情况下,Camel 将抛出例外,但您可以启用这个选项并忽略这种情况。 |
|
是否应该 接受 相关的交换。您可以启用它来指示是否关联密钥已经完成,则拒绝具有相同关联密钥的任何新交换。然后 Camel 将抛出一个 | |
|
| Camel 2.5: 是否应该丢弃因为超时而完成的交换。如果启用,则当超时发生超时时,聚合的消息 不会 发出,而是被丢弃(丢弃)。 |
|
允许您自己自己实施 | |
|
在 Registry 中查找 | |
|
| 当聚合完成后,它们会从聚合器中发送。这个选项指示 Camel 是否应该将具有多个线程的线程池用于并发。如果没有指定自定义线程池,则 Camel 会创建一个具有 10 个并发线程的默认池。 |
|
如果使用 | |
|
在 Registry 中查找 | |
|
如果使用其中一个 | |
|
在 registry 中查找 | |
| 当您停止聚合器时,这个选项允许它从聚合存储库完成所有待处理的交换。 | |
|
| 打开 optimistic locking,它可与聚合存储库结合使用。 |
| 为 optimistic locking 配置重试策略。 |