第 11 章 处理大量信息
如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项优化吞吐量和延迟。
Kafka 制作者和消费者配置可帮助控制请求的大小和频率到 Kafka 代理。有关配置选项的详情请参考以下内容:
您还可以将相同的配置选项与 Kafka Connect 运行时源连接器(包括 MirrorMaker 2)和接收器连接器使用的制作者和消费者一起使用。
- 源连接器
- 来自 Kafka Connect 运行时的制作者将信息发送到 Kafka 集群。
- 对于 MirrorMaker 2,因为源系统是 Kafka,消费者从源 Kafka 集群检索信息。
- sink 连接器
- Kafka Connect 运行时的使用者从 Kafka 集群检索信息。
对于消费者配置(consumer.*),您可以增加在单个获取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytes 和 max.partition.fetch.bytes 属性增加 fetch 请求大小。您还可以使用 max.poll.records 属性对从消费者缓冲区返回的消息数设置最大限制。
对于制作者配置(producer.*),您可以增加单个生成请求中发送的消息批处理的大小。您可以使用 batch.size 属性增加批处理大小。较大的批处理大小可减少准备发送的未完成的消息数量,以及消息队列中 backlog 的大小。发送到同一分区的消息将一起批处理。当达到批处理大小时,生成请求将发送到目标集群。通过增大批处理大小,生成请求会延迟,更多的消息会添加到批处理中,并同时发送到代理。当您只有几个处理大量消息的主题分区时,这可以提高吞吐量。
考虑制作者为合适的生产批处理大小处理的记录的数量和大小。
使用 linger.ms 添加等待时间(以毫秒为单位),以延迟生产者负载下降时产生请求。延迟意味着如果更多记录处于最大批处理大小,则可以将它们添加到批处理中。
对于 Kafka Connect 源连接器,到目标 Kafka 集群的数据流管道如下:
Kafka Connect 源连接器的数据流管道
外部数据源 →(Kafka Connect tasks) source message queue
对于 Kafka Connect sink 连接器,到目标外部数据源的数据流管道如下:
Kafka Connect sink connector 的数据流管道
source Kafka topic
对于 MirrorMaker 2,目标 Kafka 集群的数据镜像管道如下:
MirrorMaker 2 的数据镜像管道
source Kafka topic
制作者将其缓冲区中的信息发送到目标 Kafka 集群中的主题。虽然发生这种情况,Kafka Connect 任务将继续轮询数据源,以将消息添加到源消息队列中。
源连接器的制作者缓冲区的大小使用 buffer.memory 属性设置。任务在清除缓冲区前等待指定的超时时间(offset.flush.timeout.ms)。这应该足够了发送的消息,以便代理和偏移所提交的数据被确认。源任务不会在提交偏移之前等待制作者清空消息队列,除非关机期间除外。
如果生成者无法满足源消息队列中消息的吞吐量,缓冲将被阻止,直到由 max.block.ms 绑定的时间期间内有可用空间。在此期间,缓冲区中仍然发送任何未确认的信息。在这些消息被确认并清空之前,新消息才会添加到缓冲区中。
您可以尝试以下配置更改,使未预期的消息的基本源消息队列保持在可管理的大小:
-
增加
offset.flush.timeout.ms的默认值(以毫秒为单位) - 确保有足够的 CPU 和内存资源
通过执行以下操作来增加并行运行的任务数量:
-
使用
tasks.max属性增加并行运行的任务数量 - 为运行任务的 worker 增加节点数量
-
使用
考虑可根据可用的 CPU 和内存资源和 worker 节点数量来并行运行的任务数量。您可能需要继续调整配置值,直到它们具有所需的效果。
11.1. 为高容量信息配置 Kafka Connect 复制链接链接已复制到粘贴板!
Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,使其复制到目标集群。
以下示例显示了 Kafka Connect 源连接器的配置。
处理大量信息的源连接器配置示例
# ... producer.batch.size=327680 producer.linger.ms=100 # ... tasks.max = 2
# ...
producer.batch.size=327680
producer.linger.ms=100
# ...
tasks.max = 2
为接收器连接器添加使用者配置。
处理大量信息的接收器连接器配置示例