Chapter 13. Working with Large Messages
AMQ Broker supports sending and receiving of large messages even when the client and broker are running with limited memory. The only limit to the size of a message that is sent or consumed is the amount of disk space you have available.
Large messages are sent using an InputStream on a message body. For example, a FileInputStream could be used to send a large message from a large file on disk. As the InputStream is read, the data is sent to the broker in fragments. The broker persists the fragments to disk as it receives them.
When a broker later delivers a large message to a consumer, the fragments are read back from the disk and sent. At first, the consumer receives a large message with an empty body. The consumer can then set an OutputStream on the message to stream the large message body to a file on disk or elsewhere. At no time is the entire message body stored fully in memory, either on the client or on the broker.
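The fragment-by-fragment transfer described above can be illustrated with plain JDK streams. The following is a minimal sketch, not the broker's or client's actual implementation: the class name FragmentCopy, the method copyInFragments, and the fragment size of 4096 bytes are all assumptions chosen for illustration. It shows only that a body of any size can be moved while holding a single fragment in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Illustrative only: copies a stream in fixed-size fragments (hypothetical helper). */
final class FragmentCopy {

    /**
     * Reads the source in fragments of at most fragmentSize bytes and writes each
     * fragment to the sink before reading the next one, so only one fragment is
     * held in memory at a time. Returns the number of fragments written.
     */
    static int copyInFragments(InputStream source, OutputStream sink, int fragmentSize)
            throws IOException {
        if (fragmentSize <= 0) {
            throw new IllegalArgumentException("fragmentSize must be positive");
        }
        byte[] buffer = new byte[fragmentSize];
        int fragments = 0;
        int read;
        // readNBytes returns 0 only when the end of the stream has been reached
        while ((read = source.readNBytes(buffer, 0, fragmentSize)) > 0) {
            sink.write(buffer, 0, read);
            fragments++;
        }
        sink.flush();
        return fragments;
    }

    public static void main(String[] args) throws IOException {
        byte[] body = new byte[10_000]; // stand-in for a large file on disk
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        int fragments = copyInFragments(new ByteArrayInputStream(body), sink, 4096);
        System.out.println(fragments + " fragments, " + sink.size() + " bytes");
    }
}
```

In a real client, the source would typically be a FileInputStream and the transfer itself is handled by the client library once the stream is attached to the message.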
You can store large messages to a physical disk or to a database table.
For the best performance, the large messages directory should be stored on a different physical volume than the one used to store the message journal or paging directory.
13.1. Preparing Brokers to Store Large Messages
Large messages are stored on a file system location available to the broker. The following procedure shows you how to configure the location where large messages are stored.
Configuring the Large Messages Directory
Procedure
Add the configuration to BROKER_INSTANCE_DIR/etc/broker.xml that references the location chosen to store large messages.

If you are using journal persistence, add the large-messages-directory element and provide the file system path to the location used to store large messages.

    <configuration>
      <core>
        ...
        <large-messages-directory>/path/to/large-messages</large-messages-directory> (1)
        ...
      </core>
    </configuration>

(1) The default value for the large-messages-directory configuration element is BROKER_INSTANCE_DIR/data/largemessages.

If you are using JDBC persistence, add the name of the database table used to persist large messages to your database-store.

    <store>
      <database-store>
        ...
        <large-message-table>MY_TABLE</large-message-table> (1)
      </database-store>
    </store>

(1) The default value for the large-message-table configuration element is LARGE_MESSAGE_TABLE.
Related Information
See the large-message example found under BROKER_INSTANCE_DIR/examples/standard/ for a working example showing how to work with large messages.
For more information about configuring a database-store, see Configuring JDBC Persistence.
13.2. Preparing Clients to Send Large Messages
Clients prepare their connection for large messages by providing a value for the property minLargeMessageSize. The value can be provided as a parameter in the URL used to connect to a broker, or it can be set using a supported client API. Any message larger than minLargeMessageSize is considered a large message that is split up and sent in fragments.
AMQ Broker messages are encoded using two bytes per character, so message data filled with ASCII characters (which are one byte each) roughly doubles in size once it is encoded as an AMQ Broker message. Keep this in mind when estimating whether a message is "large": the data might appear to be smaller than minLargeMessageSize before it is sent, but become a "large" message once it is encoded. The default value of minLargeMessageSize is 100KiB.
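The size doubling can be checked with simple arithmetic. The sketch below is a rough estimate only, based on the two-bytes-per-character figure and the 100KiB default given above; the class LargeMessageEstimate and its methods are hypothetical helpers, not part of any client API, and the estimate ignores headers and properties.

```java
/** Illustrative only: rough size estimate for text message bodies (hypothetical helper). */
final class LargeMessageEstimate {

    /** Default threshold stated in the text: 100 KiB. */
    static final long DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100L * 1024;

    /** Estimated encoded body size, assuming two bytes per character. */
    static long estimatedEncodedSize(String text) {
        return 2L * text.length();
    }

    /** True if the estimated encoded size exceeds the given threshold. */
    static boolean likelyLarge(String text, long minLargeMessageSize) {
        return estimatedEncodedSize(text) > minLargeMessageSize;
    }

    public static void main(String[] args) {
        // 60,000 ASCII characters occupy 60,000 bytes as ASCII, which is under 100 KiB
        String text = "a".repeat(60_000);
        System.out.println("ASCII bytes:       " + text.length());
        System.out.println("Estimated encoded: " + estimatedEncodedSize(text));
        System.out.println("Likely large:      "
                + likelyLarge(text, DEFAULT_MIN_LARGE_MESSAGE_SIZE));
    }
}
```

Here a 60,000-character ASCII string looks well under the 102,400-byte threshold, but its estimated encoded size of 120,000 bytes is over it.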
Configuring a Client to Send Large Messages
The following examples show how to prepare a JMS client to send large messages.
Procedure
Set the minimum size for large messages.

If you are using JNDI to instantiate your connection factory, you can specify the size in a jndi.properties file, using the parameter minLargeMessageSize.

    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?minLargeMessageSize=250000

If you are not using JNDI, specify the minimum large message size using the method ActiveMQConnectionFactory.setMinLargeMessageSize().

    // Declare the factory with its concrete type so the setter is accessible
    ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
    cf.setMinLargeMessageSize(250000);
13.3. Sending Large Messages
AMQ Broker supports using Java-based InputStreams for sending large messages. The most common use case is to send files stored on your disk, but you could also send such data as JDBC Blobs or JSON objects recovered from HTTPRequests.
When using JMS, streaming large messages is only supported when using StreamMessage and BytesMessage.
Procedure
Set the JMS_AMQ_InputStream property to mark the message as streamed, as in the example below.

    BytesMessage message = session.createBytesMessage();
    FileInputStream fileInputStream = new FileInputStream(fileInput);
    BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
    message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
    ...
13.4. Receiving Large Messages
The AMQ Broker Core JMS API has a method for synchronously receiving a streamed message. The method blocks further processing until the input stream is completely received.
Receiving a Large Message Synchronously
Procedure
Set the JMS_AMQ_SaveStream property on the message object.

    BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    // This will block until the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
Receiving a Large Message Asynchronously
The Core JMS API also has a method for asynchronously receiving a streamed message. The method does not block processing by a consumer while it receives the input stream.
Procedure
Set the JMS_AMQ_OutputStream property on the message object.

    BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000);
    File outputFile = new File("large_message_received.dat");
    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
    // This call does not block; it returns before the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
13.5. An Alternative to Streaming Messages
If you choose not to use the InputStream or OutputStream capability of AMQ Broker, you can still access the data directly by getting the bytes of the body as you normally would.
You can also stream a JMS BytesMessage or StreamMessage directly, as in the example below.

    BytesMessage rm = (BytesMessage) cons.receive(10000);
    byte[] data = new byte[1024];
    for (int i = 0; i < rm.getBodyLength(); i += 1024) {
        // numberOfBytes is the count of valid bytes in data; the last chunk can be shorter
        int numberOfBytes = rm.readBytes(data);
        // Do whatever you want with the data
    }
13.6. Compressing Large Messages
Clients can also compress a large message before sending it. The ZIP algorithm is used to compress the message body as the message is sent to the broker.
If the compressed size of a large message is below the value of minLargeMessageSize, the message is sent as a regular message. Therefore, it is not written to the broker’s large-message data directory.
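To get a feel for whether a given body would fall below the threshold after compression, you can measure its deflated size with the JDK. This is a sketch under stated assumptions: it uses java.util.zip.Deflater with default settings as a stand-in for the client's compression, which may differ in detail, and the class CompressionCheck and its methods are hypothetical helpers rather than client API.

```java
import java.util.zip.Deflater;

/** Illustrative only: estimates the compressed size of a message body (hypothetical helper). */
final class CompressionCheck {

    /** Returns the number of bytes produced by deflating the body with default settings. */
    static int compressedSize(byte[] body) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(body);
            deflater.finish();
            byte[] buffer = new byte[8192];
            int total = 0;
            // Drain the compressor; only the byte count is kept, not the output itself
            while (!deflater.finished()) {
                total += deflater.deflate(buffer);
            }
            return total;
        } finally {
            deflater.end(); // release native resources
        }
    }

    /** True if the compressed body is below the threshold, as described in the text. */
    static boolean sentAsRegularMessage(byte[] body, int minLargeMessageSize) {
        return compressedSize(body) < minLargeMessageSize;
    }

    public static void main(String[] args) {
        byte[] repetitive = new byte[1_000_000]; // all zeros, compresses very well
        int threshold = 100 * 1024;              // the 100KiB default
        System.out.println("Compressed size: " + compressedSize(repetitive));
        System.out.println("Regular message: " + sentAsRegularMessage(repetitive, threshold));
    }
}
```

Highly repetitive data, such as the zero-filled array here, shrinks far below the threshold, whereas data that is already compressed or random is unlikely to shrink at all.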
Procedure
If you use a Core JMS client and JNDI, use the JNDI context environment to enable message compression, as in the example below.

    java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true

(Optional) Add the minLargeMessageSize parameter to the connection factory URL to set the minimum size needed before a message is compressed. In the example below, messages are compressed only if they exceed 250 kilobytes in size.

    connectionFactory.myConnectionFactory=tcp://localhost:61616?compressLargeMessages=true&minLargeMessageSize=250kb