273.9. 高级主题
273.9.1. 控制 Backpressure (producer side)
当将 Camel 交换路由到外部订阅者时,backpressure 由缓存交换的内部缓冲区处理,然后再提供它们。如果订阅者比交换率慢,则缓冲区可能会变得太大。在很多情况下,必须避免这样做。
考虑以下路由:
from("jms:queue") .to("reactive-streams:flow");
如果 JMS 队列包含大量消息,并且与 流流
关联的 Subscriber 太慢,则消息会从 JMS 中分离并附加到缓冲区,可能会导致"超出内存"错误。要避免这些问题,可以在路由中设置 ThrottlingInflightRoutePolicy
。
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); from("jms:queue") .routePolicy(policy) .to("reactive-streams:flow");
该策略限制了活动交换的最大数量(以及最大缓冲区大小),保持其低于阈值(示例中为10
)。当超过 10
个信息时,路由会被挂起,等待订阅者处理它们。
使用这个机制,订阅者通过后退自动控制路由挂起/恢复。当多个订阅者消耗同一流中的项目时,最慢的人会自动控制路由状态。
在其他情况下,例如在使用 http
消费者时,路由挂起使 http 服务不可用,因此应最好使用默认配置(no policy, unbounded buffer)。用户应尝试避免内存问题,方法是将请求数量限制为 http 服务(例如,横向扩展)。
在某些数量数据丢失的情况下,设置 BUFFER
以外的后端策略是处理快速源的解决方案。
from("direct:thermostat") .to("reactive-streams:flow?backpressureStrategy=LATEST");
当使用 LATEST
backpressure 策略时,只有从路由接收的最后交换才会被发布者保留,而旧的数据会被丢弃(其它选项可用)。
273.9.2. 控制 Backpressure (消费者侧)
当 Camel 使用 reactive-streams publisher 中的项目时,可以将动态交换的最大数量设置为端点选项。
与消费者关联的订阅者与发布者交互,以保持路由中的消息数量低于阈值。
backpressure-aware 路由示例:
from("reactive-streams:numbers?maxInflightExchanges=10") .to("direct:endpoint");
Camel 对源发布者请求的项目数量(通过被动流恢复机制)始终低于 10
。消息由 Camel 端的单一线程处理。
并发消费者(线程)的数量也可以设置为端点选项(concurrentConsumers
)。使用 1 个消费者(默认值)时,会维护源流中的项目顺序。当增加这个值时,项目将被多个线程并发处理(因此不会保留订购)。