第 12 章 使用大消息
JBoss EAP 消息传递支持大型消息,即使客户端或服务器的内存有限。大消息可以按照原样流传输,也可以进一步压缩,以实现更高效的传输。用户可以通过在邮件正文中设置 InputStream 来发送大消息。发送消息时,JBoss EAP 消息传递将读取此 输入流, 并将数据传输到片段中的服务器。
客户端或服务器均未将大消息的完整正文存储在内存中。使用者最初收到一条包含空正文的大消息,随后在邮件上设置一个 OutputStream,以将它流传输到磁盘文件。
在处理大型消息时,服务器不会像消息正文一样处理消息属性。例如,某个属性设为大于 journal-buffer-size 的字符串的消息无法被服务器处理,因为它会过填充日志缓冲区。
12.1. 流传输大消息 复制链接链接已复制到粘贴板!
如果您以标准方式发送大信息,发送它们所需的堆大小可以是消息大小的四倍或多倍,这意味着 1 GB 的消息可能需要 4 GB 的堆内存中。因此,JBoss EAP 消息传递支持使用 java.io. InputStream 和 java.io.OutputStream 类设置消息正文,它们需要的内存要少得多。输入流直接用于发送消息,输出流用于接收消息。
在接收信息时,可以通过两种方式处理输出流:
-
您可以使用
ClientMessage.saveToOutputStream(OutputStream out)方法阻断输出流。 -
您可以使用
ClientMessage.setOutputstream(OutputStream out)方法异步将消息写入流。此方法要求使用者保持活动状态,直到消息完全收到。
您可以使用您喜欢的任何流,例如文件 JDBC Blobs 或 SocketInputStream,只要它实施 java.io.InputStream 来 发送消息,而 java.io.OutputStream 用于接收消息。
使用核心 API 流传输大型消息
下表显示了使用对象属性通过 Jakarta Messaging 提供的 ClientMessage 类中的方法。
ClientMessage 方法 | 描述 | Jakarta 消息传递同等属性 |
|---|---|---|
|
|
设置 |
|
|
|
设置 |
|
|
|
将邮件的正文保存到 |
|
以下代码示例在接收核心消息时设置输出流。
以下代码示例在发送内核信息时设置输入流:
ClientMessage clientMessage = session.createMessage(); clientMessage.setInputStream(dataInputStream);
ClientMessage clientMessage = session.createMessage();
clientMessage.setInputStream(dataInputStream);
对于大于 2GiB 的消息,您必须使用 _AMQ_LARGE_SIZE 消息属性。这是因为 getBodySize() 方法将返回无效值,因为它仅限于最大整数值。
流处理 Jakarta 消息传递的大型消息
使用 Jakarta Messaging 时,JBoss EAP 消息传递通过设置对象属性来映射核心 API 流方式。您可以使用 Message.setObjectProperty(String name, Object value) 方法来设置输入和输出流。
在发送的消息上 ,使用 Stream。
JMS_AMQ_InputStream 属性来设置 Input
BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);
BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);
OutputStream 使用 JMS_AMQ_SaveStream 属性来设置,对以阻止方式接收的消息使用 JMS_AMQ_SaveStream 属性。
也可以通过使用 JMS_AMQ_Output 。
Stream 属性以非阻塞方式设置 Output Stream
// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
使用 Jakarta Messaging 流传输大型消息时,仅支持 StreamMessage 和 BytesMessage 对象。