230.7. 消费者
有几种类型的用户:
230.7.1. Tailable Cursor Consumer
MongoDB 提供了一种机制来即时消耗来自集合的持续数据,方法是使光标像对 *nix 系统的 tail -f
命令一样打开。这种机制比调度的轮询效率要高得多,因为服务器在客户端上推送新数据就会可用,而不是让客户端按计划的间隔返回来获取新数据。它还减少了其他冗余网络流量。
只需要使用尾部的光标,即集合必须是 "capped collection",这意味着它仅拥有 N 对象,并在达到限制时,MongoDB 清除旧对象与其最初插入的顺序相同。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。
Camel MongoDB 组件实施尾部的光标消费者,使此功能可用于您的 Camel 路由。在插入新对象后,MongoDB 会将它们作为文档
推送到您的尾部光标消费者,这将将其转换为 Exchange,并触发您的路由逻辑。
230.7.1.1. 尾部的光标消费者的工作方式
要将光标置于可尾部的光标中,在第一次生成光标时,会将几个特殊标记信号分配给 MongoDB。创建后,光标将保持打开状态,并将在调用 MongoCursor.next()
方法时阻止,直到新数据到达。但是,如果新数据在非确定周期后没有出现,MongoDB 服务器保留自己要终止您的光标。如果您有兴趣继续使用新数据,则必须重新生成光标。为此,您必须记住离开的位置,或者您将再次占用率的其他位置。
Camel MongoDB tail 可移动的光标消费者为您处理所有这些任务。您只是向数据中增加性质的部分字段提供密钥,这样每次重新生成时,都将作为您的光标定位的标记,例如时间戳、后续 ID 等。它可以是 MongoDB 支持的任何数据类型。日期、字符串和 Integers 可以正常工作。我们在这个组件上下文中调用这种机制"tail 跟踪"。
使用者将记住此字段的最后值,以及每当要重新生成光标时,将通过一个过滤器(如 increase Field > lastValue
)运行查询,以便只消耗非读数据。
设置 increasing 字段: 在端点 URI tailTrackingIncreasingField
选项上设置 increase 字段的键。在 Camel 2.10 中,它必须是数据中的顶层字段,目前还不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但"nested.timestamp"将无法工作。如果您需要支持嵌套增加的字段,请在 Camel JIRA 中创建一个 ticket。
光标重新生成延迟: 一个需要注意的是,如果新数据在初始发生时还没有提供,MongoDB 将立即终止光标。由于我们不想超过服务器,因此引进了 cursorRegenerationDelay
选项(默认值为 1000ms)。
例如:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
以上路由将消耗来自"flights.cancellations"功能的集合,使用 "departureTime" 作为 increasing 字段,默认的重新生成光标延迟为 1000ms。
230.7.1.2. 持久跟踪
标准尾部跟踪是易失性,最后一个值仅保存在内存中。但是,在实践中,您需要现在每一次重启 Camel 容器,但最后的值将会丢失,并且您的尾部光标消费者再次使用自上而来,很可能将重复的记录发送到您的路由。
要克服这种情况,您可以启用 持久的尾部跟踪 功能,以便在 MongoDB 数据库的特殊集合中跟踪最后一次消耗的值。当消费者再次指出时,它将恢复最后的跟踪值,并在没有任何情况时继续操作。
最后的 read 值会保留两个结果:每次重新生成光标以及消费者关闭的时间。在未来 5 秒内,我们可能会考虑定期持续保留时间(每 5 秒清空一次),以便在需求时增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个 ticket。
230.7.1.3. 启用持久跟踪
要启用此功能,在端点 URI 中至少设置以下选项:
-
persistentTailTracking
选项为true
-
persistentId
选项针对这个消费者的唯一标识符,以便可以在许多用户间重复使用相同的集合
另外,您还可以将 tailTrackDb
、tailTrackCollection
和 tailTrackField
选项设置为保存运行时信息的自定义。有关每个选项的描述,请参考本页顶部的端点选项表。
例如,以下路由会使用 "flights.cancellations" 容量收集,使用 "departureTime" 作为 increasing 字段,默认的重新生成光标延迟为 1000ms,并打开持久跟踪,并保留在 "flights.camelTailTracking" 的"cancellationsTracker" id 下。 在 "lastTrackingValue" 字段中存储最后处理的值(camelTailTracking
和 lastTrackingValue
都是默认值)。
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下是与以上地址相同的另一个示例,但永久跟踪运行时信息将存储在 "trackers.camelTrackers" 集合中,在"lastProcessedDepartureTime"字段中:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");