5.5. プロデューサーのスループットおよびレイテンシーの最適化
通常、システムの要件は、指定のレイテンシー内であるメッセージの割合に対して、特定のスループットのターゲットを達成することです。たとえば、95 % のメッセージが 2 秒以内に完了確認される、1 秒あたり 500,000 個のメッセージをターゲットとします。
プロデューサーのメッセージングセマンティック (メッセージの順序付けと持続性) は、アプリケーションの要件によって定義される可能性があります。たとえば、アプリケーションが提供する重要なプロパティーや保証を壊さずに acks=0
または acks=1
を使用するオプションはありません。
ブローカーの再起動は、パーセンタイルの高い統計に大きな影響を与えます。たとえば、長期間では、99% のレイテンシーはブローカーの再起動に関する動作によるものです。これは、ベンチマークを設計したり、本番環境のパフォーマンスで得られた数字を使用してベンチマークを行い、そのパフォーマンスの数字を比較したりする場合に検討する価値があります。
目的に応じて、Kafka はスループットとレイテンシーのプロデューサーパフォーマンスを調整するために多くの設定パラメーターと設定方法を提供します。
- メッセージのバッチ処理 (
linger.ms
およびbatch.size
) -
メッセージのバッチ処理では、同じブローカー宛のメッセージをより多く送信するために、メッセージの送信を遅らせ、単一の生成リクエストでバッチ処理できるようにします。バッチ処理では、スループットを増やすためにレイテンシーを長くして妥協します。時間ベースのバッチ処理は
linger.ms
を使用して設定され、サイズベースのバッチ処理はbatch.size
を使用して設定されます。 - 圧縮 (
compression.type
) -
メッセージ圧縮処理により、プロデューサー (メッセージの圧縮に費やされた CPU 時間) のレイテンシーが追加されますが、リクエスト (および場合によってはディスクの書き込み) を小さくするため、スループットが増加します。圧縮に価値があるかどうか、および使用に最適な圧縮は、送信されるメッセージによって異なります。圧縮は
KafkaProducer.send()
を呼び出すスレッドで発生するため、アプリケーションでこのメソッドのレイテンシーが重要となる場合は、より多くのスレッドの使用を検討する必要があります。 - パイプライン処理 (
max.in.flight.requests.per.connection
) - パイプライン処理は、以前のリクエストへのレスポンスを受け取る前により多くのリクエストを送信します。通常、パイプライン処理を増やすとスループットの向上し、そのしきい値に達すると、バッチ処理の悪化などの他の影響がスループットへの影響を打ち消し始めます。
レイテンシーの短縮
アプリケーションが KafkaProducer.send()
メソッドを呼び出すと、送信前のメッセージに対して一連の操作が実行されます。
- インターセプション: 設定されたインターセプターによってメッセージが処理されます。
- シリアライゼーション: メッセージが適切な形式にシリアライズされます。
- パーティションの割り当て: 各メッセージが特定のパーティションに割り当てられます。
- 圧縮: ネットワーク帯域幅を節約するためにメッセージが圧縮されます。
- バッチ処理: 圧縮されたメッセージが、パーティション固有のキュー内のバッチに追加されます。
これらの操作の間、send()
メソッドは一時的にブロックされます。また、buffer.memory
が満杯の場合、またはメタデータが利用できない場合も、同メソッドはブロックされたままになります。
バッチは、以下のいずれかが行われるまでキューに残ります。
-
バッチが満杯になる (
batch.size
による)。 -
linger.ms
によって導入された遅延が経過する。 - 送信者が、他のパーティションのバッチを同じブローカーにディスパッチする準備ができており、このバッチを含めることができる。
- プロデューサーがフラッシュまたは閉じられている。
send()
のブロッキング状態がレイテンシーに与える影響を最小限に抑えるには、バッチ処理とバッファリングの設定を最適化します。linger.ms
プロパティーと batch.size
プロパティーを使用し、より多くのメッセージを単一の生成リクエストにバッチ処理してスループットを高めます。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
スループットの増加
デフォルトを置き換えるカスタムパーティショナーを使用して、指定したパーティションにメッセージを送信することで、メッセージリクエストのスループットを向上できます。
# ...
partitioner.class=my-custom-partitioner 1
# ...
- 1
- カスタムパーティショナーのクラス名を指定します。