2.10. Message Prefetch Behavior
Overview
Figure 2.3, “Consumer Prefetch Limit” illustrates the behavior of a broker, as it waits to receive acknowledgments for the messages it has already sent to a consumer.
Figure 2.3. Consumer Prefetch Limit
If a consumer is slow to acknowledge messages, the broker may send it another message before the previous message is acknowledged. If the consumer continues to be slow, the number of unacknowledged messages can grow continuously larger. The broker does not continue to send messages indefinitely. When the number of unacknowledged messages reaches a set limit—the prefetch limit—the server ceases sending new messages to the consumer. No more messages will be sent until the consumer starts sending back some acknowledgments.
Note
The broker relies on acknowledgement of delivery to determine if it can dispatch additional messages to a consumer's prefetch buffer. So, if a consumer's prefetch buffer is set to 1 and it is slow to acknowledge the processing of the message, it is possible that the broker will dispatch an additional message to the consumer and the pending message count will be 2.
Red Hat JBoss A-MQ provides various options for fine tuning prefetch limits for specific circumstances. The prefetch limits can be specified for different types of consumers. You can also set the prefetch limit on a per broker, per connection, or per destination basis.
Consumer specific prefetch limits
Different prefetch limits can be set for each consumer type. Table 2.2, “Prefetch Limit Defaults” list the property name and default value for each consumer type's prefetch limit.
Consumer Type | Property | Default |
---|---|---|
Queue consumer | queuePrefetch | 1000 |
Queue browser | queueBrowserPrefetch | 500 |
Topic consumer | topicPrefetch | 32766 |
Durable topic subscriber | durableTopicPrefetch | 100 |
Setting prefetch limits per broker
You can define the prefetch limits for all consumers that attach to a particular broker by setting a destination policy on the broker. To set the destination policy, add a
destinationPolicy
element as a child of the broker
element in the broker's configuration, as shown in Example 2.13, “Configuring a Destination Policy”.
Example 2.13. Configuring a Destination Policy
<broker ... > ... <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue="queue.>" queuePrefetch=”1”/> <policyEntry topic="topic.>" topicPrefetch=”1000”/> </policyEntries> </policyMap> </destinationPolicy> ... </broker>
In Example 2.13, “Configuring a Destination Policy”, the queue prefetch limit for all queues whose names start with
queue.
is set to 1 (the >
character is a wildcard symbol that matches one or more name segments); and the topic prefetch limit for all topics whose names start with topic.
is set to 1000.
Setting prefetch limits per connection
In a consumer, you can specify the prefetch limits on a connection by setting properties on the
ActiveMQConnectionFactory
instance. Example 2.14, “Setting Prefetch Limit Properties Per Connection” shows how to specify the prefetch limits for all consumer types on a connection factory.
Example 2.14. Setting Prefetch Limit Properties Per Connection
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Properties props = new Properties(); props.setProperty("prefetchPolicy.queuePrefetch", "1000"); props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500"); props.setProperty("prefetchPolicy.durableTopicPrefetch", "100"); props.setProperty("prefetchPolicy.topicPrefetch", "32766"); factory.setProperties(props);
Note
You can also set the prefetch limits using the consumer properties as part of the broker URI used when creating the connection factory.
Setting prefetch limits per destination
At the finest level of granularity, you can specify the prefetch limit on each destination instance that you create in a consumer. Example 2.15, “Setting the Prefetch Limit on a Destination” shows code create the queue
TEST.QUEUE
with a prefetch limit of 10. The option is set as a destination option as part of the URI used to create the queue.
Example 2.15. Setting the Prefetch Limit on a Destination
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); MessageConsumer consumer = session.createConsumer(queue);
Disabling the prefetch extension logic
The default behavior of a broker is to use delivery acknowledgements to determine the state of a consumer's prefetch buffer. For example, if a consumer's prefetch limit is configured as 1 the broker will dispatch 1 message to the consumer and when the consumer acknowledges receiving the message, the broker will dispatch a second message. If the initial message takes a long time to process, the message sitting in the prefetch buffer cannot be processed by a faster consumer.
This behavior can also cause issues when using the JCA resource adapter and transacted clients.
If the behavior is causing issues, it can be changed such that the broker will wait for the consumer to acknowledge that the message is processed before refilling the prefetch buffer. This is accomplished by setting a destination policy on the broker to disable the prefetch extension for specific destinations.
Example 2.16, “Disabling the Prefetch Extension” shows configuration for disabling the prefetch extension on all of a broker's queues.
Example 2.16. Disabling the Prefetch Extension
<broker ... > ... <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" usePrefetchExtension=”false”/> </policyEntries> </policyMap> </destinationPolicy> ... </broker>