5.5. 优化吞吐量和延迟的制作者
通常,系统要求是满足给定延迟内消息的具体吞吐量目标。例如,以每秒 500,000 个消息为目标,在 2 秒内确认了 95% 的消息。
生成者的消息语义(消息排序和持久性)可能由应用程序的要求定义。例如,您可能没有选择在不破坏一些重要属性或保证应用程序提供的 acks=0
或 acks=1
的情况下使用 acks=0 或 acks=1。
代理重启会对高百分比统计产生重大影响。例如,在一个长时间内,99 百分比的延迟由代理重启的行为所降低。在设计基准测试或将性能号与生产环境中看到的性能号进行比较时,这值得考虑。
根据您的目的,Kafka 提供了很多配置参数和技术,用于调整吞吐量和延迟的性能。
- 消息批处理(
linger.ms
和batch.size
) -
在希望用于同一代理的消息中会发送消息批处理延迟,从而允许将它们批处理到一个生成请求中。批量(批处理)是更高的延迟,以返回更高的吞吐量。基于时间的批处理使用
linger.ms
配置,基于大小的批处理是使用batch.size
来配置的。 - compression (
compression.type
) -
消息压缩会增加生成者(CPU 时间压缩消息),但使请求(和可能的磁盘写入)更小,这可以提高吞吐量。压缩是否值得注意,以及使用的最佳压缩将取决于正在发送的消息。压缩会在调用
KafkaProducer.send ()
的线程上进行压缩,因此如果此方法的延迟对于您的应用程序来说很重要,您应该考虑使用更多线程。 - Pipelining (
max.in.flight.requests.per.connection
) - pipelining 意味着在收到响应前发送更多请求。通常,更多 pipelining 意味着更好的吞吐量,最多会达到其他影响的阈值,如更差的批处理,从而开始对吞吐量的影响。
降低延迟
当应用程序调用 KafkaProducer.send ()
时,信息为:
- 由任何拦截器处理
- serialized
- 分配给分区
- 压缩
- 添加到每个分区队列中的批处理消息
在哪个时候,send ()
方法返回。因此,时间 send ()
会被阻断:
- 拦截器、序列化器和分区器花费的时间
- 使用的压缩算法
- 等待缓冲区压缩的时间
批处理将保留在队列中,直到出现以下情况之一:
-
批处理已满(到
batch.size
) -
linger.ms
引入的延迟已经通过 - 发送方即将将其他分区的消息批处理发送到同一代理,并可添加此批处理
- 生成者正在刷新或关闭
查看批处理和缓冲的配置,以减少 send ()
阻止延迟的影响。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
增加吞吐量
通过调整发送消息前等待的最长时间并完成发送请求,提高消息请求的吞吐量。
您还可以通过编写自定义分区程序来替换默认值,将消息定向到指定的分区。
# ... delivery.timeout.ms=120000 1 partitioner.class=my-custom-partitioner 2 # ...