第 12 章 使用大消息
JBoss EAP 消息传递支持大型消息,即使客户端或服务器的内存有限。大消息可以按照原样流传输,也可以进一步压缩,以实现更高效的传输。用户可以通过在邮件正文中设置 InputStream
来发送大消息。发送消息时,JBoss EAP 消息传递将读取此 输入流,
并将数据传输到片段中的服务器。
客户端或服务器均未将大消息的完整正文存储在内存中。使用者最初收到一条包含空正文的大消息,随后在邮件上设置一个 OutputStream
,以将它流传输到磁盘文件。
在处理大型消息时,服务器不会像消息正文一样处理消息属性。例如,某个属性设为大于 journal-buffer-size
的字符串的消息无法被服务器处理,因为它会过填充日志缓冲区。
12.1. 流传输大消息 复制链接链接已复制到粘贴板!
如果您以标准方式发送大信息,发送它们所需的堆大小可以是消息大小的四倍或多倍,这意味着 1 GB 的消息可能需要 4 GB 的堆内存中。因此,JBoss EAP 消息传递支持使用 java.io. InputStream 和 java.io
.OutputStream 类设置消息正文,它们需要的内存要少得多。输入流直接用于发送消息,输出流用于接收消息。
在接收信息时,可以通过两种方式处理输出流:
-
您可以使用
ClientMessage.saveToOutputStream(OutputStream out)
方法阻断输出流。 -
您可以使用
ClientMessage.setOutputstream(OutputStream out)
方法异步将消息写入流。此方法要求使用者保持活动状态,直到消息完全收到。
您可以使用您喜欢的任何流,例如文件 JDBC Blobs 或 SocketInputStream,只要它实施 java.io.InputStream 来
发送消息,而 java.io.OutputStream
用于接收消息。
使用核心 API 流传输大型消息
下表显示了使用对象属性通过 JMS 提供的 ClientMessage
类上的方法:
ClientMessage 方法 | 描述 | JMS 同等属性 |
---|---|---|
|
设置 |
|
|
设置 |
|
|
将邮件的正文保存到 |
|
以下代码示例在接收核心消息时设置输出流。
以下代码示例在发送内核信息时设置输入流:
ClientMessage clientMessage = session.createMessage(); clientMessage.setInputStream(dataInputStream);
ClientMessage clientMessage = session.createMessage();
clientMessage.setInputStream(dataInputStream);
对于大于 2GiB 的消息,您必须使用 _AMQ_LARGE_SIZE
消息属性。这是因为 getBodySize()
方法将返回无效值,因为它仅限于最大整数值。
JMS 流传输大型消息
使用 JMS 时,JBoss EAP 消息传递通过设置对象属性来映射核心 API 流方法。您可以使用 Message.setObjectProperty(String name, Object value)
方法来设置输入和输出流。
在发送的消息上 ,使用
Stream。
JMS_AMQ_InputStream
属性来设置 Input
BytesMessage bytesMessage = session.createBytesMessage(); FileInputStream fileInputStream = new FileInputStream(fileInput); BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput); someProducer.send(bytesMessage);
BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);
OutputStream
使用 JMS_AMQ_SaveStream
属性来设置,对以阻止方式接收的消息使用 JMS_AMQ_SaveStream 属性。
也可以通过使用 JMS_AMQ_Output
。
Stream 属性以非阻塞方式设置 Output
Stream
// This does not wait for the stream to finish. You must keep the consumer active. messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
使用 JMS 流传输大型消息时,仅支持 StreamMessage
和 BytesMessage
对象。