5.5. 为吞吐量和延迟优化制作者
通常,系统的要求是满足给定延迟内消息的特定吞吐量目标。例如,以每秒 500,000 条消息为目标,在 2 秒内确认消息的 95%。
您的制作者的消息语义(消息排序和持久性)可能由您的应用程序的要求定义。例如,您可能没有选择使用 acks=0
或 acks=1
,而不破坏一些重要属性或应用程序提供的保证。
代理重启会对高百分比统计产生重大影响。例如,在较长的时间内,代理重启过程中会造成 99 百分比的延迟。在设计基准测试中或比较基准测试与生产环境中看到的性能号进行比较时,这值得考虑。
根据您的目标,Kafka 提供了很多配置参数和技术,用于调优生成者性能以获得吞吐量和延迟。
- 消息批处理(
linger.ms
和batch.size
) -
消息批处理延迟会按希望向同一代理发送更多消息,从而使它们被批量批量化到单个生成请求中。批处理在返回更高吞吐量时返回延迟之间很折现。基于时间的批处理使用
linger.ms
进行配置,并且基于大小的批处理则使用batch.size
进行配置。 - 压缩(
compression.type
) -
消息压缩会增加生成者(压缩消息的 CPU 时间)的延迟,但可以使请求(以及可能的磁盘写入)更小,从而提高吞吐量。无论是值得压缩还是值得使用的最佳压缩,都将取决于所发送的消息。压缩发生在调用
KafkaProducer.send ()
的线程上,因此如果此方法的延迟关系,您应该考虑使用更多线程。 - pipelining (
max.in.flight.requests.per.connection
) - pipelining 表示在收到上一个请求的响应前发送更多请求。一般来说,管道流意味着更好的吞吐量,最多可达到其他影响的阈值,如更糟糕的批处理,开始处理对吞吐量的影响。
降低延迟
当应用程序调用 KafkaProducer.send ()
方法时,在发送前会处理一系列操作:
- 拦截器 :消息由任何配置的拦截器处理。
- 序列化 :消息被序列化为适当的格式。
- 分区分配 :每个消息都分配给一个特定的分区。
- 压缩:消息被压缩以节省网络带宽。
- 批处理:压缩的消息添加到特定于分区的队列中的批处理中。
在这些操作过程中,send ()
方法会完全阻止。如果 buffer.memory
已满或者元数据不可用,它也会被阻断。
批处理将保留在队列中,直到出现以下情况之一:
-
批处理已满(根据
batch.size
)。 -
linger.ms
引入的延迟已通过。 - 发件人准备好将其他分区的批处理分配到同一代理,并可包含此批处理。
- 生成者正在清除或关闭。
为最大程度降低 send ()
阻塞对延迟的影响,请优化批处理和缓冲配置。使用 linger.ms
和 batch.size
属性将更多消息批处理到单个生成请求中,以获得更高的吞吐量。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
增加吞吐量
您可以使用自定义分区程序将消息定向到指定分区来替换默认值,来提高消息请求的吞吐量。
# ...
partitioner.class=my-custom-partitioner 1
# ...
- 1
- 指定自定义分区器的类名称。