308.6. 高级使用备注


308.6.1. 可插入连接资源管理

SJMS2 通过内置连接池提供 JMS 连接资源管理。 http://docs.oracle.com/javaee/5/api/javax/jms/Connection.html这消除了依赖第三方 API 池逻辑的需求。然而,您可能需要使用外部连接资源管理器,如 J2EE 或 OSGi 容器提供的资源管理器。对于此 SJMS2,提供了一个接口,可用于覆盖内部 SJMS2 连接池功能。这通过 ConnectionResource 接口来完成。

Connection Resource 提供了根据需要编写和返回 Connections 的方法,是用来向 SJMS2 组件提供连接池的合同。如果需要将 SJMS2 与外部连接池管理器集成,用户应使用它。

虽然对于标准 ConnectionFactory 提供程序,建议您使用 ConnectionFactoryResource 实施,该实施由 SJMS2 提供,或随着此组件进行了优化而扩展。

以下是使用带有 ActiveMQ PooledConnectionFactory 的可插件 ConnectionResource 的示例:

public class AMQConnectionResource implements ConnectionResource {
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}
Copy to Clipboard Toggle word wrap

然后,将 ConnectionResource 传递给 Sjms2Component

CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
Sjms2Component component = new Sjms2Component();
component.setConnectionResource(pool);
camelContext.addComponent("sjms2", component);
Copy to Clipboard Toggle word wrap

要查看其使用的完整示例,请参阅 ConnectionResourceIT

soon …​

308.6.3. 批量消息支持

Sjms2Producer 支持通过创建封装列表 的 Exchange 发布一系列消息。然后,此 Sjms2Producer 将迭代 List 的内容,并单独发布每个消息。

如果生成批处理消息,则需要设置每个消息的唯一标头,您可以使用 SJMS2 BatchMessage 类。当 Sjms2Producer 遇到 BatchMessage 列表时,它将迭代每个 BatchMessage,并发布包含的有效负载和标头。

以下是使用 BatchMessage 类的示例。首先,我们创建一个 BatchMessage 列表:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

然后发布列表:

template.sendBody("sjms2:queue:batch.queue", messages);
Copy to Clipboard Toggle word wrap

SJMS2 为开发人员提供了通过使用 TransactionCommitStrategy 接口创建自定义和可插入的事务策略的方法。这允许用户定义 SessionTransactionSynchronization 将用来决定何时提交会话的唯一场景。一个用法示例是 BatchTransactionCommitStrategy,其在下一节中会进一步详细介绍。

308.6.5. Transacted Batch Consumers & Producers

SJMS2 组件设计为支持在 Producer 和 Consumer 端点上对本地 JMS 事务的批处理。它们的处理方式都截然不同。

SJMS2 使用者端点是一种简单的实现,将在提交 X 消息之前进行处理,然后再提交相关会话。要在消费者上启用批处理事务,首先通过将 transacted 参数设置为 true 来启用事务,然后添加 transactionBatchCount 并将其设置为大于 0 的任何值。例如,以下配置将提交 Session 每 10 个信息:

sjms2:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
Copy to Clipboard Toggle word wrap

如果在消费者端点处理批处理期间发生异常,则调用 Session rollback,从而导致消息被重新提供给下一个可用的消费者。对于关联的会话,计数器也会重置为 0 ( BatchTransactionCommitStrategy )。用户负责确保 hook 在批处理消息的处理器中放置 hook,以监视 JMSRedelivered 标头设置为 true 的消息。这是指示消息在某个时间点上回滚,并且应该验证成功处理。

转用批处理消费者还将其作为内部计时器实例,在消息上提交开放事务前等待默认时间(5000ms)。默认值 5000ms (最小 1000ms)应该足以满足大多数用例,但如果需要进一步调整,只需设置 transactionBatchTimeout 参数。

sjms2:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
Copy to Clipboard Toggle word wrap

接受的最小值为 1000ms,因为上下文切换的数量可能会导致不必要的性能影响,而不会获得好处。

生产者端点的处理方式有很大不同。各个消息发送后生成者都会关闭 Exchange,并且不再引用该消息。要使所有消息都可用于重新发送,您只需在发布 BatchMessages 的 Producer 端点上启用事务。事务将在交换的结论下提交,后者包含批处理列表中的所有消息。不需要配置任何其他操作。例如:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

现在,在启用了事务的情况下发布列表:

template.sendBody("sjms2:queue:batch.queue?transacted=true", messages);
Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat