34.11. 启用持久跟踪
要启用此功能,在端点 URI 中至少设置以下选项:
-
persistentTailTracking
选项为true
-
persistentId
选项针对这个消费者的唯一标识符,以便可以在许多用户间重复使用相同的集合
另外,您还可以将 tailTrackDb
、tailTrackCollection
和 tailTrackField
选项设置为保存运行时信息的自定义。有关每个选项的描述,请参考本页顶部的端点选项表。
例如,以下路由会使用 "flights.cancellations" 容量收集,使用 "departureTime" 作为 increasing 字段,默认的重新生成光标延迟为 1000ms,并打开持久跟踪,并保留在 "flights.camelTailTracking" 的"cancellationsTracker" id 下。 在 "lastTrackingValue" 字段中存储最后处理的值(camelTailTracking
和 lastTrackingValue
都是默认值)。
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下是与以上地址相同的另一个示例,但永久跟踪运行时信息将存储在 "trackers.camelTrackers" 集合中,在"lastProcessedDepartureTime"字段中:
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");
34.11.1. 更改 Streams Consumer
通过更改流,应用程序就可以访问实时数据更改,而无需跟踪 MongoDB oplog 的复杂性和风险。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即对其做出反应。由于更改流使用聚合框架,应用程序也可以过滤特定更改或转换通知。交换正文将包含任何更改的完整文档。
要配置 Change Streams Consumer,您需要指定 consumerType
、数据库
、收集和
可选的 JSON 属性 streamFilter
来过滤事件。该 JSON 属性是标准 MongoDB $match
聚合。它可以通过 XML DSL 配置轻松指定:
<route id="filterConsumer"> <from uri="mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }"/> <to uri="mock:test"/> </route>
Java 配置:
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }") .to("mock:test");
您可以将 streamFilter 值外部化为属性占位符,以允许端点 URI 参数 清理 并更易于阅读。
changeStreams
使用者类型还会返回以下 OUT 标头:
标头密钥 | 快速持续 | 描述(从 MongoDB API doc中提取) | 数据类型 |
---|---|---|---|
|
| 发生的操作类型。可以是以下值之一: insert、delete、replace、update、drop、rename、dropDatabase、validate。 | 字符串 |
|
| 包含由插入、替换、删除、更新操作(例如 CRUD 操作)创建或修改的文档的 _id 文档。对于分片的集合,还显示文档的完整分片密钥。如果 _id 字段已属于 shard 密钥的一部分,则不会重复。 | ObjectId |