229.7. 消费者
有一些类型的消费者:
229.7.1. Tailable Cursor Consumer
MongoDB 提供了一个机制,它通过保持光标在 *nix 系统的 tail -f
命令一样,从集合中即时使用持续数据的机制。这种机制比调度的轮询效率高得多,因为服务器会将新数据推送至客户端变得可用,而不是让客户端在预定的时间间隔后重新执行 ping 来获取新数据。它还减少了其他冗余的网络流量。
只有一个必需使用尾部的光标:集合必须是"封装的集合",这意味着它只拥有 N 个对象,当达到限制时,MongoDB 会清空其最初插入的顺序中的旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。
Camel MongoDB 组件实施可尾部的光标使用者,使您能够在 Camel 路由中使用此功能。在插入新对象时,MongoDB 将以自然方式将其推送为 文档
,以便您的可跟踪光标使用者将其转换为交换器并将触发路由逻辑。
229.7.1.1. 尾部的光标消费者的工作方式
要将光标置于可尾部的光标,在首次生成光标时,几个特殊标志将向 MongoDB 发送信号。创建之后,光标将保持打开状态,并在调用 MongoCursor.next ()
方法时阻止,直到新的数据到达为止。但是,如果新数据未发生在非确定周期后,MongoDB 服务器保留了终止您的光标的权利。如果您有兴趣继续消耗新数据,您必须重新生成光标。为此,您必须记住离开的位置,否则您将再次从顶端开始消耗。
Camel MongoDB 尾部的光标使用者将为您处理所有任务。您只需向提高性质数据中的一些字段提供密钥,这样可充当标记来定位光标每次重新生成时的标记,如时间戳、顺序 ID 等。可以是 MongoDB 支持的任何数据类型。可以使用日期、字符串和 Integers 来正常工作。在此组件上下文中,我们称为"需要跟踪"的机制。
消费者将记住此字段的最后一个值,每当光标要重新生成时,都将使用过滤器运行查询,例如: increasing Field > lastValue
,以便只消耗未读取数据。
设置 increasing字段: 设置端点 URI tailTrackingIncreasingField
选项上的 increasing字段的密钥。在 Camel 2.10 中,它必须是数据的顶级字段,因为此字段的嵌套导航不被支持。也就是说,"timestamp"字段为 okay,但 "nested.timestamp" 无法正常工作。如果您需要对嵌套的增大字段的支持,请在 Camel JIRA 中创建一个 ticket。
光标回收延迟: 一个需要注意的事项是,如果新数据在初始启动时不可用,MongoDB 将立即终止光标。由于我们不想在这个情形中 重负服务器,所以引入了一个光标修复
选项(值为 1000ms。),您可以对其进行修改以符合您的需要。
例如:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
以上路由会使用来自 "flights.cancellations" capped collection 中,使用 "departureTime" 作为增大字段,默认的 regeneration 光标延迟为 1000ms。
229.7.1.2. 持久性跟踪
标准尾部跟踪是易失性,最后一个值只保存在内存中。但是,在实践中,您需要每次都重新启动 Camel 容器,然后,您的最后一个值将会丢失,而尾部的光标消费者会再次消耗到您的路径中,很可能将重复记录发送到您的路由。
要克服这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库中特殊集合中不断消耗的增加值。当消费者再次进行初始时,它将恢复最近一次跟踪的值,并像没有任何变化一样继续。
最后的 read 值在两个 occasions 上保留:每次重新生成光标时,都会在消费者关闭时进行。我们可能会考虑定期保留的间隔(每 5 秒刷新一次),以便在需求到时增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个 ticket。
229.7.1.3. 启用持久性尾部跟踪
要启用这个功能,请在端点 URI 上至少设置以下选项:
-
persistentTailTracking
选项为true
-
persistentId
选项用于这个使用者的唯一标识符,以便可以在很多消费者之间重复使用相同的集合
另外,您可以将 tailTrackDb
、tailTrackCollection
和 tailTrackField
选项设置为自定义存储运行时信息的位置。有关每个选项的描述,请参阅本页顶部的端点选项表。
例如,以下路由会使用来自 "flights.cancellations" capped 集合,使用 "departureTime" 作为增大字段,默认的 regeneration 光标延迟为 1000ms,打开持久的 tail 跟踪,并在 "flights.camelTailTailTracking" 的 "cancellationsTracker" id 下保留。 在 "lastTrackingValue" 字段中存储最后一个处理的值(camelTailTracking
和 lastTrackingValue
为 defaults)。
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下是与上面的另一个示例相同,但其中持久性 tail 跟踪运行时信息将存储在 "trackers.camelTrackers" 字段中的 "lastProcessedDepartureTime" 字段中:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");