第 10 章 流控制
通过限制它们间的数据流,流程控制可以防止生产者和使用者变得超值。通过使用 AMQ Broker,您可以为使用者和生产器配置流控制。
10.1. 消费者流控制
消费者流控制规定了代理和客户端之间的数据流,因为客户端会消耗来自代理的消息。默认情况下,AMQ Broker 客户端缓冲消息在向消费者交付前。如果没有缓冲,客户端首先需要请求来自代理的每个信息,然后再使用它。这种类型的"往返"通信非常昂贵。在客户端上监管数据流非常重要,因为内存不足问题可能会导致使用者无法快速处理消息,并且缓冲区开始与传入消息溢出。
10.1.1. 设置 Consumer Window 大小
客户端缓冲区中操作的最大消息大小由其 窗口大小 决定。AMQ Broker 客户端窗口的默认大小为 1 MiB,或 1024 * 1024 字节。对于大多数用例来说,默认设置非常正常。对于其他情况,找到窗口大小的最佳值可能需要对系统进行基准测试。如果需要更改默认,AMQ Broker 允许您设置缓冲区窗口大小。
设置窗口大小
以下示例演示了如何在使用 Core JMS 客户端时设置使用者窗口大小参数。每个示例将消费者窗口大小设置为 300000
字节。
步骤
设置使用者窗口大小。
如果核心 JMS 客户端使用 JNDI 来实例化其连接工厂,则将
consumerWindowSize
参数包含为连接字符串 URL 的一部分。将 URL 存储在 JNDI 上下文环境中。以下示例使用jndi.properties
文件来存储 URL。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=300000
如果核心 JMS 客户端不使用 JNDI 来实例化其连接工厂,请将值传递给
ActiveMQConnectionFactory.setConsumerWindowSize()
。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(300000);
10.1.2. 处理 Fast Consumers
快速消费者可以像使用时快速处理消息。如果您确信消息传递系统中的使用者速度很快,请考虑将窗口大小设置为 -1
。此设置允许在客户端端绑定消息缓冲。但要谨慎使用此设置。如果使用者无法像收到消息一样,它能够使客户端一侧内存处理速度。
为 Fast Consumers 设置窗口大小
步骤
以下示例说明了如何在使用快速消息使用者的 Core JMS 客户端时,如何将窗口大小设置为 -1
。
将使用者窗口大小设置为
-1
。如果核心 JMS 客户端使用 JNDI 来实例化其连接工厂,则将
consumerWindowSize
参数包含为连接字符串 URL 的一部分。将 URL 存储在 JNDI 上下文环境中。以下示例使用jndi.properties
文件来存储 URL。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=-1
如果核心 JMS 客户端不使用 JNDI 来实例化其连接工厂,请将值传递给
ActiveMQConnectionFactory.setConsumerWindowSize()
。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(-1);
10.1.3. 处理较低消费者
缓慢的消费者需要花费大量时间来处理每条信息。在这些情况下,建议不要在客户端一侧缓冲信息。相反,消息仍保留在代理端,以供其他消费者使用。关闭缓冲区的一个好处是它在队列中的多个消费者之间提供确定的分布。要通过禁用客户端缓冲区来处理较慢的用户,请将窗口大小设置为 0。
为 Slow Consumers 设置窗口大小
步骤
以下示例显示,在使用 Core JMS 客户端时,如何将窗口大小设置为 0,
而这个客户端是速度较慢的消息。
将使用者窗口大小设置为
0。
如果核心 JMS 客户端使用 JNDI 来实例化其连接工厂,则将
consumerWindowSize
参数包含为连接字符串 URL 的一部分。将 URL 存储在 JNDI 上下文环境中。以下示例使用jndi.properties
文件来存储 URL。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
如果核心 JMS 客户端不使用 JNDI 来实例化其连接工厂,请将值传递给
ActiveMQConnectionFactory.setConsumerWindowSize()
。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(0);
相关信息
请参阅示例 no-consumer-buffering
in INSTALL_DIR/examples/ standard
以了解一个示例,它演示了如何配置代理以防止处理速度较慢的用户时的消费者缓冲。
10.1.4. 设置截止消息的比率
您可以规定消费者可以使用消息的速率。也称为 "throttling",对消耗率进行限制,可确保消费者不会消耗比配置允许的速度快的消息。
速率限制流控制可与基于窗口的流控制一起使用。速率限制流控制只会影响客户端可以消耗的每秒消息数量,而不影响其缓冲区中有多少消息。随着速率的速率限制和高窗口的限制,客户端的内部缓冲区会快速使用消息进行填充。
速率必须是正整数,才能启用这个功能,它是每秒以消息单位指定的最大消息消耗率。将速率限制设置为 -1
可禁用速率限制流控制。默认值为 -1
。
设置截止消息的比率
步骤
以下示例使用 Core JMS 客户端,将消耗消息速率限制为每秒 10
个消息。
设置使用者率。
如果核心 JMS 客户端使用 JNDI 来实例化其连接工厂,请将
consumerMaxRate
参数作为连接字符串 URL 的一部分。将 URL 存储在 JNDI 上下文环境中。以下示例使用jndi.properties
文件来存储 URL。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
如果核心 JMS 客户端不使用 JNDI 来实例化其连接工厂,请将值传递到
ActiveMQConnectionFactory.setConsumerMaxRate()
。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerMaxRate(10);
相关信息
有关如何限制消费者率的工作示例,请参阅 INSTALL_DIR/examples/standard
中的 consumer-rate-limit
示例。