45.7. MongoDB 操作 - producer 端点
45.7.1. 查询操作
45.7.1.1. findById
此操作仅从集合中检索一个元素,其 _id 字段与 IN 消息正文的内容匹配。传入的对象可以是任何等同于 Bson
类型的对象。请参阅 http://bsonspec.org/spec.html 和 http://www.mongodb.org/display/DOCS/Java+Types。
from("direct:findById") .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") .to("mock:resultFindById");
请注意,默认的 _id 被 Mongo 和 ObjectId
类型处理,因此您可能需要正确转换它。
from("direct:findById") .convertBodyTo(ObjectId.class) .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") .to("mock:resultFindById");
支持可选参数
此操作支持投射运算符。请参阅 指定字段过滤器(项目)。
45.7.1.2. findOneByQuery
从与 MongoDB 查询选择器匹配的集合中检索第一个元素。如果设置了 CamelMongoDbCriteria
标头,则其值将用作查询选择器。如果 CamelMongoDbCriteria
标头为 null,则 IN 消息正文将用作查询选择器。在这两种情况下,查询选择器应当为 Bson
类型,或转换为 Bson
(例如,JSON 字符串或 HashMap
)。如需更多信息,请参阅类型转换。
使用 MongoDB 驱动程序提供的 过滤器
创建查询选择器。
45.7.1.3. 没有查询选择器的示例(在集合中显示第一个文档)
from("direct:findOneByQuery") .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery");
45.7.1.4. 带有查询选择器的示例(在集合中调整第一个匹配文档):
from("direct:findOneByQuery") .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani"))) .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery");
支持可选参数
此操作支持投射运算符和排序条款。请参阅 指定字段过滤器(项目 ),指定 sort 子句。
45.7.1.5. findAll
findAll
操作返回与查询匹配的所有文档,或者根本不没有,在这种情况下,集合中包含的所有文档都会被返回。查询对象提取 CamelMongoDbCriteria
标头。如果 CamelMongoDbCriteria 标头是 null,查询对象将被提取消息正文,即它应该类型 Bson
或转换为 Bson
。它可以是 JSON 字符串或 Hashmap。如需更多信息,请参阅类型转换。
45.7.1.5.1. 没有查询选择器的示例(重复集合中的所有文档)
from("direct:findAll") .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll");
45.7.1.5.2. 带有查询选择器的示例(重复集合中的所有匹配文档)
from("direct:findAll") .setHeader(MongoDbConstants.CRITERIA, Filters.eq("name", "Raul Kripalani")) .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll");
通过以下标头支持分页和有效的检索:
标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 预期类型 |
---|---|---|---|
|
| 在光标开始时丢弃给定数量的元素。 | int/Integer |
|
| 限制返回的元素数量。 | int/Integer |
|
| 限制一个批处理中返回的元素数量。光标通常获取一系列结果对象,并将它们存储在本地。如果 batchSize 为正状态,它代表检索的每个对象批处理的大小。可以调整它以优化性能和限制数据传输。如果 batchSize 为负数,它将限制返回的数量对象,这适合最大批处理大小限制(通常为 4MB),而光标将被关闭。例如,如果 batchSize 是 -10,那么服务器将返回最多 10 个文档,并且尽可能在 4MB 中,然后关闭光标。请注意,这个功能与其中的 limit ()不同,该文档必须适合最大大小,而且无需发送请求来关闭光标服务器端。即使光标迭代后也可以更改批处理大小,在这种情况下,设置将在下一个批处理检索中应用。 | int/Integer |
|
| 设置 allowDiskUse MongoDB 标志。这是自 MongoDB Server 4.3.1 起的支持。将这个标头与旧的 MongoDB Server 版本搭配使用可能会导致查询失败。 | 布尔值/Boolean |
45.7.1.5.3. 带有选项 outputType=MongoIterable 和 batch 大小的示例
from("direct:findAll") .setHeader(MongoDbConstants.BATCH_SIZE).constant(10) .setHeader(MongoDbConstants.CRITERIA, constant(Filters.eq("name", "Raul Kripalani"))) .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=MongoIterable") .to("mock:resultFindAll");
findAll
操作也会返回以下 OUT 标头,以便在使用分页时迭代结果页面:
标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 数据类型 |
---|---|---|---|
|
| 与查询匹配的对象数量。这不会考虑限制/skip。 | int/Integer |
|
| 与查询匹配的对象数量。这不会考虑限制/skip。 | int/Integer |
支持可选参数
此操作支持投射运算符和排序条款。请参阅 指定字段过滤器(项目 ),指定 sort 子句。
45.7.1.6. 数�
返回集合中对象总数,返回 Long 作为 OUT 消息正文。
以下示例将计算"dynamicCollectionName"集合中记录的数量。请注意,如何启用动态性,因此操作不会针对"notableScientists"集合运行,而是针对"dynamicCollectionName"集合运行。
// from("direct:count").to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true"); Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName"); assertTrue("Result is not of type Long", result instanceof Long);
您可以提供一个查询 查询对象,它会被提取 CamelMongoDbCriteria
标头。如果 CamelMongoDbCriteria 标头为 null,查询对象将被提取消息正文,即它应该类型为 Bson
或转换为 Bson
.,操作将返回与此条件匹配的文档数量。
Document query = ... Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName");
45.7.1.7. 指定字段过滤器(项目)
默认情况下,查询操作将返回整个匹配对象(及其所有字段)。如果您的文档较大,且您只需要检索其字段的子集,您可以在所有查询操作中指定字段过滤器,只需设置相关的
(或类型转换为 Bson,如 JSON 字符串、映射等)。Bson
以下是一个示例,它使用 MongoDB 的 Projections
来简化 Bson 的创建。它检索 _id
和 boringField
以外的所有字段:
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") Bson fieldProjection = Projection.exclude("_id", "boringField"); Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection);
以下是一个示例,它使用 MongoDB 的 Projections
来简化 Bson 的创建。它检索 _id
和 boringField
以外的所有字段:
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") Bson fieldProjection = Projection.exclude("_id", "boringField"); Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION, fieldProjection);
45.7.1.8. 指定 sort 子句
根据特定字段排序,通常会要求从集合中获取 min/max 记录,它使用 MongoDB 的 Sorts
来简化 Bson 的创建。它检索 _id
和 boringField
以外的所有字段:
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") Bson sorts = Sorts.descending("_id"); Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.SORT_BY, sorts);
在 Camel 路由中,SORT_BY 标头可与 findOneByQuery 操作一起使用,以实现相同的结果。如果还指定了 FIELDS_PROJECTION 标头,则操作将返回可直接传递给另一个组件(例如,参数化 MyBatis SELECT 查询)的单个字段/值对。本例演示了从集合中获取最新文档,并根据 documentTimestamp
字段将结果减少到单个字段:
.from("direct:someTriggeringEvent") .setHeader(MongoDbConstants.SORT_BY).constant(Sorts.descending("documentTimestamp")) .setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projection.include("documentTimestamp")) .setBody().constant("{}") .to("mongodb:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery") .to("direct:aMyBatisParameterizedSelect");
45.7.2. 创建/更新操作
45.7.2.1. insert
将新对象插入到 MongoDB 集合中,从 IN 消息正文获取。类型 conversion 试图将其转换为 文档或
列表
。
支持两种模式:单一插入和多个插入。对于多个插入,端点将期望任何类型的对象列表、阵列或集合,只要它们是 - 或可以转换为 - 文档
。Example:
from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
此操作将返回 WriteResult,具体取决于 WriteConcern
或 invokeGetLastError
选项的值,getLastError ()
将已被调用。如果您希望访问些操作的最终结果,可以通过在 WriteResult
上调用 getLastError()
或 getCachedLastError()
来获取 CommandResult
。然后,您可以通过调用 CommandResult.ok ()
、CommandResult.getErrorMessage ()
和/或 CommandResult.getException ()
来验证结果。
请注意,新对象的 _id
必须在集合中唯一。如果没有指定值,MongoDB 将为您自动生成值。但是,如果您确实指定它且不是唯一的,则插入操作将失败(对于 Camel 通知,则需要启用 invokeGetLastError 或设置等待写入结果的 WriteConcern。
这不是组件的一个限制,而是在 MongoDB 中用于更高的吞吐量。如果您使用自定义 _id
,则需要确保应用程序级别是唯一的(也是良好的做法)。
插入的记录的 OID 存储在 CamelMongoOid
键(MongoDbConstants.OID
constant)下的消息标头中。存储的值是 org.bson.types.ObjectId
,用于单个插入或 java.util.List<org.bson.types.ObjectId>
; (如果已插入了多个记录)。
在 MongoDB Java Driver 3.x 中,insertOne 并插入Many 操作会返回 void。Camel 插入操作返回 Document 或 插入的文档列表。请注意,如果需要,每个文档都会由新的 OID 更新。
45.7.2.2. save
save 操作等同于一个 upsert (UPdate, inSERT)操作,其中将更新记录,如果它不存在,则会将其插入到一个原子操作中。MongoDB 将根据 _id
字段执行匹配。
如果进行更新,则对象会被完全替换,不允许使用 MongoDB 的 $modifiers。因此,如果要操作对象(如果已存在),有两个选项:
- 执行查询以首先检索整个对象及其所有字段(不效率),在 Camel 中更改它,然后保存它。
- 使用带有 $modifiers 的 update 操作,该操作将在服务器端执行更新。您可以启用 upsert 标志,在这种情况下,如果需要插入,MongoDB 会将 $modifiers 应用到过滤器查询对象,并插入结果。
如果要保存的文档不包含 _id
属性,则操作将是一个插入,创建的新 _id
将放置在 CamelMongoOid
标头中。
例如:
from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=save");
// route: from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=save"); org.bson.Document docForSave = new org.bson.Document(); docForSave.put("key", "value"); Object result = template.requestBody("direct:insert", docForSave);
45.7.2.3. update
更新集合上的一个或多个记录。需要过滤查询和更新规则。
您可以使用 MongoDBConstants.CRITERIA 标头作为 Bson
定义过滤器,并将更新规则定义为 Body 中的 Bson
。
在增强
While 定义过滤器 后,使用 MongoDBConstants.CRITERIA 标头作为 Bson
在进行更新前查询 mongodb,您应该注意到,如果您使用带有聚合策略的增强模式,则需要在聚合过程中将其从生成的 camel 交换中删除,然后应用 mongodb 更新。如果您在聚合和/或重新定义 MongoDBConstants.CRITERIA 标头期间没有删除此标头,则在将 camel 交换发送到 mongodb producer 端点前,可能会以无效的 camel 交换有效负载结束。
第二种需要 List<Bson> 作为 IN 消息正文,其中包含完全 2 个元素:
- 元素 1 (index 0) IANA 过滤器查询 IANA 决定了哪些对象将受到影响,与典型的查询对象相同
- 元素 2 (索引 1)Demo 更新规则如何更新匹配对象。支持 MongoDB 中的所有 修饰符操作。
多更新
默认情况下,MongoDB 只会更新 1 个对象,即使多个对象与过滤器查询匹配。要指示 MongoDB 更新所有 匹配的记录,请将 CamelMongoDbMultiUpdate
IN 消息标头设置为 true
。
将返回一个带有键 CamelMongoDbRecordsAffected
的标头(MongoDbConstants.RECORDS_AFFECTED
constant),带有更新的记录数量(从 WriteResult.getN ()
中获得)。
支持以下 IN 消息标头:
标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 预期类型 |
---|---|---|---|
|
| 如果更新应该应用到所有对象匹配。请参阅 http://www.mongodb.org/display/DOCS/Atomic+Operations | 布尔值/Boolean |
|
| 如果数据库应该创建元素(如果不存在) | 布尔值/Boolean |
例如,以下命令将通过将 " scientist " 字段的值设置为 "Darwin" 字段来更新其 filterField 字段等于 true 的所有记录:
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); List<Bson> body = new ArrayList<>(); Bson filterField = Filters.eq("filterField", true); body.add(filterField); BsonDocument updateObj = new BsonDocument().append("$set", new BsonDocument("scientist", new BsonString("Darwin"))); body.add(updateObj); Object result = template.requestBodyAndHeader("direct:update", body, MongoDbConstants.MULTIUPDATE, true);
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); Maps<String, Object> headers = new HashMap<>(2); headers.add(MongoDbConstants.MULTIUPDATE, true); headers.add(MongoDbConstants.FIELDS_FILTER, Filters.eq("filterField", true)); String updateObj = Updates.set("scientist", "Darwin");; Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); String updateObj = "[{\"filterField\": true}, {\"$set\", {\"scientist\", \"Darwin\"}}]"; Object result = template.requestBodyAndHeader("direct:update", updateObj, MongoDbConstants.MULTIUPDATE, true);
45.7.3. 删除操作
45.7.3.1. remove
从集合中删除匹配的记录。IN 消息正文将充当移除过滤器查询,预期为 DBObject
类型或可转换的类型。
以下示例将删除其字段 'conditionField' 等于 true 的所有对象,在科学数据库中,notableScientists 集合中:
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove"); Bson conditionField = Filters.eq("conditionField", true); Object result = template.requestBody("direct:remove", conditionField);
返回一个带有键 CamelMongoDbRecordsAffected
的标头 (MongoDbConstants.RECORDS_AFFECTED
constant) with type int
, 包括删除记录的数量 (从 WriteResult.getN()
复制)。
45.7.4. 批量写入操作
45.7.4.1. bulkWrite
批量执行写入操作,并控制执行顺序。需要一个 List<WriteModel<Document
>> 作为 IN 消息正文,其中包含用于插入、更新和删除操作的命令。
以下示例将插入一个新的科学家 "Pierre Curie",更新 id 为 "5" 的记录,将 "scientist" 项的值设置为 "Marie Curie" 并删除 id 为 "3" 的记录 :
// route: from("direct:bulkWrite").to("mongodb:myDb?database=science&collection=notableScientists&operation=bulkWrite"); List<WriteModel<Document>> bulkOperations = Arrays.asList( new InsertOneModel<>(new Document("scientist", "Pierre Curie")), new UpdateOneModel<>(new Document("_id", "5"), new Document("$set", new Document("scientist", "Marie Curie"))), new DeleteOneModel<>(new Document("_id", "3"))); BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
默认情况下,操作按顺序执行,并在第一个写入错误上中断,而不处理列表中的任何剩余的写入操作。要指示 MongoDB 继续处理列表中的剩余的写入操作,请将 CamelMongoDbBulkOrdered
IN 消息标头设置为 false
。未排序的操作是并行执行的,无法保证此行为。
标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 预期类型 |
---|---|---|---|
|
| 执行排序或未排序的操作执行。默认值为 true。 | 布尔值/Boolean |
45.7.5. 其他操作
45.7.5.1. 聚合
使用正文中包含的给定管道执行聚合。聚合可能比较长且大量操作。请谨慎使用。
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate"); List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", group("$scientist", sum("count", 1))); from("direct:aggregate") .setBody().constant(aggregate) .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate") .to("mock:resultAggregate");
支持以下 IN 消息标头:
标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 预期类型 |
---|---|---|---|
|
| 设置每个批处理要返回的文档数。 | int/Integer |
|
| 启用聚合管道阶段将数据写入临时文件。 | 布尔值/Boolean |
默认情况下,返回所有结果的列表。根据结果的大小,这可能会大量内存。更安全的替代方案是设置 outputType=MongoIterable。接下来的处理器将在消息正文中看到一个 iterable,允许它逐一执行结果。因此,设置批处理大小并返回可迭代功能可以有效地检索和处理结果。
一个示例类似如下:
List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", group("$scientist", sum("count", 1))); from("direct:aggregate") .setHeader(MongoDbConstants.BATCH_SIZE).constant(10) .setBody().constant(aggregate) .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable") .split(body()) .streaming() .to("mock:resultAggregate");
请注意,调用 .split (body ())
足以逐一发送路由条目,但它仍会首先将所有条目加载到内存中。因此,需要调用 .streaming ()
,以便批量将数据加载到内存中。
45.7.5.2. getDbStats
等同于在 MongoDB shell 中运行 db.stats ()
命令,该命令显示有关数据库的有用统计图。
例如:
> db.stats(); { "db" : "test", "collections" : 7, "objects" : 719, "avgObjSize" : 59.73296244784423, "dataSize" : 42948, "storageSize" : 1000058880, "numExtents" : 9, "indexes" : 4, "indexSize" : 32704, "fileSize" : 1275068416, "nsSizeMB" : 16, "ok" : 1 }
使用示例:
// from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats"); Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); assertTrue("Result is not of type Document", result instanceof Document);
操作将返回与 shell 中显示的数据结构类似,格式为 OUT 消息正文中的 文档
。
45.7.5.3. getColStats
等同于在 MongoDB shell 中运行 db.collection.stats ()
命令,该命令显示有关集合的有用统计图。
例如:
> db.camelTest.stats(); { "ns" : "test.camelTest", "count" : 100, "size" : 5792, "avgObjSize" : 57.92, "storageSize" : 20480, "numExtents" : 2, "nindexes" : 1, "lastExtentSize" : 16384, "paddingFactor" : 1, "flags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1 }
使用示例:
// from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats"); Object result = template.requestBody("direct:getColStats", "irrelevantBody"); assertTrue("Result is not of type Document", result instanceof Document);
操作将返回与 shell 中显示的数据结构类似,格式为 OUT 消息正文中的 文档
。
45.7.5.4. 命令
作为命令在数据库上运行正文。对于获取主机信息、复制或分片状态时,管理员操作非常有用。
集合参数不适用于此操作。
// route: from("command").to("mongodb:myDb?database=science&operation=command"); DBObject commandBody = new BasicDBObject("hostInfo", "1"); Object result = template.requestBody("direct:command", commandBody);
45.7.6. 动态操作
Exchange 可以通过设置由 MongoDbConstants.OPERATION_HEADER
常量定义的 CamelMongoDbOperation
标头来覆盖端点的固定操作。
支持的值由 MongoDbOperation enumeration 决定,并与端点 URI 上 operation
参数的值匹配。
例如:
// from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count"); assertTrue("Result is not of type Long", result instanceof Long);