Chapter 12. Working with Large Messages
JBoss EAP messaging supports large messages, even when the client or server has limited memory. Large messages can be streamed as they are, or they can be compressed for more efficient transfer. A user can send a large message by setting an InputStream in the body of the message. When the message is sent, JBoss EAP messaging reads this InputStream and transmits the data to the server in fragments.
Neither the client nor the server stores the complete body of a large message in memory. The consumer initially receives a large message with an empty body and thereafter sets an OutputStream on the message to stream it in fragments to a disk file.
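The fragment-by-fragment transfer described above can be illustrated with plain java.io. This is a minimal sketch, not the JBoss EAP messaging implementation: the class name FragmentedTransfer and the 10 KiB fragment size are assumptions made for the example. It shows that only one fragment is held in memory at any time, regardless of the total body size.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper for illustration; not part of the messaging API.
final class FragmentedTransfer {
    // Assumed fragment size, chosen only for this example.
    static final int FRAGMENT_SIZE = 10 * 1024;

    /** Copies the body one fragment at a time and returns the total bytes copied. */
    static long transfer(InputStream in, OutputStream out) throws IOException {
        byte[] fragment = new byte[FRAGMENT_SIZE]; // the only buffer ever allocated
        long total = 0;
        int read;
        while ((read = in.read(fragment)) != -1) {
            out.write(fragment, 0, read); // write only the bytes actually read
            total += read;
        }
        out.flush();
        return total;
    }
}
```

Because the buffer is reused for every fragment, memory use stays constant whether the source stream holds a few kilobytes or several gigabytes.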
When processing large messages, the server does not handle message properties in the same way as the message body. For example, a message with a property set to a string that is bigger than journal-buffer-size cannot be processed by the server because it overfills the journal buffer.
12.1. Streaming Large Messages
If you send large messages the standard way, the heap size required to send them can be four or more times the size of the message, meaning a 1 GB message can require 4 GB in heap memory. For this reason, JBoss EAP messaging supports setting the body of messages using the java.io.InputStream and java.io.OutputStream classes, which require much less memory. Input streams are used directly for sending messages and output streams are used for receiving messages.
When receiving messages, there are two ways to deal with the output stream:
- You can block until the message body has been written to the output stream by using the ClientMessage.saveToOutputStream(OutputStream out) method.
- You can use the ClientMessage.setOutputStream(OutputStream out) method to asynchronously write the message to the stream. This method requires that the consumer be kept alive until the message has been fully received.
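The difference between the two approaches can be sketched with standard Java classes. This is an analogy under stated assumptions, not the ClientMessage implementation: OutputStreamModes, saveBlocking, and saveAsync are hypothetical names, and a plain InputStream stands in for the incoming message body.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of blocking versus asynchronous body transfer.
final class OutputStreamModes {
    /** Blocking style: returns only after the whole body has been written. */
    static void saveBlocking(InputStream body, OutputStream out) throws IOException {
        body.transferTo(out); // InputStream.transferTo requires Java 9 or later
        out.flush();
    }

    /** Asynchronous style: returns at once; the future completes when the copy ends. */
    static CompletableFuture<Void> saveAsync(InputStream body, OutputStream out) {
        return CompletableFuture.runAsync(() -> {
            try {
                saveBlocking(body, out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

In the asynchronous case the caller must keep the source open until the future completes, which mirrors the requirement to keep the consumer alive until the message has been fully received.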
You can use any kind of stream you like, for example files, JDBC Blobs, or SocketInputStream, as long as it implements java.io.InputStream for sending messages and java.io.OutputStream for receiving messages.
Streaming Large Messages Using the Core API
The following table shows the streaming methods of the ClientMessage class and the object properties that make them available through JMS.
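ClientMessage Method | Description | JMS Equivalent Property |
---|---|---|
setBodyInputStream(InputStream) | Set the InputStream used to read the message body when the message is sent. | JMS_AMQ_InputStream |
setOutputStream(OutputStream) | Set the OutputStream that receives the message body. This method does not block. | JMS_AMQ_OutputStream |
saveToOutputStream(OutputStream) | Save the body of the message to the OutputStream. This method blocks until the entire content is transferred. | JMS_AMQ_SaveStream |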
The following code example sets the output stream when receiving a core message.
ClientMessage firstMessage = consumer.receive(...);
// Block until the stream is transferred
firstMessage.saveToOutputStream(firstOutputStream);

ClientMessage secondMessage = consumer.receive(...);
// Do not wait for the transfer to finish
secondMessage.setOutputStream(secondOutputStream);
The following code example sets the input stream when sending a core message:
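// createMessage takes a durability flag; true is used here for illustration
ClientMessage clientMessage = session.createMessage(true);
// Stream the body from dataInputStream instead of loading it into memory
clientMessage.setBodyInputStream(dataInputStream);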
For messages larger than 2GiB, you must use the _AMQ_LARGE_SIZE message property. This is because the getBodySize() method returns an invalid value for such messages: its result is limited to the maximum integer value.
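The 2GiB limit follows from the range of a Java int. The sketch below shows, with a plain narrowing cast, how a size above Integer.MAX_VALUE turns into a meaningless number. It is only an illustration of the integer limit; how getBodySize() computes its value internally is not covered here, and the class and method names are hypothetical.

```java
// Hypothetical illustration of why an int-sized value cannot describe bodies over 2GiB.
final class LargeBodySize {
    /** Narrows a long byte count to int, discarding the high-order bits. */
    static int asIntSize(long actualSize) {
        return (int) actualSize;
    }

    /** Returns true when the size can be represented as an int without loss. */
    static boolean fitsInInt(long actualSize) {
        return actualSize >= 0 && actualSize <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024; // 3221225472 bytes
        System.out.println(fitsInInt(threeGiB)); // prints false
        System.out.println(asIntSize(threeGiB)); // prints -1073741824
    }
}
```

A size stored in a long-valued property does not have this problem, which is why the property is the reliable source for bodies above the limit.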
Streaming Large Messages Over JMS
When using JMS, JBoss EAP messaging maps the core API streaming methods by setting object properties. You use the Message.setObjectProperty(String name, Object value) method to set the input and output streams.
The InputStream is set using the JMS_AMQ_InputStream property on messages being sent.
BytesMessage bytesMessage = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(bytesMessage);
The OutputStream is set using the JMS_AMQ_SaveStream property on messages being received in a blocking manner.
BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
The OutputStream can also be set in a non-blocking manner by using the JMS_AMQ_OutputStream property.
// This does not wait for the stream to finish. You must keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
When streaming large messages using JMS, only StreamMessage and BytesMessage objects are supported.
12.2. Configuring Large Messages
12.2.1. Configure Large Message Location
You can read the configuration for the large messages directory by using the management CLI command below. The output is also included to highlight the default configuration.
/subsystem=messaging-activemq/server=default/path=large-messages-directory:read-resource
{
    "outcome" => "success",
    "result" => {
        "path" => "activemq/largemessages",
        "relative-to" => "jboss.server.data.dir"
    }
}
To achieve the best performance, it is recommended to store the large messages directory on a different physical volume from the message journal or the paging directory.
The large-messages-directory configuration element specifies the location on the filesystem where large messages are stored. Note that by default the path is activemq/largemessages. You can change the value of path by using the following management CLI command.
/subsystem=messaging-activemq/server=default/path=large-messages-directory:write-attribute(name=path,value=PATH_LOCATION)
Also note the relative-to attribute in the output above. When relative-to is used, the value of the path attribute is treated as relative to the file path specified by relative-to. By default this value is the JBoss EAP property jboss.server.data.dir. For standalone servers, jboss.server.data.dir is located at EAP_HOME/standalone/data. For domains, each server will have its own serverX/data/activemq directory located under EAP_HOME/domain/servers. You can change the value of relative-to using the following management CLI command.
/subsystem=messaging-activemq/server=default/path=large-messages-directory:write-attribute(name=relative-to,value=RELATIVE_LOCATION)
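How path and relative-to combine into a final directory can be sketched with java.nio.file. This is an illustrative helper, not code from JBoss EAP: the class and method names are hypothetical, EAP_HOME is the placeholder used in this chapter rather than a real directory, and the sketch assumes that path holds a relative value.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper showing how a relative path is resolved against a base directory.
final class LargeMessagesDirectory {
    /** Resolves the path attribute against the directory that relative-to points to. */
    static Path resolve(String relativeToDirectory, String path) {
        return Paths.get(relativeToDirectory).resolve(path).normalize();
    }

    public static void main(String[] args) {
        // Standalone server defaults taken from the text above.
        Path dir = resolve("EAP_HOME/standalone/data", "activemq/largemessages");
        System.out.println(dir);
    }
}
```

With the default values for a standalone server, the result is the activemq/largemessages directory under EAP_HOME/standalone/data.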
Configuring Large Message Size
Use the management CLI to view the current configuration for large messages. Note that this configuration is part of a connection-factory element. For example, to read the current configuration for the default RemoteConnectionFactory that is included, use the following command:
/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:read-attribute(name=min-large-message-size)
Set the attribute using a similar syntax.
/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:write-attribute(name=min-large-message-size,value=NEW_MIN_SIZE)
The value of the attribute min-large-message-size should be in bytes.
Configuring Large Message Compression
You can choose to compress large messages for fast and efficient transfer. All compression and decompression operations are handled on the client side. If the compressed message is smaller than min-large-message-size, it is sent to the server as a regular message. Compress large messages by setting the boolean property compress-large-messages to true using the management CLI.
/subsystem=messaging-activemq/server=default/connection-factory=RemoteConnectionFactory:write-attribute(name=compress-large-messages,value=true)
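The rule that a compressed body smaller than min-large-message-size is sent as a regular message can be sketched as follows. This is a minimal illustration, not the client's actual code path: it assumes DEFLATE compression through java.util.zip.Deflater, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical sketch of the client-side decision after compression.
final class CompressionDecision {
    /** Compresses the body in memory using DEFLATE (an assumption for this example). */
    static byte[] compress(byte[] body) {
        Deflater deflater = new Deflater();
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end(); // release native resources held by the deflater
        return out.toByteArray();
    }

    /** Returns true when the compressed body still reaches the large-message threshold. */
    static boolean sentAsLargeMessage(byte[] body, int minLargeMessageSizeBytes) {
        return compress(body).length >= minLargeMessageSizeBytes;
    }
}
```

For example, a highly repetitive 200 KiB body compresses to far less than a 100 KiB threshold and would travel as a regular message, while a body of random bytes of the same size barely shrinks and would still be treated as large.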
12.2.2. Configuring Large Message Size Using the Core API
If you are using the core API on the client side, you need to use the setMinLargeMessageSize method to specify the minimum size of large messages. The minimum size of large messages (min-large-message-size) is set to 100KB by default.
ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(
    new TransportConfiguration(InVMConnectorFactory.class.getName()));
// Set the large-message threshold to 25 KiB
locator.setMinLargeMessageSize(25 * 1024);
// Create the session factory from the configured locator
ClientSessionFactory factory = locator.createSessionFactory();