230.7. 消费者
有几种类型的消费者:
230.7.1. Tailable Cursor Consumer
MongoDB 提供了一种机制,可以立即使用集合中持续使用持续的数据,方法是保持光标的开放方式,类似于 *nix 系统的 tail -f
命令。这种机制比调度的轮询效率更高,因为服务器将新数据推送到客户端,而不是让客户端按计划的间隔重新执行 ping 来获取新数据。它还可减少其他冗余网络流量。
使用可尾随光标的一个先决条件:集合必须是"总结集合",即它将只保存 N 对象,当达到限制时,MongoDB 会按照最初插入的顺序清除旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。
Camel MongoDB 组件实施可尾部的光标消费者,使此功能可供您用于 Camel 路由。在插入新对象时,MongoDB 将以自然顺序将其推送为可尾部的光标消费者,后者会将其转换为 Exchange,并触发您的路由逻辑。
230.7.1.1. 可尾部的光标消费者的工作方式
要将光标转换为可尾部的光标,首先生成光标时,需要向 MongoDB 发出一些特殊标志。创建后,光标将保持打开状态,并在调用 MongoCursor.next ()
方法时阻止,直到新数据到达为止。但是,如果新数据在非确定期之后没有显示新数据,MongoDB 服务器保留自己终止您的光标。如果您希望继续使用新数据,则必须重新生成光标。为此,您必须记住关闭的位置,或者您将再次从顶部开始消耗的位置。
Camel MongoDB tailable 光标消费者会为您处理所有这些任务。您只需要向数据中的一些字段提供密钥,该特性将作为每次重新生成时定位光标的标记,例如时间戳、顺序 ID 等。它可以是 MongoDB 支持的任何数据类型。日期、字符串和整数可以正常工作。我们在此组件上下文中调用这种机制"tail 跟踪"。
消费者将记住此字段的最后一个值,每当要重新生成光标时,它将使用类似过滤器(如 increasingField > lastValue
)运行查询,以便只消耗未读取数据。
设置 increasing 字段: 在端点 URI tailTrackingIncreasingField
选项上设置 increasing 字段的密钥。在 Camel 2.10 中,它必须是数据中的顶级字段,因为尚不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但 "nested.timestamp" 将无法正常工作。如果您需要对嵌套增加字段的支持,请在 Camel JIRA 中创建一个问题单。
光标重新生成延迟: 要注意的一个事情是,如果初期没有新数据,MongoDB 将立即终止光标。由于我们不希望在此情形中覆盖服务器,因此增加了一个 cursorRegenerationDelay
选项(默认值为 1000ms)。
例如:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
以上路由会消耗来自"flights.cancellations"的上限集合,使用"departureTime"作为增加字段,默认重新生成光标延迟 1000ms。
230.7.1.2. 持久性尾部跟踪
标准尾部跟踪是易失性的,最后一个值仅保存在内存中。但是,在实践中,您需要每次重启 Camel 容器,但最后您的最后一个值将会丢失,且您的尾部的光标消费者将再次开始使用,这很有可能将重复记录发送到您的路由中。
要克服这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库内特殊集合中持续使用的值。当消费者再次初始时,它将恢复最后跟踪的值,并在不做任何情况一样继续。
最后的读取值会在两个方面保留:每次重新生成光标以及消费者关闭时。如果需求需求,我们可能会考虑定期持续的间隔(每 5 秒清空一次),以便增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个问题单。
230.7.1.3. 启用持久性尾部跟踪
要启用这个功能,请在端点 URI 上至少设置以下选项:
-
persistentTailTracking
选项为true
-
persistentId
选项是这个消费者的唯一标识符,因此可以在多个用户间重复使用相同的集合
另外,您可以将 tailTrackDb
、tailTrackCollection
和 tailTrackField
选项设置为自定义存储运行时信息的位置。有关每个选项的描述,请参阅本页顶部的端点选项表。
例如,以下路由会消耗来自"flights.cancellations"的上限集合,使用 "departureTime" 作为增加的字段,默认的重新生成光标延迟为 1000ms,并且打开持久性尾部跟踪,并在"flights.camelTailTracking" id 下保留。在 "lastTrackingValue" 字段中存储最后处理的值( camelTail
Tracking 和 lastTracking
Value 是默认值)。
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下是与上述示例相同的另一个示例,但持久性尾部跟踪运行时信息将存储在"lastProcessedDepartureTime"字段中的"trackers.camelTrackers"集合中:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");
230.7.2. Change Streams Consumer
通过更改流,应用程序可以在不需要复杂性和跟踪 MongoDB oplog 的风险的情况下访问实时数据更改。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即响应它们。由于更改流使用聚合框架,因此应用程序也可以过滤特定更改或转换通知。
要配置 Change Streams Consumer,您需要指定 consumerType
、database
、collection
和可选的 JSON 属性 streamFilter
来过滤事件。该 JSON 属性是标准的 MongoDB $match
聚合。它可以通过 XML DSL 配置轻松指定:
<route id="filterConsumer"> <from uri="mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets"/> <to uri="mock:test"/> <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/> </route>
Java 配置:
from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets") .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}") .to("mock:test");