32.2. ZooKeeper 使用時の Kafka のダウングレード
ZooKeeper モードで Kafka を使用している場合、ダウングレードプロセスには、Kafka バージョンと、関連する log.message.format.version および inter.broker.protocol.version プロパティーの変更が含まれます。
32.2.1. ダウングレードでの Kafka バージョンの互換性 リンクのコピーリンクがクリップボードにコピーされました!
Kafka のダウングレードは、互換性のある現在およびターゲットの Kafka バージョン と、メッセージがログに記録された状態に依存します。
そのバージョンが、クラスターで これまで使用された inter.broker.protocol.version 設定をサポートしない場合、または新しい log.message.format.version を使用するメッセージログにメッセージが追加された場合は、下位バージョンの Kafka に戻すことはできません。
inter.broker.protocol.version は、__consumer_offsets に書き込まれたメッセージのスキーマなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。クラスターで以前使用された inter.broker.protocol.version が認識されない Kafka バージョンにダウングレードすると、ブローカーが認識できないデータが発生します。
ダウングレードする Kafka のバージョンの関係は次のとおりです。
-
ダウングレードする Kafka バージョンの
log.message.format.versionが現行バージョンと 同じ である場合、Cluster Operator は、ブローカーのローリング再起動を 1 回実行してダウングレードを行います。 別 の
log.message.format.versionの場合、ダウングレード後のバージョンが使用するバージョンに設定されたlog.message.format.versionが 常 に実行中のクラスターに存在する場合に限り、ダウングレードが可能です。通常は、アップグレードの手順がlog.message.format.versionの変更前に中止された場合にのみ該当します。その場合、ダウングレードには以下が必要です。- 2 つのバージョンで interbroker プロトコルが異なる場合、ブローカーのローリング再起動が 2 回必要です。
- 両バージョンで同じ場合は、ローリング再起動が 1 回必要です。
以前のバージョンでサポートされない log.message.format.version が新バージョンで使われていた場合 (log.message.format.version のデフォルト値が使われていた場合など)、ダウングレードは実行 できません。たとえば、log.message.format.version が変更されていないため、このリソースは Kafka バージョン 3.8.0 にダウングレードできます。
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
# ...
kafka:
version: 3.9.0
config:
inter.broker.protocol.version: "3.8"
log.message.format.version: "3.8"
# ...
log.message.format.version が "3.9" に設定されている場合、または値が存在しない場合は、ダウングレードは不可能であり、パラメーターは 3.9.0 ブローカーのデフォルト値である 3.9 を取得します。
Kafka 3.0.0 以降、inter.broker.protocol.version が 3.0 以上に設定されていると、log.message.format.version オプションは無視されるため、設定する必要はありません。
32.2.2. ZooKeeper ベースの Kafka クラスターとクライアントアプリケーションのダウングレード リンクのコピーリンクがクリップボードにコピーされました!
ZooKeeper ベースの Kafka クラスターを以前のバージョンにダウングレードします。ZooKeeper ベースの Kafka クラスターを下位バージョンにダウングレードする場合 (3.9.0 から 3.8.0 に移行する場合など)、Kafka クラスターで使用されるブローカー間プロトコルバージョンが、ダウングレード先の Kafka バージョンでサポートされているバージョンであることを確認します。ダウングレード元の Kafka バージョンのブローカー間プロトコルバージョンは、ダウングレード先のバージョンより高くする必要があります。
ZooKeeper ベースのダウングレードに関連するサポートと制限事項は、Apache Kafka のドキュメントを参照してください。
前提条件
- Cluster Operator が稼働中である。
Kafka クラスターをダウングレードする前に、
Kafkaリソースについて次の点を確認してください。- 重要: Kafka バージョンの互換性。
-
Kafkaカスタムリソースには、ダウングレードされる Kafka バージョンでサポートされていないオプションは含まれません。 Kafka.spec.kafka.configに、ダウングレード先の Kafka バージョンでサポートされるlog.message.format.versionとinter.broker.protocol.versionがある。Kafka 3.0.0 以降、
inter.broker.protocol.versionが3.0以上に設定されていると、log.message.format.versionオプションは無視されるため、設定する必要はありません。
手順
Kafka クラスター設定を更新します。
oc edit kafka <kafka_configuration_file>inter.broker.protocol.versionバージョン (および該当する場合はlog.message.format.version) を、ダウングレードする Kafka バージョンでサポートされているバージョンに変更します。Kafka.spec.kafka.versionは 現在 の Kafka バージョンのままにしておきます。たとえば、Kafka 3.9.0 から 3.8.0 にダウングレードする場合:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: # ... kafka: version: 3.9.01 config: inter.broker.protocol.version: "3.8"2 log.message.format.version: "3.8" # ...注記log.message.format.versionおよびinter.broker.protocol.versionの値は、浮動小数点数として解釈されないように文字列である必要があります。エディターを保存して終了し、ローリング更新の完了を待ちます。
Pod の状態の遷移を監視して、ローリング更新の進捗を確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'ローリング更新により、各 Pod が指定された Kafka ブローカー間プロトコルバージョンを使用していることが保証されます。
Kafka.spec.kafka.versionを以前のバージョンに変更します。たとえば、Kafka 3.9.0 から 3.8.0 にダウングレードする場合:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: # ... kafka: version: 3.8.01 config: inter.broker.protocol.version: "3.8"2 log.message.format.version: "3.8" # ...Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGESに定義されているイメージとは異なる場合は、Kafka.spec.kafka.imageを更新します。「Kafka バージョンおよびイメージマッピング」を参照してください。
Cluster Operator によってクラスターが更新されるまで待ちます。
すべてのクライアントアプリケーション (コンシューマー) をダウングレードして、以前のバージョンのクライアントバイナリーを使用します。
これで、Kafka クラスターおよびクライアントは以前の Kafka バージョンを使用するようになります。
トピックメタデータの保存に ZooKeeper を使用する 1.7 よりも前のバージョンの Streams for Apache Kafka に戻す場合は、Kafka クラスターから内部トピックストアのトピックを削除します。
oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.3 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete