Rechercher

Ce contenu n'est pas disponible dans la langue sélectionnée.

Chapter 29. Managing Streams for Apache Kafka

download PDF

Managing Streams for Apache Kafka requires performing various tasks to keep the Kafka clusters and associated resources running smoothly. Use oc commands to check the status of resources, configure maintenance windows for rolling updates, and leverage tools such as the Streams for Apache Kafka Drain Cleaner and Kafka Static Quota plugin to manage your deployment effectively.

29.1. Maintenance time windows for rolling updates

Maintenance time windows allow you to schedule certain rolling updates of your Kafka and ZooKeeper clusters to start at a convenient time.

29.1.1. Maintenance time windows overview

In most cases, the Cluster Operator only updates your Kafka or ZooKeeper clusters in response to changes to the corresponding Kafka resource. This enables you to plan when to apply changes to a Kafka resource to minimize the impact on Kafka client applications.

However, some updates to your Kafka and ZooKeeper clusters can happen without any corresponding change to the Kafka resource. For example, the Cluster Operator will need to perform a rolling restart if a CA (certificate authority) certificate that it manages is close to expiry.

While a rolling restart of the pods should not affect availability of the service (assuming correct broker and topic configurations), it could affect performance of the Kafka client applications. Maintenance time windows allow you to schedule such spontaneous rolling updates of your Kafka and ZooKeeper clusters to start at a convenient time. If maintenance time windows are not configured for a cluster then it is possible that such spontaneous rolling updates will happen at an inconvenient time, such as during a predictable period of high load.

29.1.2. Maintenance time window definition

You configure maintenance time windows by entering an array of strings in the Kafka.spec.maintenanceTimeWindows property. Each string is a cron expression interpreted as being in UTC (Coordinated Universal Time, which for practical purposes is the same as Greenwich Mean Time).

The following example configures a single maintenance time window that starts at midnight and ends at 01:59am (UTC), on Sundays, Mondays, Tuesdays, Wednesdays, and Thursdays:

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...

In practice, maintenance windows should be set in conjunction with the Kafka.spec.clusterCa.renewalDays and Kafka.spec.clientsCa.renewalDays properties of the Kafka resource, to ensure that the necessary CA certificate renewal can be completed in the configured maintenance time windows.

Note

Streams for Apache Kafka does not schedule maintenance operations exactly according to the given windows. Instead, for each reconciliation, it checks whether a maintenance window is currently "open". This means that the start of maintenance operations within a given time window can be delayed by up to the Cluster Operator reconciliation interval. Maintenance time windows must therefore be at least this long.

29.1.3. Configuring a maintenance time window

You can configure a maintenance time window for rolling updates triggered by supported processes.

Prerequisites

  • An OpenShift cluster.
  • The Cluster Operator is running.

Procedure

  1. Add or edit the maintenanceTimeWindows property in the Kafka resource. For example to allow maintenance between 0800 and 1059 and between 1400 and 1559 you would set the maintenanceTimeWindows as shown below:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
  2. Create or update the resource:

    oc apply -f <kafka_configuration_file>

29.2. Starting rolling updates of Kafka and other operands using annotations

Streams for Apache Kafka supports the use of annotations to manually trigger a rolling update of Kafka and other operands through the Cluster Operator. Use annotations to initiate rolling updates of Kafka, Kafka Connect, MirrorMaker 2, and ZooKeeper clusters.

Manually performing a rolling update on a specific pod or set of pods is usually only required in exceptional circumstances. However, rather than deleting the pods directly, if you perform the rolling update through the Cluster Operator you ensure the following:

  • The manual deletion of the pod does not conflict with simultaneous Cluster Operator operations, such as deleting other pods in parallel.
  • The Cluster Operator logic handles the Kafka configuration specifications, such as the number of in-sync replicas.

29.2.1. Performing a rolling update using a pod management annotation

This procedure describes how to trigger a rolling update of Kafka, Kafka Connect, MirrorMaker 2, or ZooKeeper clusters. To trigger the update, you add an annotation to the StrimziPodSet that manages the pods running on the cluster.

Prerequisites

To perform a manual rolling update, you need a running Cluster Operator. The cluster for the component you are updating, whether it’s Kafka, Kafka Connect, MirrorMaker 2, or ZooKeeper, must also be running.

Procedure

  1. Find the name of the resource that controls the pods you want to manually update.

    For example, if your Kafka cluster is named my-cluster, the corresponding names are my-cluster-kafka and my-cluster-zookeeper. For a Kafka Connect cluster named my-connect-cluster, the corresponding name is my-connect-cluster-connect. And for a MirrorMaker 2 cluster named my-mm2-cluster, the corresponding name is my-mm2-cluster-mirrormaker2.

  2. Use oc annotate to annotate the appropriate resource in OpenShift.

    Annotating a StrimziPodSet

    oc annotate strimzipodset <cluster_name>-kafka strimzi.io/manual-rolling-update="true"
    
    oc annotate strimzipodset <cluster_name>-zookeeper strimzi.io/manual-rolling-update="true"
    
    oc annotate strimzipodset <cluster_name>-connect strimzi.io/manual-rolling-update="true"
    
    oc annotate strimzipodset <cluster_name>-mirrormaker2 strimzi.io/manual-rolling-update="true"

  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of all pods within the annotated resource is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of all the pods is complete, the annotation is automatically removed from the resource.

29.2.2. Performing a rolling update using a pod annotation

This procedure describes how to manually trigger a rolling update of existing Kafka, Kafka Connect, MirrorMaker 2, or ZooKeeper clusters using an OpenShift Pod annotation. When multiple pods are annotated, consecutive rolling updates are performed within the same reconciliation run.

Prerequisites

To perform a manual rolling update, you need a running Cluster Operator. The cluster for the component you are updating, whether it’s Kafka, Kafka Connect, MirrorMaker 2, or ZooKeeper, must also be running.

You can perform a rolling update on a Kafka cluster regardless of the topic replication factor used. But for Kafka to stay operational during the update, you’ll need the following:

  • A highly available Kafka cluster deployment running with nodes that you wish to update.
  • Topics replicated for high availability.

    Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.

    Kafka topic replicated for high availability

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        # ...
        min.insync.replicas: 2
        # ...

Procedure

  1. Find the name of the Pod you want to manually update.

    Pod naming conventions are as follows:

    • <cluster_name>-kafka-<index_number> for a Kafka cluster
    • <cluster_name>-zookeeper-<index_number> for a ZooKeeper cluster
    • <cluster_name>-connect-<index_number> for a Kafka Connect cluster
    • <cluster_name>-mirrormaker2-<index_number> for a MirrorMaker 2 cluster

    The <index_number> assigned to a pod starts at zero and ends at the total number of replicas minus one.

  2. Use oc annotate to annotate the Pod resource in OpenShift:

    oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/manual-rolling-update="true"
    
    oc annotate pod <cluster_name>-zookeeper-<index_number> strimzi.io/manual-rolling-update="true"
    
    oc annotate pod <cluster_name>-connect-<index_number> strimzi.io/manual-rolling-update="true"
    
    oc annotate pod <cluster_name>-mirrormaker2-<index_number> strimzi.io/manual-rolling-update="true"
  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of the annotated Pod is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of a pod is complete, the annotation is automatically removed from the Pod.

29.3. Recovering a cluster from persistent volumes

You can recover a Kafka cluster from persistent volumes (PVs) if they are still present.

You might want to do this, for example, after:

  • A namespace was deleted unintentionally
  • A whole OpenShift cluster is lost, but the PVs remain in the infrastructure

29.3.1. Recovery from namespace deletion

Recovery from namespace deletion is possible because of the relationship between persistent volumes and namespaces. A PersistentVolume (PV) is a storage resource that lives outside of a namespace. A PV is mounted into a Kafka pod using a PersistentVolumeClaim (PVC), which lives inside a namespace.

The reclaim policy for a PV tells a cluster how to act when a namespace is deleted. If the reclaim policy is set as:

  • Delete (default), PVs are deleted when PVCs are deleted within a namespace
  • Retain, PVs are not deleted when a namespace is deleted

To ensure that you can recover from a PV if a namespace is deleted unintentionally, the policy must be reset from Delete to Retain in the PV specification using the persistentVolumeReclaimPolicy property:

apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  persistentVolumeReclaimPolicy: Retain

Alternatively, PVs can inherit the reclaim policy of an associated storage class. Storage classes are used for dynamic volume allocation.

By configuring the reclaimPolicy property for the storage class, PVs that use the storage class are created with the appropriate reclaim policy. The storage class is configured for the PV using the storageClassName property.

apiVersion: v1
kind: StorageClass
metadata:
  name: gp2-retain
parameters:
  # ...
# ...
reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  storageClassName: gp2-retain
Note

If you are using Retain as the reclaim policy, but you want to delete an entire cluster, you need to delete the PVs manually. Otherwise they will not be deleted, and may cause unnecessary expenditure on resources.

29.3.2. Recovery from loss of an OpenShift cluster

When a cluster is lost, you can use the data from disks/volumes to recover the cluster if they were preserved within the infrastructure. The recovery procedure is the same as with namespace deletion, assuming PVs can be recovered and they were created manually.

29.3.3. Recovering a deleted cluster from persistent volumes

This procedure describes how to recover a deleted cluster from persistent volumes (PVs).

In this situation, the Topic Operator identifies that topics exist in Kafka, but the KafkaTopic resources do not exist.

When you get to the step to recreate your cluster, you have two options:

  1. Use Option 1 when you can recover all KafkaTopic resources.

    The KafkaTopic resources must therefore be recovered before the cluster is started so that the corresponding topics are not deleted by the Topic Operator.

  2. Use Option 2 when you are unable to recover all KafkaTopic resources.

    In this case, you deploy your cluster without the Topic Operator, delete the Topic Operator topic store metadata, and then redeploy the Kafka cluster with the Topic Operator so it can recreate the KafkaTopic resources from the corresponding topics.

Note

If the Topic Operator is not deployed, you only need to recover the PersistentVolumeClaim (PVC) resources.

Before you begin

In this procedure, it is essential that PVs are mounted into the correct PVC to avoid data corruption. A volumeName is specified for the PVC and this must match the name of the PV.

For more information, see Persistent storage.

Note

The procedure does not include recovery of KafkaUser resources, which must be recreated manually. If passwords and certificates need to be retained, secrets must be recreated before creating the KafkaUser resources.

Procedure

  1. Check information on the PVs in the cluster:

    oc get pv

    Information is presented for PVs with data.

    Example output showing columns important to this procedure:

    NAME                                         RECLAIMPOLICY CLAIM
    pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-my-cluster-zookeeper-1
    pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-0
    pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-2
    pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-0-my-cluster-kafka-0
    pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ...    myproject/data-0-my-cluster-kafka-1
    pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-0-my-cluster-kafka-2
    • NAME shows the name of each PV.
    • RECLAIM POLICY shows that PVs are retained.
    • CLAIM shows the link to the original PVCs.
  2. Recreate the original namespace:

    oc create namespace myproject
  3. Recreate the original PVC resource specifications, linking the PVCs to the appropriate PV:

    For example:

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data-0-my-cluster-kafka-0
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
      storageClassName: gp2-retain
      volumeMode: Filesystem
      volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
  4. Edit the PV specifications to delete the claimRef properties that bound the original PVC.

    For example:

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      annotations:
        kubernetes.io/createdby: aws-ebs-dynamic-provisioner
        pv.kubernetes.io/bound-by-controller: "yes"
        pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs
      creationTimestamp: "<date>"
      finalizers:
      - kubernetes.io/pv-protection
      labels:
        failure-domain.beta.kubernetes.io/region: eu-west-1
        failure-domain.beta.kubernetes.io/zone: eu-west-1c
      name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      resourceVersion: "39431"
      selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c
    spec:
      accessModes:
      - ReadWriteOnce
      awsElasticBlockStore:
        fsType: xfs
        volumeID: aws://eu-west-1c/vol-09db3141656d1c258
      capacity:
        storage: 100Gi
      claimRef:
        apiVersion: v1
        kind: PersistentVolumeClaim
        name: data-0-my-cluster-kafka-2
        namespace: myproject
        resourceVersion: "39113"
        uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: failure-domain.beta.kubernetes.io/zone
              operator: In
              values:
              - eu-west-1c
            - key: failure-domain.beta.kubernetes.io/region
              operator: In
              values:
              - eu-west-1
      persistentVolumeReclaimPolicy: Retain
      storageClassName: gp2-retain
      volumeMode: Filesystem

    In the example, the following properties are deleted:

    claimRef:
      apiVersion: v1
      kind: PersistentVolumeClaim
      name: data-0-my-cluster-kafka-2
      namespace: myproject
      resourceVersion: "39113"
      uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
  5. Deploy the Cluster Operator.

    oc create -f install/cluster-operator -n my-project
  6. Recreate your cluster.

    Follow the steps depending on whether or not you have all the KafkaTopic resources needed to recreate your cluster.

    Option 1: If you have all the KafkaTopic resources that existed before you lost your cluster, including internal topics such as committed offsets from __consumer_offsets:

    1. Recreate all KafkaTopic resources.

      It is essential that you recreate the resources before deploying the cluster, or the Topic Operator will delete the topics.

    2. Deploy the Kafka cluster.

      For example:

      oc apply -f kafka.yaml

    Option 2: If you do not have all the KafkaTopic resources that existed before you lost your cluster:

    1. Deploy the Kafka cluster, as with the first option, but without the Topic Operator by removing the topicOperator property from the Kafka resource before deploying.

      If you include the Topic Operator in the deployment, the Topic Operator will delete all the topics.

    2. Delete the internal topic store topics from the Kafka cluster:

      oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete

      The command must correspond to the type of listener and authentication used to access the Kafka cluster.

    3. Enable the Topic Operator by redeploying the Kafka cluster with the topicOperator property to recreate the KafkaTopic resources.

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        #...
        entityOperator:
          topicOperator: {} 1
          #...
    1
    Here we show the default configuration, which has no additional properties. You specify the required configuration using the properties described in the EntityTopicOperatorSpec schema reference.
  7. Verify the recovery by listing the KafkaTopic resources:

    oc get KafkaTopic

29.4. Frequently asked questions

Red Hat logoGithubRedditYoutubeTwitter

Apprendre

Essayez, achetez et vendez

Communautés

À propos de la documentation Red Hat

Nous aidons les utilisateurs de Red Hat à innover et à atteindre leurs objectifs grâce à nos produits et services avec un contenu auquel ils peuvent faire confiance.

Rendre l’open source plus inclusif

Red Hat s'engage à remplacer le langage problématique dans notre code, notre documentation et nos propriétés Web. Pour plus de détails, consultez leBlog Red Hat.

À propos de Red Hat

Nous proposons des solutions renforcées qui facilitent le travail des entreprises sur plusieurs plates-formes et environnements, du centre de données central à la périphérie du réseau.

© 2024 Red Hat, Inc.