Chapter 5. Topics
Messages in Kafka are always sent to or received from a topic. This chapter describes how to configure and manage Kafka topics.
5.1. Partitions and replicas
Messages in Kafka are always sent to or received from a topic. A topic is always split into one or more partitions. Partitions act as shards: every message sent by a producer is written to exactly one partition. Because messages are sharded across partitions, topics are easy to scale horizontally.
Each partition can have one or more replicas, which are stored on different brokers in the cluster. When creating a topic, you configure the number of replicas using the replication factor, which defines how many copies of each partition are held within the cluster. One of the replicas of a given partition is elected as the leader. Producers send new messages to the leader replica, and consumers consume messages from it. The other replicas are follower replicas, which replicate the leader.
If the leader fails, one of the followers automatically becomes the new leader. Each broker acts as the leader for some partitions and as a follower for others, so the load is balanced across the cluster.
The replication factor determines the number of replicas, including the leader and the followers. For example, if you set the replication factor to 3, there will be one leader and two follower replicas.
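To make the leader and follower split concrete, here is a minimal Python sketch that places the replicas of each partition round-robin across brokers. The function assign_replicas and its round-robin strategy are illustrative assumptions, not Kafka's actual replica assignment algorithm; the sketch only shows that a replication factor of 3 gives each partition one leader and two followers on distinct brokers, with leadership spread across the brokers.

```python
# Simplified, hypothetical replica placement: round-robin across brokers.
# Kafka's real assignment algorithm differs; this only illustrates the idea.
from typing import Dict, List


def assign_replicas(num_partitions: int, brokers: List[int],
                    replication_factor: int) -> Dict[int, List[int]]:
    """Return {partition: [leader, follower, ...]}."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(num_partitions):
        # The first broker in each list acts as the leader; the rest
        # host follower replicas. Shifting the start spreads leadership.
        assignment[partition] = [
            brokers[(partition + i) % len(brokers)]
            for i in range(replication_factor)
        ]
    return assignment


if __name__ == "__main__":
    layout = assign_replicas(num_partitions=6, brokers=[1, 2, 3], replication_factor=3)
    for partition, replicas in layout.items():
        leader, *followers = replicas
        print(f"partition {partition}: leader={leader} followers={followers}")
```

With six partitions on three brokers, each broker leads two partitions and follows four, which is the balancing effect described above.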
5.2. Message retention
The message retention policy defines how long the messages will be stored on the Kafka brokers. It can be defined based on time, partition size or both.
For example, you can define that the messages should be kept:
- For 7 days
- Until the partition has 1GB of messages. Once the limit is reached, the oldest messages are removed.
- For 7 days or until the 1GB limit has been reached, whichever limit is reached first.
Kafka brokers store messages in log segments. Messages that are past their retention limit are deleted only after a new log segment has been created, because the active segment is never deleted. A new log segment is created when the previous log segment exceeds the configured log segment size. You can also request that new segments be created periodically.
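The interplay of time limits, size limits, and segments can be modelled in a few lines. This is a simplified Python sketch, not the broker's implementation: Segment and segments_to_delete are hypothetical names, only whole closed segments are removed, and the last (active) segment is always kept.

```python
# Simplified model of time- and size-based retention (hypothetical helper).
# Segments are ordered oldest first; the last one is the active segment.
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    base_offset: int          # offset of the first message in the segment
    size_bytes: int
    newest_timestamp_ms: int  # timestamp of the newest message in the segment


def segments_to_delete(segments: List[Segment], now_ms: int,
                       retention_ms: int, retention_bytes: int) -> List[int]:
    """Return base offsets of closed segments that exceed either limit.

    A limit of -1 disables that check.
    """
    total = sum(s.size_bytes for s in segments)
    doomed = []
    for seg in segments[:-1]:  # skip the active (last) segment
        too_old = retention_ms >= 0 and now_ms - seg.newest_timestamp_ms > retention_ms
        too_big = retention_bytes >= 0 and total > retention_bytes
        if not (too_old or too_big):
            break  # only a contiguous run of the oldest segments is removed
        doomed.append(seg.base_offset)
        total -= seg.size_bytes
    return doomed


if __name__ == "__main__":
    DAY_MS = 24 * 60 * 60 * 1000
    log = [Segment(0, 400, 0), Segment(100, 400, 5 * DAY_MS), Segment(200, 400, 9 * DAY_MS)]
    # [0]: only the first segment is older than 7 days
    print(segments_to_delete(log, now_ms=10 * DAY_MS, retention_ms=7 * DAY_MS, retention_bytes=-1))
```

Because a segment is removed if either check fires, whichever limit is reached first wins, matching the third retention example above.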
Kafka brokers also support a compaction policy.
For a topic with the compact policy, the broker always keeps only the last message for each key; older messages with the same key are removed from the partition. Because compaction is executed periodically, it does not happen immediately when a new message with the same key is sent to the partition. Instead, it might take some time until the older messages are removed.
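The end result of compaction can be sketched as "keep the latest record per key". This hypothetical compact function works on (offset, key, value) tuples in offset order; it ignores that the broker compacts in the background, segment by segment, and never cleans the active segment.

```python
# Simplified log compaction: keep only the latest record for each key.
# Records are (offset, key, value) tuples in offset order (an assumption).
from typing import List, Tuple

Record = Tuple[int, str, str]


def compact(records: List[Record]) -> List[Record]:
    latest = {}
    for offset, key, _ in records:
        latest[key] = offset  # later offsets overwrite earlier ones
    # Surviving records keep their original offsets and relative order.
    return [r for r in records if latest[r[1]] == r[0]]


if __name__ == "__main__":
    log = [(0, "a", "1"), (1, "b", "1"), (2, "a", "2"), (3, "c", "1"), (4, "b", "2")]
    print(compact(log))  # [(2, 'a', '2'), (3, 'c', '1'), (4, 'b', '2')]
```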
For more information about the message retention configuration options, see Section 5.5, “Topic configuration”.
5.3. Topic auto-creation
When a producer or consumer tries to send messages to or receive messages from a topic that does not exist, Kafka will, by default, automatically create that topic. This behavior is controlled by the auto.create.topics.enable configuration property, which is set to true by default.
To disable it, set auto.create.topics.enable to false in the Kafka broker configuration file:
auto.create.topics.enable=false
5.4. Topic deletion
Kafka offers the possibility to disable deletion of topics. This is configured through the delete.topic.enable property, which is set to true by default (that is, deleting topics is possible). When this property is set to false, it is not possible to delete topics: all attempts to delete a topic return success, but the topic is not deleted.
To disable topic deletion, set delete.topic.enable to false in the Kafka broker configuration file:
delete.topic.enable=false
5.5. Topic configuration
Auto-created topics use the default topic configuration, which can be specified in the broker properties file. However, when creating topics manually, you can specify their configuration at creation time. You can also change a topic's configuration after it has been created. The main topic configuration options for manually created topics are:
cleanup.policy - Configures the retention policy to delete or compact. The delete policy deletes old records. The compact policy enables log compaction. The default value is delete. For more information about log compaction, see the Kafka website.
compression.type - Specifies the compression used for stored messages. Valid values are gzip, snappy, lz4, uncompressed (no compression), and producer (retain the compression codec used by the producer). The default value is producer.
max.message.bytes - The maximum size of a batch of messages allowed by the Kafka broker, in bytes. The default value is 1000012.
min.insync.replicas - The minimum number of replicas that must be in sync for a write to be considered successful when the producer uses acks=all. The default value is 1.
retention.ms - The maximum number of milliseconds for which log segments are retained. Log segments older than this value are deleted. The default value is 604800000 (7 days).
retention.bytes - The maximum number of bytes a partition retains. Once the partition size grows over this limit, the oldest log segments are deleted. A value of -1 indicates no limit. The default value is -1.
segment.bytes - The maximum size of a single commit log segment file, in bytes. When a segment reaches this size, a new segment is started. The default value is 1073741824 bytes (1 gibibyte).
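The default values above are easier to check with a little arithmetic: 7 days is 7 × 24 × 60 × 60 × 1000 = 604800000 ms, and 1 GiB is 1024³ = 1073741824 bytes. The following Python sketch uses hypothetical helpers, to_ms and topic_overrides, to turn human-friendly units into the raw string values these options expect.

```python
# Hypothetical helpers that convert human-friendly units into the raw
# values expected by options such as retention.ms and segment.bytes.
from datetime import timedelta
from typing import Dict

GIBIBYTE = 1024 ** 3  # 1073741824 bytes


def to_ms(duration: timedelta) -> int:
    """Whole milliseconds in a timedelta (integer division avoids float rounding)."""
    return duration // timedelta(milliseconds=1)


def topic_overrides(retention: timedelta, segment_gib: int, min_isr: int,
                    cleanup_policy: str = "delete") -> Dict[str, str]:
    """Build topic-level overrides as strings, the form the CLI tools accept."""
    return {
        "cleanup.policy": cleanup_policy,
        "retention.ms": str(to_ms(retention)),
        "segment.bytes": str(segment_gib * GIBIBYTE),
        "min.insync.replicas": str(min_isr),
    }


if __name__ == "__main__":
    overrides = topic_overrides(timedelta(days=7), segment_gib=1, min_isr=2)
    # Render as repeated --config arguments for kafka-topics.sh.
    print(" ".join(f"--config {key}={value}" for key, value in overrides.items()))
```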
For a list of all supported topic configuration options, see Appendix B, Topic configuration parameters.
The defaults for auto-created topics can be specified in the Kafka broker configuration using similar options:
log.cleanup.policy - See cleanup.policy above.
compression.type - See compression.type above.
message.max.bytes - See max.message.bytes above.
min.insync.replicas - See min.insync.replicas above.
log.retention.ms - See retention.ms above.
log.retention.bytes - See retention.bytes above.
log.segment.bytes - See segment.bytes above.
default.replication.factor - Default replication factor for automatically created topics. The default value is 1.
num.partitions - Default number of partitions for automatically created topics. The default value is 1.
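A topic without a topic-level override falls back to the corresponding broker-level default, whose name may differ (for example, max.message.bytes and message.max.bytes). Here is a minimal Python sketch of that lookup. effective_value and TOPIC_TO_BROKER are hypothetical names, and parse_properties is a deliberately simple key=value reader, not a full implementation of the Java .properties format.

```python
from typing import Dict, Mapping

# Topic-level option -> broker-level default, following the list above.
TOPIC_TO_BROKER = {
    "cleanup.policy": "log.cleanup.policy",
    "compression.type": "compression.type",
    "max.message.bytes": "message.max.bytes",
    "min.insync.replicas": "min.insync.replicas",
    "retention.ms": "log.retention.ms",
    "retention.bytes": "log.retention.bytes",
    "segment.bytes": "log.segment.bytes",
}


def parse_properties(text: str) -> Dict[str, str]:
    """Read key=value lines, skipping blanks and # or ! comments.

    Escapes, line continuations and ':' separators are not supported.
    """
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def effective_value(option: str, topic_overrides: Mapping[str, str],
                    broker_config: Mapping[str, str]) -> str:
    """Return the topic override if present, otherwise the broker default."""
    if option in topic_overrides:
        return topic_overrides[option]
    return broker_config[TOPIC_TO_BROKER[option]]


if __name__ == "__main__":
    broker = parse_properties("""
# excerpt of a broker configuration file
log.cleanup.policy=delete
message.max.bytes=1000012
""")
    overrides = {"cleanup.policy": "compact"}
    print(effective_value("cleanup.policy", overrides, broker))     # compact
    print(effective_value("max.message.bytes", overrides, broker))  # 1000012
```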
For a list of all supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
5.6. Internal topics
Internal topics are created and used internally by the Kafka brokers and clients. Kafka has several internal topics, which are used to store consumer offsets (__consumer_offsets) and transaction state (__transaction_state). These topics can be configured using dedicated Kafka broker configuration options, whose names start with offsets.topic. or transaction.state.log. as a prefix. The most important configuration options are:
offsets.topic.replication.factor - Number of replicas for the __consumer_offsets topic. The default value is 3.
offsets.topic.num.partitions - Number of partitions for the __consumer_offsets topic. The default value is 50.
transaction.state.log.replication.factor - Number of replicas for the __transaction_state topic. The default value is 3.
transaction.state.log.num.partitions - Number of partitions for the __transaction_state topic. The default value is 50.
transaction.state.log.min.isr - Minimum number of replicas that must acknowledge a write to the __transaction_state topic for it to be considered successful. If this minimum cannot be met, the producer fails with an exception. The default value is 2.
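With the defaults transaction.state.log.replication.factor=3 and transaction.state.log.min.isr=2, one replica can be unavailable while writes to __transaction_state still succeed. The same counting applies to min.insync.replicas for regular topics. This minimal Python sketch models only that rule; the function names are hypothetical, and the broker's real in-sync replica tracking is more involved.

```python
def write_accepted(in_sync_replicas: int, min_isr: int) -> bool:
    """A write that needs all in-sync replicas succeeds only if enough are in sync."""
    return in_sync_replicas >= min_isr


def tolerated_failures(replication_factor: int, min_isr: int) -> int:
    """How many replicas can drop out of sync before writes start failing."""
    return max(replication_factor - min_isr, 0)


if __name__ == "__main__":
    # Defaults for __transaction_state: replication factor 3, min ISR 2.
    print(tolerated_failures(3, 2))  # 1
    print(write_accepted(2, 2))      # True
    print(write_accepted(1, 2))      # False
```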
5.7. Creating a topic
The kafka-topics.sh tool can be used to manage topics. kafka-topics.sh is part of the AMQ Streams distribution and can be found in the bin directory.
Prerequisites
- AMQ Streams cluster is installed and running
Creating a topic
Create a topic using the kafka-topics.sh utility and specify the following:
- The ZooKeeper URL in the --zookeeper option.
- The --create option, to specify that a new topic should be created.
- The topic name in the --topic option.
- The number of partitions in the --partitions option.
- The replication factor in the --replication-factor option.
You can also override some of the default topic configuration options using the --config option. This option can be used multiple times to override different options.
bin/kafka-topics.sh --zookeeper <ZooKeeperAddress> --create --topic <TopicName> --partitions <NumberOfPartitions> --replication-factor <ReplicationFactor> --config <Option1>=<Value1> --config <Option2>=<Value2>
Example of the command to create a topic named mytopic:
bin/kafka-topics.sh --zookeeper zoo1.my-domain.com:2181 --create --topic mytopic --partitions 50 --replication-factor 3 --config cleanup.policy=compact --config min.insync.replicas=2
Verify that the topic exists using kafka-topics.sh.
bin/kafka-topics.sh --zookeeper <ZooKeeperAddress> --describe --topic <TopicName>
Example of the command to describe a topic named mytopic:
bin/kafka-topics.sh --zookeeper zoo1.my-domain.com:2181 --describe --topic mytopic
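If topics are created from scripts, the command line can be assembled programmatically. A minimal Python sketch with a hypothetical create_topic_command helper: it only builds the argument list, adding one --config pair per override, and leaves running it (for example with subprocess.run(cmd, check=True)) to the caller.

```python
import shlex
from typing import Dict, List, Optional


def create_topic_command(zookeeper: str, topic: str, partitions: int,
                         replication_factor: int,
                         configs: Optional[Dict[str, str]] = None,
                         tool: str = "bin/kafka-topics.sh") -> List[str]:
    """Build a kafka-topics.sh --create invocation as an argument list."""
    cmd = [tool, "--zookeeper", zookeeper, "--create", "--topic", topic,
           "--partitions", str(partitions),
           "--replication-factor", str(replication_factor)]
    for key, value in (configs or {}).items():
        cmd += ["--config", f"{key}={value}"]  # --config may be repeated
    return cmd


if __name__ == "__main__":
    cmd = create_topic_command(
        "zoo1.my-domain.com:2181", "mytopic", 50, 3,
        {"cleanup.policy": "compact", "min.insync.replicas": "2"})
    print(shlex.join(cmd))  # same command as the example above
```

Keeping the command as a list rather than a single string avoids shell quoting problems when it is eventually executed.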
Additional resources
- For more information about topic configuration, see Section 5.5, “Topic configuration”.
- For a list of all supported topic configuration options, see Appendix B, Topic configuration parameters.
5.8. Listing and describing topics
The kafka-topics.sh tool can be used to list and describe topics. kafka-topics.sh is part of the AMQ Streams distribution and can be found in the bin directory.
Prerequisites
- AMQ Streams cluster is installed and running
- Topic mytopic exists
Describing a topic
Describe a topic using the kafka-topics.sh utility.
- Specify the ZooKeeper URL in the --zookeeper option.
- Use the --describe option to specify that you want to describe a topic.
- Specify the topic name in the --topic option. When the --topic option is omitted, all available topics are described.
bin/kafka-topics.sh --zookeeper <ZooKeeperAddress> --describe --topic <TopicName>
Example of the command to describe a topic named mytopic:
bin/kafka-topics.sh --zookeeper zoo1.my-domain.com:2181 --describe --topic mytopic
The describe command will list all partitions and replicas which belong to this topic. It will also list all topic configuration options.
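For topics with many partitions, the describe output is easier to check programmatically. This Python sketch parses partition lines and flags partitions whose in-sync replica list (Isr) is shorter than their replica list. It assumes lines shaped like "Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3"; the exact layout differs between Kafka versions, and parse_describe and under_replicated are hypothetical helper names.

```python
import re
from typing import List, NamedTuple

# Assumed layout of a partition line (it varies between Kafka versions):
#   Topic: mytopic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+"
    r"Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


class PartitionInfo(NamedTuple):
    partition: int
    leader: int
    replicas: List[int]
    isr: List[int]


def _broker_ids(text: str) -> List[int]:
    return [int(part) for part in text.split(",") if part]


def parse_describe(output: str) -> List[PartitionInfo]:
    """Extract per-partition lines; the topic summary line does not match."""
    infos = []
    for line in output.splitlines():
        match = PARTITION_LINE.search(line)
        if match:
            infos.append(PartitionInfo(
                partition=int(match.group(1)),
                leader=int(match.group(2)),
                replicas=_broker_ids(match.group(3)),
                isr=_broker_ids(match.group(4)),
            ))
    return infos


def under_replicated(infos: List[PartitionInfo]) -> List[int]:
    """Partitions whose in-sync replica set is smaller than the replica set."""
    return [p.partition for p in infos if len(p.isr) < len(p.replicas)]
```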
Additional resources
- For more information about topic configuration, see Section 5.5, “Topic configuration”.
- For more information about creating topics, see Section 5.7, “Creating a topic”.
5.9. Modifying a topic configuration
The kafka-configs.sh tool can be used to modify topic configurations. kafka-configs.sh is part of the AMQ Streams distribution and can be found in the bin directory.
Prerequisites
- AMQ Streams cluster is installed and running
- Topic mytopic exists
Modify topic configuration
Use the kafka-configs.sh tool to get the current configuration.
- Specify the ZooKeeper URL in the --zookeeper option.
- Set --entity-type to topics and --entity-name to the name of your topic.
- Use the --describe option to get the current configuration.
bin/kafka-configs.sh --zookeeper <ZooKeeperAddress> --entity-type topics --entity-name <TopicName> --describe
Example of the command to get the configuration of a topic named mytopic:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --entity-type topics --entity-name mytopic --describe
Use the kafka-configs.sh tool to change the configuration.
- Specify the ZooKeeper URL in the --zookeeper option.
- Set --entity-type to topics and --entity-name to the name of your topic.
- Use the --alter option to modify the current configuration. Specify the options you want to add or change in the --add-config option.
bin/kafka-configs.sh --zookeeper <ZooKeeperAddress> --entity-type topics --entity-name <TopicName> --alter --add-config <Option>=<Value>
Example of the command to change the configuration of a topic named mytopic:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --entity-type topics --entity-name mytopic --alter --add-config min.insync.replicas=1
Use the kafka-configs.sh tool to delete an existing configuration option.
- Specify the ZooKeeper URL in the --zookeeper option.
- Set --entity-type to topics and --entity-name to the name of your topic.
- Use the --alter option together with the --delete-config option to remove an existing configuration option. Specify the options you want to remove in the --delete-config option.
bin/kafka-configs.sh --zookeeper <ZooKeeperAddress> --entity-type topics --entity-name <TopicName> --alter --delete-config <Option>
Example of the command to remove a configuration option from a topic named mytopic:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --entity-type topics --entity-name mytopic --alter --delete-config min.insync.replicas
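Both --add-config and --delete-config accept a comma-separated list, so several options can be changed in a single call. A minimal Python sketch with a hypothetical alter_topic_config_command helper that builds the argument list. Values that themselves contain commas need square-bracket grouping in kafka-configs.sh, which this sketch rejects rather than implements.

```python
import shlex
from typing import Dict, List, Optional, Sequence


def alter_topic_config_command(zookeeper: str, topic: str,
                               add: Optional[Dict[str, str]] = None,
                               delete: Optional[Sequence[str]] = None,
                               tool: str = "bin/kafka-configs.sh") -> List[str]:
    """Build a kafka-configs.sh --alter invocation for one topic."""
    if not add and not delete:
        raise ValueError("specify at least one option to add or delete")
    cmd = [tool, "--zookeeper", zookeeper, "--entity-type", "topics",
           "--entity-name", topic, "--alter"]
    if add:
        if any("," in value for value in add.values()):
            # Would need square-bracket grouping, not handled here.
            raise ValueError("values containing commas are not supported here")
        cmd += ["--add-config", ",".join(f"{k}={v}" for k, v in add.items())]
    if delete:
        cmd += ["--delete-config", ",".join(delete)]
    return cmd


if __name__ == "__main__":
    print(shlex.join(alter_topic_config_command(
        "zoo1.my-domain.com:2181", "mytopic", add={"min.insync.replicas": "1"})))
```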
Additional resources
- For more information about topic configuration, see Section 5.5, “Topic configuration”.
- For more information about creating topics, see Section 5.7, “Creating a topic”.
- For a list of all supported topic configuration options, see Appendix B, Topic configuration parameters.
5.10. Deleting a topic
The kafka-topics.sh tool can be used to manage topics. kafka-topics.sh is part of the AMQ Streams distribution and can be found in the bin directory.
Prerequisites
- AMQ Streams cluster is installed and running
- Topic mytopic exists
Deleting a topic
Delete a topic using the kafka-topics.sh utility.
- Specify the ZooKeeper URL in the --zookeeper option.
- Use the --delete option to specify that an existing topic should be deleted.
- Specify the topic name in the --topic option.
bin/kafka-topics.sh --zookeeper <ZooKeeperAddress> --delete --topic <TopicName>
Example of the command to delete a topic named mytopic:
bin/kafka-topics.sh --zookeeper zoo1.my-domain.com:2181 --delete --topic mytopic
Verify that the topic was deleted using kafka-topics.sh.
bin/kafka-topics.sh --zookeeper <ZooKeeperAddress> --list
Example of the command to list all topics:
bin/kafka-topics.sh --zookeeper zoo1.my-domain.com:2181 --list
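Because a delete request can report success without deleting anything when delete.topic.enable is set to false, it is worth checking the --list output rather than trusting the delete command. A small Python sketch with hypothetical helper names. It assumes one topic name per line and that some tool versions may print "<topic> - marked for deletion" while deletion is still in progress, which the sketch treats as not yet deleted.

```python
from typing import List


def list_topics_command(zookeeper: str, tool: str = "bin/kafka-topics.sh") -> List[str]:
    """Argument list for kafka-topics.sh --list (run it with subprocess.run)."""
    return [tool, "--zookeeper", zookeeper, "--list"]


def deletion_status(list_output: str, topic: str) -> str:
    """Classify a topic as 'present', 'marked for deletion' or 'absent'."""
    for line in list_output.splitlines():
        name = line.strip()
        if name == topic:
            return "present"
        if name == f"{topic} - marked for deletion":
            return "marked for deletion"
    return "absent"


if __name__ == "__main__":
    # Stand-in for captured output; exact matching avoids confusing
    # mytopic with a longer name such as mytopic2.
    print(deletion_status("othertopic\nmytopic2\n", "mytopic"))  # absent
```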
Additional resources
- For more information about creating topics, see Section 5.7, “Creating a topic”.