2.7. 处理大量消息
如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项来优化吞吐量和延迟。
Kafka producer 和使用者配置可帮助控制对 Kafka 代理的请求的大小和频率。有关配置选项的详情请参考以下内容:
您还可以将相同的配置选项与 Kafka Connect 运行时源连接器(包括 MirrorMaker 2.0)和接收器连接器使用的生产者和消费者使用相同的配置选项。
- 源连接器
- Kafka Connect 运行时中的制作者将信息发送到 Kafka 集群。
- 对于 MirrorMaker 2.0,由于源系统是 Kafka,消费者从源 Kafka 集群检索信息。
- sink 连接器
- Kafka Connect 运行时的使用者从 Kafka 集群检索信息。
对于消费者,您可以增加在单个提取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytes 和 max.partition.fetch.bytes 属性来增加 fetch 请求大小。您还可以使用 max.poll.records 属性针对使用者缓冲区返回的消息数量设置最大限制。
对于 MirrorMaker 2.0,配置 fetch.max.bytes、max.partition.fetch.bytes 和 max.poll.records 值的来源连接器级别(consumer.*),因为它们与从来源获取消息的特定使用者相关。
对于制作者而言,您可以增加一个生成请求中发送的消息批处理的大小。您可以使用 batch.size 属性来增加批处理大小。较大的批处理大小可减少已发送的未处理消息的数量,以及消息队列中 backlog 的大小。发送到同一分区的消息一起批处理。当达到批处理大小时,生成的请求将发送到目标集群。通过增加批处理大小,生成请求会延迟,更多的消息会添加到批处理中,并同时发送到代理。当您只拥有处理大量消息的几个主题分区时,这可以提高吞吐量。
考虑制作者为适合生产批处理大小处理的记录数和大小。
使用 linger.ms 添加等待时间(以毫秒为单位),以在生产负载减少时延迟生成请求。延迟意味着,如果这些记录在最大批处理大小下,可以向批处理添加更多记录。
在源连接器级别上配置 batch.size 和 linger.ms 值(producer.override.*),因为它们与将消息发送到目标 Kafka 集群的特定制作者相关。
对于 Kafka Connect 源连接器,到目标 Kafka 集群的数据流管道如下:
Kafka Connect 源连接器的数据流管道
外部数据源 →(Kafka Connect 任务)源消息队列
对于 Kafka Connect sink 连接器,到目标外部数据源的数据流管道如下:
Kafka Connect sink 连接器的数据流管道
Source Kafka topic →(Kafka Connect tasks) sink 消息队列
对于 MirrorMaker 2.0,数据镜像管道到目标 Kafka 集群如下:
MirrorMaker 2.0 的数据镜像管道
Source Kafka topic →(Kafka Connect tasks)源消息队列
制作者在其缓冲中发送消息发送到目标 Kafka 集群中的主题。发生这种情况时,Kafka Connect 任务将继续轮询数据源,以将消息添加到源消息队列中。
源连接器的制作者缓冲区的大小使用 producer.override.buffer.memory 属性来设置。任务等待指定的超时时间(offset.flush.timeout.ms)等待缓冲区被清除。这应该有足够的时间,用于发送的消息被代理和提交偏移数据确认。源任务不会等待制作者在提交偏移之前清空消息队列,但关机期间除外。
如果制作者无法在源消息队列中保留消息吞吐量,则缓冲会被阻断,直到缓冲区内有限制的 max.block.ms 的时间里是否存在空间。在此期间,缓冲区中仍然发送任何未确认的信息。在这些消息被确认并清空前,不会向缓冲区添加新消息。
您可以尝试以下配置更改,将原始消息的基本源消息队列保持在可管理的大小:
-
以
offset.flush.timeout.ms的毫秒为单位增加默认值 - 确保有足够的 CPU 和内存资源
通过执行以下操作来增加并行运行的任务数量:
-
使用
tasksMax属性增加并行运行的任务数量 -
使用
replicas属性增加运行任务的 worker 节点数量
-
使用
根据可用 CPU 和内存资源和 worker 节点数量,考虑可以并行运行的任务数量。您可能需要调整配置值,直到它们具有所需的效果。
2.7.1. 为高卷信息配置 Kafka 连接 复制链接链接已复制到粘贴板!
Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,以便它复制到目标集群。
以下示例显示了使用 KafkaConnect 自定义资源的 Kafka Connect 配置。
处理大量消息的 Kafka 连接配置示例
为源连接器添加制作者配置,该连接器通过 KafkaConnector 自定义资源进行管理。
处理大量消息的源连接器配置示例
FileStreamSourceConnector 和 FileStreamSinkConnector 作为示例连接器提供。有关将其部署为 KafkaConnector 资源的信息,请参阅 部署 KafkaConnector 资源。
为 sink 连接器添加了消费者配置。
处理大量消息的接收器连接器配置示例
如果您使用 Kafka Connect API 而不是 KafkaConnector 自定义资源来管理您的连接器,您可以把连接器配置添加为 JSON 对象。
为处理大量消息的 curl 请求添加源连接器配置示例
2.7.2. 为高卷信息配置 MirrorMaker 2.0 复制链接链接已复制到粘贴板!
MirrorMaker 2.0 从源集群获取数据并将其写入 Kafka Connect 运行时制作者,以便它复制到目标集群。
以下示例显示了使用 KafkaMirrorMaker2 自定义资源的 MirrorMaker 2.0 的配置。
用于处理大量消息卷的 MirrorMaker 2.0 配置示例
2.7.3. 检查 MirrorMaker 2.0 消息流 复制链接链接已复制到粘贴板!
如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2.0 消息流。
AMQ Streams 提供的 MirrorMaker 2.0 Grafana 仪表板示例显示了与 flush 管道相关的以下指标。
- Kafka Connect 的未处理消息队列中的信息数
- producer 缓冲的可用字节
- 偏移提交超时(以毫秒为单位)
您可以使用这些指标来衡量是否需要根据消息卷来调整配置。