第25章 大量のメッセージ処理
AMQ Streams デプロイメントで大量のメッセージを処理する必要がある場合は、設定オプションを使用してスループットとレイテンシーを最適化できます。
プロデューサーとコンシューマーの設定は、Kafka ブローカーへの要求のサイズと頻度を制御するのに役立ちます。設定オプションの詳細は、以下を参照してください。
また、Kafka Connect ランタイムソースコネクター (MirrorMaker 2 を含む) およびシンクコネクターで使用されるプロデューサーとコンシューマーで同じ設定オプションを使用することもできます。
- ソースコネクター
- Kafka Connect ランタイムのプロデューサーは、メッセージを Kafka クラスターに送信します。
- MirrorMaker 2 の場合、ソースシステムは Kafka であるため、コンシューマーはソース Kafka クラスターからメッセージを取得します。
- シンクコネクター
- Kafka Connect ランタイムのコンシューマーは、Kafka クラスターからメッセージを取得します。
コンシューマーの場合、1 回のフェッチリクエストでフェッチされるデータの量を増やして、レイテンシーを短縮することができます。fetch.max.bytes および max.partition.fetch.bytes プロパティーを使用して、フェッチ要求のサイズを増やします。max.poll.records プロパティーを使用して、コンシューマーバッファーから返されるメッセージ数の上限を設定することもできます。
MirrorMaker 2 の場合、ソースからメッセージをフェッチする特定のコンシューマーに関連して、ソースコネクターレベル (consumer.*) で fetch.max.bytes、max.partition.fetch.bytes、および max.poll.records の値を設定します。
プロデューサーの場合は、1 つの生成リクエストで送信されるメッセージバッチのサイズを増やすことができます。batch.size プロパティーを使用してバッチサイズを増やします。バッチサイズを大きくすると、送信する準備ができている未処理のメッセージの数と、メッセージキュー内のバックログのサイズが減少します。同じパーティションに送信されるメッセージはまとめてバッチ処理されます。バッチサイズに達すると、プロデュースリクエストがターゲットクラスターに送信されます。バッチサイズを大きくすると、プロデュースリクエストが遅延し、より多くのメッセージがバッチに追加され、同時にブローカーに送信されます。これにより、多数のメッセージを処理するトピックパーティションが複数ある場合に、スループットが向上します。
プロデューサーが適切なプロデューサーバッチサイズに対して処理するレコードの数とサイズを考慮します。
linger.ms を使用してミリ秒単位の待機時間を追加し、プロデューサーの負荷が減少したときにプロデュースリクエストを遅らせます。遅延は、最大バッチサイズ未満の場合に、より多くのレコードをバッチに追加できることを意味します。
ソースコネクターレベル (producer.override.*) で batch.size および linger.ms の値を設定します。これは、ターゲット Kafka クラスターにメッセージを送信する特定のプロデューサーに関連するためです。
Kafka Connect ソースコネクターでは、ターゲット Kafka クラスターへのデータストリーミングパイプラインは以下のようになります。
Kafka Connect ソースコネクターのデータストリーミングパイプライン
外部データソース
Kafka Connect シンクコネクターの場合、ターゲット外部データソースへのデータストリーミングパイプラインは次のとおりです。
Kafka Connect シンクコネクターのデータストリーミングパイプライン
ソース Kafka トピック
MirrorMaker 2 の場合、ターゲット Kafka クラスターへのデータミラーリングパイプラインは次のとおりです。
MirrorMaker 2 のデータミラーリングパイプライン
ソース Kafka トピック
プロデューサーは、バッファー内のメッセージをターゲット Kafka クラスター内のトピックに送信します。これが発生している間、Kafka Connect タスクは引き続きデータソースをポーリングして、ソースメッセージキューにメッセージを追加します。
ソースコネクターのプロデューサーバーッファーのサイズは、producer.override.buffer.memory プロパティーを使用して設定されます。タスクは、バッファーがフラッシュされる前に、指定されたタイムアウト期間 (offset.flush.timeout.ms) 待機します。これは、送信されたメッセージがブローカーによって確認され、コミットされたデータがオフセットされるのに十分な時間です。ソースタスクは、シャットダウン中を除き、オフセットをコミットする前にプロデューサーがメッセージキューを空にするのを待ちません。
プロデューサーがソースメッセージキュー内のメッセージのスループットについていけない場合、バッファリングは、max.block.ms で制限された期間内にバッファーに使用可能なスペースができるまでブロックされます。バッファー内に未確認のメッセージがあれば、この期間中に送信されます。これらのメッセージが確認されてフラッシュされるまで、新しいメッセージはバッファーに追加されません。
次の設定変更を試して、未処理メッセージの基になるソースメッセージキューを管理可能なサイズに保つことができます。
-
offset.flush.timeout.msのデフォルト値 (ミリ秒) を増やす - 十分な CPU およびメモリーリソースがあることを確認します。
以下を実行して、並行して実行されるタスクの数を増やします。
-
tasksMaxプロパティーを使用して並行して実行するタスクの数を増やす -
replicasプロパティーを使用してタスクを実行するワーカーノードの数の増加
-
使用可能な CPU とメモリーリソース、およびワーカーノードの数に応じて、並列実行できるタスクの数を検討してください。必要な効果が得られるまで、設定値を調整し続けることを推奨します。
25.1. 大量メッセージ用の Kafka Connect の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect は、ソースの外部データシステムからデータをフェッチし、それを Kafka Connect ランタイムプロデューサーに渡して、ターゲットクラスターにレプリケートします。
次の例は、KafkaConnect カスタムリソースを使用した Kafka Connect の設定を示しています。
大量のメッセージを処理するための Kafka Connect 設定の例
プロデューサー設定は、KafkaConnector カスタムリソースを使用して管理されるソースコネクター用に追加されます。
大量のメッセージを処理するためのソースコネクターの設定例
FileStreamSourceConnector および FileStreamSinkConnector は、コネクターの例として提供されています。これらを KafkaConnector リソースとしてデプロイする方法については、「KafkaConnector リソースのデプロイ」 を参照してください。
シンクコネクターのコンシューマー設定が追加されます。
大量のメッセージを処理するためのシンクコネクターの設定例
KafkaConnector カスタムリソースの代わりに Kafka Connect API を使用してコネクターを管理している場合は、コネクター設定を JSON オブジェクトとして追加できます。
大量のメッセージを処理するためのソースコネクター設定を追加するための curl 要求の例