11.7. durable Subscriber
概述
一个 持久的订阅者 (如 图 11.6 “Durable Subscriber Pattern” )是希望接收通过特定 第 6.2 节 “publish-Subscribe Channel” 频道发送的所有消息的用户,包括在消费者与消息传递系统断开连接时发送的消息。这要求消息传递系统存储消息,以便稍后重新显示到断开连接的消费者。还需要一种机制供消费者表示它要建立持久的订阅。通常,发布订阅频道(或主题)可以同时有持久性和不可持续的订阅者,其行为如下:
- 非可伸缩订阅者 带有两个状态:connect 和 disconnected 。虽然一个不可导览的订阅者连接到一个主题,但它会实时接收所有主题的消息。但是,当订阅者断开连接时,一个不可持续的订阅者永远不会接收发送到主题的消息。
- durable subscriber ImporterCan 有两个状态: connected 和 inactive。非活动状态表示持久的订阅者与主题断开连接,但希望接收到达该主题的消息。当 durable 订阅者重新连接到该主题时,它会收到在不活跃时发送的所有消息的重播。
图 11.6. Durable Subscriber Pattern
JMS durable 订阅者
JMS 组件实施持久的订阅者模式。要在 JMS 端点上设置 durable 订阅,您必须指定 客户端 ID,它标识此特定连接,以及一个持久的 订阅名称,用于标识持久的订阅者。例如,以下路由为 JMS 主题设置持久的订阅,新闻中包含
客户端 ID conn01
和 durable 订阅名称 John.Doe
:
from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("cxf:bean:newsprocessor");
您还可以使用 ActiveMQ 端点设置持久订阅:
from("activemq:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("cxf:bean:newsprocessor");
如果要同时处理传入的消息,您可以使用 SEDA 端点将路由分到多个并行网段中,如下所示:
from("jms:topic:news?clientId=conn01&durableSubscriptionName=John.Doe"). to("seda:fanout"); from("seda:fanout").to("cxf:bean:newsproc01"); from("seda:fanout").to("cxf:bean:newsproc02"); from("seda:fanout").to("cxf:bean:newsproc03");
每条消息仅处理一次,因为 SEDA 组件支持 竞争消费者 模式。
备用示例
另一种选择是将 第 11.5 节 “消息 Dispatcher” 或 第 8.1 节 “基于内容的路由器” 与持久用户的文件或 JPA 组件合并,然后是非持久的 SEDA 等。https://access.redhat.com/documentation/en-us/red_hat_fuse/7.8/html-single/apache_camel_component_reference/index#file-component
以下是创建持久订阅者到 JMS 主题的简单示例
from("direct:start").to("activemq:topic:foo"); from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1"); from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");
<route> <from uri="direct:start"/> <to uri="activemq:topic:foo"/> </route> <route> <from uri="activemq:topic:foo?clientId=1&durableSubscriptionName=bar1"/> <to uri="mock:result1"/> </route> <route> <from uri="activemq:topic:foo?clientId=2&durableSubscriptionName=bar2"/> <to uri="mock:result2"/> </route>
以下是 JMS durable 订阅者的另一个示例,但这次 使用虚拟主题 (AMQ over durable 订阅推荐)
使用 Fluent Builders
from("direct:start").to("activemq:topic:VirtualTopic.foo"); from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1"); from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");
<route> <from uri="direct:start"/> <to uri="activemq:topic:VirtualTopic.foo"/> </route> <route> <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/> <to uri="mock:result1"/> </route> <route> <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/> <to uri="mock:result2"/> </route>