20.5. トピックのレプリケーション係数の変更
Kafka クラスター内のトピックのレプリケーション係数を変更するには、kafka-reassign-partitions.sh
ツールを使用します。これを行うには、Kafka クラスターに接続されている対話型の Pod コンテナーからツールを実行し、再割り当てファイルを使用してトピックレプリカを変更する方法を記述します。
この手順では、TLS を使用するセキュアなプロセスについて説明します。TLS 暗号化と mTLS 認証を使用する Kafka クラスターが必要です。
前提条件
-
内部 TLS 暗号化と mTLS 認証で設定された
Kafka
リソースに基づいて実行中の Kafka クラスターがあります。 - 実行中の Kafka ブローカーに接続されている対話型 Pod コンテナーを実行している。
-
reassignment.json
という名前の再割り当て JSON ファイルを生成している。 -
KafkaUser
として接続されている。このユーザーは、Kafka クラスターとそのトピックの管理権限を指定する ACL ルールで設定されている。
再割り当て JSON ファイルの生成 を参照してください。
この手順では、my-topic
というトピックに 4 つのレプリカがあり、それを 3 つに減らしたいと考えています。topic.json
という名前の JSON ファイルはトピックを指定し、reassignment.json
ファイルの生成に使用されました。
my-topic
を指定する JSON ファイルの例
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
手順
まだ確認していない場合には、インタラクティブな Pod コンテナーを実行 して
reassignment.json
という名前の再割り当てJSONファイルを生成します。現在のレプリカ割り当てと提案されたレプリカ割り当てを示す再割り当て JSON ファイルの例
Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}
後で変更を元に戻す必要がある場合に備えて、このファイルのコピーをローカルに保存します。
reassignment.json
を編集して、各パーティションからレプリカを削除します。たとえば、
jq
コマンドライン JSON パーサーツール を使用して、トピックの各パーティションのリストに含まれる最後のレプリカを削除します。各パーティションの最後のトピックレプリカの削除
jq '.partitions[].replicas |= del(.[-1])' reassignment.json > reassignment.json
更新されたレプリカを示す再割り当てファイルの例
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4],"log_dirs":["any","any","any","any"]}]}
reassignment.json
ファイルを対話型 Pod コンテナーにコピーします。oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
<interactive_pod_name> は Pod の名前に置き換えます。
インタラクティブな Pod コンテナーでシェルプロセスを開始します。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
<namespace> を Pod が実行されている OpenShift namespace に置き換えます。
インタラクティブな Pod コンテナーから
kafka-reassign-partitions.sh
スクリプトを使用して、トピックレプリカを変更します。bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --execute
注記ブローカーからレプリカを削除する場合、ブローカー間のデータ移動は必要ないため、レプリケーションを調整する必要はありません。レプリカを追加している場合は、スロットルレートを変更することができます。
いずれかのブローカー Pod から
kafka-reassign-partitions.sh
コマンドラインツールを使用して、トピックレプリカへの変更が完了したことを確認します。これは先ほどの手順と同じコマンドですが、--verify
オプションの代わりに--execute
オプションを使用します。bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verify
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。--describe
オプションを指定してbin/kafka-topics.sh
コマンドを実行して、トピックへの変更の結果を確認します。bin/kafka-topics.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --describe
トピックのレプリカ数を削減した結果
my-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 my-topic Partition: 1 Leader: 2 Replicas: 1,2,3 Isr: 1,2,3 my-topic Partition: 2 Leader: 3 Replicas: 2,3,4 Isr: 2,3,4
最後に、
KafkaTopic
カスタムリソースを編集して.spec.replicas
を 3 に変更し、調整を待ちます。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 3