257.8. 高度なトピック
257.8.1. Backpressure(producer side)の制御
Camel エクスチェンジを外部サブスクライバーにルーティングする場合、バックプレッシャーはエクスチェンジの配信前にエクスチェンジをキャッシュする内部バッファーによって処理されます。サブスクライバーがエクスチェンジレートよりも遅い場合、バッファーが大きすぎる可能性があります。多くの状況では、これを回避する必要があります。
以下のルートを考慮してください。
from("jms:queue") .to("reactive-streams:flow");
JMS キューには多数のメッセージが含まれ、フロー
ストリームに関連するサブスクライバーが遅すぎると、メッセージは JMS からキューになり、バッファーに追加され、「out of memory」エラーが発生する可能性があります。このような問題を回避するには、ThrottlingInflightRoutePolicy
をルートに設定できます。
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
このポリシーは、アクティブなエクスチェンジの最大数(バッファーの最大サイズなど)を制限することで、しきい値(例では10
)よりも小さくなります。10
を超えるメッセージが進行中の場合、ルートは一時停止され、サブスクライバーがメッセージを処理するのを待機します。
このメカニズムにより、サブスクライバーはバックプレッシャーを介してルートの一時停止/再開を自動的に制御します。複数のサブスクライバーが同じストリームからアイテムを消費すると、最も遅くなります。
他の状況では、http
コンシューマーを使用する場合などで、ルートは http サービスを利用不可にするので、デフォルト設定(ポリシーなし、バインドされていないバッファーなし)の使用が推奨されます。http サービスへの要求数を制限することにより、メモリーの問題の発生を防ぐ必要があります(例: スケールアウト)。
一定量のデータ損失が許可される場合、BUFFER
以外のバックプレシャーストラテジーを設定することは、高速ソースを処理するためのソリューションとなります。
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
LATEST
バックプレシャーストラテジーが使用される場合、ルートから受け取った最後のエクスチェンジのみがパブリッシャーによって保持され、古いデータは破棄されます(他のオプションも利用可能です)。