18.4. Kafka ブローカーおよび ZooKeeper のアップグレード


ホストマシン上の Kafka ブローカーと ZooKeeper をアップグレードして、最新バージョンの Streams for Apache Kafka を使用します。インストールファイルを更新し、すべての Kafka ブローカーを設定して再起動し、新しいブローカー間プロトコルバージョンを使用します。これらの手順の実行後に、新しい inter-broker プロトコルバージョンを使用して Kafka ブローカー間でデータが送信されます。

注記

Kafka 3.0.0 以降、メッセージ形式のバージョン値は inter.broker.protocol.version と一致することが想定されており、これを設定する必要はありません。値は、使用される Kafka バージョンを反映します。

前提条件

  • Red Hat Enterprise Linux に kafka ユーザーとしてログインしている。
  • 別のホストで使用している Kafka およびその他の Kafka コンポーネントをインストールしている。

    詳細は、「インストール環境」 を参照してください。

  • インストールファイル をダウンロードしている。

手順

Streams for Apache Kafka クラスター内の各 Kafka ブローカーについて、一度に 1 つずつ以下の手順を実行します。

  1. Streams for Apache Kafka ソフトウェアダウンロードページ から Streams for Apache Kafka のアーカイブをダウンロードします。

    注記

    プロンプトが表示されたら、Red Hat アカウントにログインします。

  2. コマンドラインで一時ディレクトリーを作成し、amq-streams-<version>-bin.zip ファイルの内容を展開します。

    mkdir /tmp/kafka
    unzip amq-streams-<version>-bin.zip -d /tmp/kafka
  3. 実行中の場合は、ホストで実行している ZooKeeper および Kafka ブローカーを停止します。

    /opt/kafka/bin/zookeeper-server-stop.sh
    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep zookeeper
    jcmd | grep kafka

    マルチノードクラスターで Kafka を実行している場合は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。

  4. 既存のインストールから libs および bin ディレクトリーを削除します。

    rm -rf /opt/kafka/libs /opt/kafka/bin
  5. 一時ディレクトリーから libs および bin ディレクトリーをコピーします。

    cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/
    cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/
  6. 必要に応じて、config ディレクトリー内の設定ファイルを更新して、新しいバージョンの変更を反映します。
  7. 一時ディレクトリーを削除します。

    rm -r /tmp/kafka
  8. /opt/kafka/config/server.properties プロパティーファイルを編集します。

    inter.broker.protocol.version および log.message.format.version プロパティーを 現在の バージョンに設定します。

    たとえば、Kafka バージョン 3.6.0 から 3.7.0 にアップグレードする場合、現在のバージョンは 3.6 です。

    inter.broker.protocol.version=3.6
    log.message.format.version=3.6

    アップグレード元の Kafka バージョンに応じた正しいバージョン (3.53.6 など) を使用してください。inter.broker.protocol.version を現在の設定のままにしておくと、ブローカーはアップグレード中に相互に通信し続けることができます。

    プロパティーが設定されていない場合は、現在のバージョンでプロパティーを追加します。

    Kafka 3.0.0 以降からアップグレードする場合は、inter.broker.protocol.version を設定するだけで済みます。

  9. 更新された ZooKeeper および Kafka ブローカーを再起動します。

    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    Kafka ブローカーと ZooKeeper は、最新の Kafka バージョンのバイナリーの使用を開始します。

    マルチノードクラスターでブローカーを再起動する方法は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。

  10. 再起動した Kafka ブローカーが、フォローしているパーティションレプリカに追いついたことを確認します。

    kafka-topics.sh ツールを使用して、ブローカーに含まれるすべてのレプリカが同期していることを確認します。手順は、トピックの一覧表示および説明 を参照してください。

    次の手順では、新しい inter-broker プロトコルバージョンを使用するように Kafka ブローカーを更新します。

    各ブローカーを一度に 1 つずつ更新します。

    警告

    次の手順を完了すると、Streams for Apache Kafka をダウングレードできなくなります。

  11. /opt/kafka/config/server.properties ファイルで inter.broker.protocol.version プロパティーを 3.7 に設定します。

    inter.broker.protocol.version=3.7
  12. コマンドラインで、変更した Kafka ブローカーを停止します。

    /opt/kafka/bin/kafka-server-stop.sh
  13. Kafka が実行されていないことを確認します。

    jcmd | grep kafka
  14. 変更した Kafka ブローカーを再起動します。

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  15. Kafka が稼働していることを確認します。

    jcmd | grep kafka
  16. Kafka 3.0.0 より前のバージョンからアップグレードする場合は、/opt/kafka/config/server.properties ファイルで log.message.format.version プロパティーを 3.7 に設定します。

    log.message.format.version=3.7
  17. コマンドラインで、変更した Kafka ブローカーを停止します。

    /opt/kafka/bin/kafka-server-stop.sh
  18. Kafka が実行されていないことを確認します。

    jcmd | grep kafka
  19. 変更した Kafka ブローカーを再起動します。

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  20. Kafka が稼働していることを確認します。

    jcmd | grep kafka
  21. 再起動した Kafka ブローカーが、フォローしているパーティションレプリカに追いついたことを確認します。

    kafka-topics.sh ツールを使用して、ブローカーに含まれるすべてのレプリカが同期していることを確認します。手順は、トピックの一覧表示および説明 を参照してください。

  22. アップグレードで使用された場合は、従来の log.message.format.version 設定を server.properties ファイルから削除します。

クライアントアプリケーションのアップグレード

アップグレードプロセスの一環として、すべての Kafka クライアントアプリケーションが新しいバージョンのクライアントバイナリーを使用するように更新されていることを確認し、Kafka アップグレードとの互換性を確認します。必要に応じて、クライアントアプリケーションの管理を担当するチームと連携してください。

ヒント

クライアントが最新のメッセージ形式を使用していることを確認するには、kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec メトリクスを使用します。最新のメッセージ形式が使用されている場合、このメトリクスに 0 が表示されます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.