Chapter 4. Running a multi-node environment
A multi-node environment comprises a number of nodes that operate as a cluster. You can have a cluster of replicated ZooKeeper nodes and a cluster of broker nodes, with topic replication across the brokers.
Multi-node environments offer stability and availability.
4.1. Running a multi-node ZooKeeper cluster
Configure and run ZooKeeper as a multi-node cluster.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as ZooKeeper cluster nodes.
Running the cluster
Create the myid file in /var/lib/zookeeper/. Enter ID 1 for the first ZooKeeper node, 2 for the second ZooKeeper node, and so on.
su - kafka
echo "<NodeID>" > /var/lib/zookeeper/myid
For example:
su - kafka
echo "1" > /var/lib/zookeeper/myid
Edit the ZooKeeper /opt/kafka/config/zookeeper.properties configuration file for the following:
- Set the option dataDir to /var/lib/zookeeper/.
- Configure the initLimit and syncLimit options.
- Configure the reconfigEnabled and standaloneEnabled options.
- Add a list of all ZooKeeper nodes. The list should also include the current node.
Example configuration for a node of ZooKeeper cluster with five members
tickTime=2000
dataDir=/var/lib/zookeeper/
initLimit=5
syncLimit=2
reconfigEnabled=true
standaloneEnabled=false
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,REPLICATION:PLAINTEXT
server.1=172.17.0.1:2888:3888:participant;172.17.0.1:2181
server.2=172.17.0.2:2888:3888:participant;172.17.0.2:2181
server.3=172.17.0.3:2888:3888:participant;172.17.0.3:2181
server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
server.5=172.17.0.5:2888:3888:participant;172.17.0.5:2181
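Each server.<id> entry in the example follows the pattern <host>:<quorum-port>:<election-port>:<role>;<client-host>:<client-port>. As a minimal sketch, assuming every node uses the same ports as the example (2888, 3888, and 2181) and that IDs are assigned in list order starting from 1, a hypothetical helper named zk_server_lines could generate the entries:

```shell
# Hypothetical helper (not part of AMQ Streams): print one server.<id> line
# per host given as an argument, numbering from 1.
# Assumes quorum port 2888, election port 3888, and client port 2181 on every node.
zk_server_lines() {
  id=1
  for host in "$@"; do
    printf 'server.%d=%s:2888:3888:participant;%s:2181\n' "$id" "$host" "$host"
    id=$((id + 1))
  done
}
```

For example, zk_server_lines 172.17.0.1 172.17.0.2 172.17.0.3 prints three entries that can be appended to zookeeper.properties. The ID in each entry must match the value stored in the myid file on that host.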
Start ZooKeeper with the default configuration file.
su - kafka
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
Verify that ZooKeeper is running.
jcmd | grep zookeeper
Returns:
number org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/kafka/config/zookeeper.properties
- Repeat this procedure on all the nodes of the cluster.
Verify that all nodes are members of the cluster by sending a stat command to each of the nodes using the ncat utility.
Use ncat stat to check the node status
echo stat | ncat localhost 2181
To use four-letter word commands, like stat, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The output shows that a node is either a leader or follower.
Example output from the ncat command
ZooKeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
Clients:
 /0:0:0:0:0:0:0:1:59726[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x200000000
Mode: follower
Node count: 4
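Checking every node by hand becomes tedious as the cluster grows. The following is a minimal sketch, assuming that ncat is installed, that the stat command is whitelisted, and that all nodes listen on client port 2181. The helper names zk_mode and zk_cluster_modes are hypothetical.

```shell
# Hypothetical helper: read `stat` output on stdin and print the value
# of the "Mode:" line (for example, leader or follower).
zk_mode() {
  sed -n 's/^Mode:[ ]*//p'
}

# Hypothetical helper: query each host given as an argument on port 2181
# and print its mode. An empty mode usually means the node is unreachable
# or that stat is not whitelisted.
zk_cluster_modes() {
  for host in "$@"; do
    mode="$(echo stat | ncat "$host" 2181 | zk_mode)"
    echo "$host: ${mode:-unreachable or stat not whitelisted}"
  done
}
```

For example, zk_cluster_modes 172.17.0.1 172.17.0.2 172.17.0.3 prints one line per node. In a working cluster you would typically expect one node to report leader and the others to report follower.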
4.2. Running a multi-node Kafka cluster
Configure and run Kafka as a multi-node cluster.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
- A ZooKeeper cluster is configured and running.
Running the cluster
For each Kafka broker in your AMQ Streams cluster:
Edit the /opt/kafka/config/server.properties Kafka configuration file as follows:
- Set the broker.id field to 0 for the first broker, 1 for the second broker, and so on.
- Configure the details for connecting to ZooKeeper in the zookeeper.connect option.
- Configure the Kafka listeners.
- Set the directories where the commit logs should be stored in the log.dirs option.
Example configuration for a Kafka broker:
broker.id=0
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
listeners=REPLICATION://:9091,PLAINTEXT://:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,REPLICATION:PLAINTEXT
inter.broker.listener.name=REPLICATION
log.dirs=/var/lib/kafka
In a typical installation where each Kafka broker is running on identical hardware, only the broker.id configuration property differs between broker configurations.
Start the Kafka broker with the default configuration file.
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
Returns:
number kafka.Kafka /opt/kafka/config/server.properties
Verify that all nodes are members of the Kafka cluster by sending a dump command to one of the ZooKeeper nodes using the ncat utility.
Use ncat dump to check all Kafka brokers registered in ZooKeeper
echo dump | ncat zoo1.my-domain.com 2181
To use four-letter word commands, like dump, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The output must contain all Kafka brokers you just configured and started.
Example output from the ncat command for a Kafka cluster with 3 nodes
SessionTracker dump: org.apache.zookeeper.server.quorum.LearnerSessionTracker@28848ab9
ephemeral nodes dump:
Sessions with Ephemerals (3):
0x20000015dd00000:
        /brokers/ids/1
0x10000015dc70000:
        /controller
        /brokers/ids/0
0x10000015dc70001:
        /brokers/ids/2
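To avoid reading the dump output by eye, the broker IDs can be extracted from the /brokers/ids/<id> paths and compared with the IDs you expect. The following is a minimal sketch, assuming the output format shown in the example above. The helper names registered_broker_ids and all_brokers_registered are hypothetical.

```shell
# Hypothetical helper: read `dump` output on stdin and print the registered
# broker IDs, one per line, in numeric order.
registered_broker_ids() {
  grep -o '/brokers/ids/[0-9][0-9]*' | sed 's#.*/##' | sort -n
}

# Hypothetical helper: read `dump` output on stdin and succeed only if every
# expected broker ID (given as arguments) is registered.
all_brokers_registered() {
  ids="$(registered_broker_ids)"
  for expected in "$@"; do
    echo "$ids" | grep -qx "$expected" || {
      echo "broker $expected is not registered" >&2
      return 1
    }
  done
}
```

For example, echo dump | ncat zoo1.my-domain.com 2181 | all_brokers_registered 0 1 2 returns a zero exit status only when brokers 0, 1, and 2 all appear in the output.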
4.3. Performing a graceful rolling restart of Kafka brokers
This procedure shows how to do a graceful rolling restart of brokers in a multi-node cluster. A rolling restart is usually required following an upgrade or change to the Kafka cluster configuration properties.
Some broker configurations do not need a restart of the broker. For more information, see Updating Broker Configs in the Apache Kafka documentation.
After you perform a restart of a broker, check for under-replicated topic partitions to make sure that replica partitions have caught up.
You can only perform a graceful restart, with no loss of availability, if you are replicating topics and ensuring that at least one replica is in sync. For a multi-node cluster, the standard approach is to have a topic replication factor of at least 3 and a minimum number of in-sync replicas set to 1 less than the replication factor. If you are using acks=all in your producer configuration for data durability, check that the broker you restarted is in sync with all the partitions it’s replicating before restarting the next broker.
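The relationship between the replication factor and the minimum number of in-sync replicas is simple arithmetic. The following is a minimal sketch of it. The helper name min_insync_replicas and the topic name my-topic are hypothetical, and the commented command is shown for illustration only.

```shell
# Hypothetical helper: print the minimum in-sync replica count for a given
# replication factor (one less than the replication factor).
min_insync_replicas() {
  rf="$1"
  if [ "$rf" -lt 2 ]; then
    echo "replication factor must be at least 2 to tolerate a broker restart" >&2
    return 1
  fi
  echo $((rf - 1))
}

# Illustration only (not executed here): create a hypothetical topic named
# my-topic with a replication factor of 3 and the matching minimum ISR.
# /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
#   --topic my-topic --partitions 3 --replication-factor 3 \
#   --config min.insync.replicas="$(min_insync_replicas 3)"
```

With a replication factor of 3 the helper prints 2, so producers using acks=all can continue to write while one broker is restarting.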
Single-node clusters are unavailable during a restart, since all partitions are on the same broker.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
- A ZooKeeper cluster is configured and running.
- The Kafka cluster is operating as expected.
Check for under-replicated partitions or any other issues affecting broker operation. The steps in this procedure describe how to check for under-replicated partitions.
Procedure
Perform the following steps on each Kafka broker. Complete the steps on the first broker before moving on to the next. Perform the steps on the broker that’s the active controller last. Otherwise, the active controller needs to change on more than one restart.
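To decide which broker to restart last, you need to know which broker is the active controller. One way to find out, sketched below, relies on the ZooKeeper dump output used elsewhere in this chapter: the session that owns the /controller ephemeral node also owns the /brokers/ids/<id> node of the controller broker. The helper name active_controller_id is hypothetical, and the sketch assumes a ZooKeeper-based cluster with the dump command whitelisted.

```shell
# Hypothetical helper: read ZooKeeper `dump` output on stdin and print the ID
# of the broker whose session also owns the /controller ephemeral node.
active_controller_id() {
  awk '
    # Print the broker ID of the finished session block if it held /controller.
    function emit() { if (ctrl && id != "") print id; ctrl = 0; id = "" }
    /^0x[0-9a-fA-F]+:/ { emit() }                      # a new session block starts
    /\/controller[ \t]/ || /\/controller$/ { ctrl = 1 } # this session holds /controller
    match($0, /\/brokers\/ids\/[0-9]+/) {
      id = substr($0, RSTART + 13, RLENGTH - 13)       # digits after "/brokers/ids/"
    }
    END { emit() }
  '
}
```

For example, echo dump | ncat zoo1.my-domain.com 2181 | active_controller_id prints the ID of the current controller, which is the broker to restart last.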
Stop the Kafka broker:
/opt/kafka/bin/kafka-server-stop.sh
Make any changes to the broker configuration that require a restart after completion.
Restart the Kafka broker:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Check that Kafka is running:
jcmd | grep kafka
Returns:
number kafka.Kafka /opt/kafka/config/server.properties
Verify that all nodes are members of the Kafka cluster by sending a dump command to one of the ZooKeeper nodes using the ncat utility.
Use ncat dump to check all Kafka brokers registered in ZooKeeper
echo dump | ncat zoo1.my-domain.com 2181
To use four-letter word commands, like dump, you need to specify 4lw.commands.whitelist=* in zookeeper.properties.
The output must contain the Kafka broker you started.
Example output from the ncat command for a Kafka cluster with 3 nodes
SessionTracker dump: org.apache.zookeeper.server.quorum.LearnerSessionTracker@28848ab9
ephemeral nodes dump:
Sessions with Ephemerals (3):
0x20000015dd00000:
        /brokers/ids/1
0x10000015dc70000:
        /controller
        /brokers/ids/0
0x10000015dc70001:
        /brokers/ids/2
Wait until the broker has zero under-replicated partitions. You can check from the command line or use metrics.
Use the kafka-topics.sh command with the --under-replicated-partitions parameter:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <bootstrap_address> --describe --under-replicated-partitions
For example:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
The command provides a list of topics with under-replicated partitions in a cluster.
Topics with under-replicated partitions
Topic: topic3 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2
Topic: topic3 Partition: 5 Leader: 3 Replicas: 1,2 Isr: 1
Topic: topic1 Partition: 1 Leader: 3 Replicas: 1,3 Isr: 3
# …
Under-replicated partitions are listed if the ISR (in-sync replica) count is less than the number of replicas. If a list is not returned, there are no under-replicated partitions.
Use the UnderReplicatedPartitions metric:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
The metric provides a count of partitions where replicas have not caught up. Wait until the count is zero.
Tip: Use the Kafka Exporter to create an alert when there are one or more under-replicated partitions for a topic.
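The command-line check can be repeated in a loop until no under-replicated partitions remain. The following is a minimal sketch, assuming the kafka-topics.sh output format shown above, where each under-replicated partition is reported on a line containing "Partition:". The helper names count_under_replicated and wait_until_in_sync are hypothetical, and the check covers the whole cluster rather than only the restarted broker.

```shell
# Hypothetical helper: read kafka-topics.sh --describe output on stdin and
# print the number of partition lines it contains.
count_under_replicated() {
  grep -c 'Partition:' || true
}

# Hypothetical helper: poll until no under-replicated partitions are reported.
# $1 = bootstrap address, $2 = seconds between checks (defaults to 10).
wait_until_in_sync() {
  bootstrap="$1"
  interval="${2:-10}"
  while :; do
    if out="$(/opt/kafka/bin/kafka-topics.sh --bootstrap-server "$bootstrap" \
        --describe --under-replicated-partitions)"; then
      n="$(printf '%s\n' "$out" | count_under_replicated)"
      [ "$n" -eq 0 ] && return 0
      echo "$n under-replicated partition(s), waiting..."
    else
      # A failed query is not treated as "in sync"; try again after the interval.
      echo "kafka-topics.sh failed, retrying..." >&2
    fi
    sleep "$interval"
  done
}
```

For example, wait_until_in_sync localhost:9092 30 checks every 30 seconds and returns once the list of under-replicated partitions is empty, at which point you can move on to the next broker.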
Checking logs when restarting
If a broker fails to start, check the application logs for information. You can also check the status of a broker shutdown and restart in the /opt/kafka/logs/server.log application log.
Log for the successful shutdown of a broker
# ...
[2022-06-08 14:32:29,885] INFO Terminating process due to signal SIGTERM (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-06-08 14:32:29,886] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)
[2022-06-08 14:32:29,887] INFO [KafkaServer id=0] Starting controlled shutdown (kafka.server.KafkaServer)
[2022-06-08 14:32:29,896] INFO [KafkaServer id=0] Controlled shutdown request returned successfully after 6ms (kafka.server.KafkaServer)
# ...
Log for the successful restart of a broker
# ...
[2022-06-08 14:39:35,245] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
# ...
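The log messages shown above can also be searched for from a script. The following is a minimal sketch, assuming the message text matches the examples in this section. The helper names broker_started and controlled_shutdown_ok are hypothetical.

```shell
# Hypothetical helper: succeed if the log text on stdin contains the
# "started" message written by the Kafka server.
broker_started() {
  grep -qF 'started (kafka.server.KafkaServer)'
}

# Hypothetical helper: succeed if the log text on stdin shows that the
# controlled shutdown request completed successfully.
controlled_shutdown_ok() {
  grep -qF 'Controlled shutdown request returned successfully'
}
```

For example, tail -n 50 /opt/kafka/logs/server.log | broker_started limits the search to recent entries. The log file also keeps messages from earlier restarts, so searching the whole file can match an older startup.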