第2章 デプロイメント設定


本章では、カスタムリソースを使用してサポートされるデプロイメントのさまざまな側面を設定する方法について説明します。

  • Kafka クラスター
  • Kafka Connect クラスター
  • Source2Image がサポートされる Kafka Connect クラスター
  • Kafka MirrorMaker
  • Kafka Bridge
  • Cruise Control
注記

カスタムリソースに適用されるラベルは、Kafka MirrorMaker を構成する OpenShift リソースにも適用されます。そのため、必要に応じてリソースにラベルが適用されるため便利です。

2.1. Kafka クラスターの設定

ここでは、AMQ Streams クラスターで Kafka デプロイメントを設定する方法を説明します。Kafka クラスターは ZooKeeper クラスターとデプロイされます。デプロイメントには、Kafka トピックおよびユーザーを管理する Topic Operator および User Operator も含まれます。

Kafkaの設定は、Kafkaリソースを使って行います。設定オプションは、Kafka リソース内の ZooKeeper および Entity Operator でも利用できます。Entity Operator は Topic Operator と User Operator で構成されます。

Kafka リソースの完全なスキーマは Kafka スキーマ参照」 に記載されています。

リスナーの設定

クライアントを Kafka ブローカーに接続するためのリスナーを設定します。ブローカーに接続するためのリスナーの設定に関する詳細は、「リスナーの設定」を参照してください。

Kafka へのアクセスの承認

ユーザーが実行するアクションを許可または拒否するように Kafka クラスターを設定できます。Kafka ブローカーへのアクセスをセキュアにするための詳細は、「Kafka へのアクセス管理」を参照してください。

TLS 証明書の管理

Kafka をデプロイする場合、Cluster Operator は自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。必要な場合は、更新期間の終了前にクラスターおよびクライアント CA 証明書を手動で更新できます。クラスターおよびクライアント CA 証明書によって使用される鍵を置き換えることもできます。詳細は、「CA 証明書の手動更新」および「秘密鍵の置換」を参照してください。

その他のリソース

2.1.1. Kafka の設定

Kafka リソースのプロパティーを使用して、Kafka デプロイメントを設定します。

Kafka の設定に加え、ZooKeeper および AMQ Streams Operator の設定を追加することもできます。ロギングやヘルスチェックなどの一般的な設定プロパティーは、コンポーネントごとに独立して設定されます。

この手順では、可能な設定オプションの一部のみを取り上げますが、特に重要なオプションは次のとおりです。

  • リソース要求 (CPU/メモリー)
  • 最大および最小メモリー割り当ての JVM オプション
  • リスナー (およびクライアントの認証)
  • 認証
  • ストレージ
  • ラックアウェアネス (Rack Awareness)
  • メトリクス
  • Cruise Control によるクラスターのリバランス

Kafka バージョン

Kafka configlog.message.format.version および inter.broker.protocol.version プロパティは、指定されたKafkaのバージョン(spec.kafka.version)でサポートされているバージョンでなければなりません。プロパティーは、メッセージに追加されるログ形式のバージョンと、Kafka クラスターで使用される Kafka プロトコルのバージョンを表します。Kafka バージョンのアップグレード時に、これらのプロパティーの更新が必要になります。詳細は、『Deploying and Upgrading AMQ Streams on OpenShift』の「Upgrading Kafka」を参照してください。

前提条件

  • OpenShift クラスター。
  • 稼働中の Cluster Operator。

以下をデプロイする手順については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。

手順

  1. Kafka リソースの spec プロパティーを編集します。

    設定可能なプロパティーは以下の例のとおりです。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        replicas: 3 1
        version: 2.8.0 2
        logging: 3
          type: inline
          loggers:
            kafka.root.logger.level: "INFO"
        resources: 4
          requests:
            memory: 64Gi
            cpu: "8"
          limits:
            memory: 64Gi
            cpu: "12"
        readinessProbe: 5
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        jvmOptions: 6
          -Xms: 8192m
          -Xmx: 8192m
        image: my-org/my-image:latest 7
        listeners: 8
          - name: plain 9
            port: 9092 10
            type: internal 11
            tls: false 12
            configuration:
              useServiceDnsDomain: true 13
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication: 14
              type: tls
          - name: external 15
            port: 9094
            type: route
            tls: true
            configuration:
              brokerCertChainAndKey: 16
                secretName: my-secret
                certificate: my-certificate.crt
                key: my-key.key
        authorization: 17
          type: simple
        config: 18
          auto.create.topics.enable: "false"
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          log.message.format.version: 2.8
          inter.broker.protocol.version: 2.8
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 19
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
        storage: 20
          type: persistent-claim 21
          size: 10000Gi 22
        rack: 23
          topologyKey: topology.kubernetes.io/zone
        metricsConfig: 24
          type: jmxPrometheusExporter
          valueFrom:
            configMapKeyRef: 25
              name: my-config-map
              key: my-key
        # ...
      zookeeper: 26
        replicas: 3 27
        logging: 28
          type: inline
          loggers:
            zookeeper.root.logger: "INFO"
        resources:
          requests:
            memory: 8Gi
            cpu: "2"
          limits:
            memory: 8Gi
            cpu: "2"
        jvmOptions:
          -Xms: 4096m
          -Xmx: 4096m
        storage:
          type: persistent-claim
          size: 1000Gi
        metricsConfig:
          # ...
      entityOperator: 29
        tlsSidecar: 30
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        topicOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 31
            type: inline
            loggers:
              rootLogger.level: "INFO"
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
        userOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 32
            type: inline
            loggers:
              rootLogger.level: INFO
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
      kafkaExporter: 33
        # ...
      cruiseControl: 34
        # ...
        tlsSidecar: 35
        # ...
    1
    レプリカノードの数。クラスターにトピックがすでに定義されている場合は、クラスターをスケーリング できます。
    2
    Kafka バージョン。アップグレード手順に従うと、サポート対象のバージョンに変更できます。
    3
    指定された Kafka loggers and log levels が ConfigMap を介して直接的に (inline) または間接的に (external) に追加されます。カスタム ConfigMap は、log4j.properties キー下に配置する必要があります。Kafka kafka.root.logger.level ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
    4
    サポートされているリソース(現在は cpumemory)の予約と、消費可能な最大リソースを指定するための制限を要求します。
    5
    コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック
    6
    Kafka を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション
    7
    高度な任意設定: 特別な場合のみ推奨されるコンテナーイメージの設定
    8
    リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部からの接続の 内部 または 外部 リスナーとして設定されます
    9
    リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
    10
    Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
    11
    internal として、または external リスナーに対して指定されるリスナータイプ(routeloadbalancernodeport、または ingress)。
    12
    各リスナーの TLS 暗号化を有効にします。デフォルトは false です。route リスナーには TLS 暗号化は必要ありません。
    13
    クラスターサービスサフィックス(通常は cluster.local)を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。
    14
    15
    16
    外部の認証局によって管理される Kafka リスナー証明書 の任意設定。brokerCertChainAndKey は、サーバー証明書および秘密鍵が含まれる Secret を指定します。TLS による暗号化が有効な任意のリスナーで Kafka リスナー証明書を設定できます。
    17
    承認は Kafka ブローカーで簡易、OAUTH2.0、または OPA 承認を有効にします。簡易承認では、AclAuthorizer Kafka プラグインが使用されます。
    18
    19
    20
    Storageephemeralpersistent-claimjbodのいずれかに設定されています。
    21
    22
    パーシステントストレージには、ダイナミックボリュームプロビジョニングのためのストレージ idclass など、 追加の設定オプションがあります。
    23
    ラックアウェアネス (Rack awareness) は、異なるラック全体でレプリカを分散するために設定されます。topologykey はクラスターノードのラベルと一致する必要があります。
    24
    Prometheus メトリクス は有効になっています。この例では、メトリクスは Prometheus JMX Exporter (デフォルトのメトリクスエクスポーター) に対して設定されます。
    25
    Prometheus JMX Exporter 経由でメトリクスを Grafana ダッシュボードにエクスポートする Prometheus ルール。Prometheus JMX Exporter の設定が含まれる ConfigMap を参照することで有効になります。metricsConfig.valueFrom.configMapKeyRef.key 配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリクスを有効にできます。
    26
    Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
    27
    ZooKeeper ノードの数通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを損失すると、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。AMQ Streams では、 ZooKeeper クラスターの安定性および高可用性が重要になります。
    28
    29
    30
    Entity Operator の TLS サイドカー設定。Entity Operator は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
    31
    指定された Topic Operator ロガーおよびログレベル。この例では、inline ロギングを使用します。
    32
    33
    Kafka Exporter の設定。Kafka Exporter は、特にコンシューマーラグデータなどのメトリクスデータを Kafka ブローカーから抽出する任意のコンポーネントです。
    34
    Kafka クラスターのリバランス に使用される Cruise Control の任意設定。
    35
    Cruise Conrol の TLS サイドカーの設定。Cruise Control は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
  2. リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

2.1.2. Entity Operator の設定

Entity Operator は、実行中の Kafka クラスターで Kafka 関連のエンティティーを管理します。

Entity Operator は以下と構成されます。

Cluster Operator は Kafka リソース設定を介して、Kafka クラスターのデプロイ時に、上記の Operator の 1 つまたは両方を含む Entity Operator をデプロイできます。

注記

デプロイされると、デプロイメント設定に応じて、Entity Operator にオペレーターが含まれます。

これらのオペレーターは、Kafka クラスターのトピックおよびユーザーを管理するために自動的に設定されます。

2.1.2.1. Entity Operator の設定プロパティー

Kafka.specentityOperator プロパティーを使用して Entity Operator を設定します。

entityOperator プロパティーでは複数のサブプロパティーがサポートされます。

  • tlsSidecar
  • topicOperator
  • userOperator
  • template

tlsSidecar プロパティーには、ZooKeeper との通信に使用される TLS サイドカーコンテナーの設定が含まれます。

template プロパティーには、ラベル、アノテーション、アフィニティー、および容認 (Toleration) などの Entity Operator Pod の設定が含まれます。テンプレートの設定に関する詳細は、「OpenShift リソースのカスタマイズ」 を参照してください。

topicOperator プロパティーには、Topic Operator の設定が含まれます。このオプションがないと、Entity Operator は Topic Operator なしでデプロイされます。

userOperator プロパティーには、User Operator の設定が含まれます。このオプションがないと、Entity Operator は User Operator なしでデプロイされます。

Entity Operator の設定に使用されるプロパティーに関する詳細は EntityUserOperatorSpec schema reference を参照してください。

両方の Operator を有効にする基本設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}

topicOperator および userOperator に空のオブジェクト ({}) が使用された場合、すべてのプロパティーでデフォルト値が使用されます。

topicOperator および userOperator プロパティーの両方がない場合、Entity Operator はデプロイされません。

2.1.2.2. Topic Operator 設定プロパティー

Topic Operator デプロイメントは、topicOperator オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。

watchedNamespace
Topic Operator によって KafkaTopics が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。
reconciliationIntervalSeconds
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルト は 120 です。
zookeeperSessionTimeoutSeconds
ZooKeeper セッションのタイムアウト (秒単位)。デフォルトは 18 です。
topicMetadataMaxAttempts
Kafka からトピックメタデータの取得を試行する回数。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を大きくすることを検討してください。デフォルトは 6 です。
image
image プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、image を参照してください。
resources
resources プロパティーを使用すると、Topic Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、resources を参照してください。
ログ
logging プロパティーは、Topic Operator のロギングを設定します。詳細は ログ を参照してください。

Topic Operator の設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.1.2.3. User Operator 設定プロパティー

User Operator デプロイメントは、userOperator オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。

watchedNamespace
User Operator によって KafkaUsers が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。
reconciliationIntervalSeconds
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルト は 120 です。
zookeeperSessionTimeoutSeconds
ZooKeeper セッションのタイムアウト (秒単位)。デフォルトは 18 です。
image
image プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、image を参照してください。
resources
resources プロパティーを使用すると、User Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、resources を参照してください。
ログ
logging プロパティーは、User Operator のロギングを設定します。詳細は ログ を参照してください。
secretPrefix
secretPrefix プロパティーは、KafkaUser リソースから作成されたすべての Secret の名前にプレフィックスを追加します。例えば、STRIMZI_SECRET_PREFIX=kafka-とすると、すべてのシークレット名の前にkafka-を付けることができます。そのため、my-user という名前の KafkaUser は、kafka-my-user という名前の Secret を作成します。

User Operator の設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.1.3. Kafka および ZooKeeper のストレージタイプ

Kafka および ZooKeeper はステートフルなアプリケーションであるため、データをディスクに格納する必要があります。AMQ Streams では、3 つのタイプのストレージがサポートされます。

  • 一時ストレージ
  • 永続ストレージ
  • JBOD ストレージ
注記

JBOD ストレージは Kafka でサポートされ、ZooKeeper ではサポートされていません。

Kafka リソースを設定する場合、Kafka ブローカーおよび対応する ZooKeeper ノードによって使用されるストレージのタイプを指定できます。以下のリソースの storage プロパティーを使用して、ストレージタイプを設定します。

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper

ストレージタイプは type フィールドで設定されます。

警告

Kafka クラスターをデプロイした後に、ストレージタイプを変更することはできません。

その他のリソース

2.1.3.1. データストレージに関する留意事項

効率的なデータストレージインフラストラクチャーは、AMQ Streams のパフォーマンスを最適化するために不可欠です。

ブロックストレージが必要です。NFS などのファイルストレージは、Kafka では機能しません。

ブロックストレージには、以下のいずれかのオプションを選択します。

注記

AMQ Streams には OpenShift の raw ブロックボリュームは必要ありません。

2.1.3.1.1. ファイルシステム

XFS ファイルシステムを使用するようにストレージシステムを設定することが推奨されます。AMQ Streams は ext4 ファイルシステムとも互換性がありますが、最適化するには追加の設定が必要になることがあります。

2.1.3.1.2. Apache Kafka および ZooKeeper ストレージ

Apache Kafka と ZooKeeper には別々のディスクを使用します。

3 つのタイプのデータストレージがサポートされます。

  • 一時データストレージ (開発用のみで推奨されます)
  • 永続データストレージ
  • JBOD (Just a Bunch of Disks、Kafka のみに適しています)

詳細は「Kafka および ZooKeeper ストレージ」を参照してください。

ソリッドステートドライブ (SSD) は必須ではありませんが、複数のトピックに対してデータが非同期的に送受信される大規模なクラスターで Kafka のパフォーマンスを向上させることができます。SSD は、高速で低レイテンシーのデータアクセスが必要な ZooKeeper で特に有効です。

注記

Kafka と ZooKeeper の両方にデータレプリケーションが組み込まれているため、複製されたストレージのプロビジョニングは必要ありません。

2.1.3.2. 一時ストレージ

一時ストレージは emptyDir ボリュームを使用してデータを保存します。一時ストレージを使用するには、type フィールドを ephemeral に設定します。

重要

emptyDir ボリュームは永続的ではなく、保存されたデータは Pod の再起動時に失われます。新規 Pod の起動後に、クラスターの他のノードからすべてのデータを復元する必要があります。一時ストレージは、単一ノードの ZooKeeper クラスターやレプリケーション係数が 1 の Kafka トピックでの使用には適していません。この設定により、データが失われます。

一時ストレージの例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: ephemeral
    # ...
  zookeeper:
    # ...
    storage:
      type: ephemeral
    # ...

2.1.3.2.1. ログディレクトリー

一時ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data/kafka-logIDX

IDX は、Kafka ブローカーポッドインデックスです。たとえば、/var/lib/kafka/data/kafka-log0 のようになります。

2.1.3.3. 永続ストレージ

永続ストレージは Persistent Volume Claim (永続ボリューム要求、PVC) を使用して、データを保存するための永続ボリュームをプロビジョニングします。永続ボリューム要求を使用すると、ボリュームのプロビジョニングを行う ストレージクラス に応じて、さまざまなタイプのボリュームをプロビジョニングできます。永続ボリューム要求と使用できるデータタイプには、多くのタイプの SAN ストレージやローカル永続ボリューム などがあります。

永続ストレージを使用するには、typepersistent-claim に設定する必要があります。永続ストレージでは、追加の設定オプションがサポートされます。

id (任意)
ストレージ ID 番号。このオプションは、JBOD ストレージ宣言で定義されるストレージボリュームには必須です。デフォルトは 0 です。
size (必須)
永続ボリューム要求のサイズを定義します (例: 1000Gi)。
class (任意)
動的ボリュームプロビジョニングに使用する OpenShift の ストレージクラス
selector (任意)
使用する特定の永続ボリュームを選択できます。このようなボリュームを選択するラベルを表す key:value ペアが含まれます。
deleteClaim (任意)
クラスターのアンデプロイ時に永続ボリューム要求を削除する必要があるかどうかを指定するブール値。デフォルトは false です。
警告

既存の AMQ Streams クラスターで永続ボリュームのサイズを増やすことは、永続ボリュームのサイズ変更をサポートする OpenShift バージョンでのみサポートされます。サイズを変更する永続ボリュームには、ボリューム拡張をサポートするストレージクラスを使用する必要があります。ボリューム拡張をサポートしないその他のバージョンの OpenShift およびストレージクラスでは、クラスターをデプロイする前に必要なストレージサイズを決定する必要があります。既存の永続ボリュームのサイズを縮小することはできません。

size が 1000Gi の永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1000Gi
# ...

以下の例は、ストレージクラスの使用例を示しています。

特定のストレージクラスを指定する永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1Gi
  class: my-storage-class
# ...

最後に、selector を使用して特定のラベルが付いた永続ボリュームを選択し、SSD などの必要な機能を提供できます。

セレクターを指定する永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1Gi
  selector:
    hdd-type: ssd
  deleteClaim: true
# ...

2.1.3.3.1. ストレージクラスのオーバーライド

デフォルトのストレージクラスを使用する代わりに、1 つ以上の Kafka ブローカー または ZooKeeper ノードに異なるストレージクラスを指定できます。これは、ストレージクラスが、異なるアベイラビリティーゾーンやデータセンターに制限されている場合などに便利です。この場合、overrides フィールドを使用できます。

以下の例では、デフォルトのストレージクラスの名前は my-storage-class になります。

ストレージクラスのオーバーライドを使用した AMQ Streams クラスターの例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  labels:
    app: my-cluster
  name: my-cluster
  namespace: myproject
spec:
  # ...
  kafka:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...

overrides プロパティーが設定され、ボリュームによって以下のストレージクラスが使用されます。

  • ZooKeeper ノード 0 の永続ボリュームでは my-storage-class-zone-1a が使用されます。
  • ZooKeeper ノード 1 の永続ボリュームでは my-storage-class-zone-1b が使用されます。
  • ZooKeeepr ノード 2 の永続ボリュームでは my-storage-class-zone-1c が使用されます。
  • Kafka ブローカー 0 の永続ボリュームでは my-storage-class-zone-1a が使用されます。
  • Kafka ブローカー 1 の永続ボリュームでは my-storage-class-zone-1b が使用されます。
  • Kafka ブローカー 2 の永続ボリュームでは my-storage-class-zone-1c が使用されます。

現在、overrides プロパティーは、ストレージクラスの設定をオーバーライドするためのみに使用されます。他のストレージ設定フィールドのオーバーライドは現在サポートされていません。ストレージ設定の他のフィールドは現在サポートされていません。

2.1.3.3.2. Persistent Volume Claim (永続ボリューム要求、PVC) の命名

永続ストレージが使用されると、以下の名前で Persistent Volume Claim (永続ボリューム要求、PVC) が作成されます。

data-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。
data-cluster-name-zookeeper-idx
ZooKeeper ノード Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。
2.1.3.3.3. ログディレクトリー

永続ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data/kafka-logIDX

IDX は、Kafka ブローカーポッドインデックスです。たとえば、/var/lib/kafka/data/kafka-log0 のようになります。

2.1.3.4. 永続ボリュームのサイズ変更

既存の AMQ Streams クラスターによって使用される永続ボリュームのサイズを増やすことで、ストレージ容量を増やすことができます。永続ボリュームのサイズ変更は、JBOD ストレージ設定で 1 つまたは複数の永続ボリュームが使用されるクラスターでサポートされます。

注記

永続ボリュームのサイズを拡張することはできますが、縮小することはできません。永続ボリュームのサイズ縮小は、現在 OpenShift ではサポートされていません。

前提条件

  • ボリュームのサイズ変更をサポートする OpenShift クラスター。
  • Cluster Operator が稼働している必要があります。
  • ボリューム拡張をサポートするストレージクラスを使用して作成された永続ボリュームを使用する Kafka クラスター。

手順

  1. Kafka リソースで、Kafka クラスター、ZooKeeper クラスター、またはその両方に割り当てられた永続ボリュームのサイズを増やします。

    • Kafka クラスターに割り当てられたボリュームサイズを増やすには、spec.kafka.storage プロパティーを編集します。
    • ZooKeeper クラスターに割り当てたボリュームサイズを増やすには、spec.zookeeper.storage プロパティーを編集します。

      たとえば、ボリュームサイズを 1000Gi から 2000Gi に増やすには、以下のように編集します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        kafka:
          # ...
          storage:
            type: persistent-claim
            size: 2000Gi
            class: my-storage-class
          # ...
        zookeeper:
          # ...
  2. リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

    OpenShift では、Cluster Operator からの要求に応じて、選択された永続ボリュームの容量が増やされます。サイズ変更が完了すると、サイズ変更された永続ボリュームを使用するすべての Pod が Cluster Operator によって再起動されます。これは自動的に行われます。

その他のリソース

OpenShift での永続ボリュームのサイズ変更に関する詳細は、「Resizing Persistent Volumes using Kubernetes」を参照してください。

2.1.3.5. JBOD ストレージの概要

AMQ Streams で、複数のディスクやボリュームのデータストレージ設定である JBOD を使用するように設定できます。JBOD は、Kafka ブローカーのデータストレージを増やす方法の 1 つです。また、パフォーマンスを向上することもできます。

JBOD 設定は 1 つ以上のボリュームによって記述され、各ボリュームは 一時 または 永続 ボリュームのいずれかになります。JBOD ボリューム宣言のルールおよび制約は、一時および永続ストレージのルールおよび制約と同じです。たとえば、永続ストレージのボリュームをプロビジョニング後に縮小することはできません。また、type=ephemeral の場合は sizeLimit の値を変更することはできません。

2.1.3.5.1. JBOD の設定

AMQ Streams で JBOD を使用するには、ストレージ typejbod に設定する必要があります。volumes プロパティーを使用すると、JBOD ストレージアレイまたは設定を構成するディスクを記述できます。以下は、JBOD 設定例の抜粋になります。

# ...
storage:
  type: jbod
  volumes:
  - id: 0
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
  - id: 1
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
# ...

id は、JBOD ボリュームの作成後に変更することはできません。

ユーザーは JBOD 設定に対してボリュームを追加または削除できます。

2.1.3.5.2. JBOD および 永続ボリューム要求 (PVC)

永続ストレージを使用して JBOD ボリュームを宣言する場合、永続ボリューム要求 (Persistent Volume Claim、PVC) の命名スキームは以下のようになります。

data-id-cluster-name-kafka-idx
id は、Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの ID に置き換えます。
2.1.3.5.3. ログディレクトリー

JBOD ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data-id/kafka-log_idx_
id は、Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの ID に置き換えます。たとえば、/var/lib/kafka/data-0/kafka-log0 のようになります。

2.1.3.6. JBOD ストレージへのボリュームの追加

この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターにボリュームを追加する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。

注記

以前使用され、削除された id の下に新規ボリュームを追加する場合、以前使用された PersistentVolumeClaims が必ず削除されているよう確認する必要があります。

前提条件

  • OpenShift クラスター。
  • 稼働中の Cluster Operator。
  • JBOD ストレージのある Kafka クラスター。

手順

  1. Kafka リソースの spec.kafka.storage.volumes プロパティーを編集します。新しいボリュームを volumes アレイに追加します。たとえば、id が 2 の新しいボリュームを追加します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 1
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 2
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE
  3. 新しいトピックを作成するか、既存のパーティションを新しいディスクに再度割り当てします。

その他のリソース

トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。

2.1.3.7. JBOD ストレージからのボリュームの削除

この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターからボリュームを削除する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。JBOD ストレージには、常に 1 つのボリュームが含まれている必要があります。

重要

データの損失を避けるには、ボリュームを削除する前にすべてのパーティションを移動する必要があります。

前提条件

  • OpenShift クラスター。
  • 稼働中の Cluster Operator。
  • 複数のボリュームがある JBOD ストレージのある Kafka クラスター。

手順

  1. 削除するディスクからすべてのパーティションを再度割り当てます。削除するディスクに割り当てられたままになっているパーティションのデータは削除される可能性があります。
  2. Kafka リソースの spec.kafka.storage.volumes プロパティーを編集します。volumes アレイから 1 つまたは複数のボリュームを削除します。たとえば、ID が 12 のボリュームを削除します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  3. リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

関連情報

トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。

2.1.4. クラスターのスケーリング

2.1.4.1. Kafka クラスターのスケーリング

2.1.4.1.1. ブローカーのクラスターへの追加

トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。これにより、追加のパーティションによってクラスター内の異なるブローカー間でトピックの負荷が共有されます。ただし、各ブローカーが特定のリソース (通常は I/O) によって制約される場合、パーティションを増やしてもスループットは向上しません。代わりに、ブローカーをクラスターに追加する必要があります。

追加のブローカーをクラスターに追加する場合、Kafka ではパーティションは自動的に割り当てられません。既存のブローカーから新規のブローカーに移動するパーティションを決定する必要があります。

すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。

2.1.4.1.2. クラスターからのブローカーの削除

AMQ Streams では StatefulSets を使用してブローカー Pod を管理されるため、あらゆる Pod を削除できるわけではありません。クラスターから削除できるのは、番号が最も大きい 1 つまたは複数の Pod のみです。たとえば、12 個のブローカーがあるクラスターでは、Pod の名前は cluster-name-kafka-0 から cluster-name-kafka-11 になります。1 つのブローカー分をスケールダウンする場合、cluster-name-kafka-11 が削除されます。

クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。また、使用が停止されたブローカーの各パーティションを引き継ぐ、残りのブローカーを決める必要もあります。ブローカーに割り当てられたパーティションがなければ、クラスターを安全にスケールダウンできます。

2.1.4.2. パーティションの再割り当て

現在、Topic Operator はレプリカを別のブローカーに再割当てすることをサポートしないため、ブローカー Pod に直接接続してレプリカをブローカーに再割り当てする必要があります。

ブローカー Pod 内では、kafka-reassign-partitions.sh ユーティリティーを使用してパーティションを別のブローカーに再割り当てできます。

これには、以下の 3 つのモードがあります。

--generate
トピックとブローカーのセットを取り、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。これはトピック全体で動作するため、一部のトピックのパーティションを再度割り当てする場合は使用できません。
--execute
再割り当て JSON ファイル を取り、クラスターのパーティションおよびブローカーに適用します。その結果、パーティションを取得したブローカーは、パーティションリーダーのフォロワーになります。新規ブローカーが ISR (In-Sync Replica、同期レプリカ) に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify
--execute ステップと同じ再割り当て JSON ファイルを使用して、--verify は、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかをチェックします。再割り当てが完了すると、--verify は有効な スロットル も削除します。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。

クラスターでは、1 度に 1 つの再割当てのみを実行でき、実行中の再割当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。

2.1.4.2.1. 再割り当て JSON ファイル

再割り当て JSON ファイル には特定の構造があります。

{
  "version": 1,
  "partitions": [
    <PartitionObjects>
  ]
}

ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ]
}
注記

Kafka は "log_dirs" プロパティーもサポートしますが、AMQ Streams では使用しないでください。

以下は、トピック topic-a のパーティション 4 をブローカー 247 に割り当て、トピック topic-b のパーティション 2 をブローカー 157 に割り当てる再割り当て JSON ファイルの例です。

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7]
    },
    {
      "topic": "topic-b",
      "partition": 2,
      "replicas": [1,5,7]
    }
  ]
}

JSON に含まれていないパーティションは変更されません。

2.1.4.2.2. JBOD ボリューム間でのパーティションの再割り当て

Kafka クラスターで JBOD ストレージを使用する場合は、特定のボリュームとログディレクトリー (各ボリュームに単一のログディレクトリーがある) との間でパーティションを再割り当てを選択することができます。パーティションを特定のボリュームに再割り当てするには、再割り当て JSON ファイルで log_dirs オプションを <PartitionObjects> に追加します。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ],
  "log_dirs": [ <AssignedLogDirs> ]
}

log_dirs オブジェクトに含まれるログディレクトリーの数は、replicas オブジェクトで指定されるレプリカ数と同じである必要があります。値は、ログディレクトリーへの絶対パスか、any キーワードである必要があります。

以下は例になります。

{
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7].
      "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ]
}

2.1.4.3. 再割り当て JSON ファイルの生成

この手順では、kafka-reassign-partitions.sh ツールを使用して、指定のトピックセットすべてのパーティションを再割り当てする再割り当て JSON ファイルを生成する方法を説明します。

前提条件

  • 稼働中の Cluster Operator。
  • Kafka リソース。
  • パーティションを再割り当てするトピックセット。

手順

  1. 移動するトピックを一覧表示する topics.json という名前の JSON ファイルを準備します。これには、以下の構造が必要です。

    {
      "version": 1,
      "topics": [
        <TopicObjects>
      ]
    }

    ここで <TopicObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。

    {
      "topic": <TopicName>
    }

    たとえば、topic-atopic-b のすべてのパーティションを再割り当てするには、以下のような topics.json ファイルを準備する必要があります。

    {
      "version": 1,
      "topics": [
        { "topic": "topic-a"},
        { "topic": "topic-b"}
      ]
    }
  2. topics.json ファイルをブローカー Pod の 1 つにコピーします。

    cat topics.json | oc exec -c kafka <BrokerPod> -i -- \
      /bin/bash -c \
      'cat > /tmp/topics.json'
  3. kafka-reassign-partitions.sh コマンドを使用して、再割り当て JSON を生成します。

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list <BrokerList> \
      --generate

    たとえば、 topic-a および topic-b のすべてのパーティションをブローカー 4 および 7 に移動する場合は、以下を実行しmす。

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 4,7 \
      --generate

2.1.4.4. 手動による再割り当て JSON ファイルの作成

特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。

2.1.4.5. 再割り当てスロットル

パーティションの再割り当てには、ブローカーの間で大量のデータを転送する必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当て処理をスロットルで調整することができます。これにより、再割り当ての完了に時間がかかる可能性があります。

  • スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。
  • スロットルが高すぎると、クライアントに影響します。

たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。

2.1.4.6. Kafka クラスターのスケールアップ

この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。

前提条件

  • 既存の Kafka クラスター。
  • 拡大されたクラスターでパーティションをブローカーに再割り当てする方法が記述される reassignment.json というファイル名の 再割り当て JSON ファイル

手順

  1. kafka.spec.kafka.replicas 設定オプションを増やして、新しいブローカーを必要なだけ追加します。
  2. 新しいブローカー Pod が起動したことを確認します。
  3. 後でコマンドを実行するブローカー Pod に reassignment.json ファイルをコピーします。

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    以下は例になります。

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  4. 同じブローカー Pod から kafka-reassign-partitions.sh コマンドラインツールを使用して、パーティションの再割り当てを実行します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    レプリケーションをスロットルで調整する場合、--throttle とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。

  5. 再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  6. ブローカー Pod のいずれかから kafka-reassign-partitions.sh コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute オプションの代わりに --verify オプションを使用します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  7. --verify コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な --verify によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。

2.1.4.7. Kafka クラスターのスケールダウン

この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。

前提条件

  • 既存の Kafka クラスター。
  • 最も番号の大きい Pod(s) のブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている、reassignment.json という名前の 再割り当て JSON ファイル

手順

  1. 後でコマンドを実行するブローカー Pod に reassignment.json ファイルをコピーします。

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    以下は例になります。

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  2. 同じブローカー Pod から kafka-reassign-partitions.sh コマンドラインツールを使用して、パーティションの再割り当てを実行します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    レプリケーションをスロットルで調整する場合、--throttle とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。

  3. 再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  4. ブローカー Pod のいずれかから kafka-reassign-partitions.sh コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute オプションの代わりに --verify オプションを使用します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  5. --verify コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な --verify によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
  6. すべてのパーティションの再割り当てが終了すると、削除されるブローカーはクラスター内のいずれのパーティションにも対応しないはずです。これは、ブローカーのデータログディレクトリーにライブパーティションのログが含まれていないことを確認すると検証できます。ブローカーのログディレクトリーに、拡張正規表現 [a-zA-Z0-9.-]+\.[a-z0-9]+-delete$ と一致しないディレクトリーが含まれる場合、ブローカーにはライブパーティションがあるため、停止してはなりません。

    これを確認するには、以下のコマンドを実行します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      /bin/bash -c \
      "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"

    N は削除された Pod(s) の数に置き換えます。

    上記のコマンドによって出力が生成される場合、ブローカーにはライブパーティションがあります。この場合、再割り当てが終了していないか、再割り当て JSON ファイルが適切ではありません。

  7. ブローカーにライブパーティションがないことが確認できたら、Kafka リソースの Kafka.spec.kafka.replicas を編集できます。これにより、StatefulSet がスケールダウンされ、番号が最も大きいブローカー Pod(s) が削除されます。

2.1.5. ローリングアップデートのメンテナンス時間枠

メンテナンス時間枠によって、Kafka および ZooKeeper クラスターの特定のローリングアップデートが便利な時間に開始されるようにスケジュールできます。

2.1.5.1. メンテナンス時間枠の概要

ほとんどの場合、Cluster Operator は対応する Kafka リソースの変更に対応するために Kafka または ZooKeeper クラスターのみを更新します。これにより、Kafka リソースの変更を適用するタイミングを計画し、Kafka クライアントアプリケーションへの影響を最小限に抑えることができます。

ただし、Kafka リソースの変更がなくても Kafka および ZooKeeper クラスターの更新が発生することがあります。たとえば、Cluster Operator によって管理される CA (認証局) 証明書が期限切れ直前である場合にローリング再起動の実行が必要になります。

サービスの 可用性 は Pod のローリング再起動による影響を受けないはずですが (ブローカーおよびトピックの設定が適切である場合)、Kafka クライアントアプリケーションの パフォーマンス は影響を受ける可能性があります。メンテナンス時間枠によって、Kafka および ZooKeeper クラスターのこのような自発的なアップデートが便利な時間に開始されるようにスケジュールできます。メンテナンス時間枠がクラスターに設定されていない場合は、予測できない高負荷が発生する期間など、不便な時間にこのような自発的なローリングアップデートが行われる可能性があります。

2.1.5.2. メンテナンス時間枠の定義

Kafka.spec.maintenanceTimeWindows プロパティーに文字列の配列を入力して、メンテナンス時間枠を設定します。各文字列は、UTC (協定世界時、Coordinated Universal Time) であると解釈される cron 式 です。UTC は実用的にはグリニッジ標準時と同じです。

以下の例では、日、月、火、水、および木曜日の午前 0 時に開始し、午前 1 時 59 分 (UTC) に終わる、単一のメンテナンス時間枠が設定されます。

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...

実際には、必要な CA 証明書の更新が設定されたメンテナンス時間枠内で完了できるように、Kafka リソースの Kafka.spec.clusterCa.renewalDays および Kafka.spec.clientsCa.renewalDays プロパティーとともにメンテナンス期間を設定する必要があります。

注記

AMQ Streams では、指定の期間にしたがってメンテナンス操作を正確にスケジュールしません。その代わりに、調整ごとにメンテナンス期間が現在「オープン」であるかどうかを確認します。これは、特定の時間枠内でのメンテナンス操作の開始が、最大で Cluster Operator の調整が行われる間隔の長さ分、遅れる可能性があることを意味します。したがって、メンテナンス時間枠は最低でもその間隔の長さにする必要があります。

その他のリソース

2.1.5.3. メンテナンス時間枠の設定

サポートされるプロセスによってトリガーされるローリングアップデートのメンテナンス時間枠を設定できます。

前提条件

  • OpenShift クラスターが必要です。
  • Cluster Operator が稼働している必要があります。

手順

  1. Kafka リソースの maintenanceTimeWindows プロパティー を追加または編集します。たとえば、0800 から 1059 までと、1400 から 1559 までのメンテナンスを可能にするには、以下のように maintenanceTimeWindows を設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
  2. リソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

関連情報

ローリングアップデートの実行:

2.1.6. ターミナルからの ZooKeeper への接続

ほとんどの Kafka CLI ツールは Kafka に直接接続できます。したがって、通常の状況では ZooKeeper に接続する必要はありません。ZooKeeper サービスは暗号化および認証でセキュア化され、AMQ Streams の一部でない外部アプリケーションでの使用は想定されていません。

ただし、ZooKeeper への接続を必要とする Kafka CLI ツールを使用する場合は、ZooKeeper コンテナー内でターミナルを使用し、ZooKeeper アドレスとして localhost:12181 に接続できます。

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Kafka クラスターが稼働している必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. OpenShift コンソールを使用してターミナルを開くか、CLI から exec コマンドを実行します。

    以下は例になります。

    oc exec -ti my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181

    必ず localhost:12181 を使用してください。

    ZooKeeper に対して Kafka コマンドを実行できるようになりました。

2.1.7. Kafka ノードの手動による削除

この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim は自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。

手順

  1. 削除する Pod の名前を見つけます。

    たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-kafka-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。

  2. OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate を使用します。

    oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。

2.1.8. ZooKeeper ノードの手動による削除

この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードの削除するには、ZooKeeper が稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim は自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。

手順

  1. 削除する Pod の名前を見つけます。

    たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-zookeeper-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。

  2. OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate を使用します。

    oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。

2.1.9. Kafka クラスターリソースのリスト

以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。

共有リソース

cluster-name-cluster-ca
クラスター通信の暗号化に使用されるクラスター CA プライベートキーのあるシークレット。
cluster-name-cluster-ca-cert
クラスター CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-clients-ca
ユーザー証明書に署名するために使用されるクライアント CA 秘密鍵のあるシークレット。
cluster-name-clients-ca-cert
クライアント CA 公開鍵のあるシークレット。このキーは、Kafka ユーザーのアイデンティティーの検証に使用できます。
cluster-name-cluster-operator-certs
Kafka および ZooKeeper と通信するための Cluster Operator キーのあるシークレット。

ZooKeeper ノード

cluster-name-zookeeper
ZooKeeper ノード Pod の管理を担当する StatefulSet。
cluster-name-zookeeper-idx
Zookeeper StatefulSet によって作成された Pod。
cluster-name-zookeeper-nodes
DNS が ZooKeeper Pod の IP アドレスを直接解決するのに必要なヘッドレスサービス。
cluster-name-zookeeper-client
Kafka ブローカーがクライアントとして ZooKeeper ノードに接続するために使用するサービス。
cluster-name-zookeeper-config
ZooKeeper 補助設定が含まれ、ZooKeeper ノード Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-zookeeper-nodes
ZooKeeper ノードキーがあるシークレット。
cluster-name-zookeeper
Zookeeper ノードで使用されるサービスアカウント。
cluster-name-zookeeper
ZooKeeper ノードに設定された Pod の Disruption Budget。
cluster-name-network-policy-zookeeper
ZooKeeper サービスへのアクセスを管理するネットワークポリシー。
data-cluster-name-zookeeper-idx
ZooKeeper ノード Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。

Kafka ブローカー

cluster-name-kafka
Kafka ブローカー Pod の管理を担当する StatefulSet。
cluster-name-kafka-idx
Kafka StatefulSet によって作成された Pod。
cluster-name-kafka-brokers
DNS が Kafka ブローカー Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-kafka-bootstrap
サービスは、OpenShift クラスター内から接続する Kafka クライアントのブートストラップサーバーとして使用できます。
cluster-name-kafka-external-bootstrap
OpenShift クラスター外部から接続するクライアントのブートストラップサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。リスナー名が external でポートが 9094 の場合、後方互換性のために古いサービス名が使用されます。
cluster-name-kafka-pod-id
トラフィックを OpenShift クラスターの外部から個別の Pod にルーティングするために使用されるサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。リスナー名が external でポートが 9094 の場合、後方互換性のために古いサービス名が使用されます。
cluster-name-kafka-external-bootstrap
OpenShift クラスターの外部から接続するクライアントのブートストラップルート。このリソースは、外部リスナーが有効になっていて、タイプ route に設定されている場合にのみ作成されます。リスナー名が external でポートが 9094 の場合、後方互換性のために古いルート名が使用されます。
cluster-name-kafka-pod-id
OpenShift クラスターの外部から個別の Pod へのトラフィックに対するルート。このリソースは、外部リスナーが有効でタイプが route に設定されている場合にのみ作成されます。リスナー名が external でポートが 9094 の場合、後方互換性のために古いルート名が使用されます。
cluster-name-kafka-listener-name-bootstrap
OpenShift クラスター外部から接続するクライアントのブートストラップサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。新しいサービス名はその他すべての外部リスナーに使用されます。
cluster-name-kafka-listener-name-pod-id
トラフィックを OpenShift クラスターの外部から個別の Pod にルーティングするために使用されるサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。新しいサービス名はその他すべての外部リスナーに使用されます。
cluster-name-kafka-listener-name-bootstrap
OpenShift クラスターの外部から接続するクライアントのブートストラップルート。このリソースは、外部リスナーが有効でタイプが route に設定されている場合にのみ作成されます。新しいルート名はその他すべての外部リスナーに使用されます。
cluster-name-kafka-listener-name-pod-id
OpenShift クラスターの外部から個別の Pod へのトラフィックに対するルート。このリソースは、外部リスナーが有効でタイプが route に設定されている場合にのみ作成されます。新しいルート名はその他すべての外部リスナーに使用されます。
cluster-name-kafka-config
Kafka 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-kafka-brokers
Kafka ブローカーキーのあるシークレット。
cluster-name-kafka
Kafka ブローカーによって使用されるサービスアカウント。
cluster-name-kafka
Kafka ブローカーに設定された Pod の Disruption Budget。
cluster-name-network-policy-kafka
Kafka サービスへのアクセスを管理するネットワークポリシー。
strimzi-namespace-name-cluster-name-kafka-init
Kafka ブローカーによって使用されるクラスターロールバインディング。
cluster-name-jmx
Kafka ブローカーポートのセキュア化に使用される JMX ユーザー名およびパスワードのあるシークレット。このリソースは、Kafka で JMX が有効になっている場合にのみ作成されます。
data-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。
data-id-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリューム id の永続ボリューム要求です。このリソースは、永続ボリュームをプロビジョニングしてデータを保存するときに、JBOD ボリュームに永続ストレージが選択された場合のみ作成されます。

Entitiy Operator

これらのリソースは、Cluster Operator を使用して Entity Operator がデプロイされる場合にのみ作成されます。

cluster-name-entity-operator
Topic および User Operator とのデプロイメント。
cluster-name-entity-operator-random-string
Entity Operator デプロイメントによって作成された Pod。
cluster-name-entity-topic-operator-config
Topic Operator の補助設定のある ConfigMap。
cluster-name-entity-user-operator-config
User Operator の補助設定のある ConfigMap。
cluster-name-entity-operator-certs
Kafka および ZooKeeper と通信するための Entity Operator キーのあるシークレット。
cluster-name-entity-operator
Entity Operator によって使用されるサービスアカウント。
strimzi-cluster-name-entity-topic-operator
Entity Topic Operator によって使用されるロールバインディング。
strimzi-cluster-name-entity-user-operator
Entity User Operator によって使用されるロールバインディング。

Kafka Exporter

これらのリソースは、Cluster Operator を使用して Kafka Exporter がデプロイされる場合にのみ作成されます。

cluster-name-kafka-exporter
Kafka Exporter でのデプロイメント。
cluster-name-kafka-exporter-random-string
Kafka Exporter デプロイメントによって作成された Pod。
cluster-name-kafka-exporter
コンシューマーラグメトリクスの収集に使用されるサービス。
cluster-name-kafka-exporter
Kafka Exporter によって使用されるサービスアカウント。

Cruise Control

これらのリソースは、Cluster Operator を使用して Cruise Control がデプロイされた場合のみ作成されます。

cluster-name-cruise-control
Cruise Control でのデプロイメント。
cluster-name-cruise-control-random-string
Cruise Control デプロイメントによって作成された Pod。
cluster-name-cruise-control-config
Cruise Control の補助設定が含まれ、Cruise Control Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-cruise-control-certs
Kafka および ZooKeeper と通信するための Cruise Control キーのあるシークレット。
cluster-name-cruise-control
Cruise Control との通信に使用されるサービス。
cluster-name-cruise-control
Cruise Control によって使用されるサービスアカウント。
cluster-name-network-policy-cruise-control
Cruise Control サービスへのアクセスを管理するネットワークポリシー。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.