45.11. 启用持久的尾部跟踪
要启用此功能,请在端点 URI 中至少设置以下选项:
- 
						persistentTailTracking选项为true
- 
						persistentId选项指向这个消费者的唯一标识符,以便在许多消费者间重复使用相同的集合
				另外,您可以将 tailTrackDb,tailTrackCollection 和 tailTrackField 选项设置为自定义存储运行时信息的位置。有关每个选项的描述,请参阅此页面顶部的端点选项表。
			
				例如,以下路由将使用"departureTime"作为增加字段的"flights.cancellations"大写集合,默认的重新生成光标延迟为 1000ms,持久性尾部跟踪打开,并在"flights.camelTracking" ID 下保留。 将最后处理的值存储在"lastTrackingValue"字段下(camelTailTracking 和 lastTrackingValue 是默认值)。
			
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");以下是与上述示例相同,但持久性尾部跟踪运行时信息将存储在"trackers.camelTrackers"集合中,位于"lastProcessedDepartureTime"字段中:
45.11.1. 更改流消费者
通过更改流,应用程序可以访问实时数据更改,而不会降低 MongoDB oplog 的复杂性和风险。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即响应它们。由于更改流使用聚合框架,应用程序也可以过滤特定更改或转换通知。交换正文将包含任何更改的完整文档。
					要配置 Change Streams Consumer,您需要指定 consumerType、database、collection 和可选的 JSON 属性 streamFilter 来过滤事件。该 JSON 属性是标准的 MongoDB $match 聚合。可使用 XML DSL 配置轻松指定:
				
<route id="filterConsumer">
    <from uri="mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }"/>
    <to uri="mock:test"/>
</route>
<route id="filterConsumer">
    <from uri="mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }"/>
    <to uri="mock:test"/>
</route>Java 配置:
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }")
    .to("mock:test");
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=tickets&streamFilter={ '$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]} }")
    .to("mock:test");您可以将 streamFilter 值外部化成属性占位符,允许清理端点 URI 参数并更轻松地读取。
					changeStreams 消费者类型也会返回以下 OUT 标头:
				
| 标头键 | 快速常数 | 描述(从 MongoDB API 文档中提取) | 数据类型 | 
|---|---|---|---|
| 
									 | 
									 | 发生的操作类型。可以是以下值: insert, delete, replace, update, drop, dropDatabase, invalidate。 | 字符串 | 
| 
									 | 
									 | 包含由插入、替换、删除、更新操作(如 CRUD 操作)创建或修改的文档。对于分片集合,还显示文档的完整分片键。如果 _id 字段已经是分片密钥的一部分,则不会重复它。 | ObjectId |