Chapter 13. Working with Large Messages
You can configure AMQ Broker to store large messages on a physical disk or in a database table. Handling large messages in this way avoids the memory overhead that occurs when storing several large messages in memory.
AMQ Broker can persist large messages even if the client and broker are running with limited memory. The maximum size of a large message depends only on the amount of space available for your physical disk or database table.
Large message support is available for the AMQP, Core, and OpenWire protocols. Additionally, the STOMP protocol provides its own method for handling large messages. See "Handling Large Messages with STOMP" for more information.
If you persist large messages on a disk, it is recommended that the large messages directory be located on a different volume than the one used to persist the message journal or the paging directory.
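One way to check this recommendation on a running host is to compare the file stores that back the two directories. The following is a minimal sketch that uses only the standard java.nio.file API. The class and method names are hypothetical, and it assumes that both directories already exist.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of AMQ Broker.
final class VolumeCheckSketch {
    // Returns true when both existing paths are backed by the same file store (volume).
    static boolean onSameVolume(Path first, Path second) {
        try {
            return Files.getFileStore(first).equals(Files.getFileStore(second));
        } catch (IOException e) {
            // Raised, for example, when one of the paths does not exist.
            throw new UncheckedIOException(e);
        }
    }
}
```

If onSameVolume() returns true for the large messages directory and the journal directory, consider moving the large messages directory to a different volume.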
13.1. Preparing Brokers to Store Large Messages
Large messages are stored on a physical disk or in a database table. You must configure the broker to specify where large messages are stored.
Procedure
Add configuration to BROKER_INSTANCE_DIR/etc/broker.xml that references the storage location for large messages.
If you are storing large messages on disk, add the large-messages-directory configuration element and provide the file system location, as shown in the following example:
    <configuration>
      <core>
        ...
        <large-messages-directory>/path/to/large-messages</large-messages-directory> <!-- (1) -->
        ...
      </core>
    </configuration>
(1) The default value for the large-messages-directory configuration element is BROKER_INSTANCE_DIR/data/largemessages.
If you are storing large messages in a database table, add the name of the table to your database-store, as shown in the following example:
    <store>
      <database-store>
        ...
        <large-message-table>MY_TABLE</large-message-table> <!-- (1) -->
      </database-store>
    </store>
(1) The default value for the large-message-table configuration element is LARGE_MESSAGE_TABLE.
Additional Resources
See the large-message example found under BROKER_INSTANCE_DIR/examples/standard/ for a working example showing how to work with large messages.
For more information about configuring a database-store, see Configuring JDBC Persistence.
13.2. Preparing Clients to Send Large Messages
You prepare client connections to handle large messages by setting a value for the property minLargeMessageSize. The value can be provided as a parameter in the connection URL, or it can be set by using a supported client API. Any message larger than minLargeMessageSize is considered a large message.
AMQ Broker messages are encoded using two bytes per character. Therefore, if the message data is filled with ASCII characters (which are one byte in size), the size of the resulting message would roughly double. When setting the value of minLargeMessageSize, remember that encoding can increase message size. The default value for minLargeMessageSize is 100KiB.
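The effect of the encoding on the threshold can be illustrated with a small calculation. The following sketch assumes the two-bytes-per-character rule and the 100KiB default described above. The class and method names are hypothetical, and the estimate ignores headers and properties.

```java
// Hypothetical helper, not part of any AMQ client API.
final class LargeMessageSizing {
    // Default minLargeMessageSize stated in this chapter: 100 KiB.
    static final long DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100L * 1024;

    // Rough body size estimate: two bytes per character.
    static long estimateEncodedSize(String body) {
        return 2L * body.length();
    }

    // A message counts as large when it is strictly larger than the threshold.
    static boolean isLikelyLarge(String body, long minLargeMessageSize) {
        return estimateEncodedSize(body) > minLargeMessageSize;
    }
}
```

Under these assumptions, a text body of 60,000 ASCII characters is estimated at 120,000 bytes, which exceeds the default threshold even though the raw ASCII data is well below it.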
Procedure
Set the minimum size for large messages.
If you are using JNDI to instantiate your connection factory, set the size in a jndi.properties file by using the parameter minLargeMessageSize.
    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?minLargeMessageSize=250000
If you are not using JNDI, set the size using the method ActiveMQConnectionFactory.setMinLargeMessageSize().
    // setMinLargeMessageSize() is defined on ActiveMQConnectionFactory, not on the JMS ConnectionFactory interface
    ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
    cf.setMinLargeMessageSize(250000);
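Because the parameter is embedded in a URL inside a properties file, a typo can go unnoticed. The following sketch shows one way to confirm that a jndi.properties entry carries the expected parameter. It uses only java.util.Properties and java.net.URI. The class and method names are hypothetical, and the sketch does not contact a broker.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not part of any AMQ client API.
final class JndiUrlCheck {
    // Returns the URL configured for connectionFactory.<factoryName>, or null if absent.
    static String factoryUrl(String propertiesText, String factoryName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("connectionFactory." + factoryName);
    }

    // Returns the value of the named query parameter, or null if it is not present.
    static String queryParam(String url, String name) {
        String query = URI.create(url).getQuery();
        if (query == null) {
            return null;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(name)) {
                return pair.substring(eq + 1);
            }
        }
        return null;
    }
}
```

For the jndi.properties example in this section, queryParam(url, "minLargeMessageSize") returns the string 250000.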
13.3. Preparing OpenWire Clients to Send Large Messages
Configuration options added to the connection URI used by an AMQ OpenWire JMS client must include the prefix wireFormat. to take effect. Options missing this prefix are ignored.
Procedure
Set the minimum size for large messages.
If you are using JNDI to instantiate your connection factory, set the size in a jndi.properties file by using the parameter minLargeMessageSize. You must add the prefix wireFormat. to the parameter for it to take effect.
    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?wireFormat.minLargeMessageSize=250000
If you are not using JNDI, set the size using the method ActiveMQConnectionFactory.setMinLargeMessageSize().
    ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
    cf.setMinLargeMessageSize(250000);
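Because options without the wireFormat. prefix are silently ignored, it can help to build OpenWire connection URIs through a single helper that always applies the prefix. The following is a minimal sketch of that idea. The class and method names are hypothetical, and the helper performs plain string handling only.

```java
// Hypothetical helper, not part of any AMQ client API.
final class OpenWireOptions {
    static final String PREFIX = "wireFormat.";

    // Appends option=value to the URI, adding the wireFormat. prefix when it is missing.
    static String withOption(String baseUri, String option, String value) {
        String name = option.startsWith(PREFIX) ? option : PREFIX + option;
        String separator = baseUri.contains("?") ? "&" : "?";
        return baseUri + separator + name + "=" + value;
    }
}
```

For example, withOption("tcp://localhost:61616", "minLargeMessageSize", "250000") produces the same URI as the jndi.properties entry shown in this section.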
13.4. Sending Large Messages
AMQ Broker supports Java-based InputStreams for sending large messages. The most common use case is to send files stored on your disk, but you could also send the data as JDBC Blobs or JSON objects recovered from HTTPRequests.
When using JMS, streaming large messages is supported only when using StreamMessage and BytesMessage.
Procedure
To send a large message, set the JMS_AMQ_InputStream property to mark the message as streamed:
    BytesMessage message = session.createBytesMessage();
    FileInputStream fileInputStream = new FileInputStream(fileInput);
    BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
    message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
    ...
13.5. Receiving Large Messages
The AMQ Broker Core JMS API has a method for synchronously receiving a streamed message. The method blocks further processing until the input stream is completely received.
Procedure
To receive a large message, set the JMS_AMQ_SaveStream property on the message object:
    BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    // This call blocks until the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
Receiving a Large Message Asynchronously
The Core JMS API also has a method for asynchronously receiving a streamed message. The method does not block processing by a consumer while it receives the input stream.
Procedure
To receive a large message asynchronously, set the JMS_AMQ_OutputStream property on the message object:
    BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    // This call returns immediately; it does not wait for the entire content to be saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
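The difference between the blocking and non-blocking variants can be pictured with plain java.io streams. The following sketch is an analogy only and does not show how the client implements either property. The class and method names are hypothetical, and the use of CompletableFuture to signal completion is an assumption made for illustration, because this chapter does not describe how completion of the asynchronous transfer is reported.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration, not part of any AMQ client API.
final class StreamSaveSketch {
    // Blocking variant: returns only after every byte has been written.
    static void saveBlocking(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
        }
        out.flush();
    }

    // Non-blocking variant: returns at once while the copy runs on another thread.
    static CompletableFuture<Void> saveAsync(InputStream in, OutputStream out) {
        return CompletableFuture.runAsync(() -> {
            try {
                saveBlocking(in, out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

In the non-blocking case the caller must keep the output stream open until the transfer has finished. In this sketch that means waiting on the returned future before closing the stream.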
13.6. Large Messages and Java Clients
There are two recommended options available to Java developers who are writing clients that use large messages.
One option is to use an instance of InputStream and OutputStream. For example, a FileInputStream could be used to send a message taken from a large file on a physical disk. A FileOutputStream could then be used by the receiver to stream the message to a location on its local file system.
Another option is to stream a JMS BytesMessage or StreamMessage directly:
    BytesMessage rm = (BytesMessage) cons.receive(10000);
    byte[] data = new byte[1024];
    for (int i = 0; i < rm.getBodyLength(); i += 1024) {
        int numberOfBytes = rm.readBytes(data);
        // Only the first numberOfBytes bytes of data are valid in this iteration
        // Do whatever you want with the data
    }
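When reading in fixed-size chunks, the last chunk is usually only partly filled, so the returned byte count matters. The following sketch shows the same pattern with a plain java.io.InputStream, whose read(byte[]) method, like BytesMessage.readBytes(byte[]), returns the number of bytes read, or -1 when no data remains. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical illustration of chunked reading with the standard library.
final class ChunkedReadSketch {
    // Reads the stream in 1024-byte chunks and returns the total number of bytes seen.
    static long consume(InputStream in) {
        byte[] data = new byte[1024];
        long total = 0;
        int numberOfBytes;
        try {
            while ((numberOfBytes = in.read(data)) != -1) {
                // Only data[0] through data[numberOfBytes - 1] are valid here.
                total += numberOfBytes;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```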
13.7. Compressing Large Messages
You can enable clients to compress large messages before sending them. The ZIP algorithm is used to compress the message body as the message is sent to the broker.
If the compressed size of a large message is less than the value of minLargeMessageSize, the message is sent as a regular message. Therefore, it is not written to the broker’s large-message data directory.
If you use a Core JMS client and JNDI, use the JNDI context environment to enable message compression:
    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true
Add the minLargeMessageSize parameter to the connection factory URL to set the minimum size requirement for messages to be compressed. In the following example, messages are compressed only when they exceed 250 kilobytes in size.
    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true&minLargeMessageSize=250kb
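The rule that a well-compressed large message is sent as a regular message can be sketched with the standard java.util.zip.Deflater class. This is an illustration under assumptions: the class and method names are hypothetical, and Deflater with its default settings stands in for whatever compression settings the client actually uses, which this chapter does not specify.

```java
import java.util.zip.Deflater;

// Hypothetical illustration, not part of any AMQ client API.
final class CompressionThresholdSketch {
    // Returns the number of bytes produced by DEFLATE compression of the body.
    static int compressedSize(byte[] body) {
        Deflater deflater = new Deflater();
        deflater.setInput(body);
        deflater.finish();
        byte[] buffer = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    // Mirrors the rule above: a compressed size below the threshold means a regular message.
    static boolean sentAsRegularMessage(byte[] body, long minLargeMessageSize) {
        return compressedSize(body) < minLargeMessageSize;
    }
}
```

A highly repetitive body of 300,000 bytes compresses to far less than 250,000 bytes, so under this rule it would be sent as a regular message.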
13.8. Handling Large Messages with STOMP
STOMP clients might send large bodies of frames, which can exceed the size of the broker’s internal buffer, causing unexpected errors.
To prevent this situation from occurring, set the acceptor’s stompMinLargeMessageSize parameter to the desired size. Proper sizing is affected by system resources such as the amount of disk space available, as well as the size of the messages. It is recommended that you run performance tests using several values for stompMinLargeMessageSize to determine an appropriate size.
The broker checks the size of the body of each STOMP frame coming from connections established with this acceptor. If the size of the body is equal to or greater than the value of stompMinLargeMessageSize, the message is persisted as a large message.
Procedure
- Open the configuration file BROKER_INSTANCE_DIR/etc/broker.xml.
- Add the stompMinLargeMessageSize parameter and its desired value to an existing or new acceptor, as shown in the following example:
    <acceptors>
      <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;stompMinLargeMessageSize=10240</acceptor>
      ...
    </acceptors>
In the preceding example, the broker is configured to accept STOMP messages on port 61613. If the acceptor receives a STOMP frame with a body larger than or equal to 10240 bytes, the broker persists it as a large message.
When a large message is delivered to a STOMP consumer, the broker automatically converts it from a large message to a normal message before sending it to the client. If a large message is compressed, the broker decompresses it before sending it to STOMP clients.
The default value of stompMinLargeMessageSize is 102400 bytes.
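The STOMP size check is inclusive: a body whose size equals the threshold is already treated as a large message. The following sketch restates that comparison. The class and method names are hypothetical, and the constant is the default value stated above.

```java
// Hypothetical illustration of the STOMP threshold rule, not broker code.
final class StompThresholdSketch {
    // Default stompMinLargeMessageSize stated in this chapter.
    static final int DEFAULT_STOMP_MIN_LARGE_MESSAGE_SIZE = 102400;

    // A frame body is persisted as a large message when its size is equal to or greater than the threshold.
    static boolean persistedAsLargeMessage(int bodySize, int stompMinLargeMessageSize) {
        return bodySize >= stompMinLargeMessageSize;
    }
}
```

With the acceptor example in this section, a body of exactly 10240 bytes is persisted as a large message, while a body of 10239 bytes is not.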