12.7.2.5. 优化吞吐量和延迟


通常,系统的要求是满足给定延迟内一定比例消息的特定吞吐量目标。例如,以每秒 500,000条消息为目标,95% 的消息会在 2 秒内得到确认。

生产者的消息传递语义(消息排序和持久性)很可能根据您的应用程序的要求进行定义。例如,您可能没有选项在不破坏某些重要属性的情况下使用 acks=0acks=1,或者无法保证应用程序提供。

Broker 重新启动对高百分比统计数据有显著影响。例如,在很长一段时间内,99 百分点延迟由围绕代理重启的行为占据主导地位。在设计基准测试时,或比较基准测试与生产中显示的性能数字时,需要考虑这一点。

根据您的目标,Kafka 提供了多个配置参数和技术来调节吞吐量和延迟的性能。

消息批处理(linger.msbatch.size
消息批处理会延迟发送消息,希望将发送更多目标为同一代理的消息,允许它们批处理到单个生成请求。批处理是在高吞吐量时返回的更高延迟之间的妥协。基于时间的批处理使用 linger .ms 配置,而基于大小的批处理则使用 batch.size 配置。
压缩(压缩.type
消息压缩增加了制作者延迟(CPU 时间用于压缩消息),但会更小(可能进行磁盘写入),这可以提高吞吐量。压缩是否必要,以及要使用的最佳压缩程度取决于所发送的消息。压缩发生在调用 KafkaProducer.send()的 线程上,因此如果此方法的延迟与您需要使用更多线程的应用程序相关。
pipelining(max.in.flight.requests.per.connection)
pipelining 意味着在收到对上一个请求的响应前发送更多请求。通常,更多流水线意味着更好的吞吐量,最高是一个阈值,达到其他效果(例如更糟糕的批处理)开始消除对吞吐量的影响。

降低延迟

当您的应用程序调用 KafkaProducer.send() 时,消息为:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 已压缩
  • 添加到每个分区队列中的批量消息

send() 方法返回的时间点。因此, send() 的时间由以下方法决定:

  • 拦截器、序列程序和分区器花费的时间
  • 使用的压缩算法
  • 等待缓冲区用于压缩所需的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(根据 batch.size
  • linger .ms 引入的延迟已过
  • 发送方即将向同一代理发送其他分区的消息批处理,也可以添加此批处理
  • 生产者被清空或关闭

查看批处理和缓冲的配置,以减轻 send() 阻止对延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性添加毫秒的延迟,以便在请求中累积和发送更大规模的消息。默认值为 0'。
2
如果使用最大 批处理.size,则在达到最大值时发送请求,或者消息已排队的时间超过 linger .ms (以较快者为准)。添加延迟可让批处理积累消息到批处理大小。
3
缓冲区的大小必须至少与批处理大小相同,并且能够适应缓冲区、压缩和内向请求。

增加吞吐量

通过调整消息传输和完成发送请求前等待的最长时间,提高消息请求的吞吐量。

您还可以通过编写自定义分区程序来替换默认分区,将消息定向到指定分区。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间,以毫秒为单位。您可以将值设为 MAX_LONG,以将值委派给 Kafka 重试次数的未定义次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.