Chapter 5. Setting up MirrorMaker for disaster recovery


To move your workloads between clusters, both clusters should have similar resources. In cases when disaster recovery is only necessary for a subset of the topics, the disaster recovery cluster might have less resources. Both clusters should also have similar configurations. In addition, MirrorMaker expects the disaster recovery cluster to only be a hot standby cluster. Clients must only be connected to the primary cluster, and only MirrorMaker should connect and mirror data to the disaster recovery cluster.

The MirrorMaker cluster is typically deployed near the target cluster (e.g. in the same network zone or Kubernetes cluster). This setup reduces latency accessing the internal topics of the connectors as they are stored in the target cluster. Another benefit of this setup is that in the event of a disaster, the MirrorMaker cluster is unlikely to be impacted.

For production use cases, run MirrorMaker on a multi-worker distributed Kafka Connect cluster. This is necessary because distributed mode enables scaling the Kafka Connect workers to match the source cluster throughput and it provides resiliency to worker failures. Distributed mode also enables the REST API which is very useful for operating a MirrorMaker environment.

The first step is to prepare the configurations for the Kafka Connect cluster and the two connectors.

5.1. Configuring Kafka Connect for MirrorMaker

When setting up Kafka Connect for MirrorMaker 2, you need to configure both the Kafka Connect worker and the connectors. Kafka Connect offers dozens of configuration settings. You must carefully consider the following settings:

group.id
All the workers in the Kafka Connect cluster must have the same group.id configuration. This name appears in the metrics of the Kafka clients each worker uses internally.
plugin.discovery
The strategy defines how Kafka Connect finds plugin classes for connectors. Set to service_load because it is the fastest strategy and is compatible with MirrorMaker connectors, significantly improving worker start-up time.
key.converter, value.converter
Set to org.apache.kafka.connect.converters.ByteArrayConverter which MirrorMaker connectors must use. If your Kafka Connect cluster runs only MirrorMaker, set the converters directly in Kafka Connect, so that you don’t need to set them in each connector configuration.
config.storage.topic, offset.storage.topic, status.storage.topic
These define the names for the internal topics used by Kafka Connect. All the workers in the Kafka Connect cluster must have the same values. We recommend setting them to unique names that clearly identify the Kafka Connect cluster. To ensure these topics are not mirrored, prefix them with "__" (double underscores).
listeners
This configuration determines the list of listeners the REST API is exposed on. If not set, the default is "http://:8083", which means the REST API can be accessed via HTTP on the default network interface (e.g. localhost) at port 8083. Workers need to be able to connect to each other via their listeners. More information on how to secure the REST API and configure advertised listeners for Kafka Connect workers can be found on the Apache Kafka website.
bootstrap.servers, security.protocol, sasl.mechanism
These configurations provide the connection details for Kafka Connect to connect to the target cluster (these are just the main ones). The connection details must also be provided for connectors, using the target alias as a prefix for the configurations. Use the same credentials to connect to the target cluster for both Kafka Connect and connectors so that they both use the same identity and are able to share ACLs. Otherwise, you must set ACLs for each identity they use.

These are configurations to set for both MirrorSourceConnector and MirrorCheckpointConnector:

name
The name uniquely identifies a connector. We recommend using names with this format <SOURCE>-to-<TARGET>-<CONNECTOR>, for example "A-to-B-MirrorSourceConnector". We recommend avoiding special characters as the connector names appear in paths of the REST API and in metric names and labels.
tasks.max
This indicates the maximum number of tasks Kafka Connect can start for this connector. Tasks are the unit of parallelism. Having more tasks makes the workload more granular and easier to distribute across all workers, especially when a worker is restarted or crashes. However, each task adds some overhead. Aim for as many tasks as CPU cores in the Kafka Connect clusters.
source.cluster.alias
This is the alias given to the source cluster. This name appears in logs and metrics.
target.cluster.alias
This is the alias given to the target cluster. This name appears in logs and metrics.
offset-syncs.topic.location
This determines where to store the offset-syncs topic. By default, it is set to source; however, it is recommended to set this to target so that MirrorMaker can be granted read-only access to the source cluster and the topic can be created in the target cluster. This topic is named mm2-offset-syncs.<SOURCE_ALIAS>.internal
replication.policy.class
The replication policy determines how replicated topics are named and detects mirroring cycles in bidirectional mirroring. Disaster recovery uses unidirectional mirroring, so set this to “org.apache.kafka.connect.mirror.IdentityReplicationPolicy” to have the same topic names on both clusters.
source.cluster.bootstrap.servers, source.cluster.security.protocol, source.cluster.sasl.mechanism, target.cluster.bootstrap.servers, target.cluster.security.protocol, target.cluster.sasl.mechanism
These configurations are set using aliases "source.cluster." and "target.cluster." as prefixes to provide connection details for both clusters. These are just an example of the main ones but there are more configurations for the connection details that can be set using these prefixes. As mentioned previously, use the same credentials provided for Kafka Connect when configuring these properties with the target alias.

5.3. Configuring MirrorSourceConnector

The following are key configurations for MirrorSourceConnector:

heartbeats.replication.enabled
This determines whether to replicate the heartbeats topics which MirrorHeartbeatConnector emits records to. Set this to false because as explained previously, it is not necessary to run MirrorHeartbeatConnector for disaster recovery, so you can disable this feature.
replication.factor
This defines the replication factor for the new topics on the target cluster. Set this to -1 to use the target Kafka cluster default replication factor.
offset-syncs.topic.replication.factor
This defines the replication factor of the offset-syncs topic. Set this to -1 to use the target Kafka cluster default replication factor. When created in the target cluster, this topic is named "mm2-offset-syncs.<SOURCE_ALIAS>.internal".
topics
This determines which topics are mirrored. It accepts both explicit topic names and regular expression patterns. By default, it is set to "." meaning all topics from the source cluster are mirrored except those that match the topics.exclude filter. This filter defaults to "mm2.\.internal, .\.replica, __." to avoid unintentionally mirroring internal topics. You should always set topics explicitly to the exact list of topic names or regex patterns you intend to mirror.
sync.topic.acls.enabled
This determines whether to enable synchronization of topic ACLs. This is set to true by default. If you don’t use authorizations you should disable this feature by setting it to false.
offset.lag.max
This determines how frequently an offset-sync record is emitted for each partition. By default, it is set to 100, which means an offset-sync is written to the offset-syncs topic every time 100 records have been mirrored from a partition. This configuration is often misunderstood. In most cases, keep the default value. Setting it to a lower value causes more frequent offset-syncs which are individually small records but since this setting affects all partitions, it can significantly increase the throughput and size of the offset-syncs topic.
producer.override.<configuration name>
Connectors can override configurations for the producers used by Kafka Connect to mirror the records to the target cluster. In high throughput environments, it can be beneficial to increase the batch size (batch.size) and enable compression (compression.type). Producer overrides must be specified in the connector configuration with the “producer.override.” prefix, for example “producer.override.compression.type”.

These are the main configurations to be set when you start the connector. However, there are more configurations that you can set for fine tuning. You can list them using the REST API, for example:

curl http://<CONNECT>/connector-plugins/MirrorSourceConnector/config

5.4. Configuring MirrorCheckpointConnector

The following are key configurations for MirrorCheckpointConnector:

checkpoints.topic.replication.factor
This defines the replication factor of the checkpoints topic. Set this to -1 to use the target Kafka cluster default replication factor. This topic is named "<SOURCE_ALIAS>.checkpoints.internal".
sync.group.offsets.enabled
This determines whether translated offsets are periodically committed to the target cluster. Set this to true to allow consumers to easily move between clusters. If set to false, consumer group offsets must be translated manually using a tool like RemoteClusterUtils and then committed explicitly (for example, via Admin API) when moving consumers between clusters.
groups
This determines which consumer groups are mirrored. It accepts both explicit group names and regular expression patterns. By default, it is set to "." meaning all groups from the source cluster are mirrored except those that match the groups.exclude filter. This filter defaults to "console-consumer-., connect-., __." to avoid unintentionally mirroring consumer groups used by internal tools. You should always set groups explicitly to the exact list of group names or patterns you intend to mirror.

These are the main configurations to be set when you start the connector. However, there are more configurations that you can set for fine tuning. You can list them using the REST API, for example:

curl http://<CONNECT>/connector-plugins/MirrorCheckpointConnector/config

5.5. Configuring exactly-once support

To avoid data loss or duplicates in the target cluster while mirroring, enable exactly-once semantics for Kafka Connect and MirrorSourceConnector. Exactly-once support is achieved by using transactions to ensure that the two actions: sending the data to Kafka and committing offsets to the offset storage topic, both succeed or both fail atomically.

exactly.once.source.support
This determines whether to enable exactly-once support for source connectors by using transactions to write records and their offsets. Set this to “enabled” so that MirrorSourceConnector can write to the target cluster with exactly-once semantics.
consumer.isolation.level
This defines how to read records written using transactions. Set to “read_committed” to enable exactly-once semantics in MirrorSourceConnector so that it only reads transactional records that have been committed.

In most cases, you don’t need to set other configurations. However you can set transaction.boundary to poll and adjust transaction.boundary.interval.ms if you want to reduce the number of transactions created by the connector.

transaction.boundary
This defines when Kafka Connect should create and commit its transactions. MirrorSourceConnector supports setting this configuration to "poll" (the default) and "interval". If set to "poll", a new transaction is started and committed for every batch of records that a connector task provides to Kafka Connect. If set to "interval", transactions are committed after an interval defined by another configuration, "transaction.boundary.interval.ms".
transaction.boundary.interval.ms
This determines the interval at which transactions are committed. If not set, the value from "offset.flush.interval.ms" is used (1 minute by default).

5.6. Configuration examples

This is an example configuration file (e.g. connect-distributed.properties) for a Kafka Connect worker:

bootstrap.servers=<TARGET_BOOTSTRAP_SERVERS>

group.id=connect-mirrormaker-A-to-B

plugin.discovery=service_load

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.topic=__connect-mirrormaker-A-to-B-offsets
offset.storage.replication.factor=-1
config.storage.topic=__connect-mirrormaker-A-to-B-configs
config.storage.replication.factor=-1
status.storage.topic=__connect-mirrormaker-A-to-B-status
status.storage.replication.factor=-1

exactly.once.source.support=enabled

This is an example configuration for a MirrorSourceConnector mirroring data from cluster A to cluster B:

{
    "name": "A-to-B-MirrorSourceConnector",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "tasks.max": "30",

        "source.cluster.alias": "A",
        "target.cluster.alias": "B",
        "source.cluster.bootstrap.servers": "<SOURCE_BOOTSTRAP_SERVERS>",
        "target.cluster.bootstrap.servers": "<TARGET_BOOTSTRAP_SERVERS>",

        "offset-syncs.topic.location": "target",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",

        "heartbeats.replication.enabled": "false",
        "replication.factor": "-1",
        "offset-syncs.topic.replication.factor": "-1",
        "sync.topic.acls.enabled": "false",
        "consumer.isolation.level": "read_committed",
        "topics": "<TOPICS>"
    }
}

This is an example configuration for a MirrorCheckpointConnector mirroring consumer groups from cluster A to cluster B:

{
    "name": "A-to-B-MirrorCheckpointConnector",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "tasks.max": "30",

        "source.cluster.alias": "A",
        "target.cluster.alias": "B",
        "source.cluster.bootstrap.servers": "<SOURCE_BOOTSTRAP_SERVERS>",
        "target.cluster.bootstrap.servers": "<TARGET_BOOTSTRAP_SERVERS>",

        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "offset-syncs.topic.location": "target",

        "checkpoints.topic.replication.factor": "-1",
        "sync.group.offsets.enabled": "true",
        "groups": "<GROUPS>"
    }
}

5.7. Configuring authorizations for MirrorMaker

Most production environments use authorizations to control access to Kafka resources. In that case, to run MirrorMaker, you must grant permissions to the users employed by the Kafka Connect workers and the MirrorMaker connectors for both source and target clusters.

5.7.1. Source cluster access control

When the offset-syncs topic is placed on the target cluster, you only require READ, DESCRIBE, and DESCRIBE_CONFIGS permissions in the source cluster:

  • READ, DESCRIBE and DESCRIBE_CONFIGS on each topic to mirror
  • DESCRIBE on the cluster
  • DESCRIBE on each consumer group to mirror

5.7.2. Target cluster access control

MirrorMaker requires permissions on the target cluster to create topics, maintain their configurations, mirror their records, and manage consumer group offsets:

  • CREATE, DESCRIBE, WRITE, ALTER, ALTER_CONFIGS for each mirrored topic
  • DESCRIBE, READ on each mirrored consumer group
  • READ on each topic with offsets for each mirrored consumer group
  • CREATE, DESCRIBE, WRITE, READ on the offset-syncs topic
  • CREATE, DESCRIBE, WRITE on the checkpoints topic
  • DESCRIBE, ALTER on the cluster

You can set the ACLs using the command line tool or the Admin Client API.

For example, to set the ACLs for the offset-syncs topic via the kafka-acl.sh tool:

$ bin/kafka-acls.sh \
  --bootstrap-server <TARGET_BOOTSTRAP_SERVERS> \
  --add --allow-principal <PRINCIPAL> \
  --operation Read \
  --operation Write \
  --operation Create \
  --operation Describe \
  --topic mm2-offset-syncs.<SOURCE_ALIAS>.internal

You can also use Admin API to set the ACLs:

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_BOOTSTRAP_SERVERS>");

try (Admin admin = Admin.create(props)) {
    String user = "<PRINCIPAL>";
    String topic = "mm2-offset-syncs.<SOURCE_ALIAS>.internal";

    ResourcePattern resource = new ResourcePattern(
        ResourceType.TOPIC,
        topic,
        PatternType.LITERAL
    );

    List<AclBinding> acls = List.of(
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.READ, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.CREATE, AclPermissionType.ALLOW)),
        new AclBinding(resource, new AccessControlEntry(user, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
    );

    admin.createAcls(acls).all().get();
}

In both cases, the arguments are:

<TARGET_BOOTSTRAP_SERVERS>
A comma separated list of bootstrap servers of the target cluster. For clusters with authentication, we can put the required settings in a file and use the –command-config argument.
<PRINCIPAL>
The principal given to Kafka Connect and MirrorMaker.
<SOURCE_ALIAS>
The alias of the source cluster.
Red Hat logoGithubredditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat Documentation

Legal Notice

Theme

© 2026 Red Hat
Back to top