Chapter 5. Kafka producer configuration tuning

Use configuration properties to optimize the performance of Kafka producers. You can use standard Kafka producer configuration options. Adjusting your configuration to maximize throughput might increase latency or vice versa. You will need to experiment and tune your producer configuration to get the balance you need.

When configuring a producer, consider the following aspects carefully, as they significantly impact its performance and behavior:

Compression
By compressing messages before they are sent over the network, you can conserve network bandwidth and reduce disk storage requirements, but with the additional cost of increased CPU utilization due to the compression and decompression processes.
Batching
Adjusting the batch size and time intervals when the producer sends messages can affect throughput and latency.
Partitioning
Partitioning strategies in the Kafka cluster can support producers through parallelism and load balancing: producers can write to multiple partitions concurrently, and each partition receives an equal share of messages. Other strategies might include topic replication for fault tolerance.
Securing access
Implement security measures for authentication, encryption, and authorization by setting up user accounts to manage secure access to Kafka.

5.1. Basic producer configuration

Connection and serializer properties are required for every producer. Generally, it is good practice to add a client ID for tracking, and to use compression on the producer to reduce batch sizes in requests.

In a basic producer configuration:

  • The order of messages in a partition is not guaranteed.
  • The acknowledgment of messages reaching the broker does not guarantee durability.

Basic producer configuration properties

# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...

1
(Required) Tells the producer to connect to a Kafka cluster using a host:port bootstrap server address for a Kafka broker. The producer uses the address to discover and connect to all brokers in the cluster. Use a comma-separated list to specify two or three addresses in case a server is down, but it’s not necessary to provide a list of all the brokers in the cluster.
2
(Required) Serializer that transforms the key of each message into bytes before it is sent to a broker.
3
(Required) Serializer that transforms the value of each message into bytes before it is sent to a broker.
4
(Optional) The logical name for the client, which is used in logs and metrics to identify the source of a request.
5
(Optional) The codec for compressing messages. Messages are sent, and might be stored, in compressed format, and are decompressed when they reach a consumer. Compression is useful for improving throughput and reducing the load on storage, but might not be suitable for low-latency applications where the cost of compression or decompression can be prohibitive.
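
The following is a minimal sketch of a Java producer application that uses this basic configuration. The topic name my-topic and the record key and value are placeholders for illustration.

Example producer application with basic configuration

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BasicProducerExample {
    public static void main(String[] args) {
        // Mirror the basic configuration properties shown above
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "my-client");
        props.put("compression.type", "gzip");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // "my-topic" is a placeholder; use a topic that exists in your cluster
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        } // close() flushes any buffered records before returning
    }
}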

5.2. Data durability

Message delivery acknowledgments minimize the likelihood that messages are lost. By default, acknowledgments are enabled with the acks property set to acks=all. To control the maximum time the producer waits for acknowledgments from the broker and handle potential delays in sending messages, you can use the delivery.timeout.ms property.

Acknowledging message delivery

# ...
acks=all 1
delivery.timeout.ms=120000 2
# ...

1
acks=all forces a leader replica to replicate messages to a certain number of followers before acknowledging that the message request was successfully received.
2
The maximum time in milliseconds to wait for a complete send request. Setting the value to MAX_LONG delegates retry handling to Kafka with an effectively unlimited number of retries. The default is 120000 (2 minutes).
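
Acknowledgments arrive asynchronously. The following sketch shows one way to inspect the outcome of a send using the callback form of KafkaProducer.send() in the Java client. The connection address and the topic name my-topic are placeholder values.

Example of handling acknowledgments in a send callback

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("delivery.timeout.ms", 120000);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // The send failed, for example after delivery.timeout.ms expired
                    System.err.println("Delivery failed: " + exception.getMessage());
                } else {
                    // The broker acknowledged the record according to the acks setting
                    System.out.printf("Delivered to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}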

The acks=all setting offers the strongest guarantee of delivery, but it will increase the latency between the producer sending a message and receiving acknowledgment. If you don’t require such strong guarantees, a setting of acks=0 or acks=1 provides either no delivery guarantees or only acknowledgment that the leader replica has written the record to its log.

With acks=all, the leader waits for all in-sync replicas to acknowledge message delivery. A topic’s min.insync.replicas configuration sets the minimum required number of in-sync replica acknowledgements. The number of acknowledgements includes those of the leader and followers.

A typical starting point is to use the following configuration:

  • Producer configuration:

    • acks=all (default)
  • Broker configuration for topic replication:

    • default.replication.factor=3 (default = 1)
    • min.insync.replicas=2 (default = 1)

When you create a topic, you can override the default replication factor. You can also override min.insync.replicas at the topic level in the topic configuration.
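
As a sketch of one way to apply these overrides, the following Java example uses the Kafka Admin API to create a topic with a replication factor of 3 and a topic-level min.insync.replicas setting. The topic name, partition count, and connection address are illustrative.

Example topic creation with replication overrides

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(adminProps)) {
            // 3 partitions and a replication factor of 3 are illustrative values
            NewTopic topic = new NewTopic("my-topic", 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2")); // topic-level override
            admin.createTopics(List.of(topic)).all().get(); // wait for the request to complete
        }
    }
}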

Streams for Apache Kafka uses this configuration in the example configuration files for multi-node deployment of Kafka.

The following table describes how this configuration operates depending on the availability of followers that replicate the leader replica.

Table 5.1. Follower availability

Number of followers available and in-sync | Acknowledgements                                  | Producer can send messages?
2                                         | The leader waits for 2 follower acknowledgements  | Yes
1                                         | The leader waits for 1 follower acknowledgement   | Yes
0                                         | The leader raises an exception                    | No

A topic replication factor of 3 creates one leader replica and two followers. In this configuration, the producer can continue if a single follower is unavailable. Some delay can occur while a failed broker is removed from the in-sync replicas or a new leader is created. If the second follower is also unavailable, message delivery is not successful. Instead of acknowledging successful message delivery, the leader sends an error (not enough replicas) to the producer, and the producer raises an equivalent exception. With the retries configuration, the producer can resend the failed message request.

Note

If the system fails, there is a risk of unsent data in the buffer being lost.

5.3. Ordered delivery

Idempotent producers avoid duplicates, as messages are delivered exactly once. IDs and sequence numbers are assigned to messages to preserve the order of delivery, even in the event of failure. If you are using acks=all for data consistency, enabling idempotency makes sense for ordered delivery. Idempotency is enabled for producers by default. With idempotency enabled, message ordering is preserved with up to 5 concurrent in-flight requests.

Ordered delivery with idempotency

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
Set to true to enable the idempotent producer.
2
With idempotent delivery, the number of in-flight requests can be greater than 1 while still preserving the message ordering guarantee. The default is 5 in-flight requests.
3
Set acks to all.
4
Set the number of attempts to resend a failed message request.

If you choose not to use acks=all and disable idempotency because of the performance cost, set the number of in-flight (unacknowledged) requests to 1 to preserve ordering. Otherwise, Message-A might fail and only succeed on a retry after Message-B has already been written to the broker.

Ordered delivery without idempotency

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
Set to false to disable the idempotent producer.
2
Set the number of in-flight requests to exactly 1.

5.4. Reliability guarantees

Idempotence is useful for exactly once writes to a single partition. Transactions, when used with idempotence, allow exactly once writes across multiple partitions.

Transactions guarantee that messages using the same transactional ID are produced once, and either all are successfully written to the respective logs or none of them are.

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
Specify a unique transactional ID.
2
Set the maximum allowed time in milliseconds for a transaction before a timeout error is returned. The default is 900000 (15 minutes).

The choice of transactional.id is important so that the transactional guarantee is maintained. Each transactional ID should be used for a unique set of topic partitions. For example, you can use an external mapping of topic partition names to transactional IDs, or compute the transactional ID from the topic partition names using a function that avoids collisions.
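
The following is a simplified sketch of the transactional flow using the Java client. Fatal exceptions, such as ProducerFencedException, mean the producer must be closed, whereas other Kafka exceptions can be handled by aborting the transaction. The topic names and transactional ID are placeholders.

Example transactional send

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Setting transactional.id enables idempotence automatically
        props.put("transactional.id", "UNIQUE-ID");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional ID with the coordinator

        try {
            producer.beginTransaction();
            // Writes across topics and partitions commit or abort atomically
            producer.send(new ProducerRecord<>("topic-a", "key", "value-1"));
            producer.send(new ProducerRecord<>("topic-b", "key", "value-2"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot recover and must be closed
            producer.close();
        } catch (KafkaException e) {
            // Abortable error: none of the messages become visible to transactional consumers
            producer.abortTransaction();
        }
        producer.close();
    }
}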

5.5. Optimizing producers for throughput and latency

Usually, the requirement of a system is to satisfy a particular throughput target for a proportion of messages within a given latency. For example, targeting 500,000 messages per second with 95% of messages being acknowledged within 2 seconds.

It’s likely that the messaging semantics (message ordering and durability) of your producer are defined by the requirements for your application. For instance, it’s possible that you don’t have the option of using acks=0 or acks=1 without breaking some important property or guarantee provided by your application.

Broker restarts have a significant impact on high percentile statistics. For example, over a long period the 99th percentile latency is dominated by behavior around broker restarts. This is worth considering when designing benchmarks or comparing performance numbers from benchmarking with performance numbers seen in production.

Depending on your objective, Kafka offers a number of configuration parameters and techniques for tuning producer performance for throughput and latency.

Message batching (linger.ms and batch.size)
Message batching delays sending messages in the hope that more messages destined for the same broker will be sent, allowing them to be batched into a single produce request. Batching is a compromise between higher latency in return for higher throughput. Time-based batching is configured using linger.ms, and size-based batching is configured using batch.size.
Compression (compression.type)
Message compression adds latency in the producer (CPU time spent compressing the messages), but makes requests (and potentially disk writes) smaller, which can increase throughput. Whether compression is worthwhile, and the best compression to use, depends on the messages being sent. Compression happens on the thread that calls KafkaProducer.send(), so if the latency of this method matters for your application, you should consider using more threads (see the sketch after this list).
Pipelining (max.in.flight.requests.per.connection)
Pipelining means sending more requests before the response to a previous request has been received. In general more pipelining means better throughput, up to a threshold at which other effects, such as worse batching, start to counteract the effect on throughput.
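
As a sketch of the threading approach mentioned for compression, the following hypothetical helper moves KafkaProducer.send() calls onto a dedicated thread pool. KafkaProducer is thread-safe, so a single producer instance can be shared across threads; the class name, pool size, and record types are illustrative.

Example of moving send() off a latency-sensitive thread

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncSender {
    private final Producer<String, String> producer;
    private final ExecutorService sendExecutor = Executors.newFixedThreadPool(2);

    public AsyncSender(Producer<String, String> producer) {
        this.producer = producer;
    }

    public void sendAsync(String topic, String key, String value) {
        // The caller returns immediately; compression and batching happen on a pool thread
        sendExecutor.submit(() -> producer.send(new ProducerRecord<>(topic, key, value)));
    }
}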

Lowering latency

When your application calls the KafkaProducer.send() method, messages undergo a series of operations before being sent:

  • Interception: Messages are processed by any configured interceptors.
  • Serialization: Messages are serialized into the appropriate format.
  • Partition assignment: Each message is assigned to a specific partition.
  • Compression: Messages are compressed to conserve network bandwidth.
  • Batching: Compressed messages are added to a batch in a partition-specific queue.

During these operations, the send() method is momentarily blocked. It also remains blocked if the buffer.memory limit is reached or if topic metadata is unavailable.

Batches will remain in the queue until one of the following occurs:

  • The batch is full (according to batch.size).
  • The delay introduced by linger.ms has passed.
  • The sender is ready to dispatch batches for other partitions to the same broker and can include this batch.
  • The producer is being flushed or closed.

To minimize the impact of send() blocking on latency, optimize batching and buffering configurations. Use the linger.ms and batch.size properties to batch more messages into a single produce request for higher throughput.

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
The linger.ms property adds a delay in milliseconds so that larger batches of messages are accumulated and sent in a request. The default is 0.
2
A request is sent when the batch reaches the maximum batch.size in bytes, or when messages have been queued for longer than linger.ms, whichever comes first. Adding the delay allows batches to accumulate messages up to the batch size.
3
The buffer size must be at least as big as the batch size, and be able to accommodate buffering, compression, and in-flight requests.
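
If you prefer to set these properties programmatically, the following sketch uses the ProducerConfig constants from the Java client. It also sets max.block.ms, the standard producer property that bounds how long send() blocks when the buffer is full or metadata is unavailable; the 5000 ms value is illustrative.

Example batching and buffering configuration in Java

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigExample {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);           // wait up to 100 ms to fill batches
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16 KiB batch limit per partition
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32 MiB total buffer
        // Bound how long send() can block when the buffer is full or metadata is unavailable
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L);
        return props;
    }
}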

Increasing throughput

You can improve the throughput of your message requests by directing messages to a specified partition using a custom partitioner in place of the default.

# ...
partitioner.class=my-custom-partitioner 1
# ...
1
Specify the class name of your custom partitioner.
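
The value of partitioner.class must be the fully qualified name of a class that implements the Partitioner interface. The following sketch is illustrative only: it pins a hypothetical "priority" key to partition 0 and hashes all other keys across the remaining partitions, assuming the topic has at least two partitions.

Example custom partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class MyCustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // Read any custom settings passed through the producer configuration
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null || "priority".equals(key)) {
            return 0; // records without a key, and the hypothetical priority key, go to partition 0
        }
        // Hash all other keys across the remaining partitions
        return 1 + (Utils.murmur2(keyBytes) & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // No resources to release in this example
    }
}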