Chapter 11. Handling high volumes of messages
If your AMQ Streams deployment needs to handle a high volume of messages, you can use configuration options to optimize for throughput and latency.
Kafka producer and consumer configuration can help control the size and frequency of requests to Kafka brokers.
You can also use the same configuration options with the producers and consumers used by the Kafka Connect runtime source connectors (including MirrorMaker 2) and sink connectors.
- Source connectors
  - Producers from the Kafka Connect runtime send messages to the Kafka cluster.
  - For MirrorMaker 2, since the source system is Kafka, consumers retrieve messages from a source Kafka cluster.
- Sink connectors
  - Consumers from the Kafka Connect runtime retrieve messages from the Kafka cluster.
For consumer configuration (consumer.*), you might increase the amount of data fetched in a single fetch request to reduce latency. You increase the fetch request size using the fetch.max.bytes and max.partition.fetch.bytes properties. You can also set a maximum limit on the number of messages returned from the consumer buffer using the max.poll.records property.
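As a rough sketch of the consumer settings described above, the following fragment shows the three properties as they might appear in the configuration of a standalone consumer client, where no consumer. prefix is used. The values are illustrative assumptions only, not recommendations, and the defaults noted in the comments are the commonly documented Apache Kafka values, which should be checked against your version.

```properties
# Illustrative consumer settings (values are assumptions, tune for your workload)

# Maximum data returned for a whole fetch request.
# 104857600 bytes = 100 MiB (commonly documented default: 52428800)
fetch.max.bytes=104857600

# Maximum data returned per partition in a fetch request.
# 2097152 bytes = 2 MiB (commonly documented default: 1048576)
max.partition.fetch.bytes=2097152

# Maximum number of records returned by a single poll.
# (commonly documented default: 500)
max.poll.records=1000
```

When the same properties are set for consumers used by the Kafka Connect runtime, the consumer. prefix is added to each name.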
For producer configuration (producer.*), you might increase the size of the message batches sent in a single produce request. You increase the batch size using the batch.size property. A larger batch size reduces the number of outstanding messages ready to be sent and the size of the backlog in the message queue. Messages being sent to the same partition are batched together. A produce request is sent to the target cluster when the batch size is reached. Increasing the batch size delays produce requests so that more messages are added to each batch and sent to brokers at the same time. This can improve throughput when you have just a few topic partitions that handle large numbers of messages.
To choose a suitable producer batch size, consider the number and size of the records that the producer handles.
Use linger.ms to add a wait time in milliseconds to delay produce requests when producer load decreases. The delay means that more records can be added to batches if they are under the maximum batch size.
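The batching properties described above might be combined as in the following sketch for a standalone producer client, where the names carry no producer. prefix. The values are illustrative assumptions only; the batch.size default noted in the comment is the commonly documented Apache Kafka value and should be checked against your version.

```properties
# Illustrative producer batching settings (values are assumptions)

# Upper limit, in bytes, for a batch of records sent to one partition.
# 327680 bytes = 320 KiB (commonly documented default: 16384)
batch.size=327680

# Wait up to 100 ms for more records before sending a batch that is not yet full.
linger.ms=100
```

With settings like these, a batch is sent when it fills or when the linger time expires, whichever happens first, so a longer linger.ms trades some latency for fuller batches.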
For Kafka Connect source connectors, the data streaming pipeline to the target Kafka cluster is as follows:
[Figure: Data streaming pipeline for Kafka Connect source connector, starting at the external data source]
For Kafka Connect sink connectors, the data streaming pipeline to the target external data source is as follows:
[Figure: Data streaming pipeline for Kafka Connect sink connector, starting at the source Kafka topic]
For MirrorMaker 2, the data mirroring pipeline to the target Kafka cluster is as follows:
[Figure: Data mirroring pipeline for MirrorMaker 2, starting at the source Kafka topic]
The producer sends messages in its buffer to topics in the target Kafka cluster. While this is happening, Kafka Connect tasks continue to poll the data source to add messages to the source message queue.
The size of the producer buffer for the source connector is set using the buffer.memory property. Tasks wait for a specified timeout period (offset.flush.timeout.ms) before the buffer is flushed. This should be enough time for the sent messages to be acknowledged by the brokers and offset data committed. The source task does not wait for the producer to empty the message queue before committing offsets, except during shutdown.
If the producer is unable to keep up with the throughput of messages in the source message queue, buffering is blocked until there is space available in the buffer within a time period bounded by max.block.ms. Any unacknowledged messages still in the buffer are sent during this period. New messages are not added to the buffer until these messages are acknowledged and flushed.
You can try the following configuration changes to keep the underlying source message queue of outstanding messages at a manageable size:
- Increasing the default value in milliseconds of the offset.flush.timeout.ms property
- Ensuring that there are enough CPU and memory resources
- Increasing the number of tasks that run in parallel by doing the following:
  - Increasing the number of tasks that run in parallel using the tasks.max property
  - Increasing the number of nodes for the workers that run tasks
Consider the number of tasks that can run in parallel according to the available CPU and memory resources and number of worker nodes. You might need to keep adjusting the configuration values until they have the desired effect.
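The buffer and timeout properties discussed above could be set in a Kafka Connect worker configuration along the lines of the following sketch. It assumes that producer settings are passed to the runtime producers with the producer. prefix, as in the producer.* convention used in this chapter. The values are illustrative assumptions only, and the defaults noted in the comments are the commonly documented Apache Kafka values, which should be checked against your version.

```properties
# Illustrative Kafka Connect worker settings (values are assumptions)

# Time allowed for sent records to be acknowledged and offset data committed.
# (commonly documented default: 5000)
offset.flush.timeout.ms=10000

# Total memory the producer can use to buffer records waiting to be sent.
# 67108864 bytes = 64 MiB (commonly documented default: 33554432)
producer.buffer.memory=67108864

# Upper bound, in milliseconds, on how long buffering blocks when the buffer is full.
# (commonly documented default: 60000)
producer.max.block.ms=60000
```

Changing one value at a time and observing the size of the source message queue makes it easier to see which setting has the desired effect.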
11.1. Configuring Kafka Connect for high-volume messages
Kafka Connect fetches data from the source external data system and hands it to the Kafka Connect runtime producers so that it’s replicated to the target cluster.
The following example shows configuration for a Kafka Connect source connector.
Example source connector configuration for handling high volumes of messages
# ...
producer.batch.size=327680
producer.linger.ms=100
# ...
tasks.max = 2
Consumer configuration is added for the sink connector.
Example sink connector configuration for handling high volumes of messages
# ...
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=1048576
consumer.max.poll.records=500
# ...
tasks.max = 2
11.2. Configuring MirrorMaker 2 for high-volume messages
MirrorMaker 2 fetches data from the source cluster and hands it to the Kafka Connect runtime producers so that it’s replicated to the target cluster.
The following example shows the configuration for MirrorMaker 2. The configuration relates to the consumer that fetches messages from the source and the producer that sends messages to the target Kafka cluster.
Example MirrorMaker 2 configuration for handling high volumes of messages
clusters=cluster-1,cluster-2
# ...
cluster-2.producer.batch.size=327680
cluster-2.producer.linger.ms=100
cluster-1.consumer.fetch.max.bytes=52428800
cluster-1.consumer.max.partition.fetch.bytes=1048576
cluster-1.consumer.max.poll.records=500
# ...
tasks.max = 2