3.4. KRaft モードでの Kafka クラスターの実行
KRaft モードで Kafka を設定し、実行します。Kafka は、シングルノードまたはマルチノードの Kafka クラスターとして実行できます。安定性と可用性を確保するために、少なくとも 3 つのブローカーノードと 3 つのコントローラーノードを実行し、ブローカー間でトピックをレプリケーションします。
Kafka ノードは、ブローカー、コントローラー、またはその両方のロールを実行します。
- ブローカーのロール
- ノードまたはサーバーと呼ばれることもあるブローカーは、メッセージの保存と受け渡しを調整します。
- コントローラーのロール
- コントローラーはクラスターを調整し、ブローカーとパーティションのステータスを追跡するために使用するメタデータを管理します。
クラスターのメタデータは、内部の __cluster_metadata
トピックに保存されます。
ブローカーノードとコントローラーノードを組み合わせて使用できますが、これらの機能を分離したい場合があります。組み合わせたロールを実行するブローカーは、より単純なデプロイメントでより便利になります。
クラスターを識別するには、ID を作成します。この ID は、クラスターに追加するノードのログを作成するときに使用されます。
各ノードの設定で以下を指定します。
- ノード ID
- ブローカーのロール
-
コントローラーとして機能するノード (または
voters
) のリスト
各コントローラーのノード ID と接続の詳細 (ホスト名とポート) を使用して、voters
として設定されたコントローラーのリストを指定します。
設定プロパティーファイルを使用して、ロールの設定を含むブローカー設定を適用します。ブローカーの設定は、ロールによって異なります。KRaft は、3 つのブローカー設定プロパティーファイルの例を提供します。
-
/opt/kafka/config/kraft/broker.properties
には、ブローカーロールの設定例があります。 -
/opt/kafka/config/kraft/controller.properties
には、コントローラーロールの設定例があります。 -
/opt/kafka/config/kraft/server.properties
には、統合されたロールの設定例があります。
ブローカー設定は、これらのプロパティーファイルの例に基づくことができます。この手順では、server.properties
の設定例を使用します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
手順
Kafka クラスターの一意の ID を生成します。
そのために
kafka-storage
ツールを使用できます。/opt/kafka/bin/kafka-storage.sh random-uuid
このコマンドは ID を返します。KRaft モードでクラスター ID が必要です。
クラスター内の各ノードの設定プロパティーファイルを作成します。
ファイルは、Kafka で提供される例に基づくことができます。
デュアルロールを
broker
、controller
、またはbroker, controller
として指定します。たとえば、組み合わせたロールに
process.roles=broker, controller
を指定します。クラスター内の各ノードに
0
から始まる一意のnode.id
を指定します。たとえば、
node.id=1
です。<node_id>@<hostname:port>
の形式でcontroller.quorum.voters
のリストを指定します。たとえば、
controller.quorum.voters=1@localhost:9093
です。リスナーを指定します。
各リスナーの名前、ホスト名、ポートを設定します。
たとえば、
listeners=PLAINTEXT:localhost:9092,CONTROLLER:localhost:9093
です。ブローカー間通信に使用するリスナー名を設定します。
たとえば、
inter.broker.listener.name=PLAINTEXT
です。コントローラークォーラムで使用するリスナー名を設定します。
たとえば、
controller.listener.names=CONTROLLER
です。Kafka への接続のためにクライアントにアドバタイズされる各リスナーの名前、ホスト名、およびポートを設定します。
たとえば、
advertised.listeners=PLAINTEXT:localhost:9092
です。
Kafka クラスターの各ノードにログディレクトリーを設定します。
/opt/kafka/bin/kafka-storage.sh format -t <uuid> -c /opt/kafka/config/kraft/server.properties
戻り値:
Formatting /tmp/kraft-combined-logs
<uuid> は、生成したクラスター ID に置き換えます。クラスター内の各ノードで、同じ ID を使用します。
ブローカー用に作成したプロパティーファイルを使用してブローカー設定を適用します。
デフォルトでは、
server.properties
設定ファイルで指定されたログディレクトリー (log.dirs
) は/tmp/kraft-combined-logs
に設定されます。/tmp
ディレクトリーは通常、システムが再起動するたびにクリアされるため、開発環境のみに適しています。複数のログディレクトリーを設定するには、コンマ区切りリストを追加できます。
各 Kafka ノードを起動します。
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
Kafka が稼働していることを確認します。
jcmd | grep kafka
戻り値:
process ID kafka.Kafka /opt/kafka/config/kraft/server.properties
各ノードのログをチェックして、KRaft クラスターに正常に参加していることを確認します。
tail -f /opt/kafka/logs/server.log
トピックを作成し、ブローカーからメッセージを送受信できるようになりました。
メッセージを渡すブローカーの場合、クラスター内のブローカー全体でトピックのレプリケーションを使用して、データの耐久性を確保できます。少なくとも 3 のレプリケーション係数と、In-Sync レプリカの最小数がレプリケーション係数より 1 少ない数に設定されるようにトピックを設定します。詳細は、「トピックの作成」 を参照してください。