5.5. 优化吞吐量和延迟的制作者


通常,系统要求是满足给定延迟内消息的具体吞吐量目标。例如,以每秒 500,000 个消息为目标,在 2 秒内确认了 95% 的消息。

生成者的消息语义(消息排序和持久性)可能由应用程序的要求定义。例如,您可能没有选择在不破坏一些重要属性或保证应用程序提供的 acks=0acks=1 的情况下使用 acks=0 或 acks=1。

代理重启会对高百分比统计产生重大影响。例如,在一个长时间内,99 百分比的延迟由代理重启的行为所降低。在设计基准测试或将性能号与生产环境中看到的性能号进行比较时,这值得考虑。

根据您的目的,Kafka 提供了很多配置参数和技术,用于调整吞吐量和延迟的性能。

消息批处理(linger.msbatch.size)
在希望用于同一代理的消息中会发送消息批处理延迟,从而允许将它们批处理到一个生成请求中。批量(批处理)是更高的延迟,以返回更高的吞吐量。基于时间的批处理使用 linger.ms 配置,基于大小的批处理是使用 batch.size 来配置的。
compression (compression.type)
消息压缩会增加生成者(CPU 时间压缩消息),但使请求(和可能的磁盘写入)更小,这可以提高吞吐量。压缩是否值得注意,以及使用的最佳压缩将取决于正在发送的消息。压缩会在调用 KafkaProducer.send () 的线程上进行压缩,因此如果此方法的延迟对于您的应用程序来说很重要,您应该考虑使用更多线程。
Pipelining (max.in.flight.requests.per.connection)
pipelining 意味着在收到响应前发送更多请求。通常,更多 pipelining 意味着更好的吞吐量,最多会达到其他影响的阈值,如更差的批处理,从而开始对吞吐量的影响。

降低延迟

当应用程序调用 KafkaProducer.send () 时,信息为:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 压缩
  • 添加到每个分区队列中的批处理消息

在哪个时候,send () 方法返回。因此,时间 send () 会被阻断:

  • 拦截器、序列化器和分区器花费的时间
  • 使用的压缩算法
  • 等待缓冲区压缩的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(到 batch.size
  • linger.ms 引入的延迟已经通过
  • 发送方即将将其他分区的消息批处理发送到同一代理,并可添加此批处理
  • 生成者正在刷新或关闭

查看批处理和缓冲的配置,以减少 send () 阻止延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性添加一个毫秒的延迟,以便在请求中累积并发送更大的消息。默认值为 0'。
2
如果使用最大 batch.size,则在达到最大值时发送请求,或者消息已排队的时间超过 linger.ms (以更早的时间为准)。添加延迟允许批处理消息累计到批处理大小。
3
缓冲区大小必须至少与批处理大小相同,并能够容纳缓冲区、压缩和动态请求。

增加吞吐量

通过调整发送消息前等待的最长时间并完成发送请求,提高消息请求的吞吐量。

您还可以通过编写自定义分区程序来替换默认值,将消息定向到指定的分区。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间(毫秒)。您可以将值设为 MAX_LONG,以委派给 Kafka 的一个无限期重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.