Chapter 18. Handling high volumes of messages
If your AMQ Streams deployment needs to handle a high volume of messages, you can use configuration options to optimize for throughput and latency.
Producer and consumer configuration can help control the size and frequency of requests to Kafka brokers. The same configuration options also apply to the producers and consumers that the Kafka Connect runtime uses for source connectors (including MirrorMaker 2) and sink connectors.
- Source connectors
  - Producers from the Kafka Connect runtime send messages to the Kafka cluster.
  - For MirrorMaker 2, since the source system is Kafka, consumers retrieve messages from a source Kafka cluster.
- Sink connectors
  - Consumers from the Kafka Connect runtime retrieve messages from the Kafka cluster.
For consumers, you might increase the amount of data fetched in a single fetch request to reduce latency. You increase the fetch request size using the fetch.max.bytes and max.partition.fetch.bytes properties. You can also set a maximum limit on the number of messages returned from the consumer buffer using the max.poll.records property.
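For example, a minimal sketch of these consumer properties with illustrative values (assumptions for illustration, not recommended settings; the Kafka defaults appear in the comments):

fetch.max.bytes: 104857600          # Up to 100 MiB of data returned per fetch request (default: 52428800)
max.partition.fetch.bytes: 2097152  # Up to 2 MiB of data returned per partition (default: 1048576)
max.poll.records: 1000              # Up to 1000 records returned from the buffer per poll (default: 500)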
For MirrorMaker 2, configure the fetch.max.bytes, max.partition.fetch.bytes, and max.poll.records values at the source connector level (consumer.*), as they relate to the specific consumer that fetches messages from the source.
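For example, a minimal sketch of the consumer.* prefix at the source connector level, reusing the illustrative values above (a complete KafkaMirrorMaker2 example appears in Section 18.2):

sourceConnector:
  config:
    consumer.fetch.max.bytes: 104857600
    consumer.max.partition.fetch.bytes: 2097152
    consumer.max.poll.records: 1000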
For producers, you might increase the size of the message batches sent in a single produce request. You increase the batch size using the batch.size property. A larger batch size reduces the number of outstanding messages ready to be sent and the size of the backlog in the message queue. Messages being sent to the same partition are batched together. A produce request is sent to the target cluster when the batch size is reached. By increasing the batch size, you delay produce requests so that more messages are added to the batch and sent to the brokers at the same time. This can improve throughput when you have just a few topic partitions that handle large numbers of messages.
Consider the number and size of the records that the producer handles for a suitable producer batch size.
Use linger.ms to add a wait time in milliseconds to delay produce requests when producer load decreases. The delay means that more records can be added to batches if they are under the maximum batch size.
Configure the batch.size and linger.ms values at the source connector level (producer.override.*), as they relate to the specific producer that sends messages to the target Kafka cluster.
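As a rough, hypothetical sizing exercise: if records average around 2 KB and you want batches of roughly 100 records, a batch size of about 100 × 2048 = 204800 bytes is a reasonable starting point (the Kafka default is 16384). A sketch at the source connector level, with illustrative values:

producer.override.batch.size: 204800  # Room for ~100 records of ~2 KB each (default: 16384)
producer.override.linger.ms: 50       # Wait up to 50 ms for more records before sending a partially full batch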
For Kafka Connect source connectors, the data streaming pipeline to the target Kafka cluster is as follows:
Data streaming pipeline for Kafka Connect source connector
external data source → (Kafka Connect tasks) source message queue → producer buffer → target Kafka topic
For Kafka Connect sink connectors, the data streaming pipeline to the target external data source is as follows:
Data streaming pipeline for Kafka Connect sink connector
source Kafka topic → (Kafka Connect tasks) message queue → consumer buffer → target external data source
For MirrorMaker 2, the data mirroring pipeline to the target Kafka cluster is as follows:
Data mirroring pipeline for MirrorMaker 2
source Kafka topic → (Kafka Connect tasks) source message queue → producer buffer → target Kafka topic
The producer sends messages in its buffer to topics in the target Kafka cluster. While this is happening, Kafka Connect tasks continue to poll the data source to add messages to the source message queue.
The size of the producer buffer for the source connector is set using the producer.override.buffer.memory property. Tasks wait for a specified timeout period (offset.flush.timeout.ms) before the buffer is flushed. This should be enough time for the sent messages to be acknowledged by the brokers and for offset data to be committed. The source task does not wait for the producer to empty the message queue before committing offsets, except during shutdown.
If the producer is unable to keep up with the throughput of messages in the source message queue, buffering is blocked until there is space available in the buffer within a time period bounded by max.block.ms. Any unacknowledged messages still in the buffer are sent during this period. New messages are not added to the buffer until these messages are acknowledged and flushed.
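The following fragment sketches these buffer settings together at the source connector level (illustrative values, not recommendations; offset.flush.timeout.ms is a worker-level setting, shown in the Section 18.1 example):

producer.override.buffer.memory: 67108864  # 64 MiB producer buffer (default: 33554432)
producer.override.max.block.ms: 60000      # Block sends for up to 60 s when the buffer is full (the default value)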
You can try the following configuration changes to keep the underlying source message queue of outstanding messages at a manageable size:
- Increasing the default value in milliseconds of the offset.flush.timeout.ms property
- Ensuring that there are enough CPU and memory resources
- Increasing the number of tasks that run in parallel by doing the following:
  - Increasing the number of tasks that run in parallel using the tasksMax property
  - Increasing the number of worker nodes that run tasks using the replicas property
Consider the number of tasks that can run in parallel according to the available CPU and memory resources and number of worker nodes. You might need to keep adjusting the configuration values until they have the desired effect.
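For example, a minimal sketch combining both levers, with illustrative values:

# In the KafkaConnect resource: more worker nodes to run tasks
spec:
  replicas: 3

# In the KafkaConnector resource: more tasks running in parallel
spec:
  tasksMax: 4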
18.1. Configuring Kafka Connect for high-volume messages
Kafka Connect fetches data from the source external data system and hands it to the Kafka Connect runtime producers so that it’s replicated to the target cluster.
The following example shows configuration for Kafka Connect using the KafkaConnect custom resource.
Example Kafka Connect configuration for handling high volumes of messages
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 3
  config:
    offset.flush.timeout.ms: 10000
    # ...
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # ...
Producer configuration is added for the source connector, which is managed using the KafkaConnector custom resource.
Example source connector configuration for handling high volumes of messages
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    producer.override.batch.size: 327680
    producer.override.linger.ms: 100
    # ...
FileStreamSourceConnector and FileStreamSinkConnector are provided as example connectors. For information on deploying them as KafkaConnector resources, see Section 6.4.3.3, “Deploying KafkaConnector resources”.
Consumer configuration is added for the sink connector.
Example sink connector configuration for handling high volumes of messages
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 2
  config:
    consumer.fetch.max.bytes: 52428800
    consumer.max.partition.fetch.bytes: 1048576
    consumer.max.poll.records: 500
    # ...
If you are using the Kafka Connect API instead of the KafkaConnector custom resource to manage your connectors, you can add the connector configuration as a JSON object.
Example curl request to add source connector configuration for handling high volumes of messages
curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "my-source-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic": "my-topic",
      "tasks.max": "4",
      "type": "source",
      "producer.override.batch.size": 327680,
      "producer.override.linger.ms": 100
    }
  }'
18.2. Configuring MirrorMaker 2 for high-volume messages
MirrorMaker 2 fetches data from the source cluster and hands it to the Kafka Connect runtime producers so that it’s replicated to the target cluster.
The following example shows the configuration for MirrorMaker 2 using the KafkaMirrorMaker2 custom resource.
Example MirrorMaker 2 configuration for handling high volumes of messages
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.4.0
  replicas: 1
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    config:
      offset.flush.timeout.ms: 10000
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 2
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        consumer.fetch.max.bytes: 52428800
        consumer.max.partition.fetch.bytes: 1048576
        consumer.max.poll.records: 500
        # ...
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 4Gi
18.3. Checking the MirrorMaker 2 message flow
If you are using Prometheus and Grafana to monitor your deployment, you can check the MirrorMaker 2 message flow.
The example MirrorMaker 2 Grafana dashboards provided with AMQ Streams show the following metrics related to the flush pipeline.
- The number of messages in Kafka Connect’s outstanding messages queue
- The available bytes of the producer buffer
- The offset commit timeout in milliseconds
You can use these metrics to gauge whether you need to tune your configuration based on the volume of messages.