307.6. 高级使用备注
307.6.1. 可插入连接资源管理
SJMS 通过内置连接池提供 JMS 连接资源管理。 http://docs.oracle.com/javaee/5/api/javax/jms/Connection.html这消除了依赖于第三方 API 池逻辑的需求。但是,有时您可能需要使用外部连接资源管理器,如 J2EE 或 OSGi 容器提供的。对于此 SJMS,提供了一个可用于覆盖内部 SJMS 连接池功能的接口。这可以通过
ConnectionResource
接口来完成。
Connection
Resource 提供根据需要浏览和返回连接的方法,是用于向 SJMS 组件提供连接池的合同。当用户需要时,应使用,将 SJMS 与外部连接池管理器集成。
建议在标准 ConnectionFactory
提供程序中使用 SJMS as-is 提供的 ConnectionFactoryResource
实现或扩展,因为它针对此组件进行了优化。
以下是使用带有 ActiveMQ PooledConnectionFactory
的可插件 ConnectionResource 的示例:
public class AMQConnectionResource implements ConnectionResource { private PooledConnectionFactory pcf; public AMQConnectionResource(String connectString, int maxConnections) { super(); pcf = new PooledConnectionFactory(connectString); pcf.setMaxConnections(maxConnections); pcf.start(); } public void stop() { pcf.stop(); } @Override public Connection borrowConnection() throws Exception { Connection answer = pcf.createConnection(); answer.start(); return answer; } @Override public Connection borrowConnection(long timeout) throws Exception { // SNIPPED... } @Override public void returnConnection(Connection connection) throws Exception { // Do nothing since there isn't a way to return a Connection // to the instance of PooledConnectionFactory log.info("Connection returned"); } }
然后,将 ConnectionResource
传递给 SjmsComponent
:
CamelContext camelContext = new DefaultCamelContext(); AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1); SjmsComponent component = new SjmsComponent(); component.setConnectionResource(pool); camelContext.addComponent("sjms", component);
要查看其用法的完整示例,请参阅 ConnectionResourceIT
。
307.6.2. 批处理消息支持
SjmsProducer 支持通过创建封装列表的交换来发布一系列消息。然后,此 SjmsProducer 将迭代 List 的内容,并单独发布每个消息。
如果生成批处理消息,则需要设置对每条消息的标头是唯一的,您可以使用 SJMS BatchMessage
类。当 SjmsProducer 遇到 BatchMessage
列表时,它将迭代每个 BatchMessage
并发布包含的有效负载和标头。
以下是使用 BatchMessage 类的示例。首先,我们创建一个 BatchMessage
列表:
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); }
然后发布列表:
template.sendBody("sjms:queue:batch.queue", messages);
307.6.3. 可自定义的事务提交策略(仅限本地 JMS 事务)
SJMS 为开发人员提供了一种使用 TransactionCommitStrategy
接口创建自定义和可插入的事务策略的方法。这允许用户定义 SessionTransactionSynchronization
将用来决定何时提交会话的唯一情况。它使用的示例是 BatchTransactionCommitStrategy
,在下一节中详细介绍。
307.6.4. transacted Batch Consumers 和 Producers
SJMS 组件旨在支持 Producer 和 Consumer 端点上的本地 JMS 事务批处理。然而,每次处理它们的方式都有所不同。
SJMS 消费者端点是一种简单的实现,它将在与相关会话提交 X 消息之前处理 X 消息。要在消费者上启用批处理事务,首先通过将 transacted
参数设置为 true 来启用事务,然后将 transactionBatchCount
设置为大于 0 的任何值。例如,以下配置将每 10 个信息提交 Session:
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
如果在消费者端点处理批处理时发生异常,则调用 Session 回滚,从而导致消息被恢复为下一个可用的消费者。对于关联的会话,计数器也会重置为 BatchTransactionCommitStrategy
。用户负责确保它们在批处理消息的处理器中放入钩子,以观察其将 JMSRedelivered 标头设置为 true 的消息。这是在某个时候回滚消息,并且应该进行成功处理的验证。
转换的批处理消费者也会与它一起传输,内部计时器的实例会在提交会话上打开事务前等待消息之间的默认时间(5000ms)。默认值 5000ms (最小 1000ms)应该足以满足大多数用例,但如果需要进一步调优,只需设置 transactionBatchTimeout
参数。
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
将接受的最小值是 1000ms,因为上下文切换量可能会导致不必要的性能影响,而不会获得好处。
虽然处理制作者端点的方式略有不同。各个消息后生产者都发送到其目的地,Exchange 已关闭,并且不再引用该消息。要使所有消息都可用于重新发送,只需在发布 BatchMessages 的 Producer Endpoints 上启用事务。事务将在交换结束时提交,其中包括批处理列表中的所有消息。不需要配置任何其他操作。例如:
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>(); for (int i = 1; i <= messageCount; i++) { String body = "Hello World " + i; BatchMessage<String> message = new BatchMessage<String>(body, null); messages.add(message); }
现在,发布启用事务的列表:
template.sendBody("sjms:queue:batch.queue?transacted=true", messages);