38.7.5. 其他操作


38.7.5.1. aggregate

使用正文中包含的给定管道执行聚合。聚合可能比较长且重度的操作。谨慎使用。

// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
    .setBody().constant(aggregate)
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate")
    .to("mock:resultAggregate");

支持以下 IN 消息标头:

标头密钥快速持续描述(从 MongoDB API doc中提取)预期类型

CamelMongoDbBatchSize

MongoDbConstants.BATCH_SIZE

设置每个批处理要返回的文档数量。

int/Integer

CamelMongoDbAllowDiskUse

MongoDbConstants.ALLOW_DISK_USE

启用聚合管道阶段,将数据写入临时文件。

boolean/Boolean

默认情况下,返回所有结果的列表。根据结果的大小,这可能会对内存进行重度。更安全的选择是设置您的 outputType=MongoIterable。下一个处理器会在消息正文中看到一个可迭代状态,允许它逐个完成结果。因此,设置批处理大小并返回可迭代操作,以便有效地检索和处理结果。

例如:

List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
    .setBody().constant(aggregate)
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable")
    .split(body())
    .streaming()
    .to("mock:resultAggregate");

请注意,调用 .split(body()) 足以将路由一发回,但是它仍然会先将所有条目加载到内存中。因此,需要通过批处理将数据加载到内存中,需要调用 .streaming()

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.