272.9. 高度なトピック
272.9.1. バックプレッシャーの制御 (プロデューサー側)
Camel エクスチェンジを外部サブスクライバーにルーティングする場合、バックプレッシャーは、エクスチェンジを配信する前にキャッシュする内部バッファーによって処理されます。サブスクライバーがエクスチェンジレートよりも遅い場合、バッファーが大きくなりすぎる可能性があります。多くの場合、これは避ける必要があります。
次のルートを検討します。
from("jms:queue") .to("reactive-streams:flow");
JMS キューに多数のメッセージが含まれており、flow
ストリームに関連付けられているサブスクライバーが遅すぎる場合、メッセージは JMS からデキューされてバッファーに追加されるため、out of memory エラーが発生する可能性があります。このような問題を回避するために、ThrottlingInflightRoutePolicy
をルートに設定できます。
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
このポリシーは、アクティブなエクスチェンジの最大数 (およびバッファーの最大サイズ) を制限し、しきい値 (例では 10
) よりも低く保ちます。10
を超えるメッセージが処理中の場合、ルートは中断され、サブスクライバーがそれらを処理するのを待ちます。
このメカニズムにより、サブスクライバーは、バックプレッシャを通じてルートの一時停止/再開を自動的に制御します。複数のサブスクライバーが同じストリームからアイテムを消費している場合、最も遅いサブスクライバーがルートステータスを自動的に制御します。
他の状況では、例えば。http
コンシューマーを使用する場合、ルートの一時停止により http サービスが使用できなくなるため、デフォルトの設定 (ポリシーなし、無制限のバッファー) を使用することをお勧めします。ユーザーは、http サービスへの要求の数を制限する (例: スケールアウト) ことによって、メモリーの問題を回避するようにしてください。
一定量のデータ損失が許容されるコンテキストでは、BUFFER
以外のバックプレッシャストラテジーを設定することが、高速なソースを処理するためのソリューションになる可能性があります。
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
LATEST
バックプレッシャーストラテジーが使用される場合、ルートから受信した最後のエクスチェンジのみがパブリッシャーによって保持され、古いデータは破棄されます (他のオプションが利用可能です)。
272.9.2. バックプレッシャーの制御 (コンシューマー側)
Camel がリアクティブストリームパブリッシャーからアイテムを消費する場合、インフライトエクスチェンジの最大数をエンドポイントオプションとして設定できます。
コンシューマーに関連付けられたサブスクライバーは、パブリッシャーと対話して、ルート内のメッセージ数をしきい値より低く保ちます。
バックプレッシャ対応ルートの例:
from("reactive-streams:numbers?maxInflightExchanges=10") .to("direct:endpoint");
Camel がソースパブリッシャーに (リアクティブストリームバックプレッシャーメカニズムを介して) リクエストするアイテムの数は、常に 10
未満です。メッセージは、Camel 側の単一のスレッドによって処理されます。
同時コンシューマー (スレッド) の数は、エンドポイントオプション (concurrentConsumers
) として設定することもできます。1 つのコンシューマー (デフォルト) を使用する場合、ソースストリーム内の項目の順序が維持されます。この値を大きくすると、アイテムは複数のスレッドによって同時に処理されます (したがって、順序は保持されません)。