273.9. 高级主题


273.9.1. 控制 Backpressure (producer side)

当路由 Camel 交换到外部订阅者时,会由缓存在交付前交换的内部缓冲区处理后向。如果订阅者比交换速率慢,则缓冲区可能会变得太大。在很多情况下,必须避免这种情况。

考虑以下路由:

from("jms:queue")
.to("reactive-streams:flow");

如果 JMS 队列包含大量消息,并且与 流流 关联的 Subscriber 太慢,消息会从 JMS 分离并附加到缓冲区中,可能会导致"内存不足"错误。要避免这样的问题,可以在路由中设置 ThrottlingInflightRoutePolicy

ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
policy.setMaxInflightExchanges(10);

from("jms:queue")
.routePolicy(policy)
.to("reactive-streams:flow");

策略限制活跃交换的最大数量(以及最大缓冲区大小),保持其低于阈值(示例中为10 )。当有 10 个信息处于 flight 状态时,路由会被暂停,等待订阅者处理它们。

通过这种机制,订阅者通过后退自动控制路由暂停/恢复。当多个订阅者消耗来自同一流的项目时,会自动控制路由状态的速度。

在其他情况下,例如使用 http 消费者时,路由挂起会导致 http 服务不可用,因此应该优先使用默认配置(无策略、未绑定的缓冲)。用户应尝试避免内存问题,方法是限制对 http 服务的请求数(例如,横向扩展)。

在可以接受一定数量的数据丢失的情况下,设置 BUFFER 以外的后退策略可能是处理快速源的解决方案。

from("direct:thermostat")
.to("reactive-streams:flow?backpressureStrategy=LATEST");

使用 LATEST 后端策略时,仅保留从路由接收的最后交换程序,同时删除旧数据(另一个选项可用)。

273.9.2. 控制 Backpressure (consumer side)

当 Camel 使用来自 reactive-streams 发布者的项目时,可以的最大动态交换数设为 endpoint 选项。

与消费者关联的订阅者与发布者交互,以保持路由中的消息数量低于阈值。

backpressure-aware 路由示例:

from("reactive-streams:numbers?maxInflightExchanges=10")
.to("direct:endpoint");

Camel 向源发布者请求的项目数(通过 reactive 流回转机制)始终小于 10。消息由 Camel 端的单个线程处理。

并发消费者(线程)数也可以设置为端点选项(concurrentConsumers)。使用 1 消费者(默认)时,源流中的项目顺序会被维护。当这个值增加时,项目将由多个线程同时处理(不会保留顺序)。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.