第12章 大きなメッセージの処理
JBoss EAP メッセージングは、クライアントまたはサーバーでメモリーのサイズが制限されていても、大きなメッセージに対応できます。大きなメッセージは、そのままにストリーミングしたり、効率的に転送するためにさらに圧縮したりできます。ユーザーはメッセージのボディーに InputStream
を設定すると、大きなメッセージを送信できます。メッセージが送信されると、JBoss EAP メッセージングがこの InputStream
を読み取り、データを断片化してサーバーへ送信します。
クライアントもサーバーも、大きなメッセージのボディー部分を完全な形でメモリーに保存しません。コンシューマーは、最初にボディーが空の大きなメッセージを受け取った後、メッセージに OutputStream
を設定して、断片的にディスクファイルへストリーミングします。
大きなメッセージを処理する場合、サーバーはメッセージのボディーと同じ方法でメッセージプロパティーを処理しません。たとえば、journal-buffer-size
より大きい文字列に設定されたプロパティーを持つメッセージは、ジャーナルバッファーが満杯になるため、サーバーで処理できません。
12.1. 大きなメッセージのストリーミング
標準的な方法で大きなメッセージを送信する場合、メッセージを送信するのに必要なヒープサイズはメッセージのサイズの 4 倍以上になる可能性があります (つまり 1 GB のメッセージでは 4 GB のヒープメモリーが必要になる可能性があります)。このため、JBoss EAP メッセージングでは、java.io.InputStream
と java.io.OutputStream
クラスを使用してメッセージのボディーの設定をサポートしていますが、これに必要なメモリーはそれほど多くありません。入力ストリームがメッセージと出力ストリームの送信にそのまま使用され、メッセージの受信に使用されます。
メッセージの受信時に、出力ストリームを処理する方法は 2 つあります。
-
ClientMessage.saveToOutputStream(OutputStream out)
メソッドを使用して出力ストリームを復元している間はブロックできます。 -
ClientMessage.setOutputstream (OutputStream out)
メソッドを使用してメッセージをストリームに非同期に書き込むことができます。この方式では、メッセージが完全に受信されるまでコンシューマーをキープアライブする必要があります。
メッセージの送信に java.io.InputStream
を実装し、メッセージの受信に java.io.OutputStream
を実装している限り、ストリームの種類に関係なく (ファイル、JDBC Blob、または SocketInputStream など) 使用できます。
コア API を使用した大きなメッセージのストリーミング
以下の表に、オブジェクトプロパティーを使用して JMS で利用可能な ClientMessage
クラスで使用できるメソッドを示します。
ClientMessage メソッド | 説明 | JMS 等価プロパティー |
---|---|---|
|
送信時にメッセージボディーを読み取るために使用される |
|
|
メッセージのボディーを受信する |
|
|
メッセージのボディーを |
|
以下のコード例では、コアメッセージを受信する際に出力ストリームを設定します。
ClientMessage firstMessage = consumer.receive(...); // Block until the stream is transferred firstMessage.saveOutputStream(firstOutputStream); ClientMessage secondMessage = consumer.receive(...); // Do not wait for the transfer to finish secondMessage.setOutputStream(secondOutputStream);
以下のコード例では、コアメッセージを送信する際に入力ストリームを設定します。
ClientMessage clientMessage = session.createMessage(); clientMessage.setInputStream(dataInputStream);
2GiB を超えるメッセージの場合は、_AMQ_LARGE_SIZE
メッセージプロパティーを使用する必要があります。getBodySize()
メソッドが最大整数値に制限されているために無効な値を返すためです。
JMS での大きなメッセージのストリーミング
JMS を使用する場合、JBoss EAP メッセージングはオブジェクトプロパティーを設定して、コア API ストリーミングメソッドをマッピングします。Message.setObjectProperty(String name, Object value)
メソッドを使用して入力ストリームと出力ストリームを設定します。
InputStream
は、送信メッセージで JMS_AMQ_InputStream
プロパティーを使用して設定されます。
BytesMessage bytesMessage = session.createBytesMessage(); FileInputStream fileInputStream = new FileInputStream(fileInput); BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); bytesMessage.setObjectProperty("JMS_AMQ_InputStream", bufferedInput); someProducer.send(bytesMessage);
OutputStream
は、ブロックの方法で受信されたメッセージの JMS_AMQ_SaveStream
プロパティーを使用して設定されます。
BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(120000); File outputFile = new File("huge_message_received.dat"); FileOutputStream fileOutputStream = new FileOutputStream(outputFile); BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream); // This will block until the entire content is saved on disk messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
JMS_AMQ_OutputStream
プロパティーを使用すると、OutputStream
をブロック以外の方法で設定することもできます。
// This does not wait for the stream to finish. You must keep the consumer active. messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
JMS を使用して大きなメッセージをストリーミングする場合、StreamMessage
オブジェクトと BytesMessage
オブジェクトのみがサポートされます。