Chapter 14. Scaling clusters by adding or removing nodes
Kafka clusters can be scaled by adding or removing nodes, which serve as brokers, controllers, or both. In production, broker and controller roles are typically separated.
14.1. Scaling brokers
Adding brokers improves performance and reliability by increasing available resources, enabling larger workloads, and enhancing fault tolerance through more replicas. Removing underutilized brokers optimizes resource consumption and efficiency.
Operations for scaling brokers:
- Adding brokers: Set up the new broker node, making sure its storage is formatted using the existing cluster ID. After the broker starts, rebalance partitions to distribute load onto it.
- Removing brokers: Before shutting down a broker, rebalance partitions to migrate its data to the other brokers in the cluster and prevent data loss.
Cruise Control can automate partition reassignments:
- add_broker mode redistributes partitions to new brokers.
- remove_broker mode moves partitions off brokers marked for removal.
Broker scaling or partition scaling? Increasing partitions can enhance topic throughput by distributing workload, but if brokers are constrained, such as by I/O limitations, adding partitions alone won’t help. Instead, more brokers are needed.
The number of brokers impacts replication settings. Choose values based on your fault tolerance requirements. For example, a replication factor of 3 ensures that each partition is replicated across three brokers, helping maintain availability during failures. For data durability, a min.insync.replicas value of 2 requires that at least two replicas acknowledge a write of the partition data before it succeeds.
Example replica configuration
default.replication.factor = 3
min.insync.replicas = 2
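These defaults can also be overridden per topic at creation time. As a sketch, assuming a cluster reachable at localhost:9092 and a hypothetical topic name:

```shell
# Create a topic whose partitions are replicated across 3 brokers,
# requiring 2 in-sync replicas to acknowledge each write.
# The topic name, partition count, and bootstrap address are illustrative.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic example-topic \
  --partitions 6 \
  --replication-factor 3 \
  --config min.insync.replicas=2
```

With a replication factor of 3 and min.insync.replicas of 2, the cluster tolerates one broker failure without rejecting writes from producers that use acks=all.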
14.2. Adding new brokers
To add a new broker to a Kafka cluster, configure the broker, start it, and rebalance partition replicas to distribute load evenly across all brokers.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses kafka-server-start.sh and Cruise Control for rebalancing.
- Administrative access to the broker nodes.
For more information on using Cruise Control, see Chapter 15, Using Cruise Control for cluster rebalancing.
Procedure
Configure the new broker using a broker.properties file. At a minimum, the broker requires the following configuration:
- A unique node ID
- A valid listener configuration
- Bootstrap information for the KRaft controller quorum
Example broker configuration
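A minimal sketch of the file, assuming a PLAINTEXT listener setup and hypothetical hostnames; adjust all values to your environment:

```
# Unique node ID for the new broker (hypothetical value)
node.id=5
process.roles=broker
# Listener configuration (illustrative hostnames and ports)
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker5.example.com:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# Bootstrap information for the KRaft controller quorum
controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093
log.dirs=/var/lib/kafka/broker-logs
```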
Format the broker:
./bin/kafka-storage.sh format --cluster-id <uuid> --config ./config/broker.properties --no-initial-controllers
Replace <uuid> with the cluster ID of the existing cluster.
Start the broker:
./bin/kafka-server-start.sh ./config/broker.properties
Check the broker logs to ensure that it has successfully joined the KRaft cluster:
tail -f ./logs/server.log
Rebalance partitions to include the new broker using Cruise Control's add_broker mode:
curl -v -X POST 'http://<cruise_control_host>:9090/kafkacruisecontrol/add_broker?brokerid=5&dryrun=false'
Tip: Run the command with dryrun=true first to review the proposed changes before running it with dryrun=false.
Monitor the rebalance progress using the Cruise Control /user_tasks endpoint:
/user_tasksendpoint:curl 'http://<cruise_control_host>:9090/kafkacruisecontrol/user_tasks'
Wait for the rebalance to complete.
Verify that the broker has been added to the active cluster:
kafka-cluster.sh --bootstrap-server <kafka_host>:9092 --describe
This displays the list of brokers and their status. <kafka_host> must be an active broker in the cluster with a known, accessible listener port.
14.3. Removing brokers
To remove a broker from a Kafka cluster, rebalance partition replicas off the broker, shut it down, and remove it from service.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses Cruise Control for rebalancing and assumes the broker was originally configured using a broker.properties file.
- Administrative access to the broker nodes.
- The node ID of the broker to be removed is known. In this procedure, the node with ID 5 is removed.
For more information on using Cruise Control, see Chapter 15, Using Cruise Control for cluster rebalancing.
Procedure
Rebalance partitions off the broker that you want to remove using Cruise Control's remove_broker mode:
curl -X POST 'http://<cruise_control_host>:9090/kafkacruisecontrol/remove_broker?brokerid=5&dryrun=false'
Tip: Run the command with dryrun=true first to review the proposed changes before running it with dryrun=false.
Monitor the rebalance progress using the Cruise Control /user_tasks endpoint:
/user_tasksendpoint:curl 'http://<cruise_control_host>:9090/kafkacruisecontrol/user_tasks'
Wait for the rebalance to complete.
Shut down the broker:
./bin/kafka-server-stop.sh
Unregister the broker to remove it from the cluster metadata.
For more information on unregistering, see Section 14.4, “Unregistering brokers after scale-down operations”.
Verify that the broker has been removed from the active cluster:
kafka-cluster.sh --bootstrap-server <kafka_host>:9092 --describe
This displays the list of brokers and their status. <kafka_host> must be an active broker in the cluster with a known, accessible listener port.
14.4. Unregistering brokers after scale-down operations
After removing a broker from a Kafka cluster, use the kafka-cluster.sh script to unregister the broker from the cluster metadata. Failing to unregister removed nodes leads to stale metadata, which causes operational issues.
Prerequisites
Before unregistering a broker, ensure the following tasks are completed:
- Reassign the partitions from the broker you plan to remove to the remaining brokers using the Cruise Control remove_broker operation.
- Update the cluster configuration, if necessary, to adjust the replication factor for topics (default.replication.factor) and the minimum required number of in-sync replica acknowledgements (min.insync.replicas).
- Stop the Kafka broker service on the broker and remove the broker from the cluster.
Procedure
Unregister the removed broker from the cluster:
./bin/kafka-cluster.sh unregister \ --bootstrap-server <broker_host>:<port> \ --id <node_id_number>
Verify the current state of the cluster by describing the topics:
./bin/kafka-topics.sh \ --bootstrap-server <broker_host>:<port> \ --describe
14.5. Scaling controller quorums dynamically
Dynamic controller quorums support scaling without requiring system downtime. Dynamic scaling supports the following use cases:
- Replacing controllers after hardware failure
- Migrating clusters to new machines
- Moving nodes between dedicated controller roles and combined broker and controller roles
A dynamic quorum is specified in the controller configuration by using the controller.quorum.bootstrap.servers property to list the host:port endpoints of the controllers. Only one controller can be added or removed from the cluster at a time, so complex quorum changes are implemented as a series of single changes. New controllers initially join as observers, replicating the metadata log but not counting toward the quorum. After catching up with the active controller, a new controller becomes eligible to join the quorum.
When removing controllers, it is recommended to shut them down first to avoid unnecessary leader elections. If the removed controller is the active controller, it steps down from the quorum only after the new quorum is confirmed. The removed controller does not include itself when calculating the last commit position in the __cluster_metadata log.
Although it is possible to configure a static quorum, this approach is not recommended because it requires downtime when scaling.
14.6. Checking the controller quorum type
Determine whether a Kafka cluster uses a static or dynamic controller quorum.
Prerequisites
- Administrative access to a controller node.
- The Kafka CLI tools are available on the host.
Procedure
Describe the KRaft feature version.
bin/kafka-features.sh --bootstrap-controller <controller-host>:9093 describe
Identify the finalized KRaft version. In the output, locate the line that begins with Feature: kraft.version.
If FinalizedVersionLevel is 0, the cluster uses a static controller quorum. If it is 1 or higher, the cluster uses a dynamic controller quorum, and you do not need to complete any further procedures related to enabling dynamic controller quorum management.
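The relevant line of the describe output looks something like the following; the exact column layout and epoch value are illustrative:

```
Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1  Epoch: 5
```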
14.7. Enabling dynamic controller quorum management
Enable dynamic controller quorum management so that controllers can be added to or removed from a Kafka cluster without restarting the cluster.
Prerequisites
- Administrative access to the Kafka cluster.
- Access to at least one broker and one controller node.
- The Kafka CLI tools are available on the host.
Procedure
Determine whether the cluster uses a static controller quorum.
For more information, see Checking the controller quorum type.
If the cluster already supports dynamic controller quorum management, you do not need to complete this procedure.
Upgrade the KRaft feature version to enable dynamic controller quorum support.
To upgrade all feature versions to the current release version:
bin/kafka-features.sh --bootstrap-server <broker-host>:9092 upgrade --release-version 4.1
Alternatively, upgrade only the KRaft feature version:
bin/kafka-features.sh --bootstrap-server <broker-host>:9092 upgrade --feature kraft.version=1
Verify that the KRaft feature version is finalized:
bin/kafka-features.sh --bootstrap-server <broker-host>:9092 describe
Confirm that FinalizedVersionLevel for Feature: kraft.version is 1 or higher, indicating that dynamic controller quorum management is supported.
Update the configuration on all brokers and controllers in the cluster. Remove the controller.quorum.voters property and add the controller.quorum.bootstrap.servers property. For example, update the configuration as follows:
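A sketch of the change, with hypothetical controller hostnames; note that controller.quorum.voters entries carry node IDs, while controller.quorum.bootstrap.servers lists endpoints only:

```
# Before: static voters list (remove this property)
# controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093

# After: dynamic bootstrap list (add this property)
controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093
```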
Restart each node to apply the configuration changes.
14.8. Adding new controllers
To add a new controller to an existing dynamic controller quorum in Kafka, create a new controller, monitor its replication status, and then integrate it into the cluster.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses the kafka-storage.sh, kafka-server-start.sh, and kafka-metadata-quorum.sh tools.
- Administrative access to the controller nodes.
Procedure
Configure a new controller node using a controller.properties file. At a minimum, the new controller requires the following configuration:
- A unique node ID
- The listener name used by the controller quorum
- A quorum of controllers
Example controller configuration
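A minimal sketch, assuming PLAINTEXT security and hypothetical hostnames; here controller4.example.com:9093 is the new controller being added to an existing three-node quorum:

```
# Unique node ID for the new controller (hypothetical value)
node.id=4
process.roles=controller
# Listener name used by the controller quorum
listeners=CONTROLLER://:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
# Existing controllers plus the new controller
controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093,controller4.example.com:9093
log.dirs=/var/lib/kafka/metadata
```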
The controller.quorum.bootstrap.servers configuration includes the host and port of the new controller and of each controller already present in the cluster.
Important: If any further configuration is required, such as authentication, make sure to include it in controller.properties.
Update controller.quorum.bootstrap.servers in the configuration of each node in the cluster with the host and port of the new controller.
Set the log directory ID for the new controller:
./bin/kafka-storage.sh format --cluster-id <cluster_id> --config controller.properties --no-initial-controllers
By using the --no-initial-controllers option, the controller is initialized without joining the controller quorum.
Start the controller node:
./bin/kafka-server-start.sh ./config/controller.properties
Monitor the replication progress of the new controller:
./bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
Wait until the new controller has caught up with the active controller before proceeding.
Add the new controller to the controller quorum:
./bin/kafka-metadata-quorum.sh --command-config controller.properties --bootstrap-controller localhost:9092 add-controller
14.9. Removing controllers
To remove a controller from an existing dynamic controller quorum in Kafka, use the kafka-metadata-quorum.sh tool.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses the kafka-server-stop.sh and kafka-metadata-quorum.sh tools.
- Administrative access to the controller nodes.
Procedure
Stop the controller node:
./bin/kafka-server-stop.sh
Locate the ID of the controller and its directory ID so that you can remove it from the controller quorum. You can find this information in the meta.properties file of the metadata log directory.
Remove the controller from the controller quorum:
./bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9092 remove-controller --controller-id <id> --controller-directory-id <directory_id>
Update controller.quorum.bootstrap.servers in the configuration of each node in the cluster to remove the host and port of the controller that was removed from the quorum.