Chapter 12. Managing Kafka
Use additional configuration properties to maintain a deployment of AMQ Streams. You can add and adjust settings to respond to the performance of AMQ Streams. For example, you can introduce additional configuration to improve throughput and data reliability.
12.1. Setting limits on brokers using the Kafka Static Quota plugin
The Kafka Static Quota plugin is a Technology Preview only. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.
Use the Kafka Static Quota plugin to set throughput and storage limits on brokers in your Kafka cluster. You enable the plugin and set limits by adding properties to the Kafka configuration file. You can set a byte-rate threshold and storage quotas to put limits on the clients interacting with your brokers.
You can set byte-rate thresholds for producer and consumer bandwidth. The total limit is distributed across all clients accessing the broker. For example, you can set a byte-rate threshold of 40 MBps for producers. If two producers are running, they are each limited to a throughput of 20 MBps.
Storage quotas throttle Kafka producers based on disk usage, between a soft limit and a hard limit. The limits apply to all available disk space. Producers are slowed gradually as usage rises between the soft limit and the hard limit, which is the maximum allowed storage. The limits prevent disks filling up too quickly and exceeding their capacity. Full disks can lead to issues that are hard to rectify.
For JBOD storage, the limit applies across all disks. If a broker is using two 1 TB disks and the quota is 1.1 TB, one disk might fill and the other disk will be almost empty.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
- A ZooKeeper cluster is configured and running.
Procedure
Edit the /opt/kafka/config/server.properties Kafka configuration file. The plugin properties are shown in this example configuration.
Example Kafka Static Quota plugin configuration
# ...
client.quota.callback.class=io.strimzi.kafka.quotas.StaticQuotaCallback 1
client.quota.callback.static.produce=1000000 2
client.quota.callback.static.fetch=1000000 3
client.quota.callback.static.storage.soft=400000000000 4
client.quota.callback.static.storage.hard=500000000000 5
client.quota.callback.static.storage.check-interval=5 6
# ...
1. Loads the Kafka Static Quota plugin.
2. Sets the producer byte-rate threshold. 1 MBps in this example.
3. Sets the consumer byte-rate threshold. 1 MBps in this example.
4. Sets the lower soft limit for storage. 400 GB in this example.
5. Sets the higher hard limit for storage. 500 GB in this example.
6. Sets the interval in seconds between checks on storage. 5 seconds in this example. You can set this to 0 to disable the check.
Start the Kafka broker with the default configuration file.
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
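If the broker is running, the jcmd output includes an entry for the kafka.Kafka main class. A hypothetical example of what the output might look like (the process ID and configuration path will differ in your environment):
1234 kafka.Kafka /opt/kafka/config/server.properties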
12.2. Scaling Kafka clusters
You can add or remove brokers from a Kafka cluster. You can also add or remove nodes from a ZooKeeper cluster.
When you add or remove brokers, you can use the kafka-reassign-partitions.sh tool to reassign partitions.
You can also use Cruise Control to incorporate changes to the number of brokers when rebalancing a Kafka cluster. You can install new brokers and include them in the rebalance. You can perform a rebalance that excludes brokers from the rebalance before removing them. For more information, see Chapter 14, Using Cruise Control for cluster rebalancing.
12.2.1. Adding and removing brokers from a Kafka cluster
The primary way of increasing throughput for a topic is to increase the number of partitions for that topic. The partitions allow the load for that topic to be shared between the brokers in the cluster. When the brokers are constrained by a resource (typically I/O), then using more partitions will not yield an increase in throughput. Instead, you must add brokers to the cluster.
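For example, you can increase the partition count for an existing topic with the kafka-topics.sh tool. A minimal sketch, assuming a topic named my-topic and a broker listening on localhost:9092 (the partition count can only be increased, never decreased):
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 12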
When you add an extra broker to the cluster, AMQ Streams does not assign any partitions to it automatically. You have to decide which partitions to move from the existing brokers to the new broker. Once the partitions have been redistributed between all brokers, each broker should have a lower resource utilization.
Before you remove a broker from a cluster, you must ensure that it is not assigned to any partitions. You should decide which remaining brokers will be responsible for each of the partitions on the broker being decommissioned. Once the broker has no assigned partitions, you can stop it.
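One way to see which partitions still have replicas on the broker you plan to remove is to describe the topics and inspect the Replicas column for that broker's ID. A sketch, assuming a broker listening on localhost:9092:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe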
12.2.2. Reassignment of partitions
The kafka-reassign-partitions.sh utility is used to reassign partitions to different brokers.
It has three different modes:
--generate
- Takes a set of topics and brokers and generates a reassignment JSON file which will result in the partitions of those topics being assigned to those brokers. It is an easy way to generate a reassignment JSON file, but it operates on whole topics, so its use is not always appropriate.
--execute
- Takes a reassignment JSON file and applies it to the partitions and brokers in the cluster. Brokers which are gaining partitions will become followers of the partition leader. For a given partition, once the new broker has caught up and joined the ISR the old broker will stop being a follower and will delete its replica.
--verify
- Using the same reassignment JSON file as the --execute step, --verify checks whether all of the partitions in the file have been moved to their intended brokers. If the reassignment is complete, it will also remove any throttles that are in effect. Unless removed, throttles will continue to affect the cluster even after the reassignment has finished.
It is only possible to have one reassignment running in the cluster at any given time, and it is not possible to cancel a running reassignment. If you need to cancel a reassignment, you have to wait for it to complete and then perform another reassignment to revert the effects of the first one. The kafka-reassign-partitions.sh tool prints the reassignment JSON for this reversion as part of its output. Very large reassignments should be broken down into a number of smaller reassignments in case there is a need to stop an in-progress reassignment.
12.2.2.1. Reassignment JSON file
The reassignment JSON file has a specific structure:
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
Where <PartitionObjects> is a comma-separated list of objects like:
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ], "log_dirs": [<LogDirs>] }
The "log_dirs"
property is optional and is used to move the partition to a specific log directory.
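For example, a partition object that pins replicas to specific log directories might look like the following (hypothetical broker IDs and paths). The "log_dirs" array must contain one entry per replica, and the value "any" lets the broker choose the directory:
{
  "topic": "topic-a",
  "partition": 4,
  "replicas": [2,4,7],
  "log_dirs": ["/var/lib/kafka/data-0", "any", "any"]
}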
The following is an example reassignment JSON file that assigns topic topic-a, partition 4 to brokers 2, 4 and 7, and topic topic-b partition 2 to brokers 1, 5 and 7:
{ "version": 1, "partitions": [ { "topic": "topic-a", "partition": 4, "replicas": [2,4,7] }, { "topic": "topic-b", "partition": 2, "replicas": [1,5,7] } ] }
Partitions not included in the JSON are not changed.
12.2.2.2. Generating reassignment JSON files
The easiest way to assign all the partitions for a given set of topics to a given set of brokers is to generate a reassignment JSON file using the kafka-reassign-partitions.sh --generate command.
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
The <topics_file> is a JSON file which lists the topics to move. It has the following structure:
{
"version": 1,
"topics": [
<topic_objects>
]
}
where <topic_objects> is a comma-separated list of objects like:
{
"topic": <TopicName>
}
For example, to move all the partitions of topic-a and topic-b to brokers 4 and 7:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
where topics-to-be-moved.json has the contents:
{ "version": 1, "topics": [ { "topic": "topic-a"}, { "topic": "topic-b"} ] }
12.2.2.3. Creating reassignment JSON files manually
You can manually create the reassignment JSON file if you want to move specific partitions.
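For example, a hand-written reassignment JSON file that moves only partition 1 of topic-a onto brokers 0, 1 and 2 (hypothetical topic name and broker IDs) could look like this:
{
  "version": 1,
  "partitions": [
    { "topic": "topic-a", "partition": 1, "replicas": [0,1,2] }
  ]
}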
12.2.3. Reassignment throttles
Reassigning partitions can be a slow process because it can require moving large amounts of data between brokers. To avoid a detrimental impact on clients, you can throttle the reassignment. Using a throttle can mean the reassignment takes longer. If the throttle is too low, the newly assigned brokers will not be able to keep up with the records being published and the reassignment will never complete. If the throttle is too high, clients will be impacted. For example, for producers, this could manifest as higher than normal latency waiting for acknowledgement. For consumers, this could manifest as a drop in throughput caused by higher latency between polls.
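When you pass --throttle to kafka-reassign-partitions.sh, the rate is applied through the leader.replication.throttled.rate and follower.replication.throttled.rate dynamic broker configurations. A sketch of how you might inspect the current values on a broker, assuming broker ID 0 and a listener on localhost:9092:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe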
12.2.4. Scaling up a Kafka cluster
This procedure describes how to increase the number of brokers in a Kafka cluster.
Prerequisites
- An existing Kafka cluster.
- A new machine with AMQ Streams installed.
- A reassignment JSON file of how partitions should be reassigned to brokers in the enlarged cluster.
Procedure
Create a configuration file for the new broker using the same settings as for the other brokers in your cluster, except for broker.id, which should be a number that is not already used by any of the other brokers.
Start the new Kafka broker, passing the configuration file you created in the previous step as the argument to the kafka-server-start.sh script:
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
- Repeat the above steps for each new broker.
Run the partition reassignment using the kafka-reassign-partitions.sh command line tool:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
If you are going to throttle replication, you can also pass the --throttle option with an inter-broker throttled rate in bytes per second. For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
This command will print out two reassignment JSON objects. The first records the current assignment for the partitions being moved. You should save this to a file in case you need to revert the reassignment later on. The second JSON object is the target reassignment you have passed in your reassignment JSON file.
If you need to change the throttle during reassignment you can use the same command line with a different throttled rate. For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
Periodically verify whether the reassignment has completed using the kafka-reassign-partitions.sh command line tool. This is the same command as the previous step, but with the --verify option instead of the --execute option:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
The reassignment has finished when the --verify command reports each of the partitions being moved as completed successfully. This final --verify will also have the effect of removing any reassignment throttles. You can now delete the revert file if you saved the JSON for reverting the assignments to their original brokers.
12.2.5. Scaling down a Kafka cluster
This procedure describes how to decrease the number of brokers in a Kafka cluster.
Prerequisites
- An existing Kafka cluster.
- A reassignment JSON file of how partitions should be reassigned to brokers in the cluster once the broker(s) have been removed.
Procedure
Run the partition reassignment using the kafka-reassign-partitions.sh command line tool:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
If you are going to throttle replication, you can also pass the --throttle option with an inter-broker throttled rate in bytes per second. For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
This command will print out two reassignment JSON objects. The first records the current assignment for the partitions being moved. You should save this to a file in case you need to revert the reassignment later on. The second JSON object is the target reassignment you have passed in your reassignment JSON file.
If you need to change the throttle during reassignment you can use the same command line with a different throttled rate. For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
Periodically verify whether the reassignment has completed using the kafka-reassign-partitions.sh command line tool. This is the same command as the previous step, but with the --verify option instead of the --execute option:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
For example:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
The reassignment has finished when the --verify command reports each of the partitions being moved as completed successfully. This final --verify will also have the effect of removing any reassignment throttles. You can now delete the revert file if you saved the JSON for reverting the assignments to their original brokers.
Check that each broker being removed does not have any live partitions in its log directories (log.dirs).
ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
If a log directory does not match the regular expression \.[a-z0-9]+-delete$, active partitions are still present. If you have active partitions, check that the reassignment has finished or check the configuration in the reassignment JSON file. You can run the reassignment again. Make sure that there are no active partitions before moving on to the next step.
Stop the broker.
su - kafka
/opt/kafka/bin/kafka-server-stop.sh
Confirm that the Kafka broker has stopped.
jcmd | grep kafka
12.2.6. Scaling up a ZooKeeper cluster
This procedure describes how to add servers (nodes) to a ZooKeeper cluster. The dynamic reconfiguration feature of ZooKeeper maintains a stable ZooKeeper cluster during the scale up process.
Prerequisites
- Dynamic reconfiguration is enabled in the ZooKeeper configuration file (reconfigEnabled=true).
- ZooKeeper authentication is enabled and you can access the new server using the authentication mechanism.
Procedure
Perform the following steps for each ZooKeeper server, one at a time:
- Add a server to the ZooKeeper cluster as described in Section 4.1, “Running a multi-node ZooKeeper cluster” and then start ZooKeeper.
- Note the IP address and configured access ports of the new server.
Start a zookeeper-shell session for the server. Run the following command from a machine that has access to the cluster (this might be one of the ZooKeeper nodes or your local machine, if it has access).
su - kafka
/opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
In the shell session, with the ZooKeeper node running, enter the following line to add the new server to the quorum as a voting member:
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
For example:
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
Where <positive-id> is the new server ID, 4.
For the two ports, <port1> 2888 is for communication between ZooKeeper servers, and <port2> 3888 is for leader election.
The new configuration propagates to the other servers in the ZooKeeper cluster; the new server is now a full member of the quorum.
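You can confirm the updated membership from the same zookeeper-shell session using the config command, which is available when dynamic reconfiguration is enabled. A hypothetical example of the kind of output you might see for a four-node ensemble:
config
server.1=172.17.0.1:2888:3888:participant;0.0.0.0:2181
server.2=172.17.0.2:2888:3888:participant;0.0.0.0:2181
server.3=172.17.0.3:2888:3888:participant;0.0.0.0:2181
server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
version=100000004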
- Repeat steps 1-4 for the other servers that you want to add.
12.2.7. Scaling down a ZooKeeper cluster
This procedure describes how to remove servers (nodes) from a ZooKeeper cluster. The dynamic reconfiguration feature of ZooKeeper maintains a stable ZooKeeper cluster during the scale down process.
Prerequisites
- Dynamic reconfiguration is enabled in the ZooKeeper configuration file (reconfigEnabled=true).
- ZooKeeper authentication is enabled and you can access the servers using the authentication mechanism.
Procedure
Perform the following steps for each ZooKeeper server, one at a time:
Log in to the zookeeper-shell on one of the servers that will be retained after the scale down (for example, server 1).
Note: Access the server using the authentication mechanism configured for the ZooKeeper cluster.
Remove a server, for example server 5.
reconfig -remove 5
- Deactivate the server that you removed.
- Repeat steps 1-3 to reduce the cluster size.
Additional resources
- Removing servers in the ZooKeeper documentation