検索

230.7. Consumers

download PDF

コンシューマーにはいくつかのタイプがあります。

230.7.1. Tailable カーソルコンシューマー

MongoDB は、*nix システムの tail -f コマンドのようにカーソルを開いたままにすることで、コレクションから進行中のデータを瞬時に消費するメカニズムを提供します。このメカニズムは、クライアントがスケジュールされた間隔で ping を返して新しいデータを取得するのではなく、新しいデータが利用可能になったときにサーバーがクライアントにプッシュするため、スケジュールされたポーリングよりもはるかに効率的です。また、冗長なネットワークトラフィックも削減されます。

tailable カーソルを使用するための必要条件は 1 つだけです。つまり、コレクションは " 上限付きコレクション " である必要があります。これは、N 個のオブジェクトのみを保持することを意味し、制限に達すると、MongoDB は最初に挿入されたのと同じ順序で古いオブジェクトをフラッシュします。詳細は、http://www.mongodb.org/display/DOCS/Tailable+Cursors を参照してください。

Camel MongoDB コンポーネントは、tailable カーソルコンシューマーを実装しているため、この機能を Camel ルートで使用できるようになります。新しいオブジェクトが挿入されると、MongoDB はそれらを Document として自然な順序で tailable カーソルconsumer にプッシュします。tailable cursor consumer はそれらをエクスチェンジに変換し、ルートロジックをトリガーします。

230.7.1.1. tailable cursor consumer の仕組み

カーソルを tailable カーソルに変えるには、最初にカーソルを生成するときに、いくつかの特別なフラグを MongoDB に通知する必要があります。作成されると、カーソルは開いたままになり、新しいデータが到着するまで MongoCursor.next() メソッドを呼び出すとブロックされます。ただし、MongoDB サーバーは、不確定な期間が経過しても新しいデータが表示されない場合、カーソルを強制終了する権利を留保します。新しいデータを引き続き使用する場合は、カーソルを再生成する必要があります。そのためには、中断した位置を覚えておく必要があります。そうしないと、もう一度最初から消費し始めます。

Camel MongoDB tailable cursor consumer は、これらすべてのタスクを処理します。タイムスタンプ、シーケンシャル ID など、再生成されるたびにカーソルを配置するマーカーとして機能する、増加する性質のデータ内のフィールドにキーを提供するだけで済みます。MongoDB でサポートされている任意のデータ型にすることができます。日付、文字列、および整数がうまく機能することがわかっています。このコンポーネントのコンテキストでは、このメカニズムをテールトラッキングと呼びます。

consumer はこのフィールドの最後の値を記憶し、カーソルが再生成されるたびに、次のようなフィルターを使用してクエリーを実行します。increasingField > lastValue のようなフィルタでクエリを実行し、未読のデータのみが消費されるようにします。

増加フィールドの設定: エンドポイント URI の tailTrackingIncreasingField オプションで増加フィールドのキーを設定します。Camel 2.10 では、このフィールドのネストされたナビゲーションがまだサポートされていないため、データの最上位フィールドである必要があります。つまり、"timestamp" フィールドは問題ありませんが、"nested.timestamp" は機能しません。ネストされた増加するフィールドのサポートが必要な場合は、Camel JIRA でチケットを開いてください。

カーソル再生成の遅延: 注意すべきことの 1 つは、初期化時に新しいデータがまだ利用できない場合、MongoDB はカーソルを即座に強制終了することです。この場合、サーバーに負荷をかけたくないので、cursorRegenerationDelay オプションが導入されています (デフォルト値は 1000 ミリ秒です)。これは、ニーズに合わせて変更できます。

以下に例を示します。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

上記のルートは、departureTime を増加フィールドとして使用し、デフォルトの再生成カーソル遅延を 1000 ミリ秒にして、flights.cancellations キャップコレクションから消費します。

230.7.1.2. 永続的なテールトラッキング

標準のテールトラッキングは揮発性であり、最後の値はメモリーにのみ保持されます。ただし、実際には Camel コンテナーを時々再起動する必要がありますが、最後の値は失われ、tailable cursor consumer は再び先頭から消費を開始し、ルートに重複レコードを送信する可能性が非常に高くなります。

この状況を克服するために、永続的なテール追跡 機能を有効にして、MongoDB データベース内の特別なコレクションで最後に消費された増加値を追跡することもできます。consumer が再び初期化されると、最後に追跡された値が復元され、何も起こらなかったかのように続行されます。

最後に読み取られた値は、カーソルが再生成されるたびと consumer がシャットダウンするときの 2 つの場合に保持されます。需要があれば、堅牢性を高めるために、将来的には定期的な間隔 (5 秒ごとにフラッシュ) で永続化することも検討する可能性があります。この機能をリクエストするには、Camel JIRA でチケットを開いてください。

230.7.1.3. 永続的なテールトラッキングを有効にする

この機能を有効にするには、エンドポイント URI で少なくとも次のオプションを設定します。

  • persistentTailTracking オプションを true に設定
  • この consumer の一意の識別子に persistentId オプションを追加して、同じコレクションを多くの consumer で再利用できるようにします

さらに、tailTrackDbtailTrackCollection、および tailTrackField オプションを設定して、ランタイム情報が保存される場所をカスタマイズできます。各オプションの説明については、このページの上部にあるエンドポイントオプションの表を参照してください。

たとえば、次のルートは、departureTime を増加フィールドとして使用し、デフォルトの再生成カーソル遅延を 1000 ミリ秒に設定して、永続的なテールトラッキングをオンにし、cancellationsTracker の下で永続化して、flights.cancellations キャップコレクションから消費します。flights.camelTailTracking の id で、最後に処理された値を lastTrackingValue フィールドに格納します (camelTailTrackinglastTrackingValue はデフォルトです)。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

以下は、上記と同じ別の例ですが、永続的なテールトラッキングランタイム情報が trackers.camelTrackers コレクションの lastProcessedDepartureTime フィールドに格納されます。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");

230.7.2. Change Streams consumer

変更ストリームを使用すると、MongoDB oplog を追跡する複雑さやリスクなしに、アプリケーションはリアルタイムのデータ変更にアクセスできます。アプリケーションは変更ストリームを使用して、コレクションのすべてのデータ変更をサブスクライブし、すぐに対応できます。変更ストリームは集約フレームワークを使用するため、アプリケーションは特定の変更をフィルタリングしたり、通知を自由に変換したりすることもできます。

Change Streams Consumer を設定するには、consumerTypedatabasecollection、およびオプションの JSON プロパティー streamFilter を指定して、イベントをフィルタリングする必要があります。その JSON プロパティーは、MongoDB の標準的な $match 集計です。XML DSL 設定を使用して簡単に指定できます。

<route id="filterConsumer">
    <from uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
    <to uri="mock:test"/>

    <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
</route>

Java 設定:

from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
    .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
    .to("mock:test");
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.