Chapter 9. Using AMQ Streams with MirrorMaker 2
Use MirrorMaker 2 to replicate data between two or more active Kafka clusters, within or across data centers.
To configure MirrorMaker 2, edit the config/connect-mirror-maker.properties
configuration file. If required, you can enable distributed tracing for MirrorMaker 2.
Handling high volumes of messages
You can tune the configuration to handle high volumes of messages. For more information, see Chapter 11, Handling high volumes of messages.
MirrorMaker 2 has features not supported by the previous version of MirrorMaker. However, you can configure MirrorMaker 2 to be used in legacy mode.
9.1. MirrorMaker 2 data replication
Data replication across clusters supports scenarios that require:
- Recovery of data in the event of a system failure
- Aggregation of data for analysis
- Restriction of data access to a specific cluster
- Provision of data at a specific location to improve latency
9.1.1. MirrorMaker 2 configuration
MirrorMaker 2 consumes messages from a source Kafka cluster and writes them to a target Kafka cluster.
MirrorMaker 2 uses:
- Source cluster configuration to consume data from the source cluster
- Target cluster configuration to output data to the target cluster
MirrorMaker 2 is based on the Kafka Connect framework, with connectors managing the transfer of data between clusters.
You configure MirrorMaker 2 to define the Kafka Connect deployment, including the connection details of the source and target clusters, and then run a set of MirrorMaker 2 connectors to make the connection.
MirrorMaker 2 consists of the following connectors:
MirrorSourceConnector
- The source connector replicates topics from a source cluster to a target cluster. It also replicates ACLs and is necessary for the MirrorCheckpointConnector to run.
MirrorCheckpointConnector
- The checkpoint connector periodically tracks offsets. If enabled, it also synchronizes consumer group offsets between the source and target cluster.
MirrorHeartbeatConnector
- The heartbeat connector periodically checks connectivity between the source and target cluster.
If you are using the User Operator to manage ACLs, ACL replication through the connector is not possible.
The process of mirroring data from a source cluster to a target cluster is asynchronous. Each MirrorMaker 2 instance mirrors data from one source cluster to one target cluster. You can use more than one MirrorMaker 2 instance to mirror data between any number of clusters.
Figure 9.1. Replication across two clusters
By default, a check for new topics in the source cluster is made every 10 minutes. You can change the frequency by adding refresh.topics.interval.seconds
to the source connector configuration.
9.1.1.1. Cluster configuration
You can use MirrorMaker 2 in active/passive or active/active cluster configurations.
- active/active cluster configuration
- An active/active configuration has two active clusters replicating data bidirectionally. Applications can use either cluster. Each cluster can provide the same data. In this way, you can make the same data available in different geographical locations. As consumer groups are active in both clusters, consumer offsets for replicated topics are not synchronized back to the source cluster.
- active/passive cluster configuration
- An active/passive configuration has an active cluster replicating data to a passive cluster. The passive cluster remains on standby. You might use the passive cluster for data recovery in the event of system failure.
The expectation is that producers and consumers connect to active clusters only. A MirrorMaker 2 cluster is required at each target destination.
9.1.1.2. Bidirectional replication (active/active)
The MirrorMaker 2 architecture supports bidirectional replication in an active/active cluster configuration.
Each cluster replicates the data of the other cluster using the concept of source and remote topics. As the same topics are stored in each cluster, remote topics are automatically renamed by MirrorMaker 2 to represent the source cluster. The name of the originating cluster is prepended to the name of the topic.
Figure 9.2. Topic renaming
By flagging the originating cluster, topics are not replicated back to that cluster.
The concept of replication through remote topics is useful when configuring an architecture that requires data aggregation. Consumers can subscribe to source and remote topics within the same cluster, without the need for a separate aggregation cluster.
9.1.1.3. Unidirectional replication (active/passive)
The MirrorMaker 2 architecture supports unidirectional replication in an active/passive cluster configuration.
You can use an active/passive cluster configuration to make backups or migrate data to another cluster. In this situation, you might not want automatic renaming of remote topics.
You can override automatic renaming by adding IdentityReplicationPolicy
to the source connector configuration. With this configuration applied, topics retain their original names.
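As a minimal sketch of an active/passive setup, the following fragment enables replication in one direction only and keeps the original topic names. The cluster aliases cluster-1 (active) and cluster-2 (passive) are illustrative, and connection settings are omitted.

```properties
# Sketch only: cluster aliases are illustrative; connection settings omitted
clusters=cluster-1,cluster-2
# Replicate from the active cluster to the passive cluster only
cluster-1->cluster-2.enabled=true
cluster-2->cluster-1.enabled=false
# Remote topics keep their original names instead of being prefixed
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
```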
9.1.2. Topic configuration synchronization
MirrorMaker 2 supports topic configuration synchronization between source and target clusters. You specify source topics in the MirrorMaker 2 configuration. MirrorMaker 2 monitors the source topics, detects changes to them, and propagates those changes to the remote topics. Changes might include automatically creating missing topics and partitions.
In most cases you write to local topics and read from remote topics. Though write operations are not prevented on remote topics, they should be avoided.
9.1.3. Offset tracking
MirrorMaker 2 tracks offsets for consumer groups using internal topics.
offset-syncs topic
- The offset-syncs topic maps the source and target offsets for replicated topic partitions from record metadata.
checkpoints topic
- The checkpoints topic maps the last committed offset in the source and target cluster for replicated topic partitions in each consumer group.
As they are used internally by MirrorMaker 2, you do not interact directly with these topics.
MirrorCheckpointConnector
emits checkpoints for offset tracking. Offsets for the checkpoints
topic are tracked at predetermined intervals through configuration. Both topics enable replication to be fully restored from the correct offset position on failover.
The location of the offset-syncs topic is the source cluster by default. You can use the offset-syncs.topic.location connector configuration to change this to the target cluster. You need read/write access to the cluster that contains the topic. Using the target cluster as the location of the offset-syncs topic allows you to use MirrorMaker 2 even if you have only read access to the source cluster.
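For illustration, a fragment along the following lines would move the offset-syncs topic to the target cluster for a single replication flow. The cluster aliases are assumptions, and setting the property per flow (rather than globally) is one possible choice.

```properties
# Sketch only: cluster aliases are illustrative
clusters=cluster-1,cluster-2
cluster-1->cluster-2.enabled=true
# Store the offset-syncs topic on the target cluster (the default is source)
cluster-1->cluster-2.offset-syncs.topic.location=target
```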
9.1.4. Synchronizing consumer group offsets
The __consumer_offsets
topic stores information on committed offsets for each consumer group. Offset synchronization periodically transfers the consumer offsets for the consumer groups of a source cluster into the consumer offsets topic of a target cluster.
Offset synchronization is particularly useful in an active/passive configuration. If the active cluster goes down, consumer applications can switch to the passive (standby) cluster and pick up from the last transferred offset position.
To use consumer group offset synchronization, add sync.group.offsets.enabled to the checkpoint connector configuration and set the property to true. Synchronization is disabled by default.
When using the IdentityReplicationPolicy
in the source connector, it also has to be configured in the checkpoint connector configuration. This ensures that the mirrored consumer offsets will be applied for the correct topics.
Consumer offsets are only synchronized for consumer groups that are not active in the target cluster. If the consumer groups are active in the target cluster, the synchronization cannot be performed and an UNKNOWN_MEMBER_ID error is returned.
If enabled, offsets are synchronized from the source cluster periodically. You can change the frequency by adding sync.group.offsets.interval.seconds and emit.checkpoints.interval.seconds to the checkpoint connector configuration. The properties specify, in seconds, how often the consumer group offsets are synchronized and how often checkpoints are emitted for offset tracking. The default for both properties is 60 seconds. You can also change the frequency of checks for new consumer groups using the refresh.groups.interval.seconds property. These checks are performed every 10 minutes by default.
Because the synchronization is time-based, any switchover by consumers to a passive cluster will likely result in some duplication of messages.
If you have an application written in Java, you can use the RemoteClusterUtils.java
utility to synchronize offsets through the application. The utility fetches remote offsets for a consumer group from the checkpoints
topic.
9.1.5. Connectivity checks
MirrorHeartbeatConnector
emits heartbeats to check connectivity between clusters.
An internal heartbeat
topic is replicated from the source cluster. Target clusters use the heartbeat
topic to check the following:
- The connector managing connectivity between clusters is running
- The source cluster is available
9.2. Connector configuration
Use MirrorMaker 2 connector configuration for the internal connectors that orchestrate the synchronization of data between Kafka clusters.
The following table describes connector properties and the connectors you configure to use them.
Property | sourceConnector | checkpointConnector | heartbeatConnector |
---|---|---|---|
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ |
9.3. Connector producer and consumer configuration
MirrorMaker 2 connectors use internal producers and consumers. If needed, you can configure these producers and consumers to override the default settings.
Producer and consumer configuration options depend on the MirrorMaker 2 implementation, and may be subject to change.
Producer and consumer configuration applies to all connectors. You specify the configuration in the config/connect-mirror-maker.properties
file.
Use the properties file to override any default configuration for the producers and consumers in the following format:
- <source_cluster_name>.consumer.<property>
- <source_cluster_name>.producer.<property>
- <target_cluster_name>.consumer.<property>
- <target_cluster_name>.producer.<property>
The following example shows how you configure the producers and consumers. Though the properties are set for all connectors, some configuration properties are only relevant to certain connectors.
Example configuration for connector producers and consumers
clusters=cluster-1,cluster-2
# ...
cluster-1.consumer.fetch.max.bytes=52428800
cluster-2.producer.batch.size=327680
cluster-2.producer.linger.ms=100
cluster-2.producer.request.timeout.ms=30000
9.4. Specifying a maximum number of tasks
Connectors create the tasks that are responsible for moving data in and out of Kafka. Each connector comprises one or more tasks that are distributed across a group of worker pods that run the tasks. Increasing the number of tasks can help with performance issues when replicating a large number of partitions or synchronizing the offsets of a large number of consumer groups.
Tasks run in parallel. Workers are assigned one or more tasks. A single task is handled by one worker pod, so you don’t need more worker pods than tasks. If there are more tasks than workers, workers handle multiple tasks.
You can specify the maximum number of connector tasks in your MirrorMaker configuration using the tasks.max
property. Without specifying a maximum number of tasks, the default setting is a single task.
The heartbeat connector always uses a single task.
The number of tasks that are started for the source and checkpoint connectors is the lower of the maximum number of possible tasks and the value of tasks.max. For the source connector, the maximum number of tasks possible is one for each partition being replicated from the source cluster. For the checkpoint connector, the maximum number of tasks possible is one for each consumer group being replicated from the source cluster. When setting a maximum number of tasks, consider the number of partitions and the hardware resources that support the process.
If the infrastructure supports the processing overhead, increasing the number of tasks can improve throughput and latency. For example, adding more tasks reduces the time taken to poll the source cluster when there is a high number of partitions or consumer groups.
tasks.max configuration for MirrorMaker connectors
clusters=cluster-1,cluster-2
# ...
tasks.max = 10
By default, MirrorMaker 2 checks for new consumer groups every 10 minutes. You can adjust the refresh.groups.interval.seconds configuration to change the frequency. Take care when lowering the interval. More frequent checks can have a negative impact on performance.
9.5. ACL rules synchronization
If AclAuthorizer
is being used, ACL rules that manage access to brokers also apply to remote topics. Users that can read a source topic can read its remote equivalent.
OAuth 2.0 authorization does not support access to remote topics in this way.
9.6. Running MirrorMaker 2 in dedicated mode
Use MirrorMaker 2 to synchronize data between Kafka clusters through configuration. This procedure shows how to configure and run a dedicated single-node MirrorMaker 2 cluster. Dedicated clusters use Kafka Connect worker nodes to mirror data between Kafka clusters. At present, MirrorMaker 2 in dedicated mode only works with a single worker node.
It is also possible to run MirrorMaker 2 in distributed mode. In distributed mode, MirrorMaker 2 runs as connectors in a Kafka Connect cluster. Kafka provides MirrorMaker source connectors for data replication. If you wish to use the connectors instead of running a dedicated MirrorMaker cluster, the connectors must be configured in the Kafka Connect cluster. For more information, refer to the Apache Kafka documentation.
The previous version of MirrorMaker continues to be supported, by running MirrorMaker 2 in legacy mode.
The configuration must specify:
- Each Kafka cluster
- Connection information for each cluster, including TLS authentication
- The replication flow and direction
  - Cluster to cluster
  - Topic to topic
- Replication rules
- Committed offset tracking intervals
This procedure describes how to implement MirrorMaker 2 by creating the configuration in a properties file, then passing the properties when using the MirrorMaker script file to set up the connections.
You can specify the topics and consumer groups you wish to replicate from a source cluster. You specify the names of the source and target clusters, then specify the topics and consumer groups to replicate.
In the following example, topics and consumer groups are specified for replication from cluster 1 to 2.
Example configuration to replicate specific topics and consumer groups
clusters=cluster-1,cluster-2
cluster-1->cluster-2.topics = topic-1, topic-2
cluster-1->cluster-2.groups = group-1, group-2
You can provide a list of names or use a regular expression. By default, all topics and consumer groups are replicated if you do not set these properties. You can also replicate all topics and consumer groups by using .*
as a regular expression. However, try to specify only the topics and consumer groups you need to avoid causing any unnecessary extra load on the cluster.
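As a rough illustration of mixing names and regular expressions, the following fragment limits replication to a subset of topics and consumer groups. The topic and group names are hypothetical, and the patterns are assumed to be matched against the whole name.

```properties
# Sketch only: topic and group names are hypothetical
clusters=cluster-1,cluster-2
# Replicate topics whose names start with "orders-", plus one named topic
cluster-1->cluster-2.topics = orders-.*, payments
# Replicate only consumer groups whose names start with "billing-"
cluster-1->cluster-2.groups = billing-.*
```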
Before you begin
A sample configuration properties file is provided in ./config/connect-mirror-maker.properties.
Prerequisites
- You need AMQ Streams installed on the hosts of each Kafka cluster node you are replicating.
Procedure
Open the sample properties file in a text editor, or create a new one, and edit the file to include connection information and the replication flows for each Kafka cluster.
The following example shows a configuration to connect two clusters, cluster-1 and cluster-2, bidirectionally. Cluster names are configurable through the clusters property.
Example MirrorMaker 2 configuration
# (1)
clusters=cluster-1,cluster-2
# (2)
cluster-1.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_one>:443
# (3)
cluster-1.security.protocol=SSL
cluster-1.ssl.truststore.password=<truststore_name>
cluster-1.ssl.truststore.location=<path_to_truststore>/truststore.cluster-1.jks
cluster-1.ssl.keystore.password=<keystore_name>
cluster-1.ssl.keystore.location=<path_to_keystore>/user.cluster-1.p12
# (4)
cluster-2.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_two>:443
# (5)
cluster-2.security.protocol=SSL
cluster-2.ssl.truststore.password=<truststore_name>
cluster-2.ssl.truststore.location=<path_to_truststore>/truststore.cluster-2.jks
cluster-2.ssl.keystore.password=<keystore_name>
cluster-2.ssl.keystore.location=<path_to_keystore>/user.cluster-2.p12
# (6)
cluster-1->cluster-2.enabled=true
# (7)
cluster-2->cluster-1.enabled=true
# (8)
cluster-1->cluster-2.topics=.*
# (9)
cluster-2->cluster-1.topics=topic-1, topic-2
# (10)
cluster-1->cluster-2.groups=.*
# (11)
cluster-2->cluster-1.groups=group-1, group-2
# (12)
replication.policy.separator=-
# (13)
sync.topic.acls.enabled=false
# (14)
refresh.topics.interval.seconds=60
# (15)
refresh.groups.interval.seconds=60
- 1
- Each Kafka cluster is identified with its alias.
- 2
- Connection information for cluster-1, using the bootstrap address and port 443. Both clusters use port 443 to connect to Kafka using OpenShift Routes.
- 3
- The ssl. properties define the TLS configuration for cluster-1.
- 4
- Connection information for cluster-2.
- 5
- The ssl. properties define the TLS configuration for cluster-2.
- 6
- Replication flow enabled from cluster-1 to cluster-2.
- 7
- Replication flow enabled from cluster-2 to cluster-1.
- 8
- Replication of all topics from cluster-1 to cluster-2. The source connector replicates the specified topics. The checkpoint connector tracks offsets for the specified topics.
- 9
- Replication of specific topics from cluster-2 to cluster-1.
- 10
- Replication of all consumer groups from cluster-1 to cluster-2. The checkpoint connector replicates the specified consumer groups.
- 11
- Replication of specific consumer groups from cluster-2 to cluster-1.
- 12
- Defines the separator used for the renaming of remote topics.
- 13
- When enabled, ACLs are applied to synchronized topics. The default is false.
- 14
- The period between checks for new topics to synchronize.
- 15
- The period between checks for new consumer groups to synchronize.
OPTION: If required, add a policy that overrides the automatic renaming of remote topics. Instead of prepending the name with the name of the source cluster, the topic retains its original name.
This optional setting is used for active/passive backups and data migration.
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
OPTION: If you want to synchronize consumer group offsets, add configuration to enable and manage the synchronization:
refresh.groups.interval.seconds=60
# (1)
sync.group.offsets.enabled=true
# (2)
sync.group.offsets.interval.seconds=60
# (3)
emit.checkpoints.interval.seconds=60
- 1
- Optional setting to synchronize consumer group offsets, which is useful for recovery in an active/passive configuration. Synchronization is not enabled by default.
- 2
- If the synchronization of consumer group offsets is enabled, you can adjust the frequency of the synchronization.
- 3
- Adjusts the frequency of checks for offset tracking. If you change the frequency of offset synchronization, you might also need to adjust the frequency of these checks.
Start ZooKeeper and Kafka in the target clusters:
su - kafka
/opt/kafka/bin/zookeeper-server-start.sh -daemon \
  /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon \
  /opt/kafka/config/server.properties
Start MirrorMaker with the cluster connection configuration and replication policies you defined in your properties file:
/opt/kafka/bin/connect-mirror-maker.sh \
  /opt/kafka/config/connect-mirror-maker.properties
MirrorMaker sets up connections between the clusters.
For each target cluster, verify that the topics are being replicated:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list
9.7. Using MirrorMaker 2 in legacy mode
This procedure describes how to configure MirrorMaker 2 for use in legacy mode. Legacy mode supports the previous version of MirrorMaker.
The MirrorMaker script /opt/kafka/bin/kafka-mirror-maker.sh
can run MirrorMaker 2 in legacy mode.
Kafka MirrorMaker 1 (referred to as just MirrorMaker in the documentation) has been deprecated in Apache Kafka 3.0.0 and will be removed in Apache Kafka 4.0.0. As a result, Kafka MirrorMaker 1 has been deprecated in AMQ Streams as well. Kafka MirrorMaker 1 will be removed from AMQ Streams when we adopt Apache Kafka 4.0.0. As a replacement, use MirrorMaker 2 with the IdentityReplicationPolicy.
Prerequisites
You need the properties files you currently use with the legacy version of MirrorMaker.
- /opt/kafka/config/consumer.properties
- /opt/kafka/config/producer.properties
Procedure
Edit the MirrorMaker consumer.properties and producer.properties files to turn off MirrorMaker 2 features.
For example:
replication.policy.class=org.apache.kafka.mirror.LegacyReplicationPolicy
refresh.topics.enabled=false
refresh.groups.enabled=false
emit.checkpoints.enabled=false
emit.heartbeats.enabled=false
sync.topic.configs.enabled=false
sync.topic.acls.enabled=false
Save the changes and restart MirrorMaker with the properties files you used with the previous version of MirrorMaker:
su - kafka
/opt/kafka/bin/kafka-mirror-maker.sh \
  --consumer.config /opt/kafka/config/consumer.properties \
  --producer.config /opt/kafka/config/producer.properties \
  --num.streams=2
The consumer properties provide the configuration for the source cluster and the producer properties provide the target cluster configuration.
MirrorMaker sets up connections between the clusters.
Start ZooKeeper and Kafka in the target cluster:
su - kafka
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
su - kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
For the target cluster, verify that the topics are being replicated:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list