9.9. Configuring Kafka MirrorMaker 2
Update the spec properties of the KafkaMirrorMaker2 custom resource to configure your MirrorMaker 2 deployment. MirrorMaker 2 uses source cluster configuration for data consumption and target cluster configuration for data output.
MirrorMaker 2 is based on the Kafka Connect framework, with connectors managing the transfer of data between clusters.
You configure MirrorMaker 2 to define the Kafka Connect deployment, including the connection details of the source and target clusters, and then run a set of MirrorMaker 2 connectors to make the connection.
MirrorMaker 2 supports topic configuration synchronization between the source and target clusters. You specify source topics in the MirrorMaker 2 configuration. MirrorMaker 2 monitors the source topics. MirrorMaker 2 detects and propagates changes to the source topics to the remote topics. Changes might include automatically creating missing topics and partitions.
In most cases you write to local topics and read from remote topics. Though write operations are not prevented on remote topics, they should be avoided.
The configuration must specify:
- Each Kafka cluster
- Connection information for each cluster, including authentication
- The replication flow and direction
  - Cluster to cluster
  - Topic to topic
For a deeper understanding of the Kafka MirrorMaker 2 cluster configuration options, refer to the Custom Resource API Reference.
MirrorMaker 2 resource configuration differs from the previous version of MirrorMaker, which is now deprecated. There is currently no legacy support, so any resources must be manually converted into the new format.
Default configuration
MirrorMaker 2 provides default configuration values for properties such as replication factors. A minimal configuration, with defaults left unchanged, looks like the following example:
Minimal configuration for MirrorMaker 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.8.0
  connectCluster: "my-cluster-target"
  clusters:
    - alias: "my-cluster-source"
      bootstrapServers: my-cluster-source-kafka-bootstrap:9092
    - alias: "my-cluster-target"
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector: {}
You can configure access control for source and target clusters using mTLS or SASL authentication. This procedure shows a configuration that uses TLS encryption and mTLS authentication for the source and target cluster.
You can specify the topics and consumer groups you wish to replicate from a source cluster in the KafkaMirrorMaker2 resource. You use the topicsPattern and groupsPattern properties to do this. You can provide a list of names or use a regular expression. By default, all topics and consumer groups are replicated if you do not set the topicsPattern and groupsPattern properties. You can also replicate all topics and consumer groups by using ".*" as a regular expression. However, try to specify only the topics and consumer groups you need to avoid causing any unnecessary extra load on the cluster.
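As a minimal sketch of the two styles described above, the following fragment selects topics with a regular expression and consumer groups with a list of names. The `orders-` prefix and the group names are hypothetical placeholders; the cluster aliases are the ones used elsewhere in this section.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      # Regular expression: topics whose names start with "orders-" (hypothetical prefix)
      topicsPattern: "orders-.*"
      # List of names: only these consumer groups (hypothetical names)
      groupsPattern: "billing-group,shipping-group"
      sourceConnector: {}
      checkpointConnector: {}
```

Narrow patterns like these keep the replication load limited to the data you actually need on the target cluster.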
Handling high volumes of messages
You can tune the configuration to handle high volumes of messages. For more information, see Handling high volumes of messages.
Example KafkaMirrorMaker2 custom resource configuration
# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 # (1)
  # Connect cluster name (required)
  connectCluster: "my-cluster-target" # (2)
  # Cluster configurations (required)
  clusters: # (3)
    - alias: "my-cluster-source" # (4)
      # Authentication (optional)
      authentication: # (5)
        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
        type: tls
      bootstrapServers: my-cluster-source-kafka-bootstrap:9092 # (6)
      # TLS configuration (optional)
      tls: # (7)
        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-source-cluster-ca-cert
    - alias: "my-cluster-target" # (8)
      # Authentication (optional)
      authentication: # (9)
        certificateAndKey:
          certificate: target.crt
          key: target.key
          secretName: my-user-target
        type: tls
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092 # (10)
      # Kafka Connect configuration (optional)
      config: # (11)
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      # TLS configuration (optional)
      tls: # (12)
        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-target-cluster-ca-cert
  # Mirroring configurations (required)
  mirrors: # (13)
    - sourceCluster: "my-cluster-source" # (14)
      targetCluster: "my-cluster-target" # (15)
      # Topic and group patterns (required)
      topicsPattern: "topic1|topic2|topic3" # (16)
      groupsPattern: "group1|group2|group3" # (17)
      # Source connector configuration (required)
      sourceConnector: # (18)
        tasksMax: 10 # (19)
        autoRestart: # (20)
          enabled: true
        config:
          replication.factor: 1 # (21)
          offset-syncs.topic.replication.factor: 1 # (22)
          sync.topic.acls.enabled: "false" # (23)
          refresh.topics.interval.seconds: 60 # (24)
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" # (25)
      # Heartbeat connector configuration (optional)
      heartbeatConnector: # (26)
        autoRestart:
          enabled: true
        config:
          heartbeats.topic.replication.factor: 1 # (27)
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      # Checkpoint connector configuration (optional)
      checkpointConnector: # (28)
        autoRestart:
          enabled: true
        config:
          checkpoints.topic.replication.factor: 1 # (29)
          refresh.groups.interval.seconds: 600 # (30)
          sync.group.offsets.enabled: true # (31)
          sync.group.offsets.interval.seconds: 60 # (32)
          emit.checkpoints.interval.seconds: 60 # (33)
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  # Kafka version (recommended)
  version: 3.8.0 # (34)
  # Resources requests and limits (recommended)
  resources: # (35)
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Logging configuration (optional)
  logging: # (36)
    type: inline
    loggers:
      connect.root.logger.level: INFO
  # Readiness probe (optional)
  readinessProbe: # (37)
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe: # (37)
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # JVM options (optional)
  jvmOptions: # (38)
    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest # (39)
  # Rack awareness (optional)
  rack: # (40)
    topologyKey: topology.kubernetes.io/zone
  # Pod template (optional)
  template: # (41)
    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer:
      env: # (42)
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  # Tracing configuration (optional)
  tracing: # (43)
    type: opentelemetry
1. The number of replica nodes for the workers that run tasks.
2. Kafka cluster alias for Kafka Connect, which must specify the target Kafka cluster. The Kafka cluster is used by Kafka Connect for its internal topics.
3. Specification for the Kafka clusters being synchronized.
4. Cluster alias for the source Kafka cluster.
5. Authentication for the source cluster, specified as mTLS, token-based OAuth, SASL-based SCRAM-SHA-256/SCRAM-SHA-512, or PLAIN.
6. Bootstrap address for connection to the source Kafka cluster. The address takes the format <cluster_name>-kafka-bootstrap:<port_number>. The Kafka cluster doesn’t need to be managed by Streams for Apache Kafka or deployed to a Kubernetes cluster.
7. TLS configuration for encrypted connections to the Kafka cluster, with trusted certificates stored in X.509 format within the specified secrets.
8. Cluster alias for the target Kafka cluster.
9. Authentication for the target Kafka cluster is configured in the same way as for the source Kafka cluster.
10. Bootstrap address for connection to the target Kafka cluster. The address takes the format <cluster_name>-kafka-bootstrap:<port_number>. The Kafka cluster doesn’t need to be managed by Streams for Apache Kafka or deployed to a Kubernetes cluster.
11. Kafka Connect configuration. Standard Apache Kafka configuration may be provided, restricted to those properties not managed directly by Streams for Apache Kafka.
12. TLS encryption for the target Kafka cluster is configured in the same way as for the source Kafka cluster.
13. MirrorMaker 2 connectors.
14. Cluster alias for the source cluster used by the MirrorMaker 2 connectors.
15. Cluster alias for the target cluster used by the MirrorMaker 2 connectors.
16. Topic replication from the source cluster defined as a comma-separated list or regular expression pattern. The source connector replicates the specified topics. The checkpoint connector tracks offsets for the specified topics. Here we request three topics by name.
17. Consumer group replication from the source cluster defined as a comma-separated list or regular expression pattern. The checkpoint connector replicates the specified consumer groups. Here we request three consumer groups by name.
18. Configuration for the MirrorSourceConnector that creates remote topics. The config overrides the default configuration options.
19. The maximum number of tasks that the connector may create. Tasks handle the data replication and run in parallel. If the infrastructure supports the processing overhead, increasing this value can improve throughput. Kafka Connect distributes the tasks between members of the cluster. If there are more tasks than workers, workers are assigned multiple tasks. For sink connectors, aim to have one task for each topic partition consumed. For source connectors, the number of tasks that can run in parallel may also depend on the external system. The connector creates fewer than the maximum number of tasks if it cannot achieve the parallelism.
20. Enables automatic restarts of failed connectors and tasks. By default, the number of restarts is indefinite, but you can set a maximum on the number of automatic restarts using the maxRestarts property.
21. Replication factor for mirrored topics created at the target cluster.
22. Replication factor for the MirrorSourceConnector offset-syncs internal topic that maps the offsets of the source and target clusters.
23. When ACL rules synchronization is enabled, ACLs are applied to synchronized topics. The default is true. This feature is not compatible with the User Operator. If you are using the User Operator, set this property to false.
24. Optional setting to change the frequency of checks for new topics. The default is for a check every 10 minutes.
25. Adds a policy that overrides the automatic renaming of remote topics. Instead of prepending the name with the name of the source cluster, the topic retains its original name. This optional setting is useful for active/passive backups and data migration. The property must be specified for all connectors. For bidirectional (active/active) replication, use the DefaultReplicationPolicy class to automatically rename remote topics and specify the replication.policy.separator property for all connectors to add a custom separator.
26. Configuration for the MirrorHeartbeatConnector that performs connectivity checks. The config overrides the default configuration options.
27. Replication factor for the heartbeat topic created at the target cluster.
28. Configuration for the MirrorCheckpointConnector that tracks offsets. The config overrides the default configuration options.
29. Replication factor for the checkpoints topic created at the target cluster.
30. Optional setting to change the frequency of checks for new consumer groups. The default is for a check every 10 minutes.
31. Optional setting to synchronize consumer group offsets, which is useful for recovery in an active/passive configuration. Synchronization is not enabled by default.
32. If the synchronization of consumer group offsets is enabled, you can adjust the frequency of the synchronization.
33. Adjusts the frequency of checks for offset tracking. If you change the frequency of offset synchronization, you might also need to adjust the frequency of these checks.
34. The Kafka Connect and MirrorMaker 2 version, which will always be the same.
35. Requests for reservation of supported resources, currently cpu and memory, and limits to specify the maximum resources that can be consumed.
36. Specified Kafka Connect loggers and log levels added directly (inline) or indirectly (external) through a ConfigMap. A custom Log4j configuration must be placed under the log4j.properties or log4j2.properties key in the ConfigMap. For the Kafka Connect log4j.rootLogger logger, you can set the log level to INFO, ERROR, WARN, TRACE, DEBUG, FATAL or OFF.
37. Healthchecks to know when to restart a container (liveness) and when a container can accept traffic (readiness).
38. JVM configuration options to optimize performance for the Virtual Machine (VM) running Kafka MirrorMaker.
39. ADVANCED OPTION: Container image configuration, which is recommended only in special situations.
40. SPECIALIZED OPTION: Rack awareness configuration for the deployment. This is a specialized option intended for a deployment within the same location, not across regions. Use this option if you want connectors to consume from the closest replica rather than the leader replica. In certain cases, consuming from the closest replica can improve network utilization or reduce costs. The topologyKey must match a node label containing the rack ID. The example used in this configuration specifies a zone using the standard topology.kubernetes.io/zone label. To consume from the closest replica, enable the RackAwareReplicaSelector in the Kafka broker configuration.
41. Template customization. Here a pod is scheduled with anti-affinity, so the pod is not scheduled on nodes with the same hostname.
42. Environment variables are set for distributed tracing.
43. Distributed tracing is enabled by using OpenTelemetry.
9.9.1. Configuring active/active or active/passive modes
You can use MirrorMaker 2 in active/passive or active/active cluster configurations.
- active/active cluster configuration: An active/active configuration has two active clusters replicating data bidirectionally. Applications can use either cluster. Each cluster can provide the same data. In this way, you can make the same data available in different geographical locations. As consumer groups are active in both clusters, consumer offsets for replicated topics are not synchronized back to the source cluster.
- active/passive cluster configuration: An active/passive configuration has an active cluster replicating data to a passive cluster. The passive cluster remains on standby. You might use the passive cluster for data recovery in the event of system failure.
The expectation is that producers and consumers connect to active clusters only. A MirrorMaker 2 cluster is required at each target destination.
9.9.1.1. Bidirectional replication (active/active)
The MirrorMaker 2 architecture supports bidirectional replication in an active/active cluster configuration.
Each cluster replicates the data of the other cluster using the concept of source and remote topics. As the same topics are stored in each cluster, remote topics are automatically renamed by MirrorMaker 2 to represent the source cluster. The name of the originating cluster is prepended to the name of the topic.
Figure 9.1. Topic renaming
By flagging the originating cluster, topics are not replicated back to that cluster.
The concept of replication through remote topics is useful when configuring an architecture that requires data aggregation. Consumers can subscribe to source and remote topics within the same cluster, without the need for a separate aggregation cluster.
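A bidirectional setup can be sketched as two KafkaMirrorMaker2 resources, one for each target cluster, each using the other cluster as its source. This is an illustrative outline only: the resource names, the `cluster-a` and `cluster-b` aliases, and the bootstrap addresses are hypothetical, and authentication and TLS settings are omitted. Because no replication policy is set, the default renaming behavior applies, so a topic replicated from `cluster-a` is created on `cluster-b` with the source alias prepended to its name.

```yaml
# Instance for target cluster B: replicates A -> B (hypothetical names)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-a-to-b
spec:
  version: 3.8.0
  connectCluster: "cluster-b"   # must be the target cluster alias
  clusters:
    - alias: "cluster-a"
      bootstrapServers: cluster-a-kafka-bootstrap:9092
    - alias: "cluster-b"
      bootstrapServers: cluster-b-kafka-bootstrap:9092
  mirrors:
    - sourceCluster: "cluster-a"
      targetCluster: "cluster-b"
      sourceConnector: {}
---
# Instance for target cluster A: replicates B -> A (hypothetical names)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: mm2-b-to-a
spec:
  version: 3.8.0
  connectCluster: "cluster-a"   # must be the target cluster alias
  clusters:
    - alias: "cluster-a"
      bootstrapServers: cluster-a-kafka-bootstrap:9092
    - alias: "cluster-b"
      bootstrapServers: cluster-b-kafka-bootstrap:9092
  mirrors:
    - sourceCluster: "cluster-b"
      targetCluster: "cluster-a"
      sourceConnector: {}
```

Each instance has a different connectCluster value, so the two instances do not share Kafka Connect internal topics.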
9.9.1.2. Unidirectional replication (active/passive)
The MirrorMaker 2 architecture supports unidirectional replication in an active/passive cluster configuration.
You can use an active/passive cluster configuration to make backups or migrate data to another cluster. In this situation, you might not want automatic renaming of remote topics.
You can override automatic renaming by adding IdentityReplicationPolicy to the source connector configuration. With this configuration applied, topics retain their original names.
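The following fragment is a minimal sketch of that override for an active/passive setup. It reuses the cluster aliases from the examples in this section and sets the policy on the checkpoint connector as well, on the assumption that one is deployed, because the policy has to match across connectors.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          # Remote topics keep their original names
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      checkpointConnector:
        config:
          # Same policy, so offsets are mapped to the correctly named topics
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
```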
9.9.2. Configuring MirrorMaker 2 for multiple instances
By default, Streams for Apache Kafka configures the group ID and names of the internal topics used by the Kafka Connect framework that MirrorMaker 2 runs on. When you run multiple instances of MirrorMaker 2 that share the same connectCluster value, you must change these default settings using the following config properties:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  connectCluster: "my-cluster-target"
  clusters:
    - alias: "my-cluster-target"
      config:
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        # ...
    # ...
Values for the three topics must be the same for all instances with the same group.id.
The connectCluster setting specifies the alias of the target Kafka cluster used by Kafka Connect for its internal topics. As a result, modifications to the connectCluster, group ID, and internal topic naming configuration are specific to the target Kafka cluster. You don’t need to make changes if two MirrorMaker 2 instances are using the same source Kafka cluster, or if they run in an active/active mode where each MirrorMaker 2 instance has a different connectCluster setting and target cluster.
However, if multiple MirrorMaker 2 instances share the same connectCluster, each instance connecting to the same target Kafka cluster is deployed with the same values. In practice, this means all instances form a cluster and use the same internal topics.
Multiple instances attempting to use the same internal topics will cause unexpected errors, so you must change the values of these properties for each instance.
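For illustration, a second instance that targets the same Kafka cluster might be given its own group ID and internal topic names, as in the following sketch. The resource name and the `-b` suffixed values are hypothetical; any values work as long as they differ from those of the first instance.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2-b   # hypothetical second instance
spec:
  connectCluster: "my-cluster-target"
  clusters:
    - alias: "my-cluster-target"
      config:
        # Distinct from the first instance, so the two do not form one Connect cluster
        group.id: my-connect-cluster-b
        offset.storage.topic: my-connect-cluster-b-offsets
        config.storage.topic: my-connect-cluster-b-configs
        status.storage.topic: my-connect-cluster-b-status
        # ...
    # ...
```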
9.9.3. Configuring MirrorMaker 2 connectors
Use MirrorMaker 2 connector configuration for the internal connectors that orchestrate the synchronization of data between Kafka clusters.
MirrorMaker 2 consists of the following connectors:
- MirrorSourceConnector: The source connector replicates topics from a source cluster to a target cluster. It also replicates ACLs and is necessary for the MirrorCheckpointConnector to run.
- MirrorCheckpointConnector: The checkpoint connector periodically tracks offsets. If enabled, it also synchronizes consumer group offsets between the source and target cluster.
- MirrorHeartbeatConnector: The heartbeat connector periodically checks connectivity between the source and target cluster.
The following table describes connector properties and the connectors you configure to use them.
| Property | sourceConnector | checkpointConnector | heartbeatConnector |
|---|---|---|---|
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ |
MirrorMaker 2 tracks offsets for consumer groups using internal topics.
- offset-syncs topic: The offset-syncs topic maps the source and target offsets for replicated topic partitions from record metadata.
- checkpoints topic: The checkpoints topic maps the last committed offset in the source and target cluster for replicated topic partitions in each consumer group.
As they are used internally by MirrorMaker 2, you do not interact directly with these topics.
MirrorCheckpointConnector emits checkpoints for offset tracking. Offsets for the checkpoints topic are tracked at predetermined intervals through configuration. Both topics enable replication to be fully restored from the correct offset position on failover.
The location of the offset-syncs topic is the source cluster by default. You can use the offset-syncs.topic.location connector configuration to change this to the target cluster. You need read/write access to the cluster that contains the topic. Using the target cluster as the location of the offset-syncs topic allows you to use MirrorMaker 2 even if you have only read access to the source cluster.
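A minimal sketch of moving the offset-syncs topic to the target cluster follows. It sets the property on both the source and checkpoint connectors, on the assumption that both are deployed, so that they agree on where the topic lives.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          # Write the offset-syncs topic to the target cluster instead of the source
          offset-syncs.topic.location: target
      checkpointConnector:
        config:
          # Read the offset-syncs topic from the same location
          offset-syncs.topic.location: target
```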
9.9.3.2. Synchronizing consumer group offsets
The __consumer_offsets topic stores information on committed offsets for each consumer group. Offset synchronization periodically transfers the consumer offsets for the consumer groups of a source cluster into the consumer offsets topic of a target cluster.
Offset synchronization is particularly useful in an active/passive configuration. If the active cluster goes down, consumer applications can switch to the passive (standby) cluster and pick up from the last transferred offset position.
To use topic offset synchronization, enable the synchronization by adding sync.group.offsets.enabled to the checkpoint connector configuration, and setting the property to true. Synchronization is disabled by default.
When using the IdentityReplicationPolicy in the source connector, it also has to be configured in the checkpoint connector configuration. This ensures that the mirrored consumer offsets will be applied for the correct topics.
Consumer offsets are only synchronized for consumer groups that are not active in the target cluster. If the consumer groups are in the target cluster, the synchronization cannot be performed and an UNKNOWN_MEMBER_ID error is returned.
If enabled, the synchronization of offsets from the source cluster is made periodically. You can change the frequency by adding sync.group.offsets.interval.seconds and emit.checkpoints.interval.seconds to the checkpoint connector configuration. The properties specify the frequency in seconds that the consumer group offsets are synchronized, and the frequency of checkpoints emitted for offset tracking. The default for both properties is 60 seconds. You can also change the frequency of checks for new consumer groups using the refresh.groups.interval.seconds property, which is performed every 10 minutes by default.
Because the synchronization is time-based, any switchover by consumers to a passive cluster will likely result in some duplication of messages.
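The settings described in this section can be combined in the checkpoint connector configuration, as in the following sketch. The interval values are illustrative only and are not recommendations.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      groupsPattern: "group1|group2|group3"
      sourceConnector: {}
      checkpointConnector:
        config:
          # Disabled by default, so it must be switched on explicitly
          sync.group.offsets.enabled: true
          # How often consumer group offsets are synchronized (default 60)
          sync.group.offsets.interval.seconds: 30
          # How often checkpoints are emitted for offset tracking (default 60)
          emit.checkpoints.interval.seconds: 30
          # How often to check for new consumer groups (default 600)
          refresh.groups.interval.seconds: 300
```

Shorter intervals narrow the window of duplicated messages after a switchover, at the cost of more frequent work for the connector.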
If you have an application written in Java, you can use the RemoteClusterUtils.java utility to synchronize offsets through the application. The utility fetches remote offsets for a consumer group from the checkpoints topic.
9.9.3.3. Deciding when to use the heartbeat connector
The heartbeat connector emits heartbeats to check connectivity between source and target Kafka clusters. An internal heartbeat topic is replicated from the source cluster, which means that the heartbeat connector must be connected to the source cluster. The heartbeat topic is located on the target cluster, which allows it to do the following:
- Identify all source clusters it is mirroring data from
- Verify the liveness and latency of the mirroring process
This helps to make sure that the process is not stuck or has stopped for any reason. While the heartbeat connector can be a valuable tool for monitoring the mirroring processes between Kafka clusters, it’s not always necessary to use it. For example, if your deployment has low network latency or a small number of topics, you might prefer to monitor the mirroring process using log messages or other monitoring tools. If you decide not to use the heartbeat connector, simply omit it from your MirrorMaker 2 configuration.
To ensure that MirrorMaker 2 connectors work properly, make sure to align certain configuration settings across connectors. Specifically, ensure that the following properties have the same value across all applicable connectors:
- replication.policy.class
- replication.policy.separator
- offset-syncs.topic.location
- topic.filter.class
For example, the value for replication.policy.class must be the same for the source, checkpoint, and heartbeat connectors. Mismatched or missing settings cause issues with data replication or offset syncing, so it’s essential to keep all relevant connectors configured with the same settings.
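As a sketch of what aligned settings look like, the following fragment repeats the same replication policy class and separator in all three connectors. The underscore separator is a hypothetical choice made only for illustration.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          replication.policy.class: "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
          replication.policy.separator: "_"   # hypothetical custom separator
      checkpointConnector:
        config:
          # Must match the source connector
          replication.policy.class: "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
          replication.policy.separator: "_"
      heartbeatConnector:
        config:
          # Must match the source connector
          replication.policy.class: "org.apache.kafka.connect.mirror.DefaultReplicationPolicy"
          replication.policy.separator: "_"
```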
MirrorMaker 2 connectors use internal producers and consumers. If needed, you can configure these producers and consumers to override the default settings.
For example, you can increase the batch.size for the source producer that sends topics to the target Kafka cluster to better accommodate large volumes of messages.
Producer and consumer configuration options depend on the MirrorMaker 2 implementation, and may be subject to change.
The following tables describe the producers and consumers for each of the connectors and where you can add configuration.
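Source connector producers and consumers
| Type | Description | Configuration |
|---|---|---|
| Producer | Sends topic messages to the target Kafka cluster. Consider tuning the configuration of this producer when it is handling large volumes of data. | mirrors.sourceConnector.config: producer.override.* |
| Producer | Writes to the offset-syncs topic, which maps the source and target offsets for replicated topic partitions. | mirrors.sourceConnector.config: producer.* |
| Consumer | Retrieves topic messages from the source Kafka cluster. | mirrors.sourceConnector.config: consumer.* |

Checkpoint connector producers and consumers
| Type | Description | Configuration |
|---|---|---|
| Producer | Emits consumer offset checkpoints. | mirrors.checkpointConnector.config: producer.override.* |
| Consumer | Loads the offset-syncs topic. | mirrors.checkpointConnector.config: consumer.* |

You can set offset-syncs.topic.location to target to use the target Kafka cluster as the location of the offset-syncs topic.

Heartbeat connector producer
| Type | Description | Configuration |
|---|---|---|
| Producer | Emits heartbeats. | mirrors.heartbeatConnector.config: producer.override.* |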
The following example shows how you configure the producers and consumers.
Example configuration for connector producers and consumers
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.8.0
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        tasksMax: 5
        config:
          producer.override.batch.size: 327680
          producer.override.linger.ms: 100
          producer.request.timeout.ms: 30000
          consumer.fetch.max.bytes: 52428800
          # ...
      checkpointConnector:
        config:
          producer.override.request.timeout.ms: 30000
          consumer.max.poll.interval.ms: 300000
          # ...
      heartbeatConnector:
        config:
          producer.override.request.timeout.ms: 30000
          # ...
Connectors create the tasks that are responsible for moving data in and out of Kafka. Each connector comprises one or more tasks that are distributed across a group of worker pods that run the tasks. Increasing the number of tasks can help with performance issues when replicating a large number of partitions or synchronizing the offsets of a large number of consumer groups.
Tasks run in parallel. Workers are assigned one or more tasks. A single task is handled by one worker pod, so you don’t need more worker pods than tasks. If there are more tasks than workers, workers handle multiple tasks.
You can specify the maximum number of connector tasks in your MirrorMaker configuration using the tasksMax property. Without specifying a maximum number of tasks, the default setting is a single task.
The heartbeat connector always uses a single task.
The number of tasks that are started for the source and checkpoint connectors is the lower value between the maximum number of possible tasks and the value for tasksMax. For the source connector, the maximum number of tasks possible is one for each partition being replicated from the source cluster. For the checkpoint connector, the maximum number of tasks possible is one for each consumer group being replicated from the source cluster. When setting a maximum number of tasks, consider the number of partitions and the hardware resources that support the process.
If the infrastructure supports the processing overhead, increasing the number of tasks can improve throughput and latency. For example, adding more tasks reduces the time taken to poll the source cluster when there is a high number of partitions or consumer groups.
Increasing the number of tasks for the source connector is useful when you have a large number of partitions.
Increasing the number of tasks for the source connector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        tasksMax: 10
  # ...
Increasing the number of tasks for the checkpoint connector is useful when you have a large number of consumer groups.
Increasing the number of tasks for the checkpoint connector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      checkpointConnector:
        tasksMax: 10
  # ...
By default, MirrorMaker 2 checks for new consumer groups every 10 minutes. You can adjust the refresh.groups.interval.seconds configuration to change the frequency. Take care when lowering the interval, because more frequent checks can have a negative impact on performance.
9.9.5.1. Checking connector task operations
If you are using Prometheus and Grafana to monitor your deployment, you can check MirrorMaker 2 performance. The example MirrorMaker 2 Grafana dashboard provided with Streams for Apache Kafka shows the following metrics related to tasks and latency.
- The number of tasks
- Replication latency
- Offset synchronization latency
9.9.6. Synchronizing ACL rules for remote topics
When using MirrorMaker 2 with Streams for Apache Kafka, it is possible to synchronize ACL rules for remote topics. However, this feature is only available if you are not using the User Operator.
If you are using type: simple authorization without the User Operator, the ACL rules that manage access to brokers also apply to remote topics. This means that users who have read access to a source topic can also read its remote equivalent.
OAuth 2.0 authorization does not support access to remote topics in this way.
9.9.7. Securing a Kafka MirrorMaker 2 deployment
This procedure describes in outline the configuration required to secure a MirrorMaker 2 deployment.
You need separate configuration for the source Kafka cluster and the target Kafka cluster. You also need separate user configuration to provide the credentials required for MirrorMaker to connect to the source and target Kafka clusters.
For the Kafka clusters, you specify internal listeners for secure connections within an OpenShift cluster and external listeners for connections outside the OpenShift cluster.
You can configure authentication and authorization mechanisms. The security options implemented for the source and target Kafka clusters must be compatible with the security options implemented for MirrorMaker 2.
After you have created the cluster and user authentication credentials, you specify them in your MirrorMaker configuration for secure connections.
In this procedure, the certificates generated by the Cluster Operator are used, but you can replace them by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external CA (certificate authority).
Before you start
Before starting this procedure, take a look at the example configuration files provided by Streams for Apache Kafka. They include examples for securing a deployment of MirrorMaker 2 using mTLS or SCRAM-SHA-512 authentication. The examples specify internal listeners for connecting within an OpenShift cluster.
The examples also provide the configuration for full authorization, including the ACLs that allow user operations on the source and target Kafka clusters.
When configuring user access to source and target Kafka clusters, ACLs must grant access rights to internal MirrorMaker 2 connectors and read/write access to the cluster group and internal topics used by the underlying Kafka Connect framework in the target cluster. If you’ve renamed the cluster group or internal topics, such as when configuring MirrorMaker 2 for multiple instances, use those names in the ACLs configuration.
Simple authorization uses ACL rules managed by the Kafka AclAuthorizer and StandardAuthorizer plugins to ensure appropriate access levels. For more information on configuring a KafkaUser resource to use simple authorization, see the AclRule schema reference.
Prerequisites
- Streams for Apache Kafka is running
- Separate namespaces for source and target clusters
The procedure assumes that the source and target Kafka clusters are installed to separate namespaces. Separate namespaces are required if you want to use the Topic Operator, because the Topic Operator only watches a single cluster in a specified namespace.
Because the clusters are in separate namespaces, you need to copy the cluster secrets so that they can be accessed outside the namespace in which they were created. You then reference the secrets in the MirrorMaker 2 configuration.
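One way to picture a copied secret is as the same Secret resource recreated in the namespace where MirrorMaker 2 runs. The following is a sketch only. It assumes that the cluster CA secret stores the certificate under a ca.crt key, and the namespace and data value are placeholders to fill in from the original secret.

```yaml
# Sketch of the source cluster CA secret recreated in another namespace.
# The namespace and the data value are placeholders.
apiVersion: v1
kind: Secret
metadata:
  name: my-source-cluster-cluster-ca-cert
  namespace: <namespace_of_target_cluster>
type: Opaque
data:
  ca.crt: <base64_encoded_ca_certificate_from_source_namespace>
```

Any other secret that the KafkaMirrorMaker2 resource references, such as the user secret for the source cluster, would presumably need to be available in the same namespace as well.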
Procedure
Configure two Kafka resources, one to secure the source Kafka cluster and one to secure the target Kafka cluster.
You can add listener configuration for authentication and enable authorization.
In this example, an internal listener is configured for a Kafka cluster with TLS encryption and mTLS authentication. Kafka simple authorization is enabled.
Example source Kafka cluster configuration with TLS encryption and mTLS authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-source-cluster
spec:
  kafka:
    version: 3.8.0
    replicas: 1
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.8"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Example target Kafka cluster configuration with TLS encryption and mTLS authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-target-cluster
spec:
  kafka:
    version: 3.8.0
    replicas: 1
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.8"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Create or update the Kafka resources in separate namespaces.
oc apply -f <kafka_configuration_file> -n <namespace>
The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
The certificates are created in the secret <cluster_name>-cluster-ca-cert.
Configure two KafkaUser resources, one for a user of the source Kafka cluster and one for a user of the target Kafka cluster.
- Configure the same authentication and authorization types as the corresponding source and target Kafka cluster. For example, if you used tls authentication and the simple authorization type in the Kafka configuration for the source Kafka cluster, use the same in the KafkaUser configuration.
- Configure the ACLs needed by MirrorMaker 2 to allow operations on the source and target Kafka clusters.
Example source user configuration for mTLS authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-source-user
  labels:
    strimzi.io/cluster: my-source-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # MirrorSourceConnector
      - resource: # Not needed if offset-syncs.topic.location=target
          type: topic
          name: mm2-offset-syncs.my-target-cluster.internal
        operations:
          - Create
          - DescribeConfigs
          - Read
          - Write
      - resource: # Needed for every topic which is mirrored
          type: topic
          name: "*"
        operations:
          - DescribeConfigs
          - Read
      # MirrorCheckpointConnector
      - resource:
          type: cluster
        operations:
          - Describe
      - resource: # Needed for every group for which offsets are synced
          type: group
          name: "*"
        operations:
          - Describe
      - resource: # Not needed if offset-syncs.topic.location=target
          type: topic
          name: mm2-offset-syncs.my-target-cluster.internal
        operations:
          - Read
Example target user configuration for mTLS authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-target-user
  labels:
    strimzi.io/cluster: my-target-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # cluster group
      - resource:
          type: group
          name: mirrormaker2-cluster
        operations:
          - Read
      # access to config.storage.topic
      - resource:
          type: topic
          name: mirrormaker2-cluster-configs
        operations:
          - Create
          - Describe
          - DescribeConfigs
          - Read
          - Write
      # access to status.storage.topic
      - resource:
          type: topic
          name: mirrormaker2-cluster-status
        operations:
          - Create
          - Describe
          - DescribeConfigs
          - Read
          - Write
      # access to offset.storage.topic
      - resource:
          type: topic
          name: mirrormaker2-cluster-offsets
        operations:
          - Create
          - Describe
          - DescribeConfigs
          - Read
          - Write
      # MirrorSourceConnector
      - resource: # Needed for every topic which is mirrored
          type: topic
          name: "*"
        operations:
          - Create
          - Alter
          - AlterConfigs
          - Write
      # MirrorCheckpointConnector
      - resource:
          type: cluster
        operations:
          - Describe
      - resource:
          type: topic
          name: my-source-cluster.checkpoints.internal
        operations:
          - Create
          - Describe
          - Read
          - Write
      - resource: # Needed for every group for which the offset is synced
          type: group
          name: "*"
        operations:
          - Read
          - Describe
      # MirrorHeartbeatConnector
      - resource:
          type: topic
          name: heartbeats
        operations:
          - Create
          - Describe
          - Write
Note: You can use a certificate issued outside the User Operator by setting type to tls-external. For more information, see the KafkaUserSpec schema reference.
Create or update a KafkaUser resource in each of the namespaces you created for the source and target Kafka clusters.
oc apply -f <kafka_user_configuration_file> -n <namespace>
The User Operator creates the users representing the client (MirrorMaker), and the security credentials used for client authentication, based on the chosen authentication type.
The User Operator creates a new secret with the same name as the KafkaUser resource. The secret contains a private and public key for mTLS authentication. The public key is contained in a user certificate, which is signed by the clients CA.
Configure a KafkaMirrorMaker2 resource with the authentication details to connect to the source and target Kafka clusters.
Example MirrorMaker 2 configuration with TLS encryption and mTLS authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  version: 3.8.0
  replicas: 1
  connectCluster: "my-target-cluster"
  clusters:
    - alias: "my-source-cluster"
      bootstrapServers: my-source-cluster-kafka-bootstrap:9093
      tls: # (1)
        trustedCertificates:
          - secretName: my-source-cluster-cluster-ca-cert
            pattern: "*.crt"
      authentication: # (2)
        type: tls
        certificateAndKey:
          secretName: my-source-user
          certificate: user.crt
          key: user.key
    - alias: "my-target-cluster"
      bootstrapServers: my-target-cluster-kafka-bootstrap:9093
      tls: # (3)
        trustedCertificates:
          - secretName: my-target-cluster-cluster-ca-cert
            pattern: "*.crt"
      authentication: # (4)
        type: tls
        certificateAndKey:
          secretName: my-target-user
          certificate: user.crt
          key: user.key
      config:
        # -1 means it will use the default replication factor configured in the broker
        config.storage.replication.factor: -1
        offset.storage.replication.factor: -1
        status.storage.replication.factor: -1
  mirrors:
    - sourceCluster: "my-source-cluster"
      targetCluster: "my-target-cluster"
      sourceConnector:
        config:
          replication.factor: 1
          offset-syncs.topic.replication.factor: 1
          sync.topic.acls.enabled: "false"
      heartbeatConnector:
        config:
          heartbeats.topic.replication.factor: 1
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 1
          sync.group.offsets.enabled: "true"
      topicsPattern: "topic1|topic2|topic3"
      groupsPattern: "group1|group2|group3"
(1) The TLS certificates for the source Kafka cluster. If they are in a separate namespace, copy the cluster secrets from the namespace of the Kafka cluster.
(2) The user authentication for accessing the source Kafka cluster using the TLS mechanism.
(3) The TLS certificates for the target Kafka cluster.
(4) The user authentication for accessing the target Kafka cluster.
Create or update the KafkaMirrorMaker2 resource in the same namespace as the target Kafka cluster.
oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>
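The note in the procedure about certificates issued outside the User Operator can be sketched as follows. This is an illustrative variant of the source user, not part of the example files. It assumes that you issue the client certificate yourself and create the secret that the KafkaMirrorMaker2 resource references, because the User Operator would not generate one for this authentication type.

```yaml
# Sketch of a KafkaUser whose client certificate is issued outside the User Operator.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-source-user
  labels:
    strimzi.io/cluster: my-source-cluster
spec:
  authentication:
    type: tls-external   # certificate is managed outside the User Operator
  authorization:
    type: simple
    acls:
      # ACLs stay the same as in the mTLS example; one rule shown for brevity
      - resource:
          type: topic
          name: "*"
        operations:
          - DescribeConfigs
          - Read
```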
9.9.8. Manually stopping or pausing MirrorMaker 2 connectors
If you are using KafkaMirrorMaker2 resources to configure internal MirrorMaker 2 connectors, use the state configuration to either stop or pause a connector. A paused connector and its tasks remain instantiated, whereas a stopped connector retains only its configuration, with no active processes. Stopping a connector may therefore be more suitable than pausing when the connector is to remain inactive for a longer period. A paused connector is quicker to resume, but a stopped connector frees up memory and resources.
The state configuration replaces the (deprecated) pause configuration in the KafkaMirrorMaker2ConnectorSpec schema, which allows pauses on connectors. If you were previously using the pause configuration to pause connectors, we encourage you to transition to using the state configuration only to avoid conflicts.
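As a minimal sketch of that transition, assuming a source connector that was previously paused with the deprecated pause property, the change might look like this. The cluster aliases are illustrative.

```yaml
# Fragment of spec.mirrors in a KafkaMirrorMaker2 resource (sketch only).
mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      # pause: true     # deprecated form, removed to avoid conflicts
      state: paused     # equivalent setting using the state configuration
```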
Prerequisites
- The Cluster Operator is running.
Procedure
Find the name of the KafkaMirrorMaker2 custom resource that controls the MirrorMaker 2 connector you want to pause or stop:
oc get KafkaMirrorMaker2
Edit the KafkaMirrorMaker2 resource to stop or pause the connector.
Example configuration for stopping a MirrorMaker 2 connector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.8.0
  replicas: 3
  connectCluster: "my-cluster-target"
  clusters:
    # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        tasksMax: 10
        autoRestart:
          enabled: true
        state: stopped
  # ...
Change the state configuration to stopped or paused. The default state for the connector when this property is not set is running.
Apply the changes to the KafkaMirrorMaker2 configuration.
You can resume the connector by changing state to running or removing the configuration.
Alternatively, you can expose the Kafka Connect API and use the stop and pause endpoints to stop a connector from running. For example, PUT /connectors/<connector_name>/stop. You can then use the resume endpoint to restart it.
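Returning to the state configuration in the KafkaMirrorMaker2 resource, the following sketch assumes that each connector in a mirror accepts its own state value, so that connectors can be controlled independently. The combination of states is chosen only for illustration.

```yaml
# Fragment of spec.mirrors in a KafkaMirrorMaker2 resource (sketch only).
mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      state: running    # same as leaving state unset
    checkpointConnector:
      state: paused     # connector and tasks stay instantiated
    heartbeatConnector:
      state: stopped    # only the configuration is retained
```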
9.9.9. Manually restarting MirrorMaker 2 connectors
Use the strimzi.io/restart-connector annotation to manually trigger a restart of a MirrorMaker 2 connector.
Prerequisites
- The Cluster Operator is running.
Procedure
Find the name of the KafkaMirrorMaker2 custom resource that controls the Kafka MirrorMaker 2 connector you want to restart:
oc get KafkaMirrorMaker2
Find the name of the Kafka MirrorMaker 2 connector to be restarted from the KafkaMirrorMaker2 custom resource:
oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
Use the name of the connector to restart the connector by annotating the KafkaMirrorMaker2 resource in OpenShift:
oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector=<mirrormaker_connector_name>"
In this example, connector my-connector in the my-mirror-maker-2 cluster is restarted:
oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=my-connector"
Wait for the next reconciliation to occur (every two minutes by default).
The MirrorMaker 2 connector is restarted, as long as the annotation was detected by the reconciliation process. When MirrorMaker 2 accepts the request, the annotation is removed from the KafkaMirrorMaker2 custom resource.
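For reference, the annotate command above results in metadata along the following lines on the resource until the restart request is accepted. This is a sketch of the resource's metadata only, with the rest of the configuration elided.

```yaml
# Sketch: the annotation as it would appear on the resource after oc annotate.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
  annotations:
    strimzi.io/restart-connector: "my-connector"
spec:
  # ...
```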
9.9.10. Manually restarting MirrorMaker 2 connector tasks
Use the strimzi.io/restart-connector-task annotation to manually trigger a restart of a MirrorMaker 2 connector task.
Prerequisites
- The Cluster Operator is running.
Procedure
Find the name of the KafkaMirrorMaker2 custom resource that controls the MirrorMaker 2 connector task you want to restart:
oc get KafkaMirrorMaker2
Find the name of the connector and the ID of the task to be restarted from the KafkaMirrorMaker2 custom resource:
oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
Task IDs are non-negative integers, starting from 0.
Use the name and ID to restart the connector task by annotating the KafkaMirrorMaker2 resource in OpenShift:
oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector-task=<mirrormaker_connector_name>:<task_id>"
In this example, task 0 for connector my-connector in the my-mirror-maker-2 cluster is restarted:
oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector-task=my-connector:0"
Wait for the next reconciliation to occur (every two minutes by default).
The MirrorMaker 2 connector task is restarted, as long as the annotation was detected by the reconciliation process. When MirrorMaker 2 accepts the request, the annotation is removed from the KafkaMirrorMaker2 custom resource.
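When looking for the connector name and task ID in the output of oc describe, the relevant part is likely to be the connector status reported on the resource. The following excerpt is a hypothetical illustration of its general shape, assuming the status mirrors what the Kafka Connect REST API reports. The states and worker addresses are invented for the example.

```yaml
# Hypothetical excerpt of the status of a KafkaMirrorMaker2 resource (sketch only).
status:
  connectors:
    - name: my-connector              # connector name used in the annotation
      type: source
      connector:
        state: RUNNING
        worker_id: <worker_address>
      tasks:
        - id: 0                       # task ID used in the annotation
          state: FAILED
          worker_id: <worker_address>
```

For a status like this, the annotation value for restarting the failed task would be my-connector:0.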