5.5. 优化生产者以实现吞吐量和延迟


通常,系统要求是满足特定吞吐量目标,以满足给定延迟内消息的比例。例如,针对每秒 500,000 消息,2 秒内被确认了 95% 的消息。

您的制作者的消息传递语义(消息排序和持续时间)可能由您的应用程序的要求定义。例如,您不能选择使用 acks=0acks=1 选项,而无需破坏某些重要属性或保证应用程序提供。

代理重启会对高百分比统计造成显著影响。例如,在较长时间内,代理重启的行为是 99%ile 延迟的划分。在设计基准或比较了生产中的性能编号时,可以考虑这一点。

根据您的目标,Kafka 提供了多个配置参数和技巧,用于针对吞吐量和延迟调整制作者性能。

消息批处理(linger.msbatch.size)
消息批处理延迟发送希望发送更多用于同一代理的消息,以便将它们批量化到一个生成的请求中。批处理是一种折衷,以返回更高的吞吐量。基于时间的批处理是使用 linger.ms 配置,基于大小的批处理则使用 batch.size 配置。
压缩(压缩.type)
消息压缩增加了生产者中的延迟(CPU 时间用于压缩消息),但会使请求(以及可能的磁盘写入)较小,这可能会增加吞吐量。无论压缩是否有必要,使用的最佳压缩都将取决于正在发送的消息。在线程上进行压缩,它会调用 KafkaProducer.send (),因此,如果此方法的延迟需要您考虑使用更多线程。
pipelining (max.in.flight.requests.per.connection)
pipelining 意味着在收到对前一个请求的响应前发送更多请求。通常而言,在更广的传送上意味着更好的吞吐量,在其他效果上最多为阈值(如更更糟糕的情况)开始改变吞吐量的影响。

较低延迟

当应用程序调用 KafkaProducer.send () 时,消息是:

  • 由任何拦截器处理
  • 序列化
  • 分配给分区
  • 压缩
  • 添加到每个分区队列中的批消息

在哪个指向 send () 方法返回。因此,时间 send () 由以下决定:

  • 拦截器、序列化和分区程序花费的时间
  • 使用的压缩算法
  • 等待缓冲区使用压缩的时间

批处理将保留在队列中,直到发生以下之一:

  • 批处理已满(根据 批处理。
  • linger.ms 引入的延迟已经通过
  • 发件人是将其他分区的消息批处理发送到同一代理,也可以添加此批处理
  • 制作者正在刷新或关闭

查看批处理和缓冲的配置,以缓解 send () 阻止延迟的影响。

# ...
linger.ms=100 
1

batch.size=16384 
2

buffer.memory=33554432 
3

# ...
1
linger 属性以毫秒为单位添加延迟,以便请求中累积和发送大量消息。默认值为 0。
2
如果使用最大的 batch.size (以字节为单位),会在达到最大时发送请求,或者已排队的消息超过 linger.ms (以更早为准)。添加延迟可让批量批量消息增加到批处理大小。
3
缓冲区大小必须至少为批处理大小,并且能够容纳缓冲区、压缩和动态请求。

增加吞吐量

通过调整信息发送并完成发送请求前等待的最大时间来提高您的消息请求的吞吐量。

您还可以通过编写自定义分区程序替换默认值,将消息定向到指定分区。

# ...
delivery.timeout.ms=120000 
1

partitioner.class=my-custom-partitioner 
2


# ...
1
等待完成发送请求的最长时间(毫秒)。您可以将值设为 MAX_LONG 委派到 Kafka 无限次重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部