273.9. 고급 주제
273.9.1. 백업 제어(producer side)
Camel이 외부 구독자로 교환하는 경우 백압은 전달하기 전에 교환을 캐시하는 내부 버퍼에 의해 처리됩니다. 구독자가 교환 속도보다 느리면 버퍼가 너무 커질 수 있습니다. 많은 경우에 이는 피해야 합니다.
다음 경로를 고려하십시오.
from("jms:queue") .to("reactive-streams:flow");
JMS 큐에 많은 수의 메시지가 포함되어 있고 흐름
스트림과 연결된 구독자가 너무 느리면 메시지가 JMS에서 큐에 추가되고 버퍼에 추가되므로 "메모리 부족" 오류가 발생할 수 있습니다. 이러한 문제를 방지하기 위해 경로에 ThrottlingInflightRoutePolicy
를 설정할 수 있습니다.
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
이 정책은 활성 교환의 최대 수(및 버퍼의 최대 크기)를 제한하여 임계값(예:10
)보다 낮게 유지합니다. 10
개 이상의 메시지가 전송되면 경로가 중지되어 구독자가 처리할 때까지 기다립니다.
이 메커니즘을 사용하여 구독자는 백압을 통해 경로 일시 중지/복구를 자동으로 제어합니다. 여러 구독자가 동일한 스트림의 항목을 사용하는 경우 가장 느린 사용자가 경로 상태를 자동으로 제어합니다.
다른 경우(예: http
소비자를 사용하는 경우 경로 일시 중지를 사용하면 http 서비스를 사용할 수 없으므로 기본 구성(정책 없음, 바인딩되지 않은 버퍼)을 사용하는 것이 좋습니다. 사용자는 http 서비스에 대한 요청 수를 제한하여 메모리 문제를 방지해야 합니다(예: 축소).
특정 양의 데이터 손실이 허용되는 상황에서 BUFFER
이외의 백업 전략을 설정하면 빠른 소스를 처리하는 솔루션이 될 수 있습니다.
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
LATEST
backpressure 전략이 사용되는 경우 이전 데이터가 삭제되는 동안 경로에서 수신된 마지막 교환만 유지됩니다(다른 옵션을 사용할 수 있음).
273.9.2. 백업 제어(consumer side)
Camel이 reactive-streams 게시자의 항목을 사용하는 경우 최대 진행 중인 교환 수를 끝점 옵션으로 설정할 수 있습니다.
소비자와 연결된 구독자는 경로에 있는 메시지 수를 임계값보다 낮게 유지하기 위해 게시자와 상호 작용합니다.
backpressure 인식 경로의 예:
from("reactive-streams:numbers?maxInflightExchanges=10") .to("direct:endpoint");
Camel이 소스 게시자에 요청하는 항목(반복 스트림 백압 메커니즘을 통해)은 항상 10
보다 적습니다. 메시지는 Camel 측의 단일 스레드에 의해 처리됩니다.
동시 소비자(스레드) 수를 엔드포인트 옵션(concurrentConsumers
)으로 설정할 수도 있습니다. 1 소비자(기본값)를 사용하는 경우 소스 스트림의 항목 순서가 유지됩니다. 이 값이 증가하면 여러 스레드에 의해 항목이 동시에 처리됩니다(그러한 순서를 유지하지 않음).