230.7. 消费者


有几种类型的消费者:

230.7.1. Tailable Cursor Consumer

MongoDB 提供了一种机制,可以立即使用集合中持续使用持续的数据,方法是保持光标的开放方式,类似于 *nix 系统的 tail -f 命令。这种机制比调度的轮询效率更高,因为服务器将新数据推送到客户端,而不是让客户端按计划的间隔重新执行 ping 来获取新数据。它还可减少其他冗余网络流量。

使用可尾随光标的一个先决条件:集合必须是"总结集合",即它将只保存 N 对象,当达到限制时,MongoDB 会按照最初插入的顺序清除旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。

Camel MongoDB 组件实施可尾部的光标消费者,使此功能可供您用于 Camel 路由。在插入新对象时,MongoDB 将以自然顺序将其推送为可尾部的光标消费者,后者会将其转换为 Exchange,并触发您的路由逻辑。

230.7.1.1. 可尾部的光标消费者的工作方式

要将光标转换为可尾部的光标,首先生成光标时,需要向 MongoDB 发出一些特殊标志。创建后,光标将保持打开状态,并在调用 MongoCursor.next () 方法时阻止,直到新数据到达为止。但是,如果新数据在非确定期之后没有显示新数据,MongoDB 服务器保留自己终止您的光标。如果您希望继续使用新数据,则必须重新生成光标。为此,您必须记住关闭的位置,或者您将再次从顶部开始消耗的位置。

Camel MongoDB tailable 光标消费者会为您处理所有这些任务。您只需要向数据中的一些字段提供密钥,该特性将作为每次重新生成时定位光标的标记,例如时间戳、顺序 ID 等。它可以是 MongoDB 支持的任何数据类型。日期、字符串和整数可以正常工作。我们在此组件上下文中调用这种机制"tail 跟踪"。

消费者将记住此字段的最后一个值,每当要重新生成光标时,它将使用类似过滤器(如 increasingField > lastValue )运行查询,以便只消耗未读取数据。

设置 increasing 字段: 在端点 URI tailTrackingIncreasingField 选项上设置 increasing 字段的密钥。在 Camel 2.10 中,它必须是数据中的顶级字段,因为尚不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但 "nested.timestamp" 将无法正常工作。如果您需要对嵌套增加字段的支持,请在 Camel JIRA 中创建一个问题单。

光标重新生成延迟: 要注意的一个事情是,如果初期没有新数据,MongoDB 将立即终止光标。由于我们不希望在此情形中覆盖服务器,因此增加了一个 cursorRegenerationDelay 选项(默认值为 1000ms)。

例如:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

以上路由会消耗来自"flights.cancellations"的上限集合,使用"departureTime"作为增加字段,默认重新生成光标延迟 1000ms。

230.7.1.2. 持久性尾部跟踪

标准尾部跟踪是易失性的,最后一个值仅保存在内存中。但是,在实践中,您需要每次重启 Camel 容器,但最后您的最后一个值将会丢失,且您的尾部的光标消费者将再次开始使用,这很有可能将重复记录发送到您的路由中。

要克服这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库内特殊集合中持续使用的值。当消费者再次初始时,它将恢复最后跟踪的值,并在不做任何情况一样继续。

最后的读取值会在两个方面保留:每次重新生成光标以及消费者关闭时。如果需求需求,我们可能会考虑定期持续的间隔(每 5 秒清空一次),以便增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个问题单。

230.7.1.3. 启用持久性尾部跟踪

要启用这个功能,请在端点 URI 上至少设置以下选项:

  • persistentTailTracking 选项为 true
  • persistentId 选项是这个消费者的唯一标识符,因此可以在多个用户间重复使用相同的集合

另外,您可以将 tailTrackDbtailTrackCollectiontailTrackField 选项设置为自定义存储运行时信息的位置。有关每个选项的描述,请参阅本页顶部的端点选项表。

例如,以下路由会消耗来自"flights.cancellations"的上限集合,使用 "departureTime" 作为增加的字段,默认的重新生成光标延迟为 1000ms,并且打开持久性尾部跟踪,并在"flights.camelTailTracking" id 下保留。在 "lastTrackingValue" 字段中存储最后处理的值( camelTail Tracking 和 lastTracking Value 是默认值)。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

以下是与上述示例相同的另一个示例,但持久性尾部跟踪运行时信息将存储在"lastProcessedDepartureTime"字段中的"trackers.camelTrackers"集合中:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");

230.7.2. Change Streams Consumer

通过更改流,应用程序可以在不需要复杂性和跟踪 MongoDB oplog 的风险的情况下访问实时数据更改。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即响应它们。由于更改流使用聚合框架,因此应用程序也可以过滤特定更改或转换通知。

要配置 Change Streams Consumer,您需要指定 consumerTypedatabasecollection 和可选的 JSON 属性 streamFilter 来过滤事件。该 JSON 属性是标准的 MongoDB $match 聚合。它可以通过 XML DSL 配置轻松指定:

<route id="filterConsumer">
    <from uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
    <to uri="mock:test"/>

    <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
</route>

Java 配置:

from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
    .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
    .to("mock:test");
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.