307.6. 高级使用备注


307.6.1. 可插入连接资源管理

SJMS 通过内置连接池提供 JMS 连接资源管理。 http://docs.oracle.com/javaee/5/api/javax/jms/Connection.html这消除了依赖于第三方 API 池逻辑的需求。但是,有时您可能需要使用外部连接资源管理器,如 J2EE 或 OSGi 容器提供的。对于此 SJMS,提供了一个可用于覆盖内部 SJMS 连接池功能的接口。这可以通过 ConnectionResource 接口来完成。

Connection Resource 提供根据需要浏览和返回连接的方法,是用于向 SJMS 组件提供连接池的合同。当用户需要时,应使用,将 SJMS 与外部连接池管理器集成。

建议在标准 ConnectionFactory 提供程序中使用 SJMS as-is 提供的 ConnectionFactoryResource 实现或扩展,因为它针对此组件进行了优化。

以下是使用带有 ActiveMQ PooledConnectionFactory 的可插件 ConnectionResource 的示例:

public class AMQConnectionResource implements ConnectionResource {
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}
Copy to Clipboard Toggle word wrap

然后,将 ConnectionResource 传递给 SjmsComponent

CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(pool);
camelContext.addComponent("sjms", component);
Copy to Clipboard Toggle word wrap

要查看其用法的完整示例,请参阅 ConnectionResourceIT

307.6.2. 批处理消息支持

SjmsProducer 支持通过创建封装列表的交换来发布一系列消息。然后,此 SjmsProducer 将迭代 List 的内容,并单独发布每个消息。

如果生成批处理消息,则需要设置对每条消息的标头是唯一的,您可以使用 SJMS BatchMessage 类。当 SjmsProducer 遇到 BatchMessage 列表时,它将迭代每个 BatchMessage 并发布包含的有效负载和标头。

以下是使用 BatchMessage 类的示例。首先,我们创建一个 BatchMessage 列表:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

然后发布列表:

template.sendBody("sjms:queue:batch.queue", messages);
Copy to Clipboard Toggle word wrap

SJMS 为开发人员提供了一种使用 TransactionCommitStrategy 接口创建自定义和可插入的事务策略的方法。这允许用户定义 SessionTransactionSynchronization 将用来决定何时提交会话的唯一情况。它使用的示例是 BatchTransactionCommitStrategy,在下一节中详细介绍。

307.6.4. transacted Batch Consumers 和 Producers

SJMS 组件旨在支持 Producer 和 Consumer 端点上的本地 JMS 事务批处理。然而,每次处理它们的方式都有所不同。

SJMS 消费者端点是一种简单的实现,它将在与相关会话提交 X 消息之前处理 X 消息。要在消费者上启用批处理事务,首先通过将 transacted 参数设置为 true 来启用事务,然后将 transactionBatchCount 设置为大于 0 的任何值。例如,以下配置将每 10 个信息提交 Session:

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
Copy to Clipboard Toggle word wrap

如果在消费者端点处理批处理时发生异常,则调用 Session 回滚,从而导致消息被恢复为下一个可用的消费者。对于关联的会话,计数器也会重置为 BatchTransactionCommitStrategy。用户负责确保它们在批处理消息的处理器中放入钩子,以观察其将 JMSRedelivered 标头设置为 true 的消息。这是在某个时候回滚消息,并且应该进行成功处理的验证。

转换的批处理消费者也会与它一起传输,内部计时器的实例会在提交会话上打开事务前等待消息之间的默认时间(5000ms)。默认值 5000ms (最小 1000ms)应该足以满足大多数用例,但如果需要进一步调优,只需设置 transactionBatchTimeout 参数。

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
Copy to Clipboard Toggle word wrap

将接受的最小值是 1000ms,因为上下文切换量可能会导致不必要的性能影响,而不会获得好处。

虽然处理制作者端点的方式略有不同。各个消息后生产者都发送到其目的地,Exchange 已关闭,并且不再引用该消息。要使所有消息都可用于重新发送,只需在发布 BatchMessages 的 Producer Endpoints 上启用事务。事务将在交换结束时提交,其中包括批处理列表中的所有消息。不需要配置任何其他操作。例如:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
Copy to Clipboard Toggle word wrap

现在,发布启用事务的列表:

template.sendBody("sjms:queue:batch.queue?transacted=true", messages);
Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat