230.7. 消费者
有几种类型的消费者:
230.7.1. Tailable Cursor Consumer
MongoDB 提供了一种机制来即时消耗集合中持续数据的机制,使光标保持打开,就像是 *nix 系统的 tail -f
命令一样。由于服务器在客户端可用时将新数据推送至客户端,而不是让客户端在计划的时间间隔返回以获取新数据,因此这种机制比调度的轮询更高效。它还可减少其他冗余网络流量。
使用 tailable 光标仅有一个必要条件:集合必须是 "capped collection",这意味着它只会保存 N 对象,当达到限制时,MongoDB 会按最初插入的顺序清除旧对象。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。
Camel MongoDB 组件实施可尾部的光标消费者,使此功能可用于 Camel 路由。当插入新对象时,MongoDB 会将它们作为 Document
推送到您的可尾部的光标消费者,后者会将它们转换为交换,并将触发您的路由逻辑。
230.7.1.1. tailable 光标消费者的工作方式
要将光标转换为可尾部的光标,在首次生成光标时,将向 MongoDB 发出一些特殊标志。创建后,光标将保持打开状态,并在调用 MongoCursor.next ()
方法时阻止,直到新数据到达为止。但是,如果新数据没有在确定周期后显示,MongoDB 服务器保留自己终止您的光标的权利。如果您有兴趣继续使用新数据,则必须重新生成光标。为此,您必须记住您离开的位置,否则您将再次从上移使用的位置。
Camel MongoDB tailable 光标消费者为您处理所有这些任务。您只需要在增加性质的数据中为某些字段提供键,这将作为标记来每次重新生成光标时定位,例如时间戳、顺序 ID 等。它可以是 MongoDB 支持的任何 datatype。日期、字符串和整数可以正常工作。我们在此组件上下文中称为"定制"机制。
消费者将记住此字段的最后一个值,无论光标要重新生成时,它将使用类似: increasingField > lastValue
的过滤器运行查询,以便只消耗未读取的数据。
设置 increasing 字段: 在端点 URI tailTrackingIncreasingField
选项上设置 increasing 字段的键。在 Camel 2.10 中,它必须是数据的顶级字段,因为尚不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但 "nested.timestamp" 将无法正常工作。如果您需要对嵌套增加字段的支持,请在 Camel JIRA 中创建一个票据。
光标重新生成延迟: 需要注意的是,如果新数据在初始时尚未可用,MongoDB 将立即终止光标。由于我们不想在此案例中对服务器进行重负,因此引入了一个 cursorRegenerationDelay
选项(默认值为 1000ms)。您可以修改以适合您的需求。
例如:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
以上路由将使用 "flights.cancellations" capped 集合,使用 "departureTime" 作为增加字段,默认重新生成光标延迟 1000ms。
230.7.1.2. 持久性尾部跟踪
标准尾部跟踪是易失性的,最后一个值仅保存在内存中。但是,在实践中,您需要立即重启 Camel 容器,然后,您的最后一个值将会丢失,并且您的 tailable 光标消费者再次从上面消耗,很可能将重复记录发送到您的路由中。
要解决这种情况,您可以启用 持久性尾部跟踪 功能,以跟踪 MongoDB 数据库中特殊集合中最后一次消耗的值。当消费者再次初始化时,它将恢复最后跟踪的值,并像没有发生任何情况一样继续。
最后的读取值会在两个字节上保留:每次光标都被重新生成以及消费者关闭时。如果需求需求需求,我们可能还会考虑在常规间隔中保留(每 5 秒刷新一次)。要请求此功能,请在 Camel JIRA 中创建一个 ticket。
230.7.1.3. 启用持久性尾部跟踪
要启用此功能,请在端点 URI 中设置以下选项:
-
persistentTailTracking
选项为true
-
persistentId
选项为这个消费者的唯一标识符,以便可以在多个消费者间重复使用相同的集合
另外,您可以将 tailTrackDb
、tailTrackCollection
和 tailTrackField
选项设置为自定义将存储运行时信息的选项。有关每个选项的描述,请参阅此页面顶部的端点选项表。
例如,以下路由会消耗来自 "flights.cancellations" capped 集合,使用 "departureTime" 作为增加字段,默认重新生成光标延迟 1000ms,并打开持久性尾部跟踪,并在"flights.camelTailTracking"上的"cancellationsTracker" ID 下保留。 在 "lastTrackingValue" 字段下存储最后处理的值(camelTailTracking
和 lastTrackingValue
是默认值)。
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下是与上面相同的另一个示例,但持久性尾部跟踪运行时信息将存储在 "trackers.camelTrackers" 集合中,在 "lastProcessedDepartureTime" 字段中:
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");
230.7.2. Change Streams Consumer
Change Streams 允许应用程序访问实时数据更改,而无需跟踪 MongoDB oplog 的复杂性和风险。应用程序可以使用更改流来订阅集合上的所有数据更改,并立即响应它们。由于更改流使用聚合框架,应用程序也可以过滤特定更改或转换 上的通知。
要配置 Change Streams Consumer,您需要指定 consumerType
,database
,collection
和可选的 JSON 属性 streamFilter
来过滤事件。该 JSON 属性是标准 MongoDB $match
聚合。它可使用 XML DSL 配置轻松指定:
<route id="filterConsumer"> <from uri="mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets"/> <to uri="mock:test"/> <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/> </route>
Java 配置:
from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets") .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}") .to("mock:test");