第 9 章 处理大量消息


如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项优化吞吐量和延迟。

Kafka producer 和使用者配置可帮助控制到 Kafka 代理的请求的大小和频率。有关配置选项的更多信息,请参阅以下内容:

您还可以将相同的配置选项用于 Kafka Connect 运行时源连接器(包括 MirrorMaker 2.0)和接收器连接器使用的生产者和使用者。

源连接器
  • Kafka Connect 运行时的生产者向 Kafka 集群发送信息。
  • 对于 MirrorMaker 2.0,因为源系统是 Kafka,消费者从源 Kafka 集群检索信息。
sink 连接器
  • Kafka Connect 运行时的用户从 Kafka 集群检索信息。

对于消费者配置(consumer.*),您可以增加在单个获取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytesmax.partition.fetch.bytes 属性增加 fetch 请求大小。您还可以使用 max.poll.records 属性,设置从消费者缓冲区返回的消息数上限。

对于制作者配置(制作者*),您可以增加在单个生成请求中发送的消息批处理大小。您可以使用 batch.size 属性增加批处理大小。较大的批处理大小可减少已准备好发送的未处理消息的数量,以及消息队列中 backlog 的大小。发送到同一分区的消息将计入一起。当达到批处理大小时,生成请求将发送到目标集群。通过增加批处理大小,生成请求会延迟,并将更多消息添加到批处理中,并同时发送到代理。当您只有一个处理大量消息的主题分区时,这可以提高吞吐量。

考虑制作者为合适的生产批处理大小处理的记录的数量和大小。

使用 linger.ms 添加等待时间(以毫秒为单位),以在制作者负载下降时延迟生成请求。延迟意味着如果更多记录在最大批处理大小下,可以将更多记录添加到批处理中。

对于 Kafka Connect 源连接器,目标 Kafka 集群的数据流管道如下:

Kafka Connect 源连接器的数据流管道

外部数据源 →(Kafka Connect tasks)源消息队列 producer buffer target Kafka 主题

对于 Kafka Connect sink 连接器,目标外部数据源的数据流管道如下:

Kafka Connect sink 连接器的数据流管道

source Kafka topic (Kafka Connect tasks) sink message queue consumer buffer external data source

对于 MirrorMaker 2.0,目标 Kafka 集群的数据镜像管道如下:

MirrorMaker 2.0 的数据镜像管道

source Kafka topic (Kafka Connect tasks) source message queue producer buffer target Kafka topic

producer 将其缓冲区中的消息发送到目标 Kafka 集群中的主题。虽然发生这种情况,Kafka Connect 任务将继续轮询数据源,以将消息添加到源消息队列中。

源连接器的制作缓冲区的大小使用 buffer.memory 属性进行设置。任务在清空缓冲区前等待指定的超时期限(offset.flush.timeout.ms。这应该有足够的时间使发送的消息被提交的代理和偏移数据确认。除在关闭期间,源任务不会在提交偏移前等待生产者清空消息队列。

如果制作者无法与源消息队列中消息的吞吐量保持同步,则会阻断缓冲,直到缓冲区中可用的空间被 max.block.ms 绑定。在这个时间段内,仍会在缓冲区中发送任何未确认的消息。在确认并清空这些消息之前,新消息不会添加到缓冲区中。

您可以尝试以下配置更改,使基础源消息队列保持未完成消息的可管理大小:

  • offset.flush.timeout.ms的毫秒内增加默认值
  • 确保有足够的 CPU 和内存资源
  • 通过以下方法增加并行运行的任务数量:

    • 使用 tasks.max 属性增加并行运行的任务数量
    • 为运行任务的 worker 增加节点数量

考虑根据可用的 CPU 和内存资源和 worker 节点数量来并行运行的任务数量。您可能需要调整配置值,直到它们具有所需的效果。

9.1. 为高容量信息配置 Kafka Connect

Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,使其复制到目标集群。

以下示例显示了 Kafka Connect 源连接器的配置。

处理大量消息的源连接器配置示例

# ...
producer.batch.size=327680
producer.linger.ms=100
# ...
tasks.max = 2

为 sink 连接器添加了消费者配置。

处理大量信息的接收器连接器配置示例

# ...
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=1048576
consumer.max.poll.records=500
# ...
tasks.max = 2

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部