第14章 AMQ Streams および Kafka のアップグレード


AMQ Streams は、クラスターのダウンタイムを発生せずにアップグレードできます。AMQ Streams の各バージョンは、Apache Kafka の 1 つ以上のバージョンをサポートします。使用する AMQ Streams バージョンでサポートされれば、より高いバージョンの Kafka にアップグレードできます。サポートされる下位バージョンの Kafka にダウングレードできる場合もあります。

より新しいバージョンの AMQ Streams はより新しいバージョンの Kafka をサポートしますが、AMQ Streams をアップグレードしてから、サポートされる上位バージョンの Kafka にアップグレードする必要があります。

14.1. アップグレードの前提条件

アップグレードプロセスを開始する前に、以下を確認します。

14.2. アップグレードプロセス

AMQ Streams のアップグレードは 2 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。

  1. Cluster Operator を最新の AMQ Streams バージョンに更新します。

  2. すべての Kafka ブローカーとクライアントアプリケーションを、最新の Kafka バージョンにアップグレードします。

14.3. Kafka バージョン

Kafka のログメッセージ形式バージョンおよびブローカー間のプロトコルバージョンは、メッセージに追加されるログ形式バージョンとクラスターで使用されるプロトコルのバージョンを指定します。そのためアップグレードプロセスでは、既存の Kafka ブローカーの設定変更およびクライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更により、必ず正しいバージョンを使用されるようにする必要になります。

以下の表は、Kafka バージョンの違いを示しています。

Kafka のバージョンInterbroker プロトコルのバージョンログメッセージ形式のバージョンZooKeeper バージョン

2.3.0

2.3

2.3

3.4.14

2.4.0

2.4

2.4

3.5.7

メッセージ形式のバージョン

プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリースによって変わるため、メッセージにはエンコードに使用された形式のバージョンが含まれます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。

Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。

  • message.format.version プロパティーはトピックに設定されます。
  • log.message.format.version プロパティーは Kafka ブローカーに設定されます。

トピックの message.format.version のデフォルト値は、Kafka ブローカーに設定される log.message.format.version によって定義されます。トピックの message.format.version は、トピック設定を編集すると手動で設定できます。

本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version によって定義されることを前提としています。

14.4. Cluster Operator のアップグレード

このセクションでは、AMQ Streams 1.4 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。

Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作の影響を受けません。

注記

特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。

14.4.1. Cluster Operator の後続バージョンへのアップグレード

この手順では、Cluster Operator デプロイメントを後続バージョンにアップグレードする方法を説明します。

前提条件

手順

  1. 既存の Cluster Operator リソースのバックアップを作成します。

    oc get all -l app=strimzi -o yaml > strimzi-backup.yaml
  2. Cluster Operator を更新します。

    Cluster Operator が稼働している namespace に従い、インストールファイルを編集します。

    Linux の場合は、以下を使用します。

    sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    MacOS の場合は、以下を使用します。

    sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    既存の Cluster Operator Deploymentで 1 つ以上の環境変数を編集した場合、install/cluster-operator/050-Deployment-cluster-operator.yaml ファイルを編集し、Cluster Operator の新規バージョンに加えた変更を反映させます。

  3. 設定を更新したら、残りのインストールリソースとともにデプロイします。

    oc apply -f install/cluster-operator

    ローリングアップデートが完了するのを待ちます。

  4. Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例: <New AMQ Streams version>-kafka-<Current Kafka version>

  5. 既存のリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。

これで Cluster Operator が更新されましたが、その管理下にあるクラスターで実行している Kafka のバージョンは変わりません。

次のステップ

Cluster Operator のアップグレードの次に、Kafka アップグレードを実行できます。

14.5. Kafka のアップグレード

Cluster Operator をアップグレードしたら、サポートされる上位の Kafka バージョンにブローカーをアップグレードできます。

Kafka のアップグレードは、Cluster Operator を使用して行います。Cluster Operator によるアップグレードの実行方法は、以下のバージョン間の違いによって異なります。

  • Interbroker プロトコル
  • ログメッセージの形式
  • ZooKeeper

現在の Kafka バージョンとアップグレードする Kafka バージョンが同じ場合 (パッチレベルでのアップグレードではよくあります)、Kafka ブローカーのローリングアップデートを 1 回実行して Cluster Operator によるアップグレードを行います。

これらのバージョンが 1 つ以上異なる場合、Kafka ブローカーのローリングアップデートを 2、3 回実行して Cluster Operator でアップグレードを実行する必要があります。

14.5.1. Kafka バージョンおよびイメージマッピング

Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES および Kafka.spec.kafka.version プロパティーの設定について考慮してください。

  • それぞれの Kafka リソースは Kafka.spec.kafka.version で設定できます。
  • Cluster Operator の STRIMZI_KAFKA_IMAGES 環境変数により、Kafka のバージョンと、指定の Kafka リソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。

    • Kafka.spec.kafka.image を設定しないと、そのバージョンのデフォルトのイメージが使用されます。
    • Kafka.spec.kafka.image を設定すると、デフォルトのイメージがオーバーライドされます。
警告

Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。

14.5.2. クライアントをアップグレードするストラテジー

クライアントアプリケーション (Kafka Connect コネクターを含む) をアップグレードする最善の方法は、特定の状況によって異なります。

消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。

  • プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
  • ブローカーでメッセージをダウンコンバートする。

ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。

ブローカーのダウンコンバートは 2 通りの方法で設定できます。

  • トピックレベルの message.format.version では単一のとピックが設定されます。
  • ブローカーレベルの log.message.format.version は、トピックレベルの message.format.version が設定されてないトピックのデフォルトです。

新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。

クライアントのアップグレードに使用できるストラテジーは複数あります。

コンシューマーを最初にアップグレード
  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. ブローカーレベルの log.message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。

トピック単位でコンシューマーを最初にアップグレード

トピックごとに以下を実行します。

  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. トピックレベルの message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。

トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり

トピックごとに以下を実行します。

  1. トピックレベルの message.format.version を、旧バージョンに変更します (または、デフォルトがブローカーレベルの log.message.format.version のトピックを利用します)。
  2. コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
  3. アップグレードしたアプリケーションが正しく機能することを確認します。
  4. トピックレベルの message.format.version を新バージョンに変更します。

    このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。

    この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。

クライアントアプリケーションをアップグレードするストラテジーは他にもあります。

注記

複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、「トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり」のストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。

14.5.3. Kafka ブローカーおよびクライアントアプリケーションのアップグレード

この手順では、AMQ Streams Kafka クラスターを Kafka の上位バージョンにアップグレードする方法を説明します。

前提条件

Kafka リソースをアップグレードするには、以下を確認します。

  • 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
  • Kafka.spec.kafka.config に、アップグレード先となる Kafka バージョンでサポートされないオプションが含まれていない。
  • 現行の Kafka バージョンの log.message.format.version を新しいバージョンに更新する必要があるかどうか。

    Kafka バージョン表を参照してください。

手順

  1. 必要に応じてエディターで Kafka クラスター設定を更新します。

    oc edit kafka my-cluster
    1. 現行のバージョンの Kafka の log.message.format.version が新しい Kafka バージョンでも同じ場合は、次の手順に進みます。

      それ以外の場合、Kafka.spec.kafka.configlog.message.format.version現行 バージョンのデフォルトに設定されていることを確認してください。

      たとえば、Kafka 2.3.0 からのアップグレードでは以下のようになります。

      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.3.0
          config:
            log.message.format.version: "2.3"
            # ...

      log.message.format.version が設定されていない場合は、現行バージョンに設定します。

      注記

      log.message.format.version の値は、浮動小数点数として解釈されないように文字列である必要があります。

    2. Kafka.spec.kafka.version を変更し、新バージョンを指定します (log.message.format.version は現行バージョンのままにします)。

      たとえば、Kafka 2.3.0 から 2.4.0 へのアップグレードは以下のようになります。

      apiVersion: v1alpha1
      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.4.0 1
          config:
            log.message.format.version: "2.3" 2
            # ...
      1
      これは新バージョンに変更されます。
      2
      これは現行バージョンのままです。
    3. Kafka バージョンのイメージが Cluster Operator の STRIMZI_KAFKA_IMAGES に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image を更新します。

      「Kafka バージョンおよびイメージマッピング」を参照してください。

  2. エディターを保存して終了し、ローリングアップデートの完了を待ちます。

    注記

    新バージョンの Kafka に新しい ZooKeeper バージョンがある場合、追加のローリングアップデートが発生します。

    更新をログで確認するか、または Pod 状態の遷移を監視して確認します。

    oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version upgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get po -w

    現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが異なる場合は、Cluster Operator ログで INFO レベルのメッセージを確認します。

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 2 of 2 completed

    または、現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 1 of 1 completed

    ローリングアップデートでは以下を行います。

    • 各 Pod が新バージョンの Kafka のブローカーバイナリーを使用していることを確認します。
    • 新バージョンの Kafka の Interbroker プロトコルを使用してメッセージを送信するように、ブローカーを設定します。

      注記

      クライアントは引き続き旧バージョンを使用するため、ブローカーはメッセージを旧バージョンに変換してからクライアントに送信します。この余分な負荷を最小化するには、できるだけ速やかにクライアントを更新します。

  3. クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。

    「クライアントをアップグレードするストラテジー」を参照してください。

    警告

    この手順を完了すると、ダウングレードできません。この時点で更新を元に戻す必要がある場合は、「Kafka ブローカーおよびクライアントアプリケーションのダウングレード」の手順に従います。

    必要に応じて、Kafka Connect および MirrorMaker のバージョンプロパティーを新バージョンの Kafka として設定します。

    1. Kafka Connect では、KafkaConnect.spec.version を更新します。
    2. MirrorMaker では、KafkaMirrorMaker.spec.version を更新します。
  4. 1. で特定された log.message.format.version が、新しいバージョンと同じ場合は、次の手順に進みます。

    それ以外の場合は、Kafka.spec.kafka.configlog.message.format.version を、現在使用している新バージョンの Kafka のデフォルトバージョンに変更します。

    たとえば、2.4.0 へのアップグレードでは以下のようになります。

    apiVersion: v1alpha1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.4.0
        config:
          log.message.format.version: "2.4"
          # ...
  5. Cluster Operator によってクラスターが更新されるまで待ちます。

    これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。

その他のリソース

14.5.4. コンシューマーおよび Kafka Streams アプリケーションの Cooperative Rebalancing へのアップグレード

Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。

コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。

注記

Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。

手順

Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードするには以下を行います。

  1. Kafka クライアント .jar ファイルを新バージョンに置き換えます。
  2. コンシューマー設定で、partition.assignment.strategycooperative-sticky を追加します。たとえば、range ストラテジーが設定されている場合は、設定を range, cooperative-sticky に変更します。
  3. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
  4. コンシューマー設定から前述の partition.assignment.strategy を削除して、グループの各コンシューマーを再設定し、cooperative-sticky ストラテジーのみを残します。
  5. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。

Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。

  1. Kafka Streams の .jar ファイルを新バージョンに置き換えます。
  2. Kafka Streams の設定で、upgrade.from 設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。
  3. 各ストリームプロセッサー (ノード) を順次再起動します。
  4. upgrade.from 設定パラメーターを Kafka Streams 設定から削除します。
  5. グループ内の各コンシューマーを順次再起動します。

その他のリソース

14.6. Kafka のダウングレード

Kafka バージョンのダウングレードは、Cluster Operator を使用して行います。

Cluster Operator によるダウングレードの実行方法は、以下のバージョン間の違いによって異なります。

  • Interbroker プロトコル
  • ログメッセージの形式
  • ZooKeeper

14.6.1. ダウングレード先のバージョン

Cluster Operator によるダウングレードの操作方法は、log.message.format.version に応じて異なります。

  • ダウングレード先の Kafka バージョンの log.message.format.version が現行バージョンと同じ場合、Cluster Operator はブローカーのローリング再起動を 1 回実行してダウングレードを行います。
  • ダウングレード先の Kafka バージョンの log.message.format.version が異なる場合、ダウングレード後の Kafka バージョンが使用するバージョンに設定された log.message.format.version常に 実行中のクラスターに存在する場合に限り、ダウングレードが可能です。

    通常は、アップグレードの手順が log.message.format.version の変更前に中止された場合にのみ該当します。その場合、ダウングレードには以下が必要です。

    • 2 つのバージョンで Interbroker プロトコルが異なる場合、ブローカーのローリング再起動が 2 回必要です。
    • 両バージョンで同じ場合は、ローリング再起動が 1 回必要です。

14.6.2. Kafka ブローカーおよびクライアントアプリケーションのダウングレード

この手順では、AMQ Streams Kafka クラスターを Kafka の下位 (以前の) バージョンにダウングレードする方法 (2.4.0 から 2.3.0 へのダウングレードなど) を説明します。

重要

以前のバージョンでサポートされない log.message.format.version が新バージョンで使われていた場合 (log.message.format.versionのデフォルト値が使われていた場合など)、ダウングレードは実行 できません。たとえば以下のリソースの場合、log.message.format.version が変更されていないので、Kafka バージョン 2.3.0 にダウングレードできます。

apiVersion: v1alpha1
kind: Kafka
spec:
  # ...
  kafka:
    version: 2.4.0
    config:
      log.message.format.version: "2.3"
      # ...

log.message.format.version"2.4" に設定されているかまたは値がない (このためパラメーターに 2.4.0 ブローカーのデフォルト値 2.4 が採用される) 場合は、ダウングレードは実施できません。

前提条件

Kafka リソースをダウングレードするには、以下を確認します。

  • 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
  • Kafka.spec.kafka.config に、ダウングレード先となる Kafka バージョンでサポートされないオプションが含まれていない。
  • Kafka.spec.kafka.config に、ダウングレード先のバージョンでサポートされる log.message.format.version がある。

手順

  1. 必要に応じてエディターで Kafka クラスター設定を更新します。

    oc edit を使用します。

    oc edit kafka my-cluster
    1. Kafka.spec.kafka.version を変更して、以前のバージョンを指定します。

      たとえば、Kafka 2.4.0 から 2.3.0 へのダウングレードは以下のようになります。

      apiVersion: v1alpha1
      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.3.0 1
          config:
            log.message.format.version: "2.3" 2
            # ...
      1
      これは以前のバージョンに変更されます。
      2
      これは変更されません。
      注記

      log.message.format.version の値は、浮動小数点数として解釈されないように文字列にする必要があります。

    2. Kafka バージョンのイメージが Cluster Operator の STRIMZI_KAFKA_IMAGES に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image を更新します。

      「Kafka バージョンおよびイメージマッピング」を参照してください。

  2. エディターを保存して終了し、ローリングアップデートの完了を待ちます。

    更新をログで確認するか、または Pod 状態の遷移を監視して確認します。

    oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get po -w

    Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが異なる場合、Cluster Operator ログで INFO レベルのメッセージを確認します。

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 2 of 2 completed

    または、Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 1 of 1 completed
  3. すべてのクライアントアプリケーション (コンシューマー) をダウングレードして、以前のバージョンのクライアントバイナリーを使用します。

    これで、Kafka クラスターおよびクライアントは以前の Kafka バージョンを使用するようになります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.