20.2. パーティションを再割り当てするための再割り当て JSON ファイルの生成


Kafka クラスターのスケーリング後にパーティションを再割り当てするには、kafka-reassign-partitions.sh ツールを使用して再割り当て JSON ファイルを生成します。ブローカーを追加または削除しても、既存のパーティションは自動的に再配布されません。パーティション分散のバランスをとり、新しいブローカーを最大限に活用するには、kafka-reassign-partitions.sh ツールを使用してパーティションを再割り当てできます。

このツールは、Kafka クラスターに接続された対話型 Pod コンテナーから実行します。

次の手順では、mTLS を使用したセキュアな再割り当てプロセスについて説明します。TLS 暗号化と mTLS 認証を使用する Kafka クラスターが必要です。

接続を確立するには、次のものが必要です。

  • Kafka クラスターの作成時に Cluster Operator によって生成されたクラスター CA 証明書とパスワード
  • ユーザーが Kafka クラスターへのクライアントアクセス用に作成されたときに User Operator によって生成されたユーザー CA 証明書とパスワード

この手順では、CA 証明書と対応するパスワードが、PKCS #12 (.p12 および .password) 形式で含まれているクラスターとユーザーシークレットから抽出されます。パスワードは、証明書を含む .p12 ストアへのアクセスを許可します。.p12 ストアを使用してトラストストアとキーストアを指定し、Kafka クラスターへの接続を認証します。

前提条件

  • Cluster Operator が実行中である。
  • 内部 TLS 暗号化と mTLS 認証で設定された Kafka リソースに基づいて実行中の Kafka クラスターがあります。

    TLS 暗号化と mTLS 認証を使用した Kafka 設定

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        listeners:
          # ...
          - name: tls
            port: 9093
            type: internal
            tls: true 1
            authentication:
              type: tls 2
        # ...

    1
    内部リスナーの TLS 暗号化を有効にします。
    2
    リスナー認証メカニズムは相互 tls として指定されます。
  • 稼働中の Kafka クラスターには、再割り当てするトピックおよびパーティションのセットが含まれます。

    my-topic のトピック設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
        # ...

  • Kafka ブローカーからトピックを生成および使用するパーミッションを指定する ACL ルールとともに KafkaUser が設定されています。

    my-topic および my-cluster での操作を許可する ACL ルールを使用した Kafka ユーザーの設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication: 1
        type: tls
      authorization:
        type: simple 2
        acls:
          # access to the topic
          - resource:
              type: topic
              name: my-topic
            operations:
              - Create
              - Describe
              - Read
              - AlterConfigs
            host: "*"
          # access to the cluster
          - resource:
              type: cluster
            operations:
              - Alter
              - AlterConfigs
            host: "*"
          # ...
      # ...

    1
    相互 tls として定義されたユーザー認証メカニズム。
    2
    ACL ルールの認可および付随するリスト。

手順

  1. Kafka クラスターの <cluster_name>-cluster-ca-cert シークレットからクラスター CA 証明書とパスワードを抽出します。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password

    <cluster_name> は、Kafka クラスターの名前に置き換えます。Kafka リソースを使用して Kafka をデプロイすると、Kafka クラスター名 (<cluster_name>-cluster-ca-cert) でクラスター CA 証明書のシークレットが作成されます。例: my-cluster-cluster-ca-cert

  2. Streams for Apache Kafka イメージを使用してインタラクティブな Pod コンテナーを新たに実行し、実行中の Kafka ブローカーに接続します。

    oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"

    <interactive_pod_name> は Pod の名前に置き換えます。

  3. クラスター CA 証明書をインタラクティブな Pod コンテナーにコピーします。

    oc cp ca.p12 <interactive_pod_name>:/tmp
  4. Kafka ブローカーへのアクセス権限を持つ Kafka ユーザーのシークレットから、ユーザー CA 証明書およびパスワードを抽出します。

    oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
    oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password

    <kafka_user> は Kafka ユーザーの名前に置き換えます。KafkaUser リソースを使用して Kafka ユーザーを作成すると、ユーザー CA 証明書のあるシークレットが Kafka ユーザー名で作成されます。例: my-user

  5. ユーザー CA 証明書をインタラクティブな Pod コンテナーにコピーします。

    oc cp user.p12 <interactive_pod_name>:/tmp

    CA 証明書を使用すると、インタラクティブな Pod コンテナーが TLS を使用して Kafka ブローカーに接続できます。

  6. config.properties ファイルを作成し、Kafka クラスターへの認証に使用されるトラストストアおよびキーストアを指定します。

    前の手順でデプロイメントした証明書とパスワードを使用します。

    bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1
    security.protocol=SSL 2
    ssl.truststore.location=/tmp/ca.p12 3
    ssl.truststore.password=<truststore_password> 4
    ssl.keystore.location=/tmp/user.p12 5
    ssl.keystore.password=<keystore_password> 6
    1
    Kafka クラスターに接続するためのブートストラップサーバーアドレス。独自の Kafka クラスター名を使用して、<kafka_cluster_name> を置き換えます。
    2
    暗号化に TLS を使用する場合のセキュリティープロトコルオプション。
    3
    トラストストアの場所には、Kafka クラスターの公開鍵証明書 (ca.p12) が含まれます。
    4
    トラストストアにアクセスするためのパスワード (ca.password)。
    5
    キーストアの場所には、Kafka ユーザーの公開鍵証明書 (user.p12) が含まれます。
    6
    キーストアにアクセスするためのパスワード (user.password)。
  7. config.properties ファイルをインタラクティブな Pod コンテナーにコピーします。

    oc cp config.properties <interactive_pod_name>:/tmp/config.properties
  8. 移動するトピックを指定する topics.json という名前の JSON ファイルを準備します。

    トピック名をコンマ区切りの一覧として指定します。

    my-topic のすべてのパーティションを再割り当てするための JSON ファイルの例

    {
      "version": 1,
      "topics": [
        { "topic": "my-topic"}
      ]
    }

    このファイルを使用して、トピックのレプリケーション係数を変更 することもできます。

  9. topics.json ファイルをインタラクティブな Pod コンテナーにコピーします。

    oc cp topics.json <interactive_pod_name>:/tmp/topics.json
  10. インタラクティブな Pod コンテナーでシェルプロセスを開始します。

    oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash

    <namespace> を Pod が実行されている OpenShift namespace に置き換えます。

  11. kafka-reassign-partitions.sh コマンドを使用して、再割り当て JSON を生成します。

    my-topic のパーティションを指定したブローカーに移動するコマンドの例

    bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 0,1,2,3,4 \
      --generate

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.