第11章 大量のメッセージ処理
AMQ Streams デプロイメントで大量のメッセージを処理する必要がある場合は、設定オプションを使用してスループットとレイテンシーを最適化できます。
Kafka プロデューサーおよびコンシューマー設定は、Kafka ブローカーへのリクエストのサイズおよび頻度を制御するのに役立ちます。設定オプションの詳細は、以下を参照してください。
また、Kafka Connect ランタイムソースコネクター (MirrorMaker 2 を含む) およびシンクコネクターで使用されるプロデューサーとコンシューマーで同じ設定オプションを使用することもできます。
- ソースコネクター
- Kafka Connect ランタイムのプロデューサーは、メッセージを Kafka クラスターに送信します。
- MirrorMaker 2 の場合、ソースシステムは Kafka であるため、コンシューマーはソース Kafka クラスターからメッセージを取得します。
- シンクコネクター
- Kafka Connect ランタイムのコンシューマーは、Kafka クラスターからメッセージを取得します。
コンシューマー設定 (consumer.*
) の場合、1 回のフェッチ要求でフェッチされるデータの量を増やして、レイテンシーを減らすことができます。fetch.max.bytes
および max.partition.fetch.bytes
プロパティーを使用して、フェッチ要求のサイズを増やします。max.poll.records
プロパティーを使用して、コンシューマーバッファーから返されるメッセージ数の上限を設定することもできます。
プロデューサー設定 (Producer.*
) の場合、1 回のプロデュースリクエストで送信されるメッセージバッチのサイズを大きくすることができます。batch.size
プロパティーを使用してバッチサイズを増やします。バッチサイズを大きくすると、送信する準備ができている未処理のメッセージの数と、メッセージキュー内のバックログのサイズが減少します。同じパーティションに送信されるメッセージはまとめてバッチ処理されます。バッチサイズに達すると、プロデュースリクエストがターゲットクラスターに送信されます。バッチサイズを大きくすると、プロデュースリクエストが遅延し、より多くのメッセージがバッチに追加され、同時にブローカーに送信されます。これにより、多数のメッセージを処理するトピックパーティションが複数ある場合に、スループットが向上します。
プロデューサーが適切なプロデューサーバッチサイズに対して処理するレコードの数とサイズを考慮します。
linger.ms
を使用してミリ秒単位の待機時間を追加し、プロデューサーの負荷が減少したときにプロデュースリクエストを遅らせます。遅延は、最大バッチサイズ未満の場合に、より多くのレコードをバッチに追加できることを意味します。
Kafka Connect ソースコネクターでは、ターゲット Kafka クラスターへのデータストリーミングパイプラインは以下のようになります。
Kafka Connect ソースコネクターのデータストリーミングパイプライン
外部データソース
Kafka Connect シンクコネクターの場合、ターゲット外部データソースへのデータストリーミングパイプラインは次のとおりです。
Kafka Connect シンクコネクターのデータストリーミングパイプライン
ソース Kafka トピック
MirrorMaker 2 の場合、ターゲット Kafka クラスターへのデータミラーリングパイプラインは次のとおりです。
MirrorMaker 2 のデータミラーリングパイプライン
ソース Kafka トピック
プロデューサーは、バッファー内のメッセージをターゲット Kafka クラスター内のトピックに送信します。これが発生している間、Kafka Connect タスクは引き続きデータソースをポーリングして、ソースメッセージキューにメッセージを追加します。
ソースコネクターのプロデューサーバーッファーのサイズは、buffer.memory
プロパティーを使用して設定されます。タスクは、バッファーがフラッシュされる前に、指定されたタイムアウト期間 (offset.flush.timeout.ms
) 待機します。これは、送信されたメッセージがブローカーによって確認され、コミットされたデータがオフセットされるのに十分な時間です。ソースタスクは、シャットダウン中を除き、オフセットをコミットする前にプロデューサーがメッセージキューを空にするのを待ちません。
プロデューサーがソースメッセージキュー内のメッセージのスループットについていけない場合、バッファリングは、max.block.ms
で制限された期間内にバッファーに使用可能なスペースができるまでブロックされます。バッファー内に未確認のメッセージがあれば、この期間中に送信されます。これらのメッセージが確認されてフラッシュされるまで、新しいメッセージはバッファーに追加されません。
次の設定変更を試して、未処理メッセージの基になるソースメッセージキューを管理可能なサイズに保つことができます。
-
offset.flush.timeout.ms
のデフォルト値 (ミリ秒) を増やす - 十分な CPU およびメモリーリソースがあることを確認します。
以下を実行して、並行して実行されるタスクの数を増やします。
-
tasks.max
プロパティーを使用して並列実行するタスクの数を増やす - タスクを実行するワーカーのノード数を増やす
-
使用可能な CPU とメモリーリソース、およびワーカーノードの数に応じて、並列実行できるタスクの数を検討してください。必要な効果が得られるまで、設定値を調整し続けることを推奨します。
11.1. 大量メッセージ用の Kafka Connect の設定
Kafka Connect は、ソースの外部データシステムからデータをフェッチし、それを Kafka Connect ランタイムプロデューサーに渡して、ターゲットクラスターにレプリケートします。
次の例は、Kafka Connect ソースコネクターの設定を示しています。
大量のメッセージを処理するためのソースコネクターの設定例
# ... producer.batch.size=327680 producer.linger.ms=100 # ... tasks.max = 2
シンクコネクターのコンシューマー設定が追加されます。
大量のメッセージを処理するためのシンクコネクターの設定例
# ... consumer.fetch.max.bytes=52428800 consumer.max.partition.fetch.bytes=1048576 consumer.max.poll.records=500 # ... tasks.max = 2