25.7. ZooKeeper 使用時の Kafka のアップグレード
ZooKeeper ベースの Kafka クラスターを使用している場合、アップグレードには Kafka バージョンとブローカー間プロトコルバージョンの更新が必要です。
メタデータ管理に ZooKeeper を使用する状態から KRaft モードで動作する状態へと Kafka クラスターを切り替える場合は、アップグレードとは別の手順を実行する必要があります。KRaft ベースのクラスターへの移行については、8章KRaft モードへの移行 を参照してください。
25.7.1. Kafka バージョンの更新
クラスター管理に ZooKeeper を使用する場合に Kafka をアップグレードするには、Kafka
リソースの設定内の Kafka バージョン (Kafka.spec.kafka.version
) とそのブローカー間プロトコルバージョン (inter.broker.protocol.version
) を更新する必要があります。Kafka の各バージョンには、互換性のあるバージョンの Inter-broker プロトコルがあります。ブローカー間プロトコルは、ブローカー間の通信に使用されます。上記の表が示すように、プロトコルのマイナーバージョンは、通常 Kafka のマイナーバージョンと一致するように番号が増加されます。Inter-broker プロトコルのバージョンは、Kafka
リソースでクラスター全体に設定されます。これを変更するには、Kafka.spec.kafka.config
の inter.broker.protocol.version
プロパティーを編集します。
以下の表は、Kafka バージョンの違いを示しています。
Streams for Apache Kafka バージョン | Kafka バージョン | Inter-broker プロトコルバージョン | ログメッセージ形式バージョン | ZooKeeper バージョン |
---|---|---|---|---|
2.7 | 3.7.0 | 3.7 | 3.7 | 3.8.3 |
2.6 | 3.6.0 | 3.6 | 3.6 | 3.8.3 |
- Kafka 3.7.0 は実稼働環境での使用がサポートされています。
- Kafka 3.6.0 は、Streams for Apache Kafka 2.7 にアップグレードする目的でのみサポートされます。
ログメッセージ形式バージョン
プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリース間で変更になる可能性があるため、メッセージにはエンコードに使用されたメッセージ形式のバージョンが指定されます。
特定のメッセージ形式のバージョンを設定するために使用されるプロパティーは以下のとおりです。
-
トピック用の
message.format.version
プロパティー -
Kafka ブローカーの
log.message.format.version
プロパティー
Kafka 3.0.0 以降、メッセージ形式のバージョンの値は inter.broker.protocol.version
と一致すると見なされ、設定する必要はありません。値は、使用される Kafka バージョンを反映します。
Kafka 3.0.0 以降にアップグレードする場合は、inter.broker.protocol.version
を更新する際にこの設定を削除できます。それ以外の場合は、アップグレード先の Kafka バージョンに基づいてメッセージ形式のバージョンを設定できます。
トピックの message.format.version
のデフォルト値は、Kafka ブローカーに設定される log.message.format.version
によって定義されます。トピックの message.format.version
は、トピック設定を編集すると手動で設定できます。
Kafka バージョン変更によるローリング更新
Cluster Operator は、Kafka バージョンが更新されると、Kafka ブローカーへのローリング更新を開始します。さらなるローリング更新は、inter.broker.protocol.version
および log.message.format.version
の設定に応じて異なります。
Kafka.spec.kafka.config に以下が含まれている場合 | Cluster Operator によって開始されるもの |
---|---|
|
単一のローリング更新。更新後、 |
| 2 つのローリング更新 |
| 2 つのローリング更新 |
Kafka 3.0.0 以降、inter.broker.protocol.version
が 3.0
以上に設定されていると、log.message.format.version
オプションは無視されるため、設定する必要はありません。ブローカーの log.message.format.version
プロパティーおよびトピックの message.format.version
プロパティーは、非推奨となり、Kafka の今後のリリースで削除されます。
Cluster Operator は、Kafka のアップグレードの一環として、ZooKeeper のローリング更新を開始します。
- ZooKeeper バージョンが変更されなくても、単一のローリング更新が発生します。
- 新しいバージョンの Kafka に新しいバージョンの ZooKeeper が必要な場合、追加のローリング更新が発生します。
25.7.2. 古いメッセージ形式を使用したクライアントのアップグレード
Kafka 3.0 より前では、log.message.format.version
プロパティー (またはトピックレベルの message.format.version
プロパティー) を使用して、ブローカーの特定のメッセージ形式を設定できました。これにより、ブローカーは、古いメッセージ形式を使用していた古い Kafka クライアントに対応できるようになりました。Kafka は本質的に、このプロパティーを明示的に設定しなくても古いクライアントをサポートしますが、ブローカーは古いクライアントからのメッセージを変換する必要があり、これにはかなりのパフォーマンスコストが伴います。
Apache Kafka Java クライアントは、バージョン 0.11 以降、最新のメッセージ形式バージョンをサポートしています。すべてのクライアントが最新のメッセージバージョンを使用している場合は、ブローカーをアップグレードするときに、log.message.format.version
または message.format.version
のオーバーライドを削除できます。
ただし、古いメッセージ形式バージョンを使用しているクライアントがまだある場合は、まずクライアントをアップグレードすることを推奨します。コンシューマーから始めて、ブローカーのアップグレード時に log.message.format.version
または message.format.version
のオーバーライドを削除する前に、プロデューサーをアップグレードします。これにより、すべてのクライアントが最新のメッセージ形式バージョンをサポートできるようになり、アップグレードプロセスがスムーズに進むようになります。
次のメトリックを使用して、Kafka クライアントの名前とバージョンを追跡できます。
-
kafka.server:type=socket-server-metrics,clientSoftwareName=<name>,clientSoftwareVersion=<version>,listener=<listener>,networkProcessor=<processor>
次の Kafka ブローカーメトリックは、メッセージのダウンコンバージョンのパフォーマンスを監視するのに役立ちます。
-
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}
はメッセージ変換の実行にかかる時間に関するメトリックを提供します。 -
kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=(-.\w+)
は一定期間に変換されたメッセージの数に関するメトリックを提供します。
25.7.3. ZooKeeper ベースの Kafka クラスターとクライアントアプリケーションのアップグレード
ZooKeeper ベースの Streams for Apache Kafka クラスターを、サポートされている新しい Kafka バージョンとブローカー間プロトコルバージョンにアップグレードします。
クライアントをアップグレードするストラテジー を選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。
前提条件
- Cluster Operator が稼働中である。
-
Streams for Apache Kafka クラスターをアップグレードする前に、
Kafka
リソースのプロパティーに、新しい Kafka バージョンでサポートされていない設定オプションが 含まれていない ことを確認した。
手順
Kafka クラスター設定を更新します。
oc edit kafka <kafka_configuration_file>
設定されている場合は、
inter.broker.protocol.version
およびlog.message.format.version
プロパティーが 現在の バージョンに設定されていることを確認してください。たとえば、Kafka バージョン 3.6.0 から 3.7.0 にアップグレードする場合、現在のバージョンは 3.6 です。
kind: Kafka spec: # ... kafka: version: 3.6.0 config: log.message.format.version: "3.6" inter.broker.protocol.version: "3.6" # ...
log.message.format.version
およびinter.broker.protocol.version
が設定されていない場合、Streams for Apache Kafka では、次のステップでの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。注記log.message.format.version
およびinter.broker.protocol.version
の値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.version
を変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトでlog.message.format.version
およびinter.broker.protocol.version
のままにします。注記kafka.version
を変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリーの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。inter.broker.protocol.version
を現在の設定のままにしておくと、ブローカーはアップグレード中に相互に通信し続けることができます。たとえば、Kafka 3.6.0 から 3.7.0 にアップグレードする場合:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 1 config: log.message.format.version: "3.6" 2 inter.broker.protocol.version: "3.6" 3 # ...
警告新しい Kafka バージョンの
inter.broker.protocol.version
が変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsets
に書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。Kafka クラスターのイメージが
Kafka.spec.kafka.image
のKafka
カスタムリソースで定義されている場合、image
を更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。Kafka バージョンおよびイメージマッピング を参照してください。
エディターを保存して終了し、ローリング更新の完了を待ちます。
Pod の状態の遷移を監視して、ローリング更新の進捗を確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
ローリング更新により、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用するようになります。
クライアントのアップグレードに選択したストラテジー に応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
必要に応じて、Kafka Connect および MirrorMaker の
version
プロパティーを新バージョンの Kafka として設定します。-
Kafka Connect では、
KafkaConnect.spec.version
を更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.version
を更新します。 MirrorMaker 2 の場合は、
KafkaMirrorMaker2.spec.version
を更新します。注記手動でビルドしたカスタムイメージを使用している場合は、最新の Streams for Apache Kafka ベースイメージを使用してそのイメージを最新の状態にするために、イメージを再ビルドする必要があります。たとえば、Kafka Connect のベースイメージからコンテナーイメージを作成した 場合は、最新のベースイメージとビルド設定を参照するように Dockerfile を更新します。
-
Kafka Connect では、
- アップグレードされたクライアントアプリケーションが新しい Kafka ブローカーで正しく動作することを確認します。
設定されている場合、新しい
inter.broker.protocol.version
バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。たとえば、Kafka 3.7.0 にアップグレードする場合:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 config: log.message.format.version: "3.6" inter.broker.protocol.version: "3.7" # ...
- Cluster Operator によってクラスターが更新されるまで待ちます。
設定されている場合、新しい
log.message.format.version
バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に進みます。たとえば、Kafka 3.7.0 にアップグレードする場合:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 config: log.message.format.version: "3.7" inter.broker.protocol.version: "3.7" # ...
重要Kafka 3.0.0 以降、
inter.broker.protocol.version
が3.0
以上に設定されていると、log.message.format.version
オプションは無視されるため、設定する必要はありません。Cluster Operator によってクラスターが更新されるまで待ちます。