第 18 章 处理大量信息


如果您的 AMQ Streams 部署需要处理大量信息,您可以使用配置选项来优化吞吐量和延迟。

生成者和消费者配置有助于控制对 Kafka 代理的请求大小和频率。有关配置选项的更多信息,请参阅以下内容:

您还可以将相同的配置选项与 Kafka Connect 运行时源连接器(包括 MirrorMaker 2)和接收器连接器使用的制作者和消费者一起使用。

源连接器
  • Kafka Connect 运行时的制作者会向 Kafka 集群发送信息。
  • 对于 MirrorMaker 2,因为源系统是 Kafka,消费者从源 Kafka 集群检索信息。
sink 连接器
  • Kafka Connect 运行时的用户从 Kafka 集群检索信息。

对于消费者,您可以增加在单个获取请求中获取的数据量,以减少延迟。您可以使用 fetch.max.bytesmax.partition.fetch.bytes 属性增加 fetch 请求大小。您还可以使用 max.poll.records 属性设置从消费者缓冲区返回的消息数的最大限制。

对于 MirrorMaker 2,在源连接器级别(consumer.*)配置 fetch.max.bytes, max.partition.fetch.bytes, 和 max.poll.records 值,因为它们与从源获取消息的特定消费者相关。

对于生成者,您可以增加在单个生成请求中发送的消息批处理的大小。您可以使用 batch.size 属性增加批处理大小。更大的批处理大小可减少准备好发送的未完成消息的数量以及消息队列中的 backlog 的大小。发送到同一分区的消息会并行批处理。当达到批处理大小时,将生成请求发送到目标集群。通过增加批处理大小,生成请求会延迟,并将更多消息添加到批处理中,并同时发送到代理。当您只有处理大量消息的几个主题分区时,这可以提高吞吐量。

考虑制作者为合适的生产批处理大小处理的记录的数量和大小。

使用 linger.ms 添加等待时间(毫秒)以在生成者负载减少时延迟生成请求。延迟意味着,如果记录在最大批处理大小下,可以添加更多记录到批处理中。

在连接器级别 (producer.override.*) 配置 batch.sizelinger.ms 值,因为它们与向目标 Kafka 集群发送信息的特定制作者相关。

对于 Kafka Connect 源连接器,到目标 Kafka 集群的数据流管道如下:

Kafka Connect 源连接器的数据流管道

External data source →(Kafka Connect tasks) source message queue producer buffer target Kafka topic

对于 Kafka Connect sink 连接器,到目标外部数据源的数据流管道如下:

Kafka Connect sink 连接器的数据流管道

source Kafka topic (Kafka Connect tasks) sink message queue consumer buffer external data source

对于 MirrorMaker 2,目标 Kafka 集群的数据镜像管道如下:

MirrorMaker 2 的数据镜像管道

source Kafka topic (Kafka Connect tasks) source message queue producer buffer target Kafka topic

制作者在其缓冲区中向目标 Kafka 集群中的主题发送信息。当发生这种情况时,Kafka Connect 任务将继续轮询数据源,以将信息添加到源消息队列中。

源连接器的制作者缓冲区的大小使用 producer.override.buffer.memory 属性进行设置。在缓冲区被清除前,任务会等待指定的超时周期(offset.flush.timeout.ms)。这应该有足够的时间,以便代理和偏移数据提交的信息可以确认发送消息。除关闭期间,源任务不会等待制作者为空消息队列,然后再提交偏移。

如果生成者无法满足源消息队列中消息的吞吐量,则缓冲区会被阻断,直到缓冲区内由 max.block.ms 绑定的时间周期内可用。在此期间,缓冲区中仍然发送任何未确认的信息。在确认和清除这些消息之前,新消息不会添加到缓冲区中。

您可以尝试以下配置更改,将未完成的消息的底层源消息队列保持在 manageable 大小下:

  • offset.flush.timeout.ms为单位增加默认值
  • 确保有足够的 CPU 和内存资源
  • 通过执行以下操作增加并行运行的任务数量:

    • 使用 tasksMax 属性增加并行运行的任务数量
    • 使用 replicas 属性增加运行任务的 worker 节点数量

根据可用的 CPU 和内存资源和 worker 节点数量,请考虑可以并行运行的任务数量。您可能需要继续调整配置值,直到它们具有所需的效果。

18.1. 为高容量信息配置 Kafka 连接

Kafka Connect 从源外部数据系统获取数据,并将其传递给 Kafka Connect 运行时制作者,使其复制到目标集群。

以下示例显示了使用 KafkaConnect 自定义资源的 Kafka Connect 配置。

处理大量消息的 Kafka 连接配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 3
  config:
    offset.flush.timeout.ms: 10000
    # ...
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # ...
Copy to Clipboard Toggle word wrap

为源连接器添加生成者配置,该连接器通过 KafkaConnector 自定义资源进行管理。

处理大量信息的源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    producer.override.batch.size: 327680
    producer.override.linger.ms: 100
    # ...
Copy to Clipboard Toggle word wrap

注意

FileStreamSourceConnectorFileStreamSinkConnector 作为示例连接器提供。有关将它们部署为 KafkaConnector 资源的详情,请参考 第 6.4.3.3 节 “部署 KafkaConnector 资源”

为接收器连接器添加消费者配置。

处理大量消息的接收器连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 2
  config:
    consumer.fetch.max.bytes: 52428800
    consumer.max.partition.fetch.bytes: 1048576
    consumer.max.poll.records: 500
    # ...
Copy to Clipboard Toggle word wrap

如果您使用 Kafka Connect API 而不是 KafkaConnector 自定义资源来管理连接器,您可以将连接器配置添加为 JSON 对象。

添加用于处理大量消息的源连接器配置的 curl 请求示例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
      "producer.override.batch.size": 327680
      "producer.override.linger.ms": 100
    }
}'
Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat