230.6.5. 其他操作
230.6.5.1. aggregate
使用正文中包含的给定管道执行聚合。聚合可能比较长且重度的操作。谨慎使用。
// route: from("direct:aggregate").to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate"); List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", group("$scientist", sum("count", 1))); from("direct:aggregate") .setBody().constant(aggregate) .to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate") .to("mock:resultAggregate");
支持以下 IN 消息标头:
标头密钥 | 快速持续 | 描述(从 MongoDB API doc中提取) | 预期类型 |
---|---|---|---|
|
| 设置每个批处理要返回的文档数量。 | int/Integer |
|
| 启用聚合管道阶段,将数据写入临时文件。 | boolean/Boolean |
默认情况下,返回所有结果的列表。根据结果的大小,这可能会对内存进行重度。更安全的选择是设置您的 outputType=MongoIterable。下一个处理器会在消息正文中看到一个可迭代状态,允许它逐个完成结果。因此,设置批处理大小并返回可迭代操作,以便有效地检索和处理结果。
您还可以通过包含 outputType=DBCursor (Camel 2.21+)作为 endpoint 选项,从服务器返回至您的路由的文档可能比设置上述标头更简单。这了来自 Mongo 驱动程序的 DBCursor,就像您在 Mongo shell 中执行 aggregate ()一样,允许您的路由迭代结果。默认情况下,如果不使用此选项,此组件会将驱动程序的光标中的文档加载到列表,并将此数据返回到您的路由 - 这可能导致大量内存对象。请记住,使用 DBCursor 并不要求提供匹配的文档数量 - 请参阅 MongoDB 文档网站了解详细信息。
带有选项 outputType=MongoIterable 和 batch 大小示例:
// route: from("direct:aggregate").to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable"); List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", group("$scientist", sum("count", 1))); from("direct:aggregate") .setHeader(MongoDbConstants.BATCH_SIZE).constant(10) .setBody().constant(aggregate) .to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable") .split(body()) .streaming() .to("mock:resultAggregate");
请注意,调用 .split(body())
足以将路由一发回,但是它仍然会先将所有条目加载到内存中。因此,需要通过批处理将数据加载到内存中,需要调用 .streaming()
。