Chapter 10. Using the Topic Operator to manage Kafka topics


The KafkaTopic resource configures topics, including partition and replication factor settings. When you create, modify, or delete a topic using KafkaTopic, the Topic Operator ensures that these changes are reflected in the Kafka cluster.

For more information on the KafkaTopic resource, see the KafkaTopic schema reference.

10.1. Topic management

The KafkaTopic resource is responsible for managing a single topic within a Kafka cluster.

The Topic Operator operates as follows:

  • When a KafkaTopic is created, deleted, or changed, the Topic Operator performs the corresponding operation on the Kafka topic.

If a topic is created, deleted, or modified directly in the Kafka cluster and no corresponding KafkaTopic resource exists, the Topic Operator does not manage that topic. The Topic Operator only manages Kafka topics associated with KafkaTopic resources and does not interfere with topics managed independently within the Kafka cluster. If a KafkaTopic resource does exist for a Kafka topic, any configuration changes made outside the resource are reverted.

The Topic Operator can detect cases where multiple KafkaTopic resources attempt to manage a Kafka topic using the same .spec.topicName. Only the oldest resource is reconciled; the other resources fail with a resource conflict error.
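As an illustration, the following two resources both target the Kafka topic my-topic-1 through spec.topicName. The resource names my-topic-1-a and my-topic-1-b are hypothetical. Based on the behavior described above, the resource created first manages the topic, and the other one is expected to report a resource conflict in its status.

```yaml
# Hypothetical resource created first: the Topic Operator reconciles this one
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic-1-a
  labels:
    strimzi.io/cluster: my-cluster
spec:
  topicName: my-topic-1
---
# Hypothetical resource created later with the same spec.topicName:
# it is not reconciled and fails with a resource conflict error
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic-1-b
  labels:
    strimzi.io/cluster: my-cluster
spec:
  topicName: my-topic-1
```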

10.2. Topic naming conventions

A KafkaTopic resource includes a name for the topic and a label that identifies the name of the Kafka cluster it belongs to.

Label identifying a Kafka cluster for topic handling

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  topicName: topic-name-1

The label provides the cluster name of the Kafka resource. The Topic Operator uses the label as a mechanism for determining which KafkaTopic resources to manage. If the label does not match the Kafka cluster, the Topic Operator cannot see the KafkaTopic, and the topic is not created.

Kafka and OpenShift have their own naming validation rules, and a Kafka topic name might not be a valid resource name in OpenShift. Where possible, use a naming convention that works for both.

Consider the following guidelines:

  • Use topic names that reflect the nature of the topic
  • Be concise and keep the name under 63 characters
  • Use all lower case and hyphens
  • Avoid special characters, spaces or symbols

The KafkaTopic resource allows you to specify the Kafka topic name using the metadata.name field. However, if the desired Kafka topic name is not a valid OpenShift resource name, you can use the spec.topicName property to specify the actual name. The spec.topicName field is optional; when it is absent, the Kafka topic name defaults to the metadata.name of the topic. After a topic is created, its name cannot be changed.

Example of supplying a valid Kafka topic name

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic-1 # (1)
spec:
  topicName: My.Topic.1 # (2)
  # ...

(1) A valid topic name that works in OpenShift.
(2) A Kafka topic name that uses upper case characters, which are invalid in OpenShift resource names.

If more than one KafkaTopic resource refers to the same Kafka topic, the resource that was created first is considered to be the one managing the topic. The status of the newer resources is updated to indicate a conflict, and their Ready status is changed to False.

A Kafka client application, such as Kafka Streams, can automatically create topics with invalid OpenShift resource names. If you want to manage these topics, you must create KafkaTopic resources with a different .metadata.name, as shown in the previous example.

Note

For more information on the requirements for identifiers and names in a cluster, refer to the OpenShift documentation Object Names and IDs.

10.3. Handling changes to topics

Configuration changes only go in one direction: from the KafkaTopic resource to the Kafka topic. Any changes to a Kafka topic managed outside the KafkaTopic resource are reverted.

If you revert to a version of Streams for Apache Kafka earlier than 2.8, which uses internal topics to store topic metadata, follow the standard downgrade procedure: downgrade your Cluster Operator to the previous version, then downgrade Kafka brokers and client applications to the previous Kafka version.

If you revert to a version of Streams for Apache Kafka earlier than 1.7, which uses ZooKeeper to store topic metadata, follow the same standard downgrade procedure: downgrade your Cluster Operator to the previous version, then downgrade Kafka brokers and client applications to the previous Kafka version.

However, you must also delete the topics that were created for the topic store using the kafka-topics.sh tool, specifying the bootstrap address of the Kafka cluster. For example, for a cluster named my-cluster with a plain listener on port 9092 (the two deletions run inside the pod through a single shell, so both use the in-cluster bootstrap address):

oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-38-rhel9:2.8.0 --rm=true --restart=Never -- /bin/sh -c "./bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic __strimzi_store_topic --delete"

The command must correspond to the type of listener and authentication used to access the Kafka cluster.

The Topic Operator will reconstruct the ZooKeeper topic metadata from the state of the topics in Kafka.

10.3.3. Automatic creation of topics

Applications can trigger the automatic creation of topics in the Kafka cluster. By default, the Kafka broker configuration auto.create.topics.enable is set to true, allowing the broker to create topics automatically when an application attempts to produce to or consume from a non-existent topic. Applications might also use the Kafka AdminClient to create topics programmatically. When an application is deployed along with its KafkaTopic resources, automatic topic creation in the cluster might happen before the Topic Operator can react to the KafkaTopic.

Topics created this way for an application deployment initially use the default topic configuration. If the Topic Operator then attempts to reconfigure the topics based on the KafkaTopic resource specifications included with the application deployment, the operation might fail because the required configuration change is not allowed. For example, the operation fails if the change requires lowering the number of topic partitions. For this reason, we recommend disabling auto.create.topics.enable in the Kafka cluster configuration.
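A minimal sketch of disabling automatic topic creation, assuming a Kafka resource named my-cluster. The property goes in spec.kafka.config alongside the rest of your broker configuration, which is elided here.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    config:
      # Stop brokers from creating a topic when a client produces to or
      # consumes from a topic that does not exist yet
      auto.create.topics.enable: false
      # ...
  # ...
```

This broker setting does not stop applications that create topics explicitly through the Kafka AdminClient.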

10.4. Configuring Kafka topics

Use the properties of the KafkaTopic resource to configure Kafka topics. Changes made to topic configuration in the KafkaTopic are propagated to Kafka.

You can use oc apply to create or modify topics, and oc delete to delete existing topics.

For example:

  • oc apply -f <topic_config_file>
  • oc delete KafkaTopic <topic_name>

To be able to delete topics, delete.topic.enable must be set to true (default) in the spec.kafka.config of the Kafka resource.

This procedure shows how to create a topic with 10 partitions and 2 replicas.

Before you begin

The KafkaTopic resource does not allow the following changes:

  • Renaming the topic defined in spec.topicName. A mismatch between spec.topicName and status.topicName will be detected.
  • Decreasing the number of partitions using spec.partitions (not supported by Kafka).
  • Modifying the number of replicas specified in spec.replicas.

Warning

Increasing spec.partitions for topics with keys will alter the partitioning of records, which can cause issues, especially when the topic uses semantic partitioning.

Prerequisites

  • A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
  • A running Topic Operator (typically deployed with the Entity Operator).
  • For deleting a topic, delete.topic.enable=true (default) in the spec.kafka.config of the Kafka resource.

Procedure

  1. Configure the KafkaTopic resource.

    Example Kafka topic configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic-1
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2

    Tip

    When modifying a topic, you can get the current version of the resource using oc get kafkatopic my-topic-1 -o yaml.

  2. Create the KafkaTopic resource in OpenShift.

    oc apply -f <topic_config_file>
  3. Wait for the ready status of the topic to change to True:

    oc get kafkatopics -o wide -w -n <namespace>

    Kafka topic status

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-1   my-cluster  10          3                  True
    my-topic-2   my-cluster  10          3
    my-topic-3   my-cluster  10          3                  True

    Topic creation is successful when the READY output shows True.

  4. If the READY column stays blank, get more details on the status from the resource YAML or from the Topic Operator logs.

    Status messages provide details on the reason for the current status.

    oc get kafkatopics my-topic-2 -o yaml

    Details on a topic with a NotReady status

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-13T10:14:43.351550Z"
        message: Number of partitions cannot be decreased
        reason: PartitionDecreaseException
        status: "True"
        type: NotReady

    In this example, the topic is not ready because the number of partitions was reduced in the KafkaTopic configuration, which Kafka does not support.

    After resetting the topic configuration, the status shows the topic is ready.

    oc get kafkatopics my-topic-2 -o wide -w -n <namespace>

    Status update of the topic

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-2   my-cluster  10          3                  True

    Fetching the details shows no messages

    oc get kafkatopics my-topic-2 -o yaml

    Details on a topic with a READY status

    # ...
    status:
      conditions:
      - lastTransitionTime: '2022-06-13T10:15:03.761084Z'
        status: 'True'
        type: Ready

The recommended configuration for topics managed by the Topic Operator is a topic replication factor of 3, and a minimum of 2 in-sync replicas.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10 # (1)
  replicas: 3 # (2)
  config:
    min.insync.replicas: 2 # (3)
  #...

(1) The number of partitions for the topic.
(2) The number of replicas for each topic partition. Changing the number of replicas in the topic configuration requires a deployment of Cruise Control. For more information, see Chapter 21, Using Cruise Control to modify topic replication factor.
(3) The minimum number of replicas that a message must be successfully written to; otherwise, an exception is raised.

Note

In-sync replicas are used in conjunction with the acks configuration for producer applications. The acks configuration determines the number of follower partitions a message must be replicated to before the message is acknowledged as successfully received. Replicas need to be reassigned when adding or removing brokers (see Chapter 19, Scaling clusters by adding or removing brokers).

This procedure describes how to convert Kafka topics that are currently managed through the KafkaTopic resource into non-managed topics. This capability can be useful in various scenarios. For instance, you might want to update the metadata.name of a KafkaTopic resource. You can only do that by deleting the original KafkaTopic resource and recreating a new one.

By annotating a KafkaTopic resource with strimzi.io/managed=false, you indicate that the Topic Operator should no longer manage that particular topic. This allows you to retain the Kafka topic while making changes to the resource’s configuration or other administrative tasks.
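If you manage resources declaratively rather than with oc annotate, the same annotation can be set in the resource metadata. This sketch assumes the my-topic-1 resource used in the procedure that follows; annotation values are strings, so false is quoted.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic-1
  labels:
    strimzi.io/cluster: my-cluster
  annotations:
    # Tell the Topic Operator to stop managing this topic
    strimzi.io/managed: "false"
spec:
  partitions: 10
  replicas: 2
```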

Procedure

  1. Annotate the KafkaTopic resource in OpenShift, setting strimzi.io/managed to false:

    oc annotate kafkatopic my-topic-1 strimzi.io/managed="false"

    Specify the metadata.name of the topic in your KafkaTopic resource, which is my-topic-1 in this example.

  2. Check the status of the KafkaTopic resource to make sure the request was successful:

    oc get kafkatopics my-topic-1 -o yaml

    Example topic with a Ready status

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      generation: 124
      name: my-topic-1
      finalizers:
        - strimzi.io/topic-operator
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2
    # ...
    status:
      observedGeneration: 124 # (1)
      topicName: my-topic-1
      conditions:
      - type: Ready
        status: "True"
        lastTransitionTime: "2023-03-01T10:30:00Z"

    (1) Successful reconciliation of the resource means the topic is no longer managed.

    The value of metadata.generation (the current version of the resource) must match status.observedGeneration (the latest reconciliation of the resource).

  3. You can now make changes to the KafkaTopic resource without affecting the Kafka topic it was managing.

    For example, to change the metadata.name, proceed as follows:

    1. Delete the original KafkaTopic resource:

      oc delete kafkatopic <kafka_topic_name>
    2. Recreate the KafkaTopic resource with a different metadata.name, but use spec.topicName to refer to the same topic that was managed by the original resource.
  4. If you haven’t deleted the original KafkaTopic resource, and you wish to resume management of the Kafka topic again, set the strimzi.io/managed annotation to true or remove the annotation.
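For step 3, the recreated resource might look like the following sketch. The new metadata.name (my-topic-1-renamed) is hypothetical, while spec.topicName points at the existing Kafka topic. Keep partitions and replicas consistent with the existing topic, because the Topic Operator applies this spec to the topic and decreasing partitions is not supported.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  # Hypothetical new resource name
  name: my-topic-1-renamed
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # The existing Kafka topic previously managed by the deleted resource
  topicName: my-topic-1
  partitions: 10
  replicas: 2
```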

10.7. Enabling topic management for existing Kafka topics

This procedure describes how to enable topic management for topics that are not currently managed through the KafkaTopic resource. You do this by creating a matching KafkaTopic resource.

Procedure

  1. Create a KafkaTopic resource with a metadata.name that matches the name of the Kafka topic.

    Alternatively, use spec.topicName if the name of the topic in Kafka is not a valid OpenShift resource name.

    Example Kafka topic configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic-1
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2

    In this example, the Kafka topic is named my-topic-1.

    The Topic Operator checks whether the topic is managed by another KafkaTopic resource. If it is, the older resource takes precedence and a resource conflict error is returned in the status of the new resource.

  2. Apply the KafkaTopic resource:

    oc apply -f <topic_configuration_file>
  3. Wait for the operator to update the topic in Kafka.

    The operator updates the Kafka topic with the spec of the KafkaTopic that has the same name.

  4. Check the status of the KafkaTopic resource to make sure the request was successful:

    oc get kafkatopics my-topic-1 -o yaml

    Example topic with a Ready status

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      generation: 1
      name: my-topic-1
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2
    # ...
    status:
      observedGeneration: 1 # (1)
      topicName: my-topic-1
      conditions:
      - type: Ready
        status: "True"
        lastTransitionTime: "2023-03-01T10:30:00Z"

    (1) Successful reconciliation of the resource means the topic is now managed.

    The value of metadata.generation (the current version of the resource) must match status.observedGeneration (the latest reconciliation of the resource).

10.8. Deleting managed topics

The Topic Operator supports the deletion of topics managed through the KafkaTopic resource with or without OpenShift finalizers. This is determined by the STRIMZI_USE_FINALIZERS Topic Operator environment variable. By default, this is set to true, though it can be set to false in the Topic Operator env configuration if you do not want the Topic Operator to add finalizers.
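For a Topic Operator deployed by the Entity Operator, one way to set this variable is through the Topic Operator container template in the Kafka resource. This is a sketch that assumes a Kafka resource named my-cluster with the Entity Operator enabled; for a standalone Topic Operator, set the same variable in the env section of its Deployment instead.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}
    template:
      topicOperatorContainer:
        env:
          # Stop the Topic Operator from adding finalizers to KafkaTopic resources
          - name: STRIMZI_USE_FINALIZERS
            value: "false"
```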

Finalizers ensure orderly and controlled deletion of KafkaTopic resources. A finalizer for the Topic Operator is added to the metadata of the KafkaTopic resource:

Finalizer to control topic deletion

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  generation: 1
  name: my-topic-1
  finalizers:
    - strimzi.io/topic-operator
  labels:
    strimzi.io/cluster: my-cluster

In this example, the finalizer is added for topic my-topic-1. The finalizer prevents the topic from being fully deleted until the finalization process is complete. If you then delete the topic using oc delete kafkatopic my-topic-1, a timestamp is added to the metadata:

Finalizer timestamp on deletion

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  generation: 1
  name: my-topic-1
  finalizers:
    - strimzi.io/topic-operator
  labels:
    strimzi.io/cluster: my-cluster
  deletionTimestamp: "2023-03-01T00:00:00Z"

The resource still exists. If the deletion fails, the failure is reported in the status of the resource.

When the finalization tasks are successfully executed, the finalizer is removed from the metadata, and the resource is fully deleted.

Finalizers also serve to prevent related resources from being deleted. If the Topic Operator is not running, it cannot remove its finalizer from metadata.finalizers. Any attempt to directly delete the KafkaTopic resources or the namespace then fails or times out, leaving the namespace stuck in a Terminating state. If this happens, you can bypass the finalization process by removing the finalizers on topics.

10.9. Removing finalizers on topics

If the Topic Operator is not running, and you want to bypass the finalization process when deleting managed topics, you must remove the finalizers. You can do this manually by editing the resources directly or by using a command.

To remove finalizers on all topics, use the following command:

Removing finalizers on topics

oc get kt -o=json | jq '.items[].metadata.finalizers = null' | oc apply -f -

The command uses the jq command line JSON parser tool to modify the KafkaTopic (kt) resources by setting the finalizers to null. You can also use the command for a specific topic:

Removing a finalizer on a specific topic

oc get kt <topic_name> -o=json | jq '.metadata.finalizers = null' | oc apply -f -

After running the command, you can delete the topics. Alternatively, if the topics were already being deleted but were blocked by outstanding finalizers, their deletion should now complete.

Warning

Be careful when removing finalizers, as any cleanup operations associated with the finalization process are not performed if the Topic Operator is not running. For example, if you remove the finalizer from a KafkaTopic resource and subsequently delete the resource, the related Kafka topic won’t be deleted.

10.10. Considerations when disabling topic deletion

When the delete.topic.enable configuration in Kafka is set to false, topics cannot be deleted. This might be required in certain scenarios, but it introduces a consideration when using the Topic Operator.

As topics cannot be deleted, finalizers added to the metadata of a KafkaTopic resource to control topic deletion are never removed by the Topic Operator (though they can be removed manually). Similarly, any Custom Resource Definitions (CRDs) or namespaces associated with topics cannot be deleted.

Before configuring delete.topic.enable=false, assess these implications to ensure it aligns with your specific requirements.
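For reference, a minimal sketch of where the setting lives, assuming a Kafka resource named my-cluster with the remaining broker configuration elided:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    config:
      # Topics cannot be deleted, so the Topic Operator never removes
      # its finalizers from KafkaTopic resources
      delete.topic.enable: false
      # ...
  # ...
```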

Note

To avoid using finalizers, you can set the STRIMZI_USE_FINALIZERS Topic Operator environment variable to false.

10.11. Tuning request batches for topic operations

The Topic Operator uses the request batching capabilities of the Kafka Admin API for operations on topic resources. You can fine-tune the batching mechanism using the following operator configuration properties:

  • STRIMZI_MAX_QUEUE_SIZE to set the maximum size of the topic event queue. The default value is 1024.
  • STRIMZI_MAX_BATCH_SIZE to set the maximum number of topic events allowed in a single batch. The default value is 100.
  • MAX_BATCH_LINGER_MS to specify the maximum time to wait for a batch to accumulate items before processing. The default is 100 milliseconds.

If the maximum size of the request batching queue is exceeded, the Topic Operator shuts down and is restarted. To prevent frequent restarts, consider adjusting the STRIMZI_MAX_QUEUE_SIZE property to accommodate the typical load.
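As a sketch, assuming the Topic Operator is deployed by the Entity Operator and that these variables are passed through the Topic Operator container template, the queue and batch sizes could be raised as follows. The values shown are illustrative only and should be sized for your typical topic event load.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  entityOperator:
    topicOperator: {}
    template:
      topicOperatorContainer:
        env:
          # Illustrative values: larger queue to reduce restarts under load
          - name: STRIMZI_MAX_QUEUE_SIZE
            value: "2048"
          # Illustrative value: more topic events per Admin API batch
          - name: STRIMZI_MAX_BATCH_SIZE
            value: "200"
```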

© 2025 Red Hat