Chapter 14. Scaling clusters by adding or removing brokers
Scaling Kafka clusters by adding brokers can increase the performance and reliability of the cluster. Adding more brokers increases available resources, allowing the cluster to handle larger workloads and process more messages. It can also improve fault tolerance by providing more replicas and backups. Conversely, removing underutilized brokers can reduce resource consumption and improve efficiency. Scaling must be done carefully to avoid disruption or data loss. By redistributing partitions across all brokers in the cluster, the resource utilization of each broker is reduced, which can increase the overall throughput of the cluster.
To increase the throughput of a Kafka topic, you can increase the number of partitions for that topic. This allows the load of the topic to be shared between different brokers in the cluster. However, if every broker is constrained by a specific resource (such as I/O), adding more partitions will not increase the throughput. In this case, you need to add more brokers to the cluster.
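For example, the partition count of an existing topic can be raised with the kafka-topics.sh tool. The following is a minimal sketch; the broker address and topic name are placeholders, and note that a partition count can only be increased, never reduced:
# Increase the partition count of an existing topic to 6
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 6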
Adding brokers when running a multi-node Kafka cluster affects the number of brokers in the cluster that act as replicas. The actual replication factor for topics is determined by the settings for default.replication.factor and min.insync.replicas, and the number of available brokers. For example, a replication factor of 3 means that each partition of a topic is replicated across three brokers, ensuring fault tolerance in the event of a broker failure.
Example replica configuration
default.replication.factor = 3
min.insync.replicas = 2
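These broker-level settings act as defaults for newly created topics. Replication can also be set explicitly when a topic is created, as in the following sketch; the broker address and topic name are placeholders:
# Create a topic with 3 replicas per partition and 2 required in-sync replicas
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 3 --config min.insync.replicas=2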
When you add or remove brokers, Kafka does not automatically reassign partitions. The best way to reassign them is to use Cruise Control. You can use Cruise Control’s add_broker and remove_broker modes when scaling a cluster up or down.
- Use the add_broker mode after scaling up a Kafka cluster to move partition replicas from existing brokers to the newly added brokers.
- Use the remove_broker mode before scaling down a Kafka cluster to move partition replicas off the brokers that are going to be removed.
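If Cruise Control’s REST API is exposed, these modes correspond to the add_broker and remove_broker endpoints. The following requests are a sketch only; the Cruise Control address, the broker ID, and the use of dryrun=true (which previews the proposed reassignment without executing it) are assumptions about your setup:
# Preview moving replicas onto a newly added broker with ID 3
curl -X POST 'http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=3&dryrun=true'
# Preview moving replicas off broker 3 before removing it
curl -X POST 'http://localhost:9090/kafkacruisecontrol/remove_broker?brokerid=3&dryrun=true'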
14.1. Scaling controller clusters dynamically
Dynamic controller quorums support scaling without requiring system downtime. Dynamic scaling is useful not only for adding or removing controllers, but also for the following:
- Replacing controllers because of hardware failure
- Migrating clusters onto new machines
- Moving nodes from dedicated controller roles to combined roles or vice versa
A dynamic quorum is specified in the controller configuration using the controller.quorum.bootstrap.servers property to list host:port endpoints for each controller. Only one controller can be added or removed from the cluster at a time, so complex quorum changes are implemented as a series of single changes. New controllers join as observers, replicating the metadata log but not counting towards the quorum. When caught up with the active controller, the new controller is eligible to join the quorum.
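For illustration, the bootstrap property on a three-controller cluster might look as follows; the hostnames and port are assumptions:
controller.quorum.bootstrap.servers=ctrl1.example.com:9090,ctrl2.example.com:9090,ctrl3.example.com:9090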
When removing controllers, it’s recommended that they are first shut down to avoid unnecessary leader elections. If the removed controller is the active one, it will step down from the quorum only after the new quorum is confirmed. However, it will not include itself when calculating the last commit position in the __cluster_metadata log.
In a dynamic quorum, the active KRaft version is 1 or above for all cluster nodes. Find the active KRaft version using the kafka-features.sh tool:
./bin/kafka-features.sh --bootstrap-controller localhost:9093 describe | grep kraft.version
In this example output, the active version (FinalizedVersionLevel) in the Kafka cluster is 1:
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
If the kraft.version property shows an active version level of 0 or is absent, you are using a static quorum. If it is 1 or above, you are using a dynamic quorum.
It’s possible to configure a static quorum, but it is not a recommended approach as it requires downtime when scaling.
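For comparison, a static quorum is defined with the controller.quorum.voters property, which pins the node ID and endpoint of every voter; the IDs, hostnames, and port in this sketch are assumptions:
controller.quorum.voters=1@ctrl1.example.com:9090,2@ctrl2.example.com:9090,3@ctrl3.example.com:9090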
14.2. Adding new controllers
To add a new controller to an existing dynamic controller quorum in Kafka, create a new controller, monitor its replication status, and then integrate it into the cluster.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses the kafka-storage.sh, kafka-server-start.sh, and kafka-metadata-quorum.sh tools.
- Administrative access to the controller nodes.
Procedure
Configure a new controller node using the controller.properties file. At a minimum, the new controller requires the following configuration:
- A unique node ID
- Listener name used by the controller quorum
- A quorum of controllers
Example controller configuration
process.roles=controller
node.id=1
listeners=CONTROLLER://0.0.0.0:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
controller.quorum.bootstrap.servers=localhost:9090, localhost:9091, localhost:9092
The controller.quorum.bootstrap.servers configuration includes the host and port of the new controller and of each other controller already present in the cluster.
Update controller.quorum.bootstrap.servers in the configuration of each node in the cluster with the host and port of the new controller.
Set the log directory ID for the new controller:
./bin/kafka-storage.sh format --cluster-id <cluster_id> --config server.properties --no-initial-controllers
By using the --no-initial-controllers option, the controller is initialized without joining the controller quorum.
Start the controller node:
./bin/kafka-server-start.sh ./config/kraft/controller.properties
Monitor the replication progress of the new controller:
./bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
Wait until the new controller has caught up with the active controller before proceeding.
Add the new controller to the controller quorum:
./bin/kafka-metadata-quorum.sh --command-config controller.properties --bootstrap-controller localhost:9092 add-controller
14.3. Removing controllers
To remove a controller from an existing dynamic controller quorum in Kafka, use the kafka-metadata-quorum.sh tool.
Prerequisites
- Streams for Apache Kafka is installed on the host, and the configuration files and tools are available. This procedure uses the kafka-server-stop.sh and kafka-metadata-quorum.sh tools.
- Administrative access to the controller nodes.
Procedure
Stop the controller node:
./bin/kafka-server-stop.sh
Locate the ID of the controller and its directory ID to be able to remove it from the controller quorum. You can find this information in the meta.properties file of the metadata log.
Remove the controller from the controller quorum:
./bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9092 remove-controller --controller-id <id> --controller-directory-id <directory_id>
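The <id> and <directory_id> values come from the controller’s meta.properties file in its metadata log directory. The following is an illustrative sketch only; the values are placeholders and additional entries may be present:
# Example meta.properties entries to look for
version=1
cluster.id=<cluster_id>
node.id=3
directory.id=<directory_id>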
Update controller.quorum.bootstrap.servers in the configuration of each node in the cluster to remove the host and port of the controller removed from the controller quorum.
14.4. Unregistering nodes after scale-down operations
After removing a node from a Kafka cluster, use the kafka-cluster.sh script to unregister the node from the cluster metadata. Failing to unregister removed nodes leads to stale metadata, which causes operational issues.
Prerequisites
Before unregistering a node, ensure the following tasks are completed:
- Reassign the partitions from the node you plan to remove to the remaining brokers using the Cruise Control remove-nodes operation.
- Update the cluster configuration, if necessary, to adjust the replication factor for topics (default.replication.factor) and the minimum required number of in-sync replica acknowledgements (min.insync.replicas), as sketched after this list.
- Stop the Kafka broker service on the node and remove the node from the cluster.
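As a sketch of the configuration update step, min.insync.replicas can be changed as a cluster-wide dynamic default with the kafka-configs.sh tool; the broker address and the value shown are assumptions, and default.replication.factor is typically set in the broker properties files instead:
# Set a cluster-wide default for min.insync.replicas
./bin/kafka-configs.sh --bootstrap-server <broker_host>:<port> --entity-type brokers --entity-default --alter --add-config min.insync.replicas=2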
Procedure
Unregister the removed node from the cluster:
./bin/kafka-cluster.sh unregister \
  --bootstrap-server <broker_host>:<port> \
  --id <node_id_number>
Verify the current state of the cluster by describing the topics:
./bin/kafka-topics.sh \
  --bootstrap-server <broker_host>:<port> \
  --describe