5.5. 为吞吐量和延迟优化制作者


通常,系统的要求是满足给定延迟内消息的特定吞吐量目标。例如,以每秒 500,000 条消息为目标,在 2 秒内确认消息的 95%。

您的制作者的消息语义(消息排序和持久性)可能由您的应用程序的要求定义。例如,您可能没有选择使用 acks=0acks=1,而不破坏一些重要属性或应用程序提供的保证。

代理重启会对高百分比统计产生重大影响。例如,在较长的时间内,代理重启过程中会造成 99 百分比的延迟。在设计基准测试中或比较基准测试与生产环境中看到的性能号进行比较时,这值得考虑。

根据您的目标,Kafka 提供了很多配置参数和技术,用于调优生成者性能以获得吞吐量和延迟。

消息批处理(linger.msbatch.size)
消息批处理延迟会按希望向同一代理发送更多消息,从而使它们被批量批量化到单个生成请求中。批处理在返回更高吞吐量时返回延迟之间很折现。基于时间的批处理使用 linger.ms 进行配置,并且基于大小的批处理则使用 batch.size 进行配置。
压缩(compression.type)
消息压缩会增加生成者(压缩消息的 CPU 时间)的延迟,但可以使请求(以及可能的磁盘写入)更小,从而提高吞吐量。无论是值得压缩还是值得使用的最佳压缩,都将取决于所发送的消息。压缩发生在调用 KafkaProducer.send () 的线程上,因此如果此方法的延迟关系,您应该考虑使用更多线程。
pipelining (max.in.flight.requests.per.connection)
pipelining 表示在收到上一个请求的响应前发送更多请求。一般来说,管道流意味着更好的吞吐量,最多可达到其他影响的阈值,如更糟糕的批处理,开始处理对吞吐量的影响。

降低延迟

当应用程序调用 KafkaProducer.send () 时,消息是:

  • 由任何拦截器处理
  • 序列化
  • 分配给分区
  • 已压缩
  • 添加到每个分区队列中的批量消息

send () 方法返回。因此,阻止了时间 send () 是由以下决定的:

  • 拦截器、serializer 和 partitioner 花费的时间
  • 使用的压缩算法
  • 等待缓冲区用于压缩的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(根据 batch.size
  • linger.ms 引入的延迟已经通过
  • 发件人即将将其他分区的消息批处理发送到同一代理,也可以添加此批处理
  • 生成者正在清除或关闭

查看批处理和缓冲区的配置,以减少 send () 阻止对延迟的影响。

# ...
linger.ms=100 
1

batch.size=16384 
2

buffer.memory=33554432 
3

# ...
Copy to Clipboard Toggle word wrap
1
linger 属性以毫秒为单位添加一个延迟,以便增加大量消息批处理并在请求中发送。默认值为 0'。
2
如果使用了最大 batch.size,则在达到最大值时,将发送请求,或者已排队消息的时间超过 linger.ms (以更早的时间为准)。添加延迟可让批处理将消息递增到批处理大小。
3
缓冲区大小必须至少与批处理大小相同,并且能够适应缓冲区、压缩和动态请求。

增加吞吐量

通过调整消息在发送并完成发送请求前等待的最长时间,提高消息请求的吞吐量。

您还可以通过编写自定义分区来替换默认值,将消息定向到指定的分区。

# ...
delivery.timeout.ms=120000 
1

partitioner.class=my-custom-partitioner 
2


# ...
Copy to Clipboard Toggle word wrap
1
等待完整发送请求的最长时间(毫秒)。您可以将值设为 MAX_LONG,以委派给 Kafka 的重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat