5.5. 优化生产者以实现吞吐量和延迟
通常,系统要求是满足特定吞吐量目标,以满足给定延迟内消息的比例。例如,针对每秒 500,000 消息,2 秒内被确认了 95% 的消息。
您的制作者的消息传递语义(消息排序和持续时间)可能由您的应用程序的要求定义。例如,您不能选择使用 acks=0 或 acks=1 选项,而无需破坏某些重要属性或保证应用程序提供。
代理重启会对高百分比统计造成显著影响。例如,在较长时间内,代理重启的行为是 99%ile 延迟的划分。在设计基准或比较了生产中的性能编号时,可以考虑这一点。
根据您的目标,Kafka 提供了多个配置参数和技巧,用于针对吞吐量和延迟调整制作者性能。
- 消息批处理(
linger.ms和batch.size) -
消息批处理延迟发送希望发送更多用于同一代理的消息,以便将它们批量化到一个生成的请求中。批处理是一种折衷,以返回更高的吞吐量。基于时间的批处理是使用
linger.ms配置,基于大小的批处理则使用batch.size配置。 - 压缩(
压缩.type) -
消息压缩增加了生产者中的延迟(CPU 时间用于压缩消息),但会使请求(以及可能的磁盘写入)较小,这可能会增加吞吐量。无论压缩是否有必要,使用的最佳压缩都将取决于正在发送的消息。在线程上进行压缩,它会调用
KafkaProducer.send (),因此,如果此方法的延迟需要您考虑使用更多线程。 - pipelining (
max.in.flight.requests.per.connection) - pipelining 意味着在收到对前一个请求的响应前发送更多请求。通常而言,在更广的传送上意味着更好的吞吐量,在其他效果上最多为阈值(如更更糟糕的情况)开始改变吞吐量的影响。
较低延迟
当应用程序调用 KafkaProducer.send () 时,消息是:
- 由任何拦截器处理
- 序列化
- 分配给分区
- 压缩
- 添加到每个分区队列中的批消息
在哪个指向 send () 方法返回。因此,时间 send () 由以下决定:
- 拦截器、序列化和分区程序花费的时间
- 使用的压缩算法
- 等待缓冲区使用压缩的时间
批处理将保留在队列中,直到发生以下之一:
-
批处理已满(根据
批处理。) -
linger.ms引入的延迟已经通过 - 发件人是将其他分区的消息批处理发送到同一代理,也可以添加此批处理
- 制作者正在刷新或关闭
查看批处理和缓冲的配置,以缓解 send () 阻止延迟的影响。
# ...
linger.ms=100
batch.size=16384
buffer.memory=33554432
# ...
增加吞吐量
通过调整信息发送并完成发送请求前等待的最大时间来提高您的消息请求的吞吐量。
您还可以通过编写自定义分区程序替换默认值,将消息定向到指定分区。
# ...
delivery.timeout.ms=120000
partitioner.class=my-custom-partitioner
# ...