Chapter 4. Running a multi-node environment
A multi-node environment comprises a number of nodes that operate as a cluster. You can have a cluster of replicated ZooKeeper nodes and a cluster of broker nodes, with topic replication across the brokers.
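Multi-node environments offer stability and availability: when a cluster is sized and replicated appropriately, it continues to operate if an individual node fails.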
4.1. Running a multi-node ZooKeeper cluster
Configure and run ZooKeeper as a multi-node cluster.
Prerequisites
- Streams for Apache Kafka is installed on all hosts which will be used as ZooKeeper cluster nodes.
Running the cluster
Create the myid file in /var/lib/zookeeper/. Enter ID 1 for the first ZooKeeper node, 2 for the second ZooKeeper node, and so on.
su - kafka
echo "<NodeID>" > /var/lib/zookeeper/myid
For example:
su - kafka
echo "1" > /var/lib/zookeeper/myid
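If you prepare several nodes with a script, the myid step can be sketched as follows. NODE_ID and ZK_DATA_DIR are hypothetical variables introduced for this example. By default the script writes to a new temporary directory, so you can check the result before pointing ZK_DATA_DIR at /var/lib/zookeeper/ and running it as the kafka user.

```shell
#!/usr/bin/env bash
# Sketch: write the ZooKeeper myid file for one node.
# NODE_ID and ZK_DATA_DIR are hypothetical variables. ZK_DATA_DIR defaults to
# a new temporary directory; on a real node it is /var/lib/zookeeper and the
# script runs as the kafka user.
NODE_ID="${NODE_ID:-1}"
ZK_DATA_DIR="${ZK_DATA_DIR:-$(mktemp -d)}"

# The file holds only the node ID, which must match this node's server.<ID>
# entry in zookeeper.properties.
case "$NODE_ID" in
  '' | *[!0-9]*)
    echo "NODE_ID must contain only digits" >&2
    exit 1
    ;;
esac

mkdir -p "$ZK_DATA_DIR"
echo "$NODE_ID" > "$ZK_DATA_DIR/myid"
echo "Wrote $ZK_DATA_DIR/myid for node $NODE_ID"
```

For example, on the second node you might run NODE_ID=2 ZK_DATA_DIR=/var/lib/zookeeper bash write-myid.sh, where write-myid.sh is whatever name you save the sketch under.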
Edit the ZooKeeper /opt/kafka/config/zookeeper.properties configuration file as follows:
- Set the dataDir option to /var/lib/zookeeper/.
- Configure the initLimit and syncLimit options, which set, in ticks, how long followers have to connect to and sync with the leader (initLimit) and how far they can fall behind it (syncLimit).
- Configure the reconfigEnabled and standaloneEnabled options.
- Add a list of all ZooKeeper nodes as server.<ID> entries, where <ID> matches the node's myid. The list should also include the current node.
Example configuration for a node of a ZooKeeper cluster with five members
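As a sketch of such a configuration, the following script writes a zookeeper.properties file for one member of a five-node ensemble. The host names zoo1.my-domain.com to zoo5.my-domain.com, the tickTime, initLimit, and syncLimit values, and the use of the conventional ZooKeeper ports (2888 for quorum traffic, 3888 for leader election, 2181 for clients) are assumptions for illustration. ZK_CONFIG is a hypothetical variable that defaults to a temporary file rather than the live configuration.

```shell
#!/usr/bin/env bash
# Sketch: write zookeeper.properties for one member of a five-node ensemble.
# Assumptions (hypothetical): host names zoo1..zoo5.my-domain.com, ports
# 2888 (quorum), 3888 (leader election), 2181 (clients), and the timing values.
# ZK_CONFIG defaults to a temporary file; point it at
# /opt/kafka/config/zookeeper.properties only when you are ready.
ZK_CONFIG="${ZK_CONFIG:-$(mktemp -d)/zookeeper.properties}"

{
  echo "tickTime=2000"
  echo "dataDir=/var/lib/zookeeper/"
  # Both limits are counted in ticks (multiples of tickTime).
  echo "initLimit=5"
  echo "syncLimit=2"
  echo "reconfigEnabled=true"
  echo "standaloneEnabled=false"
  # Allows four-letter word commands such as stat and dump.
  echo "4lw.commands.whitelist=*"
  # One entry per ensemble member, including the current node.
  for id in 1 2 3 4 5; do
    host="zoo${id}.my-domain.com"
    echo "server.${id}=${host}:2888:3888:participant;${host}:2181"
  done
} > "$ZK_CONFIG"

echo "Wrote $ZK_CONFIG"
```

Because every node receives the same server list, in this sketch only the myid file differs between nodes.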
Start ZooKeeper with the default configuration file.
su - kafka
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
Verify that ZooKeeper is running.
jcmd | grep zookeeper
Returns:
<process_id> org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/kafka/config/zookeeper.properties
Repeat this procedure on all the nodes of the cluster.
Verify that all nodes are members of the cluster by sending a stat command to each of the nodes using the ncat utility.
Use ncat stat to check the node status
echo stat | ncat localhost 2181
To use four-letter word commands, like stat, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The Mode line in the output shows whether the node is the leader or a follower.
Example output from the ncat command
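Rather than reading each response by eye, you can extract the role from the Mode line of the stat output. The helper below is a hypothetical sketch; the host names in the usage comment are assumptions, and the sample response is abbreviated for illustration.

```shell
#!/usr/bin/env bash
# Sketch: extract the quorum role from ZooKeeper `stat` output.
# zk_mode is a hypothetical helper: it reads stat output on stdin and prints
# the value of the "Mode:" line (for example, leader or follower).
zk_mode() {
  awk -F': *' '/^Mode:/ { print $2 }'
}

# Usage against live nodes (host names are hypothetical; requires ncat and a
# 4lw.commands.whitelist setting that allows stat):
#   for host in zoo1.my-domain.com zoo2.my-domain.com zoo3.my-domain.com; do
#     printf '%s: %s\n' "$host" "$(echo stat | ncat "$host" 2181 | zk_mode)"
#   done

# Offline demonstration with an abbreviated, illustrative stat response.
sample_stat='Zookeeper version: 3.x
Latency min/avg/max: 0/0.0/0
Mode: follower
Node count: 5'
printf '%s\n' "$sample_stat" | zk_mode
```

In a healthy ensemble of participants, exactly one node reports leader and the rest report follower.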
4.2. Running a multi-node Kafka cluster
Configure and run Kafka as a multi-node cluster.
Prerequisites
- Streams for Apache Kafka is installed on each host, and the configuration files are available.
- A ZooKeeper cluster is configured and running.
Running the cluster
For each Kafka broker in your Streams for Apache Kafka cluster:
Edit the /opt/kafka/config/server.properties Kafka configuration file as follows:
- Set the broker.id field to 0 for the first broker, 1 for the second broker, and so on.
- Configure the details for connecting to ZooKeeper in the zookeeper.connect option.
- Configure the Kafka listeners.
- Set the directories where the commit logs are stored in the log.dirs property.
The following is an example configuration for a Kafka broker:
In a typical installation where each Kafka broker runs on identical hardware, only the broker.id configuration property differs between the broker configurations.
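The following script sketches such a broker configuration. BROKER_ID and KAFKA_CONFIG are hypothetical variables, and the ZooKeeper host names, the single PLAINTEXT listener on port 9092, and the /var/lib/kafka log directory are assumptions for illustration. Run it once per broker with a different BROKER_ID.

```shell
#!/usr/bin/env bash
# Sketch: write server.properties for one broker.
# Assumptions (hypothetical): ZooKeeper hosts zoo1..zoo5.my-domain.com, a
# PLAINTEXT listener on port 9092, and the /var/lib/kafka log directory.
# KAFKA_CONFIG defaults to a temporary file instead of the live configuration.
BROKER_ID="${BROKER_ID:-0}"
KAFKA_CONFIG="${KAFKA_CONFIG:-$(mktemp -d)/server.properties}"

cat > "$KAFKA_CONFIG" <<EOF
# Unique per broker: 0 for the first broker, 1 for the second, and so on.
broker.id=${BROKER_ID}
# Comma-separated host:port list of the ZooKeeper nodes.
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181,zoo4.my-domain.com:2181,zoo5.my-domain.com:2181
# Empty host name: bind to the default interface on port 9092, without TLS.
listeners=PLAINTEXT://:9092
# Directories for the commit logs (comma-separated if there are several).
log.dirs=/var/lib/kafka
EOF

echo "Wrote $KAFKA_CONFIG for broker ${BROKER_ID}"
```

With these assumptions, the files for different brokers differ only in the broker.id line.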
Start the Kafka broker with the default configuration file.
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
Returns:
<process_id> kafka.Kafka /opt/kafka/config/server.properties
Verify that all nodes are members of the Kafka cluster by sending a dump command to one of the ZooKeeper nodes using the ncat utility.
Use ncat dump to check all Kafka brokers registered in ZooKeeper
echo dump | ncat zoo1.my-domain.com 2181
To use four-letter word commands, like dump, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The output must list every Kafka broker you configured and started, as a /brokers/ids/<broker.id> entry.
Example output from the ncat command for a Kafka cluster with 3 nodes
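Each running broker registers an ephemeral /brokers/ids/<broker.id> znode, so the broker IDs can be pulled out of the dump output with a short filter. The helper name and the sample dump excerpt are hypothetical and for illustration only.

```shell
#!/usr/bin/env bash
# Sketch: list the broker IDs registered in ZooKeeper from `dump` output.
# registered_brokers is a hypothetical helper: it reads dump output on stdin
# and prints each /brokers/ids/<id> entry's ID once, in numeric order.
registered_brokers() {
  grep -o '/brokers/ids/[0-9][0-9]*' | sed 's#.*/##' | sort -n | uniq
}

# Usage against a live node (requires dump in 4lw.commands.whitelist):
#   echo dump | ncat zoo1.my-domain.com 2181 | registered_brokers

# Offline demonstration with an illustrative excerpt of dump output.
sample_dump='Sessions with Ephemerals (3):
0x20000015dd00000:
        /brokers/ids/1
0x10000015dc70000:
        /controller
        /brokers/ids/0
0x10000015dc70001:
        /brokers/ids/2'
printf '%s\n' "$sample_dump" | registered_brokers
```

Comparing this list with the broker.id values you configured shows at a glance whether any broker is missing.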
4.3. Performing a graceful rolling restart of Kafka brokers
This procedure shows how to do a graceful rolling restart of brokers in a multi-node cluster. A rolling restart is usually required following an upgrade or change to the Kafka cluster configuration properties.
Some broker configuration changes do not require a restart of the broker. For more information, see Updating Broker Configs in the Apache Kafka documentation.
After you perform a restart of brokers, check for under-replicated topic partitions to make sure that replica partitions have caught up.
To achieve a graceful restart with no loss of availability, ensure that your topics are replicated and that at least the minimum number of in-sync replicas (min.insync.replicas) are in sync. When a producer uses acks=all, the min.insync.replicas configuration determines the minimum number of replicas that must acknowledge a write for the write to be considered successful.
For a multi-node cluster, the standard approach is to have a topic replication factor of at least 3 and a minimum number of in-sync replicas set to 1 less than the replication factor. If you are using acks=all in your producer configuration for data durability, check that the broker you restarted is in sync with all the partitions it’s replicating before restarting the next broker.
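The arithmetic behind this guidance can be sketched as a small shell function. It assumes that producers use acks=all and that all replicas are in sync before a broker is stopped; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: how many replicas of a partition can be offline while acks=all
# writes still succeed. Writes succeed while the ISR size is at least
# min.insync.replicas, so the tolerance is
# replication.factor - min.insync.replicas.
tolerated_offline() {
  local replication_factor=$1 min_insync=$2
  if (( min_insync > replication_factor )); then
    echo "min.insync.replicas cannot exceed the replication factor" >&2
    return 1
  fi
  echo $(( replication_factor - min_insync ))
}

tolerated_offline 3 2   # prints 1
```

With the standard approach described above (replication factor 3, min.insync.replicas 2), the result is 1, which is why brokers are restarted one at a time and each restarted broker must catch up before the next one stops. If min.insync.replicas equals the replication factor, the result is 0 and acks=all writes to affected partitions fail while any of their replicas is down.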
Single-node clusters are unavailable during a restart, since all partitions are on the same broker.
Prerequisites
- A ZooKeeper cluster is configured and running.
- The Kafka cluster is operating as expected.
Check for under-replicated partitions or any other issues affecting broker operation. The steps in this procedure describe how to check for under-replicated partitions.
Procedure
Perform the following steps on each Kafka broker. Complete the steps on one broker before moving on to the next. Perform the steps on the broker that is the active controller last; otherwise, the active controller role has to move more than once during the rolling restart.
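To find out which broker is the active controller in a ZooKeeper-based cluster, you can read the /controller znode, which holds JSON that includes the controller's broker ID. The following function is a hypothetical helper that extracts that ID; it assumes the JSON contains a "brokerid" field, as in the sample value shown, and the usage comment assumes the zookeeper-shell.sh tool in /opt/kafka/bin.

```shell
#!/usr/bin/env bash
# Sketch: print the broker ID of the active controller.
# controller_id is a hypothetical helper: it reads /controller znode data on
# stdin, for example {"version":1,"brokerid":0,"timestamp":"..."}, and prints
# the first "brokerid" value it finds.
controller_id() {
  grep -o '"brokerid":[0-9][0-9]*' | head -n 1 | cut -d: -f2
}

# Usage against a live cluster; connection messages printed by the shell are
# ignored because only the "brokerid" field is matched:
#   /opt/kafka/bin/zookeeper-shell.sh zoo1.my-domain.com:2181 get /controller | controller_id

# Offline demonstration with an illustrative /controller value.
printf '%s\n' '{"version":1,"brokerid":2,"timestamp":"1654699175000"}' | controller_id
```

Restart the broker whose ID this prints last. The controller can move during the procedure, so it is worth checking again before the final restart.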
Stop the Kafka broker:
/opt/kafka/bin/kafka-server-stop.sh
Make any changes to the broker configuration that require a restart to take effect.
For further information, see the following:
Restart the Kafka broker:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Check that Kafka is running:
jcmd | grep kafka
Returns:
<process_id> kafka.Kafka /opt/kafka/config/server.properties
Verify that all nodes are members of the Kafka cluster by sending a dump command to one of the ZooKeeper nodes using the ncat utility.
Use ncat dump to check all Kafka brokers registered in ZooKeeper
echo dump | ncat zoo1.my-domain.com 2181
To use four-letter word commands, like dump, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The output must contain the Kafka broker you started.
Example output from the ncat command for a Kafka cluster with 3 nodes
Wait until the broker has zero under-replicated partitions. You can check from the command line or use metrics.
Use the kafka-topics.sh command with the --under-replicated-partitions parameter:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <bootstrap_address> --describe --under-replicated-partitions
For example:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
The command lists the topics with under-replicated partitions in the cluster.
Topics with under-replicated partitions
Topic: topic3 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2
Topic: topic3 Partition: 5 Leader: 3 Replicas: 1,2 Isr: 1
Topic: topic1 Partition: 1 Leader: 3 Replicas: 1,3 Isr: 3
# …
Under-replicated partitions are listed if the ISR (in-sync replica) count is less than the number of replicas. If a list is not returned, there are no under-replicated partitions.
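The --under-replicated-partitions option already filters the output, so counting its lines is enough. The following hypothetical helper instead compares the Isr and Replicas lists on each partition line, so it also works on unfiltered --describe output. The first three sample lines come from the output above; the fourth, a fully replicated partition, is an illustrative addition.

```shell
#!/usr/bin/env bash
# Sketch: count partitions whose ISR list is shorter than their replica list.
# count_under_replicated is a hypothetical helper that reads kafka-topics.sh
# --describe output on stdin; fields may be separated by spaces or tabs.
count_under_replicated() {
  awk '
    /Replicas:/ && /Isr:/ {
      replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:") isr = $(i + 1)
      }
      # Under-replicated when fewer replicas are in sync than assigned.
      if (split(isr, a, ",") < split(replicas, b, ",")) n++
    }
    END { print n + 0 }'
}

sample='Topic: topic3 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2
Topic: topic3 Partition: 5 Leader: 3 Replicas: 1,2 Isr: 1
Topic: topic1 Partition: 1 Leader: 3 Replicas: 1,3 Isr: 3
Topic: topic1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3'
printf '%s\n' "$sample" | count_under_replicated   # prints 3
```

Topic summary lines, which carry ReplicationFactor: rather than Replicas: and Isr:, are skipped by the pattern match.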
Use the UnderReplicatedPartitions metric:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
The metric provides a count of partitions where replicas have not caught up. Wait until the count is zero.
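Waiting for the count to reach zero can be scripted as a polling loop. In the following sketch, wait_for_zero and urp_count are hypothetical helpers, and the default interval (10 seconds) and attempt limit (30) are arbitrary assumptions that you can override with the INTERVAL and MAX_ATTEMPTS variables.

```shell
#!/usr/bin/env bash
# Sketch: poll until a check command prints 0.
# wait_for_zero runs the given command every INTERVAL seconds (default 10)
# and returns 0 as soon as it prints 0, or 1 after MAX_ATTEMPTS tries
# (default 30). Empty output, for example from a failed check, counts as
# "not yet zero".
wait_for_zero() {
  local interval=${INTERVAL:-10} max=${MAX_ATTEMPTS:-30} attempt count
  for (( attempt = 1; attempt <= max; attempt++ )); do
    count=$("$@")
    if [[ "$count" == 0 ]]; then
      echo "No under-replicated partitions"
      return 0
    fi
    echo "Attempt ${attempt}/${max}: '${count}' under-replicated partition(s)"
    sleep "$interval"
  done
  return 1
}

# Example check: every line that kafka-topics.sh prints with
# --under-replicated-partitions is one under-replicated partition. If the
# command itself fails, urp_count prints nothing, so the loop keeps waiting.
#   urp_count() {
#     local out
#     out=$(/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
#       --describe --under-replicated-partitions) || return 1
#     printf '%s\n' "$out" | grep -c 'Partition:'
#   }
#   wait_for_zero urp_count
```

Passing the check as a command keeps the loop independent of whether you count partitions with kafka-topics.sh or read the UnderReplicatedPartitions metric from your monitoring system.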
Tip: Use Kafka Exporter to create an alert when there are one or more under-replicated partitions for a topic.
Checking logs when restarting
If a broker fails to start, check the application logs for information. You can also check the status of a broker shutdown and restart in the /opt/kafka/logs/server.log application log.
Log for the successful shutdown of a broker
Log for the successful restart of a broker
... ...
# ...
[2022-06-08 14:39:35,245] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
# ...
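To check for the restart line from a script, for example after each broker restart, the following hypothetical helper prints the most recent start message for a given broker ID from a log file and fails if there is none. It matches the message format shown in the example log line for a successful restart. Because server.log can also contain messages from earlier starts, compare the printed timestamp with the time of your restart.

```shell
#!/usr/bin/env bash
# Sketch: print the latest "[KafkaServer id=<ID>] started" line from a log.
# broker_started is a hypothetical helper taking LOG_FILE and BROKER_ID; it
# returns non-zero when no matching line exists.
broker_started() {
  local log_file=$1 broker_id=$2 line
  line=$(grep -F "[KafkaServer id=${broker_id}] started (kafka.server.KafkaServer)" "$log_file" | tail -n 1)
  [[ -n "$line" ]] && printf '%s\n' "$line"
}

# Usage on a broker host:
#   broker_started /opt/kafka/logs/server.log 0 || echo "broker 0 has not logged a start"
```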