第 7 章 处理大量消息大小


Kafka 的默认批处理大小为 1MB,这是大多数用例中最大吞吐量的最佳选择。Kafka 可以在减少吞吐量下容纳更大的批处理,假设有足够的磁盘容量。

大型消息大小可以通过以下四种方式进行处理:

  1. 代理、生产者和消费者配置为容纳更大的消息大小。
  2. 生产者消息压缩 将压缩消息写入日志。
  3. 基于参考的消息传递仅发送对消息有效负载中某些其他系统中存储的数据的引用。
  4. 内联消息传递将信息分成使用相同键的块,然后使用流处理器(如 Kafka Streams)在输出中合并。

除非您要处理非常大的消息,否则建议使用配置方法。基于参考的消息和消息压缩选项涵盖了大多数其他情况。对于这些选项,必须小心,以避免引入性能问题。

7.1. 配置 Kafka 组件以处理更大的信息

大型消息可能会影响系统性能,并在消息处理中引入复杂性。如果无法避免它们,则有可用的配置选项。要高效地处理较大的消息并防止消息流中的块,请考虑调整以下配置:

  • 调整最大记录批处理大小:

    • 在代理级别设置 message.max.bytes,以支持所有主题的较大的记录批处理大小。
    • 在主题级别上设置 max.message.bytes,以支持单个主题的较大的记录批处理大小。
  • 增加每个分区跟踪器获取的最大消息大小(replica.fetch.max.bytes)。
  • 增加制作者的批处理大小(batch.size),以增加单个生成请求中发送的消息批处理大小。
  • 为制作者(max.request.size)和消费者配置最大请求大小(fetch.max.bytes),以容纳更大的记录批处理。
  • 设置一个更高的最大限制(max.partition.fetch.bytes),以为每个分区返回多少数据。

确保批处理请求的最大大小至少为 message.max.bytes,以适应最大的记录批处理大小。

代理配置示例

message.max.bytes: 10000000
replica.fetch.max.bytes: 10485760
Copy to Clipboard Toggle word wrap

生成者配置示例

batch.size: 327680
max.request.size: 10000000
Copy to Clipboard Toggle word wrap

消费者配置示例

fetch.max.bytes: 10000000
max.partition.fetch.bytes: 10485760
Copy to Clipboard Toggle word wrap

也可以配置由 Kafka Bridge、Kafka Connect 和 MirrorMaker 2 等其他 Kafka 组件使用的制作者和消费者,以更有效地处理更大的信息。

Kafka Bridge

使用特定的制作者和消费者配置属性配置 Kafka Bridge:

  • producer.config for producers
  • consumer.config 用于消费者

Kafka Bridge 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  producer:
    config:
      batch.size: 327680
      max.request.size: 10000000
  consumer:
    config:
      fetch.max.bytes: 10000000
      max.partition.fetch.bytes: 10485760
      # ...
Copy to Clipboard Toggle word wrap

Kafka Connect

对于 Kafka Connect,配置负责使用制作者和消费者配置属性的前缀来发送和接收消息的源和接收器连接器:

  • 源连接器用来向 Kafka 集群发送消息的制作者的 producer.override
  • sink 连接器用于从 Kafka 集群检索消息 的消费者

Kafka Connect 源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  # ...
  config:
    producer.override.batch.size: 327680
    producer.override.max.request.size: 10000000
    # ...
Copy to Clipboard Toggle word wrap

Kafka Connect sink 连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  # ...
  config:
    consumer.fetch.max.bytes: 10000000
    consumer.max.partition.fetch.bytes: 10485760
    # ...
Copy to Clipboard Toggle word wrap

MirrorMaker 2

对于 MirrorMaker 2,使用制作者和消费者配置属性的前缀配置源 Kafka 集群检索信息的源连接器:

  • 用于将数据复制到目标 Kafka 集群的运行时 Kafka Connect producer 的 producer.override
  • sink 连接器用于从源 Kafka 集群检索消息 的消费者

MirrorMaker 2 源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 2
      config:
        producer.override.batch.size: 327680
        producer.override.max.request.size: 10000000
        consumer.fetch.max.bytes: 10000000
        consumer.max.partition.fetch.bytes: 10485760
        # ...
Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat