230.7. 消费者


有几种类型的消费者:

230.7.1. Tailable Cursor Consumer

MongoDB 提供了一种机制来即时消耗集合中持续数据的机制,使光标保持打开,就像是 *nix 系统的 tail -f 命令一样。由于服务器在客户端可用时将新数据推送至客户端,而不是让客户端在计划的时间间隔返回以获取新数据,因此这种机制比调度的轮询更高效。它还可减少其他冗余网络流量。

使用 tailable 光标仅有一个必要条件:集合必须是 "capped collection",这意味着它只会保存 N 对象,当达到限制时,MongoDB 会按最初插入的顺序清除旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。

Camel MongoDB 组件实施可尾部的光标消费者,使此功能可用于 Camel 路由。当插入新对象时,MongoDB 会将它们作为 Document 推送到您的可尾部的光标消费者,后者会将它们转换为交换,并将触发您的路由逻辑。

230.7.1.1. tailable 光标消费者的工作方式

要将光标转换为可尾部的光标,在首次生成光标时,将向 MongoDB 发出一些特殊标志。创建后,光标将保持打开状态,并在调用 MongoCursor.next () 方法时阻止,直到新数据到达为止。但是,如果新数据没有在确定周期后显示,MongoDB 服务器保留自己终止您的光标的权利。如果您有兴趣继续使用新数据,则必须重新生成光标。为此,您必须记住您离开的位置,否则您将再次从上移使用的位置。

Camel MongoDB tailable 光标消费者为您处理所有这些任务。您只需要在增加性质的数据中为某些字段提供键,这将作为标记来每次重新生成光标时定位,例如时间戳、顺序 ID 等。它可以是 MongoDB 支持的任何 datatype。日期、字符串和整数可以正常工作。我们在此组件上下文中称为"定制"机制。

消费者将记住此字段的最后一个值,无论光标要重新生成时,它将使用类似: increasingField > lastValue 的过滤器运行查询,以便只消耗未读取的数据。

设置 increasing 字段: 在端点 URI tailTrackingIncreasingField 选项上设置 increasing 字段的键。在 Camel 2.10 中,它必须是数据的顶级字段,因为尚不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但 "nested.timestamp" 将无法正常工作。如果您需要对嵌套增加字段的支持,请在 Camel JIRA 中创建一个票据。

光标重新生成延迟: 需要注意的是,如果新数据在初始时尚未可用,MongoDB 将立即终止光标。由于我们不想在此案例中对服务器进行重负,因此引入了一个 cursorRegenerationDelay 选项(默认值为 1000ms)。您可以修改以适合您的需求。

例如:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

以上路由将使用 "flights.cancellations" capped 集合,使用 "departureTime" 作为增加字段,默认重新生成光标延迟 1000ms。

230.7.1.2. 持久性尾部跟踪

标准尾部跟踪是易失性的,最后一个值仅保存在内存中。但是,在实践中,您需要立即重启 Camel 容器,然后,您的最后一个值将会丢失,并且您的 tailable 光标消费者再次从上面消耗,很可能将重复记录发送到您的路由中。

要解决这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库中特殊集合中最后一次消耗的值。当消费者再次初始化时,它将恢复最后跟踪的值,并像没有发生任何情况一样继续。

最后的读取值会在两个字节上保留:每次光标都被重新生成以及消费者关闭时。如果需求需求需求,我们可能还会考虑在常规间隔中保留(每 5 秒刷新一次)。要请求此功能,请在 Camel JIRA 中创建一个 ticket。

230.7.1.3. 启用持久性尾部跟踪

要启用此功能,请在端点 URI 中设置以下选项:

  • persistentTailTracking 选项为 true
  • persistentId 选项为这个消费者的唯一标识符,以便可以在多个消费者间重复使用相同的集合

另外,您可以将 tailTrackDbtailTrackCollectiontailTrackField 选项设置为自定义将存储运行时信息的选项。有关每个选项的描述,请参阅此页面顶部的端点选项表。

例如,以下路由会消耗来自 "flights.cancellations" capped 集合,使用 "departureTime" 作为增加字段,默认重新生成光标延迟 1000ms,并打开持久性尾部跟踪,并在"flights.camelTailTracking"上的"cancellationsTracker" ID 下保留。 在 "lastTrackingValue" 字段下存储最后处理的值(camelTailTrackinglastTrackingValue 是默认值)。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

以下是与上面相同的另一个示例,但持久性尾部跟踪运行时信息将存储在 "trackers.camelTrackers" 集合中,在 "lastProcessedDepartureTime" 字段中:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");

230.7.2. Change Streams Consumer

Change Streams 允许应用程序访问实时数据更改,而无需跟踪 MongoDB oplog 的复杂性和风险。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即响应它们。由于更改流使用聚合框架,应用程序也可以过滤特定更改或转换 上的通知。

要配置 Change Streams Consumer,您需要指定 consumerType,database,collection 和可选的 JSON 属性 streamFilter 来过滤事件。该 JSON 属性是标准 MongoDB $match 聚合。它可使用 XML DSL 配置轻松指定:

<route id="filterConsumer">
    <from uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
    <to uri="mock:test"/>

    <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
</route>

Java 配置:

from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
    .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
    .to("mock:test");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.