11.4. 竞争消费者


概述

图 11.3 “竞争消费者模式” 中显示的 竞争消费者 模式使多个消费者能够从同一队列拉取信息,并保证 每个消息仅消耗一次。此模式可用于将串行消息处理替换为并发消息处理(响应延迟出现相应的减少)。

图 11.3. 竞争消费者模式

竞争消费者模式

以下组件演示了竞争消费者模式:

基于 JMS 的竞争消费者

定期的 JMS 队列会隐式保证每个消息一次只能被使用。因此,JMS 队列自动支持竞争消费者模式。例如,您可以定义三个竞争消费者,从 JMS 队列 HighVolumeQ 中拉取消息,如下所示:

from("jms:HighVolumeQ").to("cxf:bean:replica01");
from("jms:HighVolumeQ").to("cxf:bean:replica02");
from("jms:HighVolumeQ").to("cxf:bean:replica03");

其中 CXF (Web 服务)端点、replica 01、replica02replica03,请并行处理来自 HighVolumeQ 队列的消息。

或者,您可以设置 JMS 查询选项 concurrentConsumers,以创建竞争消费者的线程池。例如,以下路由会创建一个三个竞争线程池,该池从指定队列中选择消息:

from("jms:HighVolumeQ?concurrentConsumers=3").to("cxf:bean:replica01");

并且 concurrentConsumers 选项也可以在 XML DSL 中指定,如下所示:

 <route>
   <from uri="jms:HighVolumeQ?concurrentConsumers=3"/>
   <to uri="cxf:bean:replica01"/>
 </route>
注意

JMS 主题 无法支持竞争消费者模式。按照定义,一个 JMS 主题旨在向不同的使用者发送相同消息的多个副本。因此,它与竞争消费者模式不兼容。

基于 SEDA 的竞争消费者

SEDA 组件的目的是通过将计算划分为阶段来简化并发处理。SEDA 端点基本上封装内存阻塞队列(由 java.util.concurrent.BlockingQueue实施)。因此,您可以使用 SEDA 端点将路由分成阶段,其中每个阶段可能使用多个线程。例如,您可以定义一个由两个阶段组成的 SEDA 路由,如下所示:

// Stage 1: Read messages from file system.
from("file://var/messages").to("seda:fanout");

// Stage 2: Perform concurrent processing (3 threads).
from("seda:fanout").to("cxf:bean:replica01");
from("seda:fanout").to("cxf:bean:replica02");
from("seda:fanout").to("cxf:bean:replica03");

第一个阶段包含一个线程,它会消耗来自文件端点 file://var/messages 并将其路由到 SEDA 端点 seda:fanout。第二个阶段包含三个线程:一个线程将交换到 cxf:bean:replica01、路由交换到 cxf:bean:replica02 的线程,以及路由交换到 cxf:bean:replica03 的线程。这三个线程竞争为从 SEDA 端点获取交换实例,该端点使用阻塞队列来实施。由于阻塞队列使用锁定来防止多个线程每次访问队列,因此您可以保证每个交换实例只能消耗一次。

有关 SEDA 端点和 thread () 创建的线程池之间的区别,请参阅 NameOfCamelCompRef 中的 SEDA 组件

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.