搜索

229.7. 消费者

download PDF

有一些类型的消费者:

229.7.1. Tailable Cursor Consumer

MongoDB 提供了一个机制,它通过保持光标在 *nix 系统的 tail -f 命令一样,从集合中即时使用持续数据的机制。这种机制比调度的轮询效率高得多,因为服务器会将新数据推送至客户端变得可用,而不是让客户端在预定的时间间隔后重新执行 ping 来获取新数据。它还减少了其他冗余的网络流量。

只有一个必需使用尾部的光标:集合必须是"封装的集合",这意味着它只拥有 N 个对象,当达到限制时,MongoDB 会清空其最初插入的顺序中的旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。

Camel MongoDB 组件实施可尾部的光标使用者,使您能够在 Camel 路由中使用此功能。在插入新对象时,MongoDB 将以自然方式将其推送为 文档,以便您的可跟踪光标使用者将其转换为交换器并将触发路由逻辑。

229.7.1.1. 尾部的光标消费者的工作方式

要将光标置于可尾部的光标,在首次生成光标时,几个特殊标志将向 MongoDB 发送信号。创建之后,光标将保持打开状态,并在调用 MongoCursor.next () 方法时阻止,直到新的数据到达为止。但是,如果新数据未发生在非确定周期后,MongoDB 服务器保留了终止您的光标的权利。如果您有兴趣继续消耗新数据,您必须重新生成光标。为此,您必须记住离开的位置,否则您将再次从顶端开始消耗。

Camel MongoDB 尾部的光标使用者将为您处理所有任务。您只需向提高性质数据中的一些字段提供密钥,这样可充当标记来定位光标每次重新生成时的标记,如时间戳、顺序 ID 等。可以是 MongoDB 支持的任何数据类型。可以使用日期、字符串和 Integers 来正常工作。在此组件上下文中,我们称为"需要跟踪"的机制。

消费者将记住此字段的最后一个值,每当光标要重新生成时,都将使用过滤器运行查询,例如: increasing Field > lastValue,以便只消耗未读取数据。

设置 increasing字段: 设置端点 URI tailTrackingIncreasingField 选项上的 increasing字段的密钥。在 Camel 2.10 中,它必须是数据的顶级字段,因为此字段的嵌套导航不被支持。也就是说,"timestamp"字段为 okay,但 "nested.timestamp" 无法正常工作。如果您需要对嵌套的增大字段的支持,请在 Camel JIRA 中创建一个 ticket。

光标回收延迟: 一个需要注意的事项是,如果新数据在初始启动时不可用,MongoDB 将立即终止光标。由于我们不想在这个情形中 重负服务器,所以引入了一个光标修复 选项(值为 1000ms。),您可以对其进行修改以符合您的需要。

例如:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

以上路由会使用来自 "flights.cancellations" capped collection 中,使用 "departureTime" 作为增大字段,默认的 regeneration 光标延迟为 1000ms。

229.7.1.2. 持久性跟踪

标准尾部跟踪是易失性,最后一个值只保存在内存中。但是,在实践中,您需要每次都重新启动 Camel 容器,然后,您的最后一个值将会丢失,而尾部的光标消费者会再次消耗到您的路径中,很可能将重复记录发送到您的路由。

要克服这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库中特殊集合中不断消耗的增加值。当消费者再次进行初始时,它将恢复最近一次跟踪的值,并像没有任何变化一样继续。

最后的 read 值在两个 occasions 上保留:每次重新生成光标时,都会在消费者关闭时进行。我们可能会考虑定期保留的间隔(每 5 秒刷新一次),以便在需求到时增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个 ticket。

229.7.1.3. 启用持久性尾部跟踪

要启用这个功能,请在端点 URI 上至少设置以下选项:

  • persistentTailTracking 选项为 true
  • persistentId 选项用于这个使用者的唯一标识符,以便可以在很多消费者之间重复使用相同的集合

另外,您可以将 tailTrackDbtailTrackCollectiontailTrackField 选项设置为自定义存储运行时信息的位置。有关每个选项的描述,请参阅本页顶部的端点选项表。

例如,以下路由会使用来自 "flights.cancellations" capped 集合,使用 "departureTime" 作为增大字段,默认的 regeneration 光标延迟为 1000ms,打开持久的 tail 跟踪,并在 "flights.camelTailTailTracking" 的 "cancellationsTracker" id 下保留。 在 "lastTrackingValue" 字段中存储最后一个处理的值(camelTailTrackinglastTrackingValue 为 defaults)。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

以下是与上面的另一个示例相同,但其中持久性 tail 跟踪运行时信息将存储在 "trackers.camelTrackers" 字段中的 "lastProcessedDepartureTime" 字段中:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.