第7章 大きなメッセージサイズの処理
Kafka のデフォルトのメッセージバッチサイズは 1 MB で、ほとんどのユースケースで最大のスループットを得るのに最適です。Kafka は、十分なディスク容量があれば、スループットを下げてより大きなバッチに対応できます。
サイズの大きいメッセージは、次の 4 つの方法で処理できます。
- ブローカー、プロデューサー、およびコンシューマーは、サイズのより大きいメッセージに対応するように設定されています。
- プロデューサー側のメッセージ圧縮 が、圧縮メッセージをログに書き込みます。
- 参照ベースのメッセージングでは、メッセージのペイロード内の他のシステムに保存されているデータへの参照のみが送信されます。
- インラインメッセージングが、同じキーを使用するチャンクにメッセージを分割し、これらを Kafka Streams などのストリームプロセッサーを使用して、出力に組み合わせます。
非常に大きなメッセージを処理するのでない限り、この設定アプローチが推奨されます。参照ベースのメッセージングとメッセージ圧縮のオプションは、他のほとんどの状況に対応します。いずれのオプションを使用する場合も、パフォーマンスの問題が発生しないように注意する必要があります。
7.1. より大きなメッセージを処理するための Kafka コンポーネントの設定
サイズの大きいメッセージはシステムのパフォーマンスに影響を与え、メッセージ処理を複雑化させる可能性があります。サイズの大きいメッセージを回避できない場合は、設定オプションを利用できます。サイズの大きいメッセージを効率的に処理し、メッセージフローのブロックを防ぐには、次の設定を調整することを検討してください。
最大レコードバッチサイズの調整:
-
すべてのトピックでより大きなレコードバッチサイズをサポートするには、ブローカーレベルで
message.max.bytes
を設定します。 -
個々のトピックのより大きなレコードバッチサイズをサポートするには、トピックレベルで
max.message.bytes
を設定します。
-
すべてのトピックでより大きなレコードバッチサイズをサポートするには、ブローカーレベルで
-
各パーティションフォロワーによってフェッチされるメッセージの最大サイズを増やします (
replica.fetch.max.bytes
)。 -
プロデューサーのバッチサイズ (
batch.size
) を増やして、単一のプロデュース要求で送信されるメッセージバッチのサイズを増やします。 -
より大きなレコードバッチに対応するために、プロデューサー (
max.request.size
) とコンシューマー (fetch.max.bytes
) の最大リクエストサイズを大きく設定します。 -
各パーティションでコンシューマーに返されるデータの量に、より高い上限 (
max.partition.fetch.bytes
) を設定します。
最大レコードバッチサイズに対応できるように、バッチ要求の最大サイズが少なくとも message.max.bytes
と同じ大きさであることを確認します。
ブローカー設定の例
message.max.bytes: 10000000 replica.fetch.max.bytes: 10485760
message.max.bytes: 10000000
replica.fetch.max.bytes: 10485760
プロデューサーの設定例
batch.size: 327680 max.request.size: 10000000
batch.size: 327680
max.request.size: 10000000
コンシューマー設定の例
fetch.max.bytes: 10000000 max.partition.fetch.bytes: 10485760
fetch.max.bytes: 10000000
max.partition.fetch.bytes: 10485760
また、Kafka Bridge、Kafka Connect、MirrorMaker 2 などの他の Kafka コンポーネントで使用されるプロデューサーとコンシューマーを設定して、サイズがより大きなメッセージをより効率的に処理することもできます。
- Kafka Bridge
特定のプロデューサーおよびコンシューマー設定プロパティーを使用して Kafka Bridge を設定します。
-
プロデューサー用の
producer.config
-
コンシューマー向け
consumer.config
Kafka Bridge の設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... producer: config: batch.size: 327680 max.request.size: 10000000 consumer: config: fetch.max.bytes: 10000000 max.partition.fetch.bytes: 10485760 # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... producer: config: batch.size: 327680 max.request.size: 10000000 consumer: config: fetch.max.bytes: 10000000 max.partition.fetch.bytes: 10485760 # ...
-
プロデューサー用の
- Kafka Connect
Kafka Connect の場合、プロデューサーとコンシューマーの設定プロパティーの接頭辞を使用して、メッセージの送受信を行うソースコネクターとシンクコネクターを設定します。
-
ソースコネクターが Kafka クラスターにメッセージを送信するために使用するプロデューサーの
producer.override
-
シンクコネクターが Kafka クラスターからメッセージを取得するために使用するコンシューマーの
consumer
Kafka Connect ソースコネクター設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: # ... config: producer.override.batch.size: 327680 producer.override.max.request.size: 10000000 # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: # ... config: producer.override.batch.size: 327680 producer.override.max.request.size: 10000000 # ...
Kafka Connect シンクコネクター設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect-cluster spec: # ... config: consumer.fetch.max.bytes: 10000000 consumer.max.partition.fetch.bytes: 10485760 # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect-cluster spec: # ... config: consumer.fetch.max.bytes: 10000000 consumer.max.partition.fetch.bytes: 10485760 # ...
-
ソースコネクターが Kafka クラスターにメッセージを送信するために使用するプロデューサーの
- MirrorMaker 2
MirrorMaker 2 の場合、プロデューサーとコンシューマーの設定プロパティーの接頭辞を使用して、ソース Kafka クラスターからメッセージを取得するソースコネクターを設定します。
-
ターゲットの Kafka クラスターにデータを複製するために使用されるランタイム Kafka Connect プロデューサーの
producer.override
-
シンクコネクターがソース Kafka クラスターからメッセージを取得するために使用するコンシューマーの
consumer
MirrorMaker 2 ソースコネクター設定の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: tasksMax: 2 config: producer.override.batch.size: 327680 producer.override.max.request.size: 10000000 consumer.fetch.max.bytes: 10000000 consumer.max.partition.fetch.bytes: 10485760 # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: tasksMax: 2 config: producer.override.batch.size: 327680 producer.override.max.request.size: 10000000 consumer.fetch.max.bytes: 10000000 consumer.max.partition.fetch.bytes: 10485760 # ...
-
ターゲットの Kafka クラスターにデータを複製するために使用されるランタイム Kafka Connect プロデューサーの