10.2. Kafka クラスターのスケーリング
Kafka クラスターからブローカーを追加または削除できます。ZooKeeper クラスターからノードを追加または削除することもできます。
ブローカーを追加または削除する場合は、kafka-reassign-partitions.sh
を使用してパーティションを割り当てることができます。
Cruise Control を使用して、Kafka クラスターのリバランス時にブローカーの数に変更を組み込むこともできます。新しいブローカーをインストールして、リバランスに追加できます。削除する前に除外するブローカーを指定して理バラナスを実行できます。詳細は、14章Cruise Control を使用したクラスターのリバランス を参照してください。
10.2.1. Kafka クラスターへのブローカーの追加および削除
トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。パーティションによってクラスター内のブローカー間でそのトピックの負荷が共有できます。ブローカーが何らかのリソース (通常は I/O) によって制約されている場合、より多くのパーティションを使用してもスループットは向上しません。代わりに、クラスターにブローカーを追加する必要があります。
追加のブローカーをクラスターに追加する場合、AMQ Streams ではパーティションは自動的に割り当てられません。既存のブローカーから新しいブローカーに移動するパーティションを決定する必要があります。すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。
クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。使用を停止するブローカーの各パーティションに対応する残りのブローカーを決定する必要があります。ブローカーに割り当てられたパーティションがない場合は、ブローカーを停止することができます。
10.2.2. パーティションの再割り当て
kafka-reassign-partitions.sh
は、パーティションを別のブローカーに再割り当てする際に使用します。
これには、以下の 3 つのモードがあります。
--generate
- トピックとブローカーのセットを取得し、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。再割り当て JSON ファイル を生成する簡単な方法です。ただし、トピック全体で機能するため、その使用が常に適切であるとは限りません。
--execute
- 再割り当て JSON ファイル を取得し、クラスターのパーティションおよびブローカーに適用します。パーティションを取得するブローカーは、パーティションリーダーのフォロワーになります。特定のパーティションでは、新規ブローカーが ISR に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify
-
--verify
は、-- execute
ステップと同じ 再割り当て JSON ファイル を使用して、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかを確認します。再割り当てが完了すると、有効な スロットル も削除されます。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。
クラスターでは、1 度に 1 つの再割当てのみを実行でき、実行中の再割り当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh
によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。
10.2.2.1. 再割り当て JSON ファイル
再割り当て JSON ファイル には特定の構造があります。
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ], "log_dirs": [<LogDirs>] }
"log_dirs"
プロパティーはオプションで、パーティションを特定のログディレクトリーに移動するために使用されます。
以下は、トピック topic-a
およびパーティション 4
をブローカー 2
、4
および 7
に割り当て、トピック topic-b
およびパーティション 2
をブローカー 1
、5
、および 7
に割り当てる、再割り当て JSON ファイルの例になります。
{ "version": 1, "partitions": [ { "topic": "topic-a", "partition": 4, "replicas": [2,4,7] }, { "topic": "topic-b", "partition": 2, "replicas": [1,5,7] } ] }
JSON に含まれていないパーティションは変更されません。
10.2.2.2. 再割り当て JSON ファイルの生成
指定のトピックのセットのすべてのパーティションを、指定のブローカーのセットに割り当てる最も簡単な方法は、kafka-reassign-partitions.sh --generate
コマンドを使用して再割り当て JSON ファイルを生成することです。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
<topics_file>
は、移動するトピックをリストする JSON ファイルです。これには、以下の構造があります。
{
"version": 1,
"topics": [
<topic_objects>
]
}
ここで <topic_objects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{
"topic": <TopicName>
}
たとえば、topic-a
および topic-b
のすべてのパーティションをブローカー 4
および 7
に移動する場合は、以下を実行します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
topics-to-be-moved.json
のコンテンツがあります。
{ "version": 1, "topics": [ { "topic": "topic-a"}, { "topic": "topic-b"} ] }
10.2.2.3. 手動による再割り当て JSON ファイルの作成
特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。
10.2.3. 再割り当てスロットル
パーティションの再割り当てには、多くのデータをブローカー間で移動させる必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当てに スロットル を使用できます。スロットルを使用すると、再割り当てに時間がかかる可能性があります。スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。スロットルが高すぎると、クライアントに影響します。たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。
10.2.4. Kafka クラスターのスケールアップ
この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。
前提条件
- 既存の Kafka クラスター。
- AMQ ブローカーが インストールされている 新しいマシン。
- 拡大されたクラスターで、パーティションをブローカーに再割り当てする方法を示す 再割り当て JSON ファイル。
手順
-
クラスター内の他のブローカーと同じ設定を使用して新しいブローカーの設定ファイルを作成します。ただし、
broker.id
には他のブローカで使用されていない番号を指定してください。 前のステップで作成した設定ファイルを
kafka-server-start.sh
スクリプトの引数に渡して、新しい Kafka ブローカーを起動します。su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka ブローカーが稼働していることを確認します。
jcmd | grep Kafka
- 新しいブローカーごとに上記の手順を繰り返します。
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
レプリケーションをスロットルで調整する場合、
--throttle
と inter-broker のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備えて、これをファイルに保存する必要があります。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡したターゲットの再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
10.2.5. Kafka クラスターのスケールダウン
この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。
前提条件
- 既存の Kafka クラスター。
- ブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている 再割り当て JSON ファイル。
手順
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
レプリケーションをスロットルで調整する場合、
--throttle
と inter-broker のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備えて、これをファイルに保存する必要があります。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡したターゲットの再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。 削除する各ブローカーに、ログ (
log.dirs
) にライブパーティションがないことを確認します。ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
ログディレクトリーが正規表現 (
\.[a-z0-9]-delete$
) と一致しない場合は、アクティブなパーティションが存在します。アクティブなパーティションがある場合は、再割り当てが完了したどうか、あるいは、再割り当て JSON ファイルの設定を確認します。再割り当てはもう一度実行できます。次のステップに進む前に、アクティブなパーティションが存在しないことを確認します。ブローカーを停止します。
su - kafka /opt/kafka/bin/kafka-server-stop.sh
Kafka ブローカーが停止していることを確認します。
jcmd | grep kafka
10.2.6. ZooKeeper クラスターのスケールアップ
この手順では、ZooKeeper クラスターにサーバー (ノード) を追加する方法について説明します。ZooKeeper の 動的再設定 機能は、スケールアッププロセス中に安定した ZooKeeper クラスターを維持します。
前提条件
-
動的再設定が ZooKeeper 設定ファイル (
reconfigEnabled=true
) で有効になっている。 - ZooKeeper の認証が有効化され、認証メカニズムを使用して新しいサーバーにアクセスできる。
手順
各 ZooKeeper サーバーに対して、1 つずつ以下の手順を実行します。
- 「マルチノードの ZooKeeper クラスターの実行」 の説明に従ってサーバーを ZooKeeper クラスターに追加し、ZooKeeper を起動します。
- 新しいサーバーの IP アドレスと設定されたアクセスポートをメモします。
サーバーの
zookeeper-shell
セッションを開始します。クラスターにアクセスできるマシンから次のコマンドを実行します (アクセスできる場合は、ZooKeeper ノードまたはローカルマシンの 1 つである可能性があります)。su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
シェルセッションで、ZooKeeper ノードが実行されている状態で、次の行を入力して、新しいサーバーを投票メンバーとしてクォーラムに追加します。
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
以下に例を示します。
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
<positive-id>
は、新しいサーバー ID4
です。2 つのポートの
<port1>
2888 は ZooKeeper サーバー間の通信用で、<port2>
3888 はリーダーエレクション用です。新しい設定は ZooKeeper クラスターの他のサーバーに伝播されます。新しいサーバーはクォーラムの完全メンバーになります。
- 追加する他のサーバーについて、手順 1 から 4 を繰り返します。
10.2.7. ZooKeeper クラスターのスケールダウン
この手順では、ZooKeeper クラスターからサーバー (ノード) を削除する方法を説明します。ZooKeeper の 動的再設定 機能は、スケールダウンプロセス中に安定した ZooKeeper クラスターを維持します。
前提条件
-
動的再設定が ZooKeeper 設定ファイル (
reconfigEnabled=true
) で有効になっている。 - ZooKeeper の認証が有効化され、認証メカニズムを使用して新しいサーバーにアクセスできる。
手順
各 ZooKeeper サーバーに対して、1 つずつ以下の手順を実行します。
スケールダウンの後も 維持される サーバーのいずれかで、
zookeeper-shell
にログインします (例: サーバー 1)。注記ZooKeeper クラスターに設定された認証メカニズムを使用してサーバーにアクセスします。
サーバー (サーバー 5 など) を削除します。
reconfig -remove 5
- 削除したサーバーを無効にします。
- ステップ 1 から 3 を繰り返し、クラスターサイズを縮小します。
関連情報
- ZooKeeper のドキュメントの サーバーの削除