2.7. 处理大量消息
如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项来优化吞吐量和延迟。
Kafka producer 和使用者配置可帮助控制对 Kafka 代理的请求的大小和频率。有关配置选项的详情请参考以下内容:
您还可以将相同的配置选项与 Kafka Connect 运行时源连接器(包括 MirrorMaker 2.0)和接收器连接器使用的生产者和消费者使用相同的配置选项。
- 源连接器
- Kafka Connect 运行时中的制作者将信息发送到 Kafka 集群。
- 对于 MirrorMaker 2.0,由于源系统是 Kafka,消费者从源 Kafka 集群检索信息。
- sink 连接器
- Kafka Connect 运行时的使用者从 Kafka 集群检索信息。
对于消费者,您可以增加在单个提取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytes 和 max.partition.fetch.bytes 属性来增加 fetch 请求大小。您还可以使用 max.poll.records 属性针对使用者缓冲区返回的消息数量设置最大限制。
对于 MirrorMaker 2.0,配置 fetch.max.bytes、max.partition.fetch.bytes 和 max.poll.records 值的来源连接器级别(consumer.*),因为它们与从来源获取消息的特定使用者相关。
对于制作者而言,您可以增加一个生成请求中发送的消息批处理的大小。您可以使用 batch.size 属性来增加批处理大小。较大的批处理大小可减少已发送的未处理消息的数量,以及消息队列中 backlog 的大小。发送到同一分区的消息一起批处理。当达到批处理大小时,生成的请求将发送到目标集群。通过增加批处理大小,生成请求会延迟,更多的消息会添加到批处理中,并同时发送到代理。当您只拥有处理大量消息的几个主题分区时,这可以提高吞吐量。
考虑制作者为适合生产批处理大小处理的记录数和大小。
使用 linger.ms 添加等待时间(以毫秒为单位),以在生产负载减少时延迟生成请求。延迟意味着,如果这些记录在最大批处理大小下,可以向批处理添加更多记录。
在源连接器级别上配置 batch.size 和 linger.ms 值(producer.override.*),因为它们与将消息发送到目标 Kafka 集群的特定制作者相关。
对于 Kafka Connect 源连接器,到目标 Kafka 集群的数据流管道如下:
Kafka Connect 源连接器的数据流管道
外部数据源 →(Kafka Connect 任务)源消息队列
对于 Kafka Connect sink 连接器,到目标外部数据源的数据流管道如下:
Kafka Connect sink 连接器的数据流管道
Source Kafka topic →(Kafka Connect tasks) sink 消息队列
对于 MirrorMaker 2.0,数据镜像管道到目标 Kafka 集群如下:
MirrorMaker 2.0 的数据镜像管道
Source Kafka topic →(Kafka Connect tasks)源消息队列
制作者在其缓冲中发送消息发送到目标 Kafka 集群中的主题。发生这种情况时,Kafka Connect 任务将继续轮询数据源,以将消息添加到源消息队列中。
源连接器的制作者缓冲区的大小使用 producer.override.buffer.memory 属性来设置。任务等待指定的超时时间(offset.flush.timeout.ms)等待缓冲区被清除。这应该有足够的时间,用于发送的消息被代理和提交偏移数据确认。源任务不会等待制作者在提交偏移之前清空消息队列,但关机期间除外。
如果制作者无法在源消息队列中保留消息吞吐量,则缓冲会被阻断,直到缓冲区内有限制的 max.block.ms 的时间里是否存在空间。在此期间,缓冲区中仍然发送任何未确认的信息。在这些消息被确认并清空前,不会向缓冲区添加新消息。
您可以尝试以下配置更改,将原始消息的基本源消息队列保持在可管理的大小:
-
以
offset.flush.timeout.ms的毫秒为单位增加默认值 - 确保有足够的 CPU 和内存资源
通过执行以下操作来增加并行运行的任务数量:
-
使用
tasksMax属性增加并行运行的任务数量 -
使用
replicas属性增加运行任务的 worker 节点数量
-
使用
根据可用 CPU 和内存资源和 worker 节点数量,考虑可以并行运行的任务数量。您可能需要调整配置值,直到它们具有所需的效果。
2.7.1. 为高卷信息配置 Kafka 连接 复制链接链接已复制到粘贴板!
Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,以便它复制到目标集群。
以下示例显示了使用 KafkaConnect 自定义资源的 Kafka Connect 配置。
处理大量消息的 Kafka 连接配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
replicas: 3
config:
offset.flush.timeout.ms: 10000
# ...
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 2Gi
# ...
为源连接器添加制作者配置,该连接器通过 KafkaConnector 自定义资源进行管理。
处理大量消息的源连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-source-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.file.FileStreamSourceConnector
tasksMax: 2
config:
producer.override.batch.size: 327680
producer.override.linger.ms: 100
# ...
FileStreamSourceConnector 和 FileStreamSinkConnector 作为示例连接器提供。有关将其部署为 KafkaConnector 资源的信息,请参阅 部署 KafkaConnector 资源。
为 sink 连接器添加了消费者配置。
处理大量消息的接收器连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-sink-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.file.FileStreamSinkConnector
tasksMax: 2
config:
consumer.fetch.max.bytes: 52428800
consumer.max.partition.fetch.bytes: 1048576
consumer.max.poll.records: 500
# ...
如果您使用 Kafka Connect API 而不是 KafkaConnector 自定义资源来管理您的连接器,您可以把连接器配置添加为 JSON 对象。
为处理大量消息的 curl 请求添加源连接器配置示例
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"
"producer.override.batch.size": 327680
"producer.override.linger.ms": 100
}
}'
2.7.2. 为高卷信息配置 MirrorMaker 2.0 复制链接链接已复制到粘贴板!
MirrorMaker 2.0 从源集群获取数据并将其写入 Kafka Connect 运行时制作者,以便它复制到目标集群。
以下示例显示了使用 KafkaMirrorMaker2 自定义资源的 MirrorMaker 2.0 的配置。
用于处理大量消息卷的 MirrorMaker 2.0 配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: my-mirror-maker2
spec:
version: 3.2.3
replicas: 1
connectCluster: "my-cluster-target"
clusters:
- alias: "my-cluster-source"
bootstrapServers: my-cluster-source-kafka-bootstrap:9092
- alias: "my-cluster-target"
config:
offset.flush.timeout.ms: 10000
bootstrapServers: my-cluster-target-kafka-bootstrap:9092
mirrors:
- sourceCluster: "my-cluster-source"
targetCluster: "my-cluster-target"
sourceConnector:
tasksMax: 2
config:
producer.override.batch.size: 327680
producer.override.linger.ms: 100
consumer.fetch.max.bytes: 52428800
consumer.max.partition.fetch.bytes: 1048576
consumer.max.poll.records: 500
# ...
resources:
requests:
cpu: "1"
memory: Gi
limits:
cpu: "2"
memory: 4Gi
2.7.3. 检查 MirrorMaker 2.0 消息流 复制链接链接已复制到粘贴板!
如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2.0 消息流。
AMQ Streams 提供的 MirrorMaker 2.0 Grafana 仪表板示例显示了与 flush 管道相关的以下指标。
- Kafka Connect 的未处理消息队列中的信息数
- producer 缓冲的可用字节
- 偏移提交超时(以毫秒为单位)
您可以使用这些指标来衡量是否需要根据消息卷来调整配置。