Chapter 11. Managing AMQ Streams
This chapter covers tasks to maintain a deployment of AMQ Streams.
11.1. Working with custom resources
You can use oc
commands to retrieve information and perform other operations on AMQ Streams custom resources.
Using oc
with the status
subresource of a custom resource allows you to get the information about the resource.
11.1.1. Performing oc
operations on custom resources
Use oc
commands, such as get
, describe
, edit
, or delete
, to perform operations on resource types. For example, oc get kafkatopics
retrieves a list of all Kafka topics and oc get kafkas
retrieves all deployed Kafka clusters.
When referencing resource types, you can use both singular and plural names: oc get kafkas
gets the same results as oc get kafka
.
You can also use the short name of the resource. Learning short names can save you time when managing AMQ Streams. The short name for Kafka
is k
, so you can also run oc get k
to list all Kafka clusters.
oc get k NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS my-cluster 3 3
AMQ Streams resource | Long name | Short name |
---|---|---|
Kafka | kafka | k |
Kafka Topic | kafkatopic | kt |
Kafka User | kafkauser | ku |
Kafka Connect | kafkaconnect | kc |
Kafka Connector | kafkaconnector | kctr |
Kafka Mirror Maker | kafkamirrormaker | kmm |
Kafka Mirror Maker 2 | kafkamirrormaker2 | kmm2 |
Kafka Bridge | kafkabridge | kb |
Kafka Rebalance | kafkarebalance | kr |
11.1.1.1. Resource categories
Categories of custom resources can also be used in oc
commands.
All AMQ Streams custom resources belong to the category strimzi
, so you can use strimzi
to get all the AMQ Streams resources with one command.
For example, running oc get strimzi
lists all AMQ Streams custom resources in a given namespace.
oc get strimzi NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS kafka.kafka.strimzi.io/my-cluster 3 3 NAME PARTITIONS REPLICATION FACTOR kafkatopic.kafka.strimzi.io/kafka-apps 3 3 NAME AUTHENTICATION AUTHORIZATION kafkauser.kafka.strimzi.io/my-user tls simple
The oc get strimzi -o name
command returns all resource types and resource names. The -o name
option fetches the output in the type/name format
oc get strimzi -o name kafka.kafka.strimzi.io/my-cluster kafkatopic.kafka.strimzi.io/kafka-apps kafkauser.kafka.strimzi.io/my-user
You can combine this strimzi
command with other commands. For example, you can pass it into a oc delete
command to delete all resources in a single command.
oc delete $(oc get strimzi -o name) kafka.kafka.strimzi.io "my-cluster" deleted kafkatopic.kafka.strimzi.io "kafka-apps" deleted kafkauser.kafka.strimzi.io "my-user" deleted
Deleting all resources in a single operation might be useful, for example, when you are testing new AMQ Streams features.
11.1.1.2. Querying the status of sub-resources
There are other values you can pass to the -o
option. For example, by using -o yaml
you get the output in YAML format. Using -o json
will return it as JSON.
You can see all the options in oc get --help
.
One of the most useful options is the JSONPath support, which allows you to pass JSONPath expressions to query the Kubernetes API. A JSONPath expression can extract or navigate specific parts of any resource.
For example, you can use the JSONPath expression {.status.listeners[?(@.name=="tls")].bootstrapServers}
to get the bootstrap address from the status of the Kafka custom resource and use it in your Kafka clients.
Here, the command finds the bootstrapServers
value of the listener named tls
:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="tls")].bootstrapServers}{"\n"}' my-cluster-kafka-bootstrap.myproject.svc:9093
By changing the name condition you can also get the address of the other Kafka listeners.
You can use jsonpath
to extract any other property or group of properties from any custom resource.
11.1.2. AMQ Streams custom resource status information
Several resources have a status
property, as described in the following table.
AMQ Streams resource | Schema reference | Publishes status information on… |
---|---|---|
| The Kafka cluster. | |
| The Kafka Connect cluster, if deployed. | |
| KafkaConnector resources, if deployed. | |
| The Kafka MirrorMaker tool, if deployed. | |
| Kafka topics in your Kafka cluster. | |
| Kafka users in your Kafka cluster. | |
| The AMQ Streams Kafka Bridge, if deployed. |
The status
property of a resource provides information on the resource’s:
-
Current state, in the
status.conditions
property -
Last observed generation, in the
status.observedGeneration
property
The status
property also provides resource-specific information. For example:
-
KafkaStatus
provides information on listener addresses, and the id of the Kafka cluster. -
KafkaConnectStatus
provides the REST API endpoint for Kafka Connect connectors. -
KafkaUserStatus
provides the user name of the Kafka user and theSecret
in which their credentials are stored. -
KafkaBridgeStatus
provides the HTTP address at which external client applications can access the Bridge service.
A resource’s current state is useful for tracking progress related to the resource achieving its desired state, as defined by the spec
property. The status conditions provide the time and reason the state of the resource changed and details of events preventing or delaying the operator from realizing the resource’s desired state.
The last observed generation is the generation of the resource that was last reconciled by the Cluster Operator. If the value of observedGeneration
is different from the value of metadata.generation
, the operator has not yet processed the latest update to the resource. If these values are the same, the status information reflects the most recent changes to the resource.
AMQ Streams creates and maintains the status of custom resources, periodically evaluating the current state of the custom resource and updating its status accordingly. When performing an update on a custom resource using oc edit
, for example, its status
is not editable. Moreover, changing the status
would not affect the configuration of the Kafka cluster.
Here we see the status
property specified for a Kafka custom resource.
Kafka custom resource with status
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: spec: # ... status: conditions: 1 - lastTransitionTime: 2021-07-23T23:46:57+0000 status: "True" type: Ready 2 observedGeneration: 4 3 listeners: 4 - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9092 type: plain - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9093 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: tls - addresses: - host: 172.29.49.180 port: 9094 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: external clusterId: CLUSTER-ID 5 # ...
- 1
- Status
conditions
describe criteria related to the status that cannot be deduced from the existing resource information, or are specific to the instance of a resource. - 2
- The
Ready
condition indicates whether the Cluster Operator currently considers the Kafka cluster able to handle traffic. - 3
- The
observedGeneration
indicates the generation of theKafka
custom resource that was last reconciled by the Cluster Operator. - 4
- The
listeners
describe the current Kafka bootstrap addresses by type. - 5
- The Kafka cluster id.Important
The address in the custom resource status for external listeners with type
nodeport
is currently not supported.
The Kafka bootstrap addresses listed in the status do not signify that those endpoints or the Kafka cluster is in a ready state.
Accessing status information
You can access status information for a resource from the command line. For more information, see Section 11.1.3, “Finding the status of a custom resource”.
11.1.3. Finding the status of a custom resource
This procedure describes how to find the status of a custom resource.
Prerequisites
- An OpenShift cluster.
- The Cluster Operator is running.
Procedure
Specify the custom resource and use the
-o jsonpath
option to apply a standard JSONPath expression to select thestatus
property:oc get kafka <kafka_resource_name> -o jsonpath='{.status}'
This expression returns all the status information for the specified custom resource. You can use dot notation, such as
status.listeners
orstatus.observedGeneration
, to fine-tune the status information you wish to see.
Additional resources
- Section 11.1.2, “AMQ Streams custom resource status information”
- For more information about using JSONPath, see JSONPath support.
11.2. Pausing reconciliation of custom resources
Sometimes it is useful to pause the reconciliation of custom resources managed by AMQ Streams Operators, so that you can perform fixes or make updates. If reconciliations are paused, any changes made to custom resources are ignored by the Operators until the pause ends.
If you want to pause reconciliation of a custom resource, set the strimzi.io/pause-reconciliation
annotation to true
in its configuration. This instructs the appropriate Operator to pause reconciliation of the custom resource. For example, you can apply the annotation to the KafkaConnect
resource so that reconciliation by the Cluster Operator is paused.
You can also create a custom resource with the pause annotation enabled. The custom resource is created, but it is ignored.
Prerequisites
- The AMQ Streams Operator that manages the custom resource is running.
Procedure
Annotate the custom resource in OpenShift, setting
pause-reconciliation
totrue
:oc annotate <kind_of_custom_resource> <name_of_custom_resource> strimzi.io/pause-reconciliation="true"
For example, for the
KafkaConnect
custom resource:oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation="true"
Check that the status conditions of the custom resource show a change to
ReconciliationPaused
:oc describe <kind_of_custom_resource> <name_of_custom_resource>
The
type
condition changes toReconciliationPaused
at thelastTransitionTime
.Example custom resource with a paused reconciliation condition type
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: annotations: strimzi.io/pause-reconciliation: "true" strimzi.io/use-connector-resources: "true" creationTimestamp: 2021-03-12T10:47:11Z #... spec: # ... status: conditions: - lastTransitionTime: 2021-03-12T10:47:41.689249Z status: "True" type: ReconciliationPaused
Resuming from pause
-
To resume reconciliation, you can set the annotation to
false
, or remove the annotation.
Additional resources
11.3. Evicting pods with AMQ Streams Drain Cleaner
Kafka and ZooKeeper pods might be evicted during OpenShift upgrades, maintenance or pod rescheduling. If your Kafka broker and ZooKeeper pods were deployed by AMQ Streams, you can use the AMQ Streams Drain Cleaner tool to handle the pod evictions. Since the AMQ Streams Drain Cleaner will handle the eviction instead of OpenShift, you need to set the podDisruptionBudget
for your Kafka deployment to 0
(zero). OpenShift will then no longer be allowed to evict the pod automatically.
By deploying the AMQ Streams Drain Cleaner, you can use the Cluster Operator to move Kafka pods instead of OpenShift. The Cluster Operator ensures that topics are never under-replicated. Kafka can remain operational during the eviction process. The Cluster Operator waits for topics to synchronize, as the OpenShift worker nodes drain consecutively.
An admission webhook notifies the AMQ Streams Drain Cleaner of pod eviction requests to the Kubernetes API. The AMQ Streams Drain Cleaner then adds a rolling update annotation to the pods to be drained. This informs the Cluster Operator to perform a rolling update of an evicted pod.
If you are not using the AMQ Streams Drain Cleaner, you can add pod annotations to perform rolling updates manually.
Webhook configuration
The AMQ Streams Drain Cleaner deployment files include a ValidatingWebhookConfiguration
resource file. The resource provides the configuration for registering the webhook with the Kubernetes API.
The configuration defines the rules
for the Kubernetes API to follow in the event of a pod eviction request. The rules specify that only CREATE
operations related to pods/eviction
sub-resources are intercepted. If these rules are met, the API forwards the notification.
The clientConfig
points to the AMQ Streams Drain Cleaner service and /drainer
endpoint that exposes the webhook. The webhook uses a secure TLS connection, which requires authentication. The caBundle
property specifies the certificate chain to validate HTTPS communication. Certificates are encoded in Base64.
Webhook configuration for pod eviction notifications
apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration # ... webhooks: - name: strimzi-drain-cleaner.strimzi.io rules: - apiGroups: [""] apiVersions: ["v1"] operations: ["CREATE"] resources: ["pods/eviction"] scope: "Namespaced" clientConfig: service: namespace: "strimzi-drain-cleaner" name: "strimzi-drain-cleaner" path: /drainer port: 443 caBundle: Cg== # ...
11.3.1. Prerequisites
To deploy and use the AMQ Streams Drain Cleaner, you need to download the deployment files.
The AMQ Streams Drain Cleaner deployment files are available from the AMQ Streams software downloads page.
11.3.2. Deploying the AMQ Streams Drain Cleaner
Deploy the AMQ Streams Drain Cleaner to the OpenShift cluster where the Cluster Operator and Kafka cluster are running.
Prerequisites
- You have downloaded the AMQ Streams Drain Cleaner deployment files.
- You have a highly available Kafka cluster deployment running with OpenShift worker nodes that you would like to update.
Topics are replicated for high availability.
Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.
Kafka topic replicated for high availability
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 1 replicas: 3 config: # ... min.insync.replicas: 2 # ...
Excluding ZooKeeper
If you don’t want to include ZooKeeper, you can remove the --zookeeper
command option from the AMQ Streams Drain Cleaner Deployment
configuration file.
apiVersion: apps/v1
kind: Deployment
spec:
# ...
template:
spec:
serviceAccountName: strimzi-drain-cleaner
containers:
- name: strimzi-drain-cleaner
# ...
command:
- "/application"
- "-Dquarkus.http.host=0.0.0.0"
- "--kafka"
- "--zookeeper" 1
# ...
- 1
- Remove this option to exclude ZooKeeper from AMQ Streams Drain Cleaner operations.
Procedure
Configure a pod disruption budget of
0
(zero) for your Kafka deployment usingtemplate
settings in theKafka
resource.Specifying a pod disruption budget
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: template: podDisruptionBudget: maxUnavailable: 0 # ... zookeeper: template: podDisruptionBudget: maxUnavailable: 0 # ...
Reducing the maximum pod disruption budget to zero prevents OpenShift from automatically evicting the pods in case of voluntary disruptions, leaving the AMQ Streams Drain Cleaner and AMQ Streams Cluster Operator to roll the pod which will be scheduled by OpenShift on a different worker node.
Add the same configuration for ZooKeeper if you want to use AMQ Streams Drain Cleaner to drain ZooKeeper nodes.
Update the
Kafka
resource:oc apply -f <kafka-configuration-file>
Deploy the AMQ Streams Drain Cleaner.
To run the Drain Cleaner on OpenShift, apply the resources in the
/install/drain-cleaner/openshift
directory.oc apply -f ./install/drain-cleaner/openshift
11.3.3. Using the AMQ Streams Drain Cleaner
Use the AMQ Streams Drain Cleaner in combination with the Cluster Operator to move Kafka broker or ZooKeeper pods from nodes that are being drained. When you run the AMQ Streams Drain Cleaner, it annotates pods with a rolling update pod annotation. The Cluster Operator performs rolling updates based on the annotation.
Prerequisites
- You have deployed the AMQ Streams Drain Cleaner.
Procedure
Drain a specified OpenShift node hosting the Kafka broker or ZooKeeper pods.
oc get nodes oc drain <name-of-node> --delete-emptydir-data --ignore-daemonsets --timeout=6000s --force
Check the eviction events in the AMQ Streams Drain Cleaner log to verify that the pods have been annotated for restart.
AMQ Streams Drain Cleaner log show annotations of pods
INFO ... Received eviction webhook for Pod my-cluster-zookeeper-2 in namespace my-project INFO ... Pod my-cluster-zookeeper-2 in namespace my-project will be annotated for restart INFO ... Pod my-cluster-zookeeper-2 in namespace my-project found and annotated for restart INFO ... Received eviction webhook for Pod my-cluster-kafka-0 in namespace my-project INFO ... Pod my-cluster-kafka-0 in namespace my-project will be annotated for restart INFO ... Pod my-cluster-kafka-0 in namespace my-project found and annotated for restart
Check the reconciliation events in the Cluster Operator log to verify the rolling updates.
Cluster Operator log shows rolling updates
INFO PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-zookeeper-2 INFO PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-kafka-0 INFO AbstractOperator:500 - Reconciliation #13(timer) Kafka(my-project/my-cluster): reconciled
11.4. Manually starting rolling updates of Kafka and ZooKeeper clusters
AMQ Streams supports the use of annotations on resources to manually trigger a rolling update of Kafka and ZooKeeper clusters through the Cluster Operator. Rolling updates restart the pods of the resource with new ones.
Manually performing a rolling update on a specific pod or set of pods is usually only required in exceptional circumstances. However, rather than deleting the pods directly, if you perform the rolling update through the Cluster Operator you ensure the following:
- The manual deletion of the pod does not conflict with simultaneous Cluster Operator operations, such as deleting other pods in parallel.
- The Cluster Operator logic handles the Kafka configuration specifications, such as the number of in-sync replicas.
11.4.1. Prerequisites
To perform a manual rolling update, you need a running Cluster Operator and Kafka cluster.
See the Deploying and Upgrading AMQ Streams on OpenShift guide for instructions on running a:
11.4.2. Performing a rolling update using a pod management annotation
This procedure describes how to trigger a rolling update of a Kafka cluster or ZooKeeper cluster.
To trigger the update, you add an annotation to the resource you are using to manage the pods running on the cluster. You annotate the StatefulSet
or StrimziPodSet
resource (if you enabled the UseStrimziPodSets feature gate).
Procedure
Find the name of the resource that controls the Kafka or ZooKeeper pods you want to manually update.
For example, if your Kafka cluster is named my-cluster, the corresponding names are my-cluster-kafka and my-cluster-zookeeper.
Use
oc annotate
to annotate the appropriate resource in OpenShift.Annotating a StatefulSet
oc annotate statefulset <cluster_name>-kafka strimzi.io/manual-rolling-update=true oc annotate statefulset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true
Annotating a StrimziPodSet
oc annotate strimzipodset <cluster_name>-kafka strimzi.io/manual-rolling-update=true oc annotate strimzipodset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true
- Wait for the next reconciliation to occur (every two minutes by default). A rolling update of all pods within the annotated resource is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of all the pods is complete, the annotation is removed from the resource.
11.4.3. Performing a rolling update using a Pod annotation
This procedure describes how to manually trigger a rolling update of an existing Kafka cluster or ZooKeeper cluster using an OpenShift Pod
annotation. When multiple pods are annotated, consecutive rolling updates are performed within the same reconciliation run.
Prerequisites
You can perform a rolling update on a Kafka cluster regardless of the topic replication factor used. But for Kafka to stay operational during the update, you’ll need the following:
- A highly available Kafka cluster deployment running with nodes that you wish to update.
Topics replicated for high availability.
Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.
Kafka topic replicated for high availability
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 1 replicas: 3 config: # ... min.insync.replicas: 2 # ...
Procedure
Find the name of the Kafka or ZooKeeper
Pod
you want to manually update.For example, if your Kafka cluster is named my-cluster, the corresponding
Pod
names are my-cluster-kafka-index and my-cluster-zookeeper-index. The index starts at zero and ends at the total number of replicas minus one.Annotate the
Pod
resource in OpenShift.Use
oc annotate
:oc annotate pod cluster-name-kafka-index strimzi.io/manual-rolling-update=true oc annotate pod cluster-name-zookeeper-index strimzi.io/manual-rolling-update=true
-
Wait for the next reconciliation to occur (every two minutes by default). A rolling update of the annotated
Pod
is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of a pod is complete, the annotation is removed from thePod
.
11.5. Discovering services using labels and annotations
Service discovery makes it easier for client applications running in the same OpenShift cluster as AMQ Streams to interact with a Kafka cluster.
A service discovery label and annotation is generated for services used to access the Kafka cluster:
- Internal Kafka bootstrap service
- HTTP Bridge service
The label helps to make the service discoverable, and the annotation provides connection details that a client application can use to make the connection.
The service discovery label, strimzi.io/discovery
, is set as true
for the Service
resources. The service discovery annotation has the same key, providing connection details in JSON format for each service.
Example internal Kafka bootstrap service
apiVersion: v1 kind: Service metadata: annotations: strimzi.io/discovery: |- [ { "port" : 9092, "tls" : false, "protocol" : "kafka", "auth" : "scram-sha-512" }, { "port" : 9093, "tls" : true, "protocol" : "kafka", "auth" : "tls" } ] labels: strimzi.io/cluster: my-cluster strimzi.io/discovery: "true" strimzi.io/kind: Kafka strimzi.io/name: my-cluster-kafka-bootstrap name: my-cluster-kafka-bootstrap spec: #...
Example HTTP Bridge service
apiVersion: v1 kind: Service metadata: annotations: strimzi.io/discovery: |- [ { "port" : 8080, "tls" : false, "auth" : "none", "protocol" : "http" } ] labels: strimzi.io/cluster: my-bridge strimzi.io/discovery: "true" strimzi.io/kind: KafkaBridge strimzi.io/name: my-bridge-bridge-service
11.5.1. Returning connection details on services
You can find the services by specifying the discovery label when fetching services from the command line or a corresponding API call.
oc get service -l strimzi.io/discovery=true
The connection details are returned when retrieving the service discovery label.
11.6. Recovering a cluster from persistent volumes
You can recover a Kafka cluster from persistent volumes (PVs) if they are still present.
You might want to do this, for example, after:
- A namespace was deleted unintentionally
- A whole OpenShift cluster is lost, but the PVs remain in the infrastructure
11.6.1. Recovery from namespace deletion
Recovery from namespace deletion is possible because of the relationship between persistent volumes and namespaces. A PersistentVolume
(PV) is a storage resource that lives outside of a namespace. A PV is mounted into a Kafka pod using a PersistentVolumeClaim
(PVC), which lives inside a namespace.
The reclaim policy for a PV tells a cluster how to act when a namespace is deleted. If the reclaim policy is set as:
- Delete (default), PVs are deleted when PVCs are deleted within a namespace
- Retain, PVs are not deleted when a namespace is deleted
To ensure that you can recover from a PV if a namespace is deleted unintentionally, the policy must be reset from Delete to Retain in the PV specification using the persistentVolumeReclaimPolicy
property:
apiVersion: v1
kind: PersistentVolume
# ...
spec:
# ...
persistentVolumeReclaimPolicy: Retain
Alternatively, PVs can inherit the reclaim policy of an associated storage class. Storage classes are used for dynamic volume allocation.
By configuring the reclaimPolicy
property for the storage class, PVs that use the storage class are created with the appropriate reclaim policy. The storage class is configured for the PV using the storageClassName
property.
apiVersion: v1 kind: StorageClass metadata: name: gp2-retain parameters: # ... # ... reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
# ...
storageClassName: gp2-retain
If you are using Retain as the reclaim policy, but you want to delete an entire cluster, you need to delete the PVs manually. Otherwise they will not be deleted, and may cause unnecessary expenditure on resources.
11.6.2. Recovery from loss of an OpenShift cluster
When a cluster is lost, you can use the data from disks/volumes to recover the cluster if they were preserved within the infrastructure. The recovery procedure is the same as with namespace deletion, assuming PVs can be recovered and they were created manually.
11.6.3. Recovering a deleted cluster from persistent volumes
This procedure describes how to recover a deleted cluster from persistent volumes (PVs).
In this situation, the Topic Operator identifies that topics exist in Kafka, but the KafkaTopic
resources do not exist.
When you get to the step to recreate your cluster, you have two options:
Use Option 1 when you can recover all
KafkaTopic
resources.The
KafkaTopic
resources must therefore be recovered before the cluster is started so that the corresponding topics are not deleted by the Topic Operator.Use Option 2 when you are unable to recover all
KafkaTopic
resources.In this case, you deploy your cluster without the Topic Operator, delete the Topic Operator topic store metadata, and then redeploy the Kafka cluster with the Topic Operator so it can recreate the
KafkaTopic
resources from the corresponding topics.
If the Topic Operator is not deployed, you only need to recover the PersistentVolumeClaim
(PVC) resources.
Before you begin
In this procedure, it is essential that PVs are mounted into the correct PVC to avoid data corruption. A volumeName
is specified for the PVC and this must match the name of the PV.
For more information, see:
The procedure does not include recovery of KafkaUser
resources, which must be recreated manually. If passwords and certificates need to be retained, secrets must be recreated before creating the KafkaUser
resources.
Procedure
Check information on the PVs in the cluster:
oc get pv
Information is presented for PVs with data.
Example output showing columns important to this procedure:
NAME RECLAIMPOLICY CLAIM pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ... myproject/data-my-cluster-zookeeper-1 pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-my-cluster-zookeeper-0 pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-my-cluster-zookeeper-2 pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ... myproject/data-0-my-cluster-kafka-0 pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ... myproject/data-0-my-cluster-kafka-1 pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-0-my-cluster-kafka-2
- NAME shows the name of each PV.
- RECLAIM POLICY shows that PVs are retained.
- CLAIM shows the link to the original PVCs.
Recreate the original namespace:
oc create namespace myproject
Recreate the original PVC resource specifications, linking the PVCs to the appropriate PV:
For example:
apiVersion: v1 kind: PersistentVolumeClaim metadata: name: data-0-my-cluster-kafka-0 spec: accessModes: - ReadWriteOnce resources: requests: storage: 100Gi storageClassName: gp2-retain volumeMode: Filesystem volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
Edit the PV specifications to delete the
claimRef
properties that bound the original PVC.For example:
apiVersion: v1 kind: PersistentVolume metadata: annotations: kubernetes.io/createdby: aws-ebs-dynamic-provisioner pv.kubernetes.io/bound-by-controller: "yes" pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs creationTimestamp: "<date>" finalizers: - kubernetes.io/pv-protection labels: failure-domain.beta.kubernetes.io/region: eu-west-1 failure-domain.beta.kubernetes.io/zone: eu-west-1c name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea resourceVersion: "39431" selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c spec: accessModes: - ReadWriteOnce awsElasticBlockStore: fsType: xfs volumeID: aws://eu-west-1c/vol-09db3141656d1c258 capacity: storage: 100Gi claimRef: apiVersion: v1 kind: PersistentVolumeClaim name: data-0-my-cluster-kafka-2 namespace: myproject resourceVersion: "39113" uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: failure-domain.beta.kubernetes.io/zone operator: In values: - eu-west-1c - key: failure-domain.beta.kubernetes.io/region operator: In values: - eu-west-1 persistentVolumeReclaimPolicy: Retain storageClassName: gp2-retain volumeMode: Filesystem
In the example, the following properties are deleted:
claimRef: apiVersion: v1 kind: PersistentVolumeClaim name: data-0-my-cluster-kafka-2 namespace: myproject resourceVersion: "39113" uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
Deploy the Cluster Operator.
oc create -f install/cluster-operator -n my-project
Recreate your cluster.
Follow the steps depending on whether or not you have all the
KafkaTopic
resources needed to recreate your cluster.Option 1: If you have all the
KafkaTopic
resources that existed before you lost your cluster, including internal topics such as committed offsets from__consumer_offsets
:Recreate all
KafkaTopic
resources.It is essential that you recreate the resources before deploying the cluster, or the Topic Operator will delete the topics.
Deploy the Kafka cluster.
For example:
oc apply -f kafka.yaml
Option 2: If you do not have all the
KafkaTopic
resources that existed before you lost your cluster:Deploy the Kafka cluster, as with the first option, but without the Topic Operator by removing the
topicOperator
property from the Kafka resource before deploying.If you include the Topic Operator in the deployment, the Topic Operator will delete all the topics.
Delete the internal topic store topics from the Kafka cluster:
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
The command must correspond to the type of listener and authentication used to access the Kafka cluster.
Enable the Topic Operator by redeploying the Kafka cluster with the
topicOperator
property to recreate theKafkaTopic
resources.For example:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} 1 #...
- 1
- Here we show the default configuration, which has no additional properties. You specify the required configuration using the properties described in Section 12.2.45, “
EntityTopicOperatorSpec
schema reference”.
Verify the recovery by listing the
KafkaTopic
resources:oc get KafkaTopic
11.7. Setting limits on brokers using the Kafka Static Quota plugin
Use the Kafka Static Quota plugin to set throughput and storage limits on brokers in your Kafka cluster. You enable the plugin and set limits by configuring the Kafka
resource. You can set a byte-rate threshold and storage quotas to put limits on the clients interacting with your brokers.
You can set byte-rate thresholds for producer and consumer bandwidth. The total limit is distributed across all clients accessing the broker. For example, you can set a byte-rate threshold of 40 MBps for producers. If two producers are running, they are each limited to a throughput of 20 MBps.
Storage quotas throttle Kafka disk storage limits between a soft limit and hard limit. The limits apply to all available disk space. Producers are slowed gradually between the soft and hard limit. The limits prevent disks filling up too quickly and exceeding their capacity. Full disks can lead to issues that are hard to rectify. The hard limit is the maximum storage limit.
For JBOD storage, the limit applies across all disks. If a broker is using two 1 TB disks and the quota is 1.1 TB, one disk might fill and the other disk will be almost empty.
Prerequisites
- The Cluster Operator that manages the Kafka cluster is running.
Procedure
Add the plugin properties to the
config
of theKafka
resource.The plugin properties are shown in this example configuration.
Example Kafka Static Quota plugin configuration
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... config: client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 1 client.quota.callback.static.produce: 1000000 2 client.quota.callback.static.fetch: 1000000 3 client.quota.callback.static.storage.soft: 400000000000 4 client.quota.callback.static.storage.hard: 500000000000 5 client.quota.callback.static.storage.check-interval: 5 6
- 1
- Loads the Kafka Static Quota plugin.
- 2
- Sets the producer byte-rate threshold. 1 MBps in this example.
- 3
- Sets the consumer byte-rate threshold. 1 MBps in this example.
- 4
- Sets the lower soft limit for storage. 400 GB in this example.
- 5
- Sets the higher hard limit for storage. 500 GB in this example.
- 6
- Sets the interval in seconds between checks on storage. 5 seconds in this example. You can set this to 0 to disable the check.
Update the resource.
oc apply -f <kafka_configuration_file>
Additional resources