38.7.5. 其他操作
38.7.5.1. 聚合 复制链接链接已复制到粘贴板!
对正文中包含的给定管道执行聚合。聚合可能比较长且繁重的操作。请谨慎使用。
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
group("$scientist", sum("count", 1)));
from("direct:aggregate")
.setBody().constant(aggregate)
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate")
.to("mock:resultAggregate");
支持以下 IN 消息标头:
| 标头键 | 快速持续 | 描述(从 MongoDB API 文档中提取) | 预期类型 |
|---|---|---|---|
|
|
| 设置每个批处理要返回的文档数量。 | int/Integer |
|
|
| 启用聚合管道阶段将数据写入临时文件。 | boolean/Boolean |
默认情况下,返回所有结果的列表。根据结果的大小,这可能会给内存造成重度。更安全的替代方案是设置您的 outputType=MongoIterable。下一处理器会在消息正文中看到可迭代性,以便它逐一逐步执行结果。因此设置批处理大小并返回可移动性,从而可以有效地检索和处理结果。
例如:
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
group("$scientist", sum("count", 1)));
from("direct:aggregate")
.setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
.setBody().constant(aggregate)
.to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable")
.split(body())
.streaming()
.to("mock:resultAggregate");
请注意,调用 .split (body ()) 足以逐个发送路由的条目,但它仍然会首先将所有条目加载到内存中。因此,需要调用 .streaming () 来通过批处理将数据加载到内存中。