
Chapter 10. Configuring a deployment


Configure and manage a Streams for Apache Kafka deployment to your precise needs using Streams for Apache Kafka custom resources. Streams for Apache Kafka provides example custom resources with each release, allowing you to configure and create instances of supported Kafka components.

Use custom resources to configure and create instances of the following components:

  • Kafka clusters
  • Kafka Connect clusters
  • Kafka MirrorMaker
  • Kafka Bridge
  • Cruise Control

You can use configuration to manage your instances or modify your deployment to introduce additional features. New features are sometimes introduced through feature gates, which are controlled through operator configuration.
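The following sketch shows how feature gates are typically toggled through the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator Deployment. The gate names here are placeholders only; check the release notes for the feature gates available in your version.

Example feature gate configuration in the Cluster Operator Deployment (illustrative)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
spec:
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator
          env:
            # Enable one gate and disable another; the gate names are placeholders
            - name: STRIMZI_FEATURE_GATES
              value: "+ExampleFeatureGate,-AnotherFeatureGate"
          # ...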

The Streams for Apache Kafka Custom Resource API Reference describes the properties you can use in your configuration.

Centralizing configuration

For key configuration areas, such as metrics, logging, and external Kafka Connect connector settings, you can centralize management rather than repeating configuration in each resource. Metrics and logging configuration can be referenced from ConfigMaps, and we recommend configuration providers for securely supplying Kafka Connect connector credentials.
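As a minimal sketch, the following KafkaConnect and KafkaConnector excerpts enable the Kubernetes Secret configuration provider so that a connector can reference credentials stored in a Secret instead of embedding them. The provider alias, Secret name, namespace, and connector property used here are illustrative.

Example configuration provider setup (illustrative)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    # Register the Kubernetes Secret configuration provider under the alias "secrets"
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-connector
  labels:
    strimzi.io/cluster: my-connect
spec:
  # ...
  config:
    # Reference the password key from a Secret at runtime instead of hard-coding it
    database.password: ${secrets:my-project/my-connector-credentials:password}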

TLS certificate management

When deploying Kafka, the Cluster Operator automatically sets up and renews TLS certificates to enable encryption and authentication within your cluster. If required, you can manually renew the cluster and clients CA certificates before their renewal period starts. You can also replace the keys used by the cluster and clients CA certificates.

For more information, see Renewing CA certificates manually and Replacing private keys.
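As a reference sketch, manual renewal and key replacement are requested by annotating the CA Secrets before the next reconciliation; the cluster name my-cluster is illustrative.

# Request manual renewal of the cluster CA certificate
oc annotate secret my-cluster-cluster-ca-cert strimzi.io/force-renew="true"

# Request replacement of the private key used by the cluster CA
oc annotate secret my-cluster-cluster-ca strimzi.io/force-replace="true"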

Note

Labels applied to a custom resource are also applied to the OpenShift resources making up its cluster. This provides a convenient mechanism for resources to be labeled as required.

10.1. Using example configuration files

Further enhance your deployment by incorporating additional supported configuration. Example configuration files are included in the Streams for Apache Kafka deployment files.

The example files include only the essential properties and values for custom resources by default. You can download and apply the examples using the oc command-line tool. The examples can serve as a starting point when building your own Kafka component configuration for deployment.
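For example, after downloading the release artifacts, you might adjust one of the Kafka examples for your environment and apply it with oc; the file path and namespace shown here are illustrative.

oc apply -f examples/kafka/kafka-persistent.yaml -n my-kafka-namespace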

Note

If you installed Streams for Apache Kafka using the Operator, you can still download the example files and use them to apply configuration.

The release artifacts include an examples directory that contains the configuration examples.

Example configuration files provided with Streams for Apache Kafka

examples
├── user (1)
├── topic (2)
├── security (3)
│   ├── tls-auth
│   ├── scram-sha-512-auth
│   └── keycloak-authorization
├── mirror-maker (4)
├── metrics (5)
├── kafka (6)
├── cruise-control (7)
├── connect (8)
└── bridge (9)

(1) KafkaUser custom resource configuration, which is managed by the User Operator.
(2) KafkaTopic custom resource configuration, which is managed by the Topic Operator.
(3) Authentication and authorization configuration for Kafka components. Includes example configuration for TLS and SCRAM-SHA-512 authentication. The Red Hat build of Keycloak example includes Kafka custom resource configuration and a Red Hat build of Keycloak realm specification. You can use the example to try Red Hat build of Keycloak authorization services. There is also an example with OAuth authentication and Keycloak authorization metrics enabled.
(4) KafkaMirrorMaker2 custom resource configurations for a deployment of MirrorMaker 2. Includes example configuration for replication policy and synchronization frequency.
(5) Metrics configuration, including Prometheus installation and Grafana dashboard files.
(6) Kafka and KafkaNodePool custom resource configurations for a deployment of Kafka clusters that use KRaft mode. Includes example configuration for an ephemeral or persistent single or multi-node deployment.
(7) Kafka and KafkaRebalance configurations for deploying and using Cruise Control to manage clusters. Kafka configuration examples enable auto-rebalancing on scaling events and set default optimization goals. KafkaRebalance configuration examples set proposal-specific optimization goals and generate optimization proposals in various supported modes.
(8) KafkaConnect and KafkaConnector custom resource configuration for a deployment of Kafka Connect. Includes example configurations for a single or multi-node deployment.
(9) KafkaBridge custom resource configuration for a deployment of Kafka Bridge.

10.2. Configuring Kafka

Configure your Kafka deployment by updating the spec properties of the Kafka custom resource.

You can also configure Streams for Apache Kafka components and features that support the deployment:

  • Topic Operator and User Operator for managing topics and clients
  • Cruise Control for cluster rebalancing
  • Kafka Exporter for lag metrics
  • Listeners for authenticated client access
  • Data storage
  • Rack awareness

Specify the metadata version for KRaft using .spec.kafka.metadataVersion. It must be compatible with .spec.kafka.version. If not set, the Cluster Operator applies the default for the Kafka version.

Kafka clusters use node pools. Use KafkaNodePool resources to configure distinct groups of nodes within a Kafka cluster.

For a deeper understanding of the Kafka cluster configuration options, refer to the Streams for Apache Kafka Custom Resource API Reference.

Note

The oldest supported metadata version is 3.3. Older versions may disable certain features.

Example Kafka custom resource configuration

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
# Deployment specifications
spec:
  kafka:
    # Listener configuration (required)
    listeners: # (1)
      - name: plain # (2)
        port: 9092 # (3)
        type: internal # (4)
        tls: false # (5)
        configuration:
          useServiceDnsDomain: true # (6)
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication: # (7)
          type: tls
      - name: external1 # (8)
        port: 9094
        type: route
        tls: true
        configuration:
          brokerCertChainAndKey: # (9)
            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    # Kafka version (recommended)
    version: 4.1.0 # (10)
    # KRaft metadata version (recommended)
    metadataVersion: 4.1 # (11)
    # Kafka configuration (recommended)
    config: # (12)
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    # Resources requests and limits (recommended)
    resources: # (13)
      requests:
        memory: 64Gi
        cpu: "8"
      limits:
        memory: 64Gi
        cpu: "12"
    # Logging configuration (optional)
    logging: # (14)
      type: inline
      loggers:
        # Kafka 4.0+ uses Log4j2
        rootLogger.level: INFO
    # Readiness probe (optional)
    readinessProbe: # (15)
      initialDelaySeconds: 15
      timeoutSeconds: 5
    # Liveness probe (optional)
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    # JVM options (optional)
    jvmOptions: # (16)
      -Xms: 8192m
      -Xmx: 8192m
    # Custom image (optional)
    image: my-org/my-image:latest # (17)
    # Authorization (optional)
    authorization: # (18)
      type: simple
    # Rack awareness (optional)
    rack: # (19)
      topologyKey: topology.kubernetes.io/zone
    # Metrics configuration (optional)
    metricsConfig: # (20)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef: # (21)
          name: my-config-map
          key: my-key
  # Entity Operator (recommended)
  entityOperator: # (22)
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalMs: 60000
      # Resources requests and limits (recommended)
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
      # Logging configuration (optional)
      logging: # (23)
        type: inline
        loggers:
          rootLogger.level: INFO
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalMs: 60000
      # Resources requests and limits (recommended)
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
      # Logging configuration (optional)
      logging: # (24)
        type: inline
        loggers:
          rootLogger.level: INFO
  # Kafka Exporter (optional)
  kafkaExporter: # (25)
    # ...
  # Cruise Control (optional)
  cruiseControl: # (26)
    # ...

(1) Listeners configure how clients connect to the Kafka cluster via bootstrap addresses. Listeners are configured as internal or external listeners for connection from inside or outside the OpenShift cluster.
(2) Name to identify the listener. Must be unique within the Kafka cluster.
(3) Port number used by the listener inside Kafka. The port number has to be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that connects Kafka clients.
(4) Listener type specified as internal or cluster-ip (to expose Kafka using per-broker ClusterIP services), or for external listeners, as route (OpenShift only), loadbalancer, nodeport or ingress (Kubernetes only).
(5) Enables or disables TLS encryption for each listener. For route and ingress type listeners, TLS encryption must always be enabled by setting it to true.
(6) Defines whether the fully-qualified DNS names including the cluster service suffix (usually .cluster.local) are assigned.
(7) Listener authentication mechanism specified as mTLS, SCRAM-SHA-512, or token-based OAuth 2.0.
(8) External listener configuration specifies how the Kafka cluster is exposed outside OpenShift, such as through a route, loadbalancer or nodeport.
(9) Optional configuration for a Kafka listener certificate managed by an external CA (certificate authority). The brokerCertChainAndKey specifies a Secret that contains a server certificate and a private key. You can configure Kafka listener certificates on any listener with enabled TLS encryption.
(10) Kafka version, which can be changed to a supported version by following the upgrade procedure.
(11) Kafka metadata version, which can be changed to a supported version by following the upgrade procedure.
(12) Broker configuration. Standard Apache Kafka configuration may be provided, restricted to those properties not managed directly by Streams for Apache Kafka.
(13) Requests for reservation of supported resources, currently cpu and memory, and limits to specify the maximum resources that can be consumed.
(14) Kafka loggers and log levels added directly (inline) or indirectly (external) through a ConfigMap. Custom Log4j configuration must be placed under the log4j2.properties key in the ConfigMap. You can set log levels to INFO, ERROR, WARN, TRACE, DEBUG, FATAL or OFF.
(15) Healthchecks to know when to restart a container (liveness) and when a container can accept traffic (readiness).
(16) JVM configuration options to optimize performance for the Virtual Machine (VM) running Kafka.
(17) ADVANCED OPTION: Container image configuration, which is recommended only in special situations.
(18) Authorization enables simple, OAuth 2.0, custom, or OPA (deprecated) authorization on the Kafka broker. Simple authorization uses the StandardAuthorizer Kafka plugin.
(19) Rack awareness configuration to spread replicas across different racks, data centers, or availability zones. The topologyKey must match a node label containing the rack ID. The example used in this configuration specifies a zone using the standard topology.kubernetes.io/zone label.
(20) Prometheus metrics enabled. In this example, metrics are configured for the Prometheus JMX Exporter (the default metrics exporter).
(21) Rules for exporting metrics in Prometheus format to a Grafana dashboard through the Prometheus JMX Exporter, which are enabled by referencing a ConfigMap containing configuration for the Prometheus JMX exporter. You can enable metrics without further configuration using a reference to a ConfigMap containing an empty file under metricsConfig.valueFrom.configMapKeyRef.key.
(22) Entity Operator configuration, which specifies the configuration for the Topic Operator and User Operator.
(23) Specified Topic Operator loggers and log levels. This example uses inline logging.
(24) Specified User Operator loggers and log levels.
(25) Kafka Exporter configuration. Kafka Exporter is an optional component for extracting metrics data from Kafka brokers, in particular consumer lag data. For Kafka Exporter to be able to work properly, consumer groups need to be in use.
(26) Optional configuration for Cruise Control, which is used to rebalance the Kafka cluster.
Warning

If you remove the min.insync.replicas property from .spec.kafka.config in the Kafka resource, the Cluster Operator forces Kafka to fall back to the default value (1), regardless of whether ELR (Eligible Leader Replicas) is enabled or disabled. To ensure durability of the cluster, explicitly define min.insync.replicas with a value higher than 1.

10.2.1. Setting throughput and storage limits on brokers

This procedure describes how to set throughput and storage limits on brokers in your Kafka cluster. Enable a quota plugin and configure limits using quotas properties in the Kafka resource.

There are two types of quota plugins available:

  • The strimzi type enables the Strimzi Quotas plugin.
  • The kafka type enables the built-in Kafka plugin.

Only one quota plugin can be enabled at a time. The built-in kafka plugin is enabled by default. Enabling the strimzi plugin automatically disables the built-in plugin.

strimzi plugin

The strimzi plugin provides storage utilization quotas and dynamic distribution of throughput limits.

  • Storage quotas throttle Kafka producers based on disk storage utilization. Limits can be specified in bytes (minAvailableBytesPerVolume) or percentage (minAvailableRatioPerVolume) of available disk space, applying to each disk individually. When any broker in the cluster exceeds the configured disk threshold, clients are throttled to prevent disks from filling up too quickly and exceeding capacity.
  • A total throughput limit is distributed dynamically across all clients. For example, if you set a 40 MBps producer byte-rate threshold, the distribution across two producers is not static. If one producer is using 10 MBps, the other can use up to 30 MBps.
  • Specific users (clients) can be excluded from the restrictions.
Note

With the strimzi plugin, you see only aggregated quota metrics, not per-client metrics.

kafka plugin

The kafka plugin applies throughput limits on a per-user, per-broker basis and includes additional CPU and operation rate limits.

  • Limits are applied per user and per broker. For example, setting a 20 MBps producer byte-rate threshold limits each user to 20 MBps on a per-broker basis across all producer connections for that user. There is no total throughput limit as there is in the strimzi plugin. Limits can be overridden by user-specific quota configurations.
  • CPU utilization limits for each client can be set as a percentage of the network threads and I/O threads on a per-broker basis.
  • The number of concurrent partition creation and deletion operations (mutations) allowed per second can be set on a per-broker basis.

When using the default Kafka quotas plugin, the default quotas (if set) are applied to all users. This includes internal users such as the Topic Operator and Cruise Control, which may impact their operations. To avoid unduly limiting internal users, set default quotas that leave sufficient headroom for their produce, fetch, and mutation rates.

For example, a quota automatically applied to the Topic Operator by the Kafka quotas plugin could constrain the controller mutation rate, potentially throttling topic creation or deletion operations. Therefore, it is important to understand the minimal quotas required by the Topic Operator to function correctly and explicitly set appropriate quotas to avoid such issues. Monitoring relevant controller and broker metrics can help track and optimize the rate of operations on topics. Cruise Control and its metrics reporter also require sufficient produce and fetch rates to conduct rebalances, depending on the scale and configuration of the Kafka cluster. To prevent issues for Cruise Control, you might start with a rate of at least 1 KB/s for its producers and consumers in small clusters, such as three brokers with moderate traffic, and adjust as needed for larger or more active clusters.
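As noted for the kafka plugin, limits can be relaxed for specific clients through user-specific quota configurations. The following KafkaUser sketch shows per-user overrides; the user name and values are illustrative and should be sized for your cluster.

Example user-specific quota override (illustrative)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-internal-client
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 5000000    # 5 MBps per broker for this user
    consumerByteRate: 5000000    # 5 MBps per broker for this user
    requestPercentage: 80        # CPU utilization limit as a percentage
    controllerMutationRate: 100  # partition create/delete operations per second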

Prerequisites

  • The Cluster Operator that manages the Kafka cluster is running.

Procedure

  1. Add the plugin configuration to the quotas section of the Kafka resource.

    Example strimzi plugin configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        quotas:
          type: strimzi
          producerByteRate: 1000000 # (1)
          consumerByteRate: 1000000 # (2)
          minAvailableBytesPerVolume: 500000000000 # (3)
          excludedPrincipals: # (4)
            - my-user

    (1) Sets a producer byte-rate threshold of 1 MBps.
    (2) Sets a consumer byte-rate threshold of 1 MBps.
    (3) Sets an available bytes limit for storage of 500 GB.
    (4) Excludes my-user from the restrictions.

    minAvailableBytesPerVolume and minAvailableRatioPerVolume are mutually exclusive. Only configure one of these parameters.

    Example kafka plugin configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        quotas:
          type: kafka
          producerByteRate: 1000000
          consumerByteRate: 1000000
          requestPercentage: 55 # (1)
          controllerMutationRate: 50 # (2)

    (1) Sets the CPU utilization limit to 55%.
    (2) Sets the controller mutation rate to 50 operations per second.
  2. Apply the changes to the Kafka configuration.
Note

Additional options can be configured in the spec.kafka.config section. The full list of supported options can be found in the plugin documentation.

10.2.2. Deleting Kafka nodes using annotations

This procedure describes how to delete an existing Kafka node by using an OpenShift annotation. Deleting a Kafka node consists of deleting both the Pod on which the Kafka broker is running and the related PersistentVolumeClaim (if the cluster was deployed with persistent storage). After deletion, the Pod and its related PersistentVolumeClaim are recreated automatically.

Warning

Deleting a PersistentVolumeClaim can cause permanent data loss and the availability of your cluster cannot be guaranteed. The following procedure should only be performed if you have encountered storage issues.

Prerequisites

  • A running Cluster Operator

Procedure

  1. Find the name of the Pod that you want to delete.

    Kafka broker pods are named <cluster_name>-kafka-<index_number>, where <index_number> starts at zero and ends at the total number of replicas minus one. For example, my-cluster-kafka-0.

  2. Use oc annotate to annotate the Pod resource in OpenShift:

    oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"
  3. Wait for the next reconciliation, when the annotated pod and its underlying persistent volume claim are deleted and then recreated.
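
    You can watch the pod being deleted and recreated using the strimzi.io/cluster label that is applied to the cluster's pods, for example:

    oc get pods -l strimzi.io/cluster=<cluster_name> -w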

10.3. Configuring node pools

Update the spec properties of the KafkaNodePool custom resource to configure a node pool deployment. A node pool is a distinct group of Kafka nodes within a Kafka cluster. The strimzi.io/cluster metadata label identifies the name of the Kafka custom resource the pool belongs to.

Node pool configuration must define:

  • Node roles within the cluster
  • Number of replica nodes
  • Storage specifications

The .spec.roles property defines whether the nodes in the pool act as controllers, brokers, or both.

Other optional properties may also be set in node pools:

  • resources to specify memory and cpu requests and limits
  • template to specify custom configuration for pods and other OpenShift resources
  • jvmOptions to specify custom JVM configuration for heap size, runtime and other options

The relationship between Kafka and KafkaNodePool resources is as follows:

  • Kafka resources represent the configuration for all nodes in a Kafka cluster.
  • KafkaNodePool resources represent the configuration for nodes only in the node pool.

If a configuration property is not specified in KafkaNodePool, it is inherited from the Kafka resource. Configuration specified in the KafkaNodePool resource takes precedence if set in both resources. For example, if both the node pool and Kafka configuration include jvmOptions, the values specified in the node pool configuration are used. When -Xmx: 1024m is set in KafkaNodePool.spec.jvmOptions and -Xms: 512m is set in Kafka.spec.kafka.jvmOptions, the node uses the value from its node pool configuration and ignores the one from the Kafka resource.
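To illustrate the precedence (a sketch using the values above; unrelated required properties are omitted), the node pool's jvmOptions block replaces the one from the Kafka resource rather than being merged with it:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    jvmOptions:
      -Xms: 512m       # not applied to nodes in pool-a below
  # ...
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  labels:
    strimzi.io/cluster: my-cluster
spec:
  jvmOptions:
    -Xmx: 1024m        # nodes in this pool use only this jvmOptions block
  # ...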

Properties from Kafka and KafkaNodePool schemas are not combined. To clarify, if KafkaNodePool.spec.template includes only podSet.metadata.labels, and Kafka.spec.kafka.template includes podSet.metadata.annotations and pod.metadata.labels, the template values from the Kafka configuration are ignored since there is a template value in the node pool configuration.

For a deeper understanding of the node pool configuration options, refer to the Streams for Apache Kafka Custom Resource API Reference.

Example configuration for a node pool in a cluster using KRaft mode

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: kraft-dual-role # (1)
  labels:
    strimzi.io/cluster: my-cluster # (2)
# Node pool specifications
spec:
  # Replicas (required)
  replicas: 3 # (3)
  # Roles (required)
  roles: # (4)
    - controller
    - broker
  # Storage configuration (required)
  storage: # (5)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  # Resources requests and limits (recommended)
  resources: # (6)
    requests:
      memory: 64Gi
      cpu: "8"
    limits:
      memory: 64Gi
      cpu: "12"

(1) Unique name for the node pool.
(2) The Kafka cluster the node pool belongs to. A node pool can only belong to a single cluster.
(3) Number of replicas for the nodes.
(4) Roles for the nodes in the node pool. In this example, the nodes have dual roles as controllers and brokers.
(5) Storage specification for the nodes.
(6) Requests for reservation of supported resources, currently cpu and memory, and limits to specify the maximum resources that can be consumed.
Note

The configuration for the Kafka resource must be suitable for KRaft mode. Currently, KRaft mode has a number of limitations.

10.3.1. Assigning IDs to node pools for scaling operations

This procedure describes how to use annotations for advanced node ID handling by the Cluster Operator when performing scaling operations on node pools. You specify the node IDs to use, rather than the Cluster Operator using the next ID in sequence. Management of node IDs in this way gives greater control.

To add a range of IDs, you assign the following annotations to the KafkaNodePool resource:

  • strimzi.io/next-node-ids to add a range of IDs that are used for new brokers
  • strimzi.io/remove-node-ids to add a range of IDs for removing existing brokers

You can specify an array of individual node IDs, ID ranges, or a combination of both. For example, you can specify the following range of IDs: [0, 1, 2, 10-20, 30] for scaling up the Kafka node pool. This format allows you to specify a combination of individual node IDs (0, 1, 2, 30) as well as a range of IDs (10-20).

In a typical scenario, you might specify a range of IDs for scaling up and a single node ID to remove a specific node when scaling down.

In this procedure, we add the scaling annotations to node pools as follows:

  • pool-a is assigned a range of IDs for scaling up
  • pool-b is assigned a range of IDs for scaling down

During the scaling operation, IDs are used as follows:

  • Scale up picks up the lowest available ID in the range for the new node.
  • Scale down removes the node with the highest available ID in the range.

If there are gaps in the sequence of node IDs assigned in the node pool, the next node to be added is assigned an ID that fills the gap.

The annotations don’t need to be updated after every scaling operation. Any unused IDs are still valid for the next scaling event.

The Cluster Operator allows you to specify a range of IDs in either ascending or descending order, so you can define them in the order the nodes are scaled. For example, when scaling up, you can specify a range such as [1000-1999], and the new nodes are assigned the next lowest IDs: 1000, 1001, 1002, 1003, and so on. Conversely, when scaling down, you can specify a range like [1999-1000], ensuring that nodes with the next highest IDs are removed: 1003, 1002, 1001, 1000, and so on.

If you don’t specify an ID range using the annotations, the Cluster Operator follows its default behavior for handling IDs during scaling operations. Node IDs start at 0 (zero) and run sequentially across the Kafka cluster. The next lowest ID is assigned to a new node. Gaps in node IDs are filled across the cluster. This means that they might not run sequentially within a node pool. The default behavior for scaling up is to add the next lowest available node ID across the cluster; and for scaling down, it is to remove the node in the node pool with the highest available node ID. The default approach is also applied if the assigned range of IDs is misformatted, the scaling up range runs out of IDs, or the scaling down range does not apply to any in-use nodes.

Prerequisites

By default, Apache Kafka restricts node IDs to numbers ranging from 0 to 999. To use node ID values greater than 999, add the reserved.broker.max.id configuration property to the Kafka custom resource and specify the required maximum node ID value.

In this example, the maximum node ID is set at 10000. Node IDs can then be assigned up to that value.

Example configuration for the maximum node ID number

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    config:
      reserved.broker.max.id: 10000
  # ...

Procedure

  1. Annotate the node pool with the IDs to use when scaling up or scaling down, as shown in the following examples.

    IDs for scaling up are assigned to node pool pool-a:

    Assigning IDs for scaling up

    oc annotate kafkanodepool pool-a strimzi.io/next-node-ids="[0,1,2,10-20,30]"

    The lowest available ID from this range is used when adding a node to pool-a.

    IDs for scaling down are assigned to node pool pool-b:

    Assigning IDs for scaling down

    oc annotate kafkanodepool pool-b strimzi.io/remove-node-ids="[60-50,9,8,7]"

    The highest available ID from this range is removed when scaling down pool-b.

    Note

    If you want to remove a specific node, you can assign a single node ID to the scaling down annotation: oc annotate kafkanodepool pool-b strimzi.io/remove-node-ids="[3]".

  2. You can now scale the node pool.

    For more information about scaling node pools, see Section 10.3.3, “Adding nodes to a node pool” and Section 10.3.4, “Removing nodes from a node pool”.

    On reconciliation, a warning is given if the annotations are misformatted.

  3. After you have performed the scaling operation, you can remove the annotation if it’s no longer needed.

    Removing the annotation for scaling up

    oc annotate kafkanodepool pool-a strimzi.io/next-node-ids-

    Removing the annotation for scaling down

    oc annotate kafkanodepool pool-b strimzi.io/remove-node-ids-

10.3.2. Impact on racks when moving nodes from node pools

If rack awareness is enabled on a Kafka cluster, replicas can be spread across different racks, data centers, or availability zones. When moving nodes from node pools, consider the implications on the cluster topology, particularly regarding rack awareness. Removing specific pods from node pools, especially out of order, may break the cluster topology or cause an imbalance in distribution across racks. An imbalance can impact both the distribution of nodes themselves and the partition replicas within the cluster. An uneven distribution of nodes and partitions across racks can affect the performance and resilience of the Kafka cluster.

Plan the removal of nodes strategically to maintain the required balance and resilience across racks. Use the strimzi.io/remove-node-ids annotation to move nodes with specific IDs with caution. Ensure that configuration to spread partition replicas across racks and for clients to consume from the closest replicas is not broken.

Tip

Use Cruise Control and the KafkaRebalance resource with the RackAwareGoal to make sure that replicas remain distributed across different racks.
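A minimal sketch of such a KafkaRebalance; the resource name is illustrative and the goal list can be extended with other supported goals:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: rack-aware-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec:
  goals:
    - RackAwareGoal
    - ReplicaCapacityGoal
    - DiskCapacityGoal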

10.3.3. Adding nodes to a node pool

This procedure describes how to scale up a node pool to add new nodes. Currently, scale up is only possible for broker-only node pools containing nodes that run as dedicated brokers.

In this procedure, we start with three nodes for node pool pool-a:

Kafka nodes in the node pool

NAME                 READY  STATUS   RESTARTS
my-cluster-pool-a-0  1/1    Running  0
my-cluster-pool-a-1  1/1    Running  0
my-cluster-pool-a-2  1/1    Running  0

Node IDs are appended to the name of the node on creation. We add node my-cluster-pool-a-3, which has a node ID of 3.

Note

During this process, the ID of the node that holds the partition replicas changes. Consider any dependencies that reference the node ID.

Prerequisites

Procedure

  1. Create a new node in the node pool.

    For example, node pool pool-a has three replicas. We add a node by increasing the number of replicas:

    oc scale kafkanodepool pool-a --replicas=4
  2. Check the status of the deployment and wait for the pods in the node pool to be created and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows four Kafka nodes in the node pool

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-2  1/1    Running  0
    my-cluster-pool-a-3  1/1    Running  0

  3. Reassign the partitions after increasing the number of nodes in the node pool.

    • If auto-rebalancing is enabled, partitions are reassigned to new nodes automatically, so you can skip this step.
    • If auto-rebalancing is not enabled, use the Cruise Control add-brokers mode to move partition replicas from existing brokers to the newly added brokers.

      Using Cruise Control to reassign partition replicas

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaRebalance
      metadata:
        # ...
      spec:
        mode: add-brokers
        brokers: [3]

      We are reassigning partitions to node my-cluster-pool-a-3. The reassignment can take some time depending on the number of topics and partitions in the cluster.

10.3.4. Removing nodes from a node pool

This procedure describes how to scale down a node pool to remove nodes. Currently, scale down is only possible for broker-only node pools containing nodes that run as dedicated brokers.

In this procedure, we start with four nodes for node pool pool-a:

Kafka nodes in the node pool

NAME                 READY  STATUS   RESTARTS
my-cluster-pool-a-0  1/1    Running  0
my-cluster-pool-a-1  1/1    Running  0
my-cluster-pool-a-2  1/1    Running  0
my-cluster-pool-a-3  1/1    Running  0

Node IDs are appended to the name of the node on creation. We remove node my-cluster-pool-a-3, which has a node ID of 3.

Note

During this process, the ID of the node that holds the partition replicas changes. Consider any dependencies that reference the node ID.

Prerequisites

Procedure

  1. Reassign the partitions before decreasing the number of nodes in the node pool.

    • If auto-rebalancing is enabled, partitions are moved off brokers that are going to be removed automatically, so you can skip this step.
    • If auto-rebalancing is not enabled, use the Cruise Control remove-brokers mode to move partition replicas off the brokers that are going to be removed.

      Using Cruise Control to reassign partition replicas

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaRebalance
      metadata:
        # ...
      spec:
        mode: remove-brokers
        brokers: [3]

      We are reassigning partitions from node my-cluster-pool-a-3. The reassignment can take some time depending on the number of topics and partitions in the cluster.

  2. After the reassignment process is complete, and the node being removed has no live partitions, reduce the number of Kafka nodes in the node pool.

    For example, node pool pool-a has four replicas. We remove a node by decreasing the number of replicas:

    oc scale kafkanodepool pool-a --replicas=3

    Output shows three Kafka nodes in the node pool

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-2  1/1    Running  0

10.3.5. Moving nodes between node pools

This procedure describes how to move nodes between source and target Kafka node pools without downtime. You create a new node on the target node pool and reassign partitions to move data from the old node on the source node pool. When the replicas on the new node are in-sync, you can delete the old node.

In this procedure, we start with two node pools:

  • pool-a with three replicas is the target node pool
  • pool-b with four replicas is the source node pool

We scale up pool-a, and reassign partitions and scale down pool-b, which results in the following:

  • pool-a with four replicas
  • pool-b with three replicas

Currently, scaling is only possible for broker-only node pools containing nodes that run as dedicated brokers.

Note

During this process, the ID of the node that holds the partition replicas changes. Consider any dependencies that reference the node ID.

Prerequisites

Procedure

  1. Create a new node in the target node pool.

    For example, node pool pool-a has three replicas. We add a node by increasing the number of replicas:

    oc scale kafkanodepool pool-a --replicas=4
  2. Check the status of the deployment and wait for the pods in the node pool to be created and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows four Kafka nodes in the source and target node pools

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-4  1/1    Running  0
    my-cluster-pool-a-7  1/1    Running  0
    my-cluster-pool-b-2  1/1    Running  0
    my-cluster-pool-b-3  1/1    Running  0
    my-cluster-pool-b-5  1/1    Running  0
    my-cluster-pool-b-6  1/1    Running  0

    Node IDs are appended to the name of the node on creation. We add node my-cluster-pool-a-7, which has a node ID of 7.

    If auto-rebalancing is enabled, partitions are reassigned to new nodes and moved off brokers that are going to be removed automatically, so you can skip the next step.

  3. If auto-rebalancing is not enabled, reassign partitions before decreasing the number of nodes in the source node pool.

    Use the Cruise Control remove-brokers mode to move partition replicas off the brokers that are going to be removed.

    Using Cruise Control to reassign partition replicas

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaRebalance
    metadata:
      # ...
    spec:
      mode: remove-brokers
      brokers: [6]

    We are reassigning partitions from node my-cluster-pool-b-6. The reassignment can take some time depending on the number of topics and partitions in the cluster.

  4. After the reassignment process is complete, reduce the number of Kafka nodes in the source node pool.

    For example, node pool pool-b has four replicas. We remove a node by decreasing the number of replicas:

    oc scale kafkanodepool pool-b --replicas=3

    The node with the highest ID (6) within the pool is removed.

    Output shows three Kafka nodes in the source node pool

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-b-2  1/1    Running  0
    my-cluster-pool-b-3  1/1    Running  0
    my-cluster-pool-b-5  1/1    Running  0

10.3.6. Changing node pool roles

Node pools are used with Kafka clusters that operate in KRaft mode (using Kafka Raft metadata). If you are using KRaft mode, you can specify roles for all nodes in the node pool to operate as brokers, controllers, or both.

In certain circumstances you might want to change the roles assigned to a node pool. For example, you may have a node pool that contains nodes that perform dual broker and controller roles, and then decide to split the roles between two node pools. In this case, you create a new node pool with nodes that act only as brokers, and then reassign partitions from the dual-role nodes to the new brokers. You can then switch the old node pool to a controller-only role.

You can also perform the reverse operation by moving from node pools with controller-only and broker-only roles to a node pool that contains nodes that perform dual broker and controller roles. In this case, you add the broker role to the existing controller-only node pool, reassign partitions from the broker-only nodes to the dual-role nodes, and then delete the broker-only node pool.

When removing broker roles in the node pool configuration, keep in mind that Kafka does not automatically reassign partitions. Before removing the broker role, ensure that nodes changing to controller-only roles do not have any assigned partitions. If partitions are assigned, the change is prevented. No replicas must be left on the node before removing the broker role. The best way to reassign partitions before changing roles is to apply a Cruise Control optimization proposal in remove-brokers mode. For more information, see Section 21.3, “Generating optimization proposals”.

Note

Scaling controller nodes in node pools is currently not supported because the related Kafka feature is still under development. For more information, see KAFKA-16538.

10.3.7. Transitioning to separate roles

This procedure describes how to transition to using node pools with separate roles. If your Kafka cluster is using a node pool with combined controller and broker roles, you can transition to using two node pools with separate roles. To do this, rebalance the cluster to move partition replicas to a node pool with a broker-only role, and then switch the old node pool to a controller-only role.

In this procedure, we start with node pool pool-a, which has controller and broker roles:

Dual-role node pool

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  # ...

The node pool has three nodes:

Kafka nodes in the node pool

NAME                 READY  STATUS   RESTARTS
my-cluster-pool-a-0  1/1    Running  0
my-cluster-pool-a-1  1/1    Running  0
my-cluster-pool-a-2  1/1    Running  0

Each node performs a combined role of broker and controller. We create a second node pool called pool-b, with three nodes that act as brokers only.

Note

During this process, the ID of the node that holds the partition replicas changes. Consider any dependencies that reference the node ID.

Procedure

  1. Create a node pool with a broker role.

    Example node pool configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-b
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - broker
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      # ...

    The new node pool also has three nodes. If you already have a broker-only node pool, you can skip this step.

  2. Apply the new KafkaNodePool resource to create the brokers.
  3. Check the status of the deployment and wait for the pods in the node pool to be created and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows pods running in two node pools

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-2  1/1    Running  0
    my-cluster-pool-b-3  1/1    Running  0
    my-cluster-pool-b-4  1/1    Running  0
    my-cluster-pool-b-5  1/1    Running  0

    Node IDs are appended to the name of the node on creation.

  4. Use the Cruise Control remove-brokers mode to reassign partition replicas from the dual-role nodes to the newly added brokers.

    Using Cruise Control to reassign partition replicas

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaRebalance
    metadata:
      # ...
    spec:
      mode: remove-brokers
      brokers: [0, 1, 2]

    The reassignment can take some time depending on the number of topics and partitions in the cluster.

    Note

    If nodes changing to controller-only roles have any assigned partitions, the change is prevented. The status.conditions of the Kafka resource provide details of events preventing the change.

  5. Remove the broker role from the node pool that originally had a combined role.

    Dual-role nodes switched to controllers

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-a
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - controller
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 20Gi
            deleteClaim: false
      # ...

  6. Apply the configuration change so that the node pool switches to a controller-only role.

10.3.8. Transitioning to dual-role nodes

This procedure describes how to transition from separate node pools with broker-only and controller-only roles to using a dual-role node pool. If your Kafka cluster is using node pools with dedicated controller and broker nodes, you can transition to using a single node pool with both roles. To do this, add the broker role to the controller-only node pool, rebalance the cluster to move partition replicas to the dual-role node pool, and then delete the old broker-only node pool.

In this procedure, we start with two node pools: pool-a, which has only the controller role, and pool-b, which has only the broker role:

Single role node pools

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  # ...
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-b
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  # ...

The Kafka cluster has six nodes:

Kafka nodes in the node pools

NAME                 READY  STATUS   RESTARTS
my-cluster-pool-a-0  1/1    Running  0
my-cluster-pool-a-1  1/1    Running  0
my-cluster-pool-a-2  1/1    Running  0
my-cluster-pool-b-3  1/1    Running  0
my-cluster-pool-b-4  1/1    Running  0
my-cluster-pool-b-5  1/1    Running  0

The pool-a nodes perform the role of controller. The pool-b nodes perform the role of broker.

Note

During this process, the ID of the node that holds the partition replicas changes. Consider any dependencies that reference the node ID.

Procedure

  1. Edit the node pool pool-a and add the broker role to it.

    Example node pool configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-a
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - controller
        - broker
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      # ...

  2. Check the status and wait for the pods in the node pool to be restarted and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows pods running in two node pools

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-2  1/1    Running  0
    my-cluster-pool-b-3  1/1    Running  0
    my-cluster-pool-b-4  1/1    Running  0
    my-cluster-pool-b-5  1/1    Running  0

    Node IDs are appended to the name of the node on creation.

  3. Use the Cruise Control remove-brokers mode to reassign partition replicas from the broker-only nodes to the dual-role nodes.

    Using Cruise Control to reassign partition replicas

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaRebalance
    metadata:
      # ...
    spec:
      mode: remove-brokers
      brokers: [3, 4, 5]

    The reassignment can take some time depending on the number of topics and partitions in the cluster.

  4. Remove the pool-b node pool that has the old broker-only nodes.

    oc delete kafkanodepool pool-b -n <my_cluster_operator_namespace>

10.4. Configuring Kafka storage

Streams for Apache Kafka supports different Kafka storage options. You can choose between the following basic types:

Ephemeral storage
Ephemeral storage is temporary and only persists while a pod is running. When a pod is deleted, the data is lost, though data can be recovered in a highly available environment. Due to its transient nature, ephemeral storage is only recommended for development and testing environments.
Persistent storage
Persistent storage retains data across pod restarts and system disruptions, making it ideal for production environments.

JBOD (Just a Bunch of Disks) storage allows you to configure your Kafka cluster to use multiple disks or volumes as ephemeral or persistent storage.

JBOD storage (multiple volumes)

When specifying JBOD storage, you must still decide between using ephemeral or persistent volumes for each disk. Even if you start with only one volume, using JBOD allows for future scaling by adding more volumes as needed, which is why JBOD storage is recommended.

Note

Persistent, ephemeral, and JBOD storage types cannot be changed after a Kafka cluster is deployed. However, you can add or remove volumes of different types from the JBOD storage. You can also create and migrate to node pools with new storage specifications.

Tiered storage (advanced)

Tiered storage provides additional flexibility for managing Kafka data by combining different storage types with varying performance and cost characteristics. It allows Kafka to offload older data to cheaper, long-term storage (such as object storage) while keeping recent, frequently accessed data on faster, more expensive storage (such as block storage).

Tiered storage is an add-on capability. After configuring storage (ephemeral, persistent, or JBOD) for Kafka nodes, you can configure tiered storage at the cluster level and enable it for specific topics using the remote.storage.enable topic-level configuration.
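For example, once tiered storage is configured at the cluster level, a topic might opt in through its configuration. This is a minimal sketch; the topic name and retention values are illustrative.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-tiered-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    remote.storage.enable: true
    local.retention.ms: 86400000   # keep one day of data on local broker storage
    retention.ms: 2592000000       # keep thirty days in total, including remote storage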

10.4.1. Storage considerations

Efficient data storage is essential for Streams for Apache Kafka to operate effectively. Streams for Apache Kafka has been tested with block storage as the primary storage type for Kafka brokers, and block storage is strongly recommended. File system-based storage (such as NFS) is not guaranteed to work for primary broker storage and may cause stability or performance issues.

Common block storage types supported by OpenShift include:

  • Cloud-based block storage solutions:

    • Amazon EBS (for AWS)
    • Azure Disk Storage (for Microsoft Azure)
    • Persistent Disk (for Google Cloud)
  • Persistent storage (for bare metal deployments) using local persistent volumes
  • Storage Area Network (SAN) volumes accessed by protocols like Fibre Channel or iSCSI
Note

Streams for Apache Kafka does not require OpenShift raw block volumes.

10.4.1.1. File systems

Kafka uses a file system for storing messages. Streams for Apache Kafka is compatible with the XFS and ext4 file systems, which are commonly used with Kafka. Consider the underlying architecture and requirements of your deployment when choosing and setting up your file system.

For more information, refer to Filesystem Selection in the Kafka documentation.

10.4.1.2. Tiered storage

Kafka’s tiered storage feature is supported in Streams for Apache Kafka as an optional capability.

With tiered storage enabled:

  • Primary broker storage, such as persistent volumes or JBOD, handles recent data
  • Remote tiered storage, such as object storage, is used for historical data

Streams for Apache Kafka allows users to integrate their own tiered storage plugins to support specific remote storage backends. If using a custom plugin, ensure that it meets performance and compatibility requirements before deploying to production.

10.4.1.3. Disk usage

Solid-state drives (SSDs), though not essential, can improve the performance of Kafka in large clusters where data is sent to and received from multiple topics asynchronously.

Note

Replicated storage is not required, as Kafka provides built-in data replication.

10.4.2. Configuring storage types

Use the storage properties of the KafkaNodePool custom resource to configure storage for a deployment of Kafka in KRaft mode.

10.4.2.1. Configuring ephemeral storage

To use ephemeral storage, specify ephemeral as the storage type.

Example configuration for ephemeral storage

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: my-node-pool
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: ephemeral
  # ...

Ephemeral storage uses emptyDir volumes, which are created when a pod is assigned to a node. You can limit the size of the emptyDir volume with the sizeLimit property.
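For example, a sketch limiting the size of the emptyDir volume:

# ...
  storage:
    type: ephemeral
    sizeLimit: 100Gi
# ...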

The ephemeral volume used by Kafka brokers for log directories is mounted at /var/lib/kafka/data/kafka-log<pod_id>.

Important

Ephemeral storage is not suitable for Kafka topics with a replication factor of 1.

For more information on ephemeral storage configuration options, see the EphemeralStorage schema reference.

10.4.2.2. Configuring persistent storage

To use persistent storage, specify one of the following as the storage type:

  • persistent-claim for a single persistent volume
  • jbod for multiple persistent volumes in a Kafka cluster (Recommended for Kafka in a production environment)

Example configuration for persistent storage

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: my-node-pool
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: persistent-claim
    size: 500Gi
    deleteClaim: true
  # ...

Streams for Apache Kafka uses Persistent Volume Claims (PVCs) to request storage on persistent volumes (PVs). The PVC binds to a PV that meets the requested storage criteria, without needing to know the underlying storage infrastructure.

PVCs created for Kafka pods follow the naming convention data-<kafka_cluster_name>-<pool_name>-<pod_id>, and the persistent volumes for Kafka logs are mounted at /var/lib/kafka/data/kafka-log<pod_id>.

You can also specify custom storage classes (StorageClass) and volume selectors in the storage configuration.

Example class and selector configuration

# ...
  storage:
    type: persistent-claim
    size: 500Gi
    class: my-storage-class
    selector:
      hdd-type: ssd
    deleteClaim: true
# ...

Storage classes define storage profiles and dynamically provision persistent volumes (PVs) based on those profiles. This is useful, for example, when storage classes are restricted to different availability zones or data centers. If a storage class is not specified, the default storage class in the OpenShift cluster is used. Selectors specify persistent volumes that offer specific features, such as solid-state drive (SSD) volumes.

For more information on persistent storage configuration options, see the PersistentClaimStorage schema reference.

10.4.2.3. Resizing persistent volumes

Persistent volumes can be resized by changing the size storage property without any risk of data loss, as long as the storage infrastructure supports it. Following a configuration update to change the size of the storage, Streams for Apache Kafka instructs the storage infrastructure to make the change.

Storage expansion is supported in Streams for Apache Kafka clusters that use persistent-claim volumes. Decreasing the size of persistent volumes is not supported in OpenShift. For more information about resizing persistent volumes in OpenShift, see Resizing Persistent Volumes using Kubernetes.

After increasing the value of the size property, OpenShift increases the capacity of the selected persistent volumes in response to a request from the Cluster Operator. When the resizing is complete, the Cluster Operator restarts all pods that use the resized persistent volumes. This happens automatically.

In this example, the volumes are increased to 2000Gi.

Kafka configuration to increase volume size to 2000Gi

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: my-node-pool
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 2000Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 2000Gi
        deleteClaim: false
      - id: 2
        type: persistent-claim
        size: 2000Gi
        deleteClaim: false
  # ...

Returning information on the PVs verifies the changes:

oc get pv

Storage capacity of PVs

NAME               CAPACITY   CLAIM
pvc-0ca459ce-...   2000Gi     my-project/data-my-cluster-my-node-pool-2
pvc-6e1810be-...   2000Gi     my-project/data-my-cluster-my-node-pool-0
pvc-82dc78c9-...   2000Gi     my-project/data-my-cluster-my-node-pool-1

The output shows the names of each PVC associated with a broker pod.

Note

Storage reduction is only possible when using multiple disks per broker. You can remove a disk after moving all partitions on the disk to other volumes within the same broker (intra-broker) or to other brokers within the same cluster (intra-cluster).

10.4.2.4. Configuring JBOD storage

To use JBOD storage, specify jbod as the storage type and add configuration for the JBOD volumes. JBOD volumes can be persistent or ephemeral, with the configuration options and constraints applicable to each type.

Example configuration for JBOD storage

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: my-node-pool
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 2
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  # ...

PVCs are created for the JBOD volumes using the naming convention data-<volume_id>-<kafka_cluster_name>-<pool_name>-<pod_id>, and the JBOD volumes used for log directories are mounted at /var/lib/kafka/data-<volume_id>/kafka-log<pod_id>.

10.4.2.5. Adding or removing volumes from JBOD storage

Volume IDs cannot be changed once JBOD volumes are created, though you can add or remove volumes. When adding a new volume to the volumes array under an id that was used in the past and removed, make sure that the previously used PersistentVolumeClaims have been deleted.

Use Cruise Control to reassign partitions when adding or removing volumes. For information on intra-broker disk balancing, see Section 21.1.3, “Tuning options for rebalances”.

10.4.3. Configuring KRaft metadata log storage

In KRaft mode, each node (including brokers and controllers) stores a copy of the Kafka cluster’s metadata log on one of its data volumes. By default, the log is stored on the volume with the lowest ID, but you can specify a different volume using the kraftMetadata property.

For controller-only nodes, storage is exclusively for the metadata log. Since the log is always stored on a single volume, using JBOD storage with multiple volumes does not improve performance or increase available disk space.

In contrast, broker nodes or nodes that combine broker and controller roles can share the same volume for both the metadata log and partition replica data, optimizing disk utilization. They can also use JBOD storage, where one volume is shared for the metadata log and partition replica data, while additional volumes are used solely for partition replica data.

Changing the volume that stores the metadata log triggers a rolling update of the cluster nodes, involving the deletion of the old log and the creation of a new one in the specified location. If kraftMetadata isn’t specified, adding a new volume with a lower ID also prompts an update and relocation of the metadata log.

Example JBOD storage configuration using volume with ID 1 to store the KRaft metadata

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: pool-a
  # ...
spec:
  storage:
    type: jbod
    volumes:
    - id: 0
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
    - id: 1
      type: persistent-claim
      size: 100Gi
      kraftMetadata: shared
      deleteClaim: false
  # ...

10.4.4. Managing storage using node pools

Storage management in Streams for Apache Kafka is usually straightforward, and requires little change once set up, but there might be situations where you need to modify your storage configuration. Node pools simplify this process, because you can set up separate node pools that specify your new storage requirements.

In this procedure we create and manage storage for a node pool called pool-a containing three nodes. The steps require a scaling operation to add a new node pool. Currently, scaling is only possible for broker-only node pools containing nodes that run as dedicated brokers.

We show how to change the storage class (volumes.class) that defines the type of persistent storage it uses. You can use the same steps to change the storage size (volumes.size). This approach is particularly useful if you want to reduce disk sizes. When increasing disk sizes, you have the option to dynamically resize persistent volumes.

Note

We strongly recommend using block storage. Streams for Apache Kafka is only tested for use with block storage.

Prerequisites

Procedure

  1. Create the node pool with its own storage settings.

    For example, node pool pool-a uses JBOD storage with persistent volumes:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-a
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      roles:
        - broker
      replicas: 3
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 500Gi
            class: gp2-ebs
      # ...

    Nodes in pool-a are configured to use Amazon EBS (Elastic Block Store) GP2 volumes.

  2. Apply the node pool configuration for pool-a.
  3. Check the status of the deployment and wait for the pods in pool-a to be created and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows three Kafka nodes in the node pool

    NAME                 READY  STATUS   RESTARTS
    my-cluster-pool-a-0  1/1    Running  0
    my-cluster-pool-a-1  1/1    Running  0
    my-cluster-pool-a-2  1/1    Running  0

  4. To migrate to a new storage class, create a new node pool with the required storage configuration:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-b
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      roles:
        - broker
      replicas: 3
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 1Ti
            class: gp3-ebs
      # ...

    Nodes in pool-b are configured to use Amazon EBS (Elastic Block Store) GP3 volumes.

  5. Apply the node pool configuration for pool-b.
  6. Check the status of the deployment and wait for the pods in pool-b to be created and ready.
  7. Reassign the partitions from pool-a to pool-b.

    When migrating to a new storage configuration, use the Cruise Control remove-brokers mode to move partition replicas off the brokers that are going to be removed.

    Using Cruise Control to reassign partition replicas

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaRebalance
    metadata:
      # ...
    spec:
      mode: remove-brokers
      brokers: [0, 1, 2]

    We are reassigning partitions from pool-a. The reassignment can take some time depending on the number of topics and partitions in the cluster.
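
    As a sketch of how the rebalance might be applied and approved, assuming the configuration is saved in a file named rebalance.yaml and the KafkaRebalance resource is named my-rebalance:

    oc apply -f rebalance.yaml
    oc annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve

    You can track progress by checking the status conditions of the KafkaRebalance resource, which show Ready when the rebalance is complete:

    oc get kafkarebalance my-rebalance -o yaml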

  8. After the reassignment process is complete, delete the old node pool:

    oc delete kafkanodepool pool-a

10.4.5. Managing storage affinity using node pools

In situations where storage resources, such as local persistent volumes, are constrained to specific worker nodes, or availability zones, configuring storage affinity helps to schedule pods to use the right nodes.

Node pools allow you to configure affinity independently. In this procedure, we create and manage storage affinity for two availability zones: zone-1 and zone-2.

You can configure node pools for separate availability zones, but use the same storage class. We define an all-zones persistent storage class representing the storage resources available in each zone.

We also use the .spec.template.pod properties to configure the node affinity and schedule Kafka pods on zone-1 and zone-2 worker nodes.

The storage class and affinity is specified in node pools representing the nodes in each availability zone:

  • pool-zone-1
  • pool-zone-2

Prerequisites

Procedure

  1. Define the storage class for use with each availability zone:

    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: all-zones
    provisioner: kubernetes.io/my-storage
    parameters:
      type: ssd
    volumeBindingMode: WaitForFirstConsumer
  2. Create node pools representing the two availability zones, specifying the all-zones storage class and the affinity for each zone:

    Node pool configuration for zone-1

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-zone-1
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 500Gi
            class: all-zones
      template:
        pod:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                    - key: topology.kubernetes.io/zone
                      operator: In
                      values:
                      - zone-1
      # ...

    Node pool configuration for zone-2

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: pool-zone-2
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 4
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 500Gi
            class: all-zones
      template:
        pod:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                    - key: topology.kubernetes.io/zone
                      operator: In
                      values:
                      - zone-2
      # ...

  3. Apply the node pool configuration.
  4. Check the status of the deployment and wait for the pods in the node pools to be created and ready (1/1).

    oc get pods -n <my_cluster_operator_namespace>

    Output shows 3 Kafka nodes in pool-zone-1 and 4 Kafka nodes in pool-zone-2

    NAME                            READY  STATUS   RESTARTS
    my-cluster-pool-zone-1-kafka-0  1/1    Running  0
    my-cluster-pool-zone-1-kafka-1  1/1    Running  0
    my-cluster-pool-zone-1-kafka-2  1/1    Running  0
    my-cluster-pool-zone-2-kafka-3  1/1    Running  0
    my-cluster-pool-zone-2-kafka-4  1/1    Running  0
    my-cluster-pool-zone-2-kafka-5  1/1    Running  0
    my-cluster-pool-zone-2-kafka-6  1/1    Running  0

10.4.6. Tiered storage

Tiered storage introduces a flexible approach to managing Kafka data whereby log segments are moved to a separate storage system. For example, you can combine the use of block storage on brokers for frequently accessed data and offload older or less frequently accessed data from the block storage to more cost-effective, scalable remote storage solutions, such as Amazon S3, without compromising data accessibility and durability.

Note

Tiered storage is a production-ready feature in Kafka since version 3.9.0, and it is also supported in Streams for Apache Kafka. Before introducing tiered storage to your environment, review the known limitations of this feature.

Tiered storage requires an implementation of Kafka’s RemoteStorageManager interface to handle communication between Kafka and the remote storage system, which is enabled through configuration of the Kafka resource. Streams for Apache Kafka uses Kafka’s TopicBasedRemoteLogMetadataManager for Remote Log Metadata Management (RLMM) when custom tiered storage is enabled. The RLMM manages the metadata related to remote storage.

To use custom tiered storage, do the following:

  • Include a tiered storage plugin for Kafka in the Streams for Apache Kafka image by building a custom container image. The plugin must provide the necessary functionality for a Kafka cluster managed by Streams for Apache Kafka to interact with the tiered storage solution.
  • Configure Kafka for tiered storage using tieredStorage properties in the Kafka resource. Specify the class name and path for the custom RemoteStorageManager implementation, as well as any additional configuration.
  • If required, specify RLMM-specific tiered storage configuration.

Example custom tiered storage configuration for Kafka

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    tieredStorage:
      type: custom 
1

      remoteStorageManager: 
2

        className: com.example.kafka.tiered.storage.s3.S3RemoteStorageManager
        classPath: /opt/kafka/plugins/tiered-storage-s3/*
        config:
          storage.bucket.name: my-bucket 
3

          # ...
    config:
      rlmm.config.remote.log.metadata.topic.replication.factor: 1 
4

  # ...

1
The type must be set to custom.
2
The configuration for the custom RemoteStorageManager implementation, including class name and path.
3
Configuration to pass to the custom RemoteStorageManager implementation, which Streams for Apache Kafka automatically prefixes with rsm.config..
4
Tiered storage configuration to pass to the RLMM, which requires an rlmm.config. prefix. For more information on tiered storage configuration, see the Apache Kafka documentation.

10.4.6.1. Tiered storage plugin libraries

An open-source tiered-storage-for-apache-kafka project from Aiven demonstrates how Apache Kafka can use remote object storage for tiered storage. The project provides an implementation of the RemoteStorageManager interface for Kafka’s tiered storage feature.

Streams for Apache Kafka includes plugin libraries from this project for AWS S3 and file system storage by default in Kafka v4.0 and later.

For more information, see the tiered-storage-for-apache-kafka project on GitHub.

10.4.6.2. AWS S3 remote storage configuration

Streams for Apache Kafka includes plugin libraries from the open-source tiered-storage-for-apache-kafka project to enable tiered storage using AWS S3.

To configure AWS S3 as the remote storage, specify the following RemoteStorageManager properties in the Kafka resource:

  • className: io.aiven.kafka.tieredstorage.RemoteStorageManager
  • classPath: /opt/kafka/libs/tiered-storage/*
  • storage.backend.class: io.aiven.kafka.tieredstorage.storage.s3.S3Storage
  • Any additional S3-related configuration properties

Example Kafka configuration for Amazon S3 tiered storage

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    tieredStorage:
      type: custom
      remoteStorageManager:
        className: io.aiven.kafka.tieredstorage.RemoteStorageManager
        classPath: /opt/kafka/libs/tiered-storage/*
        config:
          storage.backend.class: io.aiven.kafka.tieredstorage.storage.s3.S3Storage
          storage.s3.bucket.name: <bucket_name> 
1

          storage.s3.region: <region> 
2

          storage.aws.access.key.id: <aws_access_key_id> 
3

          storage.aws.secret.access.key: <aws_secret_key> 
4

          chunk.size: <chunk_size> 
5

          ...
    config:
      rlmm.config.remote.log.metadata.topic.replication.factor: 3
  # ...

1
The name of the AWS S3 bucket used for storing and retrieving data.
2
The AWS region of the S3 bucket (for example, us-east-1).
3
The AWS access key ID used to access the S3 bucket.
4
The AWS secret access key used to access the S3 bucket.
5
The size (in bytes) of chunks into which segment files are split. It’s recommended to start with "4194304" (4MiB).

10.4.6.3. File system remote storage configuration

Streams for Apache Kafka includes plugin libraries from the open-source tiered-storage-for-apache-kafka project to enable tiered storage using file system paths. The file system storage plugin supports any mountable file system that can be accessed using a local path (for example, /mnt/tiered-storage/).

To configure a file system as remote storage (this example uses NFS), follow these steps:

  • Mount the NFS volume in all KafkaNodePool resources performing a broker role using the additional volumes feature. KafkaNodePool resources with only the controller role do not require this configuration, as they don’t use tiered storage.
  • Specify the following RemoteStorageManager properties in the Kafka resource:

    • className: io.aiven.kafka.tieredstorage.RemoteStorageManager
    • classPath: /opt/kafka/libs/tiered-storage/*
    • storage.backend.class: io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage
    • Any additional file system–related configuration properties

Example node pool configuration for file system (NFS) tiered storage

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  labels:
    strimzi.io/cluster: my-cluster
spec:
  roles:
    - broker
  # ...
  template:
    pod:
      volumes:
        - name: tiered-storage
          persistentVolumeClaim:
            claimName: tiered-storage-nfs 
1

    kafkaContainer:
      volumeMounts:
        - name: tiered-storage
          mountPath: /mnt/tiered-storage/ 
2

# ...

1
The Persistent Volume Claim (PVC) uses the nfs storage class to provision the NFS volume.
2
The mount path where the NFS volume is made available in the container. This path must match the storage.root value specified in the Kafka resource.

Example Kafka configuration for file system (NFS) tiered storage

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    tieredStorage:
      type: custom
      remoteStorageManager:
        className: io.aiven.kafka.tieredstorage.RemoteStorageManager
        classPath: /opt/kafka/libs/tiered-storage/*
        config:
          storage.backend.class: io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage
          storage.root: /mnt/tiered-storage/ 
1

          storage.overwrite.enabled: "true" 
2

          chunk.size: <chunk_size> 
3

    config:
      rlmm.config.remote.log.metadata.topic.replication.factor: 3
  # ...

1
The path to the mounted file system used to write and read tiered storage data.
2
Specifies whether existing files can be overwritten. Accepts true or false.
3
The size (in bytes) of chunks into which segment files are split. It’s recommended to start with "4194304" (4MiB).

10.4.6.4. Common tiered storage tuning properties

In addition to the required configuration properties, Streams for Apache Kafka supports a set of optional tuning properties for the tiered storage plugin. These properties can help optimize storage performance, throughput, and resource usage depending on your environment.

Common tuning properties include the following:

storage.chunk.cache.prefetch.max.size
Controls the maximum amount of data (in bytes) to prefetch and cache. This setting can improve performance when reading chunked segments.
Default: 0
storage.upload.rate.limit.bytes.per.second
Sets an upper bound on the upload rate from disk (in bytes per second). The value must be at least 1 MiB/s.
Default: No limit
storage.compression.enabled
Enables additional compression of segment data to reduce storage usage.
Default: false
storage.s3.multipart.upload.part.size
Sets the part size (in bytes) used when performing multipart uploads to S3. Tuning this value can improve upload performance and compatibility with S3 limits.
Default: 26214400 (25 MiB)

For a full list of supported configuration options, see the tiered-storage-for-apache-kafka project documentation.
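
A minimal sketch showing how some of these tuning properties might be combined with the required Amazon S3 configuration under the remoteStorageManager config block (the bucket name, region, and tuning values are illustrative):

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    tieredStorage:
      type: custom
      remoteStorageManager:
        className: io.aiven.kafka.tieredstorage.RemoteStorageManager
        classPath: /opt/kafka/libs/tiered-storage/*
        config:
          storage.backend.class: io.aiven.kafka.tieredstorage.storage.s3.S3Storage
          storage.s3.bucket.name: my-bucket
          storage.s3.region: us-east-1
          # Optional tuning properties
          storage.chunk.cache.prefetch.max.size: 16777216
          storage.upload.rate.limit.bytes.per.second: 10485760
          storage.s3.multipart.upload.part.size: 26214400
    config:
      rlmm.config.remote.log.metadata.topic.replication.factor: 3
  # ...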

10.5. Configuring the Entity Operator

Use the entityOperator property in Kafka.spec to configure the Entity Operator. The Entity Operator is responsible for managing Kafka-related entities in a running Kafka cluster. It comprises the following operators:

  • Topic Operator to manage Kafka topics
  • User Operator to manage Kafka users

By configuring the Kafka resource, the Cluster Operator can deploy the Entity Operator, including one or both operators. Once deployed, the operators are automatically configured to handle the topics and users of the Kafka cluster.

Each operator can only monitor a single namespace. For more information, see Section 1.2.1, “Operator-watched Kafka resources”.

The entityOperator property supports several sub-properties:

  • topicOperator
  • userOperator
  • template

The template property contains the configuration of the Entity Operator pod, such as labels, annotations, affinity, and tolerations. For more information on configuring templates, see Section 10.12.14, “Customizing OpenShift resources”.

The topicOperator property contains the configuration of the Topic Operator. When this option is missing, the Entity Operator is deployed without the Topic Operator.

The userOperator property contains the configuration of the User Operator. When this option is missing, the Entity Operator is deployed without the User Operator.

For more information on the properties used to configure the Entity Operator, see the EntityOperatorSpec schema reference.

Example of basic configuration enabling both operators

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}

If an empty object ({}) is used for the topicOperator and userOperator, all properties use their default values.

When both topicOperator and userOperator properties are missing, the Entity Operator is not deployed.

10.5.1. Configuring the Topic Operator

Use topicOperator properties in Kafka.spec.entityOperator to configure the Topic Operator.

The following properties are supported:

watchedNamespace
The OpenShift namespace in which the Topic Operator watches for KafkaTopic resources. Default is the namespace where the Kafka cluster is deployed.
reconciliationIntervalMs
The interval between periodic reconciliations in milliseconds. Default 120000.
image
The image property can be used to configure the container image which is used. To learn more, refer to the information provided on configuring the image property.
resources
The resources property configures the amount of resources allocated to the Topic Operator. You can specify requests and limits for memory and cpu resources. The requests should be enough to ensure a stable performance of the operator.
logging
The logging property configures the logging of the Topic Operator. To learn more, refer to the information provided on Topic Operator logging.

Example Topic Operator configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalMs: 60000
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
    # ...

10.5.2. Configuring the User Operator

Use userOperator properties in Kafka.spec.entityOperator to configure the User Operator. The following properties are supported:

watchedNamespace
The OpenShift namespace in which the User Operator watches for KafkaUser resources. Default is the namespace where the Kafka cluster is deployed.
reconciliationIntervalMs
The interval between periodic reconciliations in milliseconds. Default 120000.
image
The image property can be used to configure the container image which will be used. To learn more, refer to the information provided on configuring the image property.
resources
The resources property configures the amount of resources allocated to the User Operator. You can specify requests and limits for memory and cpu resources. The requests should be enough to ensure a stable performance of the operator.
logging
The logging property configures the logging of the User Operator. To learn more, refer to the information provided on User Operator logging.
secretPrefix
The secretPrefix property adds a prefix to the name of all Secrets created from the KafkaUser resource. For example, secretPrefix: kafka- would prefix all Secret names with kafka-. So a KafkaUser named my-user would create a Secret named kafka-my-user.

Example User Operator configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalMs: 60000
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
    # ...

10.6. Configuring the Cluster Operator

Use environment variables to configure the Cluster Operator. Specify the environment variables for the container image of the Cluster Operator in its Deployment configuration file. You can use the following environment variables to configure the Cluster Operator. If you are running Cluster Operator replicas in standby mode, there are additional environment variables for enabling leader election.

Kafka, Kafka Connect, and Kafka MirrorMaker support multiple versions. Use their STRIMZI_<COMPONENT_NAME>_IMAGES environment variables to configure the default container images used for each version. The configuration provides a mapping between a version and an image. The required syntax is whitespace or comma-separated <version> = <image> pairs, which determine the image to use for a given version. For example, 4.1.0=registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0. These default images are overridden if image property values are specified in the configuration of a component. For more information on image configuration of components, see the Streams for Apache Kafka Custom Resource API Reference.
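
For example, the mapping can be provided as a whitespace-separated value in the Cluster Operator Deployment (the image references shown here match the examples used later in this section):

env:
  - name: STRIMZI_KAFKA_IMAGES
    value: |
      4.0.0=registry.redhat.io/amq-streams/kafka-40-rhel9:3.1.0
      4.1.0=registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0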

Note

The Deployment configuration file provided with the Streams for Apache Kafka release artifacts is install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml.

STRIMZI_NAMESPACE

A comma-separated list of namespaces that the operator operates in. When not set, set to empty string, or set to *, the Cluster Operator operates in all namespaces.

The Cluster Operator deployment might use the downward API to set this automatically to the namespace the Cluster Operator is deployed in.

Example configuration for Cluster Operator namespaces

env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
Optional, default is 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_OPERATION_TIMEOUT_MS
Optional, default 300000 ms. The timeout for internal operations, in milliseconds. Increase this value when using Streams for Apache Kafka on clusters where regular OpenShift operations take longer than usual (for example, due to prolonged download times for container images).
STRIMZI_OPERATIONS_THREAD_POOL_SIZE
Optional, default 10. The worker thread pool size, which is used for various asynchronous and blocking operations that are run by the Cluster Operator.
STRIMZI_OPERATOR_NAME
Optional, defaults to the pod’s hostname. The operator name identifies the Streams for Apache Kafka instance when emitting OpenShift events.
STRIMZI_OPERATOR_NAMESPACE

The name of the namespace where the Cluster Operator is running. Do not configure this variable manually. Use the downward API.

env:
  - name: STRIMZI_OPERATOR_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_OPERATOR_NAMESPACE_LABELS

Optional. The labels of the namespace where the Streams for Apache Kafka Cluster Operator is running. Use namespace labels to configure the namespace selector in network policies. Network policies allow the Streams for Apache Kafka Cluster Operator access only to the operands from the namespace with these labels. When not set, the namespace selector in network policies is configured to allow access to the Cluster Operator from any namespace in the OpenShift cluster.

env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
STRIMZI_POD_DISRUPTION_BUDGET_GENERATION

Optional. Default is true. Controls automatic creation of PodDisruptionBudget resources for Kafka, Kafka Connect, MirrorMaker2, and Kafka Bridge. Each budget applies across all pods deployed for the associated component. For Kafka clusters, this includes all node pool pods.

A pod disruption budget with the maxUnavailable value set to zero prevents OpenShift from evicting pods automatically.

Set this environment variable to false to disable pod disruption budget generation. You might do this, for example, if you want to manage the pod disruption budgets yourself, or if you have a development environment where availability is not important.
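
For example, to disable pod disruption budget generation:

env:
  - name: STRIMZI_POD_DISRUPTION_BUDGET_GENERATION
    value: "false"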

STRIMZI_LABELS_EXCLUSION_PATTERN

Optional, default regex pattern is (^app.kubernetes.io/(?!part-of).*|^kustomize.toolkit.fluxcd.io.*). The regex exclusion pattern used to filter labels propagation from the main custom resource to its subresources. The labels exclusion filter is not applied to labels in template sections such as spec.kafka.template.pod.metadata.labels.

env:
  - name: STRIMZI_LABELS_EXCLUSION_PATTERN
    value: "^key1.*"
STRIMZI_CUSTOM_<COMPONENT_NAME>_LABELS

Optional. One or more custom labels to apply to all the pods created by the custom resource of the component. The Cluster Operator labels the pods when the custom resource is created or is next reconciled.

Labels can be applied to the following components:

  • KAFKA
  • KAFKA_CONNECT
  • KAFKA_CONNECT_BUILD
  • ENTITY_OPERATOR
  • KAFKA_MIRROR_MAKER2
  • KAFKA_MIRROR_MAKER
  • CRUISE_CONTROL
  • KAFKA_BRIDGE
  • KAFKA_EXPORTER
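
For example, to add custom labels to all Kafka pods (the comma-separated label format used by other label-related variables is assumed here):

env:
  - name: STRIMZI_CUSTOM_KAFKA_LABELS
    value: label1=value1,label2=value2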
STRIMZI_CUSTOM_RESOURCE_SELECTOR

Optional. The label selector to filter the custom resources handled by the Cluster Operator. The operator will operate only on those custom resources that have the specified labels set. Resources without these labels will not be seen by the operator. The label selector applies to Kafka, KafkaConnect, KafkaBridge, and KafkaMirrorMaker2 resources. KafkaRebalance and KafkaConnector resources are operated only when their corresponding Kafka and Kafka Connect clusters have the matching labels.

env:
  - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR
    value: label1=value1,label2=value2
STRIMZI_KAFKA_IMAGES
Required. The mapping from the Kafka version to the corresponding image containing a Kafka broker for that version. For example 4.0.0=registry.redhat.io/amq-streams/kafka-40-rhel9:3.1.0, 4.1.0=registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0.
STRIMZI_KAFKA_CONNECT_IMAGES
Required. The mapping from the Kafka version to the corresponding image of Kafka Connect for that version. For example 4.0.0=registry.redhat.io/amq-streams/kafka-40-rhel9:3.1.0, 4.1.0=registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0.
STRIMZI_KAFKA_MIRROR_MAKER2_IMAGES
Required. The mapping from the Kafka version to the corresponding image of MirrorMaker 2 for that version. For example 4.0.0=registry.redhat.io/amq-streams/kafka-40-rhel9:3.1.0, 4.1.0=registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0.
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
Optional. The default is registry.redhat.io/amq-streams/strimzi-rhel9-operator:3.1.0. The image name to use as the default when deploying the Topic Operator if no image is specified as the Kafka.spec.entityOperator.topicOperator.image in the Kafka resource.
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
Optional. The default is registry.redhat.io/amq-streams/strimzi-rhel9-operator:3.1.0. The image name to use as the default when deploying the User Operator if no image is specified as the Kafka.spec.entityOperator.userOperator.image in the Kafka resource.
STRIMZI_DEFAULT_KAFKA_EXPORTER_IMAGE
Optional. The default is registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0. The image name to use as the default when deploying the Kafka Exporter if no image is specified as the Kafka.spec.kafkaExporter.image in the Kafka resource.
STRIMZI_DEFAULT_CRUISE_CONTROL_IMAGE
Optional. The default is registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0. The image name to use as the default when deploying Cruise Control if no image is specified as the Kafka.spec.cruiseControl.image in the Kafka resource.
STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE
Optional. The default is registry.redhat.io/amq-streams/bridge-rhel9:3.1.0. The image name to use as the default when deploying the Kafka Bridge if no image is specified as the Kafka.spec.kafkaBridge.image in the Kafka resource.
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
Optional. The default is registry.redhat.io/amq-streams/kafka-41-rhel9:3.1.0. The image name to use as the default for the Kafka initializer container if no image is specified in the brokerRackInitImage of the Kafka resource or the clientRackInitImage of the Kafka Connect resource. The init container is started before the Kafka cluster for initial configuration work, such as rack support.
STRIMZI_IMAGE_PULL_POLICY
Optional. The ImagePullPolicy that is applied to containers in all pods managed by the Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults are used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
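
For example, to always pull images:

env:
  - name: STRIMZI_IMAGE_PULL_POLICY
    value: "Always"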
STRIMZI_IMAGE_PULL_SECRETS
Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are specified in the imagePullSecrets property for all pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_KUBERNETES_VERSION

Optional. Overrides the OpenShift version information detected from the API server.

Example configuration for OpenShift version override

env:
  - name: STRIMZI_KUBERNETES_VERSION
    value: |
           major=1
           minor=16
           gitVersion=v1.16.2
           gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b
           gitTreeState=clean
           buildDate=2019-10-15T19:09:08Z
           goVersion=go1.12.10
           compiler=gc
           platform=linux/amd64

KUBERNETES_SERVICE_DNS_DOMAIN

Optional. Overrides the default OpenShift DNS domain name suffix.

By default, services assigned in the OpenShift cluster have a DNS domain name that uses the default suffix cluster.local.

For example, for broker kafka-0:

<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local

The DNS domain name is added to the Kafka broker certificates used for hostname verification.

If you are using a different DNS domain name suffix in your cluster, change the KUBERNETES_SERVICE_DNS_DOMAIN environment variable from the default to the one you are using in order to establish a connection with the Kafka brokers.

STRIMZI_CONNECT_BUILD_TIMEOUT_MS
Optional, default 300000 ms. The timeout for building new Kafka Connect images with additional connectors, in milliseconds. Consider increasing this value when using Streams for Apache Kafka to build container images containing many connectors or using a slow container registry.
STRIMZI_NETWORK_POLICY_GENERATION

Optional, default true. Enables or disables the automatic generation of network policy resources. Network policies allow connections between Kafka components.

Set this environment variable to false to disable network policy generation. You might do this, for example, if you want to use custom network policies. Custom network policies allow more control over maintaining the connections between components.
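
For example, to disable network policy generation:

env:
  - name: STRIMZI_NETWORK_POLICY_GENERATION
    value: "false"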

STRIMZI_DNS_CACHE_TTL
Optional, default 30. Number of seconds to cache successful name lookups in local DNS resolver. Any negative value means cache forever. Zero means do not cache, which can be useful for avoiding connection errors due to long caching policies being applied.
STRIMZI_POD_SET_RECONCILIATION_ONLY
Optional, default false. When set to true, the Cluster Operator reconciles only the StrimziPodSet resources and any changes to the other custom resources (Kafka, KafkaConnect, and so on) are ignored. This mode is useful for ensuring that your pods are recreated if needed, but no other changes happen to the clusters.
STRIMZI_FEATURE_GATES
Optional. Enables or disables the features and functionality controlled by feature gates.
STRIMZI_POD_SECURITY_PROVIDER_CLASS
Optional. Configuration for the pluggable PodSecurityProvider class, which can be used to provide the security context configuration for Pods and containers.

10.6.1. Restricting Cluster Operator access with network policy

Use the STRIMZI_OPERATOR_NAMESPACE_LABELS environment variable to establish network policy for the Cluster Operator using namespace labels.

The Cluster Operator can run in the same namespace as the resources it manages, or in a separate namespace. By default, the STRIMZI_OPERATOR_NAMESPACE environment variable is configured to use the downward API to find the namespace the Cluster Operator is running in. If the Cluster Operator is running in the same namespace as the resources, only local access is required and allowed by Streams for Apache Kafka.

If the Cluster Operator is running in a separate namespace to the resources it manages, any namespace in the OpenShift cluster is allowed access to the Cluster Operator unless network policy is configured. By adding namespace labels, access to the Cluster Operator is restricted to the namespaces specified.

Network policy configured for the Cluster Operator deployment

#...
env:
  # ...
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
  #...

10.6.2. Setting periodic reconciliation of custom resources

Use the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS variable to set the time interval for periodic reconciliations by the Cluster Operator. Replace its value with the required interval in milliseconds.

Reconciliation period configured for the Cluster Operator deployment

#...
env:
  # ...
  - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
    value: "120000"
  #...

The Cluster Operator reacts to all notifications about applicable cluster resources received from the OpenShift cluster. If the operator is not running, or if a notification is not received for any reason, resources get out of sync with the state of the running OpenShift cluster. To handle failovers properly, the Cluster Operator runs a periodic reconciliation process that compares the state of the resources with the current cluster deployments, ensuring a consistent state across all of them.

10.6.3. Pausing reconciliation of custom resources using annotations

Sometimes it is useful to pause the reconciliation of custom resources managed by Streams for Apache Kafka operators, so that you can perform fixes or make updates. If reconciliations are paused, any changes made to custom resources are ignored by the operators until the pause ends.

If you want to pause reconciliation of a custom resource, set the strimzi.io/pause-reconciliation annotation to true in its configuration. This instructs the appropriate operator to pause reconciliation of the custom resource. For example, you can apply the annotation to the KafkaConnect resource so that reconciliation by the Cluster Operator is paused.

You can also create a custom resource with the pause annotation enabled. The custom resource is created, but it is ignored.

Prerequisites

  • The Streams for Apache Kafka Operator that manages the custom resource is running.

Procedure

  1. Annotate the custom resource in OpenShift, setting pause-reconciliation to true:

    oc annotate <kind_of_custom_resource> <name_of_custom_resource> strimzi.io/pause-reconciliation="true"

    For example, for the KafkaConnect custom resource:

    oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation="true"
  2. Check that the status conditions of the custom resource show a change to ReconciliationPaused:

    oc describe <kind_of_custom_resource> <name_of_custom_resource>

    The type condition changes to ReconciliationPaused at the lastTransitionTime.

    Example custom resource with a paused reconciliation condition type

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      annotations:
        strimzi.io/pause-reconciliation: "true"
        strimzi.io/use-connector-resources: "true"
      creationTimestamp: 2021-03-12T10:47:11Z
      #...
    spec:
      # ...
    status:
      conditions:
      - lastTransitionTime: 2021-03-12T10:47:41.689249Z
        status: "True"
        type: ReconciliationPaused

Resuming from pause

  • To resume reconciliation, you can set the annotation to false, or remove the annotation.
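
For example, the annotation can be removed from the KafkaConnect resource used earlier; the trailing hyphen in the oc annotate command removes the annotation:

oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation-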

10.6.4. Running multiple Cluster Operator replicas with leader election

The default Cluster Operator configuration enables leader election to run multiple parallel replicas of the Cluster Operator. One replica is elected as the active leader and operates the deployed resources. The other replicas run in standby mode. When the leader stops or fails, one of the standby replicas is elected as the new leader and starts operating the deployed resources.

By default, Streams for Apache Kafka runs with a single Cluster Operator replica that is always the leader replica. When a single Cluster Operator replica stops or fails, OpenShift starts a new replica.

Running the Cluster Operator with multiple replicas is not essential. But it’s useful to have replicas on standby in case of large-scale disruptions caused by major failure. For example, suppose multiple worker nodes or an entire availability zone fails. This failure might cause the Cluster Operator pod and many Kafka pods to go down at the same time. If subsequent pod scheduling causes congestion through lack of resources, this can delay operations when running a single Cluster Operator.

10.6.4.1. Enabling leader election for Cluster Operator replicas

Configure leader election environment variables when running additional Cluster Operator replicas. The following environment variables are supported:

STRIMZI_LEADER_ELECTION_ENABLED
Optional, disabled (false) by default. Enables or disables leader election, which allows additional Cluster Operator replicas to run on standby.
Note

Leader election is disabled by default. It is only enabled when applying this environment variable on installation.

STRIMZI_LEADER_ELECTION_LEASE_NAME
Required when leader election is enabled. The name of the OpenShift Lease resource that is used for the leader election.
STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE

Required when leader election is enabled. The namespace where the OpenShift Lease resource used for leader election is created. You can use the downward API to configure it to the namespace where the Cluster Operator is deployed.

env:
  - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_LEADER_ELECTION_IDENTITY

Required when leader election is enabled. Configures the identity of a given Cluster Operator instance used during the leader election. The identity must be unique for each operator instance. You can use the downward API to configure it to the name of the pod where the Cluster Operator is deployed.

env:
  - name: STRIMZI_LEADER_ELECTION_IDENTITY
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
STRIMZI_LEADER_ELECTION_LEASE_DURATION_MS
Optional, default 15000 ms. Specifies the duration the acquired lease is valid.
STRIMZI_LEADER_ELECTION_RENEW_DEADLINE_MS
Optional, default 10000 ms. Specifies the period the leader should try to maintain leadership.
STRIMZI_LEADER_ELECTION_RETRY_PERIOD_MS
Optional, default 2000 ms. Specifies the frequency of updates to the lease lock by the leader.

10.6.4.2. Configuring Cluster Operator replicas

To run additional Cluster Operator replicas in standby mode, you will need to increase the number of replicas and enable leader election. To configure leader election, use the leader election environment variables.

To make the required changes, configure the following Cluster Operator installation files located in install/cluster-operator/:

  • 060-Deployment-strimzi-cluster-operator.yaml
  • 022-ClusterRole-strimzi-cluster-operator-role.yaml
  • 022-RoleBinding-strimzi-cluster-operator.yaml

Leader election has its own ClusterRole and RoleBinding RBAC resources that target the namespace where the Cluster Operator is running, rather than the namespace it is watching.

The default deployment configuration creates a Lease resource called strimzi-cluster-operator in the same namespace as the Cluster Operator. The Cluster Operator uses leases to manage leader election. The RBAC resources provide the permissions to use the Lease resource. If you use a different Lease name or namespace, update the ClusterRole and RoleBinding files accordingly.

Prerequisites

  • You need an account with permission to create and manage CustomResourceDefinition and RBAC (ClusterRole, and RoleBinding) resources.

Procedure

Edit the Deployment resource that is used to deploy the Cluster Operator, which is defined in the 060-Deployment-strimzi-cluster-operator.yaml file.

  1. Change the replicas property from the default (1) to a value that matches the required number of replicas.

    Increasing the number of Cluster Operator replicas

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: strimzi-cluster-operator
      labels:
        app: strimzi
    spec:
      replicas: 3

  2. Check that the leader election env properties are set.

    If they are not set, configure them.

    To enable leader election, STRIMZI_LEADER_ELECTION_ENABLED must be set to true (default).

    In this example, the name of the lease is changed to my-strimzi-cluster-operator.

    Configuring leader election environment variables for the Cluster Operator

    # ...
    spec:
      containers:
        - name: strimzi-cluster-operator
          # ...
          env:
            - name: STRIMZI_LEADER_ELECTION_ENABLED
              value: "true"
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAME
              value: "my-strimzi-cluster-operator"
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: STRIMZI_LEADER_ELECTION_IDENTITY
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

    For a description of the available environment variables, see Section 10.6.4.1, “Enabling leader election for Cluster Operator replicas”.

    If you specified a different name or namespace for the Lease resource used in leader election, update the RBAC resources.

  3. (optional) Edit the ClusterRole resource in the 022-ClusterRole-strimzi-cluster-operator-role.yaml file.

    Update resourceNames with the name of the Lease resource.

    Updating the ClusterRole references to the lease

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: strimzi-cluster-operator-leader-election
      labels:
        app: strimzi
    rules:
      - apiGroups:
          - coordination.k8s.io
        resourceNames:
          - my-strimzi-cluster-operator
    # ...

  4. (optional) Edit the RoleBinding resource in the 022-RoleBinding-strimzi-cluster-operator.yaml file.

    Update subjects.name and subjects.namespace with the name of the Lease resource and the namespace where it was created.

    Updating the RoleBinding references to the lease

    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: strimzi-cluster-operator-leader-election
      labels:
        app: strimzi
    subjects:
      - kind: ServiceAccount
        name: my-strimzi-cluster-operator
        namespace: myproject
    # ...

  5. Deploy the Cluster Operator:

    oc create -f install/cluster-operator -n myproject
  6. Check the status of the deployment:

    oc get deployments -n myproject

    Output shows the deployment name and readiness

    NAME                      READY  UP-TO-DATE  AVAILABLE
    strimzi-cluster-operator  3/3    3           3

    READY shows the number of replicas that are ready/expected. The deployment is successful when the AVAILABLE output shows the correct number of replicas.

10.6.5. Configuring Cluster Operator HTTP proxy settings

If you are running a Kafka cluster behind an HTTP proxy, you can still pass data in and out of the cluster. For example, you can run Kafka Connect with connectors that push and pull data from outside the proxy. Or you can use a proxy to connect with an authorization server.

Configure the Cluster Operator deployment to specify the proxy environment variables. The Cluster Operator accepts standard proxy configuration (HTTP_PROXY, HTTPS_PROXY and NO_PROXY) as environment variables. The proxy settings are applied to all Streams for Apache Kafka containers.

The format for a proxy address is http://<ip_address>:<port_number>. To set up a proxy with a username and password, the format is http://<username>:<password>@<ip_address>:<port_number>.

Prerequisites

  • You need an account with permission to create and manage CustomResourceDefinition and RBAC (ClusterRole, and RoleBinding) resources.

Procedure

  1. To add proxy environment variables to the Cluster Operator, update its Deployment configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml).

    Example proxy configuration for the Cluster Operator

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
            # ...
            env:
            # ...
            - name: "HTTP_PROXY"
              value: "http://proxy.com" 
    1
    
            - name: "HTTPS_PROXY"
              value: "https://proxy.com" 
    2
    
            - name: "NO_PROXY"
              value: "internal.com, other.domain.com" 
    3
    
      # ...

    1
    Address of the proxy server.
    2
    Secure address of the proxy server.
    3
    Addresses for servers that are accessed directly as exceptions to the proxy server. The URLs are comma-separated.

    Alternatively, edit the Deployment directly:

    oc edit deployment strimzi-cluster-operator
  2. If you updated the YAML file instead of editing the Deployment directly, apply the changes:

    oc create -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml

10.6.6. Disabling FIPS mode using Cluster Operator configuration

Streams for Apache Kafka automatically switches to FIPS mode when running on a FIPS-enabled OpenShift cluster. Disable FIPS mode by setting the FIPS_MODE environment variable to disabled in the deployment configuration for the Cluster Operator. With FIPS mode disabled, Streams for Apache Kafka automatically disables FIPS in the OpenJDK for all components, but it is no longer FIPS compliant. The Streams for Apache Kafka operators, as well as all operands, run in the same way as if they were running on an OpenShift cluster without FIPS enabled.

Procedure

  1. To disable the FIPS mode in the Cluster Operator, update its Deployment configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml) and add the FIPS_MODE environment variable.

    Example FIPS configuration for the Cluster Operator

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
            # ...
            env:
            # ...
            - name: "FIPS_MODE"
              value: "disabled" 
    1
    
      # ...

    1
    Disables the FIPS mode.

    Alternatively, edit the Deployment directly:

    oc edit deployment strimzi-cluster-operator
  2. If you updated the YAML file instead of editing the Deployment directly, apply the changes:

    oc apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml

10.7. Configuring Kafka Connect

Update the spec properties of the KafkaConnect custom resource to configure your Kafka Connect deployment.

Use Kafka Connect to set up external data connections to your Kafka cluster.

You can also use the KafkaConnect resource to specify the following:

  • Connector plugin configuration to build a container image that includes the plugins to make connections
  • Configuration for the Kafka Connect worker pods that run connectors
  • An annotation to enable use of the KafkaConnector resource to manage connectors

The Cluster Operator manages Kafka Connect clusters deployed using the KafkaConnect resource and connectors created using the KafkaConnector resource.

For a deeper understanding of the Kafka Connect cluster configuration options, refer to the Streams for Apache Kafka Custom Resource API Reference.

Handling high volumes of messages

You can tune the configuration to handle high volumes of messages. For more information, see Handling high volumes of messages.

Example KafkaConnect custom resource configuration

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect 
1

metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" 
2

# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 
3

  # Bootstrap servers (required)
  bootstrapServers: my-cluster-kafka-bootstrap:9092 
4

  # Kafka Connect configuration (recommended)
  config: 
5

    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  # Resources requests and limits (recommended)
  resources: 
6

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Authentication (optional)
  authentication: 
7

    type: tls
    certificateAndKey:
      certificate: source.crt
      key: source.key
      secretName: my-user-source
  # TLS configuration (optional)
  tls: 
8

    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        pattern: "*.crt"
      - secretName: my-cluster-cluster-cert
        pattern: "*.crt"
  # Build configuration (optional)
  build: 
9

    output: 
10

      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 
11

      - name: connector-1
        artifacts:
          - type: tgz
            url: <url_to_download_connector_1_artifact>
            sha512sum: <SHA-512_checksum_of_connector_1_artifact>
      - name: connector-2
        artifacts:
          - type: jar
            url: <url_to_download_connector_2_artifact>
            sha512sum: <SHA-512_checksum_of_connector_2_artifact>
  # Logging configuration (optional)
  logging: 
12

    type: inline
    loggers:
      # Kafka 4.0+ uses Log4j2
      rootLogger.level: INFO
  # Readiness probe (optional)
  readinessProbe: 
13

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Metrics configuration (optional)
  metricsConfig: 
14

    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-config-map
        key: my-key
  # JVM options (optional)
  jvmOptions: 
15

    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest 
16

  # Rack awareness (optional)
  rack:
    topologyKey: topology.kubernetes.io/zone 
17

  # Pod and container template (optional)
  template: 
18

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 
19

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: aws-creds
              key: awsAccessKey
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-creds
              key: awsSecretAccessKey
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
20

1
Use KafkaConnect.
2
Enables the use of KafkaConnector resources to start, stop, and manage connector instances.
3
The number of replica nodes for the workers that run tasks.
4
Bootstrap address for connection to the Kafka cluster. The address takes the format <cluster_name>-kafka-bootstrap:<port_number>. The Kafka cluster doesn’t need to be managed by Streams for Apache Kafka or deployed to an OpenShift cluster.
5
Kafka Connect configuration of workers (not connectors) that run connectors and their tasks. Standard Apache Kafka configuration may be provided, restricted to those properties not managed directly by Streams for Apache Kafka. In this example, JSON converters are specified. A replication factor of 3 is set for the internal topics used by Kafka Connect (the minimum requirement for a production environment). Changing the replication factor after the topics have been created has no effect.
6
Requests for reservation of supported resources, currently cpu and memory, and limits to specify the maximum resources that can be consumed.
7
Authentication for the Kafka Connect cluster, specified as tls, scram-sha-256, scram-sha-512, plain, or oauth. By default, Kafka Connect connects to Kafka brokers using a plaintext connection. For details on configuring authentication, see the KafkaConnectSpec schema properties.
8
TLS configuration for encrypted connections to the Kafka cluster, with trusted certificates stored in X.509 format within the specified secrets.
9
Build configuration properties for building a container image with connector plugins automatically.
10
(Required) Configuration of the container registry where new images are pushed.
11
(Required) List of connector plugins and their artifacts to add to the new container image. Each plugin must be configured with at least one artifact.
12
Kafka Connect loggers and log levels added directly (inline) or indirectly (external) through a ConfigMap. Custom Log4j configuration must be placed under the log4j2.properties key in the ConfigMap. You can set log levels to INFO, ERROR, WARN, TRACE, DEBUG, FATAL or OFF.
13
Healthchecks to know when to restart a container (liveness) and when a container can accept traffic (readiness).
14
Prometheus metrics, which are enabled by referencing a ConfigMap containing configuration for the Prometheus JMX exporter in this example. You can enable metrics without further configuration using a reference to a ConfigMap containing an empty file under metricsConfig.valueFrom.configMapKeyRef.key.
15
JVM configuration options to optimize performance for the Virtual Machine (VM) running Kafka Connect.
16
ADVANCED OPTION: Container image configuration, which is recommended only in special situations.
17
SPECIALIZED OPTION: Rack awareness configuration for the deployment. This is a specialized option intended for a deployment within the same location, not across regions. Use this option if you want connectors to consume from the closest replica rather than the leader replica. In certain cases, consuming from the closest replica can improve network utilization or reduce costs. The topologyKey must match a node label containing the rack ID. The example used in this configuration specifies a zone using the standard topology.kubernetes.io/zone label. To consume from the closest replica, enable the RackAwareReplicaSelector in the Kafka broker configuration, as shown in the sketch after this list.
18
Template customization. Here, pod anti-affinity ensures that Kafka Connect pods are not scheduled on the same node (hostname) as pods labeled with the application values postgresql or mongodb.
19
Environment variables are set for distributed tracing and to pass credentials to connectors.
20
Distributed tracing is enabled by using OpenTelemetry.
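The following is a minimal sketch of the Kafka broker configuration referenced in the rack awareness option (callout 17); the cluster name is an assumption, and replica.selector.class enables consuming from the closest replica:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    rack:
      topologyKey: topology.kubernetes.io/zone
    config:
      # Enables fetching from the closest replica instead of the leader
      replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector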

10.7.1. Configuring Kafka Connect for multiple instances

By default, Streams for Apache Kafka configures the group ID and names of the internal topics used by Kafka Connect. When running multiple instances of Kafka Connect, you must change these default settings using the following config properties:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  config:
    group.id: my-connect-cluster 
1

    offset.storage.topic: my-connect-cluster-offsets 
2

    config.storage.topic: my-connect-cluster-configs 
3

    status.storage.topic: my-connect-cluster-status 
4

    # ...
  # ...
1
The Kafka Connect cluster group ID within Kafka.
2
Kafka topic that stores connector offsets.
3
Kafka topic that stores connector and task configurations.
4
Kafka topic that stores connector and task status updates.
Note

Values for the three topics must be the same for all instances with the same group.id.

Unless you modify these default settings, each instance connecting to the same Kafka cluster is deployed with the same values. In practice, this means all instances form a cluster and use the same internal topics.

Multiple instances attempting to use the same internal topics will cause unexpected errors, so you must change the values of these properties for each instance.
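For example, a second Kafka Connect instance might use the following minimal configuration sketch; the resource name, group ID, and topic names are assumptions:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-2
spec:
  config:
    # Values differ from the first instance so the two instances do not join the same cluster
    group.id: my-connect-cluster-2
    offset.storage.topic: my-connect-cluster-2-offsets
    config.storage.topic: my-connect-cluster-2-configs
    status.storage.topic: my-connect-cluster-2-status
  # ...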

10.7.2. Configuring Kafka Connect user authorization

When using authorization in Kafka, a Kafka Connect user requires read/write access to the cluster group and internal topics of Kafka Connect. This procedure outlines how access is granted using simple authorization and ACLs.

Properties for the Kafka Connect cluster group ID and internal topics are configured by Streams for Apache Kafka by default. Alternatively, you can define them explicitly in the spec of the KafkaConnect resource. This is useful when configuring Kafka Connect for multiple instances, as the values for the group ID and topics must differ when running multiple Kafka Connect instances.

Simple authorization uses ACL rules managed by the Kafka StandardAuthorizer plugin to ensure appropriate access levels. For more information on configuring a KafkaUser resource to use simple authorization, see the AclRule schema reference.

Prerequisites

  • An OpenShift cluster
  • A running Cluster Operator

Procedure

  1. Edit the authorization property in the KafkaUser resource to provide access rights to the user.

    Access rights are configured for the Kafka Connect topics and cluster group using literal name values. The following table shows the default names configured for the topics and cluster group ID.

    Table 10.1. Names for the access rights configuration

    Property                Name
    offset.storage.topic    connect-cluster-offsets
    status.storage.topic    connect-cluster-status
    config.storage.topic    connect-cluster-configs
    group                   connect-cluster

    In this example configuration, the default names are used to specify access rights. If you are using different names for a Kafka Connect instance, use those names in the ACLs configuration.

    Example configuration for simple authorization

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # cluster group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operations:
              - Read
            host: "*"

  2. Create or update the resource.

    oc apply -f KAFKA-USER-CONFIG-FILE

10.8. Configuring Kafka Connect connectors

The KafkaConnector resource provides an OpenShift-native approach to management of connectors by the Cluster Operator. To create, delete, or reconfigure connectors with KafkaConnector resources, you must set the use-connector-resources annotation to true in your KafkaConnect custom resource.

Annotation to enable KafkaConnectors

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

When the use-connector-resources annotation is enabled in your KafkaConnect configuration, you must define and manage connectors using KafkaConnector resources.

Note

Alternatively, you can manage connectors using the Kafka Connect REST API instead of KafkaConnector resources. To use the API, you must remove the strimzi.io/use-connector-resources annotation from the KafkaConnect resource.

KafkaConnector resources provide the configuration needed to create connectors within a Kafka Connect cluster, which interacts with a Kafka cluster as specified in the KafkaConnect configuration. The Kafka cluster does not need to be managed by Streams for Apache Kafka or deployed to an OpenShift cluster.

Kafka and Kafka Connect clusters contained in the same OpenShift cluster

The configuration also specifies how the connector instances interact with external data systems, including any required authentication methods. Additionally, you must define the data to watch. For example, in a source connector that reads data from a database, the configuration might include the database name. You can also define where this data should be placed in Kafka by specifying the target topic name.

Use the tasksMax property to specify the maximum number of tasks. For instance, a source connector with tasksMax: 2 might split the import of source data into two tasks.

Example source connector configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  
1

  labels:
    strimzi.io/cluster: my-connect-cluster 
2

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 
3

  tasksMax: 2 
4

  autoRestart: 
5

    enabled: true
  config: 
6

    file: "/opt/kafka/LICENSE" 
7

    topic: my-topic 
8

    # ...

1
Name of the KafkaConnector resource, which is used as the name of the connector. Use any name that is valid for an OpenShift resource.
2
Name of the Kafka Connect cluster to create the connector instance in. Connectors must be deployed to the same namespace as the Kafka Connect cluster they link to.
3
Full name of the connector class. This should be present in the image being used by the Kafka Connect cluster.
4
Maximum number of Kafka Connect tasks that the connector can create.
5
Enables automatic restarts of failed connectors and tasks. By default, the number of restarts is indefinite, but you can set a maximum on the number of automatic restarts using the maxRestarts property.
6
Connector configuration as key-value pairs.
7
Location of the external data file. In this example, we’re configuring the FileStreamSourceConnector to read from the /opt/kafka/LICENSE file.
8
Kafka topic to publish the source data to.

To include external connector configurations, such as user access credentials stored in a secret, use the template property of the KafkaConnect resource. You can also load values using configuration providers.
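As a minimal sketch of the configuration provider approach, the following example registers Kafka's FileConfigProvider and mounts a secret into the Kafka Connect pods; the secret name, mount path, and file key are assumptions:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  config:
    # Register the provider under the alias "file"
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  template:
    pod:
      volumes:
        - name: connector-credentials
          secret:
            secretName: my-connector-credentials # assumed secret name
    connectContainer:
      volumeMounts:
        - name: connector-credentials
          mountPath: /mnt/connector-credentials

Assuming the secret contains a credentials.properties file with a dbPassword entry, a connector can then reference the value in its config as ${file:/mnt/connector-credentials/credentials.properties:dbPassword}.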

10.8.1. Stopping or pausing Kafka Connect connectors

If you are using KafkaConnector resources to configure connectors, use the state configuration to stop or pause a connector. In contrast to the paused state, where the connector and tasks remain instantiated, stopping a connector retains only the configuration, with no active processes. Stopping a connector is more suitable than pausing for longer periods of inactivity. While a paused connector is quicker to resume, a stopped connector frees up memory and resources.

Note

The state configuration replaces the (deprecated) pause configuration in the KafkaConnectorSpec schema, which allows pauses on connectors. If you were previously using the pause configuration to pause connectors, we encourage you to transition to using the state configuration only to avoid conflicts.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaConnector custom resource that controls the connector you want to pause or stop:

    oc get KafkaConnector
  2. Edit the KafkaConnector resource to stop or pause the connector.

    Example configuration for stopping a Kafka Connect connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector
      tasksMax: 2
      config:
        file: "/opt/kafka/LICENSE"
        topic: my-topic
      state: stopped
      # ...

    Change the state configuration to stopped or paused. The default state for the connector when this property is not set is running.

  3. Apply the changes to the KafkaConnector configuration.

    You can resume the connector by changing state to running or removing the configuration.

Note

Alternatively, you can expose the Kafka Connect API and use the stop and pause endpoints to stop a connector from running. For example, PUT /connectors/<connector_name>/stop. You can then use the resume endpoint to restart it.
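For reference, a sketch of calling these endpoints from inside the OpenShift cluster, assuming the default Connect API service name (<connect_cluster_name>-connect-api) and port 8083:

# Stop the connector (run from a pod that can reach the Connect API service)
curl -X PUT http://my-connect-cluster-connect-api:8083/connectors/my-source-connector/stop

# Resume it later
curl -X PUT http://my-connect-cluster-connect-api:8083/connectors/my-source-connector/resume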

10.8.2. Manually restarting Kafka Connect connectors

If you are using KafkaConnector resources to manage connectors, use the strimzi.io/restart annotation to manually trigger a restart of a connector.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaConnector custom resource that controls the Kafka connector you want to restart:

    oc get KafkaConnector
  2. Restart the connector by annotating the KafkaConnector resource in OpenShift:

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"

    The restart annotation is set to true.

    You can also refine the restart behavior with the includeTasks and onlyFailed parameters, which both default to false.

    • includeTasks restarts both the connector instance and its task instances.
    • onlyFailed restarts only instances with a FAILED status when set to true.

    For example:

    strimzi.io/restart="includeTasks,onlyFailed"
  3. Wait for the next reconciliation to occur (every two minutes by default).

    The Kafka connector is restarted, as long as the annotation was detected by the reconciliation process. When Kafka Connect accepts the restart request, the annotation is removed from the KafkaConnector custom resource.

10.8.3. Manually restarting Kafka Connect connector tasks

If you are using KafkaConnector resources to manage connectors, use the strimzi.io/restart-task annotation to manually trigger a restart of a connector task.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaConnector custom resource that controls the Kafka connector task you want to restart:

    oc get KafkaConnector
  2. Find the ID of the task to be restarted from the KafkaConnector custom resource:

    oc describe KafkaConnector <kafka_connector_name>

    Task IDs are non-negative integers, starting from 0.

  3. Use the ID to restart the connector task by annotating the KafkaConnector resource in OpenShift:

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"

    In this example, task 0 is restarted.

  4. Wait for the next reconciliation to occur (every two minutes by default).

    The Kafka connector task is restarted, as long as the annotation was detected by the reconciliation process. When Kafka Connect accepts the restart request, the annotation is removed from the KafkaConnector custom resource.

10.8.4. Listing connector offsets

To track connector offsets using KafkaConnector resources, add the listOffsets configuration. The offsets, which keep track of the flow of data, are written to a config map specified in the configuration. If the config map does not exist, Streams for Apache Kafka creates it.

After the configuration is in place, annotate the KafkaConnector resource to write the list to the config map.

Sink connectors use Kafka’s standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic.

  • For sink connectors, the list shows Kafka topic partitions and the last committed offset for each partition.
  • For source connectors, the list shows the source system’s partition and the last offset processed.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Edit the KafkaConnector resource for the connector to include the listOffsets configuration.

    Example configuration to list offsets

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      listOffsets:
        toConfigMap: 
    1
    
          name: my-connector-offsets 
    2
    
      # ...

    1
    The reference to the config map where the list of offsets will be written to.
    2
    The name of the config map, which is named my-connector-offsets in this example.
  2. Run the command to write the list to the config map by annotating the KafkaConnector resource:

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>

    The annotation remains until either the list operation succeeds or it is manually removed from the resource.

  3. After the KafkaConnector resource is updated, use the following command to check if the config map with the offsets was created:

    oc get configmap my-connector-offsets -n <namespace>
  4. Inspect the contents of the config map to verify the offsets are being listed:

    oc describe configmap my-connector-offsets -n <namespace>

    Streams for Apache Kafka puts the offset information into the offsets.json property. This does not overwrite any other properties when updating an existing config map.

    Example source connector offset list

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
      ownerReferences: 
    1
    
      - apiVersion: kafka.strimzi.io/v1beta2
        blockOwnerDeletion: false
        controller: false
        kind: KafkaConnector
        name: my-source-connector
        uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
      resourceVersion: "66951"
      uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e
    data:
      offsets.json: |-
        {
          "offsets" : [ {
            "partition" : {
              "filename" : "/data/myfile.txt" 
    2
    
            },
            "offset" : {
              "position" : 15295 
    3
    
            }
          } ]
        }

    1
    The owner reference pointing to the KafkaConnector resource for the source connector. To provide a custom owner reference, create the config map in advance and set the owner reference.
    2
    The source partition, represented by the filename /data/myfile.txt in this example for a file-based connector.
    3
    The last processed offset position in the source partition.

    Example sink connector offset list

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
      ownerReferences: 
    1
    
      - apiVersion: kafka.strimzi.io/v1beta2
        blockOwnerDeletion: false
        controller: false
        kind: KafkaConnector
        name: my-sink-connector
        uid: 84a29d7f-77e6-43ac-bfbb-719f9b9a4b3b
      resourceVersion: "79241"
      uid: 721e30bc-23df-41a2-9b48-fb2b7d9b042c
    data:
      offsets.json: |-
        {
          "offsets": [
            {
              "partition": {
                "kafka_topic": "my-topic", 
    2
    
                "kafka_partition": 2 
    3
    
              },
              "offset": {
                "kafka_offset": 4 
    4
    
              }
            }
          ]
        }

    1
    The owner reference pointing to the KafkaConnector resource for the sink connector.
    2
    The Kafka topic that the sink connector is consuming from.
    3
    The partition of the Kafka topic.
    4
    The last committed Kafka offset for this topic and partition.

10.8.5. Altering connector offsets

To alter connector offsets using KafkaConnector resources, configure the resource to stop the connector and add alterOffsets configuration to specify the offset changes in a config map. You can reuse the same config map used to list offsets.

After the connector is stopped and the configuration is in place, annotate the KafkaConnector resource to apply the offset alteration, then restart the connector.

Altering connector offsets can be useful, for example, to skip a poison record or replay a record.

In this procedure, we alter the offset position for a source connector named my-source-connector.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Edit the KafkaConnector resource to stop the connector and include the alterOffsets configuration.

    Example configuration to stop a connector and alter offsets

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: stopped 
    1
    
      alterOffsets:
        fromConfigMap: 
    2
    
          name: my-connector-offsets 
    3
    
      # ...

    1
    Changes the state of the connector to stopped. The default state for the connector when this property is not set is running.
    2
    The reference to the config map that provides the update.
    3
    The name of the config map, which is named my-connector-offsets in this example.
  2. Edit the config map to make the alteration.

    In this example, we’re resetting the offset position for a source connector to 15000.

    Example source connector offset list configuration

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
    data:
      offsets.json: |- 
    1
    
        {
          "offsets" : [ {
            "partition" : {
              "filename" : "/data/myfile.txt"
            },
            "offset" : {
              "position" : 15000 
    2
    
            }
          } ]
        }

    1
    Edits must be made within the offsets.json property.
    2
    The updated offset position in the source partition.
  3. Run the command to update the offset position by annotating the KafkaConnector resource:

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>

    The annotation remains until either the update operation succeeds or it is manually removed from the resource.

  4. Check the changes by using the procedure to list connector offsets.
  5. Restart the connector by changing the state to running.

    Example configuration to start a connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: running
      # ...

10.8.6. Resetting connector offsets

To reset connector offsets using KafkaConnector resources, configure the resource to stop the connector.

After the connector is stopped, annotate the KafkaConnector resource to clear the offsets, then restart the connector.

In this procedure, we reset the offset position for a source connector named my-source-connector.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Edit the KafkaConnector resource to stop the connector.

    Example configuration to stop a connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      # ...
      state: stopped 
    1
    
      # ...

    1
    Changes the state of the connector to stopped. The default state for the connector when this property is not set is running.
  2. Run the command to reset the offset position by annotating the KafkaConnector resource:

    oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n <namespace>

    The annotation remains until either the reset operation succeeds or it is manually removed from the resource.

  3. Check the changes by using the procedure to list connector offsets.

    After resetting, the offsets.json property is empty.

    Example source connector offset list

    apiVersion: v1
    kind: ConfigMap
    metadata:
      # ...
    data:
      offsets.json: |-
        {
          "offsets" : []
        }

  4. Restart the connector by changing the state to running.

    Example configuration to start a connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      state: running
      # ...

10.9. Configuring MirrorMaker 2

Configure your MirrorMaker 2 deployment by updating the spec properties of the KafkaMirrorMaker2 custom resource.

Start with a minimal configuration that defines the core requirements. Extend it with optional settings to support the following:

  • Securing connections (TLS and authentication)
  • Managing replicated topic names
  • Synchronizing consumer group offsets
  • Synchronizing ACL rules
  • Tuning Kafka Connect worker settings
  • Applying common configuration, such as resource limits and requests (recommended), JVM tuning, and metrics

Certain settings, though optional, are recommended for production deployments, such as security and resource allocation.

For details of all configuration options, see the Streams for Apache Kafka Custom Resource API Reference.

10.9.1. Minimal configuration for MirrorMaker 2

A minimal KafkaMirrorMaker2 resource requires the following in its spec:

  • clusters: The connection details for source and target Kafka clusters. Each requires a unique alias.
  • connectCluster: The alias of the target cluster. MirrorMaker 2 runs on the Kafka Connect framework, and its configuration and status are stored on this cluster.
  • replicas: The number of Kafka Connect worker pods to deploy.
  • mirrors: The replication flow from a source cluster to a target cluster.

By default, MirrorMaker 2 replicates all topics and consumer groups. To reduce load on the cluster and avoid replicating unnecessary data, specify which topics and groups to include using filters:

  • topicsPattern: A regex to select topics for replication.
  • groupsPattern: A regex to select consumer groups for offset synchronization.

Minimal configuration for MirrorMaker 2

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # Kafka version (recommended)
  version: 4.1.0
  # Replicas (required)
  replicas: 3 
1

  # Connect cluster name (required)
  connectCluster: "my-cluster-target" 
2

  # Cluster configurations (required)
  clusters:
  - alias: "my-cluster-source" 
3

    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  # Mirroring configurations (required)
  mirrors: 
4

  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    # Source connector configuration (required) 
5

    sourceConnector: {}
    # Topic and group patterns (recommended)
    topicsPattern: "topic1.*|topic2.*" 
6

    groupsPattern: "group1.*|group2-[0-9]{2}" 
7

1
Number of Kafka Connect worker replicas to run.
2
Alias of the Kafka cluster used by Kafka Connect to store internal topics. This must be the alias of the target cluster.
3
Each Kafka cluster (source and target) must define a unique alias and a bootstrap address. The bootstrap address takes the format <cluster_name>-kafka-bootstrap:<port>.
4
Defines replication between the source and target clusters.
5
Configuration for the source connector that replicates topics. At a minimum, you can add an empty sourceConnector property to use the default configuration.
6
Replicates only topics starting with topic1 or topic2.
7
Replicates consumer groups starting with group1 or group2 followed by two digits.
Note

The Kafka clusters do not need to be managed by Streams for Apache Kafka or run on OpenShift.

10.9.2. Securing MirrorMaker 2 connections

This procedure describes in outline the configuration required to secure MirrorMaker 2 connections.

You configure these settings independently for the source Kafka cluster and the target Kafka cluster. You also need separate user configuration to provide the credentials required for MirrorMaker to connect to the source and target Kafka clusters.

For the Kafka clusters, you specify internal listeners for secure connections within an OpenShift cluster and external listeners for connections outside the OpenShift cluster.

You can configure authentication and authorization mechanisms. The security options implemented for the source and target Kafka clusters must be compatible with the security options implemented for MirrorMaker 2.

After you have created the cluster and user authentication credentials, you specify them in your MirrorMaker configuration for secure connections.

Note

In this procedure, the certificates generated by the Cluster Operator are used, but you can replace them by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external CA (certificate authority).

Before you start

Before starting this procedure, take a look at the example configuration files provided by Streams for Apache Kafka. They include examples for securing a deployment of MirrorMaker 2 using mTLS or SCRAM-SHA-512 authentication. The examples specify internal listeners for connecting within an OpenShift cluster.

The examples also provide the configuration for full authorization, including the ACLs that allow user operations on the source and target Kafka clusters.

When configuring user access to source and target Kafka clusters, ACLs must grant access rights to internal MirrorMaker 2 connectors and read/write access to the cluster group and internal topics used by the underlying Kafka Connect framework in the target cluster. If you’ve renamed the cluster group or internal topics, such as when configuring MirrorMaker 2 for multiple instances, use those names in the ACLs configuration.

Simple authorization uses ACL rules managed by the Kafka StandardAuthorizer plugin to ensure appropriate access levels. For more information on configuring a KafkaUser resource to use simple authorization, see the AclRule schema reference.

Prerequisites

  • Streams for Apache Kafka is running
  • Separate namespaces for source and target clusters

The procedure assumes that the source and target Kafka clusters are installed to separate namespaces. If you want to use the Topic Operator, you’ll need to do this. The Topic Operator only watches a single cluster in a specified namespace.

By separating the clusters into namespaces, you will need to copy the cluster secrets so they can be accessed outside the namespace. You need to reference the secrets in the MirrorMaker configuration.
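For example, copying the source cluster's CA certificate secret into the MirrorMaker 2 namespace might look like the following sketch; the namespace names are assumptions:

oc get secret my-source-cluster-cluster-ca-cert -n my-source-namespace -o yaml > ca-cert.yaml
# Remove namespace-specific metadata (namespace, uid, resourceVersion, creationTimestamp, ownerReferences)
# from ca-cert.yaml before applying it to the MirrorMaker 2 namespace
oc apply -f ca-cert.yaml -n my-mirror-maker-namespace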

Procedure

  1. Configure two Kafka resources, one to secure the source Kafka cluster and one to secure the target Kafka cluster.

    You can add listener configuration for authentication and enable authorization.

    In this example, an internal listener is configured for a Kafka cluster with TLS encryption and mTLS authentication. Kafka simple authorization is enabled.

    Example source Kafka cluster configuration with TLS encryption and mTLS authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-source-cluster
    spec:
      kafka:
        version: 4.1.0
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
      entityOperator:
        topicOperator: {}
        userOperator: {}

    Example target Kafka cluster configuration with TLS encryption and mTLS authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-target-cluster
    spec:
      kafka:
        version: 4.1.0
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
      entityOperator:
        topicOperator: {}
        userOperator: {}

  2. Create or update the Kafka resources in separate namespaces.

    oc apply -f <kafka_configuration_file> -n <namespace>

    The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.

    The certificates are created in the secret <cluster_name>-cluster-ca-cert.

  3. Configure two KafkaUser resources, one for a user of the source Kafka cluster and one for a user of the target Kafka cluster.

    1. Configure the same authentication and authorization types as the corresponding source and target Kafka cluster. For example, if you used tls authentication and the simple authorization type in the Kafka configuration for the source Kafka cluster, use the same in the KafkaUser configuration.
    2. Configure the ACLs needed by MirrorMaker 2 to allow operations on the source and target Kafka clusters.

    Example source user configuration for mTLS authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-source-user
      labels:
        strimzi.io/cluster: my-source-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # MirrorSourceConnector
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Create
              - DescribeConfigs
              - Read
              - Write
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - DescribeConfigs
              - Read
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource: # Needed for every group for which offsets are synced
              type: group
              name: "*"
            operations:
              - Describe
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Read

    Example target user configuration for mTLS authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-target-user
      labels:
        strimzi.io/cluster: my-target-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # cluster group
          - resource:
              type: group
              name: mirrormaker2-cluster
            operations:
              - Read
          # access to config.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to status.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to offset.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # MirrorSourceConnector
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - Create
              - Alter
              - AlterConfigs
              - Write
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operations:
              - Create
              - Describe
              - Read
              - Write
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operations:
              - Read
              - Describe
          # MirrorHeartbeatConnector
          - resource:
              type: topic
              name: heartbeats
            operations:
              - Create
              - Describe
              - Write

    Note

    You can use a certificate issued outside the User Operator by setting type to tls-external. For more information, see the KafkaUserSpec schema reference.

  4. Create or update a KafkaUser resource in each of the namespaces you created for the source and target Kafka clusters.

    oc apply -f <kafka_user_configuration_file> -n <namespace>

    The User Operator creates the users representing the client (MirrorMaker), and the security credentials used for client authentication, based on the chosen authentication type.

    The User Operator creates a new secret with the same name as the KafkaUser resource. The secret contains a private and public key for mTLS authentication. The public key is contained in a user certificate, which is signed by the clients CA.

  5. Configure a KafkaMirrorMaker2 resource with the authentication details to connect to the source and target Kafka clusters.

    Example MirrorMaker 2 configuration with TLS encryption and mTLS authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker-2
    spec:
      version: 4.1.0
      replicas: 1
      connectCluster: "my-target-cluster"
      clusters:
        - alias: "my-source-cluster"
          bootstrapServers: my-source-cluster-kafka-bootstrap:9093
          tls: 
    1
    
            trustedCertificates:
              - secretName: my-source-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    2
    
            type: tls
            certificateAndKey:
              secretName: my-source-user
              certificate: user.crt
              key: user.key
        - alias: "my-target-cluster"
          bootstrapServers: my-target-cluster-kafka-bootstrap:9093
          tls: 
    3
    
            trustedCertificates:
              - secretName: my-target-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    4
    
            type: tls
            certificateAndKey:
              secretName: my-target-user
              certificate: user.crt
              key: user.key
          config:
            # -1 means it will use the default replication factor configured in the broker
            config.storage.replication.factor: -1
            offset.storage.replication.factor: -1
            status.storage.replication.factor: -1
      mirrors:
        - sourceCluster: "my-source-cluster"
          targetCluster: "my-target-cluster"
          sourceConnector:
            config:
              replication.factor: 1
              offset-syncs.topic.replication.factor: -1
              sync.topic.acls.enabled: "false"
          heartbeatConnector:
            config:
              heartbeats.topic.replication.factor: 1
          checkpointConnector:
            config:
              checkpoints.topic.replication.factor: 1
              sync.group.offsets.enabled: "true"
          topicsPattern: "topic1|topic2|topic3"
          groupsPattern: "group1|group2|group3"

    1
    The TLS certificates for the source Kafka cluster. If they are in a separate namespace, copy the cluster secrets from the namespace of the Kafka cluster.
    2
    The user authentication for accessing the source Kafka cluster using the TLS mechanism. Supported authentication methods include tls, scram-sha-256, scram-sha-512, plain, and oauth.
    3
    The TLS certificates for the target Kafka cluster.
    4
    The user authentication for accessing the target Kafka cluster.
  6. Apply the changes to the KafkaMirrorMaker2 resource to the same namespace as the target Kafka cluster.

10.9.3. Configuring replicated topic naming

By default, MirrorMaker 2 renames replicated topics by prepending the source cluster’s alias. For example, a topic named topic1 from a cluster called my-cluster-source is replicated as my-cluster-source.topic1. This allows MirrorMaker 2 to detect mirroring cycles and is especially useful when deploying complex topologies or performing bidirectional replication.

You can change this behavior using the replication.policy.class property in the connector configuration. There are two built-in policies available:

  • org.apache.kafka.connect.mirror.IdentityReplicationPolicy keeps original topic names.
    This approach is suitable for unidirectional replication, migration, or failover scenarios.
  • org.apache.kafka.connect.mirror.DefaultReplicationPolicy (default) prefixes topic names.
    This is recommended for bidirectional replication. Use replication.policy.separator to specify the character that separates the cluster name from the topic name in the replicated topic.

Example configuration to keep topic names (IdentityReplicationPolicy)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
1

      heartbeatConnector:
        config:
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      checkpointConnector:
        config:
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"

1
Replicates topics without prefixing the source cluster name.
Important

Values for replication.policy.class and replication.policy.separator must be the same across all MirrorMaker 2 connectors (sourceConnector, heartbeatConnector, and checkpointConnector).
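For example, a sketch that sets an assumed custom separator of "_" identically on all three connectors while keeping the default policy:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          replication.policy.separator: "_"
      heartbeatConnector:
        config:
          replication.policy.separator: "_"
      checkpointConnector:
        config:
          replication.policy.separator: "_"

With this configuration, a topic named topic1 from my-cluster-source is replicated as my-cluster-source_topic1.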

10.9.4. Synchronizing consumer group offsets

Configure MirrorMaker 2 to synchronize consumer group offsets from the source cluster to the target cluster.

The MirrorSourceConnector and MirrorCheckpointConnector work together using internal topics to coordinate offset tracking between clusters.

offset-syncs topic
Format: mm2-offset-syncs<separator><cluster_alias><separator>internal
Populated by the MirrorSourceConnector, this topic stores offset mappings between source and target clusters. By default, it’s created in the source cluster.
checkpoints topic
Format: <cluster_alias><separator>checkpoints<separator>internal
Populated by the MirrorCheckpointConnector in the target cluster, this topic captures the last committed offsets for each consumer group.

Configuring the MirrorCheckpointConnector to emit periodic offset checkpoints enables:

  • Active/passive offset synchronization
  • Failover recovery as consumers switch to the target cluster at the correct position

Offset synchronization occurs at regular intervals. The MirrorCheckpointConnector emits checkpoints for all consumer groups. However, applying those checkpoints in the target cluster requires the target group to be inactive. If consumers switch to the target cluster between checkpoints, they might reprocess some messages because the last synchronized offset lags behind the source. This duplication is expected.

Note

The connector continues to emit checkpoints to the checkpoints topic even if the target consumer group is active. However, the offsets cannot be committed while the group has active members. In this case, the connector logs a warning and the offsets are not synchronized.

Example configuration to enable offset synchronization

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          offset-syncs.topic.replication.factor: -1 
1

          refresh.topics.interval.seconds: 60 
2

      checkpointConnector:
        config:
          sync.group.offsets.enabled: true 
3

          sync.group.offsets.interval.seconds: 60 
4

          emit.checkpoints.interval.seconds: 60 
5

          refresh.groups.interval.seconds: 600 
6

          checkpoints.topic.replication.factor: -1 
7

1
Replication factor for the offset-syncs internal topic that maps the offsets of the source and target clusters. A value of -1 uses the broker’s default replication factor, which is typically set to provide resilience (for example, 3).
2
Optional setting to change the frequency of checks for new topics.
3
Enables consumer group offset synchronization.
4
The frequency of the synchronization.
5
Adjusts the frequency of checks for offset tracking. If you change the frequency of offset synchronization, you might also need to adjust the frequency of these checks.
6
The frequency of checks for new consumer groups.
7
Replication factor for the internal checkpoints topic that stores the last committed offsets for each consumer group. A value of -1 uses the broker’s default replication factor, which is typically set to provide resilience (for example, 3).
Tip

If you have an application written in Java, you can use the RemoteClusterUtils.java utility to fetch offsets through the application. The utility fetches remote offsets for a consumer group from the checkpoints topic.

The location of the offset-syncs topic is the source cluster by default. You can use the offset-syncs.topic.location connector configuration to change this to the target cluster. You need read/write access to the cluster that contains the topic. Using the target cluster as the location of the offset-syncs topic allows you to use MirrorMaker 2 even if you have only read access to the source cluster.
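For example, a sketch that moves the offset-syncs topic to the target cluster; the setting applies to both the source and checkpoint connectors:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          offset-syncs.topic.location: target
      checkpointConnector:
        config:
          offset-syncs.topic.location: target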

10.9.4.2. Listing the offsets of connectors

To list the offset positions of the internal MirrorMaker 2 connectors, use the same configuration that’s used to manage Kafka Connect connectors. For more information on setting up the configuration and listing offsets, see Section 10.8.4, “Listing connector offsets”.

In this example, the sourceConnector configuration is updated to return the connector offset position. The offset information is written to a specified ConfigMap.

Example configuration for MirrorMaker 2 connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 4.1.0
  # ...
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      listOffsets:
        toConfigMap:
          name: my-connector-offsets
        # ...

You must apply the following annotations to the KafkaMirrorMaker2 resource to be able to manage connector offsets:

  • strimzi.io/connector-offsets
  • strimzi.io/mirrormaker-connector

The strimzi.io/mirrormaker-connector annotation must be set to the name of the connector. These annotations remain until the operation succeeds or they are manually removed from the resource.

MirrorMaker 2 connectors are named using the aliases of the source and target clusters, followed by the connector type: <source_alias>-><target_alias>.<connector_type>.

In the following example, the annotations are applied for a connector named my-cluster-source->my-cluster-target.MirrorSourceConnector.

Example application of annotations for connector

oc annotate kafkamirrormaker2 my-mirror-maker-2 strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="my-cluster-source->my-cluster-target.MirrorSourceConnector" -n kafka

The offsets are listed in the specified ConfigMap. Streams for Apache Kafka puts the offset information into a .json property named after the connector. This does not overwrite any other properties when updating an existing ConfigMap.

Example source connector offset list

apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: 
1

  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: KafkaMirrorMaker2
    name: my-mirror-maker2
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
data:
  my-cluster-source--my-cluster-target.MirrorSourceConnector.json: |- 
2

    {
      "offsets": [
        {
          "partition": {
            "cluster": "east-kafka",
            "partition": 0,
            "topic": "mirrormaker2-cluster-configs"
          },
          "offset": {
            "offset": 0
          }
        }
      ]
    }

1
The owner reference pointing to the KafkaMirrorMaker2 resource. To provide a custom owner reference, create the ConfigMap in advance and set the owner reference.
2
The .json property uses the connector name. Since -> characters are not allowed in ConfigMap keys, -> is changed to -- in the connector name.
Note

It is possible to use configuration to alter or reset connector offsets, though this is rarely necessary.

10.9.5. Synchronizing ACL rules

MirrorMaker 2 can synchronize topic ACL rules from source to target clusters. Enable this feature by configuring the MirrorSourceConnector.

With simple authorization:

  • ACL rules apply to both source and target topics
  • Users with source topic access automatically get equivalent target topic access
Important
  • This feature is not compatible with the User Operator. If you are using the User Operator, disable ACL synchronization by setting sync.topic.acls.enabled to false.
  • When using OAuth 2.0 authorization, the setting has no effect.

Example configuration to enable ACL synchronization

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
  # ...
  mirrors:
    - sourceCluster: "my-cluster-source"
      targetCluster: "my-cluster-target"
      sourceConnector:
        config:
          sync.topic.acls.enabled: "true" 
1

1
Replicates all source topic ACLs to the target cluster.

10.9.6. Tuning Kafka Connect worker settings

Adjust Kafka Connect worker behavior for MirrorMaker 2 by adding configuration to your target cluster’s config section.

Example Kafka Connect configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  # Connect cluster name (required)
  connectCluster: "my-cluster-target" 
1

  # Cluster configurations (required)
  clusters:
    - alias: "my-cluster-target"
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092
      # Kafka Connect configuration (optional)
      config: 
2

        task.shutdown.graceful.timeout.ms: 30000
        scheduled.rebalance.max.delay.ms: 300000
        offset.flush.interval.ms: 10000
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1

1
The Kafka Connect cluster alias (must match the target cluster). Used to store internal topics.
2
These properties configure the Kafka Connect workers in this cluster.

By default, Streams for Apache Kafka configures the group ID and names of the internal topics used by the Kafka Connect framework that MirrorMaker 2 runs on. When running multiple instances of MirrorMaker 2, and they share the same connectCluster value, you must change these default settings using the following config properties:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-target"
    config:
      group.id: my-connect-cluster 
1

      offset.storage.topic: my-connect-cluster-offsets 
2

      config.storage.topic: my-connect-cluster-configs 
3

      status.storage.topic: my-connect-cluster-status 
4

      # ...
    # ...
1
The Kafka Connect cluster group ID within Kafka.
2
Kafka topic that stores connector offsets.
3
Kafka topic that stores connector and task configurations.
4
Kafka topic that stores connector and task status updates.
Note

Values for the three topics must be the same for all instances with the same group.id.

The connectCluster setting specifies the alias of the target Kafka cluster used by Kafka Connect for its internal topics. As a result, modifications to the connectCluster, group ID, and internal topic naming configuration are specific to the target Kafka cluster. You don't need to make changes if two MirrorMaker 2 instances use the same source Kafka cluster, or if they run in an active-active configuration where each MirrorMaker 2 instance has a different connectCluster setting and target cluster.

However, if multiple MirrorMaker 2 instances share the same connectCluster, each instance connecting to the same target Kafka cluster is deployed with the same values. In practice, this means all instances form a cluster and use the same internal topics.

Multiple instances attempting to use the same internal topics will cause unexpected errors, so you must change the values of these properties for each instance.

MirrorMaker 2 can be configured for active/passive disaster recovery. To support this, the Kafka cluster should also be monitored for health and performance to detect issues that require failover promptly.

If failover occurs, which can be automated, operations switch from the active cluster to the passive cluster when the active cluster becomes unavailable. The original active cluster is typically considered permanently lost. The passive cluster is promoted to active status, taking over as the source for all application traffic. In this state, MirrorMaker 2 no longer replicates data from the original active cluster while it remains unavailable.

Failback, or restoring operations to the original active cluster, requires careful planning.

It is technically possible to reverse roles in MirrorMaker 2 by swapping the source and target clusters and deploying this configuration as a new instance. However, this approach risks data duplication, as records mirrored to the passive cluster may be mirrored back to the original active cluster. Avoiding duplicates requires resetting consumer offsets, which adds complexity. For a simpler and more reliable failback process, rebuild the original active cluster in a clean state and mirror data from the disaster recovery cluster.

Follow these best practices for disaster recovery in the event of failure of the active cluster in an active/passive configuration:

  1. Promote the passive recovery cluster to an active role.
    Designate the passive cluster as the active cluster for all client connections. This minimizes downtime and ensures operations can continue.
  2. Redirect applications to the new active recovery cluster.
    MirrorMaker 2 synchronizes committed offsets to passive clusters, allowing consumer applications to resume from the last transferred offset when switching to the recovery cluster. However, because of the time lag in offset synchronization, switching consumers may result in some message duplication. To minimize duplication, switch all members of a consumer group together as soon as possible. Keeping the group intact minimizes the chance of a consumer processing duplicate messages.
  3. Remove the MirrorMaker 2 configuration for replication from the original active cluster to the passive cluster.
    After failover, the original configuration is no longer needed and should be removed to avoid conflicts.
  4. Re-create the failed cluster in a clean state, adhering to the original configuration.
  5. Deploy a new MirrorMaker 2 instance to replicate data from the active recovery cluster to the rebuilt cluster.
    Treat the rebuilt cluster as the passive cluster during this replication process. To prevent automatic renaming of topics, configure MirrorMaker 2 to use the IdentityReplicationPolicy by setting the replication.policy.class property in the MirrorMaker 2 configuration. With this configuration applied, topics retain their original names in the target cluster, as shown in the sketch after this list.
  6. Ensure the rebuilt cluster mirrors all data from the now-active recovery cluster.
  7. (Optional) Promote the rebuilt cluster back to active status by redirecting applications to the rebuilt cluster, after ensuring it is fully synchronized with the active cluster.
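As an illustration of step 5, a new MirrorMaker 2 instance replicating from the recovery cluster back to the rebuilt cluster might look like the following sketch; the cluster names and aliases are assumptions:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-failback-mirror-maker2
spec:
  version: 4.1.0
  replicas: 3
  connectCluster: "my-rebuilt-cluster"
  clusters:
  - alias: "my-recovery-cluster"
    bootstrapServers: my-recovery-cluster-kafka-bootstrap:9092
  - alias: "my-rebuilt-cluster"
    bootstrapServers: my-rebuilt-cluster-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-recovery-cluster"
    targetCluster: "my-rebuilt-cluster"
    sourceConnector:
      config:
        # Keeps original topic names so applications can switch back without renaming
        replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"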
Note

Before implementing any failover or failback processes, test your recovery approach in a controlled environment to minimize downtime and maintain data integrity.

10.10. Configuring MirrorMaker 2 connectors

MirrorMaker 2 uses internal connectors to synchronize data between Kafka clusters. Configure these connectors in your KafkaMirrorMaker2 custom resource.

MirrorMaker 2 consists of the following connectors:

MirrorSourceConnector
(Required) Replicates topics and ACLs from source to target cluster.
MirrorCheckpointConnector
(Optional) Synchronizes consumer group offsets for failover support. Only needed if offset tracking is required.
MirrorHeartbeatConnector
(Optional) Monitors connectivity between clusters. Useful for alerts but not required for replication.

The following table describes connector properties and the connectors you configure to use them.

Table 10.2. MirrorMaker 2 connector configuration properties

admin.timeout.ms
Timeout for admin tasks, such as detecting new topics. Default is 60000 (1 minute).
Applies to: sourceConnector, checkpointConnector, heartbeatConnector.

replication.policy.class
Policy to define the target topic naming convention. Default is org.apache.kafka.connect.mirror.DefaultReplicationPolicy.
Applies to: sourceConnector, checkpointConnector, heartbeatConnector.

replication.policy.separator
The separator used for topic naming in the target cluster. By default, the separator is set to a dot (.). Separator configuration is only applicable to the DefaultReplicationPolicy replication policy class, which defines target topic names. The IdentityReplicationPolicy class does not use the property as topics retain their original names.
Applies to: sourceConnector, checkpointConnector, heartbeatConnector.

consumer.poll.timeout.ms
Timeout when polling the source cluster. Default is 1000 (1 second).
Applies to: sourceConnector, checkpointConnector.

offset-syncs.topic.location
The location of the offset-syncs topic, which can be the source (default) or target cluster.
Applies to: sourceConnector, checkpointConnector.

topic.filter.class
Topic filter to select the topics to replicate. Default is org.apache.kafka.connect.mirror.DefaultTopicFilter.
Applies to: sourceConnector, checkpointConnector.

config.property.filter.class
Topic filter to select the topic configuration properties to replicate. Default is org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter.
Applies to: sourceConnector.

config.properties.exclude
Topic configuration properties that should not be replicated. Supports comma-separated property names and regular expressions.
Applies to: sourceConnector.

offset.lag.max
Maximum allowable (out-of-sync) offset lag before a remote partition is synchronized. Default is 100.
Applies to: sourceConnector.

offset-syncs.topic.replication.factor
Replication factor for the internal offset-syncs topic. Default is 3.
Applies to: sourceConnector.

refresh.topics.enabled
Enables check for new topics and partitions. Default is true.
Applies to: sourceConnector.

refresh.topics.interval.seconds
Frequency of topic refresh. Default is 600 (10 minutes). By default, a check for new topics in the source cluster is made every 10 minutes. You can change the frequency by adding refresh.topics.interval.seconds to the source connector configuration.
Applies to: sourceConnector.

replication.factor
The replication factor for new topics. Default is 2.
Applies to: sourceConnector.

sync.topic.acls.enabled
Enables synchronization of ACLs from the source cluster. Default is true. For more information, see Section 10.9, “Configuring MirrorMaker 2”.
Applies to: sourceConnector.

sync.topic.acls.interval.seconds
Frequency of ACL synchronization. Default is 600 (10 minutes).
Applies to: sourceConnector.

sync.topic.configs.enabled
Enables synchronization of topic configuration from the source cluster. Default is true.
Applies to: sourceConnector.

sync.topic.configs.interval.seconds
Frequency of topic configuration synchronization. Default is 600 (10 minutes).
Applies to: sourceConnector.

checkpoints.topic.replication.factor
Replication factor for the internal checkpoints topic. Default is 3.
Applies to: checkpointConnector.

emit.checkpoints.enabled
Enables synchronization of consumer offsets to the target cluster. Default is true.
Applies to: checkpointConnector.

emit.checkpoints.interval.seconds
Frequency of consumer offset synchronization. Default is 60 (1 minute).
Applies to: checkpointConnector.

group.filter.class
Group filter to select the consumer groups to replicate. Default is org.apache.kafka.connect.mirror.DefaultGroupFilter.
Applies to: checkpointConnector.

refresh.groups.enabled
Enables check for new consumer groups. Default is true.
Applies to: checkpointConnector.

refresh.groups.interval.seconds
Frequency of consumer group refresh. Default is 600 (10 minutes).
Applies to: checkpointConnector.

sync.group.offsets.enabled
Enables synchronization of consumer group offsets to the target cluster __consumer_offsets topic. Default is false.
Applies to: checkpointConnector.

sync.group.offsets.interval.seconds
Frequency of consumer group offset synchronization. Default is 60 (1 minute).
Applies to: checkpointConnector.

emit.heartbeats.enabled
Enables connectivity checks on the target cluster. Default is true.
Applies to: heartbeatConnector.

emit.heartbeats.interval.seconds
Frequency of connectivity checks. Default is 1 (1 second).
Applies to: heartbeatConnector.

heartbeats.topic.replication.factor
Replication factor for the internal heartbeats topic. Default is 3.
Applies to: heartbeatConnector.

Warning

The following properties must be identically configured across all three connectors (source, checkpoint, and heartbeat):

  • replication.policy.class
  • replication.policy.separator
  • offset-syncs.topic.location
  • topic.filter.class

Mismatches cause replication failures or offset sync issues.
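
For example, the following sketch keeps the replication policy separator identical across the three connectors of a mirror; the underscore separator is an arbitrary illustration.

Example of consistent connector configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      config:
        replication.policy.separator: "_"
    checkpointConnector:
      config:
        replication.policy.separator: "_"
    heartbeatConnector:
      config:
        replication.policy.separator: "_"
  # ...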

10.10.1. Enabling the heartbeat connector

You can enable the optional MirrorHeartbeatConnector to monitor the replication stream between clusters. When enabled, this connector periodically creates heartbeat messages and sends them to a dedicated heartbeats topic in the source cluster.

These messages are then replicated by the MirrorSourceConnector to the target cluster. By consuming the heartbeats topic on the target cluster, you can confirm that data is flowing correctly from source to target. If you stop seeing new heartbeats on the target, it indicates a potential issue with the replication process.

Because the MirrorHeartbeatConnector writes heartbeat messages to the source cluster, it usually requires a different Connect cluster from the one that runs the MirrorSourceConnector and MirrorCheckpointConnector.

Before enabling the heartbeat connector in the KafkaMirrorMaker2 resource, consider the advantages and disadvantages of running and maintaining an additional Kafka Connect cluster. If you already operate a Kafka Connect cluster that stores state in the source Kafka cluster, you can deploy the MirrorHeartbeatConnector there instead of using the KafkaMirrorMaker2 resource.

Example configuration for the heartbeat connector defined in a KafkaMirrorMaker2 resource

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  connectCluster: "my-cluster-target" 
1

  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-bootstrap:9092 
2

  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-source-bootstrap:9092 
3

  mirrors:
    - sourceCluster: "my-cluster-source" 
4

      targetCluster: "my-cluster-target" 
5

      topicsPattern: "topic1|topic2|topic3"
      groupsPattern: "group1|group2|group3"
      # ...
      heartbeatConnector: 
6

        autoRestart:
          enabled: true
        config:
          heartbeats.topic.replication.factor: 1 
7

          emit.heartbeats.interval.seconds: 10 
8

  # ...

1
Alias of the Kafka Connect cluster where the connectors run. Must match the alias given in targetCluster.
2
Bootstrap address of the source cluster.
3
Bootstrap address of the target cluster. Use the same alias as the target cluster, but specify the bootstrap servers of the source cluster, because the heartbeat topic is created in the source.
4
Alias of the source cluster. Must match the source alias defined for the MirrorSourceConnector.
5
Alias of the target cluster. Must match the target alias defined for the MirrorSourceConnector.
6
Configuration for the MirrorHeartbeatConnector that performs connectivity checks. Overrides the default configuration options.
7
Replication factor for the heartbeat topic.
8
Interval, in seconds, at which heartbeat messages are sent.
Important

The source and target cluster aliases in the MirrorHeartbeatConnector configuration must match the aliases defined for the MirrorSourceConnector in the KafkaMirrorMaker2 resource.

10.10.2. Setting a maximum number of data replication tasks

Use tasksMax to control how many tasks are assigned to connectors. Increasing the number of tasks helps improve performance when replicating many partitions or synchronizing the offsets of a large number of consumer groups.

MirrorMaker 2 connectors create distributed tasks that move data between clusters. These tasks follow a set of operational behaviors and allocation rules.

Task behavior:

  • Each task runs on a single worker pod.
  • Tasks run in parallel.
  • A worker pod can run multiple tasks, but each task runs in isolation.
  • You don’t need more pods than tasks. If there are fewer pods, tasks are distributed across available workers.

Task allocation rules:

  • By default, connectors run with 1 task, unless tasksMax is set.
  • The MirrorHeartbeatConnector always uses 1 task.
  • For the MirrorSourceConnector, the maximum possible tasks = number of partitions.
  • For the MirrorCheckpointConnector, the maximum possible tasks = number of consumer groups.
  • The actual number of tasks started is the lower of the tasksMax value and the maximum possible tasks.

If the infrastructure can support it, increasing the number of tasks improves throughput and reduces latency during high-volume replication.

Increasing the number of tasks for the source connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
      autoRestart:
          enabled: true
  # ...

Enable automatic restarts of failed connectors and tasks using autoRestart. By default, the number of restarts is indefinite, but you can set a maximum on the number of automatic restarts using the maxRestarts property.
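
For example, the following sketch caps the number of automatic restarts for the source connector; the limit of 10 is an arbitrary illustration.

Example configuration limiting automatic restarts

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
      autoRestart:
        enabled: true
        maxRestarts: 10  # arbitrary example limit on automatic restarts
  # ...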

Increasing the number of tasks for the checkpoint connector is useful when you have a large number of consumer groups.

Increasing the number of tasks for the checkpoint connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
      autoRestart:
          enabled: true
  # ...

By default, MirrorMaker 2 checks for new consumer groups every 10 minutes. You can adjust the refresh.groups.interval.seconds configuration to change the frequency. Take care when reducing the interval, because more frequent checks can have a negative impact on performance.
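
For example, the following sketch reduces the consumer group check to every 20 minutes; the value is an arbitrary illustration.

Example configuration reducing the consumer group check frequency

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
      config:
        refresh.groups.interval.seconds: 1200  # arbitrary example value (20 minutes)
  # ...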

10.10.2.1. Checking connector task operations

If you are using Prometheus and Grafana to monitor your deployment, you can check MirrorMaker 2 performance. The example MirrorMaker 2 Grafana dashboard provided with Streams for Apache Kafka shows the following metrics related to tasks and latency.

  • The number of tasks
  • Replication latency
  • Offset synchronization latency

10.10.3. Stopping or pausing MirrorMaker 2 connectors

If you are using KafkaMirrorMaker2 resources to configure internal MirrorMaker connectors, use the state configuration to either stop or pause a connector. In contrast to the paused state, where the connector and tasks remain instantiated, stopping a connector retains only the configuration, with no active processes. Stopping a connector is more suitable than pausing for longer durations: a paused connector is quicker to resume, but a stopped connector frees up memory and resources.

Note

The state configuration replaces the (deprecated) pause configuration in the KafkaMirrorMaker2ConnectorSpec schema, which allowed connectors to be paused. If you were previously using the pause configuration, transition to using only the state configuration to avoid conflicts.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaMirrorMaker2 custom resource that controls the MirrorMaker 2 connector you want to pause or stop:

    oc get KafkaMirrorMaker2
  2. Edit the KafkaMirrorMaker2 resource to stop or pause the connector.

    Example configuration for stopping a MirrorMaker 2 connector

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 4.1.0
      replicas: 3
      connectCluster: "my-cluster-target"
      clusters:
        # ...
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector:
          tasksMax: 10
          autoRestart:
            enabled: true
          state: stopped
      # ...

    Change the state configuration to stopped or paused. The default state for the connector when this property is not set is running.

  3. Apply the changes to the KafkaMirrorMaker2 configuration.

    You can resume the connector by changing state to running or removing the configuration.

Note

Alternatively, you can expose the Kafka Connect API and use the stop and pause endpoints to stop a connector from running. For example, PUT /connectors/<connector_name>/stop. You can then use the resume endpoint to restart it.

10.10.4. Manually restarting MirrorMaker 2 connectors

Use the strimzi.io/restart-connector annotation to manually trigger a restart of a MirrorMaker 2 connector.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaMirrorMaker2 custom resource that controls the Kafka MirrorMaker 2 connector you want to restart:

    oc get KafkaMirrorMaker2
  2. Find the name of the Kafka MirrorMaker 2 connector to be restarted from the KafkaMirrorMaker2 custom resource:

    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
  3. Use the name of the connector to restart the connector by annotating the KafkaMirrorMaker2 resource in OpenShift:

    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector=<mirrormaker_connector_name>"

    In this example, connector source-cluster→target-cluster.MirrorCheckpointConnector in the my-mirror-maker-2 cluster is restarted:

    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=source-cluster->target-cluster.MirrorCheckpointConnector"

    You can also refine the restart behavior with the includeTasks and onlyFailed parameters, which both default to false.

    • includeTasks restarts both the connector instance and its task instances.
    • onlyFailed restarts only instances with a FAILED status when set to true.

    Separate the connector name and the parameters with a colon (:). For example:

    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=source-cluster->target-cluster.MirrorCheckpointConnector:includeTasks,onlyFailed"
  4. Wait for the next reconciliation to occur (every two minutes by default).

    The MirrorMaker 2 connector is restarted, as long as the annotation was detected by the reconciliation process. When MirrorMaker 2 accepts the request, the annotation is removed from the KafkaMirrorMaker2 custom resource.

10.10.5. Manually restarting MirrorMaker 2 connector tasks

Use the strimzi.io/restart-connector-task annotation to manually trigger a restart of a MirrorMaker 2 connector task.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Find the name of the KafkaMirrorMaker2 custom resource that controls the MirrorMaker 2 connector task you want to restart:

    oc get KafkaMirrorMaker2
  2. Find the name of the connector and the ID of the task to be restarted from the KafkaMirrorMaker2 custom resource:

    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>

    Task IDs are non-negative integers, starting from 0.

  3. Use the name and ID to restart the connector task by annotating the KafkaMirrorMaker2 resource in OpenShift:

    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector-task=<mirrormaker_connector_name>:<task_id>"

    In this example, task 2 for connector source-cluster→target-cluster.MirrorSourceConnector in the my-mirror-maker-2 cluster is restarted:

    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector-task=source-cluster->target-cluster.MirrorSourceConnector:2"
  4. Wait for the next reconciliation to occur (every two minutes by default).

    The MirrorMaker 2 connector task is restarted, as long as the annotation was detected by the reconciliation process. When MirrorMaker 2 accepts the request, the annotation is removed from the KafkaMirrorMaker2 custom resource.

10.10.6. Configuring MirrorMaker 2 connector producers and consumers

MirrorMaker 2 connectors use internal producers and consumers. If needed, you can configure these producers and consumers to override the default settings.

For example, you can increase the batch.size for the source producer that sends topics to the target Kafka cluster to better accommodate large volumes of messages.

Important

Producer and consumer configuration options depend on the MirrorMaker 2 implementation, and may be subject to change.

The following tables describe the producers and consumers for each of the connectors and where you can add configuration.

Table 10.3. Source connector producers and consumers

Producer
Sends topic messages to the target Kafka cluster. Consider tuning the configuration of this producer when it is handling large volumes of data.
Configuration: mirrors.sourceConnector.config: producer.override.*

Producer
Writes to the offset-syncs topic, which maps the source and target offsets for replicated topic partitions.
Configuration: mirrors.sourceConnector.config: producer.*

Consumer
Retrieves topic messages from the source Kafka cluster.
Configuration: mirrors.sourceConnector.config: consumer.*

Table 10.4. Checkpoint connector producers and consumers

Producer
Emits consumer offset checkpoints.
Configuration: mirrors.checkpointConnector.config: producer.override.*

Consumer
Loads the offset-syncs topic.
Configuration: mirrors.checkpointConnector.config: consumer.*

Note

You can set offset-syncs.topic.location to target to use the target Kafka cluster as the location of the offset-syncs topic.
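
For example, the following sketch places the offset-syncs topic in the target cluster; as noted in the earlier warning, keep the value identical across the connectors that use it.

Example configuration using the target cluster for the offset-syncs topic

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      config:
        offset-syncs.topic.location: target
    checkpointConnector:
      config:
        offset-syncs.topic.location: target
  # ...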

Table 10.5. Heartbeat connector producer

Producer
Emits heartbeats.
Configuration: mirrors.heartbeatConnector.config: producer.override.*

The following example shows how you configure the producers and consumers.

Example configuration for connector producers and consumers

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 4.1.0
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 5
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        producer.request.timeout.ms: 30000
        consumer.fetch.max.bytes: 52428800
        # ...
    checkpointConnector:
      config:
        producer.override.request.timeout.ms: 30000
        consumer.max.poll.interval.ms: 300000
        # ...
    heartbeatConnector:
      config:
        producer.override.request.timeout.ms: 30000
        # ...

10.11. Configuring the Kafka Bridge

Update the spec properties of the KafkaBridge custom resource to configure your Kafka Bridge deployment.

To prevent issues when client consumer requests are processed by different Kafka Bridge instances, use address-based routing to ensure that requests are routed to the right Kafka Bridge instance. Additionally, each independent Kafka Bridge instance must have a replica, because a Kafka Bridge instance has its own state, which is not shared with other instances.

For a deeper understanding of the Kafka Bridge and its cluster configuration options, refer to the Using the Kafka Bridge guide and the Streams for Apache Kafka Custom Resource API Reference.

Example KafkaBridge custom resource configuration

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # Replicas (required)
  replicas: 3 
1

  # Kafka bootstrap servers (required)
  bootstrapServers: <cluster_name>-kafka-bootstrap:9092 
2

  # HTTP configuration (required)
  http: 
3

    port: 8080
    # CORS configuration (optional)
    cors: 
4

      allowedOrigins: "https://strimzi.io"
      allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH"
  # Resources requests and limits (recommended)
  resources: 
5

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # TLS configuration (optional)
  tls: 
6

    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        pattern: "*.crt"
      - secretName: my-cluster-cluster-cert
        certificate: ca2.crt
  # Authentication (optional)
  authentication: 
7

    type: tls
    certificateAndKey:
      secretName: my-secret
      certificate: public.crt
      key: private.key
  # Consumer configuration (optional)
  consumer: 
8

    config:
      auto.offset.reset: earliest
  # Producer configuration (optional)
  producer: 
9

    config:
      delivery.timeout.ms: 300000
  # Logging configuration (optional)
  logging: 
10

    type: inline
    loggers:
      rootLogger.level: INFO
      # Enabling DEBUG just for send operation
      logger.send.name: http.openapi.operation.send
      logger.send.level: DEBUG
  # JVM options (optional)
  jvmOptions: 
11

    "-Xmx": "1g"
    "-Xms": "1g"
  # Readiness probe (optional)
  readinessProbe: 
12

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Custom image (optional)
  image: my-org/my-image:latest 
13

  # Pod template (optional)
  template: 
14

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    bridgeContainer: 
15

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
16

  # Metrics configuration (optional)
  metricsConfig:
    type: jmxPrometheusExporter 
17

    valueFrom:
      configMapKeyRef:
        name: kafka-metrics
        key: kafka-metrics-config.yml

1
The number of replica nodes.
2
Bootstrap address for connection to the target Kafka cluster. The address takes the format <cluster_name>-kafka-bootstrap:<port_number>. The Kafka cluster doesn’t need to be managed by Streams for Apache Kafka or deployed to an OpenShift cluster.
3
HTTP access to Kafka brokers.
4
CORS access specifying selected resources and access methods. Additional HTTP headers in requests describe the origins that are permitted access to the Kafka cluster.
5
Requests for reservation of supported resources, currently cpu and memory, and limits to specify the maximum resources that can be consumed.
6
TLS configuration for encrypted connections to the Kafka cluster, with trusted certificates stored in X.509 format within the specified secrets.
7
Authentication for the Kafka Bridge cluster, specified as tls, scram-sha-256, scram-sha-512, plain, or oauth. By default, the Kafka Bridge connects to Kafka brokers without authentication. For details on configuring authentication, see the KafkaBridgeSpec schema properties.
8
Consumer configuration options.
9
Producer configuration options.
10
Kafka Bridge loggers and log levels added directly (inline) or indirectly (external) through a ConfigMap. Custom Log4j configuration must be placed under the log4j2.properties key in the ConfigMap. You can set log levels to INFO, ERROR, WARN, TRACE, DEBUG, FATAL or OFF.
11
JVM configuration options to optimize performance for the Virtual Machine (VM) running the Kafka Bridge.
12
Healthchecks to know when to restart a container (liveness) and when a container can accept traffic (readiness).
13
Optional: Container image configuration, which is recommended only in special situations.
14
Template customization. Here a pod is scheduled with anti-affinity, so the pod is not scheduled on nodes with the same hostname.
15
Environment variables are set for distributed tracing.
16
Distributed tracing is enabled by using OpenTelemetry.
17
Prometheus metrics enabled. In this example, metrics are configured for the Prometheus JMX Exporter.

10.12. Applying optional common configuration

You can further configure Streams for Apache Kafka components by applying optional common configuration settings. Common configuration is specified independently for each component and includes the following:

  • Resource limits and requests (Recommended)
  • Metrics configuration
  • Liveness and readiness probes
  • JVM options for maximum and minimum memory allocation
  • Adding additional volumes and volume mounts
  • Template configuration for pods and containers
  • Logging levels

Advanced or specialized options include:

  • Custom container images
  • Rack awareness
  • Distributed tracing

Configure common options for Streams for Apache Kafka custom resources in the .spec section of the custom resource. For more information on these configuration options, refer to Common configuration properties.

10.12.2. Metrics configuration

Enable metrics collection for monitoring.

Example metrics configuration

# ...
spec:
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-metrics-config
        key: kafka-metrics-config.yml
  # ...

Configuration varies depending on the component and exporter used: Prometheus JMX Exporter or Streams for Apache Kafka Metrics Reporter. For more information, see Introducing metrics.

10.12.3. Liveness and readiness probes

Configure health checks for the container.

Example liveness and readiness probes

# ...
spec:
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  readinessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 5
  # ...

10.12.4. JVM options

Configure the Java Virtual Machine (JVM) for the component. To enable garbage collector (GC) logging, set gcLoggingEnabled to true.

Example JVM options

# ...
spec:
  jvmOptions:
    -Xms: "512m"
    -Xmx: "1g"
    gcLoggingEnabled: true
  # ...

10.12.5. Additional volumes and mounts

Add extra volumes to the container and mount them in specific locations.

Example additional volumes

# ...
spec:
  kafka:
    template:
      pod:
        volumes:
          - name: example-secret
            secret:
              secretName: secret-name
          - name: example-configmap
            configMap:
              name: config-map-name
      kafkaContainer:
        volumeMounts:
          - name: example-secret
            mountPath: /mnt/secret-volume
          - name: example-configmap
            mountPath: /mnt/cm-volume
  # ...

Note

You can use template configuration to add other customizations to pods and containers, such as affinity and security context. For more information, see Configuring pod scheduling and Applying security context to Streams for Apache Kafka pods and containers.

10.12.6. Custom container image

Override the default container image. Use only in special situations.

Example custom image

# ...
spec:
  image: my-org/custom-kafka-image:latest
  # ...

10.12.7. Rack awareness

Enable rack-aware broker assignment to improve fault tolerance. This is a specialized option intended for a deployment within the same location, not across regions.

Example rack awareness configuration

# ...
spec:
  rack:
    topologyKey: topology.kubernetes.io/zone
  # ...

10.12.8. Distributed tracing configuration

Enable distributed tracing using OpenTelemetry to monitor Kafka component operations.

Example tracing configuration

# ...
spec:
  tracing:
    type: opentelemetry
  # ...

For more information see Introducing distributed tracing.

10.12.9. Configuring logging levels

Warning

Streams for Apache Kafka operators and Kafka components use log4j2 for logging. However, Kafka 3.9 and earlier versions rely on log4j1. For log4j1-based configuration examples, refer to the Streams for Apache Kafka 2.9 documentation.

You can set log levels to INFO, ERROR, WARN, TRACE, DEBUG, FATAL or OFF. Configure the logging levels of Kafka components and Streams for Apache Kafka operators through their custom resources. You can use either of these options:

  • Specify logging levels directly in the spec.logging property of the custom resource.
  • Define logging properties in a custom ConfigMap and reference it using the configMapKeyRef property.

Advantages of using a ConfigMap:

  • Centralized maintenance
  • Reusable with multiple resources
  • Flexibility to append logging specifications to add filters

Specify a logging type in your logging specification:

  • inline when specifying logging levels directly
  • external when referencing a ConfigMap

For inline configuration, use the loggers property to set the root logger level and levels for specific classes or loggers.

Example inline logging configuration

# ...
logging:
  type: inline
  loggers:
    rootLogger.level: INFO
# ...

For external configuration, use a ConfigMap to define logging configurations using a full log4j2.properties file. Set the logging name and key properties to reference the ConfigMap. Both properties are mandatory.

Example external logging configuration

# ...
logging:
  type: external
  valueFrom:
    configMapKeyRef:
      name: my-config-map
      key: log4j2.properties
# ...

Default logging is used if logging is not specified in the resource using either method. Loggers that haven’t been explicitly configured inherit settings from their parent loggers.

When a resource managed by the Cluster Operator is created, a ConfigMap with the specified logging configuration is also created. For components managed by the Streams for Apache Kafka operators, changes to logging levels are applied dynamically.

Warning

Setting a log level to DEBUG or TRACE may result in a large amount of log output and may have performance implications.

10.12.9.1. Configurable loggers

Specific loggers are available for configuration in Kafka components and Streams for Apache Kafka operators.

For information about log levels, see Apache logging services.

10.12.9.2. Creating a ConfigMap for logging

To use a ConfigMap to define logging properties, create the ConfigMap and then reference it as part of the logging definition in the spec of a resource. Place the configuration under the log4j2.properties key.

In this procedure a ConfigMap defines a root logger for a Kafka resource.

Procedure

  1. Create the ConfigMap as a YAML file or from a properties file.

    Example ConfigMap for logging

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: logging-configmap
    data:
      log4j2.properties: |
        appender.console.type = Console
        appender.console.name = STDOUT
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
        rootLogger.level = INFO
        rootLogger.appenderRefs = console
        rootLogger.appenderRef.console.ref = STDOUT
        rootLogger.additivity = false

    If you are using a properties file, define the logging configuration and specify the file at the command line when creating the ConfigMap.

    Properties file definition

    # Define the logger
    rootLogger.level = "INFO"
    # ...

    Specifying the properties file

    oc create configmap logging-configmap --from-file=log4j2.properties

  2. Add external logging to the spec of the Kafka resource, specifying the name and key of the ConfigMap:

    # ...
    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: logging-configmap
          key: log4j2.properties
    # ...
  3. Apply the changes to the Kafka configuration.

10.12.9.3. Configuring Cluster Operator logging

Cluster Operator logging is configured through a ConfigMap named strimzi-cluster-operator. This ConfigMap, created with default values during installation, is described in the file install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml. Configure Cluster Operator logging by changing data.log4j2.properties values in the file.

To update the logging configuration, edit the 050-ConfigMap-strimzi-cluster-operator.yaml file and then run the following command:

oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml

Alternatively, edit the ConfigMap directly:

oc edit configmap strimzi-cluster-operator

With this ConfigMap, you can control the following aspects of logging:

  • Root logger level
  • Log output format
  • Log levels for different components
  • Kafka AdminClient logging levels
  • Netty logging levels
  • How often logging configuration is loaded

Netty is a framework used in Streams for Apache Kafka for network communication. The monitorInterval setting determines how often, in seconds, the logging configuration is dynamically reloaded. The default is 30 seconds.
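
The following is a minimal sketch of the shape of this ConfigMap; the file shipped in install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml contains the full default appender and component logger settings.

Example Cluster Operator logging ConfigMap (abbreviated)

kind: ConfigMap
apiVersion: v1
metadata:
  name: strimzi-cluster-operator
data:
  log4j2.properties: |
    # Interval, in seconds, at which the logging configuration is reloaded
    monitorInterval = 30
    rootLogger.level = INFO
    # ... appender and component logger settings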

If the ConfigMap is missing when the Cluster Operator is deployed, the default logging values are used.

If the ConfigMap is accidentally deleted after the Cluster Operator is deployed, the most recently loaded logging configuration is used. Create a new ConfigMap to load a new logging configuration.

Warning

Do not remove the monitorInterval option from the ConfigMap.

10.12.9.4. Adding logging filters to Streams for Apache Kafka operators

Add logging filters to Streams for Apache Kafka operators by using a ConfigMap that contains a complete log4j2.properties file with your custom log4j2 configuration.

Filters are useful when too many logging messages are being produced. For instance, if rootLogger.level="DEBUG", filters reduce the number of logs to focus on a specific resource by logging only messages matching the filter.

Markers specify what to include in the log using kind, namespace, and resource name values. For example, to isolate the logs of a failing Kafka cluster, set the kind to Kafka and use the namespace and name of the cluster.

Basic logging filter configuration

rootLogger.level = "INFO"
rootLogger.appenderRefs = console
rootLogger.appenderRef.console.ref = STDOUT

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n

appender.console.filter.filter1.type = MarkerFilter 
1

appender.console.filter.filter1.onMatch = ACCEPT 
2

appender.console.filter.filter1.onMismatch = DENY 
3

appender.console.filter.filter1.marker = Kafka(my-namespace/my-kafka-cluster) 
4

1
The MarkerFilter compares a specified marker.
2
onMatch accepts logs if the marker matches.
3
onMismatch rejects logs if the marker does not match.
4
Marker format: kind(namespace/resource_name).

For multiple filters, define each one separately:

Multiple logging filter configuration

# ...

appender.console.name = STDOUT

appender.console.filter.filter1.type = MarkerFilter
appender.console.filter.filter1.onMatch = ACCEPT
appender.console.filter.filter1.onMismatch = DENY
appender.console.filter.filter1.marker = Kafka(my-namespace/my-kafka-cluster-1)

appender.console.filter.filter2.type = MarkerFilter
appender.console.filter.filter2.onMatch = ACCEPT
appender.console.filter.filter2.onMismatch = DENY
appender.console.filter.filter2.marker = Kafka(my-namespace/my-kafka-cluster-2)

Adding filters to the Cluster Operator

To add filters to the Cluster Operator, update the ConfigMap YAML file (install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml):

Procedure

  1. Update the 050-ConfigMap-strimzi-cluster-operator.yaml file to add the filter properties:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: strimzi-cluster-operator
    data:
      log4j2.properties: |
        rootLogger.level = "INFO"
        rootLogger.appenderRefs = console
        rootLogger.appenderRef.console.ref = STDOUT
    
        appender.console.type = Console
        appender.console.name = STDOUT
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
    
        appender.console.filter.filter1.type=MarkerFilter
        appender.console.filter.filter1.onMatch=ACCEPT
        appender.console.filter.filter1.onMismatch=DENY
        appender.console.filter.filter1.marker=Kafka(my-namespace/my-kafka-cluster)

    Alternatively, edit the ConfigMap directly:

    oc edit configmap strimzi-cluster-operator
  2. If updating the YAML file, apply the changes to the ConfigMap configuration.

Adding filters to the Topic or User Operator

To add filters to the Topic or User Operator, create or edit a logging ConfigMap. The same method applies for both operators.

Procedure

  1. Create the ConfigMap as a YAML file or from a properties file.

    Example filter properties for my-topic topic

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: logging-configmap
    data:
      log4j2.properties: |
        rootLogger.level = "INFO"
        rootLogger.appenderRefs = console
        rootLogger.appenderRef.console.ref = STDOUT
    
        appender.console.type = Console
        appender.console.name = STDOUT
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
    
        appender.console.filter.filter1.type = MarkerFilter
        appender.console.filter.filter1.onMatch = ACCEPT
        appender.console.filter.filter1.onMismatch = DENY
        appender.console.filter.filter1.marker = KafkaTopic(my-namespace/my-topic)

    If you are using a properties file, define the logging configuration and specify the file at the command line when creating the ConfigMap.

    Properties file definition

    # Define the logger
    rootLogger.level = "INFO"
    rootLogger.appenderRefs = console
    rootLogger.appenderRef.console.ref = STDOUT
    # Define the appenders
    appender.console.type = Console
    appender.console.name = STDOUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
    # Set the filters
    appender.console.filter.filter1.type = MarkerFilter
    appender.console.filter.filter1.onMatch = ACCEPT
    appender.console.filter.filter1.onMismatch = DENY
    appender.console.filter.filter1.marker = KafkaTopic(my-namespace/my-topic)
    # ...

    Specifying the properties file

    oc create configmap logging-configmap --from-file=log4j2.properties

  2. Define external logging in the topicOperator or userOperator configuration of the Kafka resource, specifying the name and key of the ConfigMap:

    spec:
      # ...
      entityOperator:
        topicOperator:
          logging:
            type: external
            valueFrom:
              configMapKeyRef:
                name: logging-configmap
                key: log4j2.properties
  3. Apply the changes to the Kafka configuration.

10.12.10. Configuring pod scheduling

To optimize the resilience and performance of your Kafka cluster, you can control how its pods are scheduled across OpenShift nodes. Pod scheduling strategies can help you to achieve the following:

  • Increase fault tolerance by spreading Kafka pods across different nodes.
  • Avoid resource contention by separating pods from critical workloads.
  • Maintain resource availability by assigning Kafka pods to nodes with sufficient capacity.

10.12.10.1. Scheduling strategies

Kafka components can be scheduled onto OpenShift nodes using affinity rules, tolerations, and topology constraints. These strategies help isolate workloads, optimize resource usage, and improve overall cluster performance.

The following scheduling techniques support different deployment goals:

Use pod anti-affinity to avoid critical applications sharing nodes
Use pod anti-affinity to prevent critical applications from being scheduled on the same disk. In Kafka deployments, configure pod anti-affinity to ensure that Kafka brokers do not share nodes with other workloads, such as databases.
Use node affinity to schedule workloads onto specific nodes
OpenShift clusters often include nodes optimized for different workloads, such as CPU, memory, storage, or network. Node affinity enables scheduling Kafka components onto nodes that match specific labels, such as node.kubernetes.io/instance-type or custom labels, to optimize performance and cost.
Use node affinity, taints, and tolerations for dedicated nodes

To reserve nodes for Kafka, you can taint them to exclude them from general workloads. Kafka pods can still be scheduled onto these nodes by configuring:

  • Tolerations, which allow the pods to bypass the taint.
  • Node affinity, for the pods to run on those specific nodes.

These settings direct Kafka pods to dedicated nodes while preventing other workloads from being scheduled there. This approach isolates Kafka from other workloads, reducing resource contention and improving stability.

Use topology spread constraints to balance pods across zones or nodes
Topology spread constraints help distribute pods evenly across specified topology domains, such as zones, regions, or nodes. For Kafka, this strategy reduces the risk of scheduling too many brokers on the same zone or node, improving availability and resilience.
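
For example, the following sketch adds a topology spread constraint to the pod template of a Kafka resource; it assumes the broker pods carry the strimzi.io/name: my-cluster-kafka label used in the anti-affinity examples later in this section, and spreads them across availability zones.

Example topology spread constraint configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    template:
      pod:
        topologySpreadConstraints:
          - maxSkew: 1
            topologyKey: topology.kubernetes.io/zone
            whenUnsatisfiable: ScheduleAnyway
            labelSelector:
              matchLabels:
                strimzi.io/name: my-cluster-kafka  # assumed label for the broker pods of my-cluster
  # ...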

You can apply these scheduling strategies in the template.pod property within the spec of a Streams for Apache Kafka custom resource.

For Kafka brokers specifically, Streams for Apache Kafka provides a two-level configuration:

  • Kafka.spec.kafka.template.pod
    Use this to set a cluster-wide default scheduling policy for all Kafka broker and controller pods.
  • KafkaNodePool.spec.template.pod
    For more granular control, if needed, you can override the default policy for specific node pools using the KafkaNodePool resource.
Important

If any property is defined under KafkaNodePool.spec.template, all Kafka.spec.kafka.template settings are ignored for pods in that node pool. Properties are not merged. In other words, node pools using KafkaNodePool.spec.template settings do not inherit any settings from the cluster-wide template. Every required setting must be specified in the pool’s template.

Other Streams for Apache Kafka components have their own template properties for scheduling:

  • Kafka.spec.kafka.template.pod
  • KafkaNodePool.spec.template.pod
  • Kafka.spec.entityOperator.template.pod
  • Kafka.spec.cruiseControl.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaBridge.spec.template.pod
  • KafkaMirrorMaker2.spec.template.pod

The following procedures provide scheduling configuration examples for the Kafka and KafkaNodePool resources. The same configurations shown in these examples can be applied to the template.pod property of any other component.

Scheduling properties follow the OpenShift specification.

10.12.10.2. Configuring pod anti-affinity for Kafka nodes

To improve fault tolerance, you can prevent multiple Kafka broker pods from running on the same worker node. Configure podAntiAffinity so that the pods are scheduled on different worker nodes.

This procedure provides configuration examples for the cluster-wide Kafka resource and the pool-specific KafkaNodePool resource.

Note

If you configure KafkaNodePool.spec.template, its settings replace Kafka.spec.kafka.template for that node pool. Properties are not merged. For more information, see Scheduling strategies.

Procedure

  1. Configure podAntiAffinity in either the Kafka or KafkaNodePool resource.

    • To set a cluster-wide rule, edit the affinity property in spec.kafka.template.pod of your Kafka resource. Use the strimzi.io/name label to select all broker pods.
    • To set a pool-specific rule, edit the affinity property in spec.template.pod of your KafkaNodePool resource. Use the strimzi.io/pool-name label to select only the pods in that pool.

    In both cases, set the topologyKey to "kubernetes.io/hostname" to prevent pods from being placed on the same host.

    This example applies a rule to a Kafka resource named my-cluster that prevents any of its broker pods from running on the same node.

    Example cluster-wide anti-affinity configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - labelSelector:
                      matchExpressions:
                        - key: strimzi.io/name
                          operator: In
                          values:
                            - my-cluster-kafka
                    topologyKey: "kubernetes.io/hostname"
      # ...

    This example applies a rule to a KafkaNodePool resource named broker that prevents pods from that specific pool from running on the same node.

    Example node pool-specific anti-affinity configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: broker
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - broker
      template:
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: strimzi.io/pool-name
                        operator: In
                        values:
                          - broker
                  topologyKey: "kubernetes.io/hostname"
      # ...

  2. Apply the changes to your custom resource configuration.

10.12.10.3. Configuring pod anti-affinity to separate Kafka from other workloads

To improve stability and performance, you can prevent Kafka pods from running on the same worker nodes as other resource-intensive applications, such as databases. Configure podAntiAffinity so that these workloads are scheduled on separate nodes.

This procedure provides configuration examples for the cluster-wide Kafka resource and the pool-specific KafkaNodePool resource.

Note

If you configure KafkaNodePool.spec.template, its settings replace Kafka.spec.kafka.template for that node pool. Properties are not merged. For more information, see Scheduling strategies.

Prerequisites

Procedure

  1. Configure podAntiAffinity in either the Kafka or KafkaNodePool resource.

    • To set a cluster-wide rule, edit the affinity property in spec.kafka.template.pod of your Kafka resource.
    • To set a pool-specific rule, edit the affinity property in spec.template.pod of your KafkaNodePool resource.

    In both cases, use a labelSelector to identify the application pods you want to keep separate from your Kafka pods and set the topologyKey to "kubernetes.io/hostname" to prevent pods from being placed on the same host.

    This example applies a rule to a Kafka resource named my-cluster that prevents any of its broker pods from running on the same node as pods labeled postgresql and mongodb.

    Example cluster-wide anti-affinity configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - labelSelector:
                      matchExpressions:
                        - key: application
                          operator: In
                          values:
                          - postgresql
                          - mongodb
                    topologyKey: "kubernetes.io/hostname"
      # ...

    This example applies a rule to a KafkaNodePool resource named broker that prevents pods from that pool from running on the same node as pods labeled postgresql and mongodb.

    Example node pool-specific anti-affinity configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: broker
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - broker
      template:
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
      # ...

  2. Apply the changes to your custom resource configuration.

10.12.10.4. Configuring pod scheduling for specific nodes

Your OpenShift cluster might have different types of worker nodes, some with specialized hardware (fast SSDs, powerful CPUs) or a particular location (a specific availability zone). To optimize performance and resource usage, configure nodeAffinity so that Kafka pods are scheduled only on the nodes that match your requirements.

This procedure provides configuration examples for the cluster-wide Kafka resource and the pool-specific KafkaNodePool resource.

Note

If you configure KafkaNodePool.spec.template, its settings replace Kafka.spec.kafka.template for that node pool. Properties are not merged. For more information, see Scheduling strategies.

Prerequisites

Procedure

  1. Label the worker nodes to identify them when scheduling the Kafka pods.

    oc label node <name_of_node> disk=ssd
  2. Configure nodeAffinity in either the Kafka or KafkaNodePool resource to match the label.

    • To set a cluster-wide rule, edit the affinity property in spec.kafka.template.pod of your Kafka resource.
    • To set a pool-specific rule, edit the affinity property in spec.template.pod of your KafkaNodePool resource.

    In both cases, use nodeSelectorTerms with matchExpressions to specify the key-value label of the nodes you want to schedule pods on.

    This example applies a rule to a Kafka resource that assigns all its broker pods to run only on nodes with the label disk: ssd.

    Example cluster-wide affinity configuration for specific nodes

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                      - key: disk
                        operator: In
                        values:
                        - ssd
      # ...

    Example availability zone scheduling for node pools

    For node pools, a common production scenario is to schedule pods only on nodes within a specified availability zone. For more information, see Managing storage affinity using node pools.

  3. Apply the changes to your custom resource configuration.

10.12.10.5. Configuring pod scheduling for dedicated nodes

You can dedicate a set of worker nodes exclusively to your Kafka brokers so that no other applications can compete with Kafka for resources on those nodes.

To configure dedicated worker nodes for Kafka pods in a specific pool, combine the following:

Taints
Apply taints to worker nodes to prevent other pods from being scheduled on them.
Tolerations
Apply tolerations to Kafka pods to allow them to be scheduled on tainted nodes.
Affinity
Apply affinity to Kafka pods to schedule them on specifically labeled nodes.

This procedure provides configuration examples for the cluster-wide Kafka resource and the pool-specific KafkaNodePool resource.

Note

If you configure KafkaNodePool.spec.template, its settings replace Kafka.spec.kafka.template for that node pool. Properties are not merged. For more information, see Scheduling strategies.

Prerequisites

Procedure

  1. Taint and label the dedicated worker nodes to prevent other pods from being scheduled on them and to identify them when scheduling the Kafka pods:

    oc adm taint node <name_of_node> dedicated=kafka:NoSchedule
    oc label node <name_of_node> dedicated=kafka
  2. Configure tolerations and nodeAffinity in either your Kafka or KafkaNodePool custom resource to match the taint and label.

    • To set a cluster-wide rule, edit the affinity property in spec.kafka.template.pod of your Kafka resource.
    • To set a pool-specific rule, edit the affinity property in spec.template.pod of your KafkaNodePool resource.

    In both cases, use nodeSelectorTerms with matchExpressions to specify the key-value label of the nodes you want to schedule pods on.

    This example applies a rule to a Kafka resource that assigns all its broker pods to run only on nodes that have been tainted and labeled with dedicated=kafka.

    Example cluster-wide affinity configuration for dedicated nodes

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        template:
          pod:
            tolerations:
              - key: "dedicated"
                operator: "Equal"
                value: "kafka"
                effect: "NoSchedule"
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                  - matchExpressions:
                    - key: dedicated
                      operator: In
                      values:
                      - kafka
      # ...

    This example applies a rule to a KafkaNodePool resource named broker that assigns its pods to run on dedicated nodes marked with dedicated=broker-kafka.

    Example node pool-specific affinity configuration for dedicated nodes

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: broker
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      replicas: 3
      roles:
        - broker
      template:
        pod:
          tolerations:
            - key: "dedicated"
              operator: "Equal"
              value: "kafka"
              effect: "NoSchedule"
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: dedicated
                    operator: In
                    values:
                    - broker-kafka
      # ...

  3. Apply the changes to your custom resource configuration.

10.12.11. Disabling PodDisruptionBudget generation

Streams for Apache Kafka automatically generates a PodDisruptionBudget resource for each of the following components:

  • Kafka cluster
  • Kafka Connect
  • MirrorMaker 2
  • Kafka Bridge

Each budget applies across all pods deployed for that component.

The Kafka cluster’s PodDisruptionBudget covers all associated node pool pods.

To disable automatic PodDisruptionBudget generation, set the STRIMZI_POD_DISRUPTION_BUDGET_GENERATION environment variable to false in the Cluster Operator configuration. You can then define custom PodDisruptionBudget resources if needed. For more information, see Section 10.6, “Configuring the Cluster Operator”.
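
For example, the following sketch shows a custom PodDisruptionBudget for Kafka broker pods; it assumes a cluster named my-cluster whose broker pods carry the strimzi.io/cluster and strimzi.io/name labels shown.

Example custom PodDisruptionBudget

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: my-cluster-kafka-pdb  # hypothetical name for the custom budget
spec:
  maxUnavailable: 1
  selector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/name: my-cluster-kafka  # assumed label for the broker pods of my-cluster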

10.12.12. Using ConfigMap resources to add configuration

Add specific configuration to your Streams for Apache Kafka deployment using ConfigMap resources. Config maps use key-value pairs to store non-confidential data. Configuration data added to config maps is maintained in one place and can be reused amongst components.

Config maps can only store the following types of configuration data:

  • Logging configuration
  • Metrics configuration
  • External configuration for Kafka Connect connectors

You can’t use config maps for other areas of configuration.

When you configure a component, you can add a reference to a ConfigMap using the configMapKeyRef property.

For example, you can use configMapKeyRef to reference a ConfigMap that provides configuration for logging. You define logging levels using the log4j2.properties key in the ConfigMap and then reference it in the logging configuration of the resource.

Example reference to a ConfigMap

# ...
logging:
  type: external
  valueFrom:
    configMapKeyRef:
      name: my-config-map
      key: log4j2.properties
# ...

To use a ConfigMap for metrics configuration, you add a reference to the metricsConfig configuration of the component in the same way.

The template properties allow data from a ConfigMap or Secret to be mounted into a pod as environment variables or volumes. You can use external configuration data for the connectors used by Kafka Connect. The data might be related to an external data source, providing the values needed for the connector to communicate with that data source.

For example, you can use the configMapKeyRef property to pass configuration data from a ConfigMap as an environment variable.

Example ConfigMap providing environment variable values

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  template:
    connectContainer:
      env:
        - name: MY_ENVIRONMENT_VARIABLE
          valueFrom:
            configMapKeyRef:
              name: my-config-map
              key: my-key

If you are using config maps that are managed externally, use configuration providers to load the data in the config maps.

10.12.12.1. Naming custom config maps

Streams for Apache Kafka creates its own config maps and other resources when it is deployed to OpenShift. The config maps contain data necessary for running components. The config maps created by Streams for Apache Kafka must not be edited.

Make sure that any custom config maps you create do not have the same name as these default config maps. If they have the same name, they are overwritten. For example, if the custom ConfigMap has the same name as the ConfigMap for the Kafka cluster, it is overwritten when there is an update to the Kafka cluster.

10.12.13. Loading configuration values from external sources

Use configuration providers to load configuration data from external sources. The providers operate independently of Streams for Apache Kafka. You can use them to load configuration data for all Kafka components, including producers and consumers. You reference the external source in the configuration of the component and provide access rights. The provider loads data without needing to restart the Kafka component or extract files, even when referencing a new external source. For example, use providers to supply the credentials for Kafka Connect connector configuration. The configuration must include any access rights to the external source.

10.12.13.1. Enabling configuration providers

You can enable one or more configuration providers using the config.providers properties in the spec configuration of a component.

Example configuration to enable a configuration provider

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    # ...
    config.providers: env
    config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider
  # ...

The following configuration providers are supported:

KubernetesSecretConfigProvider
Loads configuration data from OpenShift secrets. You specify the name of the secret and the key within the secret where the configuration data is stored. This provider is useful for storing sensitive configuration data like passwords or other user credentials.
KubernetesConfigMapConfigProvider
Loads configuration data from OpenShift config maps. You specify the name of the config map and the key within the config map where the configuration data is stored. This provider is useful for storing non-sensitive configuration data.
EnvVarConfigProvider
Loads configuration data from environment variables. You specify the name of the environment variable where the configuration data is stored. This provider is useful for configuring applications running in containers, for example, to load certificates or JAAS configuration from environment variables mapped from secrets.
FileConfigProvider
Loads configuration data from a file. You specify the path to the file where the configuration data is stored. This provider is useful for loading configuration data from files that are mounted into containers.
DirectoryConfigProvider
Loads configuration data from files within a directory. You specify the path to the directory where the configuration files are stored. This provider is useful for loading multiple configuration files and for organizing configuration data into separate files.

To use KubernetesSecretConfigProvider and KubernetesConfigMapConfigProvider, which are part of the OpenShift Configuration Provider plugin, you must set up access rights to the namespace that contains the config map or secret.

You can use the other providers without setting up access rights. You can supply connector configuration for Kafka Connect or MirrorMaker 2 in this way by doing the following:

  • Mount config maps or secrets into the Kafka Connect pod as environment variables or volumes
  • Enable EnvVarConfigProvider, FileConfigProvider, or DirectoryConfigProvider in the Kafka Connect or MirrorMaker 2 configuration
  • Pass connector configuration using the template property in the spec of the KafkaConnect or KafkaMirrorMaker2 resource

Using providers helps prevent passing restricted information through the Kafka Connect REST interface. You can use this approach in the following scenarios:

  • Mounting environment variables with the values a connector uses to connect and communicate with a data source
  • Mounting a properties file with values that are used to configure Kafka Connect connectors
  • Mounting files in a directory that contains values for the TLS truststore and keystore used by a connector
Note

A restart is required when using a new Secret or ConfigMap for a connector, which can disrupt other connectors.

10.12.13.2. Loading configuration values from secrets or config maps

Use the KubernetesSecretConfigProvider to provide configuration properties from a secret or the KubernetesConfigMapConfigProvider to provide configuration properties from a config map.

In this procedure, a config map provides configuration properties for a connector. The properties are specified as key-value pairs in the config map. The config map does not need to be mounted into the Kafka Connect pod; the provider reads it using the Kafka Connect service account, which must be granted access.

Prerequisites

  • A Kafka cluster is running.
  • The Cluster Operator is running.
  • You have a config map containing the connector configuration.

Example config map with connector properties

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-connector-configuration
data:
  option1: value1
  option2: value2

Procedure

  1. Configure the KafkaConnect resource.

    • Enable the KubernetesConfigMapConfigProvider

    The specification shown here can support loading values from config maps and secrets.

    Example Kafka Connect configuration to use config maps and secrets

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      # ...
      config:
        # ...
        config.providers: secrets,configmaps # 1
        config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider # 2
        config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider # 3
      # ...

    1. The alias for the configuration provider is used to define other configuration parameters. The provider parameters use the alias from config.providers, taking the form config.providers.${alias}.class.
    2. KubernetesConfigMapConfigProvider provides values from config maps.
    3. KubernetesSecretConfigProvider provides values from secrets.
  2. Create or update the resource to enable the provider.

    oc apply -f <kafka_connect_configuration_file>
  3. Create a role that permits access to the values in the external config map.

    Example role to access values from a config map

    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: connector-configuration-role
    rules:
    - apiGroups: [""]
      resources: ["configmaps"]
      resourceNames: ["my-connector-configuration"]
      verbs: ["get"]
    # ...

    The rule gives the role permission to access the my-connector-configuration config map.

  4. Create a role binding to permit access to the namespace that contains the config map.

    Example role binding to access the namespace that contains the config map

    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: connector-configuration-role-binding
    subjects:
    - kind: ServiceAccount
      name: my-connect-connect
      namespace: my-project
    roleRef:
      kind: Role
      name: connector-configuration-role
      apiGroup: rbac.authorization.k8s.io
    # ...

    The role binding gives the role permission to access the my-project namespace.

    The service account must be the same one used by the Kafka Connect deployment. The service account name format is <cluster_name>-connect, where <cluster_name> is the name of the KafkaConnect custom resource.

  5. Reference the config map in the connector configuration.

    Example connector configuration referencing the config map

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      # ...
      config:
        option: ${configmaps:my-project/my-connector-configuration:option1}
        # ...
    # ...

    The placeholder structure is configmaps:<namespace>/<config_map_name>:<key>. KubernetesConfigMapConfigProvider reads and extracts the option1 property value from the external config map.

10.12.13.3. Loading configuration values from environment variables

Use the EnvVarConfigProvider to provide configuration properties as environment variables. Environment variables can contain values from config maps or secrets.

In this procedure, environment variables provide configuration properties for a connector to communicate with Amazon AWS. The connector must be able to read the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables. The values of the environment variables are derived from a secret mounted into the Kafka Connect pod.

Note

The names of user-defined environment variables cannot start with KAFKA_ or STRIMZI_.

Prerequisites

  • A Kafka cluster is running.
  • The Cluster Operator is running.
  • You have a secret containing the connector configuration.

Example secret with values for environment variables

apiVersion: v1
kind: Secret
metadata:
  name: aws-creds
type: Opaque
data:
  awsAccessKey: QUtJQVhYWFhYWFhYWFhYWFg=
  awsSecretAccessKey: Ylhsd1lYTnpkMjl5WkE=

Procedure

  1. Configure the KafkaConnect resource.

    • Enable the EnvVarConfigProvider
    • Specify the environment variables using the template property.

    Example Kafka Connect configuration to use external environment variables

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      # ...
      config:
        # ...
        config.providers: env # 1
        config.providers.env.class: org.apache.kafka.common.config.provider.EnvVarConfigProvider # 2
      # ...
      template:
        connectContainer:
          env:
            - name: AWS_ACCESS_KEY_ID # 3
              valueFrom:
                secretKeyRef:
                  name: aws-creds # 4
                  key: awsAccessKey # 5
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-creds
                  key: awsSecretAccessKey
      # ...

    1. The alias for the configuration provider is used to define other configuration parameters. The provider parameters use the alias from config.providers, taking the form config.providers.${alias}.class.
    2. EnvVarConfigProvider provides values from environment variables.
    3. The environment variable takes a value from the secret.
    4. The name of the secret containing the environment variable.
    5. The name of the key stored in the secret.
    Note

    The secretKeyRef property references keys in a secret. If you are using a config map instead of a secret, use the configMapKeyRef property.

  2. Create or update the resource to enable the provider.

    oc apply -f <kafka_connect_configuration_file>
  3. Reference the environment variable in the connector configuration.

    Example connector configuration referencing the environment variable

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      # ...
      config:
        option1: ${env:AWS_ACCESS_KEY_ID}
        option2: ${env:AWS_SECRET_ACCESS_KEY}
        # ...
    # ...

    The placeholder structure is env:<environment_variable_name>. EnvVarConfigProvider reads and extracts the environment variable values from the mounted secret.

10.12.13.4. Loading configuration values from a file within a directory

Use the FileConfigProvider to provide configuration properties from a file within a directory. Files can be stored in config maps or secrets.

In this procedure, a file provides configuration properties for a connector. A database username and password are specified as properties of a secret. The secret is mounted into the Kafka Connect pod as a volume. Volumes are mounted on the path /mnt/<volume-name>.

Prerequisites

  • A Kafka cluster is running.
  • The Cluster Operator is running.
  • You have a secret containing the connector configuration.

Example secret with database properties

apiVersion: v1
kind: Secret
metadata:
  name: mysecret
type: Opaque
stringData:
  connector.properties: |-
    dbUsername: my-username
    dbPassword: my-password

The connector.properties key holds the connector configuration in properties file format. It contains the database username and password properties that are used in the connector configuration.

Procedure

  1. Configure the KafkaConnect resource.

    • Enable the FileConfigProvider
    • Specify the additional volume using the template property.

    Example Kafka Connect configuration to use an external property file

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect
    spec:
      # ...
      config:
        config.providers: file # 1
        config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider # 2
      #...
      template:
        pod:
          volumes:
            - name: connector-config-volume # 3
              secret:
                secretName: mysecret # 4
        connectContainer:
          volumeMounts:
            - name: connector-config-volume # 5
              mountPath: /mnt/mysecret # 6

    1. The alias for the configuration provider is used to define other configuration parameters.
    2. FileConfigProvider provides values from properties files. The parameter uses the alias from config.providers, taking the form config.providers.${alias}.class.
    3. The name of the volume containing the secret.
    4. The name of the secret.
    5. The name of the mounted volume, which must match the volume name in the volumes list.
    6. The path where the secret is mounted, which must start with /mnt/.
  2. Create or update the resource to enable the provider.

    oc apply -f <kafka_connect_configuration_file>
  3. Reference the file properties in the connector configuration as placeholders.

    Example connector configuration referencing the file

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      tasksMax: 2
      config:
        database.hostname: 192.168.99.1
        database.port: "3306"
        database.user: "${file:/mnt/mysecret/connector.properties:dbUsername}"
        database.password: "${file:/mnt/mysecret/connector.properties:dbPassword}"
        database.server.id: "184054"
        #...

    The placeholder structure is file:<path_and_file_name>:<property>. FileConfigProvider reads and extracts the database username and password property values from the mounted secret.

10.12.13.5. Loading configuration values from multiple files within a directory

Use the DirectoryConfigProvider to provide configuration properties from multiple files within a directory. Files can be stored in config maps or secrets.

In this procedure, a secret provides the TLS keystore and truststore user credentials for a connector. The credentials are in separate files. The secrets are mounted into the Kafka Connect pod as volumes. Volumes are mounted on the path /mnt/<volume-name>.

Prerequisites

  • A Kafka cluster is running.
  • The Cluster Operator is running.
  • You have a secret containing the user credentials.

Example secret with user credentials

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
  user.crt: <user_certificate> # Public key of the user
  user.key: <user_private_key> # Private key of the user
  user.p12: <store> # PKCS #12 store for user certificates and keys
  user.password: <password_for_store> # Protects the PKCS #12 store

The my-user secret provides the keystore credentials (user.crt and user.key) for the connector.

The <cluster_name>-cluster-ca-cert secret generated when deploying the Kafka cluster provides the cluster CA certificate as truststore credentials (ca.crt).

Procedure

  1. Configure the KafkaConnect resource.

    • Enable the DirectoryConfigProvider
    • Specify the additional volume using the template property.

    Example Kafka Connect configuration to use external property files

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect
    spec:
      # ...
      config:
        config.providers: directory # 1
        config.providers.directory.class: org.apache.kafka.common.config.provider.DirectoryConfigProvider # 2
      #...
      template:
        pod:
          volumes:
            - name: my-user-volume # 3
              secret:
                secretName: my-user # 4
            - name: cluster-ca-volume
              secret:
                secretName: my-cluster-cluster-ca-cert
        connectContainer:
          volumeMounts:
            - name: my-user-volume # 5
              mountPath: /mnt/my-user # 6
            - name: cluster-ca-volume
              mountPath: /mnt/cluster-ca

    1. The alias for the configuration provider is used to define other configuration parameters.
    2. DirectoryConfigProvider provides values from files in a directory. The parameter uses the alias from config.providers, taking the form config.providers.${alias}.class.
    3. The name of the volume containing the secret.
    4. The name of the secret.
    5. The name of the mounted volume, which must match the volume name in the volumes list.
    6. The path where the secret is mounted, which must start with /mnt/.
  2. Create or update the resource to enable the provider.

    oc apply -f <kafka_connect_configuration_file>
  3. Reference the file properties in the connector configuration as placeholders.

    Example connector configuration referencing the files

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: io.debezium.connector.mysql.MySqlConnector
      tasksMax: 2
      config:
        # ...
        database.history.producer.security.protocol: SSL
        database.history.producer.ssl.truststore.type: PEM
        database.history.producer.ssl.truststore.certificates: "${directory:/mnt/cluster-ca:ca.crt}"
        database.history.producer.ssl.keystore.type: PEM
        database.history.producer.ssl.keystore.certificate.chain: "${directory:/mnt/my-user:user.crt}"
        database.history.producer.ssl.keystore.key: "${directory:/mnt/my-user:user.key}"
        #...

    The placeholder structure is directory:<path>:<file_name>. DirectoryConfigProvider reads and extracts the credentials from the mounted secrets.

10.12.14. Customizing OpenShift resources

A Streams for Apache Kafka deployment creates OpenShift resources, such as Deployment, Pod, and Service resources. These resources are managed by Streams for Apache Kafka operators. Only the operator that is responsible for managing a particular OpenShift resource can change that resource. If you try to manually change an operator-managed OpenShift resource, the operator reverts your changes.

Changing an operator-managed OpenShift resource can be useful if you want to perform certain tasks, such as the following:

  • Adding custom labels or annotations that control how Pods are treated by Istio or other services
  • Managing how LoadBalancer-type Services are created by the cluster

To make the changes to an OpenShift resource, you can use the template property within the spec section of various Streams for Apache Kafka custom resources.

Here is a list of the custom resources where you can apply the changes:

  • Kafka.spec.kafka
  • Kafka.spec.entityOperator
  • Kafka.spec.kafkaExporter
  • Kafka.spec.cruiseControl
  • KafkaNodePool.spec
  • KafkaConnect.spec
  • KafkaMirrorMaker2.spec
  • KafkaBridge.spec
  • KafkaUser.spec

For more information about these properties and the customizable fields, see the Streams for Apache Kafka Custom Resource API Reference.

In the following example, the template property is used to modify the labels in a Kafka broker’s pod.

Example template customization

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  labels:
    app: my-cluster
spec:
  kafka:
    # ...
    template:
      pod:
        metadata:
          labels:
            mylabel: myvalue
    # ...

10.12.14.1. Customizing the image pull policy

Streams for Apache Kafka allows you to customize the image pull policy for containers in all pods deployed by the Cluster Operator. The image pull policy is configured using the environment variable STRIMZI_IMAGE_PULL_POLICY in the Cluster Operator deployment. The STRIMZI_IMAGE_PULL_POLICY environment variable can be set to three different values:

Always
Container images are pulled from the registry every time the pod is started or restarted.
IfNotPresent
Container images are pulled from the registry only if they are not already present on the node.
Never
Container images are never pulled from the registry.

Currently, the image pull policy can only be customized for all Kafka, Kafka Connect, and Kafka MirrorMaker clusters at once. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
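
For example, the following sketch sets the pull policy to IfNotPresent in the Cluster Operator configuration; it assumes the default Deployment name strimzi-cluster-operator.

Example image pull policy configuration (sketch)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
spec:
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator
          env:
            # Pull images only if they are not already present on the node
            - name: STRIMZI_IMAGE_PULL_POLICY
              value: "IfNotPresent"
          # ...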

10.12.14.2. Applying a termination grace period

Apply a termination grace period to give a Kafka cluster enough time to shut down cleanly.

Specify the time using the terminationGracePeriodSeconds property. Add the property to the template.pod configuration of the Kafka custom resource.

The time you add will depend on the size of your Kafka cluster. The OpenShift default for the termination grace period is 30 seconds. If you observe that your clusters are not shutting down cleanly, you can increase the termination grace period.

A termination grace period is applied every time a pod is restarted. The period begins when OpenShift sends a TERM (termination) signal to the processes running in the pod. The period should reflect the amount of time required to transfer the processes of the terminating pod to another pod before they are stopped. After the period ends, a KILL signal stops any processes still running in the pod.

The following example adds a termination grace period of 120 seconds to the Kafka custom resource. You can also specify the configuration in the custom resources of other Kafka components.

Example termination grace period configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    template:
      pod:
        terminationGracePeriodSeconds: 120
        # ...
    # ...

10.13. Restrictions on OpenShift labels

OpenShift labels make it easier to organize, manage, and discover OpenShift resources within your applications. The Cluster Operator is responsible for applying the following OpenShift labels to the operands it deploys. These labels cannot be overridden through template configuration of Streams for Apache Kafka resources:

  • app.kubernetes.io/name: Identifies the component type within Streams for Apache Kafka, such as kafka or cruise-control.
  • app.kubernetes.io/instance: Represents the name of the custom resource to which the operand belongs. For instance, if a Kafka custom resource is named my-cluster, this label bears that name on the associated pods.
  • app.kubernetes.io/part-of: Similar to app.kubernetes.io/instance, but prefixed with strimzi-.
  • app.kubernetes.io/managed-by: Defines the application responsible for managing the operand, such as strimzi-cluster-operator or strimzi-user-operator.

Example OpenShift labels on a Kafka pod when deploying a Kafka custom resource named my-cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: Pod
metadata:
  name: my-cluster-kafka-0
  labels:
    app.kubernetes.io/instance: my-cluster
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: kafka
    app.kubernetes.io/part-of: strimzi-my-cluster
spec:
  # ...
