第11章 大量のメッセージ処理


AMQ Streams デプロイメントで大量のメッセージを処理する必要がある場合は、設定オプションを使用してスループットとレイテンシーを最適化できます。

Kafka プロデューサーおよびコンシューマー設定は、Kafka ブローカーへのリクエストのサイズおよび頻度を制御するのに役立ちます。設定オプションの詳細は、以下を参照してください。

また、Kafka Connect ランタイムソースコネクター (MirrorMaker 2 を含む) およびシンクコネクターで使用されるプロデューサーとコンシューマーで同じ設定オプションを使用することもできます。

ソースコネクター
  • Kafka Connect ランタイムのプロデューサーは、メッセージを Kafka クラスターに送信します。
  • MirrorMaker 2 の場合、ソースシステムは Kafka であるため、コンシューマーはソース Kafka クラスターからメッセージを取得します。
シンクコネクター
  • Kafka Connect ランタイムのコンシューマーは、Kafka クラスターからメッセージを取得します。

コンシューマー設定 (consumer.*) の場合、1 回のフェッチ要求でフェッチされるデータの量を増やして、レイテンシーを減らすことができます。fetch.max.bytes および max.partition.fetch.bytes プロパティーを使用して、フェッチ要求のサイズを増やします。max.poll.records プロパティーを使用して、コンシューマーバッファーから返されるメッセージ数の上限を設定することもできます。

プロデューサー設定 (Producer.*) の場合、1 回のプロデュースリクエストで送信されるメッセージバッチのサイズを大きくすることができます。batch.size プロパティーを使用してバッチサイズを増やします。バッチサイズを大きくすると、送信する準備ができている未処理のメッセージの数と、メッセージキュー内のバックログのサイズが減少します。同じパーティションに送信されるメッセージはまとめてバッチ処理されます。バッチサイズに達すると、プロデュースリクエストがターゲットクラスターに送信されます。バッチサイズを大きくすると、プロデュースリクエストが遅延し、より多くのメッセージがバッチに追加され、同時にブローカーに送信されます。これにより、多数のメッセージを処理するトピックパーティションが複数ある場合に、スループットが向上します。

プロデューサーが適切なプロデューサーバッチサイズに対して処理するレコードの数とサイズを考慮します。

linger.ms を使用してミリ秒単位の待機時間を追加し、プロデューサーの負荷が減少したときにプロデュースリクエストを遅らせます。遅延は、最大バッチサイズ未満の場合に、より多くのレコードをバッチに追加できることを意味します。

Kafka Connect ソースコネクターでは、ターゲット Kafka クラスターへのデータストリーミングパイプラインは以下のようになります。

Kafka Connect ソースコネクターのデータストリーミングパイプライン

外部データソース (Kafka Connect タスク) ソースメッセージキュー プロデューサーバーッファー ターゲット Kafka トピック

Kafka Connect シンクコネクターの場合、ターゲット外部データソースへのデータストリーミングパイプラインは次のとおりです。

Kafka Connect シンクコネクターのデータストリーミングパイプライン

ソース Kafka トピック (Kafka Connect タスク) シンクメッセージキュー コンシューマーバッファー 外部データソース

MirrorMaker 2 の場合、ターゲット Kafka クラスターへのデータミラーリングパイプラインは次のとおりです。

MirrorMaker 2 のデータミラーリングパイプライン

ソース Kafka トピック (Kafka Connect タスク) ソースメッセージキュー プロデューサーバーッファー ターゲット Kafka トピック

プロデューサーは、バッファー内のメッセージをターゲット Kafka クラスター内のトピックに送信します。これが発生している間、Kafka Connect タスクは引き続きデータソースをポーリングして、ソースメッセージキューにメッセージを追加します。

ソースコネクターのプロデューサーバーッファーのサイズは、buffer.memory プロパティーを使用して設定されます。タスクは、バッファーがフラッシュされる前に、指定されたタイムアウト期間 (offset.flush.timeout.ms) 待機します。これは、送信されたメッセージがブローカーによって確認され、コミットされたデータがオフセットされるのに十分な時間です。ソースタスクは、シャットダウン中を除き、オフセットをコミットする前にプロデューサーがメッセージキューを空にするのを待ちません。

プロデューサーがソースメッセージキュー内のメッセージのスループットについていけない場合、バッファリングは、max.block.ms で制限された期間内にバッファーに使用可能なスペースができるまでブロックされます。バッファー内に未確認のメッセージがあれば、この期間中に送信されます。これらのメッセージが確認されてフラッシュされるまで、新しいメッセージはバッファーに追加されません。

次の設定変更を試して、未処理メッセージの基になるソースメッセージキューを管理可能なサイズに保つことができます。

  • offset.flush.timeout.ms のデフォルト値 (ミリ秒) を増やす
  • 十分な CPU およびメモリーリソースがあることを確認します。
  • 以下を実行して、並行して実行されるタスクの数を増やします。

    • tasks.max プロパティーを使用して並列実行するタスクの数を増やす
    • タスクを実行するワーカーのノード数を増やす

使用可能な CPU とメモリーリソース、およびワーカーノードの数に応じて、並列実行できるタスクの数を検討してください。必要な効果が得られるまで、設定値を調整し続けることを推奨します。

11.1. 大量メッセージ用の Kafka Connect の設定

Kafka Connect は、ソースの外部データシステムからデータをフェッチし、それを Kafka Connect ランタイムプロデューサーに渡して、ターゲットクラスターにレプリケートします。

次の例は、Kafka Connect ソースコネクターの設定を示しています。

大量のメッセージを処理するためのソースコネクターの設定例

# ...
producer.batch.size=327680
producer.linger.ms=100
# ...
tasks.max = 2

シンクコネクターのコンシューマー設定が追加されます。

大量のメッセージを処理するためのシンクコネクターの設定例

# ...
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=1048576
consumer.max.poll.records=500
# ...
tasks.max = 2

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.