12.2. Kafka クラスターのスケーリング
Kafka クラスターからブローカーを追加または削除できます。ZooKeeper クラスターからノードを追加または削除することもできます。
ブローカーを追加または削除する場合は、kafka-reassign-partitions.sh を使用してパーティションを割り当てることができます。
Cruise Control を使用して、Kafka クラスターのリバランス時にブローカーの数に変更を組み込むこともできます。新しいブローカーをインストールして、リバランスに追加できます。削除する前に除外するブローカーを指定して理バラナスを実行できます。詳細は、14章Cruise Control を使用したクラスターのリバランス を参照してください。
12.2.1. Kafka クラスターへのブローカーの追加および削除 リンクのコピーリンクがクリップボードにコピーされました!
トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。パーティションによってクラスター内のブローカー間でそのトピックの負荷が共有できます。ブローカーが何らかのリソース (通常は I/O) によって制約されている場合、より多くのパーティションを使用してもスループットは向上しません。代わりに、クラスターにブローカーを追加する必要があります。
追加のブローカーをクラスターに追加する場合、AMQ Streams ではパーティションは自動的に割り当てられません。既存のブローカーから新しいブローカーに移動するパーティションを決定する必要があります。すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。
クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。使用を停止するブローカーの各パーティションに対応する残りのブローカーを決定する必要があります。ブローカーに割り当てられたパーティションがない場合は、ブローカーを停止することができます。
12.2.2. パーティションの再割り当て リンクのコピーリンクがクリップボードにコピーされました!
kafka-reassign-partitions.sh は、パーティションを別のブローカーに再割り当てする際に使用します。
これには、以下の 3 つのモードがあります。
--generate- トピックとブローカーのセットを取得し、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。再割り当て JSON ファイル を生成する簡単な方法です。ただし、トピック全体で機能するため、その使用が常に適切であるとは限りません。
--execute- 再割り当て JSON ファイル を取得し、クラスターのパーティションおよびブローカーに適用します。パーティションを取得するブローカーは、パーティションリーダーのフォロワーになります。特定のパーティションでは、新規ブローカーが ISR に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify-
--verifyは、-- executeステップと同じ 再割り当て JSON ファイル を使用して、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかを確認します。再割り当てが完了すると、有効な スロットル も削除されます。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。
クラスターでは、1 度に 1 つの再割当てのみを実行でき、実行中の再割り当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。
12.2.2.1. 再割り当て JSON ファイル リンクのコピーリンクがクリップボードにコピーされました!
再割り当て JSON ファイル には特定の構造があります。
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{
"topic": <TopicName>,
"partition": <Partition>,
"replicas": [ <AssignedBrokerIds> ],
"log_dirs": [<LogDirs>]
}
"log_dirs" プロパティーはオプションで、パーティションを特定のログディレクトリーに移動するために使用されます。
以下は、トピック topic-a およびパーティション 4 をブローカー 2、4 および 7 に割り当て、トピック topic-b およびパーティション 2 をブローカー 1、5、および 7 に割り当てる、再割り当て JSON ファイルの例になります。
{
"version": 1,
"partitions": [
{
"topic": "topic-a",
"partition": 4,
"replicas": [2,4,7]
},
{
"topic": "topic-b",
"partition": 2,
"replicas": [1,5,7]
}
]
}
JSON に含まれていないパーティションは変更されません。
12.2.2.2. 再割り当て JSON ファイルの生成 リンクのコピーリンクがクリップボードにコピーされました!
指定のトピックのセットのすべてのパーティションを、指定のブローカーのセットに割り当てる最も簡単な方法は、kafka-reassign-partitions.sh --generate コマンドを使用して再割り当て JSON ファイルを生成することです。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
<topics_file> は、移動するトピックをリストする JSON ファイルです。これには、以下の構造があります。
{
"version": 1,
"topics": [
<topic_objects>
]
}
ここで <topic_objects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{
"topic": <TopicName>
}
たとえば、topic-a および topic-b のすべてのパーティションをブローカー 4 および 7 に移動する場合は、以下を実行します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
topics-to-be-moved.json のコンテンツがあります。
{
"version": 1,
"topics": [
{ "topic": "topic-a"},
{ "topic": "topic-b"}
]
}
12.2.2.3. 手動による再割り当て JSON ファイルの作成 リンクのコピーリンクがクリップボードにコピーされました!
特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。
12.2.3. 再割り当てスロットル リンクのコピーリンクがクリップボードにコピーされました!
パーティションの再割り当てには、多くのデータをブローカー間で移動させる必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当てに スロットル を使用できます。スロットルを使用すると、再割り当てに時間がかかる可能性があります。スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。スロットルが高すぎると、クライアントに影響します。たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。
12.2.4. Kafka クラスターのスケールアップ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。
前提条件
- 既存の Kafka クラスター。
- AMQ ブローカーが インストールされている 新しいマシン。
- 拡大されたクラスターで、パーティションをブローカーに再割り当てする方法を示す 再割り当て JSON ファイル。
手順
-
クラスター内の他のブローカーと同じ設定を使用して新しいブローカーの設定ファイルを作成します。ただし、
broker.idには他のブローカで使用されていない番号を指定してください。 前のステップで作成した設定ファイルを
kafka-server-start.shスクリプトの引数に渡して、新しい Kafka ブローカーを起動します。su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.propertiesKafka ブローカーが稼働していることを確認します。
jcmd | grep Kafka- 新しいブローカーごとに上記の手順を繰り返します。
kafka-reassign-partitions.shコマンドラインツールを使用して、パーティションの再割り当てを実行します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --executeレプリケーションをスロットルで調整する場合、
--throttleと inter-broker のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --executeこのコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備えて、これをファイルに保存する必要があります。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡したターゲットの再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --executekafka-reassign-partitions.shコマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--executeオプションの代わりに--verifyオプションを使用します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify-
--verifyコマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verifyによって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
12.2.5. Kafka クラスターのスケールダウン リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。
前提条件
- 既存の Kafka クラスター。
- ブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている 再割り当て JSON ファイル。
手順
kafka-reassign-partitions.shコマンドラインツールを使用して、パーティションの再割り当てを実行します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --executeレプリケーションをスロットルで調整する場合、
--throttleと inter-broker のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --executeこのコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備えて、これをファイルに保存する必要があります。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡したターゲットの再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --executekafka-reassign-partitions.shコマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--executeオプションの代わりに--verifyオプションを使用します。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify以下に例を示します。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify-
--verifyコマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verifyによって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。 削除する各ブローカーに、ログ (
log.dirs) にライブパーティションがないことを確認します。ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'ログディレクトリーが正規表現 (
\.[a-z0-9]-delete$) と一致しない場合は、アクティブなパーティションが存在します。アクティブなパーティションがある場合は、再割り当てが完了したどうか、あるいは、再割り当て JSON ファイルの設定を確認します。再割り当てはもう一度実行できます。次のステップに進む前に、アクティブなパーティションが存在しないことを確認します。ブローカーを停止します。
su - kafka /opt/kafka/bin/kafka-server-stop.shKafka ブローカーが停止していることを確認します。
jcmd | grep kafka
12.2.6. ZooKeeper クラスターのスケールアップ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、ZooKeeper クラスターにサーバー (ノード) を追加する方法について説明します。ZooKeeper の 動的再設定 機能は、スケールアッププロセス中に安定した ZooKeeper クラスターを維持します。
前提条件
-
動的再設定が ZooKeeper 設定ファイル (
reconfigEnabled=true) で有効になっている。 - ZooKeeper の認証が有効化され、認証メカニズムを使用して新しいサーバーにアクセスできる。
手順
各 ZooKeeper サーバーに対して、1 つずつ以下の手順を実行します。
- 「マルチノードの ZooKeeper クラスターの実行」 の説明に従ってサーバーを ZooKeeper クラスターに追加し、ZooKeeper を起動します。
- 新しいサーバーの IP アドレスと設定されたアクセスポートをメモします。
サーバーの
zookeeper-shellセッションを開始します。クラスターにアクセスできるマシンから次のコマンドを実行します (アクセスできる場合は、ZooKeeper ノードまたはローカルマシンの 1 つである可能性があります)。su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>シェルセッションで、ZooKeeper ノードが実行されている状態で、次の行を入力して、新しいサーバーを投票メンバーとしてクォーラムに追加します。
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>以下に例を示します。
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181<positive-id>は、新しいサーバー ID4です。2 つのポートの
<port1>2888 は ZooKeeper サーバー間の通信用で、<port2>3888 はリーダーエレクション用です。新しい設定は ZooKeeper クラスターの他のサーバーに伝播されます。新しいサーバーはクォーラムの完全メンバーになります。
- 追加する他のサーバーについて、手順 1 から 4 を繰り返します。
12.2.7. ZooKeeper クラスターのスケールダウン リンクのコピーリンクがクリップボードにコピーされました!
この手順では、ZooKeeper クラスターからサーバー (ノード) を削除する方法を説明します。ZooKeeper の 動的再設定 機能は、スケールダウンプロセス中に安定した ZooKeeper クラスターを維持します。
前提条件
-
動的再設定が ZooKeeper 設定ファイル (
reconfigEnabled=true) で有効になっている。 - ZooKeeper の認証が有効化され、認証メカニズムを使用して新しいサーバーにアクセスできる。
手順
各 ZooKeeper サーバーに対して、1 つずつ以下の手順を実行します。
スケールダウンの後も 維持される サーバーのいずれかで、
zookeeper-shellにログインします (例: サーバー 1)。注記ZooKeeper クラスターに設定された認証メカニズムを使用してサーバーにアクセスします。
サーバー (サーバー 5 など) を削除します。
reconfig -remove 5- 削除したサーバーを無効にします。
- ステップ 1 から 3 を繰り返し、クラスターサイズを縮小します。