Disaster Recovery using MirrorMaker 2


Red Hat Streams for Apache Kafka 3.1

Disaster recovery for Kafka clusters using MirrorMaker 2

Abstract

This guide describes how to perform disaster recovery when replicating data with MirrorMaker 2. It covers common failure scenarios and provides guidance for failing over and restoring Kafka workloads with minimal disruption.

Providing feedback on Red Hat documentation

We appreciate your feedback on our documentation.

To propose improvements, open a Jira issue and describe your suggested changes. Provide as much detail as possible to enable us to address your request quickly.

Prerequisite

  • You have a Red Hat Customer Portal account. This account enables you to log in to the Red Hat Jira Software instance. If you do not have an account, you will be prompted to create one.

Procedure

  1. Click Create issue.
  2. In the Summary text box, enter a brief description of the issue.
  3. In the Description text box, provide the following information:

    • The URL of the page where you found the issue.
    • A detailed description of the issue.
      You can leave the information in any other fields at their default values.
  4. Add a reporter name.
  5. Click Create to submit the Jira issue to the documentation team.

Thank you for taking the time to provide feedback.

Chapter 1. Disaster recovery for Apache Kafka

Apache Kafka provides strong resiliency and fault tolerance features out of the box. Kafka can replicate data across multiple brokers, which allows it to survive broker failures without data loss and to remain fully available to clients. Brokers can also be assigned a "rack", via the broker.rack configuration, to indicate a unit of isolation. By default, Kafka spreads partition replicas across as many racks as possible. A rack can represent a physical rack, but also a data center or availability zone. Depending on how you deploy and configure the clusters, this effectively provides resilience to the failure of a full rack, data center, or availability zone.

Kafka still requires relatively low latency between all brokers, so it is not recommended to place brokers in different geographical regions and use the rack to represent a region. For use cases that require stronger resiliency, such as surviving the loss of an entire region, you cannot rely only on inter-broker replication; a disaster recovery plan is needed.

A disaster recovery plan for Kafka typically consists of tools and processes to maintain or restore access to data. A plan starts from business requirements, deciding, among other things, a Recovery Time Objective (RTO), which is the acceptable duration of recovery, and a Recovery Point Objective (RPO), which is the maximum acceptable data loss. On the operational side, a solid plan requires understanding the state of both clusters at all times, knowing the decisions involved in setting up and operating the environment and who is responsible for taking them, and defining the processes and tools to follow.

In most cases there is not a perfect way to handle and fully recover from a disaster. Several decision points involve tradeoffs, for example, availability versus consistency, that need to be understood to recover in a way that is best for your business. You need to carefully consider these in advance and include them in your plan so if a disaster strikes you know what actions to take.

Apache Kafka includes MirrorMaker (version 2), a tool that copies data between different Kafka clusters. This is called mirroring to differentiate it from replication which is used for data copied between brokers within a cluster.

Cluster migration is a different use case where MirrorMaker moves all data from one cluster to another. This is needed for example when the underlying infrastructure of a cluster is decommissioned, so a new cluster must be deployed and we need to copy all the data from the old cluster. The process for a cluster migration is effectively the same as a planned failover.

Chapter 2. Understanding mirroring terminology

In mirroring environments, the terminology can be confusing. It is important to clearly identify each cluster, as running a command on the wrong cluster can have a catastrophic impact. Throughout this guide we use two clusters, A and B. These names are fixed and uniquely identify each cluster.

In a disaster recovery environment, at any time, one cluster is the primary cluster handling all the clients’ workloads, and the other one is the disaster recovery cluster where MirrorMaker mirrors all data. Which cluster is the primary and which is the disaster recovery depends on the state of the environment. For example, you might start with A being the primary cluster and B the disaster recovery cluster. If a disaster happens to A, B becomes the primary cluster and clients fail over to it. Failing over is the act of promoting the disaster recovery cluster to be the primary (and at the same time demoting the primary) and moving all applications to it.

For example, the initial setup with A being the primary cluster:

Figure 2.1. A disaster recovery environment mirroring data from cluster A to cluster B


If the clusters are in different geographies, organizations often have a preferred cluster for primary. In that case, it makes sense to switch back to the initial cluster. This process is called failing back.

After failing over to B and demoting A to disaster recovery:

Figure 2.2. A disaster recovery environment mirroring data from cluster B to cluster A


In the MirrorMaker documentation and configurations, the clusters are referred to as "source" and "target". In the disaster recovery scenario, the cluster currently acting as the source for mirroring is the primary cluster, and the target is the disaster recovery cluster.

Chapter 3. MirrorMaker components

MirrorMaker is built on top of the Kafka Connect framework and enables many scenarios such as geo-replication, aggregation, and disaster recovery. MirrorMaker consists of three Kafka Connect connectors:

  • MirrorSourceConnector: mirrors topics, topic data, configurations, and ACLs, and emits offset syncs
  • MirrorCheckpointConnector: tracks and maps consumer group offsets
  • MirrorHeartbeatConnector: periodically emits heartbeats used to check connectivity between clusters

For disaster recovery, you must run the MirrorSourceConnector and the MirrorCheckpointConnector.

Figure 3.1. Using the MirrorSourceConnector and the MirrorCheckpointConnector for disaster recovery


3.1. Mirroring topic data with MirrorSourceConnector

The MirrorSourceConnector mirrors the topics and their records. It periodically checks topics on the source cluster and applies any partition and configuration changes it detects to the target cluster. However, topic deletions are not mirrored, so if a topic is deleted on the source cluster, its copy on the target cluster is left untouched. This provides a safety net in case the deletion was a mistake; otherwise, an operator must also delete the topic on the target cluster.

An important concept to understand is that records on each cluster are highly likely to have different offsets. This happens for several reasons. First, partitions always start at offset 0, so if a partition on the primary cluster has already had records deleted due to retention policies, the same record ends up at a different offset on the target cluster. Another reason is that MirrorMaker does not mirror transaction markers. Transaction markers are emitted when a transaction is committed or aborted, and they occupy offsets. To enable offset translation between the clusters, the connector periodically writes a record into the offset-syncs topic that contains both the source and target offsets of a record it just mirrored.

This connector also mirrors topic ACLs, with the exception of ALLOW WRITE topic ACLs. This prevents clients (other than MirrorMaker) from writing to the target cluster, which might cause inconsistencies. It also does not mirror ACLs set on other resources such as groups, transactional IDs, users, or the cluster. If such ACLs are used, you need another process to maintain them. For these reasons, if you require authorizations in your clusters, you cannot rely only on MirrorMaker to mirror them. Operational practices like GitOps are a great way to set and maintain ACLs.

3.2. Mirroring consumer offsets with MirrorCheckpointConnector

The MirrorCheckpointConnector mirrors committed consumer group offsets. It uses the mappings from the offset-syncs topic to translate the committed offsets from the source cluster, and writes the results to the checkpoints topic. By enabling the offset sync feature (sync.group.offsets.enabled), the connector can also directly commit the translated offsets to the target cluster. Committing the offsets makes it easier to move consuming applications across clusters; otherwise, applications must use the RemoteClusterUtils tool to find their offsets from the checkpoints topic when connecting to the new primary cluster.

3.3. Using MirrorHeartbeatConnector for connectivity checks

The MirrorHeartbeatConnector is not required, and it is recommended that you do not run it for disaster recovery scenarios. This is because the MirrorHeartbeatConnector sends records to the source cluster whereas the other two connectors send records to the target cluster. Since we are deploying MirrorMaker near the target cluster for better performance with the data replication, the MirrorHeartbeatConnector would need to be deployed separately near the source cluster. This added complexity and cost do not justify the benefit, as sufficient metrics to monitor MirrorMaker can be obtained from the MirrorSourceConnector.

Chapter 4. Translating offsets between clusters

As mentioned, records on the source and target clusters have different offsets. To enable applications to move from one cluster to another, MirrorMaker provides offset translation. The offset translation process is lossy: when translating an offset from the source cluster to the target cluster, it is not guaranteed to get the exact same record, as it is just not possible for MirrorMaker to keep track of the source and target offsets of every record it mirrors. If an exact translation is not possible, MirrorMaker guarantees that the record at the translated offset is always earlier than the actual record. This ensures consuming applications never skip data, at the cost of potential reprocessing.

The translation mechanism builds on two processes that run in parallel. First MirrorSourceConnector tasks emit periodic offset-syncs records. Then MirrorCheckpointConnector tasks read the offset-syncs topic and build an in-memory cache of the mappings for each partition.

One key impact is that committed offsets more recent than the latest mirrored offset-sync cannot be translated. For example, with the following state, the last offset-sync emitted by the MirrorSourceConnector is for offset 7. If a consumer commits offset 9, the MirrorCheckpointConnector is not able to translate that offset, and returns -1.

Figure 4.1. The difference in offsets between the two clusters


When starting, each MirrorCheckpointConnector task consumes the whole offset-syncs topic. For this reason it is important to keep this topic relatively small by setting retention policies. For each partition, each MirrorCheckpointConnector task keeps up to 64 offset-syncs. It always keeps the latest offset-sync, and then keeps mappings spaced at approximately exponentially increasing intervals. For this reason, translating offsets of records that have recently been mirrored is accurate, but the accuracy of the translation degrades exponentially for older offsets.

At a high level, each MirrorCheckpointConnector task builds a cache of offset mappings for a partition. For example after mirroring 1000000 records for a partition, the cache may look something like this:

Source Offset    Target Offset
1000000          A
999900           B
999800           C
999600           D
999200           E
…                …
921600           K
819200           L
614400           M
409600           N
…                …
0                Z

When we try to translate an offset for which we have a mapping, the exact matching offset is returned. For example, offsets 1000000 and 999900 translate exactly. For older offsets, the target offset + 1 of the closest preceding mapping is returned. For example, 999250 is translated to E + 1. This means the number of records consumers may reprocess is, in the worst case, 400 (999600 - 999200) records. As the gap between offset-syncs doubles, the quality of the translation degrades exponentially. So offset 420000 is translated to N + 1, which can be up to 204800 (614400 - 409600) records off. Finally, offsets more recent than 1000000 cannot be translated.
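The translation rule described above can be sketched in a few lines. The following is an illustrative Python sketch, not MirrorMaker's actual implementation; the list-based cache, the function name, and the numeric target offsets are assumptions made for the example:

```python
import bisect

def translate_offset(syncs, source_offset):
    """Translate a source offset using a cache of offset-sync mappings.

    syncs: list of (source_offset, target_offset) pairs, sorted ascending.
    Returns the translated target offset, or -1 if the requested offset is
    more recent than the latest offset-sync.
    """
    sources = [s for s, _ in syncs]
    if source_offset > sources[-1]:
        return -1  # newer than the latest offset-sync: cannot translate
    # Find the closest preceding (or exact) offset-sync.
    i = bisect.bisect_right(sources, source_offset) - 1
    src, dst = syncs[i]
    if src == source_offset:
        return dst  # exact mapping available
    return dst + 1  # guaranteed not to be later than the actual record

# Mirroring the example table, with numeric target offsets for illustration.
syncs = [(0, 5), (409600, 410000), (614400, 615000),
         (999200, 999500), (999600, 999900), (1000000, 1000300)]
print(translate_offset(syncs, 1000000))  # exact match
print(translate_offset(syncs, 999250))   # closest preceding sync + 1
print(translate_offset(syncs, 1000050))  # newer than latest sync: -1
```

Note how an offset between two widely spaced syncs (such as 420000) still translates, but only to the sync below it plus one, which is what makes the worst-case reprocessing grow with the gap size.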

Chapter 5. Setting up MirrorMaker for disaster recovery

To move your workloads between clusters, both clusters should have similar resources. In cases where disaster recovery is only necessary for a subset of the topics, the disaster recovery cluster might have fewer resources. Both clusters should also have similar configurations. In addition, MirrorMaker expects the disaster recovery cluster to only be a hot standby cluster. Clients must only be connected to the primary cluster, and only MirrorMaker should connect and mirror data to the disaster recovery cluster.

The MirrorMaker cluster is typically deployed near the target cluster (e.g. in the same network zone or Kubernetes cluster). This setup reduces latency accessing the internal topics of the connectors as they are stored in the target cluster. Another benefit of this setup is that in the event of a disaster, the MirrorMaker cluster is unlikely to be impacted.

For production use cases, run MirrorMaker on a multi-worker distributed Kafka Connect cluster. This is necessary because distributed mode enables scaling the Kafka Connect workers to match the source cluster throughput and it provides resiliency to worker failures. Distributed mode also enables the REST API which is very useful for operating a MirrorMaker environment.

The first step is to prepare the configurations for the Kafka Connect cluster and the two connectors.

5.1. Configuring Kafka Connect for MirrorMaker

When setting up Kafka Connect for MirrorMaker 2, you need to configure both the Kafka Connect worker and the connectors. Kafka Connect offers dozens of configuration settings. You must carefully consider the following settings:

group.id
All the workers in the Kafka Connect cluster must have the same group.id configuration. This name appears in the metrics of the Kafka clients each worker uses internally.
plugin.discovery
The strategy defines how Kafka Connect finds plugin classes for connectors. Set to service_load because it is the fastest strategy and is compatible with MirrorMaker connectors, significantly improving worker start-up time.
key.converter, value.converter
Set to org.apache.kafka.connect.converters.ByteArrayConverter which MirrorMaker connectors must use. If your Kafka Connect cluster runs only MirrorMaker, set the converters directly in Kafka Connect, so that you don’t need to set them in each connector configuration.
config.storage.topic, offset.storage.topic, status.storage.topic
These define the names for the internal topics used by Kafka Connect. All the workers in the Kafka Connect cluster must have the same values. We recommend setting them to unique names that clearly identify the Kafka Connect cluster. To ensure these topics are not mirrored, prefix them with "__" (double underscores).
listeners
This configuration determines the list of listeners the REST API is exposed on. If not set, the default is "http://:8083", which means the REST API can be accessed via HTTP on the default network interface (e.g. localhost) at port 8083. Workers need to be able to connect to each other via their listeners. More information on how to secure the REST API and configure advertised listeners for Kafka Connect workers can be found on the Apache Kafka website.
bootstrap.servers, security.protocol, sasl.mechanism
These configurations provide the connection details for Kafka Connect to connect to the target cluster (these are just the main ones). The connection details must also be provided for connectors, using the target alias as a prefix for the configurations. Use the same credentials to connect to the target cluster for both Kafka Connect and connectors so that they both use the same identity and are able to share ACLs. Otherwise, you must set ACLs for each identity they use.

5.2. Configuring common MirrorMaker connector settings

These are configurations to set for both MirrorSourceConnector and MirrorCheckpointConnector:

name
The name uniquely identifies a connector. We recommend using names with this format <SOURCE>-to-<TARGET>-<CONNECTOR>, for example "A-to-B-MirrorSourceConnector". We recommend avoiding special characters as the connector names appear in paths of the REST API and in metric names and labels.
tasks.max
This indicates the maximum number of tasks Kafka Connect can start for this connector. Tasks are the unit of parallelism. Having more tasks makes the workload more granular and easier to distribute across all workers, especially when a worker is restarted or crashes. However, each task adds some overhead. Aim for as many tasks as CPU cores in the Kafka Connect clusters.
source.cluster.alias
This is the alias given to the source cluster. This name appears in logs and metrics.
target.cluster.alias
This is the alias given to the target cluster. This name appears in logs and metrics.
offset-syncs.topic.location
This determines where to store the offset-syncs topic. By default, it is set to source; however, it is recommended to set this to target so that MirrorMaker can be granted read-only access to the source cluster and the topic can be created in the target cluster. This topic is named mm2-offset-syncs.<SOURCE_ALIAS>.internal.
replication.policy.class
The replication policy determines how replicated topics are named and detects mirroring cycles in bidirectional mirroring. Disaster recovery uses unidirectional mirroring, so set this to “org.apache.kafka.connect.mirror.IdentityReplicationPolicy” to have the same topic names on both clusters.
source.cluster.bootstrap.servers, source.cluster.security.protocol, source.cluster.sasl.mechanism, target.cluster.bootstrap.servers, target.cluster.security.protocol, target.cluster.sasl.mechanism
These configurations are set using aliases "source.cluster." and "target.cluster." as prefixes to provide connection details for both clusters. These are just an example of the main ones but there are more configurations for the connection details that can be set using these prefixes. As mentioned previously, use the same credentials provided for Kafka Connect when configuring these properties with the target alias.

5.3. Configuring MirrorSourceConnector

The following are key configurations for MirrorSourceConnector:

heartbeats.replication.enabled
This determines whether to replicate the heartbeats topics which MirrorHeartbeatConnector emits records to. Set this to false because, as explained previously, it is not necessary to run MirrorHeartbeatConnector for disaster recovery.
replication.factor
This defines the replication factor for the new topics on the target cluster. Set this to -1 to use the target Kafka cluster default replication factor.
offset-syncs.topic.replication.factor
This defines the replication factor of the offset-syncs topic. Set this to -1 to use the target Kafka cluster default replication factor. When created in the target cluster, this topic is named "mm2-offset-syncs.<SOURCE_ALIAS>.internal".
topics
This determines which topics are mirrored. It accepts both explicit topic names and regular expression patterns. By default, it is set to ".*", meaning all topics from the source cluster are mirrored except those that match the topics.exclude filter. This filter defaults to "mm2.*\.internal, .*\.replica, __.*" to avoid unintentionally mirroring internal topics. You should always set topics explicitly to the exact list of topic names or regex patterns you intend to mirror.
sync.topic.acls.enabled
This determines whether to enable synchronization of topic ACLs. This is set to true by default. If you don’t use authorizations you should disable this feature by setting it to false.
offset.lag.max
This determines how frequently an offset-sync record is emitted for each partition. By default, it is set to 100, which means an offset-sync is written to the offset-syncs topic every time 100 records have been mirrored from a partition. This configuration is often misunderstood. In most cases, keep the default value. Setting it to a lower value causes more frequent offset-syncs which are individually small records but since this setting affects all partitions, it can significantly increase the throughput and size of the offset-syncs topic.
producer.override.<configuration name>
Connectors can override configurations for the producers used by Kafka Connect to mirror the records to the target cluster. In high throughput environments, it can be beneficial to increase the batch size (batch.size) and enable compression (compression.type). Producer overrides must be specified in the connector configuration with the “producer.override.” prefix, for example “producer.override.compression.type”.
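For example, overrides might be added to the MirrorSourceConnector configuration as follows. The values shown are illustrative placeholders, not tuning recommendations:

```
"producer.override.batch.size": "131072",
"producer.override.compression.type": "lz4",
"producer.override.linger.ms": "100"
```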

These are the main configurations to be set when you start the connector. However, there are more configurations that you can set for fine tuning. You can list them using the REST API, for example:

curl http://<CONNECT>/connector-plugins/MirrorSourceConnector/config

5.4. Configuring MirrorCheckpointConnector

The following are key configurations for MirrorCheckpointConnector:

checkpoints.topic.replication.factor
This defines the replication factor of the checkpoints topic. Set this to -1 to use the target Kafka cluster default replication factor. This topic is named "<SOURCE_ALIAS>.checkpoints.internal".
sync.group.offsets.enabled
This determines whether translated offsets are periodically committed to the target cluster. Set this to true to allow consumers to easily move between clusters. If set to false, consumer group offsets must be translated manually using a tool like RemoteClusterUtils and then committed explicitly (for example, via Admin API) when moving consumers between clusters.
groups
This determines which consumer groups are mirrored. It accepts both explicit group names and regular expression patterns. By default, it is set to ".*", meaning all groups from the source cluster are mirrored except those that match the groups.exclude filter. This filter defaults to "console-consumer-.*, connect-.*, __.*" to avoid unintentionally mirroring consumer groups used by internal tools. You should always set groups explicitly to the exact list of group names or patterns you intend to mirror.

These are the main configurations to be set when you start the connector. However, there are more configurations that you can set for fine tuning. You can list them using the REST API, for example:

curl http://<CONNECT>/connector-plugins/MirrorCheckpointConnector/config

5.5. Configuring exactly-once support

To avoid data loss or duplicates in the target cluster while mirroring, enable exactly-once semantics for Kafka Connect and MirrorSourceConnector. Exactly-once support is achieved by using transactions to ensure that the two actions, sending the data to Kafka and committing offsets to the offset storage topic, either both succeed or both fail atomically.

5.5.1. Kafka Connect settings for exactly-once support

exactly.once.source.support
This determines whether to enable exactly-once support for source connectors by using transactions to write records and their offsets. Set this to “enabled” so that MirrorSourceConnector can write to the target cluster with exactly-once semantics.

5.5.2. MirrorSourceConnector settings for exactly-once support

consumer.isolation.level
This defines how to read records written using transactions. Set to “read_committed” to enable exactly-once semantics in MirrorSourceConnector so that it only reads transactional records that have been committed.

In most cases, you don’t need to set other configurations. However you can set transaction.boundary to poll and adjust transaction.boundary.interval.ms if you want to reduce the number of transactions created by the connector.

transaction.boundary
This defines when Kafka Connect should create and commit its transactions. MirrorSourceConnector supports setting this configuration to "poll" (the default) and "interval". If set to "poll", a new transaction is started and committed for every batch of records that a connector task provides to Kafka Connect. If set to "interval", transactions are committed after an interval defined by another configuration, "transaction.boundary.interval.ms".
transaction.boundary.interval.ms
This determines the interval at which transactions are committed. If not set, the value from "offset.flush.interval.ms" is used (1 minute by default).
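For example, to commit a transaction at most once every five minutes instead of per batch, the connector configuration might include the following (the interval value is illustrative):

```
"transaction.boundary": "interval",
"transaction.boundary.interval.ms": "300000"
```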

5.6. Configuration examples

This is an example configuration file (e.g. connect-distributed.properties) for a Kafka Connect worker:

bootstrap.servers=<TARGET_BOOTSTRAP_SERVERS>

group.id=connect-mirrormaker-A-to-B

plugin.discovery=service_load

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.topic=__connect-mirrormaker-A-to-B-offsets
offset.storage.replication.factor=-1
config.storage.topic=__connect-mirrormaker-A-to-B-configs
config.storage.replication.factor=-1
status.storage.topic=__connect-mirrormaker-A-to-B-status
status.storage.replication.factor=-1

exactly.once.source.support=enabled

This is an example configuration for a MirrorSourceConnector mirroring data from cluster A to cluster B:

{
    "name": "A-to-B-MirrorSourceConnector",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "tasks.max": "30",

        "source.cluster.alias": "A",
        "target.cluster.alias": "B",
        "source.cluster.bootstrap.servers": "<SOURCE_BOOTSTRAP_SERVERS>",
        "target.cluster.bootstrap.servers": "<TARGET_BOOTSTRAP_SERVERS>",

        "offset-syncs.topic.location": "target",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",

        "heartbeats.replication.enabled": "false",
        "replication.factor": "-1",
        "offset-syncs.topic.replication.factor": "-1",
        "sync.topic.acls.enabled": "false",
        "consumer.isolation.level": "read_committed",
        "topics": "<TOPICS>"
    }
}

This is an example configuration for a MirrorCheckpointConnector mirroring consumer groups from cluster A to cluster B:

{
    "name": "A-to-B-MirrorCheckpointConnector",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "tasks.max": "30",

        "source.cluster.alias": "A",
        "target.cluster.alias": "B",
        "source.cluster.bootstrap.servers": "<SOURCE_BOOTSTRAP_SERVERS>",
        "target.cluster.bootstrap.servers": "<TARGET_BOOTSTRAP_SERVERS>",

        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "offset-syncs.topic.location": "target",

        "checkpoints.topic.replication.factor": "-1",
        "sync.group.offsets.enabled": "true",
        "groups": "<GROUPS>"
    }
}
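Assuming each of the JSON documents above is saved to a file, the connectors can be registered with the Kafka Connect REST API. The file names are illustrative, and <CONNECT> is the address of a Kafka Connect worker:

```
curl -X POST -H "Content-Type: application/json" \
  --data @A-to-B-MirrorSourceConnector.json \
  http://<CONNECT>/connectors

curl -X POST -H "Content-Type: application/json" \
  --data @A-to-B-MirrorCheckpointConnector.json \
  http://<CONNECT>/connectors
```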

5.7. Configuring authorizations for MirrorMaker

Most production environments use authorizations to control access to Kafka resources. In that case, to run MirrorMaker, you must grant permissions to the users employed by the Kafka Connect workers and the MirrorMaker connectors for both source and target clusters.

5.7.1. Source cluster access control

When the offset-syncs topic is placed on the target cluster, you only require READ, DESCRIBE, and DESCRIBE_CONFIGS permissions in the source cluster:

  • READ, DESCRIBE and DESCRIBE_CONFIGS on each topic to mirror
  • DESCRIBE on the cluster
  • DESCRIBE on each consumer group to mirror

5.7.2. Target cluster access control

MirrorMaker requires permissions on the target cluster to create topics, maintain their configurations, mirror their records, and manage consumer group offsets:

  • CREATE, DESCRIBE, WRITE, ALTER, ALTER_CONFIGS for each mirrored topic
  • DESCRIBE, READ on each mirrored consumer group
  • READ on each topic with offsets for each mirrored consumer group
  • CREATE, DESCRIBE, WRITE, READ on the offset-syncs topic
  • CREATE, DESCRIBE, WRITE on the checkpoints topic
  • DESCRIBE, ALTER on the cluster

You can set the ACLs using the command line tool or the Admin Client API.

For example, to set the ACLs for the offset-syncs topic via the kafka-acls.sh tool:

$ bin/kafka-acls.sh \
  --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
  --add --allow-principal <PRINCIPAL> \
  --operation Read \
  --operation Write \
  --operation Create \
  --operation Describe \
  --topic mm2-offset-syncs.<SOURCE_ALIAS>.internal

You can also use the Admin API to set the ACLs:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>");

try (Admin admin = Admin.create(props)) {
    String user = "<PRINCIPAL>";
    String topic = "mm2-offset-syncs.<SOURCE_ALIAS>.internal";

    ResourcePattern resource = new ResourcePattern(
        ResourceType.TOPIC,
        topic,
        PatternType.LITERAL
    );

    List<AclBinding> acls = List.of(
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.READ, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.CREATE, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
    );

    admin.createAcls(acls).all().get();
}

In both cases, the arguments are:

<TARGET_BOOTSTRAP_SERVERS>
A comma-separated list of bootstrap servers of the target cluster. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
<PRINCIPAL>
The principal given to Kafka Connect and MirrorMaker.
<SOURCE_ALIAS>
The alias of the source cluster.

Chapter 6. Starting MirrorMaker replication

Before starting MirrorMaker, make sure your Kafka Connect cluster is properly sized and configured. The Kafka Connect cluster needs to have enough resources to keep up with traffic arriving on the source cluster. A Kafka Connect cluster can be scaled by adding or removing instances, called workers. Workers in a Kafka Connect cluster need to have the same group.id, config.storage.topic, offset.storage.topic and status.storage.topic configurations and be able to connect to each other's exposed listeners. For resiliency, we recommend always running multiple workers. It is also important to understand the performance characteristics of the workers, especially in terms of network capacity and their latency to each cluster.

Once you are ready, you can start each Kafka Connect worker using the connect-distributed.sh tool.

$ bin/connect-distributed.sh config/connect-distributed.properties

6.1. Choosing a starting position

The MirrorSourceConnector mirrors data sequentially, so for each partition newer data is only mirrored after older data. This means that if Kafka Connect or the disaster recovery cluster does not have enough capacity, there is a risk the disaster recovery cluster will never catch up.

The following formula estimates the time it takes for the disaster recovery cluster to catch up to the primary cluster:

t = B / (Rmm2 - Rin)

Where:

  • t is the estimated time in seconds it takes to get both clusters in-sync
  • B is the backlog of data in bytes on the primary cluster we want to mirror
  • Rmm2 is the throughput in bytes per second the Kafka Connect cluster can achieve
  • Rin is the incoming throughput to the primary cluster in bytes per second

Rmm2 must be larger than Rin; otherwise the disaster recovery cluster will never catch up with the primary cluster.

For example, if the primary cluster has 100000 MB of existing data (B), the MirrorMaker cluster can handle 150 MB/s (Rmm2), and the incoming throughput on the primary cluster is 100 MB/s (Rin), the estimated catch-up time for disaster recovery is approximately 33 minutes:

100000 / (150 - 100)   = 2000 seconds (~33 minutes)
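As a quick sanity check, the arithmetic above can be reproduced in a few lines of shell, using the same hypothetical values as the example:

```shell
# Hypothetical example values: backlog B in MB, throughputs in MB/s
B=100000    # backlog on the primary cluster (MB)
RMM2=150    # throughput the Kafka Connect cluster can achieve (MB/s)
RIN=100     # incoming throughput on the primary cluster (MB/s)

# t = B / (Rmm2 - Rin)
t=$(( B / (RMM2 - RIN) ))
echo "estimated catch-up time: ${t} seconds (~$(( t / 60 )) minutes)"
```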

You need to decide the starting position of the MirrorSourceConnector, as this determines the value of B. There are three options: earliest, latest, or custom.

6.1.1. Earliest position

By default, the MirrorSourceConnector starts from "earliest", which means it starts mirroring from the beginning of all selected partitions. This sets B in the equation to its maximum value, effectively mirroring all existing data to the disaster recovery cluster, but with some drawbacks.

First, if there is a lot of data (such as topics using tiered storage) MirrorMaker begins with a huge backlog of data. To keep up, it must have a much higher throughput than the incoming traffic to the primary cluster.

Second, topics have retention policies that determine when to delete data. If a retention policy triggers and deletes a segment while the MirrorSourceConnector is mirroring it, gaps may appear in the data on the disaster recovery cluster.

Figure 6.1. Offset 2 on the source partition has been deleted so it cannot be mirrored


When MirrorMaker tries to read offset 2, it gets an error from the source cluster and resets its position to offset 3, which is the current earliest offset.

Figure 6.2. Since offset 2 was not mirrored, there is a data gap in the target partition


MirrorMaker has effectively skipped offset 2. For these reasons, unless the source cluster contains very little data, don’t mirror from the earliest position.

6.1.2. Latest position

The other common option is to start from "latest", which means MirrorMaker mirrors only new data. This sets B in the equation to effectively 0.

The tradeoff of this approach is that existing data is not mirrored. If a disaster happens and the storage on the primary cluster is lost, all pre-existing data is lost. On the other hand, MirrorMaker does not need to have a higher throughput than the incoming traffic to the primary cluster. To configure the MirrorSourceConnector to start from the latest, you need to set consumer.auto.offset.reset to "latest" in the MirrorSourceConnector configuration.
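For example, the relevant fragment of a MirrorSourceConnector configuration might look like the following sketch (all other required settings omitted):

```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "consumer.auto.offset.reset": "latest"
}
```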

6.1.3. Custom position

The final option is to explicitly set the starting offsets for all, or some, partitions. This allows mirroring some of the most recent existing data without going back to the beginning, effectively letting you select a value for B in the equation that satisfies your requirements. For example, if you keep seven days of data, you could decide to start mirroring data from the last one or two days.

In order to do so, you need to find the offsets for the desired timestamp for the topics you want to mirror, for example using the kafka-get-offsets.sh tool:

bin/kafka-get-offsets.sh \
  --bootstrap-server <PRIMARY_BOOTSTRAP_SERVERS> \
  --time <TIME> \
  --topic-partitions <TOPICS>

The arguments are:

<PRIMARY_BOOTSTRAP_SERVERS>
A comma-separated list of bootstrap servers of the source cluster. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
<TIME>
The timestamp in milliseconds for the desired date.
<TOPICS>
A comma-separated list of topic names and regexes.
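For example, to compute the <TIME> value for two days ago, you can use GNU date; the exact flags vary by platform, and this sketch assumes a Linux system:

```shell
# Millisecond timestamp for "2 days ago" (GNU date); pass this as <TIME>
TIME=$(date -d '2 days ago' +%s%3N)
echo "$TIME"
```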

6.1.4. Resetting the starting position

Note that choosing the starting position mostly applies when first starting the connector. Once running, if the connector restarts, it automatically resumes from its last position. If you want to change the position of a connector that is already running, you first need to stop the connector and clear its position via the Kafka Connect REST API:

curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/stop

curl -X DELETE http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/offsets

Before restarting the connector, clear the topics in the target cluster. Otherwise, MirrorMaker will start appending data from the source cluster to the existing target cluster topics resulting in a mix of data mirrored by the previous MirrorMaker connector and data from the new instance.

6.2. Starting MirrorMaker connectors

Having started Kafka Connect, you can now use the Kafka Connect REST API to start the MirrorMaker connectors.

First, start the MirrorSourceConnector. If you want to use a custom position, initialize it in the stopped state by setting "initial_state" to "STOPPED" in its configuration.

For example:

curl -X POST http://<CONNECT>/connectors \
     -H "Content-type: application/json" \
     -d '{
           "name": "A-to-B-MirrorSourceConnector",
           "config": {
             "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
             "tasks.max": "10",
             …
           },
           "initial_state": "STOPPED"
         }'

Then you can set its starting offsets via another call to the Kafka Connect REST API:

curl -X PATCH http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/offsets \
  -H "Content-type: application/json" \
  -d '{
       "offsets": [
         {
           "partition": {
             "cluster": "A",
             "topic": "my-topic",
             "partition": 0
           },
           "offset": {
             "offset": 100
           }
         }
       ]
    }'

Finally, you can start the connector using the following curl command:

curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/resume

When using the earliest or latest starting position, don’t set the "initial_state" configuration. You can start the connector directly:

curl -X POST http://<CONNECT>/connectors \
     -H "Content-type: application/json" \
     -d '{
           "name": "A-to-B-MirrorSourceConnector",
           "config": {
             "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
             "tasks.max": "10",
             …
           }
         }'

MirrorCheckpointConnector does not maintain a position, so you can start it directly:

curl -X POST http://<CONNECT>/connectors \
     -H "Content-type: application/json" \
     -d '{
           "name": "A-to-B-MirrorCheckpointConnector",
           "config": {
             "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
             "tasks.max": "10",
             …
           }
         }'

6.2.1. Tuning internal MirrorMaker topics

This step is optional, but can reduce MirrorCheckpointConnector startup times and significantly shorten the time to translate offsets when using the RemoteClusterUtils tool.

As explained previously, MirrorMaker uses two internal topics:

  • The offset-syncs topic (mm2-offset-syncs.<SOURCE_CLUSTER_ALIAS>.internal)
  • The checkpoints topic (<SOURCE_CLUSTER_ALIAS>.checkpoints.internal)

When MirrorCheckpointConnector tasks start, they consume both the offset-syncs and checkpoints topics entirely, so it’s important to keep their size as small as possible.

Both topics are created with a single partition and use log compaction. With log compaction, Kafka retains the most recent record for each key, and tries to periodically delete older records. Kafka does not compact the active log segment. You can learn more about Log Compaction in the Apache Kafka documentation.

For the offset-syncs topic, the key of each record is the topic partition. An offset-sync record has the following format:

OffsetSync{
  topicPartition=<TOPIC_PARTITION>,
  upstreamOffset=<SOURCE_OFFSET>,
  downstreamOffset=<TARGET_OFFSET>
}

The topicPartition field is the key. The upstreamOffset and downstreamOffset fields form the value.

The offset-syncs topic should still retain enough records to translate offsets for lagging consumers. To prevent log compaction from happening too soon, set min.compaction.lag.ms to define the minimum time before compaction removes old records.

For example, to preserve offset-syncs for at least 2 days, set min.compaction.lag.ms to 172800000 (48 hours). You should also set a smaller segment size (default is 1GiB) to enable compaction to run more often. You can change these settings using the kafka-configs.sh tool:

bin/kafka-configs.sh \
  --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
  --alter \
  --add-config segment.bytes=536870912 \
  --add-config min.compaction.lag.ms=172800000 \
  --entity-type topics \
  --entity-name mm2-offset-syncs.<SOURCE_CLUSTER_ALIAS>.internal

The arguments are:

<TARGET_BOOTSTRAP_SERVERS>
A comma-separated list of bootstrap servers of the target cluster. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
<SOURCE_CLUSTER_ALIAS>
The alias of the source cluster.
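The two values used in the command above can be sanity-checked with simple arithmetic: 536870912 bytes is 512 MiB, and 172800000 milliseconds is 48 hours:

```shell
# 512 MiB segment size and a 48-hour minimum compaction lag
SEGMENT_BYTES=$(( 512 * 1024 * 1024 ))
MIN_LAG_MS=$(( 48 * 60 * 60 * 1000 ))
echo "segment.bytes=${SEGMENT_BYTES} min.compaction.lag.ms=${MIN_LAG_MS}"
```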

For the checkpoints topic, the key of each record is a combination of the consumer group ID and the topic partition.

A Checkpoint record has the following format:

Checkpoint{
  consumerGroupId=<GROUP_ID>,
  topicPartition=<TOPIC_PARTITION>,
  upstreamOffset=<SOURCE_OFFSET>,
  downstreamOffset=<TARGET_OFFSET>,
  metadata=<METADATA>
}

The consumerGroupId and topicPartition fields form the key, and the other fields form the value.

When translating committed offsets, only the most recent value is required, so you only need to retain the latest Checkpoint record for each consumer group ID and topic partition combination. The RemoteClusterUtils.translateOffsets() method, which can be used during an unplanned failover, must read the entire checkpoints topic to translate offsets. To improve performance, keep this topic small and limit its segment size. For example, setting a segment size of 50MiB using the kafka-configs.sh tool is typically sufficient:

bin/kafka-configs.sh \
  --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
  --alter \
  --add-config segment.bytes=52428800 \
  --entity-type topics \
  --entity-name <SOURCE_CLUSTER_ALIAS>.checkpoints.internal

The arguments are:

<TARGET_BOOTSTRAP_SERVERS>
A comma-separated list of bootstrap servers of the target cluster. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
<SOURCE_CLUSTER_ALIAS>
The alias of the source cluster.

Chapter 7. Operating and monitoring mirroring

Once started, mirroring for disaster recovery is a process that runs continuously. At all times, you need to be able to check the health, liveness, performance, and state of the mirroring. You can do this in two ways: with Kafka Connect and MirrorMaker metrics, and with the Kafka Connect REST API.

7.1. Monitoring mirroring with metrics

It’s critical to collect, graph, and monitor metrics from Kafka Connect, MirrorMaker and the two Kafka clusters involved to operate a disaster recovery environment.

Each component emits a lot of metrics. Instead of trying to collect everything, select the key set of metrics that you want to monitor. Set up dashboards displaying the most important metrics. Most monitoring tools also allow setting up alerts when metrics reach certain thresholds, which can be used to detect unexpected states and notify operators.

Kafka Connect and MirrorMaker both expose metrics via JMX, as MBeans under Kafka Connect’s domain names. If you use a tool other than JMX to export and monitor metrics, be aware that metric names may look different depending on the tool.

7.1.1. MirrorSourceConnector

All the MirrorSourceConnector metrics are under the kafka.connect.mirror:type=MirrorSourceConnector,source=([-.w]+),target=([-.w]+),topic=([-.w]+),partition=([0-9]+) name, where source is the alias of the source cluster, target is the alias of the target cluster, topic is the topic name, and partition is the partition number.

Collect and monitor the following metrics to assess performance and reliability:

byte-rate/record-rate
This is the average number of bytes or records MirrorMaker mirrors per second. Once both clusters are in-sync, these metrics should closely match the incoming rates of the source clusters for all the mirrored topics.
record-age-ms-<min/max/avg>
These are the minimum, maximum, or average age of records in milliseconds when they are mirrored. The age is computed by taking the time the record was successfully written to the target cluster and subtracting its timestamp that was either set by the producer when it sent this record to the source cluster (the default behavior) or by the source cluster when it received the record. These metrics are critical to evaluate the gap between the two clusters. Reliability of this metric depends on the accuracy of record timestamps, which clients can technically set to anything.
byte-count/record-count
This is the total number of bytes or records MirrorMaker has mirrored from the source cluster to the target cluster. These are emitted by the MirrorSourceConnector task currently mirroring the specified topic-partition. If there’s a rebalance or tasks are stopped and restarted, these will restart from 0. These are very similar to byte-rate and record-rate as they are just the raw counts instead of the computed rates. In most cases, use rate-based metrics as they’re easier to read and compare. If you use the raw counts, you need to convert them to rates in your monitoring system.

All of these metrics are per topic partition, so charting each series is not practical. Instead, aggregate them at a minimum by topic, but in most cases aggregate them all to get an overall view of the state of your mirroring environment.

7.1.2. MirrorCheckpointConnector

All the MirrorCheckpointConnector metrics are under the kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+),group=([-.w]+),topic=([-.w]+),partition=([0-9]+) name, where source is the alias of the source cluster, target is the alias of the target cluster, group is the consumer group name, topic is the topic name, and partition is the partition number.

This metric is not critical to monitor, but collecting it may help investigate issues regarding offset translation:

checkpoint-latency-ms-<min/max/avg>
Time taken between when the consumer group offsets are read on the source cluster and when the corresponding checkpoint records are written successfully to the target cluster. This is computed by subtracting the timestamps when the consumer group offsets are fetched from the timestamp when the checkpoint records are sent to the internal topic in the target cluster and acknowledged by the target cluster brokers. This is essentially producer end-to-end latency for offsets mirroring.

7.1.3. Kafka Connect

All the Kafka Connect metrics are under kafka.connect:type=connector-metrics,connector="{connector}", where connector is the connector name:

status

Status of the connector. Possible values:

  • Unassigned: Connector is configured but hasn’t been assigned to a worker.
  • Running: Connector is active and has tasks assigned, which may or may not be running.
  • Paused: Connector is intentionally paused (for example, by a user via the REST API), which means its tasks stop processing but remain assigned to workers. When paused, connector tasks do not lose their assignments and therefore do not trigger a rebalance.
  • Stopped: Connector is completely stopped and its tasks are in the "unassigned" state.
  • Failed: Connector encountered an unrecoverable error (for example, a misconfiguration or the worker being unable to instantiate the connector).
  • Restarting: Connector is in the process of restarting (for example, due to reconfiguration), and is temporarily unavailable while the Kafka Connect worker stops all tasks, reloads the configuration, and reassigns tasks.
task={task},status

Status of the connector task. Possible values:

  • Unassigned: Task exists in the Kafka Connect configuration but is not currently assigned to any worker, for example because a rebalance is in progress.
  • Running: Task is actively executing (for example, polling and producing).
  • Paused: Task is paused by the user or by a connector pause, but still assigned to a worker. When paused, tasks do not lose their assignments and therefore do not trigger a rebalance.
  • Failed: Task encountered an unrecoverable error.
  • Restarting: Task is restarting, usually as part of a connector restart or after a failure.

7.1.4. Kafka clusters

It’s also important to monitor the health and performance of both source and target clusters as well as the resource usage to make sure they are not running out of capacity.

Finally, for all components, it’s crucial to understand how to adjust the set of metrics you collect, and how to add or update graphs and dashboards to retrieve the information you need to operate your environment or investigate issues.

7.2. Operating MirrorMaker using the REST API

While the metrics allow you to monitor your environment, the REST API enables you to operate MirrorMaker and retrieve important information when you detect an issue.

Once the environment is running the most common operations are:

  • Changing the configuration: This is done via the PATCH /connectors/<CONNECTOR>/config endpoint. You can update the configuration of the connectors and also the maximum number of tasks.
  • Restarting failed connectors and tasks: This is done via the POST /connectors/<CONNECTOR>/restart endpoint. This endpoint accepts two query parameters: includeTasks and onlyFailed.

The following REST endpoints are useful for operating the connectors.

Check the status of a connector and its tasks:

curl -X GET http://<CONNECT>/connectors/<CONNECTOR>/status

Restart a connector and its tasks that have "failed" status by providing "includeTasks" and "onlyFailed" flags:

curl -X POST "http://<CONNECT>/connectors/<CONNECTOR>/restart?includeTasks=true&onlyFailed=true"

If "includeTasks" is not set or set to false, only the connector instance is restarted. If "onlyFailed" is not set or set to false, the connector and all of its tasks are restarted regardless of their status.

Update configuration such as "tasks.max" to increase the parallelism:

curl -X PATCH \
  -H "Content-Type: application/json" \
  -d '{"tasks.max": "10"}' \
  http://<CONNECT>/connectors/<CONNECTOR>/config

This automatically stops all the current tasks for the connector, applies the configuration update, creates tasks based on the configured number, assigns them to available workers, and restarts the connector and its tasks. This endpoint is also used to update other connector configurations.

Check the assignment of all the currently running tasks:

curl -X GET http://<CONNECT>/connectors/<CONNECTOR>/tasks

MirrorSourceConnector tasks have the task.assigned.partitions configuration, which lists the partitions the task is mirroring. Similarly, MirrorCheckpointConnector tasks have the task.assigned.groups configuration, which lists the consumer groups the task is translating offsets for.

Pause the connector and its tasks:

curl -X PUT http://<CONNECT>/connectors/<CONNECTOR>/pause

This stops message processing until the connector is resumed. The tasks remain assigned to workers, and any resources claimed by them stay allocated. Once resumed, the connector can quickly start processing data again.

Stop the connector and shut down its tasks:

curl -X PUT http://<CONNECT>/connectors/<CONNECTOR>/stop

This removes assigned tasks from the worker and releases any resources used by the tasks. It saves resource usage compared to the pause action, but it can take much longer to begin processing data once resumed.

Resume a stopped or paused connector and its tasks:

curl -X PUT http://<CONNECT>/connectors/<CONNECTOR>/resume

Kafka Connect has a number of other REST API endpoints; you can find them all in the Apache Kafka Connect REST API documentation.

Chapter 8. Failing over to a disaster recovery cluster

Failing over is the act of switching from the primary cluster to the disaster recovery cluster. It can be planned, for example to proactively manage an incoming disaster or to exercise a disaster recovery plan, or unplanned, when a disaster strikes the primary cluster. In either case, the decision to fail over has to be made by an operator or an external process; it is not something MirrorMaker does. However, MirrorMaker does provide information to help you make the decision.

Failing over also implies moving the client applications from one cluster to another. You need processes and tools to quickly shut down applications, and restart them (possibly in another region, if preparing for a full region failure) with an updated configuration so that they connect to the new primary cluster.

8.1. Performing a planned failover (migration)

When doing a planned failover or migration, you can expect little to no data loss and minimal impact on applications. However, certain conditions must be met before it can be performed.

The first condition is to validate the gap (lag) between both clusters. MirrorMaker mirrors data asynchronously, so there is always a small data gap between the clusters. As described in the Metrics section, you can estimate the time delay between the clusters using the record-age-ms metric. For a planned failover, you ideally want this duration to be as short as possible. It should also match the RTO specified in the disaster recovery plan.

The second condition is the ability to shut down and restart applications in a controlled fashion, and with their configuration updated with the bootstrap servers for the other cluster. Before restarting, wait for the following to pass:

  • refresh.topics.interval.seconds since the last topic creation
  • sync.topic.acls.interval.seconds since the last ACL update
  • sync.topic.configs.interval.seconds since the last topic configuration change

The default value for these intervals is the same (600 seconds). If you want to adjust the values, it’s best to keep them aligned to not complicate this process.
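If you do adjust them, a sketch of the aligned MirrorSourceConnector settings might look like the following fragment, here left at the 600-second default for all three:

```properties
refresh.topics.interval.seconds=600
sync.topic.acls.interval.seconds=600
sync.topic.configs.interval.seconds=600
```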

Once these conditions are met, you can start the process. In this example, failover is from cluster A to cluster B. In the initial state, cluster B is very close to cluster A; it is only missing a small number of records, as the mirroring process is asynchronous.

Figure 8.1. Mirroring is asynchronous, so there is always some data on the source cluster that has not been mirrored yet


8.2. Steps to perform a planned failover

  1. Stop all applications that are producing to cluster A. Once they are stopped, you need to wait for MirrorSourceConnector to mirror the data gap between both clusters. Since the gap is short, it should be completed quickly. When it’s completed, the byte-rate metrics, such as kafka.connect.mirror:type=MirrorSourceConnector,target=B,topic=my-topic,partition=0,byte-rate, should drop to 0, as there is no more data to copy from cluster A to cluster B.

    Figure 8.2. After stopping producers on the source cluster, MirrorMaker is able to mirror all existing data

  2. Stop MirrorSourceConnector via the Kafka Connect REST API using:

    curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorSourceConnector/stop
  3. (optional) Before restarting any applications, note the Log End Offsets (LEO) of all the mirrored topics on cluster B. Although this step is optional, it is required if you later want to mirror only the data delta between the clusters when failing back. You retrieve the Log End Offsets via the kafka-get-offsets.sh tool:

    bin/kafka-get-offsets.sh \
      --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
      --time latest \
      --topic-partitions <TOPICS>

    The arguments are:

    <TARGET_BOOTSTRAP_SERVERS>
    A comma-separated list of bootstrap servers of the target cluster, which is cluster B in this example. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
    <TOPICS>
    A comma-separated list of topic names and regexes. If this cluster only has topics that were mirrored, you don’t need to specify this argument, as the command defaults to returning the offsets for all topics.
  4. Restart applications that now will only produce records to cluster B.

    Figure 8.3. The state of the disaster recovery environment after restarting producers on cluster B

  5. Wait for consuming applications connected to A to reach lag 0 before shutting them down.

    This guarantees an exact consumer offset translation. After stopping consumers, wait for sync.group.offsets.interval.seconds to ensure their offsets are synced before restarting them on cluster B.

    However, consumers with too much lag to catch up within an acceptable timeframe need to be shut down before reaching lag 0. This could result in those consumers reprocessing some records, depending on how far behind they are.

  6. Wait for sync.group.offsets.interval.seconds and then stop MirrorCheckpointConnector via the Kafka Connect REST API:

    curl -X PUT http://<CONNECT>/connectors/A-to-B-MirrorCheckpointConnector/stop
  7. Restart the consuming applications on cluster B to successfully fail over to cluster B.

    Figure 8.4. The state of the disaster recovery environment after a successful planned failover


8.3. Handling an unplanned failover

Unfortunately, a failover is not always planned, and if a hard failure impacts the primary cluster, you may have to perform an unplanned failover. There is a large spectrum of possible failures, so the impact on your data and workload can vary significantly.

In the worst case, you can lose all data that has not been mirrored. The amount of data can be estimated using metrics. The record-age-ms metric indicates the gap in milliseconds between the two clusters. You can also use the last incoming byte rate from the source cluster to estimate the amount of data potentially lost.
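For example, with hypothetical values of a 30-second gap between the clusters and a 100 MB/s incoming rate on the primary cluster, the potential loss can be estimated as follows:

```shell
# Hypothetical values: record-age gap of 30s, incoming rate of 100 MB/s
AGE_S=30       # gap between the clusters, in seconds
RIN_MBS=100    # incoming throughput on the primary cluster (MB/s)
LOSS_MB=$(( AGE_S * RIN_MBS ))
echo "estimated unmirrored data: ${LOSS_MB} MB"
```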

The accuracy of the translated consumer group offsets is lower than in a planned failover because translation happens at a regular interval and the time of the last translation may be unknown.

Depending on the state of cluster A, attempting to recover data might be possible. However, plan for a full loss of the cluster. Do not rely on extracting data after a disaster.

Figure 8.5. Cluster A is impacted by a failure


In this scenario, the failover is straightforward: stop the two MirrorMaker connectors, then restart all your applications configured to connect to cluster B, which becomes the new primary cluster.

Before restarting consuming applications, you can use the RemoteClusterUtils utility to translate consumer group offsets using the latest available checkpoints, as this can produce slightly better offsets than what MirrorCheckpointConnector last committed. For example, the following logic translates consumer group offsets and commits them to the target cluster:

Map<String, Object> props = Map.of(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>",
    MirrorClientConfig.REPLICATION_POLICY_CLASS, IdentityReplicationPolicy.class.getName()
);

// Get the translated offsets
Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
    RemoteClusterUtils.translateOffsets(
        props,
        "A",
        "my-consumer-group",
        Duration.ofSeconds(600));


Map<String, Object> adminProps = Map.of(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>"
);
try (Admin admin = Admin.create(adminProps)) {
    // Use Admin API to override the offsets for the consumer group
    admin.alterConsumerGroupOffsets("my-consumer-group", translatedOffsets)
        .all()
        .get(10, TimeUnit.SECONDS);
}

The arguments are as follows:

  • props: Configuration properties required for MirrorClient to talk to the target cluster (cluster B in this example). They must also include the "replication.policy.class" configuration, set to the same value used for MirrorMaker, so that MirrorClient knows the format of the mirrored topic names.
  • "A": The alias given to the source cluster (A in this example).
  • "my-consumer-group": The group ID that you want the translated offsets for.
  • Duration.ofSeconds(600): The timeout for translating offsets. The translateOffsets() method needs to consume the entire checkpoints topic, so ensure the timeout is long enough.

Figure 8.6. The state of the disaster recovery environment after losing cluster A and performing an unplanned failover


Following a planned or unplanned failover, you don’t have a disaster recovery environment anymore, so you need to set up a new disaster recovery cluster.

Chapter 9. Resetting mirroring after failover

After failing over, set up a new disaster recovery environment. The timeline to do so depends on the business requirements defined in your disaster recovery plan.

9.1. Rebuilding a disaster recovery cluster

The most straightforward approach to rebuild a disaster recovery environment is to restart from scratch once a new disaster recovery Kafka cluster is ready. In this case, the new disaster recovery cluster is empty and has no data. You can follow the same steps used when setting up the initial disaster recovery environment, but inverting the roles of the clusters so that cluster B is the primary cluster and cluster A is the disaster recovery cluster.

Figure 9.1. Starting a new disaster recovery environment mirroring data from cluster B to cluster A


9.2. Reusing the original cluster by mirroring the delta

After a planned failover, you might want to fail back. This can be the case if you just practiced a planned failover, or if you executed a planned failover anticipating a failure and the failure did not happen. In this scenario, it is inefficient to delete all data from the second cluster and restart from scratch as this cluster has most of your data already. It would also significantly increase your recovery time and potentially leave you without an in-sync disaster recovery environment for a prolonged period of time. Instead, you can set up MirrorMaker to only mirror the data delta between the two clusters. For example, two records produced to my-topic-0 in cluster B after the failover are the delta:

Figure 9.2. Mirroring only the delta, the last 2 records in my-topic-0, between cluster B and cluster A


Following a planned failover, mirroring the delta is only possible if there is some data overlap between the two clusters, meaning that the primary cluster (cluster B) still has the log end offsets captured when failing over. To validate whether this is the case, use the kafka-get-offsets.sh tool to retrieve the log start offset for each partition and confirm it is smaller than or equal to the captured log end offset plus 1:

bin/kafka-get-offsets.sh \
  --bootstrap-server <PRIMARY_BOOTSTRAP_SERVERS> \
  --time earliest \
  --topic-partitions <TOPICS>

The arguments are:

<PRIMARY_BOOTSTRAP_SERVERS>
A comma-separated list of bootstrap servers of the primary cluster, which is cluster B in this example. For clusters with authentication, you can put the required settings in a file and use the --command-config argument.
<TOPICS>
A comma-separated list of topic names and regexes.
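For example, to check the earliest available offset of my-topic partition 0 on the primary cluster (the bootstrap address shown here is a hypothetical placeholder):

```shell
# Retrieve the log start (earliest) offset for my-topic partition 0.
# "cluster-b:9092" is a placeholder bootstrap address.
bin/kafka-get-offsets.sh \
  --bootstrap-server cluster-b:9092 \
  --time earliest \
  --topic-partitions my-topic:0
```

The tool prints one line per partition in the form <topic>:<partition>:<offset>, so an output line such as my-topic:0:0 indicates that the log start offset of that partition is still 0.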

For all the partitions that still contain the log end offset you captured during the disaster recovery, you can mirror only the data delta. You use the captured log end offsets as the starting positions when restarting MirrorSourceConnector. For the partitions that don’t contain the captured log end offsets, you need to pick their starting position explicitly, as described in the "Starting Position" section.

The following examples show how to determine whether you can mirror the delta or not. After the failover, cluster A is now the disaster recovery cluster, mirroring from primary cluster B. In both cases, the log end offset captured for my-topic-0 when failing over is 2.

In this first example, the earliest offset on the primary cluster is 0. Since it is smaller than the captured offset (2), you can mirror only the delta (offsets 3 and 4). You set the starting position to the offset after the captured offset, which is 3 in this case.

Figure 9.3. Cluster B log start offset is smaller than the log end offset captured when failing over


In this second example, the earliest offset of the source topic is 6, which is greater than the captured offset (2). Therefore, there is no delta you can mirror: the records with offsets from 3 to 5 are no longer available on either cluster. In this case, you have to choose the starting position explicitly, clear all data from cluster A, and reset mirroring as described in the Fresh Disaster Recovery Cluster section.

Figure 9.4. Cluster B log start offset is larger than the log end offset captured when failing over

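The comparison illustrated by the two examples above can be sketched as a small shell function. This is an illustrative sketch, not part of any Kafka tooling; the function name and the "delta"/"fresh" labels are invented here:

```shell
#!/bin/sh
# Decide whether delta mirroring is possible for one partition.
# $1: log start (earliest) offset reported by kafka-get-offsets.sh
# $2: log end offset captured for that partition when failing over
can_mirror_delta() {
  earliest=$1
  captured=$2
  if [ "$earliest" -le $((captured + 1)) ]; then
    # Delta mirroring is possible: restart MirrorSourceConnector
    # at offset captured + 1.
    echo "delta"
  else
    # Data gap: clear the disaster recovery cluster and restart
    # mirroring from scratch.
    echo "fresh"
  fi
}

can_mirror_delta 0 2   # Figure 9.3: prints "delta"; start mirroring at offset 3
can_mirror_delta 6 2   # Figure 9.4: prints "fresh"; offsets 3 to 5 are gone
```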

9.3. Failing back to a preferred primary cluster

In some cases, a cluster may be preferred as the primary cluster after it is restored, for example because it is in a closer or cheaper data center.

Failing back is the act of switching the primary role back to the preferred cluster. Once the clusters are back in sync, you fail back by first performing a planned failover and then resetting the MirrorMaker configuration to mirror only the data delta.

Suppose cluster B is the primary cluster that all your applications use, and MirrorMaker is mirroring data from cluster B to cluster A.

Figure 9.5. The state of the disaster recovery environment before failing back to cluster A


After validating that all the prerequisites for a planned failover are met, you can fail over to cluster A:

Figure 9.6. The state of the disaster recovery environment after failing back to cluster A


At this point, you need to reset mirroring. You should be able to mirror only the delta between the clusters by setting custom starting positions for the MirrorSourceConnector:

Figure 9.7. The state of the disaster recovery environment mirroring data from cluster A to cluster B

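One way to set custom starting positions, assuming your Kafka Connect version supports externally managed connector offsets (KIP-875) and its REST API is reachable, is to stop the connector, patch its source offsets, and resume it. The connector name, host, topic, and offset value below are all hypothetical; the offset layout shown (cluster/topic/partition keys and an "offset" value) reflects MirrorSourceConnector's internal source offset format:

```shell
# Stop the connector before altering its offsets (hypothetical name).
curl -X PUT http://localhost:8083/connectors/mirror-source-connector/stop

# Set the starting position for my-topic partition 0 to offset 3,
# the offset after the log end offset captured during failover.
curl -X PATCH -H "Content-Type: application/json" \
  http://localhost:8083/connectors/mirror-source-connector/offsets \
  -d '{
        "offsets": [
          {
            "partition": {"cluster": "clusterA", "topic": "my-topic", "partition": 0},
            "offset": {"offset": 3}
          }
        ]
      }'

# Resume mirroring from the new starting position.
curl -X PUT http://localhost:8083/connectors/mirror-source-connector/resume
```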

Once MirrorMaker gets both clusters back in sync, you have returned to the initial state.

Appendix A. Using your subscription

Streams for Apache Kafka is provided through a software subscription. To manage your subscriptions, access your account at the Red Hat Customer Portal.

A.1. Accessing Your Account

  1. Go to access.redhat.com.
  2. If you do not already have an account, create one.
  3. Log in to your account.

A.2. Activating a Subscription

  1. Go to access.redhat.com.
  2. Navigate to My Subscriptions.
  3. Navigate to Activate a subscription and enter your 16-digit activation number.

A.3. Downloading Zip and Tar Files

To access zip or tar files, use the customer portal to find the relevant files for download. If you are using RPM packages, this step is not required.

  1. Open a browser and log in to the Red Hat Customer Portal Product Downloads page at access.redhat.com/downloads.
  2. Locate the Streams for Apache Kafka entries in the INTEGRATION AND AUTOMATION category.
  3. Select the desired Streams for Apache Kafka product. The Software Downloads page opens.
  4. Click the Download link for your component.

A.4. Installing packages with DNF

To install a package and all the package dependencies, use:

dnf install <package_name>

To install a previously-downloaded package from a local directory, use:

dnf install <path_to_download_package>

Revised on 2026-03-27 11:37:39 UTC

Legal Notice

Copyright © Red Hat.
Except as otherwise noted below, the text of and illustrations in this documentation are licensed by Red Hat under the Creative Commons Attribution–Share Alike 3.0 Unported license. If you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, the Red Hat logo, JBoss, Hibernate, and RHCE are trademarks or registered trademarks of Red Hat, Inc. or its subsidiaries in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
XFS is a trademark or registered trademark of Hewlett Packard Enterprise Development LP or its subsidiaries in the United States and other countries.
The OpenStack® Word Mark and OpenStack logo are trademarks or registered trademarks of the Linux Foundation, used under license.
All other trademarks are the property of their respective owners.