Chapter 15. Using the partition reassignment tool
When scaling a Kafka cluster, you may need to add or remove brokers and update the distribution of partitions or the replication factor of topics. To update partitions and topics, you can use the kafka-reassign-partitions.sh tool.
The tool can change the replication factor of a topic, reassign partitions, and balance the distribution of partitions across brokers to improve performance. However, Cruise Control is recommended for automated partition reassignments, cluster rebalancing, and changes to the topic replication factor. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
15.1. Partition reassignment tool overview
The partition reassignment tool provides the following capabilities for managing Kafka partitions and brokers:
- Redistributing partition replicas
- Scale your cluster up and down by adding or removing brokers, and move Kafka partitions from heavily loaded brokers to under-utilized brokers. To do this, you must create a partition reassignment plan that identifies which topics and partitions to move and where to move them. Cruise Control is recommended for this type of operation as it automates the cluster rebalancing process.
- Scaling topic replication factor up and down
- Increase or decrease the replication factor of your Kafka topics. To do this, you must create a partition reassignment plan that identifies the existing replication assignment across partitions and an updated assignment with the replication factor changes.
- Changing the preferred leader
- Change the preferred leader of a Kafka partition. This can be useful if the current preferred leader is unavailable or if you want to redistribute load across the brokers in the cluster. To do this, you must create a partition reassignment plan that specifies the new preferred leader for each partition by changing the order of replicas.
- Changing the log directories to use a specific JBOD volume
- Change the log directories of your Kafka brokers to use a specific JBOD volume. This can be useful if you want to move your Kafka data to a different disk or storage device. To do this, you must create a partition reassignment plan that specifies the new log directory for each topic.
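As a rough illustration of changing the preferred leader by reordering replicas, the following Python sketch builds a one-partition reassignment plan in which a chosen broker is moved to the front of the replica list. The function name make_preferred_leader and the sample assignment are assumptions made for this example; they are not part of the Kafka tooling.

```python
import json


def make_preferred_leader(partitions, topic, partition, broker_id):
    """Return a plan that puts broker_id first in the replica list.

    `partitions` is a list of entries in the reassignment JSON format.
    The replica set itself is unchanged; only the order differs.
    """
    for entry in partitions:
        if entry["topic"] == topic and entry["partition"] == partition:
            replicas = entry["replicas"]
            if broker_id not in replicas:
                raise ValueError(f"broker {broker_id} is not a replica of {topic}-{partition}")
            reordered = [broker_id] + [b for b in replicas if b != broker_id]
            return {
                "version": 1,
                "partitions": [
                    {"topic": topic, "partition": partition, "replicas": reordered}
                ],
            }
    raise KeyError(f"{topic}-{partition} not found in the current assignment")


# Hypothetical current assignment for one partition.
current_assignment = [
    {"topic": "example-topic-1", "partition": 0, "replicas": [1, 2, 3]}
]

plan = make_preferred_leader(current_assignment, "example-topic-1", 0, 3)
print(json.dumps(plan))
```

The resulting JSON can be saved to a file and passed to the tool in the same way as any other reassignment plan.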
15.1.1. Generating a partition reassignment plan
The partition reassignment tool (kafka-reassign-partitions.sh) works by generating a partition assignment plan that specifies which partitions should be moved from their current broker to a new broker.
If you are satisfied with the plan, you can execute it. The tool then does the following:
- Migrates the partition data to the new broker
- Updates the metadata on the Kafka brokers to reflect the new partition assignments
- Removes the replicas from the old brokers once the new replicas are in sync, so that the new assignments take effect without restarting the brokers
The partition reassignment tool has three different modes:
--generate
- Takes a set of topics and brokers and generates a reassignment JSON file which will result in the partitions of those topics being assigned to those brokers. Because this operates on whole topics, it cannot be used when you only want to reassign some partitions of some topics.
--execute
- Takes a reassignment JSON file and applies it to the partitions and brokers in the cluster. Brokers that gain partitions as a result become followers of the partition leader. For a given partition, once the new broker has caught up and joined the ISR (in-sync replicas) the old broker will stop being a follower and will delete its replica.
--verify
- Using the same reassignment JSON file as the --execute step, --verify checks whether all the partitions in the file have been moved to their intended brokers. If the reassignment is complete, --verify also removes any traffic throttles (--throttle) that are in effect. Unless removed, throttles will continue to affect the cluster even after the reassignment has finished.
It is only possible to have one reassignment running in a cluster at any given time, and it is not possible to cancel a running reassignment. If you must cancel a reassignment, wait for it to complete and then perform another reassignment to revert the effects of the first reassignment. The kafka-reassign-partitions.sh tool prints the reassignment JSON for this reversion as part of its output. Break very large reassignments into a number of smaller reassignments in case you need to stop an in-progress reassignment.
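One way to break a large reassignment into smaller ones is to split the partitions array of a plan into batches and run each batch as its own reassignment. The following Python sketch shows the idea; the function name split_plan, the batch size, and the sample plan are assumptions made for illustration.

```python
import json


def split_plan(plan, batch_size):
    """Split a reassignment plan into smaller plans of at most batch_size partitions."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    partitions = plan["partitions"]
    return [
        {"version": plan["version"], "partitions": partitions[i:i + batch_size]}
        for i in range(0, len(partitions), batch_size)
    ]


# Hypothetical plan covering five partitions of one topic.
large_plan = {
    "version": 1,
    "partitions": [
        {"topic": "example-topic-1", "partition": p, "replicas": [1, 2, 3]}
        for p in range(5)
    ],
}

batches = split_plan(large_plan, 2)
for number, batch in enumerate(batches, start=1):
    # Each batch would be saved to its own file and executed separately.
    print(number, json.dumps(batch))
```

Each batch is executed and verified before the next one starts, so there is a natural stopping point between batches.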
15.1.2. Specifying topics in a partition reassignment JSON file
The kafka-reassign-partitions.sh tool uses a reassignment JSON file that specifies the topics to reassign. You can generate a reassignment JSON file or create a file manually if you want to move specific partitions.
A basic reassignment JSON file has the structure presented in the following example, which describes three partitions belonging to two Kafka topics. Each partition is reassigned to a new set of replicas, which are identified by their broker IDs. The version, topic, partition, and replicas properties are all required.
Example partition reassignment JSON file structure
{
  "version": 1, (1)
  "partitions": [ (2)
    {
      "topic": "example-topic-1", (3)
      "partition": 0, (4)
      "replicas": [1, 2, 3] (5)
    },
    {
      "topic": "example-topic-1",
      "partition": 1,
      "replicas": [2, 3, 4]
    },
    {
      "topic": "example-topic-2",
      "partition": 0,
      "replicas": [3, 4, 5]
    }
  ]
}
- (1) The version of the reassignment JSON file format. Currently, only version 1 is supported, so this should always be 1.
- (2) An array that specifies the partitions to be reassigned.
- (3) The name of the Kafka topic that the partition belongs to.
- (4) The ID of the partition being reassigned.
- (5) An ordered array of the IDs of the brokers that should be assigned as replicas for this partition. The first broker in the list is the leader replica.
Partitions not included in the JSON are not changed.
If you specify only topics using a topics array, the partition reassignment tool reassigns all the partitions belonging to the specified topics.
Example reassignment JSON file structure for reassigning all partitions for a topic
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
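Because the version, topic, partition, and replicas properties are all required, it can help to check a hand-written file before passing it to the tool. The following Python sketch performs a few basic checks. The function name validate_plan is an assumption for this example, and the duplicate-broker check is an extra sanity check rather than something stated in this section.

```python
def validate_plan(plan):
    """Return a list of problems found in a partition reassignment plan."""
    errors = []
    if plan.get("version") != 1:
        errors.append("version must be 1")
    partitions = plan.get("partitions")
    if not isinstance(partitions, list):
        errors.append("partitions must be an array")
        return errors
    for index, entry in enumerate(partitions):
        for key in ("topic", "partition", "replicas"):
            if key not in entry:
                errors.append(f"partitions[{index}]: missing '{key}'")
        replicas = entry.get("replicas", [])
        if len(set(replicas)) != len(replicas):
            errors.append(f"partitions[{index}]: duplicate broker IDs in replicas")
    return errors


good_plan = {
    "version": 1,
    "partitions": [
        {"topic": "example-topic-1", "partition": 0, "replicas": [1, 2, 3]}
    ],
}
# Deliberately broken: no replicas property.
bad_plan = {
    "version": 1,
    "partitions": [{"topic": "example-topic-1", "partition": 0}],
}

good_errors = validate_plan(good_plan)
bad_errors = validate_plan(bad_plan)
print(good_errors)
print(bad_errors)
```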
15.1.3. Reassigning partitions between JBOD volumes
When using JBOD storage in your Kafka cluster, you can reassign the partitions between specific volumes and their log directories (each volume has a single log directory).
To reassign a partition to a specific volume, add log_dirs values for each partition in the reassignment JSON file. Each log_dirs array contains the same number of entries as the replicas array, since each replica should be assigned to a specific log directory. Each entry in the log_dirs array is either an absolute path to a log directory or the special value any. The any value indicates that Kafka can choose any available log directory for that replica, which can be useful when reassigning partitions between JBOD volumes.
Example reassignment JSON file structure with log directories
{
  "version": 1,
  "partitions": [
    {
      "topic": "example-topic-1",
      "partition": 0,
      "replicas": [1, 2, 3],
      "log_dirs": ["/var/lib/kafka/data-0/kafka-log1", "any", "/var/lib/kafka/data-1/kafka-log2"]
    },
    {
      "topic": "example-topic-1",
      "partition": 1,
      "replicas": [2, 3, 4],
      "log_dirs": ["any", "/var/lib/kafka/data-2/kafka-log3", "/var/lib/kafka/data-3/kafka-log4"]
    },
    {
      "topic": "example-topic-2",
      "partition": 0,
      "replicas": [3, 4, 5],
      "log_dirs": ["/var/lib/kafka/data-4/kafka-log5", "any", "/var/lib/kafka/data-5/kafka-log6"]
    }
  ]
}
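The rule that every replica needs a matching log_dirs entry can be sketched in Python as follows. The helper with_log_dirs is a hypothetical name used only for this example: it takes a partition entry and a mapping from broker ID to an absolute log directory, and fills in any for every other replica.

```python
def with_log_dirs(entry, overrides=None):
    """Return a copy of a partition entry with one log_dirs value per replica.

    `overrides` maps a broker ID to an absolute log directory path.
    Replicas without an override get the special value "any".
    """
    overrides = overrides or {}
    log_dirs = [overrides.get(broker, "any") for broker in entry["replicas"]]
    for path in log_dirs:
        if path != "any" and not path.startswith("/"):
            raise ValueError(f"log directory must be an absolute path: {path}")
    return {**entry, "log_dirs": log_dirs}


entry = {"topic": "example-topic-1", "partition": 0, "replicas": [1, 2, 3]}
moved = with_log_dirs(
    entry,
    {
        1: "/var/lib/kafka/data-0/kafka-log1",
        3: "/var/lib/kafka/data-1/kafka-log2",
    },
)
print(moved["log_dirs"])
```

Building log_dirs from the replicas list keeps the two arrays the same length by construction.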
15.1.4. Throttling partition reassignment
Partition reassignment can be a slow process because it involves transferring large amounts of data between brokers. To avoid a detrimental impact on clients, you can throttle the reassignment process. Use the --throttle parameter with the kafka-reassign-partitions.sh tool to throttle a reassignment. You specify a maximum threshold in bytes per second for the movement of partitions between brokers. For example, --throttle 5000000 sets a maximum threshold for moving partitions of 5 MBps.
Throttling might cause the reassignment to take longer to complete.
- If the throttle is too low, the newly assigned brokers will not be able to keep up with records being published and the reassignment will never complete.
- If the throttle is too high, clients will be impacted.
For example, for producers, this could manifest as higher than normal latency waiting for acknowledgment. For consumers, this could manifest as a drop in throughput caused by higher latency between polls.
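The arithmetic behind a throttle value can be sketched in Python. This example assumes decimal units (1 MB = 1,000,000 bytes) and a hypothetical 100 GB of partition data to move. The estimate is only a lower bound, because it ignores new records that arrive while the reassignment is running.

```python
def throttle_bytes_per_sec(megabytes_per_sec):
    """Convert a rate in MBps (decimal megabytes) to the bytes-per-second value for --throttle."""
    return int(megabytes_per_sec * 1_000_000)


def min_transfer_seconds(bytes_to_move, throttle):
    """Lower bound on the transfer time for a given amount of data at the throttled rate."""
    return bytes_to_move / throttle


throttle = throttle_bytes_per_sec(5)  # value to pass to --throttle for 5 MBps
data_to_move = 100 * 1_000_000_000    # hypothetical: 100 GB of partition data
seconds = min_transfer_seconds(data_to_move, throttle)

print(throttle)
print(seconds / 3600)  # hours
```

If the estimate is longer than is acceptable, a higher throttle shortens the transfer at the cost of more impact on clients.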
15.2. Reassigning partitions after adding brokers
Use a reassignment file generated by the kafka-reassign-partitions.sh tool to reassign partitions after increasing the number of brokers in a Kafka cluster. The reassignment file should describe how partitions are reassigned to brokers in the enlarged Kafka cluster. You apply the reassignment specified in the file to the brokers and then verify the new partition assignments.
This procedure describes a secure scaling process that uses TLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
Though you can use the kafka-reassign-partitions.sh tool, Cruise Control is recommended for automated partition reassignments and cluster rebalancing. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
Prerequisites
- An existing Kafka cluster.
- A new machine with the additional AMQ broker installed.
- You have created a JSON file to specify how partitions should be reassigned to brokers in the enlarged cluster.
In this procedure, we are reassigning all partitions for a topic called my-topic. A JSON file named topics.json specifies the topic, and is used to generate a reassignment.json file.
Example JSON file specifies my-topic
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
Procedure
- Create a configuration file for the new broker using the same settings as for the other brokers in your cluster, except for broker.id, which should be a number that is not already used by any of the other brokers.
- Start the new Kafka broker, passing the configuration file you created in the previous step as the argument to the kafka-server-start.sh script:
    su - kafka
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
- Verify that the Kafka broker is running.
    jcmd | grep Kafka
- Repeat the above steps for each new broker.
- If you haven’t done so, generate a reassignment JSON file named reassignment.json using the kafka-reassign-partitions.sh tool.
  Example command to generate the reassignment JSON file
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --topics-to-move-json-file topics.json \
      --broker-list 0,1,2,3,4 \
      --generate
  Example reassignment JSON file showing the current and proposed replica assignment
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}
  Save a copy of this file locally in case you need to revert the changes later on.
- Run the partition reassignment using the --execute option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --execute
  If you are going to throttle replication, you can also pass the --throttle option with an inter-broker throttled rate in bytes per second. For example:
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --throttle 5000000 \
      --execute
- Verify that the reassignment has completed using the --verify option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --verify
  The reassignment has finished when the --verify command reports that each of the partitions being moved has completed successfully. This final --verify will also have the effect of removing any reassignment throttles.
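The --generate output in this procedure contains two JSON documents: the current assignment, which is useful for reverting, and the proposed assignment, which is what goes into reassignment.json. The following Python sketch separates the two. It assumes that each heading line is followed by a single-line JSON document; the function name parse_generate_output and the shortened sample output are illustrative only.

```python
import json

CURRENT_HEADER = "Current partition replica assignment"
PROPOSED_HEADER = "Proposed partition reassignment configuration"


def parse_generate_output(text):
    """Split --generate output into (current, proposed) assignments.

    Assumes each heading is followed by one line of JSON.
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]

    def json_after(header):
        index = lines.index(header)  # raises ValueError if the heading is missing
        return json.loads(lines[index + 1])

    return json_after(CURRENT_HEADER), json_after(PROPOSED_HEADER)


# Shortened sample based on the example output in this procedure.
sample_output = """\
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]}]}
"""

current, proposed = parse_generate_output(sample_output)
rollback_json = json.dumps(current)       # keep this in case you need to revert
reassignment_json = json.dumps(proposed)  # contents for reassignment.json
print(reassignment_json)
```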
15.3. Reassigning partitions before removing brokers
Use a reassignment file generated by the kafka-reassign-partitions.sh tool to reassign partitions before decreasing the number of brokers in a Kafka cluster. The reassignment file must describe how partitions are reassigned to the remaining brokers in the Kafka cluster. You apply the reassignment specified in the file to the brokers and then verify the new partition assignments. Brokers in the highest numbered pods are removed first.
This procedure describes a secure scaling process that uses TLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
Though you can use the kafka-reassign-partitions.sh tool, Cruise Control is recommended for automated partition reassignments and cluster rebalancing. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
Prerequisites
- An existing Kafka cluster.
- You have created a JSON file to specify how partitions should be reassigned to brokers in the reduced cluster.
In this procedure, we are reassigning all partitions for a topic called my-topic. A JSON file named topics.json specifies the topic, and is used to generate a reassignment.json file.
Example JSON file specifies my-topic
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
Procedure
- If you haven’t done so, generate a reassignment JSON file named reassignment.json using the kafka-reassign-partitions.sh tool.
  Example command to generate the reassignment JSON file
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --topics-to-move-json-file topics.json \
      --broker-list 0,1,2,3 \
      --generate
  Example reassignment JSON file showing the current and proposed replica assignment
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]}]}
  Save a copy of this file locally in case you need to revert the changes later on.
- Run the partition reassignment using the --execute option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --execute
  If you are going to throttle replication, you can also pass the --throttle option with an inter-broker throttled rate in bytes per second. For example:
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --throttle 5000000 \
      --execute
- Verify that the reassignment has completed using the --verify option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --verify
  The reassignment has finished when the --verify command reports that each of the partitions being moved has completed successfully. This final --verify will also have the effect of removing any reassignment throttles.
- Check that each broker being removed does not have any live partitions in its log (log.dirs).
    ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
  If a log directory does not match the regular expression \.[a-z0-9]+-delete$, active partitions are still present. If you have active partitions, check that the reassignment has finished and check the configuration in the reassignment JSON file. You can run the reassignment again. Make sure that there are no active partitions before moving on to the next step.
- Stop the broker.
    su - kafka
    /opt/kafka/bin/kafka-server-stop.sh
- Confirm that the Kafka broker has stopped.
    jcmd | grep kafka
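The log directory check in this procedure can also be expressed in Python. The following sketch applies the same pattern as the grep command to a list of directory names and returns the ones that are not marked for deletion. The function name active_partition_dirs and the sample directory names are assumptions made for illustration.

```python
import re

# Same pattern as the grep command: directories marked for deletion.
DELETED_DIR = re.compile(r"[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$")


def active_partition_dirs(dir_names):
    """Return the directory names that are NOT marked for deletion."""
    return [name for name in dir_names if not DELETED_DIR.search(name)]


# Hypothetical directory names from a broker log directory.
dir_names = [
    "my-topic-0.0f1e2d3c4b5a69788796a5b4c3d2e1f0-delete",
    "my-topic-1",
]

remaining = active_partition_dirs(dir_names)
print(remaining)
```

An empty result means that no active partition directories were found in the names that were checked.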
15.4. Changing the replication factor of topics
Use the kafka-reassign-partitions.sh tool to change the replication factor of topics in a Kafka cluster. This can be done using a reassignment file to describe how the topic replicas should be changed.
Prerequisites
- An existing Kafka cluster.
- You have created a JSON file to specify the topics to include in the operation.
In this procedure, a topic called my-topic has 4 replicas and we want to reduce it to 3. A JSON file named topics.json specifies the topic, and is used to generate a reassignment.json file.
Example JSON file specifies my-topic
{ "version": 1, "topics": [ { "topic": "my-topic"} ] }
Procedure
- If you haven’t done so, generate a reassignment JSON file named reassignment.json using the kafka-reassign-partitions.sh tool.
  Example command to generate the reassignment JSON file
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --topics-to-move-json-file topics.json \
      --broker-list 0,1,2,3,4 \
      --generate
  Example reassignment JSON file showing the current and proposed replica assignment
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}
  Save a copy of this file locally in case you need to revert the changes later on.
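- Edit the reassignment.json file to remove a replica from each partition.
  For example, use jq to remove the last replica in the list for each partition of the topic. Remove the matching log_dirs entry as well, so that each log_dirs array has the same number of entries as the replicas array. Write the result to a new file and then replace the original, because redirecting the output to the file that jq is reading truncates the file before it is read.
  Removing the last topic replica for each partition
    jq '.partitions[] |= (.replicas |= del(.[-1]) | .log_dirs |= del(.[-1]))' reassignment.json > reassignment-updated.json
    mv reassignment-updated.json reassignment.json
  Example reassignment file showing the updated replicas
    {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4],"log_dirs":["any","any","any"]}]}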
- Make the topic replica change using the --execute option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --execute
  Note: Removing replicas from a broker does not require any inter-broker data movement, so there is no need to throttle replication. If you are adding replicas, then you may want to change the throttle rate.
- Verify that the change to the topic replicas has completed using the --verify option.
    /opt/kafka/bin/kafka-reassign-partitions.sh \
      --bootstrap-server localhost:9092 \
      --reassignment-json-file reassignment.json \
      --verify
  The reassignment has finished when the --verify command reports that each of the partitions being moved has completed successfully. This final --verify will also have the effect of removing any reassignment throttles.
- Run the bin/kafka-topics.sh command with the --describe option to see the results of the change to the topics.
    /opt/kafka/bin/kafka-topics.sh \
      --bootstrap-server localhost:9092 \
      --describe
  Results of reducing the number of replicas for a topic
    my-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    my-topic Partition: 1 Leader: 2 Replicas: 1,2,3 Isr: 1,2,3
    my-topic Partition: 2 Leader: 3 Replicas: 2,3,4 Isr: 2,3,4
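As an alternative to editing the file with jq, the replica lists can be trimmed with a short Python script. The following sketch keeps the first replicas of each partition up to a target replication factor and trims log_dirs to the same length. The function name reduce_replication_factor is an assumption for this example, and the sample plan is the proposed configuration from this procedure.

```python
import copy
import json


def reduce_replication_factor(plan, target):
    """Return a copy of the plan that keeps only the first `target` replicas per partition."""
    if target < 1:
        raise ValueError("target replication factor must be at least 1")
    updated = copy.deepcopy(plan)
    for entry in updated["partitions"]:
        entry["replicas"] = entry["replicas"][:target]
        if "log_dirs" in entry:
            # Keep log_dirs the same length as replicas.
            entry["log_dirs"] = entry["log_dirs"][:target]
    return updated


# Proposed configuration from this procedure (4 replicas per partition).
proposed = {
    "version": 1,
    "partitions": [
        {"topic": "my-topic", "partition": 0, "replicas": [0, 1, 2, 3], "log_dirs": ["any"] * 4},
        {"topic": "my-topic", "partition": 1, "replicas": [1, 2, 3, 4], "log_dirs": ["any"] * 4},
        {"topic": "my-topic", "partition": 2, "replicas": [2, 3, 4, 0], "log_dirs": ["any"] * 4},
    ],
}

reduced = reduce_replication_factor(proposed, 3)
print(json.dumps(reduced))
```

Because the function returns a copy, the original plan stays available if you need to revert the change.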