5.5. 为吞吐量和延迟优化制作者
通常,系统的要求是满足给定延迟内消息的特定吞吐量目标。例如,以每秒 500,000 条消息为目标,在 2 秒内确认消息的 95%。
您的制作者的消息语义(消息排序和持久性)可能由您的应用程序的要求定义。例如,您可能没有选择使用 acks=0
或 acks=1
,而不破坏一些重要属性或应用程序提供的保证。
代理重启会对高百分比统计产生重大影响。例如,在较长的时间内,代理重启过程中会造成 99 百分比的延迟。在设计基准测试中或比较基准测试与生产环境中看到的性能号进行比较时,这值得考虑。
根据您的目标,Kafka 提供了很多配置参数和技术,用于调优生成者性能以获得吞吐量和延迟。
- 消息批处理(
linger.ms
和batch.size
) -
消息批处理延迟会按希望向同一代理发送更多消息,从而使它们被批量批量化到单个生成请求中。批处理在返回更高吞吐量时返回延迟之间很折现。基于时间的批处理使用
linger.ms
进行配置,并且基于大小的批处理则使用batch.size
进行配置。 - 压缩(
compression.type
) -
消息压缩会增加生成者(压缩消息的 CPU 时间)的延迟,但可以使请求(以及可能的磁盘写入)更小,从而提高吞吐量。无论是值得压缩还是值得使用的最佳压缩,都将取决于所发送的消息。压缩发生在调用
KafkaProducer.send ()
的线程上,因此如果此方法的延迟关系,您应该考虑使用更多线程。 - pipelining (
max.in.flight.requests.per.connection
) - pipelining 表示在收到上一个请求的响应前发送更多请求。一般来说,管道流意味着更好的吞吐量,最多可达到其他影响的阈值,如更糟糕的批处理,开始处理对吞吐量的影响。
降低延迟
当应用程序调用 KafkaProducer.send ()
时,消息是:
- 由任何拦截器处理
- 序列化
- 分配给分区
- 已压缩
- 添加到每个分区队列中的批量消息
点 send ()
方法返回。因此,阻止了时间 send ()
是由以下决定的:
- 拦截器、serializer 和 partitioner 花费的时间
- 使用的压缩算法
- 等待缓冲区用于压缩的时间
批处理将保留在队列中,直到出现以下情况之一:
-
批处理已满(根据
batch.size
) -
linger.ms
引入的延迟已经通过 - 发件人即将将其他分区的消息批处理发送到同一代理,也可以添加此批处理
- 生成者正在清除或关闭
查看批处理和缓冲区的配置,以减少 send ()
阻止对延迟的影响。
... ...
# ...
linger.ms=100
batch.size=16384
buffer.memory=33554432
# ...
增加吞吐量
通过调整消息在发送并完成发送请求前等待的最长时间,提高消息请求的吞吐量。
您还可以通过编写自定义分区来替换默认值,将消息定向到指定的分区。
... ...
# ...
delivery.timeout.ms=120000
partitioner.class=my-custom-partitioner
# ...