Dieser Inhalt ist in der von Ihnen ausgewählten Sprache nicht verfügbar.
Chapter 19. Using the partition reassignment tool
When scaling a Kafka cluster, you may need to add or remove brokers and update the distribution of partitions or the replication factor of topics. To update partitions and topics, you can use the kafka-reassign-partitions.sh tool.
Neither the AMQ Streams Cruise Control integration nor the Topic Operator support changing the replication factor of a topic. However, you can change the replication factor of a topic using the kafka-reassign-partitions.sh tool.
The tool can also be used to reassign partitions and balance the distribution of partitions across brokers to improve performance. However, it is recommended to use Cruise Control for automated partition reassignments and cluster rebalancing. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
It is recommended to run the kafka-reassign-partitions.sh tool as a separate interactive pod rather than within the broker container. Running the Kafka bin/ scripts within the broker container may cause a JVM to start with the same settings as the Kafka broker, which can potentially cause disruptions. By running the kafka-reassign-partitions.sh tool in a separate pod, you can avoid this issue. Running a pod with the -ti option creates an interactive pod with a terminal for running shell commands inside the pod.
Running an interactive pod with a terminal
oc run helper-pod -ti --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.2 --rm=true --restart=Never -- bash
oc run helper-pod -ti --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.2 --rm=true --restart=Never -- bash
19.1. Partition reassignment tool overview Link kopierenLink in die Zwischenablage kopiert!
The partition reassignment tool provides the following capabilities for managing Kafka partitions and brokers:
- Redistributing partition replicas
- Scale your cluster up and down by adding or removing brokers, and move Kafka partitions from heavily loaded brokers to under-utilized brokers. To do this, you must create a partition reassignment plan that identifies which topics and partitions to move and where to move them. Cruise Control is recommended for this type of operation as it automates the cluster rebalancing process.
- Scaling topic replication factor up and down
- Increase or decrease the replication factor of your Kafka topics. To do this, you must create a partition reassignment plan that identifies the existing replication assignment across partitions and an updated assignment with the replication factor changes.
- Changing the preferred leader
- Change the preferred leader of a Kafka partition. This can be useful if the current preferred leader is unavailable or if you want to redistribute load across the brokers in the cluster. To do this, you must create a partition reassignment plan that specifies the new preferred leader for each partition by changing the order of replicas.
- Changing the log directories to use a specific JBOD volume
- Change the log directories of your Kafka brokers to use a specific JBOD volume. This can be useful if you want to move your Kafka data to a different disk or storage device. To do this, you must create a partition reassignment plan that specifies the new log directory for each topic.
19.1.1. Generating a partition reassignment plan Link kopierenLink in die Zwischenablage kopiert!
The partition reassignment tool (kafka-reassign-partitions.sh) works by generating a partition assignment plan that specifies which partitions should be moved from their current broker to a new broker.
If you are satisfied with the plan, you can execute it. The tool then does the following:
- Migrates the partition data to the new broker
- Updates the metadata on the Kafka brokers to reflect the new partition assignments
- Triggers a rolling restart of the Kafka brokers to ensure that the new assignments take effect
The partition reassignment tool has three different modes:
--generate- Takes a set of topics and brokers and generates a reassignment JSON file which will result in the partitions of those topics being assigned to those brokers. Because this operates on whole topics, it cannot be used when you only want to reassign some partitions of some topics.
--execute- Takes a reassignment JSON file and applies it to the partitions and brokers in the cluster. Brokers that gain partitions as a result become followers of the partition leader. For a given partition, once the new broker has caught up and joined the ISR (in-sync replicas) the old broker will stop being a follower and will delete its replica.
--verify-
Using the same reassignment JSON file as the
--executestep,--verifychecks whether all the partitions in the file have been moved to their intended brokers. If the reassignment is complete,--verifyalso removes any traffic throttles (--throttle) that are in effect. Unless removed, throttles will continue to affect the cluster even after the reassignment has finished.
It is only possible to have one reassignment running in a cluster at any given time, and it is not possible to cancel a running reassignment. If you must cancel a reassignment, wait for it to complete and then perform another reassignment to revert the effects of the first reassignment. The kafka-reassign-partitions.sh will print the reassignment JSON for this reversion as part of its output. Very large reassignments should be broken down into a number of smaller reassignments in case there is a need to stop in-progress reassignment.
19.1.2. Specifying topics in a partition reassignment JSON file Link kopierenLink in die Zwischenablage kopiert!
The kafka-reassign-partitions.sh tool uses a reassignment JSON file that specifies the topics to reassign. You can generate a reassignment JSON file or create a file manually if you want to move specific partitions.
A basic reassignment JSON file has the structure presented in the following example, which describes three partitions belonging to two Kafka topics. Each partition is reassigned to a new set of replicas, which are identified by their broker IDs. The version, topic, partition, and replicas properties are all required.
Example partition reassignment JSON file structure
- 1
- The version of the reassignment JSON file format. Currently, only version 1 is supported, so this should always be 1.
- 2
- An array that specifies the partitions to be reassigned.
- 3
- The name of the Kafka topic that the partition belongs to.
- 4
- The ID of the partition being reassigned.
- 5
- An ordered array of the IDs of the brokers that should be assigned as replicas for this partition. The first broker in the list is the leader replica.
Partitions not included in the JSON are not changed.
If you specify only topics using a topics array, the partition reassignment tool reassigns all the partitions belonging to the specified topics.
Example reassignment JSON file structure for reassigning all partitions for a topic
19.1.3. Reassigning partitions between JBOD volumes Link kopierenLink in die Zwischenablage kopiert!
When using JBOD storage in your Kafka cluster, you can reassign the partitions between specific volumes and their log directories (each volume has a single log directory).
To reassign a partition to a specific volume, add log_dirs values for each partition in the reassignment JSON file. Each log_dirs array contains the same number of entries as the replicas array, since each replica should be assigned to a specific log directory. The log_dirs array contains either an absolute path to a log directory or the special value any. The any value indicates that Kafka can choose any available log directory for that replica, which can be useful when reassigning partitions between JBOD volumes.
Example reassignment JSON file structure with log directories
19.1.4. Throttling partition reassignment Link kopierenLink in die Zwischenablage kopiert!
Partition reassignment can be a slow process because it involves transferring large amounts of data between brokers. To avoid a detrimental impact on clients, you can throttle the reassignment process. Use the --throttle parameter with the kafka-reassign-partitions.sh tool to throttle a reassignment. You specify a maximum threshold in bytes per second for the movement of partitions between brokers. For example, --throttle 5000000 sets a maximum threshold for moving partitions of 50 MBps.
Throttling might cause the reassignment to take longer to complete.
- If the throttle is too low, the newly assigned brokers will not be able to keep up with records being published and the reassignment will never complete.
- If the throttle is too high, clients will be impacted.
For example, for producers, this could manifest as higher than normal latency waiting for acknowledgment. For consumers, this could manifest as a drop in throughput caused by higher latency between polls.
19.2. Generating a reassignment JSON file to reassign partitions Link kopierenLink in die Zwischenablage kopiert!
Generate a reassignment JSON file with the kafka-reassign-partitions.sh tool to reassign partitions after scaling a Kafka cluster. Adding or removing brokers does not automatically redistribute the existing partitions. To balance the partition distribution and take full advantage of the new brokers, you can reassign the partitions using the kafka-reassign-partitions.sh tool.
You run the tool from an interactive pod container connected to the Kafka cluster.
The following procedure describes a secure reassignment process that uses mTLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
You’ll need the following to establish a connection:
- The cluster CA certificate and password generated by the Cluster Operator when the Kafka cluster is created
- The user CA certificate and password generated by the User Operator when a user is created for client access to the Kafka cluster
In this procedure, the CA certificates and corresponding passwords are extracted from the cluster and user secrets that contain them in PKCS #12 (.p12 and .password) format. The passwords allow access to the .p12 stores that contain the certificates. You use the .p12 stores to specify a truststore and keystore to authenticate connection to the Kafka cluster.
Prerequisites
- You have a running Cluster Operator.
You have a running Kafka cluster based on a
Kafkaresource configured with internal TLS encryption and mTLS authentication.Kafka configuration with TLS encryption and mTLS authentication
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The running Kafka cluster contains a set of topics and partitions to reassign.
Example topic configuration for
my-topicCopy to Clipboard Copied! Toggle word wrap Toggle overflow You have a
KafkaUserconfigured with ACL rules that specify permission to produce and consume topics from the Kafka brokers.Example Kafka user configuration with ACL rules to allow operations on
my-topicandmy-clusterCopy to Clipboard Copied! Toggle word wrap Toggle overflow
Procedure
Extract the cluster CA certificate and password from the
<cluster_name>-cluster-ca-certsecret of the Kafka cluster.oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordoc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <cluster_name> with the name of the Kafka cluster. When you deploy Kafka using the
Kafkaresource, a secret with the cluster CA certificate is created with the Kafka cluster name (<cluster_name>-cluster-ca-cert). For example,my-cluster-cluster-ca-cert.Run a new interactive pod container using the AMQ Streams Kafka image to connect to a running Kafka broker.
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.2 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.2 <interactive_pod_name> -- /bin/sh -c "sleep 3600"Copy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <interactive_pod_name> with the name of the pod.
Copy the cluster CA certificate to the interactive pod container.
oc cp ca.p12 <interactive_pod_name>:/tmp
oc cp ca.p12 <interactive_pod_name>:/tmpCopy to Clipboard Copied! Toggle word wrap Toggle overflow Extract the user CA certificate and password from the secret of the Kafka user that has permission to access the Kafka brokers.
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordoc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <kafka_user> with the name of the Kafka user. When you create a Kafka user using the
KafkaUserresource, a secret with the user CA certificate is created with the Kafka user name. For example,my-user.Copy the user CA certificate to the interactive pod container.
oc cp user.p12 <interactive_pod_name>:/tmp
oc cp user.p12 <interactive_pod_name>:/tmpCopy to Clipboard Copied! Toggle word wrap Toggle overflow The CA certificates allow the interactive pod container to connect to the Kafka broker using TLS.
Create a
config.propertiesfile to specify the truststore and keystore used to authenticate connection to the Kafka cluster.Use the certificates and passwords you extracted in the previous steps.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- The bootstrap server address to connect to the Kafka cluster. Use your own Kafka cluster name to replace <kafka_cluster_name>.
- 2
- The security protocol option when using TLS for encryption.
- 3
- The truststore location contains the public key certificate (
ca.p12) for the Kafka cluster. - 4
- The password (
ca.password) for accessing the truststore. - 5
- The keystore location contains the public key certificate (
user.p12) for the Kafka user. - 6
- The password (
user.password) for accessing the keystore.
Copy the
config.propertiesfile to the interactive pod container.oc cp config.properties <interactive_pod_name>:/tmp/config.properties
oc cp config.properties <interactive_pod_name>:/tmp/config.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Prepare a JSON file named
topics.jsonthat specifies the topics to move.Specify topic names as a comma-separated list.
Example JSON file to reassign all the partitions of
my-topicCopy to Clipboard Copied! Toggle word wrap Toggle overflow You can also use this file to change the replication factor of a topic.
Copy the
topics.jsonfile to the interactive pod container.oc cp topics.json <interactive_pod_name>:/tmp/topics.json
oc cp topics.json <interactive_pod_name>:/tmp/topics.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow Start a shell process in the interactive pod container.
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bashCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <namespace> with the OpenShift namespace where the pod is running.
Use the
kafka-reassign-partitions.shcommand to generate the reassignment JSON.Example command to move the partitions of
my-topicto specified brokersbin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate
bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generateCopy to Clipboard Copied! Toggle word wrap Toggle overflow
19.3. Reassigning partitions after adding brokers Link kopierenLink in die Zwischenablage kopiert!
Use a reassignment file generated by the kafka-reassign-partitions.sh tool to reassign partitions after increasing the number of brokers in a Kafka cluster. The reassignment file should describe how partitions are reassigned to brokers in the enlarged Kafka cluster. You apply the reassignment specified in the file to the brokers and then verify the new partition assignments.
This procedure describes a secure scaling process that uses TLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
The kafka-reassign-partitions.sh tool can be used to reassign partitions within a Kafka cluster, regardless of whether you are managing all nodes through the cluster or using the node pools preview to manage groups of nodes within the cluster.
Though you can use the kafka-reassign-partitions.sh tool, Cruise Control is recommended for automated partition reassignments and cluster rebalancing. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
Prerequisites
-
You have a running Kafka cluster based on a
Kafkaresource configured with internal TLS encryption and mTLS authentication. -
You have generated a reassignment JSON file named
reassignment.json. - You are running an interactive pod container that is connected to the running Kafka broker.
-
You are connected as a
KafkaUserconfigured with ACL rules that specify permission to manage the Kafka cluster and its topics.
Procedure
-
Add as many new brokers as you need by increasing the
Kafka.spec.kafka.replicasconfiguration option. - Verify that the new broker pods have started.
-
If you haven’t done so, run an interactive pod container to generate a reassignment JSON file named
reassignment.json. Copy the
reassignment.jsonfile to the interactive pod container.oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <interactive_pod_name> with the name of the pod.
Start a shell process in the interactive pod container.
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bashCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <namespace> with the OpenShift namespace where the pod is running.
Run the partition reassignment using the
kafka-reassign-partitions.shscript from the interactive pod container.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --execute
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <cluster_name> with the name of your Kafka cluster. For example,
my-cluster-kafka-bootstrap:9093If you are going to throttle replication, you can also pass the
--throttleoption with an inter-broker throttled rate in bytes per second. For example:Copy to Clipboard Copied! Toggle word wrap Toggle overflow This command will print out two reassignment JSON objects. The first records the current assignment for the partitions being moved. You should save this to a local file (not a file in the pod) in case you need to revert the reassignment later on. The second JSON object is the target reassignment you have passed in your reassignment JSON file.
If you need to change the throttle during reassignment, you can use the same command with a different throttled rate. For example:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Verify that the reassignment has completed using the
kafka-reassign-partitions.shcommand line tool from any of the broker pods. This is the same command as the previous step, but with the--verifyoption instead of the--executeoption.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verify
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow The reassignment has finished when the
--verifycommand reports that each of the partitions being moved has completed successfully. This final--verifywill also have the effect of removing any reassignment throttles.- You can now delete the revert file if you saved the JSON for reverting the assignment to their original brokers.
19.4. Reassigning partitions before removing brokers Link kopierenLink in die Zwischenablage kopiert!
Use a reassignment file generated by the kafka-reassign-partitions.sh tool to reassign partitions before decreasing the number of brokers in a Kafka cluster. The reassignment file must describe how partitions are reassigned to the remaining brokers in the Kafka cluster. You apply the reassignment specified in the file to the brokers and then verify the new partition assignments. Brokers in the highest numbered pods are removed first.
This procedure describes a secure scaling process that uses TLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
The kafka-reassign-partitions.sh tool can be used to reassign partitions within a Kafka cluster, regardless of whether you are managing all nodes through the cluster or using the node pools preview to manage groups of nodes within the cluster.
Though you can use the kafka-reassign-partitions.sh tool, Cruise Control is recommended for automated partition reassignments and cluster rebalancing. Cruise Control can move topics from one broker to another without any downtime, and it is the most efficient way to reassign partitions.
Prerequisites
-
You have a running Kafka cluster based on a
Kafkaresource configured with internal TLS encryption and mTLS authentication. -
You have generated a reassignment JSON file named
reassignment.json. - You are running an interactive pod container that is connected to the running Kafka broker.
-
You are connected as a
KafkaUserconfigured with ACL rules that specify permission to manage the Kafka cluster and its topics.
Procedure
-
If you haven’t done so, run an interactive pod container to generate a reassignment JSON file named
reassignment.json. Copy the
reassignment.jsonfile to the interactive pod container.oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <interactive_pod_name> with the name of the pod.
Start a shell process in the interactive pod container.
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bashCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <namespace> with the OpenShift namespace where the pod is running.
Run the partition reassignment using the
kafka-reassign-partitions.shscript from the interactive pod container.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --execute
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <cluster_name> with the name of your Kafka cluster. For example,
my-cluster-kafka-bootstrap:9093If you are going to throttle replication, you can also pass the
--throttleoption with an inter-broker throttled rate in bytes per second. For example:Copy to Clipboard Copied! Toggle word wrap Toggle overflow This command will print out two reassignment JSON objects. The first records the current assignment for the partitions being moved. You should save this to a local file (not a file in the pod) in case you need to revert the reassignment later on. The second JSON object is the target reassignment you have passed in your reassignment JSON file.
If you need to change the throttle during reassignment, you can use the same command with a different throttled rate. For example:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Verify that the reassignment has completed using the
kafka-reassign-partitions.shcommand line tool from any of the broker pods. This is the same command as the previous step, but with the--verifyoption instead of the--executeoption.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verify
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow The reassignment has finished when the
--verifycommand reports that each of the partitions being moved has completed successfully. This final--verifywill also have the effect of removing any reassignment throttles.- You can now delete the revert file if you saved the JSON for reverting the assignment to their original brokers.
When all the partition reassignments have finished, the brokers being removed should not have responsibility for any of the partitions in the cluster. You can verify this by checking that the broker’s data log directory does not contain any live partition logs. If the log directory on the broker contains a directory that does not match the extended regular expression
\.[a-z0-9]-delete$, the broker still has live partitions and should not be stopped.You can check this by executing the command:
oc exec my-cluster-kafka-0 -c kafka -it -- \ /bin/bash -c \ "ls -l /var/lib/kafka/kafka-log_<n>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"
oc exec my-cluster-kafka-0 -c kafka -it -- \ /bin/bash -c \ "ls -l /var/lib/kafka/kafka-log_<n>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"Copy to Clipboard Copied! Toggle word wrap Toggle overflow where n is the number of the pods being deleted.
If the above command prints any output then the broker still has live partitions. In this case, either the reassignment has not finished or the reassignment JSON file was incorrect.
-
When you have confirmed that the broker has no live partitions, you can edit the
Kafka.spec.kafka.replicasproperty of yourKafkaresource to reduce the number of brokers.
19.5. Changing the replication factor of topics Link kopierenLink in die Zwischenablage kopiert!
To change the replication factor of topics in a Kafka cluster, use the kafka-reassign-partitions.sh tool. This can be done by running the tool from an interactive pod container that is connected to the Kafka cluster, and using a reassignment file to describe how the topic replicas should be changed.
This procedure describes a secure process that uses TLS. You’ll need a Kafka cluster that uses TLS encryption and mTLS authentication.
Prerequisites
-
You have a running Kafka cluster based on a
Kafkaresource configured with internal TLS encryption and mTLS authentication. - You are running an interactive pod container that is connected to the running Kafka broker.
-
You have generated a reassignment JSON file named
reassignment.json. -
You are connected as a
KafkaUserconfigured with ACL rules that specify permission to manage the Kafka cluster and its topics.
See Generating reassignment JSON files.
In this procedure, a topic called my-topic has 4 replicas and we want to reduce it to 3. A JSON file named topics.json specifies the topic, and was used to generate the reassignment.json file.
Example JSON file specifies my-topic
Procedure
If you haven’t done so, run an interactive pod container to generate a reassignment JSON file named
reassignment.json.Example reassignment JSON file showing the current and proposed replica assignment
Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4,2,0],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[0,2,3,1],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[1,3,0,4],"log_dirs":["any","any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3,4],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4,0],"log_dirs":["any","any","any","any"]}]}Copy to Clipboard Copied! Toggle word wrap Toggle overflow Save a copy of this file locally in case you need to revert the changes later on.
Edit the
reassignment.jsonto remove a replica from each partition.For example use
jqto remove the last replica in the list for each partition of the topic:Removing the last topic replica for each partition
jq '.partitions[].replicas |= del(.[-1])' reassignment.json > reassignment.json
jq '.partitions[].replicas |= del(.[-1])' reassignment.json > reassignment.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow Example reassignment file showing the updated replicas
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4],"log_dirs":["any","any","any","any"]}]}{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any","any"]},{"topic":"my-topic","partition":2,"replicas":[2,3,4],"log_dirs":["any","any","any","any"]}]}Copy to Clipboard Copied! Toggle word wrap Toggle overflow Copy the
reassignment.jsonfile to the interactive pod container.oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.jsonCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <interactive_pod_name> with the name of the pod.
Start a shell process in the interactive pod container.
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bashCopy to Clipboard Copied! Toggle word wrap Toggle overflow Replace <namespace> with the OpenShift namespace where the pod is running.
Make the topic replica change using the
kafka-reassign-partitions.shscript from the interactive pod container.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --execute
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow NoteRemoving replicas from a broker does not require any inter-broker data movement, so there is no need to throttle replication. If you are adding replicas, then you may want to change the throttle rate.
Verify that the change to the topic replicas has completed using the
kafka-reassign-partitions.shcommand line tool from any of the broker pods. This is the same command as the previous step, but with the--verifyoption instead of the--executeoption.bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verify
bin/kafka-reassign-partitions.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --reassignment-json-file /tmp/reassignment.json \ --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow The reassignment has finished when the
--verifycommand reports that each of the partitions being moved has completed successfully. This final--verifywill also have the effect of removing any reassignment throttles.Run the
bin/kafka-topics.shcommand with the--describeoption to see the results of the change to the topics.bin/kafka-topics.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --describe
bin/kafka-topics.sh --bootstrap-server <cluster_name>-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --describeCopy to Clipboard Copied! Toggle word wrap Toggle overflow Results of reducing the number of replicas for a topic
my-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 my-topic Partition: 1 Leader: 2 Replicas: 1,2,3 Isr: 1,2,3 my-topic Partition: 2 Leader: 3 Replicas: 2,3,4 Isr: 2,3,4
my-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 my-topic Partition: 1 Leader: 2 Replicas: 1,2,3 Isr: 1,2,3 my-topic Partition: 2 Leader: 3 Replicas: 2,3,4 Isr: 2,3,4Copy to Clipboard Copied! Toggle word wrap Toggle overflow