12.4. クライアント設定のチューニング
設定プロパティーを使用して、Kafka プロデューサーおよびコンシューマーのパフォーマンスを最適化します。
最小セットの設定プロパティーが必要ですが、プロパティーを追加または調整して、プロデューサーとコンシューマーが Kafka と対話する方法を変更できます。たとえば、プロデューサーの場合は、クライアントがリアルタイムでデータに応答できるように、メッセージのレイテンシーおよびスループットをチューニングできます。また、設定を変更して、より強力にメッセージの持続性を保証することもできます。
クライアントメトリックを分析して初期設定を行う場所を判断することから始め、必要な設定になるまで段階的に変更を加え、さらに比較を行うことができます。
12.4.1. Kafka プロデューサー設定のチューニング
特定のユースケースに合わせて調整されたオプションのプロパティーとともに、基本的なプロデューサー設定を使用します。
設定を調整してスループットを最大化すると、レイテンシーが増加する可能性があり、その逆も同様です。必要なバランスを取得するために、プロデューサー設定を実験して調整する必要があります。
12.4.1.1. 基本のプロデューサー設定
接続およびシリアライザープロパティーはすべてのプロデューサーに必要です。通常、追跡用のクライアント ID を追加し、プロデューサーで圧縮してリクエストのバッチサイズを減らすことが推奨されます。
基本的なプロデューサー設定には以下が含まれます。
- パーティション内のメッセージの順序は保証されません。
- ブローカーに到達するメッセージの完了通知は持続性を保証しません。
# ... bootstrap.servers=localhost:9092 1 key.serializer=org.apache.kafka.common.serialization.StringSerializer 2 value.serializer=org.apache.kafka.common.serialization.StringSerializer 3 client.id=my-client 4 compression.type=gzip 5 # ...
- 1
- (必須) Kafka ブローカーの host:port ブートストラップサーバーアドレスを使用して Kafka クラスターに接続するようプロデューサーを指示します。プロデューサーはアドレスを使用して、クラスター内のすべてのブローカーを検出し、接続します。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのアドレスを指定しますが、クラスター内のすべてのブローカーのリストを提供する必要はありません。
- 2
- (必須) メッセージがブローカーに送信される前に、各メッセージの鍵をバイトに変換するシリアライザー。
- 3
- (必須) メッセージがブローカーに送信される前に、各メッセージの値をバイトに変換するシリアライザー。
- 4
- (任意) クライアントの論理名。リクエストのソースを特定するためにログおよびメトリクスで使用されます。
- 5
- (任意) メッセージを圧縮するコーデック。これは、送信され、圧縮された形式で格納された後、コンシューマーへの到達時に圧縮解除される可能性があります。圧縮はスループットを改善し、ストレージの負荷を減らすのに役立ちますが、圧縮や圧縮解除のコストが異常に高い低レイテンシーのアプリケーションには不適切である場合があります。
12.4.1.2. データの持続性
メッセージ配信の完了通知を使用して、データの持続性を適用し、メッセージが失われる可能性を最小限に抑えることができます。
# ...
acks=all 1
# ...
- 1
acks=all
を指定すると、パーティションリーダーは、メッセージリクエストが正常に受信されたことを確認する前に、一定数のフォロワーにメッセージを複製するように強制します。追加のチェックにより、acks=all
はプロデューサーがメッセージを送信して確認応答を受信する間のレイテンシーを長くします。
完了通知がプロデューサーに送信される前にメッセージをログに追加する必要のあるブローカーの数は、トピックの min.insync.replicas
設定によって決定されます。最初に、トピックレプリケーション係数を 3 にし、他のブローカーの In-Sync レプリカを 2 にするのが一般的です。この設定では、単一のブローカーが利用できない場合でもプロデューサーは影響を受けません。2 番目のブローカーが利用できなくなると、プロデューサーは完了通知を受信せず、それ以上のメッセージを生成できなくなります。
acks=all
をサポートするトピック設定
# ...
min.insync.replicas=2 1
# ...
- 1
- Sync レプリカでは
2
を使用します。デフォルトは1
です。
システムに障害が発生すると、バッファーの未送信データが失われる可能性があります。
12.4.1.3. 順序付き配信
メッセージは 1 度だけ配信されるため、べき等プロデューサーは重複を回避します。障害発生時でも配信の順序が維持されるように、ID とシーケンス番号がメッセージに割り当てられます。データの一貫性を保つために acks=all
を使用している場合は、順序付けられた配信に冪等性を有効にすることが妥当です。
べき等を使った順序付き配信
# ... enable.idempotence=true 1 max.in.flight.requests.per.connection=5 2 acks=all 3 retries=2147483647 4 # ...
パフォーマンスコストが原因で acks=all
と idempotency を使用していない場合には、インフライト(承認されていない)リクエストの数を 1 に設定して、順序を保持します。そうしないと、Message-A が失敗し、Message-B がブローカーに書き込まれた後にのみ成功する可能性があります。
べき等を使用しない順序付け配信
# ... enable.idempotence=false 1 max.in.flight.requests.per.connection=1 2 retries=2147483647 # ...
12.4.1.4. 信頼性の保証
べき等は、1 つのパーティションへの書き込みを 1 回だけ行う場合に便利です。トランザクションをべき等と使用すると、複数のパーティション全体で 1 度だけ書き込みを行うことができます。
トランザクションは、同じトランザクション ID を使用するメッセージが 1 度作成され、すべてがそれぞれのログに書き込まれるか、何も書き込まれないかのどちらかになることを保証します。
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=UNIQUE-ID 1 transaction.timeout.ms=900000 2 # ...
トランザクション保証を維持するには、transactional.id
の選択が重要です。トランザクション ID は、一意なトピックパーティションセットに使用する必要があります。たとえば、トピックパーティション名からトランザクション ID への外部マッピングを使用したり、競合を回避する関数を使用してトピックパーティション名からトランザクション IDを算出したりすると、これを実現できます。
12.4.1.5. スループットおよびレイテンシーの最適化
通常、システムの要件は、指定のレイテンシー内であるメッセージの割合に対して、特定のスループットのターゲットを達成することです。たとえば、95 % のメッセージが 2 秒以内に完了確認される、1 秒あたり 500,000 個のメッセージをターゲットとします。
プロデューサーのメッセージングセマンティック (メッセージの順序付けと持続性) は、アプリケーションの要件によって定義される可能性があります。たとえば、アプリケーションが提供する重要なプロパティーや保証を壊さずに acks=0
または acks=1
を使用するオプションはありません。
ブローカーの再起動は、パーセンタイルの高いの統計に大きく影響します。たとえば、長期間では、99% のレイテンシーはブローカーの再起動に関する動作によるものです。これは、ベンチマークを設計したり、本番環境のパフォーマンスで得られた数字を使ってベンチマークを行い、そのパフォーマンスの数字を比較したりする場合に検討する価値があります。
目的に応じて、Kafka はスループットとレイテンシーのプロデューサーパフォーマンスを調整するために多くの設定パラメーターと設定方法を提供します。
- メッセージのバッチ処理(
linger.ms
およびbatch.size
) -
メッセージのバッチ処理では、同じブローカー宛のメッセージをより多く送信するために、メッセージの送信を遅らせ、単一の生成リクエストでバッチ処理できるようにします。バッチ処理では、スループットを増やすためにレイテンシーを長くして妥協します。時間ベースのバッチ処理は
linger.ms
を使用して設定され、サイズベースのバッチ処理はbatch.size
を使用して設定されます。 - Compression(
compression.type
) -
メッセージ圧縮処理により、プロデューサー (メッセージの圧縮に費やされた CPU 時間) のレイテンシーが追加されますが、リクエスト (および場合によってはディスクの書き込み) を小さくするため、スループットが増加します。圧縮に価値があるかどうか、および使用に最適な圧縮は、送信されるメッセージによって異なります。圧縮は
KafkaProducer.send()
を呼び出すスレッドで発生するため、アプリケーションでこのメソッドのレイテンシーが重要となる場合は、より多くのスレッドの使用を検討する必要があります。 - パイプライン処理(
max.in.flight.requests.per.connection
) - パイプライン処理は、以前のリクエストへの応答を受け取る前により多くのリクエストを送信します。通常、パイプライン処理を増やすと、バッチ処理の悪化などの別の問題がスループットに悪影響を与え始めるしきい値まではスループットが増加します。
レイテンシーの短縮
アプリケーションが KafkaProducer.send()
を呼び出す場合、メッセージは以下のようになります。
- インターセプターによる処理。
- シリアライズ。
- パーティションへの割り当て。
- 圧縮処理。
- パーティションごとのキューでメッセージのバッチに追加。
ここでの send()
メソッドが返されます。そのため、time send()
は以下によって決定されます。
- インターセプター、シリアライザー、およびパーティションヤーで費やされた時間。
- 使用される圧縮アルゴリズム。
- 圧縮に使用するバッファーの待機に費やされた時間。
バッチは、以下のいずれかが行われるまでキューに残ります。
-
バッチが満杯になる(
batch.size
による)。 -
linger.ms
によって導入される遅延が渡される。 - 送信者は他のパーティションのメッセージバッチを同じブローカーに送信しようとし、このバッチの追加も可能。
- プロデューサーがフラッシュまたは閉じられる。
バッチ処理とバッファーの設定を参照して、レイテンシーをブロックする send()
の影響を軽減します。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
スループットの増加
メッセージの配信および送信リクエストの完了までの最大待機時間を調整して、メッセージリクエストのスループットを向上します。
また、カスタムパーティションを作成してデフォルトを置き換えることで、メッセージを指定のパーティションに転送することもできます。
# ... delivery.timeout.ms=120000 1 partitioner.class=my-custom-partitioner 2 # ...