3.8. 处理大量消息大小
消息的默认批处理大小为 1MB,这是大多数用例中最大吞吐量的最佳选择。Kafka 可以在减少吞吐量下容纳更大的批处理,假设有足够的磁盘容量。
大型消息大小以四种方式处理:
- 生产者消息压缩 将压缩消息写入日志。
- 基于参考的消息传递仅发送对消息值中存储的其他系统中数据的引用。
- 内联消息传递将信息分成使用相同键的块,然后使用流处理器(如 Kafka Streams)在输出中合并。
- 构建的 broker 和 producer/consumer 客户端应用程序配置来处理更大的消息大小。
建议使用基于参考的消息和消息压缩选项,并涵盖大多数情况。对于这些选项,必须小心才能避免引入性能问题。
制作者压缩
对于生成者配置,您可以指定一个 compression.type
,如 Gzip,它被应用到制作者生成的数据的批处理。使用代理配置 compression.type=producer
,代理会保留使用制作者的任何压缩。每当制作者和主题压缩不匹配时,代理必须在将批处理附加到日志前再次压缩批处理,这会影响代理性能。
压缩还会在生产者和解压缩开销上增加额外的处理开销,但在批处理中包含更多的数据,因此当消息数据压缩良好时,吞吐量通常很有用。
将生成者压缩与批处理大小的微调相结合,以促进最佳吞吐量。使用指标有助于量化所需的平均批处理大小。
基于参考的消息传递
当您不知道消息有多大时,基于参考的消息传递对于数据复制非常有用。外部数据存储必须快速、持久且高度可用,才能使此配置正常工作。数据被写入数据存储,并返回对数据的引用。producer 发送一条消息,其中包含对 Kafka 的引用。消费者从消息中获取引用,并使用它来从数据存储中获取数据。
图 3.4. 基于参考的消息传递流

当消息传递需要更多行程时,端到端延迟会增加。这个方法的另一个显著缺陷是,在清理 Kafka 消息时,外部系统中没有自动清理数据。混合方法是仅将大型消息发送到数据存储并直接处理标准化消息。
内联消息传递
内联消息传递非常复杂,但它对基于参考的消息等外部系统没有开销。
如果消息太大,则生成客户端应用必须序列化,然后对数据进行阻塞。然后,生成者使用 Kafka ByteArraySerializer
,或者在发送前再次序列化每个块。消费者跟踪消息和缓冲区块,直到它有完整的消息。消耗客户端应用程序接收块,这些块在进行序列化前被编译。根据每个块消息集合的第一个或最后一个块的偏移量,将完整消息发送到消耗应用程序的剩余部分。针对偏移元数据检查成功交付完整消息,以避免重新平衡期间重复。
图 3.5. 内联消息传递流

内联消息传递在消费者端具有性能开销,因为需要缓冲,特别是在并行处理一系列大型消息时。大型消息的块可能会变为交集,因此如果缓冲区中另一个大消息的块不完整,则无法提交消息的所有区块。因此,通常通过持久保留消息块或实施提交逻辑来支持缓冲。
配置以处理更大的信息
如果无法避免更大的消息,并且避免在消息流的任意点进行块,您可以增加消息限制。为此,请在主题级别上配置 message.max.bytes
,以设置各个主题的最大记录批处理大小。如果您在代理级别上设置 message.max.bytes
,则为所有主题允许更大的消息。
代理将拒绝任何大于 message.max.bytes
设置的限制的消息。producers 的缓冲区大小(max.request.size
)和消费者(message.max.bytes
)必须能够容纳更大的消息。