第10章 フロー制御
フロー制御は、プロデューサーとコンシューマー間のデータのフローを制限することで、プロデューサーとコンシューマーの超過を防ぎます。AMQ Broker を使用すると、コンシューマーとプロデューサーの両方のフロー制御を設定できます。
10.1. コンシューマーフロー制御
コンシューマーフロー制御は、クライアントがブローカーからメッセージを消費する際に、ブローカーとクライアント間のデータフローを制御します。AMQ Broker クライアントは、デフォルトでメッセージをバッファーしてからコンシューマーに配信します。バッファーがない場合、クライアントはまず、消費する前にブローカーから各メッセージを要求する必要があります。このタイプのラウンドトリップ通信はコストがかかります。メモリー不足の問題によりコンシューマーがメッセージをすばやく処理できず、バッファーが受信メッセージでオーバーフローを開始するため、クライアント側のデータのフローを制限することが重要になります。
10.1.1. コンシューマーウィンドウサイズの設定
クライアント側のバッファーに保持されるメッセージの最大サイズは、その ウィンドウサイズ によって決定されます。AMQ Broker クライアントのウィンドウのデフォルトサイズは 1 MiB または 1024 * 1024 バイトです。ほとんどのユースケースでは、デフォルトでは問題ありません。その他のケースでは、ウィンドウサイズの最適な値を見つけるには、システムのベンチマークが必要になる場合があります。AMQ Broker では、デフォルトを変更する必要がある場合はバッファーウインドウサイズを設定できます。
ウィンドウサイズの設定
以下の例は、Core JMS クライアントを使用する場合にコンシューマーウインドウサイズパラメーターを設定する方法を示しています。それぞれの例では、コンシューマーウィンドウサイズを 300000
バイトに設定します。
手順
コンシューマーウィンドウサイズを設定します。
Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として
consumerWindowSize
パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties
ファイルを使用して URL を保存します。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=300000
Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を
ActiveMQConnectionFactory.setConsumerWindowSize()
に渡します。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(300000);
10.1.2. 高速コンシューマーの処理
高速コンシューマーは、メッセージをコンシュームすると同時に処理できます。メッセージングシステムのコンシューマーが高速であると確信できる場合は、ウィンドウサイズを -
1 に設定することを検討してください。この設定により、クライアント側でバインドされていないメッセージバッファーリングが可能になります。ただし、この設定は注意して使用してください。コンシューマーが受信と同時にメッセージを処理できない場合、クライアント側のメモリーをオーバーフローさせることができます。
高速コンシューマーのウィンドウサイズの設定
手順
以下の例は、メッセージの高速コンシューマーである Core JMS クライアントを使用する場合に、ウィンドウサイズを -
1 に設定する方法を示しています。
コンシューマーウィンドウサイズを
-
1 に設定します。Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として
consumerWindowSize
パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties
ファイルを使用して URL を保存します。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=-1
Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を
ActiveMQConnectionFactory.setConsumerWindowSize()
に渡します。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(-1);
10.1.3. 低速なコンシューマーの処理
低速なコンシューマーは、各メッセージを処理するのにかなり時間がかかります。このような場合は、クライアント側でメッセージをバッファーしないことが推奨されます。メッセージはブローカー側で残り、他のコンシューマーによって消費されます。バッファーをオフにする利点の 1 つは、キュー上の複数のコンシューマー間で確定的な分散を提供することです。クライアント側のバッファーを無効にして低速なコンシューマーを処理するには、ウィンドウサイズを 0
に設定します。
低速なコンシューマーのウィンドウサイズの設定
手順
以下の例は、メッセージの低速なコンシューマーである Core JMS クライアントを使用する場合に、ウィンドウサイズを 0
に設定する方法を示しています。
コンシューマーウィンドウサイズを
0
に設定します。Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として
consumerWindowSize
パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties
ファイルを使用して URL を保存します。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory connectionFactory.myConnectionFactory=tcp://localhost:61616?consumerWindowSize=0
Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を
ActiveMQConnectionFactory.setConsumerWindowSize()
に渡します。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerWindowSize(0);
関連情報
低速なコンシューマーを処理する場合にコンシューマーをバッファーしないようにブローカーを設定する方法を示す例は、INSTALL_DIR/examples/standard
の no-consumer-buffering
の例を参照してください。
10.1.4. メッセージ消費率の設定
コンシューマーがメッセージを消費できるレートを調整できます。スロットリングとしても知られており、消費率は、コンシューマーが設定を許可するよりも高速にメッセージを消費しないようにします。
レート制限のあるフロー制御は、ウィンドウベースのフロー制御と併用できます。レート制限のあるフロー制御は、クライアントが 1 秒以内に消費できるメッセージ数のみに影響し、バッファー内のメッセージ数には影響しません。レート制限が遅く、ウィンドウベースの制限が高いと、クライアントの内部バッファーがメッセージですぐに一杯になります。
この機能を有効にするには、レートは正の整数である必要があります。1 秒あたりのメッセージ単位で指定される必要なメッセージ消費率の最大値です。レートを -1
に設定すると、レート制限のあるフロー制御が無効になります。デフォルト値は -1
です。
メッセージ消費率の設定
手順
以下の例では、メッセージの消費速度を毎秒 10
メッセージに制限する Core JMS クライアントを使用しています。
コンシューマーレートを設定します。
Core JMS Client が JNDI を使用して接続ファクトリーをインスタンス化する場合は、connection string URL の一部として
consumerMaxRate
パラメーターを含めます。JNDI コンテキスト環境内に URL を保存します。以下の例では、jndi.properties
ファイルを使用して URL を保存します。java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616?consumerMaxRate=10
Core JMS クライアントが JNDI を使用して接続ファクトリーをインスタンス化しない場合は、値を
ActiveMQConnectionFactory.setConsumerMaxRate()
に渡します。ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(...) cf.setConsumerMaxRate(10);
関連情報
コンシューマーレートの制限方法の作業例は、INSTALL_DIR/examples/standard
の consumer-rate-limit
の例を参照してください。