273.9. 高级主题


273.9.1. 控制 Backpressure (producer side)

当将 Camel 交换路由到外部订阅者时,backpressure 由缓存交换的内部缓冲区处理,然后再提供它们。如果订阅者比交换率慢,则缓冲区可能会变得太大。在很多情况下,必须避免这样做。

考虑以下路由:

from("jms:queue")
.to("reactive-streams:flow");

如果 JMS 队列包含大量消息,并且与 流流 关联的 Subscriber 太慢,则消息会从 JMS 中分离并附加到缓冲区,可能会导致"超出内存"错误。要避免这些问题,可以在路由中设置 ThrottlingInflightRoutePolicy

ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
policy.setMaxInflightExchanges(10);

from("jms:queue")
.routePolicy(policy)
.to("reactive-streams:flow");

该策略限制了活动交换的最大数量(以及最大缓冲区大小),保持其低于阈值(示例中为10 )。当超过 10 个信息时,路由会被挂起,等待订阅者处理它们。

使用这个机制,订阅者通过后退自动控制路由挂起/恢复。当多个订阅者消耗同一流中的项目时,最慢的人会自动控制路由状态。

在其他情况下,例如在使用 http 消费者时,路由挂起使 http 服务不可用,因此应最好使用默认配置(no policy, unbounded buffer)。用户应尝试避免内存问题,方法是将请求数量限制为 http 服务(例如,横向扩展)。

在某些数量数据丢失的情况下,设置 BUFFER 以外的后端策略是处理快速源的解决方案。

from("direct:thermostat")
.to("reactive-streams:flow?backpressureStrategy=LATEST");

当使用 LATEST backpressure 策略时,只有从路由接收的最后交换才会被发布者保留,而旧的数据会被丢弃(其它选项可用)。

273.9.2. 控制 Backpressure (消费者侧)

当 Camel 使用 reactive-streams publisher 中的项目时,可以将动态交换的最大数量设置为端点选项。

与消费者关联的订阅者与发布者交互,以保持路由中的消息数量低于阈值。

backpressure-aware 路由示例:

from("reactive-streams:numbers?maxInflightExchanges=10")
.to("direct:endpoint");

Camel 对源发布者请求的项目数量(通过被动流恢复机制)始终低于 10。消息由 Camel 端的单一线程处理。

并发消费者(线程)的数量也可以设置为端点选项(concurrentConsumers)。使用 1 个消费者(默认值)时,会维护源流中的项目顺序。当增加这个值时,项目将被多个线程并发处理(因此不会保留订购)。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.