15.2. ブローカーの追加後のパーティションの再割り当て
Kafka クラスター内のブローカーの数を増やした後、kafka-reassign-partitions.sh
ツールによって生成された再割り当てファイルを使用してパーティションを再割り当てします。再割り当てファイルには、拡大された Kafka クラスター内のブローカーにパーティションを再度割り当てる方法を記述する必要があります。ファイルで指定された再割り当てをブローカーに適用し、新しいパーティションの割り当てを確認します。
この手順では、TLS を使用するセキュアなスケーリングプロセスを説明します。TLS 暗号化と mTLS 認証を使用する Kafka クラスターが必要です。
kafka-reassign-partitions.sh
ツールを使用することもできますが、パーティション再割り当てとクラスターの再バランシングを自動化 するには、Cruise Control の使用を推奨します。Cruise Control は、ダウンタイムなしでトピックをあるブローカーから別のブローカーに移動でき、パーティションを再割り当てする最も効率的な方法です。
前提条件
- 既存の Kafka クラスター。
- 追加の AMQ ブローカーが インストールされた 新しいマシン。
拡大されたクラスター内のブローカーにパーティションを再割り当てする方法を指定する JSON ファイルを作成している。
この手順では、
my-topic
というトピックのすべてのパーティションを再割り当てします。topic.json
という名前の JSON ファイルはトピックを指定し、reassignment.json
ファイルの生成に使用されます。
my-topic
を指定する JSON ファイルの例
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
手順
-
クラスター内の他のブローカーと同じ設定を使用して新しいブローカーの設定ファイルを作成します。ただし、
broker.id
には他のブローカで使用されていない番号を指定してください。 前のステップで作成した設定ファイルを
kafka-server-start.sh
スクリプトの引数に渡して、新しい Kafka ブローカーを起動します。su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
Kafka ブローカーが稼働していることを確認します。
jcmd | grep Kafka
- 新しいブローカーごとに上記の手順を繰り返します。
まだ行っていない場合は、
kafka-reassign-partitions.sh
ツールを使用してreassignment.json
という名前の再割り当て JSON ファイルを生成します。再割り当て JSON ファイルを生成するコマンドの例
/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --topics-to-move-json-file topics.json \ 1 --broker-list 0,1,2,3,4 \ 2 --generate
現在のレプリカ割り当てと提案されたレプリカ割り当てを示す再割り当て JSON ファイルの例
Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}
後で変更を元に戻す必要がある場合に備えて、このファイルのコピーをローカルに保存します。
--execute
オプションを使用してパーティションの再割り当てを実行します。/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --execute
レプリケーションをスロットルで調整する場合、
--throttle
とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --throttle 5000000 \ --execute
--verify
オプションを使用して、再割り当てが完了したことを確認します。/opt/kafka/bin/kafka-reassign-partitions.sh \ --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --verify
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。