Chapter 10. Flow Control


Flow control prevents producers and consumers from becoming overburdened by limiting the flow of data between them. AMQ Broker allows you to configure flow control for both consumers and producers.

10.1. Consumer Flow Control

Consumer flow control regulates the flow of data between the broker and the client as the client consumes messages from the broker. AMQ Broker clients buffer messages by default before delivering them to consumers. Without a buffer, the client would first need to request each message from the broker before consuming it. This type of "round-trip" communication is costly. Regulating the flow of data on the client side is important because out-of-memory issues can result when a consumer cannot process messages quickly enough and the buffer begins to overflow with incoming messages.

10.1.1. Setting the Consumer Window Size

The maximum size of messages held in the client-side buffer is determined by its window size. The default size of the window for AMQ Broker clients is 1 MiB, or 1024 * 1024 bytes. The default is fine for most use cases. For other cases, finding the optimal value for the window size might require benchmarking your system. AMQ Broker allows you to set the buffer window size if you need to change the default.

Setting the Window Size

The following examples demonstrate how to set the consumer window size parameter when using a Core JMS client. Each example sets a consumer's window size to 300000 bytes.

Procedure

  • Set the consumer window size.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the consumerWindowSize parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=300000
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to ActiveMQConnectionFactory.setConsumerWindowSize().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setConsumerWindowSize(300000);

10.1.2. Handling Fast Consumers

Fast consumers can process messages as fast as they consume them. If you are confident that the consumers in your messaging system are that fast, consider setting the window size to -1. This setting allows for unbounded message buffering on the client side. Use this setting with caution, however. It can overflow client-side memory if the consumer is not able to process messages as fast as it receives them.

Setting the Window Size for Fast Consumers

Procedure

The examples below show how to set the window size to -1 when using a Core JMS client that is a fast consumer of messages.

  • Set the consumer window size to -1.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the consumerWindowSize parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=-1
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to ActiveMQConnectionFactory.setConsumerWindowSize().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setConsumerWindowSize(-1);

10.1.3. Handling Slow Consumers

Slow consumers take significant time to process each message. In these cases, it is recommended to not buffer messages on the client side. Messages remain on the broker side ready to be consumed by other consumers instead. One benefit of turning off the buffer is that it provides deterministic distribution between multiple consumers on a queue. To handle slow consumers by disabling the client-side buffer, set the window size to 0.

Setting the Window Size for Slow Consumers

Procedure

The examples below show how to set the window size to 0 when using a Core JMS client that is a slow consumer of messages.

  • Set the consumer window size to 0.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the consumerWindowSize parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass a value to ActiveMQConnectionFactory.setConsumerWindowSize().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setConsumerWindowSize(0);
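
The effect of the three kinds of window size (a positive value, -1, and 0) can be compared with a small model. The sketch below is not the client implementation: the class and method names are hypothetical, messages are assumed to have a uniform size, and the real client's byte accounting may differ. It only illustrates how many messages would sit in the client-side buffer for a given setting.

```java
// Simplified model of the client-side consumer buffer (hypothetical names,
// not the AMQ Broker client code).
final class ConsumerWindowSketch {

    /**
     * Returns how many whole messages the client would hold in its buffer.
     *
     * @param windowSize  consumer window size in bytes; -1 = unbounded, 0 = no buffering
     * @param messageSize size of each message in bytes (assumed uniform)
     * @param available   number of messages waiting on the broker
     */
    static long bufferedMessages(int windowSize, int messageSize, long available) {
        if (windowSize < -1 || messageSize <= 0 || available < 0) {
            throw new IllegalArgumentException("invalid window, message size, or backlog");
        }
        if (windowSize == -1) {
            return available;                 // unbounded: the whole backlog is buffered
        }
        if (windowSize == 0) {
            return 0;                         // buffering disabled: messages stay on the broker
        }
        long fits = windowSize / messageSize; // whole messages that fit in the window
        return Math.min(fits, available);
    }

    public static void main(String[] args) {
        long backlog = 10_000;   // assumed backlog on the broker
        int messageSize = 1_000; // assumed message size in bytes
        System.out.println(bufferedMessages(1024 * 1024, messageSize, backlog)); // 1048
        System.out.println(bufferedMessages(300_000, messageSize, backlog));     // 300
        System.out.println(bufferedMessages(-1, messageSize, backlog));          // 10000
        System.out.println(bufferedMessages(0, messageSize, backlog));           // 0
    }
}
```

With a window of 0, nothing is held on the client, which is why other consumers on the same queue can still receive those messages.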

Related Information

See the example no-consumer-buffering in INSTALL_DIR/examples/standard for an example that shows how to configure the broker to prevent consumer buffering when dealing with slow consumers.

10.1.4. Setting the Rate of Consuming Messages

You can regulate the rate at which a consumer can consume messages. Also known as "throttling", regulating the rate of consumption ensures that a consumer never consumes messages at a rate faster than the configuration allows.

Note

Rate-limited flow control can be used in conjunction with window-based flow control. Rate-limited flow control affects only how many messages a client can consume in a second and not how many messages are in its buffer. With a slow rate limit and a high window-based limit, the internal buffer of the client fills up with messages quickly.

To enable rate-limited flow control, set the rate to a positive integer. The value is the maximum desired rate of message consumption, in messages per second. Setting the rate to -1 disables rate-limited flow control. The default value is -1.
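
To see why a slow rate limit combined with a large window leaves many messages sitting in the client buffer, the arithmetic can be worked through in a few lines. The sketch below is illustrative only: the class and method names are hypothetical, and the 1 KiB message size is an assumption chosen to keep the numbers round.

```java
// Back-of-the-envelope arithmetic for rate-limited consumption
// (hypothetical helper, not part of any AMQ Broker API).
final class ConsumerRateSketch {

    /**
     * Seconds a consumer needs to work through a number of buffered messages.
     *
     * @param messages number of buffered messages
     * @param maxRate  maximum messages per second; -1 means rate limiting is disabled
     */
    static double secondsToDrain(long messages, int maxRate) {
        if (messages < 0) {
            throw new IllegalArgumentException("messages must be >= 0");
        }
        if (maxRate == -1) {
            return 0.0; // disabled: the rate limit adds no delay
        }
        if (maxRate <= 0) {
            throw new IllegalArgumentException("maxRate must be positive or -1");
        }
        return (double) messages / maxRate;
    }

    public static void main(String[] args) {
        int windowSize = 1024 * 1024; // default window of 1 MiB
        int messageSize = 1024;       // assumed message size of 1 KiB
        long buffered = windowSize / messageSize;
        System.out.println(buffered);                     // 1024 messages fit in the window
        System.out.println(secondsToDrain(buffered, 10)); // 102.4 seconds at 10 messages per second
    }
}
```

Under these assumptions a full buffer represents more than a minute and a half of work for the consumer, so a smaller window may be worth considering when the rate limit is low.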

Setting the Rate of Consuming Messages

Procedure

The examples below use a Core JMS client that limits the rate of consuming messages to 10 messages per second.

  • Set the consumer rate.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the consumerMaxRate parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to ActiveMQConnectionFactory.setConsumerMaxRate().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setConsumerMaxRate(10);

Related Information

See the consumer-rate-limit example in INSTALL_DIR/examples/standard for a working example of how to limit the consumer rate.

10.2. Producer Flow Control

In a similar way to consumer window-based flow control, AMQ Broker can limit the amount of data sent from a producer to a broker to prevent the broker from being overburdened with too much data. In the case of a producer, the window size determines the number of bytes that can be in-flight at any one time.

10.2.1. Setting the Producer Window Size

The window size is negotiated between the broker and producer on the basis of credits, one credit for each byte in the window. As messages are sent and credits are used, the producer must request, and be granted, credits from the broker before it can send more messages. The exchange of credits between producer and broker regulates the flow of data between them.
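
The credit exchange described above can be pictured with a minimal model in which one credit pays for one byte. This is a sketch under stated assumptions, not the protocol implementation: the class and its methods are hypothetical, and the real client decides for itself when to request more credits.

```java
// Minimal model of byte-based producer credits (hypothetical names,
// not the AMQ Broker client code).
final class ProducerCreditsSketch {

    private int credits; // one credit per byte of the producer window

    ProducerCreditsSketch(int windowSize) {
        if (windowSize < 0) {
            throw new IllegalArgumentException("windowSize must be >= 0");
        }
        this.credits = windowSize;
    }

    /** Sends a message if enough credits remain; false means the producer must wait. */
    boolean trySend(int messageBytes) {
        if (messageBytes <= 0) {
            throw new IllegalArgumentException("messageBytes must be > 0");
        }
        if (messageBytes > credits) {
            return false; // not enough credits: more must be granted first
        }
        credits -= messageBytes;
        return true;
    }

    /** Models the broker granting more credits to the producer. */
    void grant(int bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be >= 0");
        }
        credits += bytes;
    }

    int credits() {
        return credits;
    }

    public static void main(String[] args) {
        ProducerCreditsSketch producer = new ProducerCreditsSketch(1024);
        System.out.println(producer.trySend(400)); // true, 624 credits left
        System.out.println(producer.trySend(400)); // true, 224 credits left
        System.out.println(producer.trySend(400)); // false, must wait for credits
        producer.grant(800);                       // broker grants more credits
        System.out.println(producer.trySend(400)); // true, 624 credits left
    }
}
```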

Setting the Window Size

The following examples demonstrate how to set the producer window size to 1024 bytes when using Core JMS clients.

Procedure

  • Set the producer window size.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the producerWindowSize parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      java.naming.provider.url=tcp://localhost:61616?producerWindowSize=1024
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to ActiveMQConnectionFactory.setProducerWindowSize().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setProducerWindowSize(1024);

10.2.2. Blocking Messages

Because more than one producer can be associated with the same address, it is possible for the broker to allocate more credits across all producers than what is actually available. However, you can set a maximum size on any address that prevents the broker from sending more credits than are available.

In the default configuration, a global maximum size of 100Mb is used for each address. When the address is full, the broker writes further messages to the paging journal instead of routing them to the queue. Instead of paging, you can block the sending of more messages on the client side until older messages are consumed. Blocking producer flow control in this way prevents the broker from running out of memory due to producers sending more messages than can be handled at any one time.

In the configuration, blocking producer flow control is managed on a per address-setting basis. The configuration applies to all queues registered to an address. In other words, the total memory for all queues bound to that address is capped by the value given for max-size-bytes.

Note

Blocking is protocol dependent. In AMQ Broker, the AMQP, OpenWire, and Core protocols support producer flow control. AMQP handles flow control differently, however. See Blocking Flow Control Using AMQP for more information.

Configuring the Maximum Size for an Address

To configure the broker to block producers once an address reaches a set maximum number of bytes, add a new address-setting configuration element to BROKER_INSTANCE_DIR/etc/broker.xml.

Procedure

  • In the example configuration below, an address-setting is set to BLOCK producers from sending messages after reaching its maximum size of 300000 bytes.

    <configuration>
      <core>
        ...
        <address-settings>
           <address-setting match="my.blocking.queue"> <!-- 1 -->
              <max-size-bytes>300000</max-size-bytes> <!-- 2 -->
              <address-full-policy>BLOCK</address-full-policy> <!-- 3 -->
           </address-setting>
        </address-settings>
      </core>
    </configuration>

    1. The above configuration applies to any queue referenced by the my.blocking.queue address.
    2. Sets the maximum size to 300000 bytes. The broker blocks producers from sending to the address once the address reaches max-size-bytes. Note that this element supports byte notation such as "K", "Mb", and "GB".
    3. Sets the address-full-policy to BLOCK to enable blocking producer flow control.
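
The interaction between max-size-bytes and the BLOCK policy can be modelled as a simple byte counter on the address. The sketch below is a simplification with hypothetical names. In particular, it assumes that a message is refused when it would push the address past the limit, which may not match the broker's exact accounting.

```java
// Simplified model of an address that uses the BLOCK policy (hypothetical
// names; the broker's real accounting may differ).
final class BlockingAddressSketch {

    private final long maxSizeBytes; // plays the role of max-size-bytes
    private long sizeBytes;          // bytes currently held for the address

    BlockingAddressSketch(long maxSizeBytes) {
        if (maxSizeBytes <= 0) {
            throw new IllegalArgumentException("maxSizeBytes must be > 0");
        }
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Returns true if the message is accepted, false if the producer is blocked. */
    boolean offer(int messageBytes) {
        if (messageBytes <= 0) {
            throw new IllegalArgumentException("messageBytes must be > 0");
        }
        if (sizeBytes + messageBytes > maxSizeBytes) {
            return false; // assumption: refuse anything that would exceed the limit
        }
        sizeBytes += messageBytes;
        return true;
    }

    /** Models a consumer taking a message, which frees space on the address. */
    void consume(int messageBytes) {
        sizeBytes = Math.max(0, sizeBytes - messageBytes);
    }

    public static void main(String[] args) {
        BlockingAddressSketch address = new BlockingAddressSketch(300_000);
        System.out.println(address.offer(100_000)); // true
        System.out.println(address.offer(100_000)); // true
        System.out.println(address.offer(100_000)); // true, address is now full
        System.out.println(address.offer(100_000)); // false, producer is blocked
        address.consume(100_000);                   // an older message is consumed
        System.out.println(address.offer(100_000)); // true, space was freed
    }
}
```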

10.2.3. Blocking AMQP Messages

As explained earlier in this chapter, the Core protocol uses a producer window-size flow control system. In this system, credits represent bytes and are allocated to producers. If a producer wants to send a message, it must wait until it has sufficient credits to accommodate the size of the message before sending it.

AMQP flow control credits are not representative of bytes, however, but instead represent the number of messages a producer is permitted to send, regardless of the message size. It is therefore possible in some scenarios for an AMQP client to significantly exceed the max-size-bytes of an address.

To manage this situation, add the element max-size-bytes-reject-threshold to the address-setting to specify an upper bound on an address size in bytes. Once this upper bound is reached, the broker rejects AMQP messages. By default, max-size-bytes-reject-threshold is set to -1, or no limit.
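
A short calculation shows how message-based credits can overshoot a byte limit, and how the reject threshold acts as a backstop. The sketch below uses hypothetical names and made-up figures (1000 credits, 10 KiB messages). It is arithmetic only and does not call any broker API.

```java
// Arithmetic sketch: AMQP credits count messages, not bytes
// (hypothetical names and example figures).
final class AmqpCreditSketch {

    /** Largest number of bytes a producer could send with the given message credits. */
    static long worstCaseBytes(int messageCredits, int messageSizeBytes) {
        return (long) messageCredits * messageSizeBytes;
    }

    /**
     * Whether further AMQP messages would be rejected.
     *
     * @param addressSizeBytes current size of the address in bytes
     * @param rejectThreshold  max-size-bytes-reject-threshold; -1 means no limit
     */
    static boolean rejects(long addressSizeBytes, long rejectThreshold) {
        return rejectThreshold != -1 && addressSizeBytes >= rejectThreshold;
    }

    public static void main(String[] args) {
        long maxSizeBytes = 300_000; // max-size-bytes of the address
        long sent = worstCaseBytes(1000, 10 * 1024);
        System.out.println(sent);                    // 10240000
        System.out.println(sent > maxSizeBytes);     // true, far beyond max-size-bytes
        System.out.println(rejects(sent, 300_000));  // true, threshold reached
        System.out.println(rejects(sent, -1));       // false, the default sets no limit
    }
}
```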

Configuring the Broker to Block AMQP Messages

To configure the broker to reject AMQP messages once an address reaches a set maximum number of bytes, add a new address-setting configuration element to BROKER_INSTANCE_DIR/etc/broker.xml.

Procedure

  • The example configuration below sets a reject threshold of 300000 bytes for the my.amqp.blocking.queue address. Once the address reaches that size, the broker rejects further AMQP messages.

    <configuration>
      <core>
        ...
        <address-settings>
           ...
           <address-setting match="my.amqp.blocking.queue"> <!-- 1 -->
              <max-size-bytes-reject-threshold>300000</max-size-bytes-reject-threshold> <!-- 2 -->
           </address-setting>
        </address-settings>
      </core>
    </configuration>

    1. The above configuration applies to any queue referenced by the my.amqp.blocking.queue address.
    2. The broker is configured to reject AMQP messages sent to queues matching this address once the address size reaches the max-size-bytes-reject-threshold of 300000 bytes. Note that this element does not support byte notation such as K, Mb, and GB.

10.2.4. Setting the Rate of Sending Messages

AMQ Broker can also limit the rate at which a producer can emit messages. The producer rate is specified in units of messages per second. Setting it to -1, the default, disables rate-limited flow control.

Setting the Rate of Sending Messages

The examples below demonstrate how to set the rate of sending messages when the producer is using a Core JMS client. Each example sets the maximum rate of sending messages to 10 per second.

Procedure

  • Set the rate at which a producer can send messages.

    • If the Core JMS Client uses JNDI to instantiate its connection factory, include the producerMaxRate parameter as part of the connection string URL. Store the URL within a JNDI context environment. The example below uses a jndi.properties file to store the URL.

      java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
      java.naming.provider.url=tcp://localhost:61616?producerMaxRate=10
    • If the Core JMS client does not use JNDI to instantiate its connection factory, pass the value to ActiveMQConnectionFactory.setProducerMaxRate().

      ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...);
      cf.setProducerMaxRate(10);

Related Information

See the producer-rate-limit example in INSTALL_DIR/examples/standard for a working example of how to limit the rate of sending messages.
