第 12 章 使用大消息


JBoss EAP 消息传递支持大型消息,即使客户端或服务器的内存有限。大消息可以按照原样流传输,也可以进一步压缩,以实现更高效的传输。用户可以通过在邮件正文中设置 InputStream 来发送大消息。发送消息时,JBoss EAP 消息传递将读取此 输入流, 并将数据传输到片段中的服务器。

客户端或服务器均未将大消息的完整正文存储在内存中。使用者最初收到一条包含空正文的大消息,随后在邮件上设置一个 OutputStream,以将它流传输到磁盘文件。

警告

在处理大型消息时,服务器不会像消息正文一样处理消息属性。例如,某个属性设为大于 journal-buffer-size 的字符串的消息无法被服务器处理,因为它会过填充日志缓冲区。

12.1. 流传输大消息

如果您以标准方式发送大信息,发送它们所需的堆大小可以是消息大小的四倍或多倍,这意味着 1 GB 的消息可能需要 4 GB 的堆内存中。因此,JBoss EAP 消息传递支持使用 java.io. InputStream 和 java.io.OutputStream 类设置消息正文,它们需要的内存要少得多。输入流直接用于发送消息,输出流用于接收消息。

在接收信息时,可以通过两种方式处理输出流:

  • 您可以使用 ClientMessage.saveToOutputStream(OutputStream out) 方法阻断输出流。
  • 您可以使用 ClientMessage.setOutputstream(OutputStream out) 方法异步将消息写入流。此方法要求使用者保持活动状态,直到消息完全收到。

您可以使用您喜欢的任何流,例如文件 JDBC Blobs 或 SocketInputStream,只要它实施 java.io.InputStream 来 发送消息,而 java.io.OutputStream 用于接收消息。

使用核心 API 流传输大型消息

下表显示了使用对象属性通过 JMS 提供的 ClientMessage 类上的方法:

Expand
ClientMessage 方法描述JMS 同等属性

setBodyInputStream(InputStream)

设置 用于在邮件发送时读取邮件正文的输入流

JMS_AMQ_InputStream

setOutputStream(OutputStream)

设置 将接收邮件正文的输出流。这个方法不会阻止。

JMS_AMQ_OutputStream

saveOutputStream(OutputStream)

将邮件的正文保存到 输出流。它将阻止整个内容传输到 输出流

JMS_AMQ_SaveStream

以下代码示例在接收核心消息时设置输出流。

ClientMessage firstMessage = consumer.receive(...);

// Block until the stream is transferred
firstMessage.saveOutputStream(firstOutputStream);

ClientMessage secondMessage = consumer.receive(...);

// Do not wait for the transfer to finish
secondMessage.setOutputStream(secondOutputStream);
Copy to Clipboard Toggle word wrap

以下代码示例在发送内核信息时设置输入流:

ClientMessage clientMessage = session.createMessage();
clientMessage.setInputStream(dataInputStream);
Copy to Clipboard Toggle word wrap
注意

对于大于 2GiB 的消息,您必须使用 _AMQ_LARGE_SIZE 消息属性。这是因为 getBodySize() 方法将返回无效值,因为它仅限于最大整数值。

JMS 流传输大型消息

使用 JMS 时,JBoss EAP 消息传递通过设置对象属性来映射核心 API 流方法。您可以使用 Message.setObjectProperty(String name, Object value) 方法来设置输入和输出流。

在发送的消息上 ,使用 JMS_AMQ_InputStream 属性来设置 Input Stream。

BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);
Copy to Clipboard Toggle word wrap

OutputStream 使用 JMS_AMQ_SaveStream 属性来设置,对以阻止方式接收的消息使用 JMS_AMQ_SaveStream 属性。

BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);

// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
Copy to Clipboard Toggle word wrap

也可以通过使用 JMS_AMQ_Output Stream 属性以非阻塞方式设置 Output Stream

// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
Copy to Clipboard Toggle word wrap
注意

使用 JMS 流传输大型消息时,仅支持 StreamMessageBytesMessage 对象。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat