3.2. Reducing Context Switching
Overview
Through the consumer configuration options, there are two different ways in which you can optimize the threading model:
Optimize message dispatching on the broker side
On the broker side, the broker normally dispatches messages to consumers asynchronously, which usually gives the best performance (that is, it enables the broker to cope better with slow consumers). If you are sure that your consumers are always fast, however, you could achieve better performance by disabling asynchronous dispatch on the broker (thereby avoiding the cost of unnecessary context switching).
Broker-side asynchronous dispatching can be enabled or disabled at the granularity of individual consumers. Hence, you can disable asynchronous dispatching for your fast consumers, but leave it enabled for your (possibly) slow consumers.
To disable broker-side asynchronous dispatching, set the consumer.dispatchAsync option to false on the destination URI used by the consumer. For example, to disable asynchronous dispatch to the TEST.QUEUE queue, use the following destination URI on the consumer side:
TEST.QUEUE?consumer.dispatchAsync=false
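As a minimal sketch of how such a destination name could be assembled in client code, the following helper appends options to a plain queue name. The class name DestinationOptions and the method withOptions are hypothetical names introduced for illustration; they are not part of the ActiveMQ API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not an ActiveMQ class): builds a destination name
// that carries consumer options as a query string.
public class DestinationOptions {

    /** Appends the given options to a plain destination name. */
    public static String withOptions(String destinationName, Map<String, String> options) {
        if (options.isEmpty()) {
            return destinationName;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return destinationName + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("consumer.dispatchAsync", "false");

        // Prints: TEST.QUEUE?consumer.dispatchAsync=false
        System.out.println(withOptions("TEST.QUEUE", options));
    }
}
```

In a real client, the resulting string would typically be used as the queue name when the destination is created (for example, through the JMS session), before the consumer is created on it.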
It is also possible to disable asynchronous dispatch by setting the dispatchAsync property to false on the ActiveMQ connection factory, for example:
// Java
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
Optimize message reception on the consumer side
On the consumer side, there are two layers of threads responsible for receiving incoming messages: the Session threads and the MessageConsumer threads. In the special case where only one session is associated with a connection, the two layers are redundant and it is possible to optimize the threading model by eliminating the thread associated with the session layer. This section explains how to enable this consumer threading optimization.
Default consumer threading model
Figure 3.1, “Default Consumer Threading Model” gives an overview of the default threading model on a consumer. The first thread layer is responsible for pulling messages directly from the transport layer, unmarshalling each message, and inserting the message into a queue inside a javax.jms.Session instance. The second thread layer consists of a pool of threads, where each thread is associated with a javax.jms.MessageConsumer instance. Each thread in this layer picks the relevant messages out of the session queue and inserts each message into a queue inside the javax.jms.MessageConsumer instance.
Figure 3.1. Default Consumer Threading Model
Optimized consumer threading model
Figure 3.2, “Optimized Consumer Threading Model” gives an overview of the optimized consumer threading model. This threading model can be enabled only if there is no more than one session associated with the connection. In this case, it is possible to optimize away the session threading layer, and the MessageConsumer threads can then pull messages directly from the transport layer.
Figure 3.2. Optimized Consumer Threading Model
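The difference between the two models can be illustrated with a toy simulation. This is only a sketch built on plain java.util.concurrent queues, not ActiveMQ's internal implementation; the class ThreadingModelSketch, its methods, and the STOP marker are all hypothetical. In the default model each message crosses two thread hand-offs (transport to session queue, session queue to consumer queue); in the optimized model it crosses one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the two consumer threading layouts (not ActiveMQ code).
public class ThreadingModelSketch {
    private static final String STOP = "__STOP__"; // end-of-stream marker

    // Starts a thread that forwards messages between queues until STOP is seen.
    private static Thread pump(BlockingQueue<String> from, BlockingQueue<String> to) {
        Thread thread = new Thread(() -> {
            try {
                while (true) {
                    String message = from.take();
                    to.put(message); // STOP is forwarded too, so the next stage ends
                    if (message.equals(STOP)) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        return thread;
    }

    // Stands in for the transport layer: a queue preloaded with messages.
    private static BlockingQueue<String> transportWith(List<String> messages) {
        BlockingQueue<String> transport = new LinkedBlockingQueue<>(messages);
        transport.add(STOP);
        return transport;
    }

    // Reads everything up to (but not including) the STOP marker.
    private static List<String> drain(BlockingQueue<String> queue) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
            received.add(m);
        }
        return received;
    }

    /** Default model: a session-layer thread and a consumer-layer thread. */
    public static List<String> runDefault(List<String> messages) throws InterruptedException {
        BlockingQueue<String> transport = transportWith(messages);
        BlockingQueue<String> sessionQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> consumerQueue = new LinkedBlockingQueue<>();
        Thread sessionThread = pump(transport, sessionQueue);
        Thread consumerThread = pump(sessionQueue, consumerQueue);
        sessionThread.join();
        consumerThread.join();
        return drain(consumerQueue);
    }

    /** Optimized model: the consumer-layer thread reads the transport directly. */
    public static List<String> runOptimized(List<String> messages) throws InterruptedException {
        BlockingQueue<String> transport = transportWith(messages);
        BlockingQueue<String> consumerQueue = new LinkedBlockingQueue<>();
        Thread consumerThread = pump(transport, consumerQueue);
        consumerThread.join();
        return drain(consumerQueue);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println(runDefault(messages));   // [m1, m2, m3]
        System.out.println(runOptimized(messages)); // [m1, m2, m3]
    }
}
```

Both variants deliver the same messages in the same order; the optimized variant simply removes one queue and one thread from the path, which is where the saving in context switches would come from.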
Prerequisites
This threading optimization works only if the following prerequisites are satisfied:
- There must only be one JMS session on the connection. If there is more than one session, a separate thread is always used for each session, irrespective of the value of the alwaysSessionAsync flag.
- One of the following acknowledgement modes must be selected:
  - Session.DUPS_OK_ACKNOWLEDGE
  - Session.AUTO_ACKNOWLEDGE
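The prerequisites above can be sketched as a simple check. This is an illustrative helper, not ActiveMQ code: the class ThreadingPrerequisites and the method canSkipSessionThread are hypothetical, and the integer constants are local copies that mirror the acknowledgement-mode values defined on javax.jms.Session so that the example compiles without the JMS API on the classpath.

```java
// Hypothetical helper (not an ActiveMQ class): evaluates the documented
// prerequisites for the consumer threading optimization.
public class ThreadingPrerequisites {
    // Local copies of the acknowledgement-mode values from javax.jms.Session.
    public static final int SESSION_TRANSACTED = 0;
    public static final int AUTO_ACKNOWLEDGE = 1;
    public static final int CLIENT_ACKNOWLEDGE = 2;
    public static final int DUPS_OK_ACKNOWLEDGE = 3;

    /**
     * Returns true when the session-layer thread can be optimized away:
     * the flag is off, there is at most one session, and the acknowledgement
     * mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
     */
    public static boolean canSkipSessionThread(boolean alwaysSessionAsync,
                                               int sessionCount,
                                               int ackMode) {
        if (alwaysSessionAsync) {
            return false; // optimization not requested
        }
        if (sessionCount > 1) {
            return false; // more than one session always uses a thread per session
        }
        return ackMode == AUTO_ACKNOWLEDGE || ackMode == DUPS_OK_ACKNOWLEDGE;
    }

    public static void main(String[] args) {
        System.out.println(canSkipSessionThread(false, 1, AUTO_ACKNOWLEDGE));   // true
        System.out.println(canSkipSessionThread(false, 2, AUTO_ACKNOWLEDGE));   // false
        System.out.println(canSkipSessionThread(false, 1, CLIENT_ACKNOWLEDGE)); // false
        System.out.println(canSkipSessionThread(true, 1, DUPS_OK_ACKNOWLEDGE)); // false
    }
}
```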
alwaysSessionAsync option
To enable the consumer threading optimization, set the alwaysSessionAsync option to false on the ActiveMQConnectionFactory (default is true).
Note
The alwaysSessionAsync option is only supported by the JMS client API.
Example
The following example shows how to initialize a JMS connection and session on a consumer that exploits the threading optimization by switching off the alwaysSessionAsync flag:
// Java
...
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connectionFactory.setAlwaysSessionAsync(false);
Connection connection = connectionFactory.createConnection();
connection.start();

// Create the one-and-only session on this connection.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);