272.9. 고급 주제
272.9.1. Backpressure 제어 (producer side)
Camel을 외부 구독자에게 라우팅할 때 백pressure는 전달하기 전에 교환을 캐시하는 내부 버퍼에 의해 처리됩니다. 구독자가 교환 속도보다 느리면 버퍼가 너무 클 수 있습니다. 많은 상황에서 이것은 피해야 합니다.
다음 경로를 고려해 보십시오.
from("jms:queue") .to("reactive-streams:flow");
JMS 큐에 많은 수의 메시지가 포함되어 있으며 흐름
스트림과 연결된 구독자가 너무 느리면 메시지가 JMS에서 큐에 추가되고 버퍼에 추가되어 "메모리 부족" 오류가 발생할 수 있습니다. 이러한 문제를 방지하기 위해 경로에 ThrottlingInflightRoutePolicy
를 설정할 수 있습니다.
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
이 정책은 활성 교환(및 버퍼의 최대 크기)을 제한하며 임계값(예:10
)보다 낮게 유지합니다. 10
개 이상의 메시지가 진행 중인 경우 경로가 일시 중지되어 구독자가 이를 처리할 때까지 기다립니다.
이 메커니즘을 통해 구독자는 백 압축을 통해 경로 일시 중지/재개 자동으로 제어합니다. 여러 구독자가 동일한 스트림에서 항목을 소비하는 경우 가장 느린 구독자가 경로 상태를 자동으로 제어합니다.
다른 상황에서는 http 소비자를 사용할 때 경로 일시 중지를 통해 http
서비스를 사용할 수 없게 되므로 기본 구성(Policy, 바인딩되지 않은 버퍼 없음)을 사용하는 것이 좋습니다. 사용자는 http 서비스로의 요청 수를 제한하여 메모리 문제를 방지해야 합니다(예: 확장).
특정 양의 데이터 손실이 허용되는 컨텍스트에서 BUFFER
이외의 백업 전략을 설정하면 빠른 소스를 처리하는 솔루션이 될 수 있습니다.
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
LATEST
백pressure 전략을 사용하면 경로에서 수신한 마지막 교환만 게시자에 의해 유지되는 반면 이전 데이터는 삭제됩니다(기타 옵션 사용 가능).
272.9.2. Backpressure (consumer side)
Camel이 Reactive-streams 게시자의 항목을 사용하는 경우 최대 inflight 기능 그룹을 끝점 옵션으로 설정할 수 있습니다.
소비자와 연결된 구독자는 경로의 메시지 수를 임계값보다 낮게 유지하기 위해 게시자와 상호 작용합니다.
backpressure 인식 경로의 예:
from("reactive-streams:numbers?maxInflightExchanges=10") .to("direct:endpoint");
Camel이 소스 게시자에 요청하는 항목 수(재동 스트림 백pressure 메커니즘을 통한)는 항상 10
보다 낮습니다. 메시지는 Camel 측에서 단일 스레드에 의해 처리됩니다.
동시 소비자(스레드) 수는 끝점 옵션(currentConsumers
)으로 설정할 수도 있습니다. 1개의 소비자(기본값)를 사용하는 경우 소스 스트림의 항목 순서가 유지됩니다. 이 값이 증가하면 여러 스레드에 의해 항목이 동시에 처리되므로 순서가 유지되지 않습니다.