35.7. MongoDB 操作 - 制作者端点


35.7.1. 查询操作

35.7.1.1. findById

此操作仅检索一个元素,其 _id 字段与 IN 消息正文的内容匹配。incoming 对象可以是等同于 Bson 类型的任何对象。请参阅 http://bsonspec.org/spec.htmlhttp://www.mongodb.org/display/DOCS/Java+Types。

from("direct:findById")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
    .to("mock:resultFindById");

请注意,默认的 _id 由 Mongo 视为 和 ObjectId 类型,因此您可能需要正确转换。

from("direct:findById")
    .convertBodyTo(ObjectId.class)
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
    .to("mock:resultFindById");
注意

支持可选参数
。此操作支持投射操作器。请参阅 指定字段过滤器(项目 )。

35.7.1.2. findOneByQuery

从与 MongoDB 查询选择器匹配的集合中检索第一个元素。如果设置了 CamelMongoDbCriteria 标头,则其值将用作查询选择器。如果 CamelMongoDbCriteria 标头为 null,则将使用 IN 消息正文作为查询选择器。在这两种情况下,查询选择器都应是 Bson 类型,或转换为 Bson (例如,JSON 字符串或 HashMap)。如需更多信息,请参阅类型转换。

使用 MongoDB 驱动程序提供的 过滤器 创建查询选择器。

from("direct:findOneByQuery")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
    .to("mock:resultFindOneByQuery");
from("direct:findOneByQuery")
    .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani")))
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
    .to("mock:resultFindOneByQuery");
注意

支持可选参数
。此操作支持预测运算符和排序条款。请参阅 指定字段过滤器(项目),指定 sort 子句。

35.7.1.5. findAll

findAll 操作会返回与查询匹配的所有文档,或根本不返回所有文档,在这种情况下,返回集合中包含的所有文档。query 对象会被提取 CamelMongoDbCriteria 标头。如果 CamelMongoDbCriteria 标头是 null,查询对象是提取的消息正文,即:它应当是 Bson 类型,或转换为 Bson。它可以是 JSON 字符串或哈希映射。如需更多信息,请参阅类型转换。

from("direct:findAll")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
    .to("mock:resultFindAll");
from("direct:findAll")
    .setHeader(MongoDbConstants.CRITERIA, Filters.eq("name", "Raul Kripalani"))
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
    .to("mock:resultFindAll");

通过以下标头支持分页和高效检索:

Expand
标头键快速持续描述(从 MongoDB API 文档中提取)预期类型

CamelMongoDbNumToSkip

MongoDbConstants.NUM_TO_SKIP

在光标的开头丢弃给定数量元素。

int/Integer

CamelMongoDbLimit

MongoDbConstants.LIMIT

限制返回的元素数量。

int/Integer

CamelMongoDbBatchSize

MongoDbConstants.BATCH_SIZE

限制一个批处理中返回的元素数量。光标通常获取结果对象的批处理,并将它们存储在本地。如果 batchSize 为正数,它代表检索的每个对象批处理的大小。可以对其进行调整以优化性能并限制数据传输。如果 batchSize 为负数,它将限制返回的对象数量,该数量适合最大批处理大小限制(通常为 4MB),光标将被关闭。例如,如果 batchSize 是 -10,则服务器会返回最多 10 个文档,并且尽可能在 4MB 中容纳,然后关闭光标。请注意,此功能与文档中必须适合最大大小的 limit ()不同,并且无需发送请求以关闭光标服务器端。即使在光标迭代后也可以更改批处理大小,在这种情况下,设置将应用到下一个批处理检索。

int/Integer

CamelMongoDbAllowDiskUse

MongoDbConstants.ALLOW_DISK_USE

设置 allowDiskUse MongoDB 标志。从 MongoDB Server 4.3.1 开始,支持它。将此标头与旧的 MongoDB 服务器版本搭配使用可能会导致查询失败。

boolean/Boolean

from("direct:findAll")
    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
    .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani")))
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=MongoIterable")
    .to("mock:resultFindAll");

findAll 操作还会返回以下 OUT 标头,以便在使用分页时迭代结果页面:

Expand
标头键快速持续描述(从 MongoDB API 文档中提取)数据类型

CamelMongoDbResultTotalSize

MongoDbConstants.RESULT_TOTAL_SIZE

与查询匹配的对象数量。这不会考虑 limit/skip。

int/Integer

CamelMongoDbResultPageSize

MongoDbConstants.RESULT_PAGE_SIZE

与查询匹配的对象数量。这不会考虑 limit/skip。

int/Integer

注意

支持可选参数
。此操作支持预测运算符和排序条款。请参阅 指定字段过滤器(项目),指定 sort 子句。

35.7.1.6. 数量

返回集合中对象总数,返回 Long 作为 OUT 消息正文。
以下示例将计算"dynamicCollectionName"集合中的记录数。请注意如何启用动态性,因此操作不会针对 "notableScientists" 集合运行,而是针对 "dynamicCollectionName" 集合。

// from("direct:count").to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true");
Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName");
assertTrue("Result is not of type Long", result instanceof Long);

您可以提供一个查询。查询对象会被提取的 CamelMongoDbCriteria 标头。如果 CamelMongoDbCriteria 标头是 null,则查询对象所提取的消息正文(例如,它应当为 Bson 或可转换为 Bson.,并且操作将返回与此条件匹配的文档数量。

Document query = ...
Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName");

35.7.1.7. 指定字段过滤器(投射)

默认情况下,查询操作将返回其整个匹配对象(及其所有字段)。如果您的文档比较大,且您只需要检索其字段子集,您可以在所有查询操作中指定字段过滤器,只需通过设置相关的 Bson (或类型转换为 Bson,如 JSON 字符串、映射等)在 CamelMongoDbFieldsProjection 标头、恒定快捷键上指定字段过滤器:Mongo DbConstants.FIELDS_PROJECTION

以下是一个示例,它使用 MongoDB 的 Projections 来简化 Bson 的创建。它检索除 _idboringField 以外的所有字段:

// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
Bson fieldProjection = Projection.exclude("_id", "boringField");
Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection);

以下是一个示例,它使用 MongoDB 的 Projections 来简化 Bson 的创建。它检索除 _idboringField 以外的所有字段:

// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
Bson fieldProjection = Projection.exclude("_id", "boringField");
Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection);

35.7.1.8. 指定 sort 子句

通常,需要从集合中获取 min/max 记录,它根据使用 MongoDB 的 Sorts 的特定字段排序来获取 min/max 记录来简化 Bson 的创建。它检索除 _idboringField 以外的所有字段:

// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
Bson sorts = Sorts.descending("_id");
Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.SORT_BY, sorts);

在 Camel 路由中,SORT_BY 标头可与 findOneByQuery 操作一起使用,以实现相同的结果。如果还指定了 FIELDS_PROJECTION 标头,则操作将返回可直接传递给另一个组件(例如,参数化 MyBatis SELECT 查询)的单个字段/值对。本例演示了根据 documentTimestamp 字段从集合中获取最新文档,并将结果减少到单个字段:

.from("direct:someTriggeringEvent")
.setHeader(MongoDbConstants.SORT_BY).constant(Sorts.descending("documentTimestamp"))
.setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projection.include("documentTimestamp"))
.setBody().constant("{}")
.to("mongodb:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery")
.to("direct:aMyBatisParameterizedSelect");

35.7.2. 创建/更新操作

35.7.2.1. insert

将新对象插入到 MongoDB 集合中,从 IN 消息正文中获取。尝试将其转换为 文档或 列表
支持两种模式:单一插入和多个插入。对于多个插入,端点将预期一个任何类型的对象的列表、数组或集合,只要它们是 - 或可转换为 - 文档。例如:

from("direct:insert")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=insert");

该操作将返回 WriteResult,具体取决于 WriteConcerninvokeGetLastError 选项的值,getLastError () 将被称为 already 或 not。如果您希望访问些操作的最终结果,可以通过在 WriteResult 上调用 getLastError()getCachedLastError() 来获取 CommandResult。然后,您可以通过调用 CommandResult.ok (), CommandResult.getErrorMessage () 和/或 CommandResult.getException () 来验证结果。

请注意,新对象的 _id 在集合中必须是唯一的。如果没有指定值,MongoDB 会自动为您生成一个。但是,如果您确实指定了它,则插入操作将失败(并且对于 Camel 注意,您需要启用 invokeGetLastError 或设置 WriteConcern 来等待写入结果)。

这不是组件的限制,但它是 MongoDB 中如何实现更高的吞吐量。如果您使用自定义 _id,则应该确保应用程序级别是唯一的(这也是一个好的做法)。

插入记录的 OID 存储在 CamelMongoOid 密钥(MongoDbConstants.OID 常态)下的消息标头中。存储的值是 org.bson.types.ObjectId,用于单个插入,如果插入了多个记录,则为 java.util.List<org.bson.types.ObjectId >。

在 MongoDB Java Driver 3.x 中,insertOne 和 insertMany 操作返回 void。Camel 插入操作返回插入文档或文档列表。请注意,如果需要,每个文档都由新的 OID 更新。

35.7.2.2. save

save 操作等同于一个 upsert (UPdate,inSERT)操作,记录将更新的位置,如果记录不存在,它将在一个原子操作中插入。MongoDB 将根据 _id 字段执行匹配。

请注意,如果更新,对象将被完全替换,并且不允许使用 MongoDB 的 $modifier。因此,如果您要操作对象(如果已存在),有两个选项:

  1. 执行查询,以便首先检索整个对象及其所有字段(效率不高),在 Camel 中更改它,然后保存它。
  2. 将 update 操作与 $modifiers 搭配使用,这将在服务器端执行更新。您可以启用 upsert 标志,在这种情况下,MongoDB 会将 $modifiers 应用到过滤器查询对象并插入结果。

如果要保存的文档不包含 _id 属性,则操作将是插入的,所创建的新 _id 将放置在 CamelMongoOid 标头中。

例如:

from("direct:insert")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=save");
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=save");
org.bson.Document docForSave = new org.bson.Document();
docForSave.put("key", "value");
Object result = template.requestBody("direct:insert", docForSave);

35.7.2.3. update

更新集合上的一个或多个记录。需要过滤器查询和更新规则。

您可以使用 MongoDBConstants.CRITERIA 标头定义为 Bson,并在 Body 中将更新规则定义为 Bson

注意

在增强 ,使用 MongoDBConstants.CRITERIA 标头作为 Bson 来查询 mongodb,在进行更新前,您应该注意到,在聚合策略中使用了 mongodb 更新时,您需要在聚合策略过程中从生成的 camel Exchange 中删除它,然后应用 mongodb 更新。
如果您在聚合和/或重新定义 MongoDBConstants.CRITERIA 标头期间没有删除此标头,然后再向 mongodb producer 端点发送 camel Exchange payload,在更新 mongodb 时可能会最终使用无效的 camel Exchange payload。

第二种方法 Require a List<Bson> 作为 IN 消息正文,其中包含正好 2 个元素:

  • 元素 1 (index 0) TOKEN 过滤器查询 TOKEN 确定将影响哪些对象,与典型的查询对象相同
  • 元素 2 (index 1) TOKEN update rules TOKEN 如何更新匹配的对象。支持 MongoDB 中的所有 修饰符操作
注意

Multiupdates
默认情况下,MongoDB 只会更新 1 个对象,即使多个对象与过滤器查询匹配。要指示 MongoDB 更新所有 匹配的记录,请将 CamelMongoDbMultiUpdate IN 消息标头设置为 true

带有密钥 CamelMongoDbRecordsAffected 的标头将被返回(MongoDbConstants.RECORDS_AFFECTED 常度)已更新的记录数(从 WriteResult.getN ()中复制)。

支持以下 IN 消息标头:

Expand
标头键快速持续描述(从 MongoDB API 文档中提取)预期类型

CamelMongoDbMultiUpdate

MongoDbConstants.MULTIUPDATE

如果更新应当应用到与所有对象匹配的所有对象。请参阅 http://www.mongodb.org/display/DOCS/Atomic+Operations

boolean/Boolean

CamelMongoDbUpsert

MongoDbConstants.UPSERT

如果数据库应该创建元素(如果不存在)

boolean/Boolean

例如,以下命令将通过将 "scientist" 字段的值设置为 "Darwin" 字段来更新 all records,其 filterField 字段等于 true :

// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
List<Bson> body = new ArrayList<>();
Bson filterField = Filters.eq("filterField", true);
body.add(filterField);
BsonDocument updateObj = new BsonDocument().append("$set", new BsonDocument("scientist", new BsonString("Darwin")));
body.add(updateObj);
Object result = template.requestBodyAndHeader("direct:update", body, MongoDbConstants.MULTIUPDATE, true);
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
Maps<String, Object> headers = new HashMap<>(2);
headers.add(MongoDbConstants.MULTIUPDATE, true);
headers.add(MongoDbConstants.FIELDS_FILTER, Filters.eq("filterField", true));
String updateObj = Updates.set("scientist", "Darwin");;
Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update");
String updateObj = "[{\"filterField\": true}, {\"$set\", {\"scientist\", \"Darwin\"}}]";
Object result = template.requestBodyAndHeader("direct:update", updateObj, MongoDbConstants.MULTIUPDATE, true);

35.7.3. 删除操作

35.7.3.1. remove

从集合中删除匹配记录。IN 消息正文将充当删除过滤器查询,并且预期为 DBObject 类型,或者对它转换的类型。
以下示例将删除所有字段 'conditionField' 等于 true 的对象,位于 Science 数据库, notableScientists 集合中:

// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");
Bson conditionField = Filters.eq("conditionField", true);
Object result = template.requestBody("direct:remove", conditionField);

返回一个带有键 CamelMongoDbRecordsAffected 的标头 (MongoDbConstants.RECORDS_AFFECTED constant) with type int, 包括删除记录的数量 (从 WriteResult.getN() 复制)。

35.7.4. 批量写操作

35.7.4.1. bulkWrite

在批量执行写入操作时,使用控制顺序执行。需要 List<WriteModel<Document >> 作为 IN 消息正文,其中包含插入、更新和删除操作的命令。

以下示例将插入一个新的科学家 "Pierre Curie",更新 id 为 "5" 的记录,将 "scientist" 项的值设置为 "Marie Curie" 并删除 id 为 "3" 的记录 :

// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite");
List<WriteModel<Document>> bulkOperations = Arrays.asList(
            new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
            new UpdateOneModel<>(new Document("_id", "5"),
                                 new Document("$set", new Document("scientist", "Marie Curie"))),
            new DeleteOneModel<>(new Document("_id", "3")));

BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);

默认情况下,操作按顺序执行,并在第一个写入错误上中断,而不处理列表中任何剩余的写操作。要指示 MongoDB 继续处理列表中剩余的写入操作,请将 CamelMongoDbBulkOrdered IN 消息标头设置为 false。未排序的操作是并行执行的,此行为无法保证。

Expand
标头键快速持续描述(从 MongoDB API 文档中提取)预期类型

CamelMongoDbBulkOrdered

MongoDbConstants.BULK_ORDERED

执行有序或未排序的操作执行。默认值为 true。

boolean/Boolean

35.7.5. 其他操作

35.7.5.1. 聚合

对正文中包含的给定管道执行聚合。聚合可能比较长且繁重的操作。请谨慎使用。

// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
    .setBody().constant(aggregate)
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate")
    .to("mock:resultAggregate");

支持以下 IN 消息标头:

Expand
标头键快速持续描述(从 MongoDB API 文档中提取)预期类型

CamelMongoDbBatchSize

MongoDbConstants.BATCH_SIZE

设置每个批处理要返回的文档数量。

int/Integer

CamelMongoDbAllowDiskUse

MongoDbConstants.ALLOW_DISK_USE

启用聚合管道阶段将数据写入临时文件。

boolean/Boolean

默认情况下,返回所有结果的列表。根据结果的大小,这可能会给内存造成重度。更安全的替代方案是设置您的 outputType=MongoIterable。下一处理器会在消息正文中看到可迭代性,以便它逐一逐步执行结果。因此设置批处理大小并返回可移动性,从而可以有效地检索和处理结果。

例如:

List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist",
        group("$scientist", sum("count", 1)));
from("direct:aggregate")
    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
    .setBody().constant(aggregate)
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable")
    .split(body())
    .streaming()
    .to("mock:resultAggregate");

请注意,调用 .split (body ()) 足以逐个发送路由的条目,但它仍然会首先将所有条目加载到内存中。因此,需要调用 .streaming () 来通过批处理将数据加载到内存中。

35.7.5.2. getDbStats

等同于在 MongoDB shell 中运行 db.stats () 命令,该命令显示有关数据库的有用统计图。
例如:

> db.stats();
{
    "db" : "test",
    "collections" : 7,
    "objects" : 719,
    "avgObjSize" : 59.73296244784423,
    "dataSize" : 42948,
    "storageSize" : 1000058880,
    "numExtents" : 9,
    "indexes" : 4,
    "indexSize" : 32704,
    "fileSize" : 1275068416,
    "nsSizeMB" : 16,
    "ok" : 1
}

使用示例:

// from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats");
Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
assertTrue("Result is not of type Document", result instanceof Document);

该操作将返回类似于 shell 中显示的数据结构,其格式为 OUT 消息正文中的 Document

35.7.5.3. getColStats

等同于在 MongoDB shell 中运行 db.collection.stats () 命令,该命令显示有关集合的有用统计图。
例如:

> db.camelTest.stats();
{
    "ns" : "test.camelTest",
    "count" : 100,
    "size" : 5792,
    "avgObjSize" : 57.92,
    "storageSize" : 20480,
    "numExtents" : 2,
    "nindexes" : 1,
    "lastExtentSize" : 16384,
    "paddingFactor" : 1,
    "flags" : 1,
    "totalIndexSize" : 8176,
    "indexSizes" : {
        "_id_" : 8176
    },
    "ok" : 1
}

使用示例:

// from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats");
Object result = template.requestBody("direct:getColStats", "irrelevantBody");
assertTrue("Result is not of type Document", result instanceof Document);

该操作将返回类似于 shell 中显示的数据结构,其格式为 OUT 消息正文中的 Document

35.7.5.4. 命令

在数据库上运行正文作为命令。在获取主机信息、复制或分片状态时,管理员操作非常有用。

collection 参数不用于此操作。

// route: from("command").to("mongodb:myDb?database=science&operation=command");
DBObject commandBody = new BasicDBObject("hostInfo", "1");
Object result = template.requestBody("direct:command", commandBody);

35.7.6. 动态操作

Exchange 可以通过设置 MongoDbConstants.OPERATION_HEADER 常量定义的 CamelMongoDbOperation 标头来覆盖端点的固定操作。
支持的值由 MongoDbOperation enumeration 决定,并与端点 URI 上的 operation 参数的接受值匹配。

例如:

// from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count");
assertTrue("Result is not of type Long", result instanceof Long);
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部