12.7. 调整 Kafka 配置


使用配置属性优化 Kafka 代理、生产者和使用者的性能。

需要最小的配置属性集合,但您可以添加或调整属性以更改生产者和使用者如何与 Kafka 代理交互。例如,您可以调整消息的延迟和吞吐量,以便客户端能够实时响应数据。

首先,您可以分析指标来测量进行初始配置的位置,然后逐步更改并进一步比较指标,直到您拥有所需的配置。

其它资源

12.7.1. Kafka 代理配置调整

使用配置属性优化 Kafka 代理的性能。您可以使用标准 Kafka 代理配置选项,但由 AMQ Streams 管理的属性除外。

12.7.1.1. 基本代理配置

某些代理配置选项由 Kafka 自定义资源规格直接由 AMQ Streams 管理:

  • broker.id 是 Kafka 代理的 ID
  • log.dirs 是日志数据的目录
  • zookeeper.connect 是用于将 Kafka 与 ZooKeeper 连接的配置
  • 侦听器 向客户端公开 Kafka 集群
  • 授权 机制允许或拒绝用户执行的操作
  • 验证 机制证明需要访问 Kafka 的用户的身份

代理 ID 从 0(零)开始,对应于代理副本数。日志目录根据 Kafka 自定义资源中的 spec. kafka.storage 配置挂载到 /var/lib/kafka/data/kafka -logIDX 中。IDX 是 Kafka 代理 pod 索引。

因此,您无法通过 Kafka 自定义资源的 config 属性配置 这些选项。有关排除列表,请参阅 KafkaClusterSpec 模式引用

但是,典型的代理配置将包括与主题、线程和日志相关的属性设置。

基本代理配置属性

# ...
num.partitions=1
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
group.initial.rebalance.delay.ms=0
zookeeper.connection.timeout.ms=6000
# ...

12.7.1.2. 复制高可用性主题

基本主题属性为主题设置默认分区和复制因子,它们将应用到没有明确设置这些属性的主题,包括在自动创建主题时。

# ...
num.partitions=1
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
replica.fetch.max.bytes=1048576
# ...

auto.create.topics.enable 属性默认为启用,因此生产者和使用者需要时会自动创建不存在的主题。如果您使用自动创建主题,可以使用 num.partitions 为主题设置默认分区数。但通常禁用此属性,以便通过创建显式主题来更好地控制主题,您可以使用 AMQ Streams KafkaTopic 资源或应用程序来创建主题。

对于高可用性环境,建议将主题的复制因素增加到至少 3,并将所需同步副本的最小数量设置为比复制因素少 1 个。对于使用 KafkaTopic 资源创建的主题,复制因素使用 spec.replicas 设置。

对于 数据持久性,您还应在 主题 配置中设置 min.insync.replicas,并在 制作者 配置中使用 acks=all 设置消息发送确认。

使用 replica.fetch.max.bytes 设置复制领导分区的每个后续程序获取的消息的最大大小(以字节为单位)。根据平均消息大小和吞吐量更改此值。在考虑读/写缓冲所需的总内存分配时,可用内存还必须能够适应所有跟随者乘以时的最大复制消息大小。

delete.topic.enable 属性默认为启用,以允许删除主题。在生产环境中,您应该禁用此属性以避免意外删除主题,从而导致数据丢失。但是,您可以临时启用它并删除主题,然后再次禁用它。如果启用了 delete.topic.enable,您可以使用 KafkaTopic 资源删除主题。

# ...
auto.create.topics.enable=false
delete.topic.enable=true
# ...

12.7.1.3. 事务和提交的内部主题设置

如果您使用事务来 启用从生产者到分区的原子写入,事务状态将存储在内部 __transaction_state 主题中。默认情况下,代理被配置为有 3 个复制因素,这个主题最少需要 2 个 in-sync 副本,这意味着 Kafka 集群中至少需要三个代理。

# ...
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ...

同样,存储消费者状态的内部 _ consumer_offsets 主题对分区数量和复制因素具有默认设置。

# ...
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
# ...

不要在生产环境中减少这些设置。您可以 在生产环境中 增加设置。作为例外,您可能要减少单broker 测试 环境中的设置。

12.7.1.4. 通过增加 I/O 线程改进请求处理吞吐量

网络线程处理对 Kafka 集群的请求,如从客户端应用程序生成和获取请求。将生成请求放入请求队列中。将响应放入响应队列中。

网络线程数量应当反映复制因素以及与 Kafka 集群交互的客户端生产者和消费者的活动级别。如果您要有大量请求,您可以增加线程数量,使用时间线程闲置来确定何时添加更多线程。

要减少拥塞并规范请求流量,您可以在阻止网络线程前限制请求队列中允许的请求数。

I/O 线程从请求队列获取请求来处理它们。添加更多线程可提高吞吐量,但 CPU 内核数和磁盘带宽会施加一个实际的上限。至少,I/O 线程数量应当等于存储卷的数量。

# ...
num.network.threads=3 1
queued.max.requests=500 2
num.io.threads=8 3
num.recovery.threads.per.data.dir=1 4
# ...
1
Kafka 集群的网络线程数量。
2
请求队列中允许的请求数。
3
Kafka 代理的 I/O 线程数量。
4
用于在启动时加载日志的线程数,在关机时用于清除日志。

所有代理的线程池的配置更新可能会在集群级别动态进行。这些更新限制为当前大小的一半和当前大小的两倍。

注意

Kafka 代理指标可帮助确定所需的线程数量。例如,平均时间网络线程闲置的指标(kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent)表示已使用的资源的百分比。如果有 0% 的空闲时间,则所有资源都在使用,这意味着添加更多线程可能很有用。

如果线程因为磁盘数量慢或受限,您可以尝试增加网络请求的缓冲大小以提高吞吐量:

# ...
replica.socket.receive.buffer.bytes=65536
# ...

另外,增加 bytes Kafka 可接收的最大字节数:

# ...
socket.request.max.bytes=104857600
# ...

12.7.1.5. 为高延迟连接增加带宽

Kafka 批量数据,以便在从 Kafka 到客户端的高延迟连接中实现合理的吞吐量,如数据中心之间的连接。但是,如果存在高延迟问题,您可以增加用于发送和接收消息的缓冲区的大小。

# ...
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# ...

您可以使用 带宽延迟(以字节/秒为单位)估算缓冲区的最大带宽,以估算保持最大 吞吐量所需的缓冲区的大小(以字节/为单位)。

12.7.1.6. 使用数据保留策略管理日志

Kafka 使用日志来存储消息数据。日志是与各种索引关联的一系列片段。新消息写入 活动 段,随后从不修改。服务片段从消费者获取请求时读取。活动片段定期被 滚动 为只读,并创建一个新的活跃网段来替换它。次只有一个部分处于活动状态。旧片段会保留,直到它们符合删除条件。

代理级别的配置设置日志片段的最大大小,以及推出活跃片段前以毫秒为单位的最大时间:

# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...

您可以使用 segment. bytessegment.ms,在主题级别上覆盖这些设置。您是否需要降低还是提升这些值取决于片段删除策略。较大的大小意味着活跃片段包含更多信息,且不频繁地推出。段也不再频繁地符合删除条件。

您可以设置基于时间或基于大小的日志保留和清理策略,以便保持日志可被管理。根据您的要求,您可以使用日志保留配置来删除旧片段。如果使用日志保留策略,在达到保留限制时会删除非活跃日志片段。删除旧片段会绑定日志所需的存储空间,因此您不会超过磁盘容量。

对于基于时间的日志保留,您需要根据小时、分钟和毫秒设置保留期限。保留周期基于附加至网段的时间信息。

毫秒配置具有超过分钟的优先级,其优先级高于小时。默认情况下,分钟和毫秒配置为空,但这三个选项提供了对您要保留的数据的大量控制。应优先使用毫秒配置,因为它是唯一可动态更新的三个属性之一。

# ...
log.retention.ms=1680000
# ...

如果 log.retention.ms 设为 -1,则不会对日志保留应用任何时间限制,因此所有日志都会保留。磁盘使用量应始终受到监控,但通常不建议 -1 设置,因为它可能会导致完整磁盘出现问题,而这可能难以修复。

对于基于大小的日志保留,您需要以字节为单位设置最大日志大小(日志中所有片段):

# ...
log.retention.bytes=1073741824
# ...

换句话说,当日志达到稳定状态后,通常大约有 log.retention.bytes/log.segment.bytes 片段。达到最大日志大小时,会删除旧的网段。

使用最大日志大小时潜在的问题在于它没有考虑消息附加到片段中的时间。您可以将基于时间和大小的日志保留用于清理策略,以获得所需的平衡。达到阈值时首先触发清理。

如果要在从系统中删除片段文件前添加时间延迟,您可以为主题配置中的特定主题使用 log.segment.delete.delay.ms 为代理级别或 file.delay.ms 添加延迟。

# ...
log.segment.delete.delay.ms=60000
# ...

12.7.1.7. 使用清理策略删除日志数据

删除较旧日志数据的方法由 日志清理 配置决定。

默认情况下,为代理启用日志清理功能:

# ...
log.cleaner.enable=true
# ...

您可以在主题或代理级别设置清理策略。代理级配置是尚未设置策略的主题的默认设置。

您可以将策略设置为删除日志、紧凑日志,或同时执行这两者:

# ...
log.cleanup.policy=compact,delete
# ...

delete 策略与使用数据保留策略管理日志对应。当不需要永久保留数据时,它非常适合。紧凑 策略保证为每个消息键保留最新的消息。日志紧凑适合更改消息值,您想要保留最新的更新。

如果将清理策略设置为删除日志,则会根据日志保留限制删除旧的片段。否则,如果没有启用日志清理功能,且没有日志保留限制,日志将继续增加。

如果为日志紧凑设置了清理策略,日志 的头 将作为一个标准 Kafka 日志运行,按顺序为新消息写入操作。在紧凑日志的 尾部 (日志清理操作),如果日志中稍后出现具有相同密钥的另一条记录,则将删除记录。带有 null 值的消息也会被删除。如果不使用键,就无法使用紧凑,因为需要键来识别相关消息。虽然 Kafka 保证每个密钥的最新消息将被保留,但它不能保证整个紧凑的日志不会包含重复项。

图 12.1. 在压缩前以偏移位置显示键值写入的日志

显示键值写入的紧凑图像

使用密钥来识别信息,Kafka 紧凑会保留特定消息键的最新消息(带有最高偏移值),最终丢弃之前具有相同密钥的消息。换句话说,其最新状态的消息始终可用,当日志安全运行时,该特定消息的任何过期记录最终都会被删除。您可以将消息恢复到以前的状态。

即使记录被删除,记录也会保留原始偏移。因此,尾部可能有非连续偏移。当消耗在尾部中不再可用的偏移时,会找到具有下一个高偏移量的记录。

图 12.2. 紧凑后日志

日志清理后紧凑的镜像

如果您只选择紧凑的策略,您的日志仍然可以变得任意大。在这种情况下,您可以将策略设置为压缩 和删除 日志。如果您选择压缩和删除,则首先压缩日志数据,从而删除日志头带有键的记录。之后,在删除日志保留阈值之前发布的数据。

图 12.3. 日志保留点和压缩点

带有保留点的压缩镜像

您需要设置以毫秒为单位检查清理的频率:

# ...
log.retention.check.interval.ms=300000
# ...

调整与日志保留设置相关的日志保留检查间隔。较小的保留大小可能需要更频繁地检查。

清理的频率应该足以管理磁盘空间,但经常影响某个主题的性能。

您还可以以毫秒为单位设置一个时间,以便在没有要清理的日志时将清理干净到备用中:

# ...
log.cleaner.backoff.ms=15000
# ...

如果您选择删除旧的日志数据,您可以在清除数据前设置一个毫秒来保留删除的数据:

# ...
log.cleaner.delete.retention.ms=86400000
# ...

删除的数据保留期限给予了时间,可以注意到数据在不可删除之前已丢失。

若要删除与特定密钥相关的所有消息,制作者可以发送 tomb stone 消息。am bstone 具有 null 值,充当标记来告知消费者删除其值。紧凑后,只保留了 tombstone,它必须持续足够长的时间,使消费者能够知道该消息被删除。删除旧消息后,没有值,也会从分区中删除 tombstone 密钥。

12.7.1.8. 管理磁盘使用率

还有许多其他与日志清理相关的配置设置,但特别重要的是内存分配。

deduplication 属性指定在所有日志清理线程中清理的总内存。您可以设置通过缓冲区负载因数使用的内存百分比的上限。

# ...
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9
# ...

每个日志条目都使用正好 24 字节,因此您可以计算一次缓冲区可以处理的日志条目数量,并相应地调整设置。

如果您希望减少日志清理时间,请考虑增加日志清理线程数量:

# ...
log.cleaner.threads=8
# ...

如果您遇到 100% 磁盘带宽使用情况的问题,您可以限制日志清理 I/O,根据执行操作的磁盘功能,读写操作的总和小于指定的双值:

# ...
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
# ...

12.7.1.9. 处理大消息大小

消息的默认批处理大小为 1MB,这是大多数用例中最大吞吐量的最佳选择。如果有足够的磁盘容量,Kafka 能够以更低的吞吐量来容纳较大的批处理。

处理较大的消息的大小有四种方式:

  1. 生产者侧消息压缩 将压缩的消息写入日志中。
  2. 基于参考的消息传递仅发送对消息值中某个其他系统中存储数据的引用。
  3. 内联消息传递将消息分割为使用相同键的块,然后通过 Kafka Streams 等流处理器将其组合在输出上。
  4. 为处理更大消息大小而构建的代理和制作者/消费者客户端应用配置.

建议使用基于参考的消息和消息压缩选项,并覆盖大多数情况。对于这些选项,必须小心谨慎以避免引入性能问题。

生产者侧压缩

对于制作者配置,您可以指定一个 compression.type (如 Gzip),然后应用到生产者生成的批量数据。使用代理配置 compression.type=producer 时,代理会保留制作者使用的任何压缩。每当生产者和主题压缩不匹配时,代理必须在将批处理附加到日志中,这会影响代理性能。

压缩还会增加制作者的额外处理开销和消费者的解压缩开销,但在批量中包含更多数据,因此在消息数据压缩不错时,通常对吞吐量很有用。

将制作者侧压缩与批处理大小的微调相结合,促进最佳吞吐量。使用指标有助于测量所需的平均批处理大小。

基于参考的消息传递

当您不知道消息的大程度时,基于参考的消息传递对于数据复制非常有用。外部数据存储必须快速、持久且高度可用,此配置才能发挥作用。将数据写入数据存储,并返回对数据的引用。制作者发送一条包含对 Kafka 引用的消息。使用者从消息中获取引用,并使用它来从数据存储中获取数据。

图 12.4. 基于参考的消息传递流

基于参考的消息传递流的镜像

由于消息传递需要更多行程,端到端延迟会增加。这种方法的另一个显著缺点是,在清理 Kafka 消息时,没有自动清理外部系统中的数据。混合方法是仅向数据存储直接发送大型消息,并直接处理标准大小的消息。

内联消息传递

内联消息传递很复杂,但开销不会依赖于基于参考的消息传递等外部系统。

如果消息太大,产生客户端应用必须按顺序进行序列化,然后对数据进行块。然后,制作者使用 Kafka ByteArraySerializer 或类似的方法在发送前再次对每个块进行序列化。使用者跟踪消息和缓冲块,直到消息完整消息为止。消耗的客户端应用程序接收区块,这些区块在反序列化之前组装。根据每组区块消息的第一块或最后一个块的偏移,将完整消息传送到正在使用的应用的其余部分。针对偏移元数据检查成功发送完整消息,以避免在重新平衡期间重复。

图 12.5. 内联消息传递流

内联消息传递流的镜像

由于需要缓冲,内联消息传递在消费者一侧具有性能开销,特别是在并行处理一系列大型消息时。大型消息的块可能会相互交集,因此,如果缓冲区中另一条大消息的块不完整,则始终无法在消息的所有块被使用时提交。因此,缓冲区通常由持久消息区块或实施提交逻辑来支持。

配置以处理较大的信息

如果无法避免较大的消息,并且为了避免在消息流的任意点上阻止,您可以增加消息限值。为此,可在主题级别上配置 message.max.bytes,为各个主题设置最大记录批处理大小。如果您在代理级别设置了 message.max.bytes,则允许所有主题显示较大的信息。

代理将拒绝任何大于使用 message. max.bytes 设置的限制值的消息。生产者(max.request.size)和使用者(message.max.bytes)的缓冲区大小必须能够容纳较大的消息。

12.7.1.10. 控制消息数据的日志清除

日志刷新属性控制缓存的消息数据的定期写入磁盘。调度程序以毫秒为单位指定日志缓存中的检查频率:

# ...
log.flush.scheduler.interval.ms=2000
# ...

您可以根据信息保存内存的最大时间以及日志中的信息在写入磁盘前的最大信息数来控制刷新的频率:

# ...
log.flush.interval.ms=50000
log.flush.interval.messages=100000
# ...

清空之间的等待包括检查的时间和执行清除前的指定间隔。增加清空的频率可能会影响吞吐量。

通常,建议不要设置明确的清空阈值,并让操作系统使用其默认设置执行后台刷新。分区复制比写入单个磁盘的数据持久性要高,因为故障代理可以从其同步副本中恢复。

如果您使用应用程序清空管理,如果使用更快的磁盘,则可能最好设置较低的清空阈值。

12.7.1.11. 对可用性进行分区重新平衡

分区可以在代理之间复制,以实现容错。对于给定的分区,一个代理被选为领导,并处理所有生成的请求(写入到日志)。在出现领导故障时,其他代理的分区跟踪者复制分区领导者的分区数据,以实现数据可靠性。

跟随者通常不为客户端服务,虽然 机架 配置 允许使用者在 Kafka 集群跨越多个数据中心时消耗来自最接近的副本的消息。跟随者只从分区领导机复制消息,并在领导机失败时允许恢复。恢复需要一个同步跟踪器。跟随者通过向领导发送获取请求来保持同步,后者按顺序向跟随者返回消息。如果跟踪者发现了领导上最近提交的邮件,则被视为同步。领导机通过查看跟随者请求的最后偏移来检查这一点。除非允许未清理的领导选举,否则无法同步跟进者作为当前的领导机 故障。

您可以调整跟随者被视为不同步前的滞后时间:

# ...
replica.lag.time.max.ms=30000
# ...

滞后时间为将消息复制到所有同步副本的时间施加了上限,以及生产者必须等待确认的时间。如果跟踪器无法发出获取请求并在指定滞后时间内跟上最新的消息,则会从同步副本中删除它。您可以缩短检测失败副本的滞后时间,但这样做可能会增加无用同步的后续者的数量。正确的滞后时间值取决于网络延迟和代理磁盘带宽。

当领导分区不再可用时,会选择同步内的副本之一作为新领导。分区副本列表中的第一个代理称为 首选 的领导。默认情况下,Kafka 会根据对领导发行的定期检查来为自动分区领导重新平衡启用 Kafka。也就是说,Kafka 检查首选的领导是否是 当前的 领导机。重新平衡可确保领先者在代理和代理之间均匀分布,不会过载。

您可以使用 AMQ Streams 的 Cruise Control 来找出副本分配,以便代理在整个集群中平均平衡负载。其计算考虑了领导者和追随者所经历的不同负载。失败的领导机会影响 Kafka 集群的平衡,因为剩余的代理会获得前导额外分区的额外工作。

要使 Cruise Control 发现的分配真正达到平衡,分区必须要由首选领导领导。Kafka 可以自动确保使用首选的领导机(在可能的情况下),根据需要更改当前的领导机。这样可确保集群保持 Cruise Control 找到的均衡状态。

您可以在触发重新平衡前,以秒为单位控制重新平衡检查的频率和代理允许的最大发生率。

#...
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
#...

代理的领导地位百分比是代理作为当前领导者的分区数量与首选领导者分区数量之间的比例。您可以将百分比设置为零,以确保始终选中首选领导者,假设他们同步。

如果重新平衡的检查需要更多控制,您可以禁用自动重新平衡。然后,您可以选择何时使用 kafka-leader-election.sh 命令行工具触发重新平衡。

注意

随 AMQ Streams 提供的 Grafana 仪表板显示没有活跃领导的复制分区和分区的指标。

12.7.1.12. 未清理领导选举机制

同步副本的领导选举被视为干净,因为它保证不会丢失数据。这是默认情况下会发生的情况。但是,如果没有可以承担领导地位的同步副本,情况会怎样?或许 ISR(同步副本)仅在领导磁盘终止时包含领导机。如果没有设置最少的同步副本数,且其硬盘驱动器出现故障时没有跟随分区领导器同步,则数据已经丢失。不仅 不能这种情况,而且不能选择新的领导,因为不存在同步的追随者。

您可以配置 Kafka 如何处理领导故障:

# ...
unclean.leader.election.enable=false
# ...

Unclean leader 选举默认被禁用,这意味着不同步的副本不能成为领导。使用干净的领导选举机制时,如果没有其他代理在旧领导丢失时处于 ISR 中,Kafka 会等待该领导者重新在线,然后再写入或读取消息。unclean leader 选举机制意味着不同步的副本可能会成为领导者,但您会面临丢失消息的风险。您选择的选择取决于您的需求是否有利于可用性或持久性。

您可以覆盖主题级别上特定主题的默认配置。如果无法承担数据丢失的风险,请保留默认配置。

12.7.1.13. 避免不必要的消费者组重新平衡

对于加入新消费者组的消费者,您可以添加延迟,以避免不必要的重新平衡到代理:

# ...
group.initial.rebalance.delay.ms=3000
# ...

延迟是协调会等待会员加入的时间。延迟时间越长,所有成员越有可能加入并避免重新平衡。但是,这一延迟也会阻止该组在句点结束前被消耗掉。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.