第14章 AMQ Streams および Kafka のアップグレード
AMQ Streams は、クラスターのダウンタイムを発生せずにアップグレードできます。AMQ Streams の各バージョンは、Apache Kafka の 1 つ以上のバージョンをサポートします。使用する AMQ Streams バージョンでサポートされれば、より高いバージョンの Kafka にアップグレードできます。サポートされる下位バージョンの Kafka にダウングレードできる場合もあります。
より新しいバージョンの AMQ Streams はより新しいバージョンの Kafka をサポートしますが、AMQ Streams をアップグレードしてから、サポートされる上位バージョンの Kafka にアップグレードする必要があります。
14.1. アップグレードの前提条件
アップグレードプロセスを開始する前に、以下を確認します。
- AMQ Streams がインストールされている必要があります。手順は2章AMQ Streams の使用を参照してください。
- 「AMQ STREAMS 1.4 ON OPENSHIFT リリースノート」に記載されているアップグレードの変更について理解している必要があります。
14.2. アップグレードプロセス
AMQ Streams のアップグレードは 2 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。
Cluster Operator を最新の AMQ Streams バージョンに更新します。
すべての Kafka ブローカーとクライアントアプリケーションを、最新の Kafka バージョンにアップグレードします。
14.3. Kafka バージョン
Kafka のログメッセージ形式バージョンおよびブローカー間のプロトコルバージョンは、メッセージに追加されるログ形式バージョンとクラスターで使用されるプロトコルのバージョンを指定します。そのためアップグレードプロセスでは、既存の Kafka ブローカーの設定変更およびクライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更により、必ず正しいバージョンを使用されるようにする必要になります。
以下の表は、Kafka バージョンの違いを示しています。
Kafka のバージョン | Interbroker プロトコルのバージョン | ログメッセージ形式のバージョン | ZooKeeper バージョン |
---|---|---|---|
2.3.0 | 2.3 | 2.3 | 3.4.14 |
2.4.0 | 2.4 | 2.4 | 3.5.7 |
メッセージ形式のバージョン
プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリースによって変わるため、メッセージにはエンコードに使用された形式のバージョンが含まれます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。
Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。
-
message.format.version
プロパティーはトピックに設定されます。 -
log.message.format.version
プロパティーは Kafka ブローカーに設定されます。
トピックの message.format.version
のデフォルト値は、Kafka ブローカーに設定される log.message.format.version
によって定義されます。トピックの message.format.version
は、トピック設定を編集すると手動で設定できます。
本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version
によって定義されることを前提としています。
14.4. Cluster Operator のアップグレード
このセクションでは、AMQ Streams 1.4 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。
Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作の影響を受けません。
特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。
14.4.1. Cluster Operator の後続バージョンへのアップグレード
この手順では、Cluster Operator デプロイメントを後続バージョンにアップグレードする方法を説明します。
前提条件
- 既存の Cluster Operator デプロイメントを利用できる必要があります。
- すでに新規バージョンのインストールファイルをダウンロードしてある必要があります。
手順
既存の Cluster Operator リソースのバックアップを作成します。
oc get all -l app=strimzi -o yaml > strimzi-backup.yaml
Cluster Operator を更新します。
Cluster Operator が稼働している namespace に従い、インストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
既存の Cluster Operator
Deployment
で 1 つ以上の環境変数を編集した場合、install/cluster-operator/050-Deployment-cluster-operator.yaml
ファイルを編集し、Cluster Operator の新規バージョンに加えた変更を反映させます。設定を更新したら、残りのインストールリソースとともにデプロイします。
oc apply -f install/cluster-operator
ローリングアップデートが完了するのを待ちます。
Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例:
<New AMQ Streams version>-kafka-<Current Kafka version>
既存のリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。
これで Cluster Operator が更新されましたが、その管理下にあるクラスターで実行している Kafka のバージョンは変わりません。
次のステップ
Cluster Operator のアップグレードの次に、Kafka アップグレードを実行できます。
14.5. Kafka のアップグレード
Cluster Operator をアップグレードしたら、サポートされる上位の Kafka バージョンにブローカーをアップグレードできます。
Kafka のアップグレードは、Cluster Operator を使用して行います。Cluster Operator によるアップグレードの実行方法は、以下のバージョン間の違いによって異なります。
- Interbroker プロトコル
- ログメッセージの形式
- ZooKeeper
現在の Kafka バージョンとアップグレードする Kafka バージョンが同じ場合 (パッチレベルでのアップグレードではよくあります)、Kafka ブローカーのローリングアップデートを 1 回実行して Cluster Operator によるアップグレードを行います。
これらのバージョンが 1 つ以上異なる場合、Kafka ブローカーのローリングアップデートを 2、3 回実行して Cluster Operator でアップグレードを実行する必要があります。
その他のリソース
14.5.1. Kafka バージョンおよびイメージマッピング
Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES
および Kafka.spec.kafka.version
プロパティーの設定について考慮してください。
-
それぞれの
Kafka
リソースはKafka.spec.kafka.version
で設定できます。 Cluster Operator の
STRIMZI_KAFKA_IMAGES
環境変数により、Kafka のバージョンと、指定のKafka
リソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。-
Kafka.spec.kafka.image
を設定しないと、そのバージョンのデフォルトのイメージが使用されます。 -
Kafka.spec.kafka.image
を設定すると、デフォルトのイメージがオーバーライドされます。
-
Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。
14.5.2. クライアントをアップグレードするストラテジー
クライアントアプリケーション (Kafka Connect コネクターを含む) をアップグレードする最善の方法は、特定の状況によって異なります。
消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。
- プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
- ブローカーでメッセージをダウンコンバートする。
ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。
ブローカーのダウンコンバートは 2 通りの方法で設定できます。
-
トピックレベルの
message.format.version
では単一のとピックが設定されます。 -
ブローカーレベルの
log.message.format.version
は、トピックレベルのmessage.format.version
が設定されてないトピックのデフォルトです。
新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。
クライアントのアップグレードに使用できるストラテジーは複数あります。
- コンシューマーを最初にアップグレード
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
ブローカーレベルの
log.message.format.version
を新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。
- トピック単位でコンシューマーを最初にアップグレード
トピックごとに以下を実行します。
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
トピックレベルの
message.format.version
を新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。
- トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり
トピックごとに以下を実行します。
-
トピックレベルの
message.format.version
を、旧バージョンに変更します (または、デフォルトがブローカーレベルのlog.message.format.version
のトピックを利用します)。 - コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
- アップグレードしたアプリケーションが正しく機能することを確認します。
トピックレベルの
message.format.version
を新バージョンに変更します。このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。
この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。
-
トピックレベルの
クライアントアプリケーションをアップグレードするストラテジーは他にもあります。
複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、「トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり」のストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。
14.5.3. Kafka ブローカーおよびクライアントアプリケーションのアップグレード
この手順では、AMQ Streams Kafka クラスターを Kafka の上位バージョンにアップグレードする方法を説明します。
前提条件
Kafka
リソースをアップグレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.config
に、アップグレード先となる Kafka バージョンでサポートされないオプションが含まれていない。 現行の Kafka バージョンの
log.message.format.version
を新しいバージョンに更新する必要があるかどうか。Kafka バージョン表を参照してください。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka my-cluster
現行のバージョンの Kafka の
log.message.format.version
が新しい Kafka バージョンでも同じ場合は、次の手順に進みます。それ以外の場合、
Kafka.spec.kafka.config
のlog.message.format.version
が 現行 バージョンのデフォルトに設定されていることを確認してください。たとえば、Kafka 2.3.0 からのアップグレードでは以下のようになります。
kind: Kafka spec: # ... kafka: version: 2.3.0 config: log.message.format.version: "2.3" # ...
log.message.format.version
が設定されていない場合は、現行バージョンに設定します。注記log.message.format.version
の値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.version
を変更し、新バージョンを指定します (log.message.format.version
は現行バージョンのままにします)。たとえば、Kafka 2.3.0 から 2.4.0 へのアップグレードは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.4.0 1 config: log.message.format.version: "2.3" 2 # ...
Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGES
に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image
を更新します。「Kafka バージョンおよびイメージマッピング」を参照してください。
エディターを保存して終了し、ローリングアップデートの完了を待ちます。
注記新バージョンの Kafka に新しい ZooKeeper バージョンがある場合、追加のローリングアップデートが発生します。
更新をログで確認するか、または Pod 状態の遷移を監視して確認します。
oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version upgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc get po -w
現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが異なる場合は、Cluster Operator ログで
INFO
レベルのメッセージを確認します。Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 2 of 2 completed
または、現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。
Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 1 of 1 completed
ローリングアップデートでは以下を行います。
- 各 Pod が新バージョンの Kafka のブローカーバイナリーを使用していることを確認します。
新バージョンの Kafka の Interbroker プロトコルを使用してメッセージを送信するように、ブローカーを設定します。
注記クライアントは引き続き旧バージョンを使用するため、ブローカーはメッセージを旧バージョンに変換してからクライアントに送信します。この余分な負荷を最小化するには、できるだけ速やかにクライアントを更新します。
クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
「クライアントをアップグレードするストラテジー」を参照してください。
警告この手順を完了すると、ダウングレードできません。この時点で更新を元に戻す必要がある場合は、「Kafka ブローカーおよびクライアントアプリケーションのダウングレード」の手順に従います。
必要に応じて、Kafka Connect および MirrorMaker のバージョンプロパティーを新バージョンの Kafka として設定します。
-
Kafka Connect では、
KafkaConnect.spec.version
を更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.version
を更新します。
-
Kafka Connect では、
1. で特定された
log.message.format.version
が、新しいバージョンと同じ場合は、次の手順に進みます。それ以外の場合は、
Kafka.spec.kafka.config
のlog.message.format.version
を、現在使用している新バージョンの Kafka のデフォルトバージョンに変更します。たとえば、2.4.0 へのアップグレードでは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.4.0 config: log.message.format.version: "2.4" # ...
Cluster Operator によってクラスターが更新されるまで待ちます。
これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
その他のリソース
- AMQ Streams Kafka クラスターをあるバージョンから下位バージョンにダウングレードする手順は、「Kafka ブローカーおよびクライアントアプリケーションのダウングレード」を参照してください。
14.5.4. コンシューマーおよび Kafka Streams アプリケーションの Cooperative Rebalancing へのアップグレード
Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。
コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。
Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。
前提条件
- Kafka 2.4.0 に Kafka ブローカーおよびクライアントアプリケーションをアップグレード済みであることが必要です。
手順
Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードするには以下を行います。
-
Kafka クライアント
.jar
ファイルを新バージョンに置き換えます。 -
コンシューマー設定で、
partition.assignment.strategy
にcooperative-sticky
を追加します。たとえば、range
ストラテジーが設定されている場合は、設定をrange, cooperative-sticky
に変更します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
-
コンシューマー設定から前述の
partition.assignment.strategy
を削除して、グループの各コンシューマーを再設定し、cooperative-sticky
ストラテジーのみを残します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。
-
Kafka Streams の
.jar
ファイルを新バージョンに置き換えます。 -
Kafka Streams の設定で、
upgrade.from
設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。 - 各ストリームプロセッサー (ノード) を順次再起動します。
-
upgrade.from
設定パラメーターを Kafka Streams 設定から削除します。 - グループ内の各コンシューマーを順次再起動します。
その他のリソース
- Apache Kafka ドキュメントの「Notable changes in 2.4.0」。
14.6. Kafka のダウングレード
Kafka バージョンのダウングレードは、Cluster Operator を使用して行います。
Cluster Operator によるダウングレードの実行方法は、以下のバージョン間の違いによって異なります。
- Interbroker プロトコル
- ログメッセージの形式
- ZooKeeper
14.6.1. ダウングレード先のバージョン
Cluster Operator によるダウングレードの操作方法は、log.message.format.version
に応じて異なります。
-
ダウングレード先の Kafka バージョンの
log.message.format.version
が現行バージョンと同じ場合、Cluster Operator はブローカーのローリング再起動を 1 回実行してダウングレードを行います。 ダウングレード先の Kafka バージョンの
log.message.format.version
が異なる場合、ダウングレード後の Kafka バージョンが使用するバージョンに設定されたlog.message.format.version
が 常に 実行中のクラスターに存在する場合に限り、ダウングレードが可能です。通常は、アップグレードの手順が
log.message.format.version
の変更前に中止された場合にのみ該当します。その場合、ダウングレードには以下が必要です。- 2 つのバージョンで Interbroker プロトコルが異なる場合、ブローカーのローリング再起動が 2 回必要です。
- 両バージョンで同じ場合は、ローリング再起動が 1 回必要です。
14.6.2. Kafka ブローカーおよびクライアントアプリケーションのダウングレード
この手順では、AMQ Streams Kafka クラスターを Kafka の下位 (以前の) バージョンにダウングレードする方法 (2.4.0 から 2.3.0 へのダウングレードなど) を説明します。
以前のバージョンでサポートされない log.message.format.version
が新バージョンで使われていた場合 (log.message.format.version
のデフォルト値が使われていた場合など)、ダウングレードは実行 できません。たとえば以下のリソースの場合、log.message.format.version
が変更されていないので、Kafka バージョン 2.3.0 にダウングレードできます。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.4.0 config: log.message.format.version: "2.3" # ...
log.message.format.version
が "2.4"
に設定されているかまたは値がない (このためパラメーターに 2.4.0 ブローカーのデフォルト値 2.4 が採用される) 場合は、ダウングレードは実施できません。
前提条件
Kafka
リソースをダウングレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.config
に、ダウングレード先となる Kafka バージョンでサポートされないオプションが含まれていない。 -
Kafka.spec.kafka.config
に、ダウングレード先のバージョンでサポートされるlog.message.format.version
がある。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit
を使用します。oc edit kafka my-cluster
Kafka.spec.kafka.version
を変更して、以前のバージョンを指定します。たとえば、Kafka 2.4.0 から 2.3.0 へのダウングレードは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.3.0 1 config: log.message.format.version: "2.3" 2 # ...
注記log.message.format.version
の値は、浮動小数点数として解釈されないように文字列にする必要があります。Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGES
に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image
を更新します。「Kafka バージョンおよびイメージマッピング」を参照してください。
エディターを保存して終了し、ローリングアップデートの完了を待ちます。
更新をログで確認するか、または Pod 状態の遷移を監視して確認します。
oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc get po -w
Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが異なる場合、Cluster Operator ログで
INFO
レベルのメッセージを確認します。Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 2 of 2 completed
または、Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。
Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 1 of 1 completed
すべてのクライアントアプリケーション (コンシューマー) をダウングレードして、以前のバージョンのクライアントバイナリーを使用します。
これで、Kafka クラスターおよびクライアントは以前の Kafka バージョンを使用するようになります。