此内容没有您所选择的语言版本。
Chapter 11. Flow Control
Flow control prevents producers and consumers from becoming overburdened by limiting the flow of data between them. Using AMQ Broker allows you to configure flow control for both consumers and producers.
11.1. Consumer Flow Control
Consumer flow control regulates the flow of data between the broker and the client as the client consumes messages from the broker. AMQ Broker clients buffer messages by default before delivering them to consumers. Without a buffer, the client would first need to request each message from the broker before consuming it. This type of "round-trip" communication is costly. Regulating the flow of data on the client side is important because out of memory issues can result when a consumer cannot process messages quickly enough and the buffer begins to overflow with incoming messages.
11.1.1. Setting the Consumer Window Size
The maximum size of messages held in the client-side buffer is determined by its window size. The default size of the window for AMQ Broker clients is 1 MiB, or 1024 * 1024 bytes. The default is fine for most use cases. For other cases, finding the optimal value for the window size might require benchmarking your system. AMQ Broker allows you to set the buffer window size if you need to change the default.
Setting the Window Size
The following examples demonstrate how to set the consumer window size parameter when using a Core JMS client. Each example sets a consumers window size to 300000
bytes.
Procedure
Set the consumer window size.
If the Core JMS Client uses JNDI to instantiate its connection factory, include the
consumerWindowSize
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=300000
If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to
ActiveMQConnectionFactory.setConsumerWindowSize()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(300000);
11.1.2. Handling Fast Consumers
Fast consumers can process messages as fast as they consume them. If you are confident that the consumers in your messaging system are that fast, consider setting the window size to -1
. This setting allows for unbounded message buffering on the client side. Use this setting with caution, however. It can overflow client-side memory if the consumer is not able to process messages as fast as it receives them.
Setting the Window Size for Fast Consumers
Procedure
The examples below show how to set the window size to -1
when using a Core JMS client that is a fast consumer of messages.
Set the consumer window size to
-1
.If the Core JMS Client uses JNDI to instantiate its connection factory, include the
consumerWindowSize
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=-1
If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to
ActiveMQConnectionFactory.setConsumerWindowSize()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(-1);
11.1.3. Handling Slow Consumers
Slow consumers take significant time to process each message. In these cases, it is recommended to not buffer messages on the client side. Messages remain on the broker side ready to be consumed by other consumers instead. One benefit of turning off the buffer is that it provides deterministic distribution between multiple consumers on a queue. To handle slow consumers by disabling the client-side buffer, set the window size to 0
.
Setting the Window Size for Slow Consumers
Procedure
The examples below show you how to set the window size to 0
when using the Core JMS client that is a slow consumer of messages.
Set the consumer window size to
0
.If the Core JMS Client uses JNDI to instantiate its connection factory, include the
consumerWindowSize
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to
ActiveMQConnectionFactory.setConsumerWindowSize()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(0);
Related Information
See the example no-consumer-buffering
in INSTALL_DIR/examples/standard
for an example that shows how to configure the broker to prevent consumer buffering when dealing with slow consumers.
11.1.4. Setting the Rate of Consuming Messages
You can regulate the rate at which a consumer can consume messages. Also known as "throttling", regulating the rate of consumption ensures that a consumer never consumes messages at a rate faster than configuration allows.
Rate-limited flow control can be used in conjunction with window-based flow control. Rate-limited flow control affects only how many messages a client can consume in a second and not how many messages are in its buffer. With a slow rate limit and a high window-based limit, the internal buffer of the client fills up with messages quickly.
The rate must be a positive integer to enable this functionality and is the maximum desired message consumption rate specified in units of messages per second. Setting the rate to -1
disables rate-limited flow control. The default value is -1
.
Setting the Rate of Consuming Messages
Procedure
The examples below use a Core JMS client that limits the rate of consuming messages to 10
messages per second.
Set the consumer rate.
If the Core JMS Client uses JNDI to instantiate its connection factory, include the
consumerMaxRate
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to
ActiveMQConnectionFactory.setConsumerMaxRate()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerMaxRate(10);
Related information
See the consumer-rate-limit
example in INSTALL_DIR/examples/standard
for a working example of how to limit the consumer rate.
11.2. Producer Flow Control
In a similar way to consumer window-based flow control, AMQ Broker can limit the amount of data sent from a producer to a broker to prevent the broker from being overburdened with too much data. In the case of a producer, the window size determines the amount of bytes that can be in-flight at any one time.
11.2.1. Setting the Producer Window Size
The window size is negotiated between the broker and producer on the basis of credits, one credit for each byte in the window. As messages are sent and credits are used, the producer must request, and be granted, credits from the broker before it can send more messages. The exchange of credits between producer and broker regulates the flow of data between them.
Setting the Window Size
The following examples demonstrate how to set the producer window size to 1024
bytes when using Core JMS clients.
Procedure
Set the producer window size.
If the Core JMS Client uses JNDI to instantiate its connection factory, include the
producerWindowSize
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616?producerWindowSize=1024
If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to
ActiveMQConnectionFactory.setProducerWindowSize()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setProducerWindowSize(1024);
11.2.2. Blocking Messages
Because more than one producer can be associated with the same address, it is possible for the broker to allocate more credits across all producers than what is actually available. However, you can set a maximum size on any address that prevents the broker from sending more credits than are available.
In the default configuration, a global maximum size of 100Mb is used for each address. When the address is full, the broker writes further messages to the paging journal instead of routing them to the queue. Instead of paging, you can block the sending of more messages on the client side until older messages are consumed. Blocking producer flow control in this way prevents the broker from running out of memory due to producers sending more messages than can be handled at any one time.
In the configuration, blocking producer flow control is managed on a per address-setting
basis. The configuration applies to all queues registered to an address. In other words, the total memory for all queues bound to that address is capped by the value given for max-size-bytes
.
Blocking is protocol dependent. In AMQ Broker the AMQP, OpenWire, and Core protocols feature producer flow control. The AMQP protocol handles flow control differently, however. See Blocking Flow Control Using AMQP for more information.
Configuring the Maximum Size for an Address
To configure the broker to block messages if they are larger than the set maximum number of bytes, add a new addres-setting
configuration element to BROKER_INSTANCE_DIR/etc/broker.xml
.
Procedure
-
In the example configuration below, an
address-setting
is set toBLOCK
producers from sending messages after reaching its maximum size of300000
bytes.
<configuration> <core> ... <address-settings> <address-setting match="my.blocking.queue"> 1 <max-size-bytes>300000</max-size-bytes> 2 <address-full-policy>BLOCK</address-full-policy> 3 </address-setting> </address-settings> </core> </configuration>
- 1
- The above configuration applies to any queue referenced by the
my.blocking.queue
address . - 2
- Sets the maximum size to
300000
bytes. The broker will block producers from sending to the address if the message exceedsmax-size-bytes
. Note that this element supports byte notation such as "K", "Mb", and "GB". - 3
- Sets the
address-full-policy
toBLOCK
to enable blocking producer flow control.
11.2.3. Blocking AMQP Messages
As explained earlier in this chapter the Core protocol uses a producer window-size flow control system. In this system, credits represent bytes and are allocated to producers. If a producer wants to send a message, it must wait until it has sufficient credits to accommodate the size of a message before sending it.
AMQP flow control credits are not representative of bytes, however, but instead represent the number of messages a producer is permitted to send, regardless of the message size. It is therefore possible in some scenarios for an AMQP client to significantly exceed the max-size-bytes
of an address.
To manage this situation, add the element max-size-bytes-reject-threshold
to the address-setting
to specify an upper bound on an address size in bytes. Once this upper bound is reached, the broker rejects AMQP messages. By default, max-size-bytes-reject-threshold
is set to -1
, or no limit.
Configuring the Broker to Block AMQP Messages
To configure the broker to block AMQP messages if they are larger than the set maximum number of bytes, add a new addres-setting
configuration element to BROKER_INSTANCE_DIR/etc/broker.xml
.
Procedure
The example configuration below applies a maximum size of
300000
bytes to any AMQP message routed to themy.amqp.blocking.queue
address.<configuration> <core> ... <address-settings> ... <address-setting match="my.amqp.blocking.queue"> 1 <max-size-bytes-reject-threshold>300000</max-size-bytes-reject-threshold> 2 </address-setting> </address-settings> </core> </configuration>
- 1
- The above configuration applies to any queue referenced by the
my.amqp.blocking.queue
address. - 2
- The broker is configured to reject AMQP messages sent to queues matching this address if they are larger than the
max-size-bytes-reject-threshold
of300000
bytes. Note that this element does not support byte notation such asK
,Mb
, andGB
.
Addtional resources
- For more information about how to configure credits for AMQP producers, see Chapter 3, Network Connections: Protocols.
11.2.4. Setting the Rate of Sending Messages
AMQ Broker can also limit the rate a producer can emit messages. The producer rate is specified in units of messages per second. Setting it to -1
, the default, disables rate-limited flow control.
Setting the Rate of Sending Messages
The examples below demonstrate how to set the rate of sending messages when the producer is using a Core JMS client. Each example sets the maximum rate of sending messages to 10
per second.
Procedure
Set the rate that a producer can send messages.
If the Core JMS Client uses JNDI to instantiate its connection factory, include the
producerMaxRate
parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses ajndi.properties
file to store the URL.java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616?producerMaxRate=10
If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to
ActiveMQConnectionFactory.setProducerMaxRate()
.ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setProducerMaxRate(10);
Related Information
See the producer-rate-limit
example in INSTALL_DIR/examples/standard
for a working example of how to limit a the rate of sending messages.