Chapter 7. Creating and managing topics

Messages in Kafka are always sent to or received from a topic. This chapter describes how to create and manage Kafka topics.

7.1. Partitions and replicas

A topic is always split into one or more partitions. Partitions act as shards: every message sent by a producer is written to exactly one partition.

Each partition can have one or more replicas, which are stored on different brokers in the cluster. When creating a topic, you can configure the number of replicas using the replication factor, which defines the number of copies held within the cluster. One of the replicas for a given partition is elected as the leader. Producers send new messages to the leader replica, and consumers consume messages from it. The other replicas are followers, which replicate the leader.

If the leader fails, one of the in-sync followers automatically becomes the new leader. Each broker acts as a leader for some of its partitions and as a follower for others, so the load is balanced across the cluster.

Note

The replication factor determines the number of replicas including the leader and the followers. For example, if you set the replication factor to 3, then there will be one leader and two follower replicas.

7.2. Message retention

The message retention policy defines how long the messages will be stored on the Kafka brokers. It can be defined based on time, partition size or both.

For example, you can define that the messages should be kept:

  • For 7 days.
  • Until the partition has 1GB of messages. Once the limit is reached, the oldest messages are removed.
  • For 7 days or until the 1GB limit has been reached, whichever comes first.

Warning

Kafka brokers store messages in log segments. The messages which are past their retention policy will be deleted only when a new log segment is created. New log segments are created when the previous log segment exceeds the configured log segment size. Additionally, users can request new segments to be created periodically.
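The example limits listed earlier correspond to the retention.ms and retention.bytes options described in Section 7.5. As a minimal sketch, the following shell helpers convert days and gibibytes into the units those options take. The function names days_to_ms and gib_to_bytes are hypothetical, and reading "1GB" as 1 GiB is an assumption.

```shell
# Hypothetical helpers: convert human-friendly limits into Kafka's units.

days_to_ms() {
  # $1: whole number of days -> milliseconds (the unit of retention.ms)
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

gib_to_bytes() {
  # $1: whole number of GiB -> bytes (the unit of retention.bytes)
  echo $(( $1 * 1024 * 1024 * 1024 ))
}

# Print the values in properties form for a 7-day, 1 GiB policy.
echo "retention.ms=$(days_to_ms 7)"
echo "retention.bytes=$(gib_to_bytes 1)"
```

The printed values can be passed to kafka-topics.sh with the --config option when the topic is created.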

Kafka brokers also support a compaction policy (log compaction).

For a topic with the compacted policy, the broker will always keep only the last message for each key. The older messages with the same key will be removed from the partition. Because compacting is a periodically executed action, it does not happen immediately when the new message with the same key is sent to the partition. Instead it might take some time until the older messages are removed.
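To make the effect of compaction concrete, here is a toy sketch that keeps only the last message for each key while preserving the order of the surviving messages. It illustrates the outcome only and is not how the broker implements compaction. The function name compact_log and the "key value" line format are assumptions made for this example.

```shell
# Toy model of log compaction: stdin has one "key value" message per line,
# oldest first. Only the newest message for each key survives.
compact_log() {
  awk '{ last[$1] = $0; pos[$1] = NR }
       END { for (k in last) print pos[k] "\t" last[k] }' |
    sort -n |   # restore the original (offset) order of the survivors
    cut -f2-    # drop the helper position column
}

# user1 is written twice, so only its newer message remains.
printf '%s\n' 'user1 a' 'user2 b' 'user1 c' | compact_log
```

In this model the older "user1 a" message disappears as soon as the function runs, whereas a real broker removes it only at some later point.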

For more information about the message retention configuration options, see Section 7.5, “Topic configuration”.

7.3. Topic auto-creation

By default, Kafka automatically creates a topic if a producer or consumer attempts to send messages to, or receive messages from, a topic that does not exist. This behavior is governed by the auto.create.topics.enable configuration property, which is set to true by default.

For production environments, it is recommended to disable automatic topic creation. To do so, set auto.create.topics.enable to false in the Kafka configuration properties file:

Disabling automatic topic creation

auto.create.topics.enable=false

7.4. Topic deletion

Kafka provides the option to prevent topic deletion, controlled by the delete.topic.enable property. By default, this property is set to true, allowing topics to be deleted.

However, setting it to false in the Kafka configuration properties file will disable topic deletion. In this case, attempts to delete a topic will return a success status, but the topic itself will not be deleted.

Disabling topic deletion

delete.topic.enable=false

7.5. Topic configuration

Auto-created topics use the default topic configuration, which can be specified in the broker properties file. When you create topics manually, you can specify their configuration at creation time. You can also change a topic's configuration after it has been created. The main topic configuration options for manually created topics are:

cleanup.policy
Configures the retention policy to delete or compact. The delete policy deletes old records. The compact policy enables log compaction. The default value is delete. For more information about log compaction, see the Apache Kafka website.
compression.type
Specifies the compression used for stored messages. Valid values are gzip, snappy, lz4, zstd, uncompressed (no compression) and producer (retain the compression codec used by the producer). The default value is producer.
max.message.bytes
The maximum size of a batch of messages allowed by the Kafka broker, in bytes. The default value is 1048588.
min.insync.replicas
The minimum number of replicas which must be in sync for a write to be considered successful. The default value is 1.
retention.ms
Maximum number of milliseconds for which log segments will be retained. Log segments older than this value will be deleted. The default value is 604800000 (7 days).
retention.bytes
The maximum number of bytes a partition will retain. Once the partition size grows over this limit, the oldest log segments will be deleted. Value of -1 indicates no limit. The default value is -1.
segment.bytes
The maximum file size of a single commit log segment file in bytes. When the segment reaches its size, a new segment will be started. The default value is 1073741824 bytes (1 gibibyte).

The defaults for auto-created topics can be specified in the Kafka broker configuration using similar options:

log.cleanup.policy
See cleanup.policy above.
compression.type
See compression.type above.
message.max.bytes
See max.message.bytes above.
min.insync.replicas
See min.insync.replicas above.
log.retention.ms
See retention.ms above.
log.retention.bytes
See retention.bytes above.
log.segment.bytes
See segment.bytes above.
default.replication.factor
Default replication factor for automatically created topics. Default value is 1.
num.partitions
Default number of partitions for automatically created topics. Default value is 1.
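The correspondence between the topic-level options and the broker-level defaults listed above can be written as a small lookup. This is only a sketch of the two lists in this section. The function name broker_default_for is hypothetical.

```shell
# Hypothetical lookup: topic-level option name -> broker-level default name,
# following the two lists in this section.
broker_default_for() {
  case "$1" in
    cleanup.policy)      echo log.cleanup.policy ;;
    compression.type)    echo compression.type ;;
    max.message.bytes)   echo message.max.bytes ;;
    min.insync.replicas) echo min.insync.replicas ;;
    retention.ms)        echo log.retention.ms ;;
    retention.bytes)     echo log.retention.bytes ;;
    segment.bytes)       echo log.segment.bytes ;;
    *) echo "unknown topic option: $1" >&2; return 1 ;;
  esac
}

broker_default_for retention.ms
broker_default_for max.message.bytes
```

Note that the size limit is the one pair whose words are reordered: max.message.bytes at the topic level and message.max.bytes at the broker level.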

7.6. Internal topics

Internal topics are created and used internally by the Kafka brokers and clients. Kafka has several internal topics, two of which are used to store consumer offsets (__consumer_offsets) and transaction state (__transaction_state).

The __consumer_offsets and __transaction_state topics can be configured using dedicated Kafka broker configuration options that start with the prefixes offsets.topic. and transaction.state.log. respectively.

The most important configuration options are:

offsets.topic.replication.factor
Number of replicas for __consumer_offsets topic. The default value is 3.
offsets.topic.num.partitions
Number of partitions for __consumer_offsets topic. The default value is 50.
transaction.state.log.replication.factor
Number of replicas for __transaction_state topic. The default value is 3.
transaction.state.log.num.partitions
Number of partitions for __transaction_state topic. The default value is 50.
transaction.state.log.min.isr
Minimum number of replicas that must acknowledge a write to __transaction_state topic to be considered successful. If this minimum cannot be met, then the producer will fail with an exception. The default value is 2.

7.7. Creating a topic

Use the kafka-topics.sh tool to manage topics. kafka-topics.sh is part of the Streams for Apache Kafka distribution and is found in the bin directory.

Prerequisites

Creating a topic

  1. Create a topic using the kafka-topics.sh utility and specify the following:

    • Host and port of the Kafka broker in the --bootstrap-server option.
    • Use the --create option to specify that a new topic should be created.
    • Topic name in the --topic option.
    • The number of partitions in the --partitions option.
    • Topic replication factor in the --replication-factor option.

      You can also override some of the default topic configuration options using the option --config. This option can be used multiple times to override different options.

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --create --topic <TopicName> --partitions <NumberOfPartitions> --replication-factor <ReplicationFactor> --config <Option1>=<Value1> --config <Option2>=<Value2>

      Example of the command to create a topic named mytopic

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mytopic --partitions 50 --replication-factor 3 --config cleanup.policy=compact --config min.insync.replicas=2

  2. Verify that the topic exists using kafka-topics.sh.

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --describe --topic <TopicName>

    Example of the command to describe a topic named mytopic

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mytopic
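Because --config can be repeated, a script that creates topics often assembles the command from a list of overrides. The following is a minimal sketch that only prints the command instead of running it, so it can be inspected first. The function name build_create_cmd and its argument order are assumptions made for this example.

```shell
# Hypothetical helper: print (not run) a kafka-topics.sh create command.
# Usage: build_create_cmd <bootstrap> <topic> <partitions> <replication> [option=value ...]
build_create_cmd() {
  bootstrap="$1"; topic="$2"; partitions="$3"; replication="$4"
  shift 4
  cmd="/opt/kafka/bin/kafka-topics.sh --bootstrap-server $bootstrap --create"
  cmd="$cmd --topic $topic --partitions $partitions --replication-factor $replication"
  # Each remaining argument becomes its own --config option.
  for override in "$@"; do
    cmd="$cmd --config $override"
  done
  printf '%s\n' "$cmd"
}

build_create_cmd localhost:9092 mytopic 50 3 cleanup.policy=compact min.insync.replicas=2
```

With these arguments the helper prints the same command as the mytopic example in step 1.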

7.8. Listing and describing topics

The kafka-topics.sh tool can be used to list and describe topics. kafka-topics.sh is part of the Streams for Apache Kafka distribution and can be found in the bin directory.

Prerequisites

Describing a topic

  1. Describe a topic using the kafka-topics.sh utility and specify the following:

    • Host and port of the Kafka broker in the --bootstrap-server option.
    • Use the --describe option to specify that you want to describe a topic.
    • Specify the topic name in the --topic option.
    • If the --topic option is omitted, the command describes all available topics.

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_host>:<port> --describe --topic <topic_name>

      Example of the command to describe a topic named mytopic

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mytopic

      The command lists all partitions and replicas which belong to this topic. It also lists all topic configuration options.

7.9. Modifying a topic configuration

The kafka-configs.sh tool can be used to modify topic configurations. kafka-configs.sh is part of the Streams for Apache Kafka distribution and can be found in the bin directory.

Prerequisites

Modify topic configuration

  1. Use the kafka-configs.sh tool to get the current configuration.

    • Specify the host and port of the Kafka broker in the --bootstrap-server option.
    • Set --entity-type to topics and --entity-name to the name of your topic.
    • Use --describe option to get the current configuration.

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_host>:<port> --entity-type topics --entity-name <topic_name> --describe

      Example of the command to get configuration of a topic named mytopic

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --describe

  2. Use the kafka-configs.sh tool to change the configuration.

    • Specify the host and port of the Kafka broker in the --bootstrap-server option.
    • Set --entity-type to topics and --entity-name to the name of your topic.
    • Use --alter option to modify the current configuration.
    • Specify the options you want to add or change in the option --add-config.

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_host>:<port> --entity-type topics --entity-name <topic_name> --alter --add-config <option>=<value>

      Example of the command to change configuration of a topic named mytopic

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --alter --add-config min.insync.replicas=1

  3. Use the kafka-configs.sh tool to delete an existing configuration option.

    • Specify the host and port of the Kafka broker in the --bootstrap-server option.
    • Set --entity-type to topics and --entity-name to the name of your topic.
    • Use --alter option to modify the current configuration.
    • Specify the options you want to remove in the option --delete-config.

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_host>:<port> --entity-type topics --entity-name <topic_name> --alter --delete-config <option>

      Example of the command to remove a configuration option from a topic named mytopic

      /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --alter --delete-config min.insync.replicas
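When several options change at once, the kafka-configs.sh help text describes --add-config as taking a comma-separated list of key=value pairs, and --delete-config as taking a comma-separated list of keys. As a sketch under that assumption, the helper below joins its arguments with commas so the result can be passed to either option. The name join_by_comma is hypothetical, and the command is printed rather than run.

```shell
# Hypothetical helper: join all arguments with commas.
join_by_comma() {
  # The subshell keeps the IFS change from leaking into the caller.
  ( IFS=,; printf '%s\n' "$*" )
}

changes="$(join_by_comma retention.ms=86400000 min.insync.replicas=2)"

# Print the resulting command for review instead of executing it.
printf '%s\n' "/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --alter --add-config $changes"
```

Values that themselves contain commas need extra quoting that this sketch does not handle.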

7.10. Deleting a topic

The kafka-topics.sh tool can be used to manage topics. kafka-topics.sh is part of the Streams for Apache Kafka distribution and can be found in the bin directory.

Prerequisites

Deleting a topic

  1. Delete a topic using the kafka-topics.sh utility and specify the following:

    • Host and port of the Kafka broker in the --bootstrap-server option.
    • Use the --delete option to specify that an existing topic should be deleted.
    • Topic name must be specified in the --topic option.

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_host>:<port> --delete --topic <topic_name>

      Example of the command to delete a topic named mytopic

      /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic mytopic

  2. Verify that the topic was deleted using kafka-topics.sh.

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_host>:<port> --list

    Example of the command to list all topics

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
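In a script, the verification step can be automated by checking the --list output for an exact topic name. The following is a minimal sketch that assumes --list prints one topic name per line. The function name topic_exists is hypothetical, and the sample input stands in for real tool output.

```shell
# Hypothetical helper: succeed if the topic name given as $1 appears as a
# whole line on stdin (one topic name per line, as assumed for --list).
topic_exists() {
  # -F: fixed string, -x: match the whole line, -q: no output, status only
  grep -Fxq -- "$1"
}

# Sample input in place of real --list output.
if printf '%s\n' 'orders' 'mytopic-archive' | topic_exists mytopic; then
  echo "mytopic still exists"
else
  echo "mytopic was deleted"
fi

# Against a live cluster (assumes a broker on localhost:9092):
# /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | topic_exists mytopic
```

Matching the whole line matters here: a substring match would report mytopic as present because of mytopic-archive.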

© 2024 Red Hat, Inc.