第6章 大量のメッセージ処理
Streams for Apache Kafka デプロイメントで大量のメッセージを処理する必要がある場合は、設定オプションを使用してスループットとレイテンシーを最適化できます。
プロデューサーとコンシューマーの設定は、Kafka ブローカーへの要求のサイズと頻度を制御するのに役立ちます。設定オプションの詳細は、以下を参照してください。
また、Kafka Connect ランタイムソースコネクター (MirrorMaker 2 を含む) およびシンクコネクターで使用されるプロデューサーとコンシューマーで同じ設定オプションを使用することもできます。
- ソースコネクター
- Kafka Connect ランタイムのプロデューサーは、メッセージを Kafka クラスターに送信します。
- MirrorMaker 2 の場合、ソースシステムは Kafka であるため、コンシューマーはソース Kafka クラスターからメッセージを取得します。
- シンクコネクター
- Kafka Connect ランタイムのコンシューマーは、Kafka クラスターからメッセージを取得します。
コンシューマーの場合、1 回のフェッチリクエストでフェッチされるデータの量を増やして、レイテンシーを短縮することができます。fetch.max.bytes
および max.partition.fetch.bytes
プロパティーを使用して、フェッチ要求のサイズを増やします。max.poll.records
プロパティーを使用して、コンシューマーバッファーから返されるメッセージ数の上限を設定することもできます。
MirrorMaker 2 の場合、ソースからメッセージをフェッチする特定のコンシューマーに関連して、ソースコネクターレベル (consumer.*
) で fetch.max.bytes
、max.partition.fetch.bytes
、および max.poll.records
の値を設定します。
プロデューサーの場合は、1 つの生成リクエストで送信されるメッセージバッチのサイズを増やすことができます。batch.size
プロパティーを使用してバッチサイズを増やします。バッチサイズを大きくすると、送信する準備ができている未処理のメッセージの数と、メッセージキュー内のバックログのサイズが減少します。同じパーティションに送信されるメッセージはまとめてバッチ処理されます。バッチサイズに達すると、プロデュースリクエストがターゲットクラスターに送信されます。バッチサイズを大きくすると、プロデュースリクエストが遅延し、より多くのメッセージがバッチに追加され、同時にブローカーに送信されます。これにより、多数のメッセージを処理するトピックパーティションが複数ある場合に、スループットが向上します。
プロデューサーが適切なプロデューサーバッチサイズに対して処理するレコードの数とサイズを考慮します。
linger.ms
を使用してミリ秒単位の待機時間を追加し、プロデューサーの負荷が減少したときにプロデュースリクエストを遅らせます。遅延は、最大バッチサイズ未満の場合に、より多くのレコードをバッチに追加できることを意味します。
ソースコネクターレベル (producer.override.*
) で batch.size
および linger.ms
の値を設定します。これは、ターゲット Kafka クラスターにメッセージを送信する特定のプロデューサーに関連するためです。
Kafka Connect ソースコネクターでは、ターゲット Kafka クラスターへのデータストリーミングパイプラインは以下のようになります。
Kafka Connect ソースコネクターのデータストリーミングパイプライン
外部データソース
Kafka Connect シンクコネクターの場合、ターゲット外部データソースへのデータストリーミングパイプラインは次のとおりです。
Kafka Connect シンクコネクターのデータストリーミングパイプライン
ソース Kafka トピック
MirrorMaker 2 の場合、ターゲット Kafka クラスターへのデータミラーリングパイプラインは次のとおりです。
MirrorMaker 2 のデータミラーリングパイプライン
ソース Kafka トピック
プロデューサーは、バッファー内のメッセージをターゲット Kafka クラスター内のトピックに送信します。これが発生している間、Kafka Connect タスクは引き続きデータソースをポーリングして、ソースメッセージキューにメッセージを追加します。
ソースコネクターのプロデューサーバーッファーのサイズは、producer.override.buffer.memory
プロパティーを使用して設定されます。タスクは、バッファーがフラッシュされる前に、指定されたタイムアウト期間 (offset.flush.timeout.ms
) 待機します。これは、送信されたメッセージがブローカーによって確認され、コミットされたデータがオフセットされるのに十分な時間です。ソースタスクは、シャットダウン中を除き、オフセットをコミットする前にプロデューサーがメッセージキューを空にするのを待ちません。
プロデューサーがソースメッセージキュー内のメッセージのスループットについていけない場合、バッファリングは、max.block.ms
で制限された期間内にバッファーに使用可能なスペースができるまでブロックされます。バッファー内に未確認のメッセージがあれば、この期間中に送信されます。これらのメッセージが確認されてフラッシュされるまで、新しいメッセージはバッファーに追加されません。
次の設定変更を試して、未処理メッセージの基になるソースメッセージキューを管理可能なサイズに保つことができます。
-
offset.flush.timeout.ms
のデフォルト値 (ミリ秒) を増やす - 十分な CPU およびメモリーリソースがあることを確認します。
以下を実行して、並行して実行されるタスクの数を増やします。
-
tasksMax
プロパティーを使用して並行して実行するタスクの数を増やす -
replicas
プロパティーを使用してタスクを実行するワーカーノードの数の増加
-
使用可能な CPU とメモリーリソース、およびワーカーノードの数に応じて、並列実行できるタスクの数を検討してください。必要な効果が得られるまで、設定値を調整し続けることを推奨します。
6.1. 大量メッセージ用の Kafka Connect の設定
Kafka Connect は、ソースの外部データシステムからデータをフェッチし、それを Kafka Connect ランタイムプロデューサーに渡して、ターゲットクラスターにレプリケートします。
次の例は、KafkaConnect
カスタムリソースを使用した Kafka Connect の設定を示しています。
大量のメッセージを処理するための Kafka Connect 設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: replicas: 3 config: offset.flush.timeout.ms: 10000 # ... resources: requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
replicas: 3
config:
offset.flush.timeout.ms: 10000
# ...
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 2Gi
# ...
プロデューサー設定は、KafkaConnector
カスタムリソースを使用して管理されるソースコネクター用に追加されます。
大量のメッセージを処理するためのソースコネクターの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: producer.override.batch.size: 327680 producer.override.linger.ms: 100 # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-source-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.file.FileStreamSourceConnector
tasksMax: 2
config:
producer.override.batch.size: 327680
producer.override.linger.ms: 100
# ...
FileStreamSourceConnector
および FileStreamSinkConnector
は、コネクターの例として提供されています。これらを KafkaConnector
リソースとしてデプロイする方法については、KafkaConnector
リソース のデプロイ を 参照してください。
シンクコネクターのコンシューマー設定が追加されます。
大量のメッセージを処理するためのシンクコネクターの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector tasksMax: 2 config: consumer.fetch.max.bytes: 52428800 consumer.max.partition.fetch.bytes: 1048576 consumer.max.poll.records: 500 # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-sink-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.file.FileStreamSinkConnector
tasksMax: 2
config:
consumer.fetch.max.bytes: 52428800
consumer.max.partition.fetch.bytes: 1048576
consumer.max.poll.records: 500
# ...
KafkaConnector
カスタムリソースの代わりに Kafka Connect API を使用してコネクターを管理している場合は、コネクター設定を JSON オブジェクトとして追加できます。
大量のメッセージを処理するためのソースコネクター設定を追加するための curl 要求の例
curl -X POST \ http://my-connect-cluster-connect-api:8083/connectors \ -H 'Content-Type: application/json' \ -d '{ "name": "my-source-connector", "config": { "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "file": "/opt/kafka/LICENSE", "topic":"my-topic", "tasksMax": "4", "type": "source" "producer.override.batch.size": 327680 "producer.override.linger.ms": 100 } }'
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"
"producer.override.batch.size": 327680
"producer.override.linger.ms": 100
}
}'