検索

第8章 AMQ Streams のアップグレード

download PDF

AMQ Streams on OpenShift をバージョン 1.7 にアップグレードすると、新機能および改良された機能、パフォーマンスの向上、およびセキュリティーオプションを利用できます。

このアップグレード中に、Kafka をサポートされる最新バージョンにアップグレードします。各 Kafka リリースによって、AMQ Streams デプロイメントに新機能、改善点、およびバグ修正が導入されます。

新しいバージョンで問題が発生した場合は、AMQ Streams を以前のバージョンに ダウングレード できます。

リリースされた AMQ Streams バージョンの一覧は、Red Hat カスタマーポータルの「製品ダウンロード」で確認できます。

アップグレードパス

2 つのアップグレードパスが可能です。

インクリメント
AMQ Streams を以前のマイナーバージョンからバージョン 1.7 にアップグレードします。
マルチバージョン

AMQ Streams を 1 回で古いバージョンからバージョン 1.7 にアップグレードします (1 つ以上の中間バージョンを飛ばします)。

たとえば、AMQ Streams 1.5 から直接 1.7 にアップグレードします。

Kafka バージョンのサポート

Kafka バージョン の表には、AMQ Streams 1.7 でサポートされる Kafka バージョンが記載されています。この表では以下に注意してください。

  • 最新の Kafka バージョンは実稼働でサポートされます。
  • 最新バージョンより前の Kafka バージョンは、AMQ Streams 1.7 へのアップグレードの目的でのみサポートされます。

本章で説明するアップグレード手順を実行する前に、アップグレードする Kafka バージョンを特定します。

注記

ご使用のバージョンの AMQ Streams によってサポートされれば、上位バージョンの Kafka にアップグレードできます。サポートされる下位バージョンの Kafka にダウングレードできる場合もあります。

ダウンタイムと可用性

高可用性に対してトピックが設定されている場合、AMQ Streams をアップグレードしても、これらのトピックからデータをパブリッシュおよび読み取るコンシューマーとプロデューサーのダウンタイムは発生しません。高可用性トピックのレプリケーション係数は 3 以上であり、パーティションはブローカー間で均等に分散されます。

AMQ Streams をアップグレードするとローリングアップデートがトリガーされ、プロセスのさまざまな段階ですべてのブローカーが順に再起動されます。ローリングアップデート中に、すべてのブローカーがオンライン状態ではないため、クラスター全体の可用性 は一時的に低下します。クラスターの可用性が低下すると、ブローカーの障害によってメッセージが失われる可能性が高くなります。

8.1. AMQ Streams および Kafka のアップグレード

AMQ Streams のアップグレードは 3 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。

  1. Cluster Operator を新しい AMQ Streams バージョンに更新します。

    実施する手法は、Cluster Operator のデプロイ 方法によって異なります。

    • インストール用の YAML ファイルを使用して Cluster Operator をデプロイした場合は、「Cluster Operator のアップグレード」の説明に従って、Operator のインストールファイルを変更してアップグレードを実行します。
    • OperatorHub から Cluster Operator をデプロイした場合は、Operator Lifecycle Manager (OLM) を使用して AMQ Streams Operator の更新チャネルを新しい AMQ Streams バージョンに変更します。

      選択したアップグレードストラテジーに応じて、チャネルの更新後に以下のいずれかを実行します。

      • 自動アップグレードが開始されます。
      • 手動アップグレードでは、インストールを開始する前に承認が必要です。

        OperatorHub を使用した Operator のアップグレードについての詳細は、OpenShift ドキュメントの「Upgrading installed Operators」を参照してください。

  2. すべての Kafka ブローカーとクライアントアプリケーションを、サポートされる最新の Kafka バージョンにアップグレードします。

  3. 該当する場合は、以下のタスクを実行します。

    1. 既存のカスタムリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。

    2. GenericKafkaListener スキーマを使用するようにリスナーを更新します。

任意手順: Incremental Cooperative Rebalance のアップグレード

パーティションの再分散に Incremental Cooperative Rebalance プロトコルを使用するために、コンシューマーと Kafka Streams アプリケーションのアップグレードを検討します。

8.1.1. Kafka バージョン

Kafka のログメッセージ形式バージョンとブローカー間のプロトコルバージョンは、それぞれメッセージに追加されるログ形式バージョンとクラスターで使用される Kafka プロトコルのバージョンを指定します。正しいバージョンが使用されるようにするため、アップグレードプロセスでは、既存の Kafka ブローカーの設定変更と、クライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更が行われます。

以下の表は、Kafka バージョンの違いを示しています。

Kafka のバージョンInterbroker プロトコルのバージョンログメッセージ形式のバージョンZooKeeper のバージョン

2.6.0

2.6

2.6

3.5.8

2.7.0

2.7

2.7

3.5.8

ブローカー間のプロトコルバージョン

Kafka では、ブローカー間の通信に使用されるネットワークプロトコルはブローカー間プロトコル (Inter-broker protocol) と呼ばれます。Kafka の各バージョンには、互換性のあるバージョンのブローカー間プロトコルがあります。上記の表が示すように、プロトコルのマイナーバージョンは、通常 Kafka のマイナーバージョンと一致するように番号が増加されます。

ブローカー間プロトコルのバージョンは、Kafka リソースでクラスター全体に設定されます。これを変更するには、Kafka.spec.kafka.configinter.broker.protocol.version プロパティーを編集します。

ログメッセージ形式のバージョン

プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリース間で変更される可能性があるため、メッセージにはエンコードに使用された形式のバージョンが指定されます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。

Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。

  • message.format.version プロパティーはトピックに設定されます。
  • log.message.format.version プロパティーは Kafka ブローカーに設定されます。

トピックの message.format.version のデフォルト値は、Kafka ブローカーに設定される log.message.format.version によって定義されます。トピックの message.format.version は、トピック設定を編集すると手動で設定できます。

本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version によって定義されることを前提としています。

8.1.2. Cluster Operator のアップグレード

このセクションでは、AMQ Streams 1.7 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。

OperatorHub ではなくインストール用の YAML ファイルを使用して Cluster Operator をデプロイした場合は、次の手順に従ってください。

Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作による影響を受けません。

注記

特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。

8.1.2.1. Cluster Operator のアップグレード

この手順では、AMQ Streams 1.7 を使用するように Cluster Operator デプロイメントをアップグレードする方法を説明します。

前提条件

手順

  1. 既存の Cluster Operator リソース (/install/cluster-operator ディレクトリー内) に追加した設定変更を覚えておきます。すべての変更は、新しいバージョンの Cluster Operator によって上書きされます。
  2. カスタムリソースを更新して、AMQ Streams バージョン 1.7 で使用できるサポート対象の設定オプションを反映します。
  3. Cluster Operator を更新します。

    1. Cluster Operator を実行している namespace に従い、新しい Cluster Operator バージョンのインストールファイルを編集します。

      Linux の場合は、以下を使用します。

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      MacOS の場合は、以下を使用します。

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 既存の Cluster Operator Deployment で 1 つ以上の環境変数を編集した場合、install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml ファイルを編集し、これらの環境変数を使用します。
  4. 設定を更新したら、残りのインストールリソースとともにデプロイします。

    oc replace -f install/cluster-operator

    ローリングアップデートが完了するのを待ちます。

  5. 新しい Operator バージョンがアップグレード前の Kafka バージョンをサポートしなくなった場合、「Version not found」というエラーメッセージが Cluster Operator によって返されます。そうでない場合は、エラーメッセージは返されません。

    以下に例を示します。

    "Version 2.4.0 is not supported. Supported versions are: 2.6.0, 2.6.1, 2.7.0."
    • エラーメッセージが返される場合は、新しい Cluster Operator バージョンでサポートされる Kafka バージョンにアップグレードします。

      1. Kafka カスタムリソースを編集します。
      2. spec.kafka.version プロパティーをサポートされる Kafka バージョンに変更します。
    • エラーメッセージが返されない場合は、次のステップに進みます。Kafka のバージョンを後でアップグレードします。
  6. Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    イメージタグには、新しい Operator のバージョンが表示されます。以下に例を示します。

    registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:{ContainerVersion}

Cluster Operator はバージョン 1.7 にアップグレードされましたが、管理するクラスターで稼働している Kafka のバージョンは変更されていません。

Cluster Operator のアップグレードの次に、Kafka のアップグレードを実行する必要があります。

8.1.3. Kafka のアップグレード

Cluster Operator を 1.7 にアップグレードした後、次にすべての Kafka ブローカーをサポートされる最新バージョンの Kafka にアップグレードします。

Kafka のアップグレードは、Kafka ブローカーのローリングアップデートによって Cluster Operator によって実行されます。

Cluster Operator は、Kafka クラスターの設定に基づいてローリングアップデートを開始します。

Kafka.spec.kafka.config に含まれるものCluster Operator によって開始されるもの

inter.broker.protocol.versionlog.message.format.version の両方

単一のローリングアップデート更新後に、inter.broker.protocol.version を手動で更新する必要があります。その後に log.message.format.version が必要です。それぞれを変更すると、さらにローリングアップデートがトリガーされます。

inter.broker.protocol.version または log.message.format.version のいずれか

2 つのローリングアップデート

inter.broker.protocol.versionlog.message.format.version の設定がない

2 つのローリングアップデート

Cluster Operator は、Kafka のアップグレードの一環として、ZooKeeper のローリングアップデートを開始します。

  • ZooKeeper バージョンが変更されなくても、単一のローリングアップデートが発生します。
  • 新しいバージョンの Kafka に新しいバージョンの ZooKeeper が必要な場合、追加のローリングアップデートが発生します。

8.1.3.1. Kafka バージョンおよびイメージマッピング

Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES 環境変数と Kafka.spec.kafka.version プロパティーの設定について考慮してください。

  • それぞれの Kafka リソースは Kafka.spec.kafka.version で設定できます。
  • Cluster Operator の STRIMZI_KAFKA_IMAGES 環境変数により、Kafka のバージョンと、指定の Kafka リソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。

    • Kafka.spec.kafka.image を設定しないと、そのバージョンのデフォルトのイメージが使用されます。
    • Kafka.spec.kafka.image を設定すると、デフォルトのイメージがオーバーライドされます。
警告

Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。

8.1.3.2. Kafka ブローカーおよびクライアントアプリケーションのアップグレード

この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。

新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョンブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。

クライアントをアップグレードするストラテジーを選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。

前提条件

Kafka リソースをアップグレードするには、以下を確認します。

  • 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
  • Kafka.spec.kafka.config には、新しい Kafka バージョンでサポートされないオプションが含まれていない

手順

  1. Kafka クラスター設定を更新します。

    oc edit kafka my-cluster
  2. 設定されている場合は、Kafka.spec.kafka.configlog.message.format.version および inter.broker.protocol.version現在 の Kafka バージョンのデフォルトに設定されていることを確認してください。

    たとえば、Kafka 2.6.0 から 2.7.0 へのアップグレードは以下のようになります。

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.6"
          # ...

    log.message.format.version および inter.broker.protocol.version が設定されていない場合、AMQ Streams は次のステップの Kafka バージョンへの更新後にこれらのバージョンを現在のデフォルトに自動的に更新します。

    注記

    log.message.format.version および inter.broker.protocol.version の値は、浮動小数点数として解釈されないように文字列である必要があります。

  3. Kafka.spec.kafka.version を変更し、新しい Kafka バージョンを指定します。 log.message.format.versioninter.broker.protocol.version現在 のKafka バージョンのデフォルトままにします。

    注記

    kafka.version を変更して、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリーの使用が開始されるようにします。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。inter.broker.protocol.version を変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。

    たとえば、Kafka 2.6.0 から 2.7.0 へのアップグレードは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0 1
        config:
          log.message.format.version: "2.6" 2
          inter.broker.protocol.version: "2.6" 3
          # ...
    1
    Kafka のバージョンが新しいバージョンに変更されます。
    2
    メッセージ形式のバージョンは変更されません。
    3
    ブローカー間のプロトコルバージョンは変更されません。
    警告

    新しい Kafka バージョンの inter.broker.protocol.version が変更した場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsets に書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。

  4. Kafka クラスターのイメージが Kafka.spec.kafka.image の Kafka カスタムリソースで定義されている場合、image を更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。

    Kafka バージョンおよびイメージマッピング」を参照してください。

  5. エディターを保存して終了し、ローリングアップデートの完了を待ちます。

    Pod の状態の遷移を監視して、ローリングアップデートの進捗を確認します。

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    ローリングアップデートにより、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用するようにします。

  6. クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。

    必要に応じて、Kafka Connect および MirrorMaker の version プロパティーを新しいバージョンの Kafka として設定します。

    1. Kafka Connect では、KafkaConnect.spec.version を更新します。
    2. MirrorMaker では、KafkaMirrorMaker.spec.version を更新します。
    3. MirrorMaker 2.0 では、KafkaMirrorMaker2.spec.version を更新します。
  7. 設定されている場合、新しい inter.broker.protocol.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。

    たとえば、Kafka 2.7.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.7"
          # ...
  8. Cluster Operator によってクラスターが更新されるまで待ちます。
  9. 設定されている場合、新しい log.message.format.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に進みます。

    たとえば、Kafka 2.7.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0
        config:
          log.message.format.version: "2.7"
          inter.broker.protocol.version: "2.7"
          # ...
  10. Cluster Operator によってクラスターが更新されるまで待ちます。

    • これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
    • ブローカーは、ブローカー間プロトコルのバージョンと新バージョンの Kafka を使用して、メッセージを送信するように設定されます。

Kafka のアップグレードに従い、必要な場合は以下を行うことができます。

8.1.4. リスナーの汎用リスナー設定への更新

AMQ Streams では、Kafka リソースの Kafka リスナーを設定するための GenericKafkaListener スキーマが提供されます。

GenericKafkaListener は、AMQ Streams から削除された KafkaListeners スキーマに置き換わるものです。

GenericKafkaListener スキーマでは、名前とポートが一意であれば、必要なリスナーをいくつでも設定できます。listeners 設定は配列として定義されますが、非推奨の形式もサポートされます。

OpenShift クラスター内のクライアントの場合は、plain (暗号化なし) または tls 内部 リスナーを作成できます。

OpenShift クラスター外のクライアントの場合は、外部 リスナーを作成して接続メカニズムを指定します。接続メカニズムは nodeportloadbalanceringress、または route のいずれかです。

KafkaListeners スキーマは plaintls、および external リスナーのサブプロパティーを使用し、それぞれに固定ポートを使用します。アップグレードプロセスのいずれかの段階で、KafkaListeners スキーマを使用して設定されたリスナーを GenericKafkaListener スキーマの形式に変換する必要があります。

たとえば、現在 Kafka 設定で以下の設定を使用しているとします。

これまでのリスナー設定

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...

以下を使用して、リスナーを新しい形式に変換します。

新しいリスナー設定

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 1
  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 2
    tls: true

1
すべてのリスナーに TLS プロパティーが必要になります。
2
オプション: ingressloadbalancernodeportroute

必ず 正確な 名前とポート番号を使用してください。

これまでの形式で使用された追加の configuration または overrides プロパティーは、新しい形式に更新する必要があります。

リスナー configuration に導入された変更は次のとおりです。

  • overridesconfiguration セクションとマージされます。
  • dnsAnnotations の名前が annotations に変更されました。
  • preferredAddressType の名前が preferredNodePortAddressType に変更されました。
  • address の名前が alternativeNames に変更されました。
  • loadBalancerSourceRanges および externalTrafficPolicy は、非推奨になった template からリスナー設定に移されました。

たとえば、以下の設定は以下のようになります。

たとえば、以下の設定を見てみましょう。

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...

これを以下に変更します。

新しい追加リスナー設定

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...

重要

後方互換性を維持するため、新しいリスナー設定にある名前およびポート番号を使用する 必要 があります。他の値を使用すると、Kafka リスナーおよび OpenShift サービスの名前が変更されます。

それぞれのリスナータイプで利用可能な設定オプションの詳細は、「GenericKafkaListener スキーマ参照」を参照してください。

8.1.5. クライアントをアップグレードするストラテジー

クライアントアプリケーション (Kafka Connect コネクターを含む) のアップグレードに適切な方法は、特定の状況によって異なります。

消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。

  • プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
  • ブローカーでメッセージをダウンコンバートする。

ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。

ブローカーのダウンコンバートは 2 通りの方法で設定できます。

  • トピックレベルの message.format.version では単一のとピックが設定されます。
  • ブローカーレベルの log.message.format.version は、トピックレベルの message.format.version が設定されてないトピックのデフォルトです。

新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。

クライアントのアップグレードに使用できるストラテジーは複数あります。

コンシューマーを最初にアップグレード
  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. ブローカーレベルの log.message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。

トピック単位でコンシューマーを最初にアップグレード

トピックごとに以下を実行します。

  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. トピックレベルの message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。

トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり

トピックごとに以下を実行します。

  1. トピックレベルの message.format.version を、旧バージョンに変更します (または、デフォルトがブローカーレベルの log.message.format.version のトピックを利用します)。
  2. コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
  3. アップグレードしたアプリケーションが正しく機能することを確認します。
  4. トピックレベルの message.format.version を新バージョンに変更します。

    このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。

    この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。

クライアントアプリケーションをアップグレードするストラテジーは他にもあります。

注記

複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、「トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり」のストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。

8.2. AMQ Streams カスタムリソースのアップグレード

AMQ Streams を 1.7 にアップグレードした後、カスタムリソースが API バージョン v1beta2 を使用していることを確認する必要があります。これは、1.7 にアップグレードした後にいつでも実行 できますが、アップグレードは AMQ Streams の次のマイナーバージョンの更新前に完了する必要があります。

重要

カスタムリソースの v1beta2 へのアップグレードは、Cluster Operator の アップグレード後に実行し、Cluster Operator がリソースを認識できるようにする 必要があり ます。

注記

カスタムリソースを v1beta2 にアップグレードすると、Kubernetes v1.22 に必要な OpenShift CRD v1 へ移行する準備ができます。

カスタムリソースへの CLI のアップグレード

AMQ Streams では、API 変換ツール とそのリリースアーティファクトが提供されます。

AMQ Streams のダウンロードサイトから ZIP または TAR.GZ をダウンロードできます。このツールを使用するには、そのツールを展開して、スクリプトを bin ディレクトリーで使用します。

その CLI から、ツールを使用してカスタムリソースの形式を以下の 2 つの方法のいずれかで v1beta2 に変換できます。

カスタムリソースの変換後に、CRD の ストレージ API バージョンとして v1beta2 を設定する必要があります。

カスタムリソースへの手動アップグレード

API 変換ツールを使用してカスタムリソースを v1beta2 に更新する代わりに、v1beta2 を使用するように各カスタムリソースを手動で更新することもできます。

他のコンポーネントの設定など、Kafka カスタムリソースを更新します。

デプロイメントに適用される他のカスタムリソースを更新します。

手動の手順では、各カスタムリソースに加えた変更が示されます。これらの変更後に、API 変換ツールを使用して CRD をアップグレードする必要があります。

8.2.1. API のバージョン管理

カスタムリソースは、CRD によって OpenShift に追加された API を使用して編集および制御されます。言い換えると、CRD は Kubernetes API を拡張して、カスタムリソースを作成できるようにします。CRD 自体は OpenShift 内のリソースです。これらの CRD は、OpenShift クラスターにインストールされ、カスタムリソースの API のバージョンを定義します。カスタムリソース API の各バージョンで、そのバージョンの独自のスキーマを定義できます。AMQ Streams Operator を含む OpenShift クライアントは、API バージョンが含まれる URL パス (API パス) を使用して Kubernetes API サーバーによって提供されるカスタムリソースにアクセスします。

v1beta2 が導入され、カスタムリソースのスキーマが更新されるようになりました。古い API バージョンは非推奨になりました。

以下の AMQ Streams カスタムリソースでは、v1alpha1 API バージョンは非推奨になりました。

  • Kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaConnector
  • KafkaMirrorMaker
  • KafkaMirrorMaker2
  • KafkaTopic
  • KafkaUser
  • KafkaBridge
  • KafkaRebalance

以下の AMQ Streams カスタムリソースでは、v1beta1 API バージョンは非推奨になりました。

  • Kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaMirrorMaker
  • KafkaTopic
  • KafkaUser
重要

v1alpha1 および v1beta1 バージョンは、次のマイナーリリースで削除されます。

8.2.2. API 変換ツールを使用したカスタムリソース設定ファイルの変換

この手順では、API 変換ツールを使用して、AMQ Streams カスタムリソースの設定を記述する YAML ファイルを v1beta2 に適用可能な形式に変換する方法を説明します。これには、convert-file (cf) コマンドを使用します。

convert-file コマンドは、複数のドキュメントが含まれる YAML ファイルを変換できます。マルチドキュメントの YAML ファイルでは、含まれる AMQ Streams カスタムリソースがすべて変換されます。AMQ Streams 以外の OpenShift リソースは、変換された出力ファイルに未変更の状態でレプリケートされます。

YAML ファイルの変換後、設定を適用してクラスターのカスタムリソースを更新する必要があります。または、GitOps 同期メカニズムがクラスターの更新に使用される場合は、これを使用して変更を適用できます。変換は、カスタムリソースが OpenShift クラスターで更新される場合にのみ完了します。

または、convert-resource の手順を使用して、カスタムリソースを直接変換することもできます

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • リリースアーティファクトで提供される API 変換ツールが必要です。
  • このツールには Java 11 が必要です。

API 変換ツールおよび convert-file コマンドで利用可能なフラグの詳細は、CLI で help を使用します。

bin/api-conversion.sh help
bin/api-conversion.sh help convert-file

Windows を使用している場合は、この手順で bin/api-conversion.cmd を使用します。

表8.1 YAML ファイル変換のフラグ
フラグ説明

-f, --file=NAME-OF-YAML-FILE

変換される AMQ Streams カスタムリソースの YAML ファイルを指定します。

-o, --output=NAME-OF-CONVERTED-YAML-FILE

変換されたカスタムリソースの出力 YAML ファイルを作成します。

--in-place

変換された YAML で元のソースファイルを更新します。

手順

  1. convert-file コマンドおよび適切なフラグで、API 変換ツールを実行します。

    例 1: YAML ファイルを変換し、出力を表示しますが、ファイルは変更されません。

    bin/api-conversion.sh convert-file --file input.yaml

    例 2: YAML ファイルを変換し、変更を元のソースファイルに書き込みます。

    bin/api-conversion.sh convert-file --file input.yaml --in-place

    例 3: YAML ファイルを変換し、変更を新しい出力ファイルに書き込みます。

    bin/api-conversion.sh convert-file --file input.yaml --output output.yaml
  2. 変換した設定ファイルを使用して、カスタムリソースを更新します。

    oc apply -f CONVERTED-CONFIG-FILE
  3. カスタムリソースが変換されたことを確認します。

    oc get KIND CUSTOM-RESOURCE-NAME -o yaml

8.2.3. API 変換ツールを使用したカスタムリソースの直接変換

この手順では、API 変換ツールを使用して OpenShift クラスターの AMQ Streams カスタムリソースを直接 v1beta2 に適用可能な形式に変換する方法を説明します。これには、convert-resource (cr) コマンドを使用します。このコマンドは Kubernetes API を使用して変換を行います。

kind プロパティーに基づいて、AMQ Streams カスタムリソースのタイプを 1 つ以上指定するか、すべてのタイプを変換できます。特定の namespace またはすべての namespace を変換の対象にすることもできます。1 つの namespace を対象にする場合、その namespace のすべてのカスタムリソースを変換でき、その名前と種類を指定すると単一のカスタムリソースを変換できます。

または、convert-file の手順を使用して、カスタムリソースを記述する YAML ファイルを変換および適用できます

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • リリースアーティファクトで提供される API 変換ツールが必要です。
  • このツールには Java 11 (OpenJDK) が必要です。
  • 手順では、以下を行うために RBAC の権限を持つユーザー管理者アカウントが必要です。

    • --name オプションを使用して、変換される AMQ Streams カスタムリソースを取得。
    • --name オプションを使用せずに、変換される AMQ Streams カスタムリソースを一覧表示。
    • 変換される AMQ Streams カスタムリソースの置き換え。

API 変換ツールおよび convert-resource コマンドで利用可能なフラグの詳細は、CLI で help を使用します。

bin/api-conversion.sh help
bin/api-conversion.sh help convert-resource

Windows を使用している場合は、この手順で bin/api-conversion.cmd を使用します。

表8.2 カスタムリソースを変換するためのフラグ
フラグ説明

-k, --kind

変換するカスタムリソースの種類を指定します。指定されていない場合はすべてのリソースを変換します。

-a, --all-namespaces

すべての namespace でカスタムリソースを変換します。

-n, --namespace

OpenShift namespace または OpenShift プロジェクトを指定します。指定されていない場合は現在の namespace を使用します。

--name

--namespace および単一のカスタムリソース --kind が使用される場合は、変換されるカスタムリソースの名前を指定します。

手順

  1. convert-resource コマンドおよび適切なフラグで、API 変換ツールを実行します。

    例 1: 現在の namespace のすべての AMQ Streams リソースを変換します。

    bin/api-conversion.sh convert-resource

    例 2: すべての namespace のすべての AMQ Streams リソースを変換します。

    bin/api-conversion.sh convert-resource --all-namespaces

    例 3: my-kafka namespace のすべての AMQ Streams リソースを変換します。

    bin/api-conversion.sh convert-resource --namespace my-kafka

    例 4: すべての namespace の Kafka リソースのみを変換します。

    bin/api-conversion.sh convert-resource --all-namespaces --kind Kafka

    例 5: すべての namespace の Kafka および Kafka Connect リソースを変換します。

    bin/api-conversion.sh convert-resource --all-namespaces --kind Kafka --kind KafkaConnect

    例 6: my-kafka namespace の my-cluster という名前の Kafka カスタムリソースを変換します。

    bin/api-conversion.sh convert-resource --kind Kafka --namespace my-kafka --name my-cluster
  2. カスタムリソースが変換されたことを確認します。

    oc get KIND CUSTOM-RESOURCE-NAME -o yaml

8.2.4. API 変換ツールを使用した CRD の v1beta2 へのアップグレード

この手順では、API 変換ツールを使用して、AMQ Streams 固有のリソースを v1beta2 に適用可能な形式でインスタンス化および管理するために使用されるスキーマを定義する CRD を変換する方法を説明します。これには、crd-upgrade コマンドを使用します。

OpenShift クラスター全体のすべての AMQ Streams カスタムリソースを v1beta2 に変換した後に、この手順を実行します。CRD を最初にアップグレードしてからカスタムリソースを変換する場合は、このコマンドを再度実行する必要があります。

このコマンドは、CRD の spec.versions を更新し、v1beta2ストレージ API バージョンとして宣言します。このコマンドは、カスタムリソースも更新するため、それらのカスタムリソースは v1beta2 下に保存されます。新しいカスタムリソースインスタンスはストレージ API バージョンの仕様から作成されるため、1 つの API バージョンのみがストレージバージョンとしてマークされます。

v1beta2 をストレージバージョンとして使用するように CRD をアップグレードした場合は、カスタムリソースで v1beta2 プロパティーのみを使用する必要があります。

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • リリースアーティファクトで提供される API 変換ツールが必要です。
  • このツールには Java 11 (OpenJDK) が必要です。
  • カスタムリソースが v1beta2 に変換されている必要があります。
  • 手順では、以下を行うために RBAC の権限を持つユーザー管理者アカウントが必要です。

    • すべての namespace の AMQ Streams カスタムリソースのリスト。
    • 変換される AMQ Streams カスタムリソースの置き換え。
    • CRD の更新。
    • CRD の状態の置き換え。

API 変換ツールの詳細は、CLI で help を使用します。

bin/api-conversion.sh help

Windows を使用している場合は、この手順で bin/api-conversion.cmd を使用します。

手順

  1. v1beta2 を使用するようにカスタムリソースを変換していない場合は、そのように変換します。

    これは、API 変換ツールを使用して、以下のいずれかの方法で行います。

  2. crd-upgrade コマンドを使用して、API 変換ツールを実行します。

    bin/api-conversion.sh crd-upgrade
  3. CRD がアップグレードされ、v1beta2 がストレージバージョンであることを確認します。

    たとえば、Kafka トピック CRD の場合は次のとおりです。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: CustomResourceDefinition
    metadata:
      name: kafkatopics.kafka.strimzi.io
      #...
    spec:
      group: kafka.strimzi.io
      #...
      versions:
      - name: v1beta2
        served: true
        storage: true
        #...
    status:
      #...
      storedVersions:
      - v1beta2

8.2.5. v1beta2 をサポートするように Kafka リソースをアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。

手順

デプロイメントの Kafka カスタムリソースごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. .spec.kafka.listener を新しい汎用リスナー形式に更新していない場合は、「リスナーの汎用リスナー設定への更新」 の説明に従って更新します。

    警告

    API バージョン v1beta2 では、古いリスナーの形式はサポートされません。

  3. affinity.spec.kafka.affinity に存在する場合は .spec.kafka.template.pod.affinity に移動します。
  4. tolerations.spec.kafka.tolerations に存在する場合は .spec.kafka.template.pod.tolerations に移動します。
  5. .spec.kafka.template.tlsSidecarContainer が存在する場合は削除します。
  6. .spec.kafka.tlsSidecarContainer が存在する場合は削除します。
  7. 以下のポリシー設定のいずれかが存在する場合:

    • .spec.kafka.template.externalBootstrapService.externalTrafficPolicy
    • .spec.kafka.template.perPodService.externalTrafficPolicy

      1. type: loadbalancer および type: nodeport リスナー両方の設定を .spec.kafka.listeners[].configuration.externalTrafficPolicy に移動します。
      2. .spec.kafka.template.externalBootstrapService.externalTrafficPolicy または .spec.kafka.template.perPodService.externalTrafficPolicy を削除します。
  8. 以下の loadbalancer リスナー設定のいずれかが存在する場合:

    • .spec.kafka.template.externalBootstrapService.loadBalancerSourceRanges
    • .spec.kafka.template.perPodService.loadBalancerSourceRanges

      1. type: loadbalancer リスナーの設定を .spec.kafka.listeners[].configuration.loadBalancerSourceRanges に移動します。
      2. .spec.kafka.template.externalBootstrapService.loadBalancerSourceRanges または .spec.kafka.template.perPodService.loadBalancerSourceRanges を削除します。
  9. type: external ロギングが .spec.kafka.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  10. .spec.kafka.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.kafka.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        kafka-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.kafka.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: kafka-metrics-config.yaml
    3. 古い .spec.kafka.metrics フィールドを削除します。
  11. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

次のステップ

Kafka カスタムリソースごとに、ZooKeeper、Topic Operator、Entity Operator、および Cruise Control (デプロイされている場合) の設定をアップグレードして、バージョン v1beta2 をサポートするようにします。これは以下の手順で説明します。

v1beta2 をサポートするようにすべての Kafka 設定が更新されると、 Kafka カスタムリソースを v1beta2 にアップグレードできます

8.2.6. v1beta2 をサポートするように ZooKeeper をアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。

手順

デプロイメントの Kafka カスタムリソースごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. affinity.spec.zookeeper.affinity に存在する場合は .spec.zookeeper.template.pod.affinity に移動します。
  3. tolerations.spec.zookeeper.tolerations に存在する場合は .spec.zookeeper.template.pod.tolerations に移動します。
  4. .spec.zookeeper.template.tlsSidecarContainer が存在する場合は削除します。
  5. .spec.zookeeper.tlsSidecarContainer が存在する場合は削除します。
  6. type: external ロギングが .spec.kafka.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  7. .spec.zookeeper.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.zookeeper.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        zookeeper-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.zookeeper.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: zookeeper-metrics-config.yaml
    3. 古い .spec.zookeeper.metrics フィールドを削除します。
  8. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.7. v1beta2 をサポートするように Topic Operator をアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。

手順

デプロイメントの Kafka カスタムリソースごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. Kafka.spec.topicOperator を使用する場合:

    1. affinity.spec.topicOperator.affinity から .spec.entityOperator.template.pod.affinity に移動します。
    2. tolerations.spec.topicOperator.tolerations から .spec.entityOperator.template.pod.tolerations に移動します。
    3. .spec.topicOperator.tlsSidecar.spec.entityOperator.tlsSidecar に移動します。
    4. affinitytolerations、および tlsSidecar の移動後に、.spec.topicOperator の残りの設定を .spec.entityOperator.topicOperator に移動します。
  3. type: external ロギングが .spec.topicOperator.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
    注記

    このステップは、Entity Operator のアップグレード の一部として完了することもできます。

  4. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.8. v1beta2 をサポートするように Entity Operator をアップグレード

前提条件

手順

デプロイメントの Kafka カスタムリソースごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. affinity.spec.entityOperator.affinity から .spec.entityOperator.template.pod.affinity に移動します。
  3. tolerations.spec.entityOperator.tolerations から .spec.entityOperator.template.pod.tolerations に移動します。
  4. type: external ロギングが .spec.entityOperator.userOperator.logging または .spec.entityOperator.topicOperator.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  5. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.9. v1beta2 をサポートするように Cruise Control をアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • Cruise Control が設定およびデプロイされている必要があります。『AMQ Streams on OpenShift の使用』の「Cruise Control のデプロイ」を参照してください。

手順

Kafka クラスターの Kafka.spec.cruiseControl 設定ごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. type: external ロギングが .spec.cruiseControl.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  3. .spec.cruiseControl.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.cruiseControl.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        cruise-control-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.cruiseControl.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: cruise-control-metrics-config.yaml
    3. 古い .spec.cruiseControl.metrics フィールドを削除します。
  4. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.10. Kafka リソースの API バージョンを v1beta2 にアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • Kafka カスタムリソース内で以下の設定が更新済みである必要があります。

手順

デプロイメントの Kafka カスタムリソースごとに以下の手順を実行します。

  1. エディターで Kafka カスタムリソースを更新します。

    oc edit kafka KAFKA-CLUSTER
  2. Kafka カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  3. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.11. Kafka Connect リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。

手順

デプロイメントの KafkaConnect カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaConnect カスタムリソースを更新します。

    oc edit kafkaconnect KAFKA-CONNECT-CLUSTER
  2. 以下があるか確認します。

    KafkaConnect.spec.affinity
    KafkaConnect.spec.tolerations

    あれば以下に変更します。

    KafkaConnect.spec.template.pod.affinity
    KafkaConnect.spec.template.pod.tolerations

    たとえば、以下の場合を考えてみましょう。

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    これを以下に変更します。

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. type: external ロギングが .spec.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. .spec.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-connect-metrics
        labels:
          app: strimzi
      data:
        connect-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-connect-metrics
            key: connect-metrics-config.yaml
    3. 古い .spec.metrics フィールドを削除します。
  5. KafkaConnect カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  6. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.12. Kafka Connect S2I リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。

手順

デプロイメントの KafkaConnectS2I カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaConnectS2I カスタムリソースを更新します。

    oc edit kafkaconnects2i S2I-CLUSTER
  2. 以下があるか確認します。

    KafkaConnectS2I.spec.affinity
    KafkaConnectS2I.spec.tolerations

    あれば以下に変更します。

    KafkaConnectS2I.spec.template.pod.affinity
    KafkaConnectS2I.spec.template.pod.tolerations

    たとえば、以下の場合を考えてみましょう。

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    これを以下に変更します。

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. type: external ロギングが .spec.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. .spec.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-connect-s2i-metrics
        labels:
          app: strimzi
      data:
        connect-s2i-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-connect-s2i-metrics
            key: connect-s2i-metrics-config.yaml
    3. 古い .spec.metrics フィールドを削除します。
  5. KafkaConnectS2I カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  6. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.13. Kafka MirrorMaker リソースの v1beta2 へのアップグレード

前提条件

手順

デプロイメントの KafkaMirrorMaker カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaMirrorMaker カスタムリソースを更新します。

    oc edit kafkamirrormaker MIRROR-MAKER
  2. 以下があるか確認します。

    KafkaMirrorMaker.spec.affinity
    KafkaMirrorMaker.spec.tolerations

    あれば以下に変更します。

    KafkaMirrorMaker.spec.template.pod.affinity
    KafkaMirrorMaker.spec.template.pod.tolerations

    たとえば、以下の場合を考えてみましょう。

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    これを以下に変更します。

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. type: external ロギングが .spec.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. .spec.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-mm-metrics
        labels:
          app: strimzi
      data:
        mm-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-mm-metrics
            key: mm-metrics-config.yaml
    3. 古い .spec.metrics フィールドを削除します。
  5. KafkaMirrorMaker カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  6. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.14. Kafka MirrorMaker 2.0 リソースの v1beta2 へのアップグレード

前提条件

手順

デプロイメントの KafkaMirrorMaker2 カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaMirrorMaker2 カスタムリソースを更新します。

    oc edit kafkamirrormaker2 MIRROR-MAKER-2
  2. affinity.spec.affinity に存在する場合は .spec.template.pod.affinity に移動します。
  3. tolerations.spec.tolerations に存在する場合は .spec.template.pod.tolerations に移動します。
  4. type: external ロギングが .spec.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  5. .spec.metrics フィールドを使用してメトリクスを有効にする場合:

    1. JMX Prometheus エクスポーターの YAML 設定をキーの下に保存する新しい ConfigMap を作成します。YAML は .spec.metrics フィールドの内容と一致している必要があります。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-mm2-metrics
        labels:
          app: strimzi
      data:
        mm2-metrics-config.yaml: |
            <YAML>
    2. ConfigMap およびキーを示す .spec.metricsConfig プロパティーを追加します。

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-mm2-metrics
            key: mm2-metrics-config.yaml
    3. 古い .spec.metrics フィールドを削除します。
  6. KafkaMirrorMaker2 カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1alpha1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  7. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.15. Kafka Bridge リソースの v1beta2 へのアップグレード

前提条件

手順

デプロイメントの KafkaBridge リソースごとに以下の手順を実行します。

  1. エディターで KafkaBridge カスタムリソースを更新します。

    oc edit kafkabridge KAFKA-BRIDGE
  2. type: external ロギングが KafkaBridge.spec.logging で設定されている場合:

    ロギング設定が含まれる ConfigMap の name を置き換えます。

    logging:
      type: external
      name: my-config-map

    valueFrom.configMapKeyRef フィールドで、ログ保存される ConfigMap の name および key の両方を指定します。

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  3. KafkaBridge カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1alpha1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  4. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.16. Kafka User リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする User Operator が稼働している必要があります。

手順

デプロイメントの KafkaUser カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaUser カスタムリソースを更新します。

    oc edit kafkauser KAFKA-USER
  2. KafkaUser カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  3. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.17. Kafka Topic リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Topic Operator が稼働している必要があります。

手順

デプロイメントの KafkaTopic カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaTopic カスタムリソースを更新します。

    oc edit kafkatopic KAFKA-TOPIC
  2. KafkaTopic カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1beta1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  3. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.18. Kafka Connector リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • コネクターインスタンスを管理するために KafkaConnector カスタムリソースがデプロイされている必要があります。「コネクターの作成および管理」 を参照してください。

手順

デプロイメントの KafkaConnector カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaConnector カスタムリソースを更新します。

    oc edit kafkaconnector KAFKA-CONNECTOR
  2. KafkaConnector カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1alpha1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  3. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.2.19. Kafka Rebalance リソースの v1beta2 へのアップグレード

前提条件

  • v1beta2 API バージョンをサポートする Cluster Operator が稼働している必要があります。
  • Cruise Control が設定およびデプロイされている必要があります。『AMQ Streams on OpenShift の使用』の「Cruise Control のデプロイ」を参照してください。

手順

デプロイメントの KafkaRebalance カスタムリソースごとに以下の手順を実行します。

  1. エディターで KafkaRebalance カスタムリソースを更新します。

    oc edit kafkarebalance KAFKA-REBALANCE
  2. KafkaRebalance カスタムリソースの apiVersionv1beta2 に更新します。

    以下の行を見つけます。

    apiVersion: kafka.strimzi.io/v1alpha1

    この行を以下の行に変更します。

    apiVersion: kafka.strimzi.io/v1beta2
  3. ファイルを保存し、エディターを終了して更新したカスタムリソースが調整されるのを待ちます。

8.3. コンシューマーの Cooperative Rebalancing へのアップグレード

Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。

コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。

注記

Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。

手順

Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードするには以下を行います。

  1. Kafka クライアント .jar ファイルを新バージョンに置き換えます。
  2. コンシューマー設定で、partition.assignment.strategycooperative-sticky を追加します。たとえば、range ストラテジーが設定されている場合は、設定を range, cooperative-sticky に変更します。
  3. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
  4. コンシューマー設定から前述の partition.assignment.strategy を削除して、グループの各コンシューマーを再設定し、cooperative-sticky ストラテジーのみを残します。
  5. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。

Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。

  1. Kafka Streams の .jar ファイルを新バージョンに置き換えます。
  2. Kafka Streams の設定で、upgrade.from 設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。
  3. 各ストリームプロセッサー (ノード) を順次再起動します。
  4. upgrade.from 設定パラメーターを Kafka Streams 設定から削除します。
  5. グループ内の各コンシューマーを順次再起動します。

その他のリソース

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.