20.2. パーティションを再割り当てするための再割り当て JSON ファイルの生成
Kafka クラスターのスケーリング後にパーティションを再割り当てするには、kafka-reassign-partitions.sh
ツールを使用して再割り当て JSON ファイルを生成します。ブローカーを追加または削除しても、既存のパーティションは自動的に再配布されません。パーティション分散のバランスをとり、新しいブローカーを最大限に活用するには、kafka-reassign-partitions.sh
ツールを使用してパーティションを再割り当てできます。
このツールは、Kafka クラスターに接続された対話型 Pod コンテナーから実行します。
次の手順では、mTLS を使用したセキュアな再割り当てプロセスについて説明します。TLS 暗号化と mTLS 認証を使用する Kafka クラスターが必要です。
接続を確立するには、次のものが必要です。
- Kafka クラスターの作成時に Cluster Operator によって生成されたクラスター CA 証明書とパスワード
- ユーザーが Kafka クラスターへのクライアントアクセス用に作成されたときに User Operator によって生成されたユーザー CA 証明書とパスワード
この手順では、CA 証明書と対応するパスワードが、PKCS #12 (.p12
および .password
) 形式で含まれているクラスターとユーザーシークレットから抽出されます。パスワードは、証明書を含む .p12
ストアへのアクセスを許可します。.p12
ストアを使用してトラストストアとキーストアを指定し、Kafka クラスターへの接続を認証します。
前提条件
- Cluster Operator が実行中である。
内部 TLS 暗号化と mTLS 認証で設定された
Kafka
リソースに基づいて実行中の Kafka クラスターがあります。TLS 暗号化と mTLS 認証を使用した Kafka 設定
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... listeners: # ... - name: tls port: 9093 type: internal tls: true 1 authentication: type: tls 2 # ...
稼働中の Kafka クラスターには、再割り当てするトピックおよびパーティションのセットが含まれます。
my-topic
のトピック設定例apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 10 replicas: 3 config: retention.ms: 7200000 segment.bytes: 1073741824 # ...
Kafka ブローカーからトピックを生成および使用するパーミッションを指定する ACL ルールとともに
KafkaUser
が設定されています。my-topic
およびmy-cluster
での操作を許可する ACL ルールを使用した Kafka ユーザーの設定例apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: 1 type: tls authorization: type: simple 2 acls: # access to the topic - resource: type: topic name: my-topic operations: - Create - Describe - Read - AlterConfigs host: "*" # access to the cluster - resource: type: cluster operations: - Alter - AlterConfigs host: "*" # ... # ...
手順
Kafka クラスターの
<cluster_name>-cluster-ca-cert
シークレットからクラスター CA 証明書とパスワードを抽出します。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
<cluster_name> は、Kafka クラスターの名前に置き換えます。
Kafka
リソースを使用して Kafka をデプロイすると、Kafka クラスター名 (<cluster_name>-cluster-ca-cert
) でクラスター CA 証明書のシークレットが作成されます。例:my-cluster-cluster-ca-cert
Streams for Apache Kafka イメージを使用してインタラクティブな Pod コンテナーを新たに実行し、実行中の Kafka ブローカーに接続します。
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
<interactive_pod_name> は Pod の名前に置き換えます。
クラスター CA 証明書をインタラクティブな Pod コンテナーにコピーします。
oc cp ca.p12 <interactive_pod_name>:/tmp
Kafka ブローカーへのアクセス権限を持つ Kafka ユーザーのシークレットから、ユーザー CA 証明書およびパスワードを抽出します。
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password
<kafka_user> は Kafka ユーザーの名前に置き換えます。
KafkaUser
リソースを使用して Kafka ユーザーを作成すると、ユーザー CA 証明書のあるシークレットが Kafka ユーザー名で作成されます。例:my-user
ユーザー CA 証明書をインタラクティブな Pod コンテナーにコピーします。
oc cp user.p12 <interactive_pod_name>:/tmp
CA 証明書を使用すると、インタラクティブな Pod コンテナーが TLS を使用して Kafka ブローカーに接続できます。
config.properties
ファイルを作成し、Kafka クラスターへの認証に使用されるトラストストアおよびキーストアを指定します。前の手順でデプロイメントした証明書とパスワードを使用します。
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1 security.protocol=SSL 2 ssl.truststore.location=/tmp/ca.p12 3 ssl.truststore.password=<truststore_password> 4 ssl.keystore.location=/tmp/user.p12 5 ssl.keystore.password=<keystore_password> 6
- 1
- Kafka クラスターに接続するためのブートストラップサーバーアドレス。独自の Kafka クラスター名を使用して、<kafka_cluster_name> を置き換えます。
- 2
- 暗号化に TLS を使用する場合のセキュリティープロトコルオプション。
- 3
- トラストストアの場所には、Kafka クラスターの公開鍵証明書 (
ca.p12
) が含まれます。 - 4
- トラストストアにアクセスするためのパスワード (
ca.password
)。 - 5
- キーストアの場所には、Kafka ユーザーの公開鍵証明書 (
user.p12
) が含まれます。 - 6
- キーストアにアクセスするためのパスワード (
user.password
)。
config.properties
ファイルをインタラクティブな Pod コンテナーにコピーします。oc cp config.properties <interactive_pod_name>:/tmp/config.properties
移動するトピックを指定する
topics.json
という名前の JSON ファイルを準備します。トピック名をコンマ区切りの一覧として指定します。
my-topic
のすべてのパーティションを再割り当てするための JSON ファイルの例{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
このファイルを使用して、トピックのレプリケーション係数を変更 することもできます。
topics.json
ファイルをインタラクティブな Pod コンテナーにコピーします。oc cp topics.json <interactive_pod_name>:/tmp/topics.json
インタラクティブな Pod コンテナーでシェルプロセスを開始します。
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
<namespace> を Pod が実行されている OpenShift namespace に置き換えます。
kafka-reassign-partitions.sh
コマンドを使用して、再割り当て JSON を生成します。my-topic
のパーティションを指定したブローカーに移動するコマンドの例bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate