273.9. 高级主题
273.9.1. 控制 Backpressure (producer side)
当路由 Camel 交换到外部订阅者时,会由缓存在交付前交换的内部缓冲区处理后向。如果订阅者比交换速率慢,则缓冲区可能会变得太大。在很多情况下,必须避免这种情况。
考虑以下路由:
from("jms:queue") .to("reactive-streams:flow");
如果 JMS 队列包含大量消息,并且与 流流
关联的 Subscriber 太慢,消息会从 JMS 分离并附加到缓冲区中,可能会导致"内存不足"错误。要避免这样的问题,可以在路由中设置 ThrottlingInflightRoutePolicy
。
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
策略限制活跃交换的最大数量(以及最大缓冲区大小),保持其低于阈值(示例中为10
)。当有 10
个信息处于 flight 状态时,路由会被暂停,等待订阅者处理它们。
通过这种机制,订阅者通过后退自动控制路由暂停/恢复。当多个订阅者消耗来自同一流的项目时,会自动控制路由状态的速度。
在其他情况下,例如使用 http
消费者时,路由挂起会导致 http 服务不可用,因此应该优先使用默认配置(无策略、未绑定的缓冲)。用户应尝试避免内存问题,方法是限制对 http 服务的请求数(例如,横向扩展)。
在可以接受一定数量的数据丢失的情况下,设置 BUFFER
以外的后退策略可能是处理快速源的解决方案。
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
使用 LATEST
后端策略时,仅保留从路由接收的最后交换程序,同时删除旧数据(另一个选项可用)。
273.9.2. 控制 Backpressure (consumer side)
当 Camel 使用来自 reactive-streams 发布者的项目时,可以的最大动态交换数设为 endpoint 选项。
与消费者关联的订阅者与发布者交互,以保持路由中的消息数量低于阈值。
backpressure-aware 路由示例:
from("reactive-streams:numbers?maxInflightExchanges=10") .to("direct:endpoint");
Camel 向源发布者请求的项目数(通过 reactive 流回转机制)始终小于 10
。消息由 Camel 端的单个线程处理。
并发消费者(线程)数也可以设置为端点选项(concurrentConsumers
)。使用 1 消费者(默认)时,源流中的项目顺序会被维护。当这个值增加时,项目将由多个线程同时处理(不会保留顺序)。