第6章 大量のメッセージ処理


Streams for Apache Kafka デプロイメントで大量のメッセージを処理する必要がある場合は、設定オプションを使用してスループットとレイテンシーを最適化できます。

プロデューサーとコンシューマーの設定は、Kafka ブローカーへの要求のサイズと頻度を制御するのに役立ちます。設定オプションの詳細は、以下を参照してください。

また、Kafka Connect ランタイムソースコネクター (MirrorMaker 2 を含む) およびシンクコネクターで使用されるプロデューサーとコンシューマーで同じ設定オプションを使用することもできます。

ソースコネクター
  • Kafka Connect ランタイムのプロデューサーは、メッセージを Kafka クラスターに送信します。
  • MirrorMaker 2 の場合、ソースシステムは Kafka であるため、コンシューマーはソース Kafka クラスターからメッセージを取得します。
シンクコネクター
  • Kafka Connect ランタイムのコンシューマーは、Kafka クラスターからメッセージを取得します。

コンシューマーの場合、1 回のフェッチリクエストでフェッチされるデータの量を増やして、レイテンシーを短縮することができます。fetch.max.bytes および max.partition.fetch.bytes プロパティーを使用して、フェッチ要求のサイズを増やします。max.poll.records プロパティーを使用して、コンシューマーバッファーから返されるメッセージ数の上限を設定することもできます。

MirrorMaker 2 の場合、ソースからメッセージをフェッチする特定のコンシューマーに関連して、ソースコネクターレベル (consumer.*) で fetch.max.bytesmax.partition.fetch.bytes、および max.poll.records の値を設定します。

プロデューサーの場合は、1 つの生成リクエストで送信されるメッセージバッチのサイズを増やすことができます。batch.size プロパティーを使用してバッチサイズを増やします。バッチサイズを大きくすると、送信する準備ができている未処理のメッセージの数と、メッセージキュー内のバックログのサイズが減少します。同じパーティションに送信されるメッセージはまとめてバッチ処理されます。バッチサイズに達すると、プロデュースリクエストがターゲットクラスターに送信されます。バッチサイズを大きくすると、プロデュースリクエストが遅延し、より多くのメッセージがバッチに追加され、同時にブローカーに送信されます。これにより、多数のメッセージを処理するトピックパーティションが複数ある場合に、スループットが向上します。

プロデューサーが適切なプロデューサーバッチサイズに対して処理するレコードの数とサイズを考慮します。

linger.ms を使用してミリ秒単位の待機時間を追加し、プロデューサーの負荷が減少したときにプロデュースリクエストを遅らせます。遅延は、最大バッチサイズ未満の場合に、より多くのレコードをバッチに追加できることを意味します。

ソースコネクターレベル (producer.override.*) で batch.size および linger.ms の値を設定します。これは、ターゲット Kafka クラスターにメッセージを送信する特定のプロデューサーに関連するためです。

Kafka Connect ソースコネクターでは、ターゲット Kafka クラスターへのデータストリーミングパイプラインは以下のようになります。

Kafka Connect ソースコネクターのデータストリーミングパイプライン

外部データソース (Kafka Connect タスク) ソースメッセージキュー プロデューサーバーッファー ターゲット Kafka トピック

Kafka Connect シンクコネクターの場合、ターゲット外部データソースへのデータストリーミングパイプラインは次のとおりです。

Kafka Connect シンクコネクターのデータストリーミングパイプライン

ソース Kafka トピック (Kafka Connect タスク) シンクメッセージキュー コンシューマーバッファー 外部データソース

MirrorMaker 2 の場合、ターゲット Kafka クラスターへのデータミラーリングパイプラインは次のとおりです。

MirrorMaker 2 のデータミラーリングパイプライン

ソース Kafka トピック (Kafka Connect タスク) ソースメッセージキュー プロデューサーバーッファー ターゲット Kafka トピック

プロデューサーは、バッファー内のメッセージをターゲット Kafka クラスター内のトピックに送信します。これが発生している間、Kafka Connect タスクは引き続きデータソースをポーリングして、ソースメッセージキューにメッセージを追加します。

ソースコネクターのプロデューサーバーッファーのサイズは、producer.override.buffer.memory プロパティーを使用して設定されます。タスクは、バッファーがフラッシュされる前に、指定されたタイムアウト期間 (offset.flush.timeout.ms) 待機します。これは、送信されたメッセージがブローカーによって確認され、コミットされたデータがオフセットされるのに十分な時間です。ソースタスクは、シャットダウン中を除き、オフセットをコミットする前にプロデューサーがメッセージキューを空にするのを待ちません。

プロデューサーがソースメッセージキュー内のメッセージのスループットについていけない場合、バッファリングは、max.block.ms で制限された期間内にバッファーに使用可能なスペースができるまでブロックされます。バッファー内に未確認のメッセージがあれば、この期間中に送信されます。これらのメッセージが確認されてフラッシュされるまで、新しいメッセージはバッファーに追加されません。

次の設定変更を試して、未処理メッセージの基になるソースメッセージキューを管理可能なサイズに保つことができます。

  • offset.flush.timeout.ms のデフォルト値 (ミリ秒) を増やす
  • 十分な CPU およびメモリーリソースがあることを確認します。
  • 以下を実行して、並行して実行されるタスクの数を増やします。

    • tasksMax プロパティーを使用して並行して実行するタスクの数を増やす
    • replicas プロパティーを使用してタスクを実行するワーカーノードの数の増加

使用可能な CPU とメモリーリソース、およびワーカーノードの数に応じて、並列実行できるタスクの数を検討してください。必要な効果が得られるまで、設定値を調整し続けることを推奨します。

6.1. 大量メッセージ用の Kafka Connect の設定

Kafka Connect は、ソースの外部データシステムからデータをフェッチし、それを Kafka Connect ランタイムプロデューサーに渡して、ターゲットクラスターにレプリケートします。

次の例は、KafkaConnect カスタムリソースを使用した Kafka Connect の設定を示しています。

大量のメッセージを処理するための Kafka Connect 設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 3
  config:
    offset.flush.timeout.ms: 10000
    # ...
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # ...
Copy to Clipboard

プロデューサー設定は、KafkaConnector カスタムリソースを使用して管理されるソースコネクター用に追加されます。

大量のメッセージを処理するためのソースコネクターの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    producer.override.batch.size: 327680
    producer.override.linger.ms: 100
    # ...
Copy to Clipboard

注記

FileStreamSourceConnector および FileStreamSinkConnector は、コネクターの例として提供されています。これらを KafkaConnector リソースとしてデプロイする方法については、KafkaConnector リソース のデプロイ を 参照してください。

シンクコネクターのコンシューマー設定が追加されます。

大量のメッセージを処理するためのシンクコネクターの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 2
  config:
    consumer.fetch.max.bytes: 52428800
    consumer.max.partition.fetch.bytes: 1048576
    consumer.max.poll.records: 500
    # ...
Copy to Clipboard

KafkaConnector カスタムリソースの代わりに Kafka Connect API を使用してコネクターを管理している場合は、コネクター設定を JSON オブジェクトとして追加できます。

大量のメッセージを処理するためのソースコネクター設定を追加するための curl 要求の例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
      "producer.override.batch.size": 327680
      "producer.override.linger.ms": 100
    }
}'
Copy to Clipboard

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat