Chapter 8. Failing over to a disaster recovery cluster
Failing over is the act of switching from the primary cluster to the disaster recovery cluster. It can be planned, for example to proactively manage an incoming disaster or to exercise a disaster recovery plan, or unplanned, when a disaster strikes the primary cluster. In either case, the decision to fail over has to be made by an operator or an external process; it is not something MirrorMaker does. However, MirrorMaker does provide the information to help you make the decision.
Failing over also implies moving the client applications from one cluster to another. You need processes and tools to quickly shut down applications, and restart them (possibly in another region, if preparing for a full region failure) with an updated configuration so that they connect to the new primary cluster.
8.1. Performing a planned failover (migration)
When performing a planned failover or migration you can expect little to no data loss and a minimal impact on the applications. However, certain conditions must be met before it can be done.
The first condition is to validate the gap (lag) between both clusters. MirrorMaker mirrors data asynchronously, so there is always a small data gap between the clusters. As described in the Metrics section, you can estimate the time delay between the clusters using the record-age-ms metric. For a planned failover, you ideally want this duration to be as short as possible. It should also match the RTO specified in the disaster recovery plan.
The second condition is the ability to shut down and restart applications in a controlled fashion, and with their configuration updated with the bootstrap servers for the other cluster. Before restarting, wait for the following to pass:
- refresh.topics.interval.seconds since the last topic creation
- sync.topic.acls.interval.seconds since the last ACL update
- sync.topic.configs.interval.seconds since the last topic configuration change
The default value for all three intervals is the same (600 seconds). If you want to adjust the values, it's best to keep them aligned to avoid complicating this process.
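To illustrate, keeping the three intervals aligned in the MirrorSourceConnector configuration could look like this (the values shown are the defaults):

```properties
# Keep the refresh/sync intervals aligned (600 seconds is the default for each)
refresh.topics.interval.seconds=600
sync.topic.acls.interval.seconds=600
sync.topic.configs.interval.seconds=600
```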
Once these conditions are met, you can start the process. In this example, failover is from cluster A to cluster B. In this initial state, cluster B is very close to cluster A; it is only missing a small number of records because the mirroring process is asynchronous.
Figure 8.1. Mirroring is asynchronous, so there is always some data on the source cluster that has not been mirrored yet
8.2. Steps to perform a planned failover
Stop all applications that are producing to cluster A. Once they are stopped, wait for MirrorSourceConnector to mirror the data gap between both clusters. Since the gap is short, this should complete quickly. When it has completed, the byte-rate metrics, such as kafka.connect.mirror:type=MirrorSourceConnector,target=B,topic=my-topic,partition=0,byte-rate, drop to 0, as there is no more data to copy from cluster A to cluster B.

Figure 8.2. After stopping producers on the source cluster, MirrorMaker is able to mirror all existing data
Stop MirrorSourceConnector via the Kafka Connect REST API:

curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/stop

(Optional) Before restarting any applications, note the Log End Offsets (LEO) of all the mirrored topics on cluster B. Although this step is optional, it is required to allow mirroring the data delta between the clusters when failing back. You retrieve the Log End Offsets with the kafka-get-offsets.sh tool:
bin/kafka-get-offsets.sh \
  --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
  --time latest \
  --topic-partitions <TOPICS>

The arguments are:
- <TARGET_BOOTSTRAP_SERVERS>
- A comma-separated list of bootstrap servers of the target cluster, which is cluster B in this example. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
- <TOPICS>
- A comma-separated list of topic names and regexes. If this cluster only has topics that were mirrored, you don't need to specify this argument, as the command defaults to returning the offsets for all topics.
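As an illustration of the --command-config option, the settings file for a cluster using SASL/SSL authentication might contain client properties like the following (the mechanism and credentials are placeholders):

```properties
# Illustrative client settings passed via --command-config
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<USER>" \
  password="<PASSWORD>";
```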
Restart the applications, which will now produce records only to cluster B.
Figure 8.3. The state of the disaster recovery environment after restarting producers on cluster B
Wait for consuming applications connected to cluster A to reach a lag of 0 before shutting them down. This guarantees an exact consumer offset translation. After stopping the consumers, wait for sync.group.offsets.interval.seconds to ensure their offsets are synced before restarting them on cluster B.

However, consumers with too much lag to catch up within an acceptable timeframe need to be shut down before reaching a lag of 0. This can result in those consumers reprocessing some records, depending on how far behind they are.
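The lag check above amounts to comparing each partition's log end offset with the group's committed offset. A minimal sketch of that computation (in a real check, both maps would come from the Admin API, end offsets via listOffsets() and committed offsets via listConsumerGroupOffsets(); the partition names below are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCheck {
    /**
     * Per-partition consumer lag: log end offset minus committed offset.
     * A partition is safe to stop when its lag reaches 0.
     */
    public static Map<String, Long> lag(Map<String, Long> endOffsets,
                                        Map<String, Long> committed) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            result.put(e.getKey(), e.getValue() - committed.getOrDefault(e.getKey(), 0L));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> end = Map.of("my-topic-0", 120L, "my-topic-1", 45L);
        Map<String, Long> committed = Map.of("my-topic-0", 120L, "my-topic-1", 40L);
        // my-topic-0 has lag 0 and is safe to stop; my-topic-1 still has lag 5
        System.out.println(LagCheck.lag(end, committed));
    }
}
```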
Wait for sync.group.offsets.interval.seconds and then stop MirrorCheckpointConnector via the Kafka Connect REST API:

curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorCheckpointConnector/stop

Restart the consuming applications on cluster B to complete the failover to cluster B.
Figure 8.4. The state of the disaster recovery environment after a successful planned failover
8.3. Handling an unplanned failover
Unfortunately, a failover is not always planned: if a hard failure impacts the primary cluster, you may have to perform an unplanned failover. There is a large spectrum of possible failures, so the impact on your data and workload can vary significantly.
In the worst case, you can lose all data that has not been mirrored. The amount of data at risk can be estimated using metrics. The record-age-ms metric indicates the gap in milliseconds between the two clusters. You can also use the last incoming byte rate from the source cluster to estimate the amount of data potentially lost.
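As a sketch of that estimate, the data at risk is roughly the incoming byte rate multiplied by the replication gap (both inputs are observed metrics, so the result is an approximation; the numbers below are illustrative):

```java
public class LossEstimate {
    /**
     * Rough upper bound on unmirrored data in bytes: the source cluster's
     * incoming byte rate multiplied by the replication gap reported by the
     * record-age-ms metric.
     */
    public static long bytesAtRisk(long byteRatePerSecond, long recordAgeMs) {
        return byteRatePerSecond * recordAgeMs / 1000;
    }

    public static void main(String[] args) {
        // Illustrative values: 5 MB/s incoming, 12 s replication gap
        System.out.println(LossEstimate.bytesAtRisk(5_000_000L, 12_000L)); // 60000000
    }
}
```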
The accuracy of the translated consumer group offsets is lower than in a planned failover because translation happens at a regular interval and the time of the last translation may be unknown.
Depending on the state of cluster A, attempting to recover data might be possible. However, plan for a full loss of the cluster. Do not rely on extracting data after a disaster.
Figure 8.5. Cluster A is impacted by a failure
In this scenario, the failover itself is straightforward: stop the two MirrorMaker connectors, then restart all your applications configured to connect to cluster B, which becomes the new primary cluster.
Before restarting consuming applications, you can use the RemoteClusterUtils utility to translate consumer group offsets using the latest available checkpoints as this can produce slightly better offsets than what MirrorCheckpointConnector last committed. For example, you can run the following logic that translates consumer group offsets and commits them to the target cluster:
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorClientConfig;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

Map<String, Object> props = Map.of(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>",
    MirrorClientConfig.REPLICATION_POLICY_CLASS, IdentityReplicationPolicy.class.getName()
);
// Get the translated offsets
Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
    RemoteClusterUtils.translateOffsets(
        props,
        "A",
        "my-consumer-group",
        Duration.ofSeconds(600));

Map<String, Object> adminProps = Map.of(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>"
);
try (Admin admin = Admin.create(adminProps)) {
    // Use the Admin API to override the offsets for the consumer group
    admin.alterConsumerGroupOffsets("my-consumer-group", translatedOffsets)
        .all()
        .get(10, TimeUnit.SECONDS);
}
The arguments are as follows:
- props: Configuration properties required for MirrorClient to connect to the target cluster, which is cluster B in this example. It must also include the replication.policy.class configuration, set to the same value used by MirrorMaker, so that MirrorClient knows the format of the mirrored topic names.
- "A": Alias given to the source cluster (A in this example).
- "my-consumer-group": Group ID that you want the translated offsets for.
- Duration.ofSeconds(600): The timeout for translating offsets. The translateOffsets() method needs to consume the entire checkpoints topic, so ensure the timeout is long enough.
Figure 8.6. The state of the disaster recovery environment after losing cluster A and performing an unplanned failover
Following a planned or unplanned failover, you no longer have a disaster recovery environment, so you need to set up a new disaster recovery cluster.