307.6. 高级使用备注


307.6.1. 可插入连接资源管理

SJMS 通过内置连接池提供 JMS 连接资源管理。 http://docs.oracle.com/javaee/5/api/javax/jms/Connection.html这消除了依赖第三方 API 池逻辑的需求。然而,您可能需要使用外部连接资源管理器,如 J2EE 或 OSGi 容器提供的资源管理器。对于此 SJMS,提供了一个接口,可用于覆盖内部 SJMS 连接池。这通过 ConnectionResource 接口来完成。

Connection Resource 提供了根据需要编写和返回 Connections 的方法,是用来向 SJMS 组件提供连接池的合同。必要时,用户应使用 SJMS 与外部连接池管理器集成。

虽然对于标准 ConnectionFactory 提供程序,建议您使用 ConnectionFactoryResource 实施(由 SJMS as-is 提供),或将其扩展为此组件。

以下是使用带有 ActiveMQ PooledConnectionFactory 的可插件 ConnectionResource 的示例:

public class AMQConnectionResource implements ConnectionResource {
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}
Copy to Clipboard Toggle word wrap

然后,将 ConnectionResource 传递给 SjmsComponent

CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(pool);
camelContext.addComponent("sjms", component);
Copy to Clipboard Toggle word wrap

要查看其使用的完整示例,请参阅 ConnectionResourceIT

307.6.2. 批量消息支持

SjmsProducer 支持通过创建封装列表 的交换来发布一系列消息。然后,此 SjmsProducer 会迭代 List 的内容并单独发布每条消息。

如果生成批处理消息,则需要设置每个消息的唯一标头,您可以使用 SJMS BatchMessage 类。当 SjmsProducer 遇到 BatchMessage 列表时,它将迭代每个 BatchMessage,并发布包含的有效负载和标头。

以下是使用 BatchMessage 类的示例。首先,我们创建一个 BatchMessage 列表:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

然后发布列表:

template.sendBody("sjms:queue:batch.queue", messages);
Copy to Clipboard Toggle word wrap

SJMS 为开发人员提供了利用 TransactionCommitStrategy 接口创建自定义和可插入的事务策略的方法。这允许用户定义 SessionTransactionSynchronization 将用来决定何时提交会话的唯一场景。一个用法示例是 BatchTransactionCommitStrategy,其在下一节中会进一步详细介绍。

307.6.4. Transacted Batch Consumers & Producers

SJMS 组件设计为支持在 Producer 和 Consumer 端点上对本地 JMS 事务的批处理。它们的处理方式都截然不同。

SJMS 使用者端点是一种简单的实现,将在提交 X 消息之前处理 X 消息,然后再提交相关会话。要在消费者上启用批处理事务,首先通过将 transacted 参数设置为 true 来启用事务,然后添加 transactionBatchCount 并将其设置为大于 0 的任何值。例如,以下配置将提交 Session 每 10 个信息:

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
Copy to Clipboard Toggle word wrap

如果在消费者端点处理批处理期间发生异常,则调用 Session rollback,从而导致消息被重新提供给下一个可用的消费者。对于关联的会话,计数器也会重置为 0 ( BatchTransactionCommitStrategy )。用户负责确保 hook 在批处理消息的处理器中放置 hook,以监视 JMSRedelivered 标头设置为 true 的消息。这是指示消息在某个时间点上回滚,并且应该验证成功处理。

转用批处理消费者还将其作为内部计时器实例,在消息上提交开放事务前等待默认时间(5000ms)。默认值 5000ms (最小 1000ms)应该足以满足大多数用例,但如果需要进一步调整,只需设置 transactionBatchTimeout 参数。

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
Copy to Clipboard Toggle word wrap

接受的最小值为 1000ms,因为上下文切换的数量可能会导致不必要的性能影响,而不会获得好处。

生产者端点的处理方式有很大不同。各个消息发送后生成者都会关闭 Exchange,并且不再引用该消息。要使所有消息都可用于重新发送,您只需在发布 BatchMessages 的 Producer 端点上启用事务。事务将在交换的结论下提交,后者包含批处理列表中的所有消息。不需要配置任何其他操作。例如:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

现在,在启用了事务的情况下发布列表:

template.sendBody("sjms:queue:batch.queue?transacted=true", messages);
Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat