11.4. 竞争消费者
概述
图 11.3 “竞争消费者模式” 中显示的 竞争消费者 模式使多个用户能够从同一队列拉取信息,保证 每个消息仅消耗一次。此模式可用于将串行消息处理替换为并发消息处理(从而降低响应延迟)。
图 11.3. 竞争消费者模式
以下组件演示了竞争消费者模式:
基于 JMS 的竞争消费者
常规 JMS 队列隐式保证每个消息只能被一次使用。因此,JMS 队列会自动支持竞争消费者模式。例如,您可以定义三个从 JMS 队列 HighVolumeQ
拉取消息的竞争用户,如下所示:
from("jms:HighVolumeQ").to("cxf:bean:replica01"); from("jms:HighVolumeQ").to("cxf:bean:replica02"); from("jms:HighVolumeQ").to("cxf:bean:replica03");
如果 CXF (Web 服务)端点、replica
01、replica02
和 replica03
,并行处理来自 HighVolumeQ
队列的消息。
或者,您也可以设置 JMS 查询选项 concurrentConsumers
,以创建竞争消费者的线程池。例如,以下路由创建了三个竞争线程池,从指定队列中提取消息:
from("jms:HighVolumeQ?concurrentConsumers=3").to("cxf:bean:replica01");
可以在 XML DSL 中指定 concurrentConsumers
选项,如下所示:
<route> <from uri="jms:HighVolumeQ?concurrentConsumers=3"/> <to uri="cxf:bean:replica01"/> </route>
JMS 主题 不支持竞争消费者模式。按照定义,JMS 主题旨在向不同的消费者发送同一消息的多个副本。因此,它与竞争消费者模式不兼容。
基于 SEDA 的竞争消费者
SEDA 组件的目的是通过将计算拆分为阶段来简化并发处理。SEDA 端点基本上封装了内存阻塞队列(由 java.util.concurrent.BlockingQueue
实施)。因此,您可以使用 SEDA 端点将路由分为多个阶段,每个阶段都可能使用多个线程。例如,您可以定义由两个阶段组成的 SEDA 路由,如下所示:
// Stage 1: Read messages from file system. from("file://var/messages").to("seda:fanout"); // Stage 2: Perform concurrent processing (3 threads). from("seda:fanout").to("cxf:bean:replica01"); from("seda:fanout").to("cxf:bean:replica02"); from("seda:fanout").to("cxf:bean:replica03");
其中第一个阶段包含一个线程,它消耗来自文件端点 file://var/messages
的消息,并将它们路由到 SEDA 端点 seda:fanout
。第二个阶段包含三个线程:一个线程,它路由到 cxf:bean:replica01
,一个线程路由到 cxf:bean:replica02
,以及一个路由交换到 cxf:bean:replica03
的线程。这三个线程竞争从 SEDA 端点获取交换实例,该端点使用阻塞队列来实施。因为阻塞队列使用锁定来防止多个线程一次访问队列,所以您可以保证每个交换实例只能被消耗一次。
有关 SEDA 端点和由 thread ()
创建的线程池之间的区别,请参阅 Apache Camel 组件参考指南 中的 SEDA 组件。