このコンテンツは選択した言語では利用できません。
2.3. Concurrent Store and Dispatch
Abstract
Concurrent store and dispatch is a strategy that facilitates high rates of message throughput, provided the consumers are able to keep up with the flow of messages from the broker.
Overview
Concurrent store and dispatch is a strategy that facilitates high rates of message throughput, provided the consumers are able to keep up with the flow of messages from the broker. By allowing the storing of messages to proceed concurrently with the dispatch of those messages to consumers, it can happen that the consumers return acknowledgments before the messages are ever written to disk. In this case, the message writes can be optimized away, because the dispatch has already completed.
Enabling concurrent store and dispatch
Concurrent store and dispatch is enabled by default for queues.
If you want to enable concurrent store and dispatch for topics, you must set the
kahaDB
element's concurrentStoreAndDispatchTopics
attribute to true
.
Concurrent with slow consumers
Figure 2.2, “Concurrent Store and Dispatch—Slow Consumers” shows an outline what happens in the broker when concurrent store and dispatch is enabled and the attached consumers are relatively slow to acknowledge messages.
Figure 2.2. Concurrent Store and Dispatch—Slow Consumers
In the slow consumer case, concurrent store and dispatch behaves as follows:
- The producer sends a message,
M
, to a destination on the broker. - The broker sends the message,
M
, to the persistence layer. Because concurrency is enabled, the message is initially held in a task queue, which is serviced by a pool of threads that are responsible for writing to the journal. - Storing and dispatching are now performed concurrently. The message is dispatched either to one consumer (queue destination) or possibly to multiple destinations (topic consumer). In the meantime, because the attached consumers are slow, we can be sure that the thread pool has already pulled the message off the task queue and written it to the journal.
- The consumer(s) acknowledge receipt of the message.
- The broker asks the persistence layer to remove the message from persistent storage, because delivery is now complete.NoteIn practice, because the KahaDB persistence layer is not able to remove the message from the rolling log files, KahaDB simply logs the fact that delivery of this message is complete. (At some point in the future, when all of the messages in the log file are marked as complete, the entire log file will be deleted.)
Concurrent with fast consumers
Figure 2.3, “Concurrent Store and Dispatch—Fast Consumers” shows an outline what happens in the broker when concurrent store and dispatch is enabled and the attached consumers are relatively fast at acknowledging messages.
Figure 2.3. Concurrent Store and Dispatch—Fast Consumers
In the fast consumer case, concurrent store and dispatch behaves as follows:
- The producer sends a message,
M
, to a destination on the broker. - The broker sends the message,
M
, to the persistence layer. Because concurrency is enabled, the message is initially held in a queue, which is serviced by a pool of threads. - Storing and dispatching are now performed concurrently. The message is dispatched to one or more consumers.In the meantime, assuming that the broker is fairly heavily loaded, it is probable that the message has not yet been written to the journal.
- Because the consumers are fast, they rapidly acknowledge receipt of the message.
- When all of the consumer acknowledgments are received, the broker asks the persistence layer to remove the message from persistent storage. But in the current example, the message is still pending and has not been written to the journal. The persistence layer can therefore remove the message just by deleting it from the in-memory task queue.
Disabling concurrent store and dispatch
If you want to configure the KahaDB message store to use serialized store and dispatch, you must explicitly disable concurrent store and dispatch for queues. Example 2.2, “KahaDB Configured with Serialized Store and Dispatch” explicitly disables the store and dispatch feature for queues and topics.
Example 2.2. KahaDB Configured with Serialized Store and Dispatch
<broker brokerName="broker" persistent="true" useShutdownHook="false"> ... <persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="32mb" concurrentStoreAndDispatchQueues="false" concurrentStoreAndDispatchTopics="false" /> </persistenceAdapter> </broker>
Serialized store and dispatch
Figure 2.4, “Serialized Store and Dispatch” shows an outline what happens in the broker when concurrent store and dispatch is disabled, so that the store and dispatch steps are performed in sequence.
Figure 2.4. Serialized Store and Dispatch
In the serialized case, the store and dispatch steps occur as follows:
- The producer sends a message,
M
, to a destination on the broker. - The broker sends the message,
M
, to the persistence layer. Because concurrency is disabled, the message is immediately written to the journal (assumingenableJournalDiskSyncs
istrue
). - The message is dispatched to one or more consumers.
- The consumers acknowledge receipt of the message.
- When all of the consumer acknowledgments are received, the broker asks the persistence layer to remove the message from persistent storage (in the case of the KahaDB, this means that the persistence layer records in the journal that delivery of this message is now complete).
JMS durability requirements
In order to avoid losing messages, the JMS specification requires the broker to persist each message received from a producer, before sending an acknowledgment back to the producer. In the case of JMS transactions, the requirement is to persist the transaction data (including the messages in the transaction scope), before acknowledging a commit directive. Both of these conditions ensure that data is not lost.
Make sure that the message saves are synced to disk right away by setting the
kahaDB
element's enableJournalDiskSyncs
attribute to true
.
Note
true
is the default value for the enableJournalDiskSyncs
attribute.