第 25 章 处理大量信息
如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项优化吞吐量和延迟。
生产者和消费者配置可帮助控制对 Kafka 代理的请求大小和频率。有关配置选项的更多信息,请参阅以下内容:
您还可以将相同的配置选项与 Kafka Connect 运行时源连接器(包括 MirrorMaker 2)和接收器连接器使用的制作者和消费者一起使用。
- 源连接器
- 来自 Kafka Connect 运行时的制作者向 Kafka 集群发送信息。
- 对于 MirrorMaker 2,因为源系统是 Kafka,因此消费者从源 Kafka 集群检索信息。
- sink 连接器
- Kafka Connect 运行时中的消费者从 Kafka 集群检索信息。
对于消费者,您可以增加在单个获取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytes 和 max.partition.fetch.bytes 属性增加 fetch 请求大小。您还可以使用 max.poll.records 属性设置从消费者缓冲区返回的消息数量的最大限制。
对于 MirrorMaker 2,在源连接器级别(consumer.*)配置 fetch.max.bytes, max.partition.fetch.bytes, 和 max.poll.records 值,因为它们与从源获取消息的特定消费者相关。
对于制作者,您可以增加在单个生成请求中发送的消息批处理的大小。您可以使用 batch.size 属性增加批处理大小。更大的批处理大小可减少准备好发送的未完成消息的数量,以及消息队列中积压的大小。发送到同一分区的消息将一起批处理。当达到批处理大小时,生成请求将发送到目标集群。通过增加批处理大小,生成请求会延迟,更多的消息添加到批处理中,并同时发送到代理。当您只有几个处理大量消息的主题分区时,这可以提高吞吐量。
考虑制作者为合适的生产批处理大小处理的记录的数量和大小。
使用 linger.ms 添加等待时间(以毫秒为单位),以延迟生产负载减少时生成请求。delay 表示如果记录位于最大批处理大小下,则可以向批处理添加更多记录。
在连接器级别 (producer.override.*) 配置 batch.size 和 linger.ms 值,因为它们与向目标 Kafka 集群发送信息的特定制作者相关。
对于 Kafka Connect 源连接器,到目标 Kafka 集群的数据流管道如下:
Kafka Connect 源连接器的数据流管道
External data source →(Kafka Connect tasks) source message queue
对于 Kafka Connect sink 连接器,数据流管道到目标外部数据源,如下所示:
Kafka Connect sink 连接器的数据流管道
source Kafka topic
对于 MirrorMaker 2,数据将管道镜像到目标 Kafka 集群,如下所示:
MirrorMaker 2 的数据镜像管道
source Kafka topic
producer 会将缓冲区中的信息发送到目标 Kafka 集群中的主题。在发生这种情况时,Kafka Connect 任务继续轮询数据源,以将消息添加到源消息队列中。
源连接器的制作者缓冲区的大小使用 producer.override.buffer.memory 属性设置。任务在清除缓冲区前等待指定的超时时间(offset.flush.timeout.ms)。这应该有足够的时间供代理和提交的偏移数据确认。源任务不会等待生成者在提交偏移前清空消息队列,但关机期间除外。
如果生产者无法满足源消息队列中消息的吞吐量,则缓冲区将被阻止,直到缓冲区中的空间在 max.block.ms 绑定的时间段内有空间可用。在此期间,缓冲区中仍然发送任何未确认的信息。在确认和刷新这些消息前,不会向缓冲区添加新消息。
您可以尝试以下配置更改,将未处理消息的底层源消息队列保持为可管理的大小:
-
将默认值(以毫秒为单位)增加
offset.flush.timeout.ms - 确保有足够的 CPU 和内存资源
通过执行以下操作增加并行运行的任务数量:
-
使用
tasksMax属性增加并行运行的任务数量 -
使用
replicas属性增加运行任务的 worker 节点数量
-
使用
根据可用的 CPU 和内存资源和 worker 节点数量,请考虑可以并行运行的任务数量。您可能需要调整配置值,直到它们有所需的效果。
25.1. 为高卷信息配置 Kafka Connect 复制链接链接已复制到粘贴板!
Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,使其复制到目标集群。
以下示例显示了使用 KafkaConnect 自定义资源的 Kafka Connect 配置。
用于处理大量消息的 Kafka Connect 配置示例
为源连接器添加生成者配置,该连接器通过 KafkaConnector 自定义资源进行管理。
用于处理大量信息的源连接器配置示例
FileStreamSourceConnector 和 FileStreamSinkConnector 作为示例连接器提供。有关将其部署为 KafkaConnector 资源的信息,请参考 第 6.5.3 节 “部署 KafkaConnector 资源”。
为接收器连接器添加消费者配置。
用于处理大量消息的接收器连接器配置示例
如果您使用 Kafka Connect API 而不是 KafkaConnector 自定义资源来管理连接器,您可以将连接器配置添加为 JSON 对象。
为处理大量信息添加源连接器配置的 curl 请求示例