272.9. 高级主题
272.9.1. 控制 Backpressure (重现方)
将 Camel 交换路由到外部订阅者时,由内部缓冲区处理,该缓冲区在传送之前缓存交换。如果订阅者比交换率慢,则缓冲区可能会变得太大。在很多情况下,必须避免出现这种情况。
考虑以下路由:
from("jms:queue") .to("reactive-streams:flow");
如果 JMS 队列包含大量消息,并且与 流流
关联的 Subscriber 太慢,则消息会从 JMS 进行排队并附加到缓冲区中,这可能会导致"内存不足"错误。为避免这个问题,可在路由中设置 ThrottlingInflightRoutePolicy
。
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
策略限制活动交换的最大数量(以及缓冲区的最大大小),使其小于阈值(示例中为10
)。当超过 10
条消息时,该路由将被暂停,等待订阅者处理它们。
使用这种机制时,订阅者通过回溯方式自动控制路由挂起/恢复。当多个订阅者消耗同一流的项目时,自动控制路由状态最慢。
在其他情况下,例如,在使用 http
消费者时,路由挂起使 http 服务不可用,因此将使用默认配置(无策略,未绑定的缓冲)应首选使用。用户应该通过将请求数量限制为 http 服务(例如,横向扩展)来避免内存问题。
在可以接受某些数量数据丢失的情况下,设置 BUFFER
以外的回溯策略可能是处理快速源的解决方案。
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
当使用 LATEST
backpressure 策略时,发布者只保留从该路由接收的最后一次交换,同时丢弃较旧的数据(其它选项可用)。