18.4. Kafka ブローカーおよび ZooKeeper のアップグレード
ホストマシン上の Kafka ブローカーと ZooKeeper をアップグレードして、最新バージョンの Streams for Apache Kafka を使用します。インストールファイルを更新し、すべての Kafka ブローカーを設定して再起動し、新しいブローカー間プロトコルバージョンを使用します。これらの手順の実行後に、新しい inter-broker プロトコルバージョンを使用して Kafka ブローカー間でデータが送信されます。
Kafka 3.0.0 以降、メッセージ形式のバージョン値は inter.broker.protocol.version
と一致することが想定されており、これを設定する必要はありません。値は、使用される Kafka バージョンを反映します。
前提条件
-
Red Hat Enterprise Linux に
kafka
ユーザーとしてログインしている。 別のホストで使用している Kafka およびその他の Kafka コンポーネントをインストールしている。
詳細は、「インストール環境」 を参照してください。
- インストールファイル をダウンロードしている。
手順
Streams for Apache Kafka クラスター内の各 Kafka ブローカーについて、一度に 1 つずつ以下の手順を実行します。
Streams for Apache Kafka ソフトウェアダウンロードページ から Streams for Apache Kafka のアーカイブをダウンロードします。
注記プロンプトが表示されたら、Red Hat アカウントにログインします。
コマンドラインで一時ディレクトリーを作成し、
amq-streams-<version>-bin.zip
ファイルの内容を展開します。mkdir /tmp/kafka unzip amq-streams-<version>-bin.zip -d /tmp/kafka
実行中の場合は、ホストで実行している ZooKeeper および Kafka ブローカーを停止します。
/opt/kafka/bin/zookeeper-server-stop.sh /opt/kafka/bin/kafka-server-stop.sh jcmd | grep zookeeper jcmd | grep kafka
マルチノードクラスターで Kafka を実行している場合は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。
既存のインストールから
libs
およびbin
ディレクトリーを削除します。rm -rf /opt/kafka/libs /opt/kafka/bin
一時ディレクトリーから
libs
およびbin
ディレクトリーをコピーします。cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/ cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/
-
必要に応じて、
config
ディレクトリー内の設定ファイルを更新して、新しいバージョンの変更を反映します。 一時ディレクトリーを削除します。
rm -r /tmp/kafka
/opt/kafka/config/server.properties
プロパティーファイルを編集します。inter.broker.protocol.version
およびlog.message.format.version
プロパティーを 現在の バージョンに設定します。たとえば、Kafka バージョン 3.6.0 から 3.7.0 にアップグレードする場合、現在のバージョンは 3.6 です。
inter.broker.protocol.version=3.6 log.message.format.version=3.6
アップグレード元の Kafka バージョンに応じた正しいバージョン (
3.5
、3.6
など) を使用してください。inter.broker.protocol.version
を現在の設定のままにしておくと、ブローカーはアップグレード中に相互に通信し続けることができます。プロパティーが設定されていない場合は、現在のバージョンでプロパティーを追加します。
Kafka 3.0.0 以降からアップグレードする場合は、
inter.broker.protocol.version
を設定するだけで済みます。更新された ZooKeeper および Kafka ブローカーを再起動します。
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka ブローカーと ZooKeeper は、最新の Kafka バージョンのバイナリーの使用を開始します。
マルチノードクラスターでブローカーを再起動する方法は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。
再起動した Kafka ブローカーが、フォローしているパーティションレプリカに追いついたことを確認します。
kafka-topics.sh
ツールを使用して、ブローカーに含まれるすべてのレプリカが同期していることを確認します。手順は、トピックの一覧表示および説明 を参照してください。次の手順では、新しい inter-broker プロトコルバージョンを使用するように Kafka ブローカーを更新します。
各ブローカーを一度に 1 つずつ更新します。
警告次の手順を完了すると、Streams for Apache Kafka をダウングレードできなくなります。
/opt/kafka/config/server.properties
ファイルでinter.broker.protocol.version
プロパティーを3.7
に設定します。inter.broker.protocol.version=3.7
コマンドラインで、変更した Kafka ブローカーを停止します。
/opt/kafka/bin/kafka-server-stop.sh
Kafka が実行されていないことを確認します。
jcmd | grep kafka
変更した Kafka ブローカーを再起動します。
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka が稼働していることを確認します。
jcmd | grep kafka
Kafka 3.0.0 より前のバージョンからアップグレードする場合は、
/opt/kafka/config/server.properties
ファイルでlog.message.format.version
プロパティーを3.7
に設定します。log.message.format.version=3.7
コマンドラインで、変更した Kafka ブローカーを停止します。
/opt/kafka/bin/kafka-server-stop.sh
Kafka が実行されていないことを確認します。
jcmd | grep kafka
変更した Kafka ブローカーを再起動します。
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka が稼働していることを確認します。
jcmd | grep kafka
再起動した Kafka ブローカーが、フォローしているパーティションレプリカに追いついたことを確認します。
kafka-topics.sh
ツールを使用して、ブローカーに含まれるすべてのレプリカが同期していることを確認します。手順は、トピックの一覧表示および説明 を参照してください。-
アップグレードで使用された場合は、従来の
log.message.format.version
設定をserver.properties
ファイルから削除します。
クライアントアプリケーションのアップグレード
アップグレードプロセスの一環として、すべての Kafka クライアントアプリケーションが新しいバージョンのクライアントバイナリーを使用するように更新されていることを確認し、Kafka アップグレードとの互換性を確認します。必要に応じて、クライアントアプリケーションの管理を担当するチームと連携してください。
クライアントが最新のメッセージ形式を使用していることを確認するには、kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec
メトリクスを使用します。最新のメッセージ形式が使用されている場合、このメトリクスに 0
が表示されます。