3.6. 使用 delete 和 compact 策略管理 Kafka 日志
Kafka 依赖于日志来存储消息数据。日志由一系列片段组成,每个片段都与基于偏移和基于时间戳的索引相关联。新消息被写入 活跃 片段,永远不会被修改。当服务从消费者获取请求时,将读取片段。活跃的片段 会定期 变为只读,并创建一个新的活跃网段来替换它。每个代理的每个主题分区只能有一个活跃网段。保留旧的片段,直到它们有资格删除。
在代理级别配置决定了日志片段的最大大小(以字节为单位),以及活跃的片段前的时间(毫秒):
# ... log.segment.bytes=1073741824 log.roll.ms=604800000 # ...
这些设置可以使用 segment.bytes
和 segment.ms
在主题级别上覆盖。降低或引发这些值的选择取决于删除网段的策略。较大的大小表示活跃片段包含更多信息,且会更频繁地推出。片段还有资格更频繁地删除。
在 Kafka 中,日志清理策略决定如何管理日志数据。在大多数情况下,您不需要更改集群级别的默认配置,它指定了 删除
清理策略,并启用 紧凑
清理策略使用的日志清理:
# ... log.cleanup.policy=delete log.cleaner.enable=true # ...
- 删除清理策略
- 删除清理策略是所有主题的默认集群范围的策略。该策略应用于没有配置特定主题级别策略的主题。Kafka 根据基于时间的日志保留限制删除旧的片段。
- 紧凑清理策略
-
紧凑清理策略通常被配置为主题级策略(
cleanup.policy=compact
)。Kafka 的日志清理程序对特定主题应用压缩,只保留主题中键的最新值。您还可以将主题配置为使用这两个策略(cleanup.policy=compact,delete
)。
为删除策略设置保留限制
删除清理策略与管理带有数据保留的日志相对应。当不需要永久保留数据时,该策略是合适的。您可以建立基于时间或基于大小的日志保留和清理策略,以保持绑定的日志。
当使用日志保留策略时,当达到保留限制时,非活跃日志片段会被删除。删除旧片段有助于防止超过磁盘容量。
对于基于时间的日志保留,您可以根据小时、分钟或毫秒设置保留周期:
# ... log.retention.ms=1680000 # ...
保留周期基于消息附加到段中的时间。Kafka 使用片段中最新消息的时间戳来确定该片段是否已过期。毫秒配置的优先级超过分钟,其优先级高于小时。默认情况下,分钟和毫秒配置是 null,但三个选项提供了对您要保留数据的大量控制级别。首选项应提供给毫秒配置,因为它是唯一可以动态更新的三个属性之一。
如果 log.retention.ms
设置为 -1,则不会将时间限制应用到日志保留,并且保留所有日志。但是,通常不建议此设置,因为它可能会导致出现难以重新排序的完整磁盘的问题。
对于基于大小的日志保留,您可以指定最小日志大小(以字节为单位):
# ... log.retention.bytes=1073741824 # ...
这意味着 Kafka 将确保至少有指定数量的日志数据可用。
例如,如果您将 log.retention.bytes
设置为 1000,并将 log.segment.bytes
设为 300,则 Kafka 将保留 4 个片段加活跃片段,确保至少有 1000 字节可用。当活跃片段已满并创建新片段时,会删除最旧的网段。此时,磁盘的大小可能会超过指定的 1000 字节,可能范围介于 1200 到 1500 字节(不包括索引文件)。
使用日志大小的潜在问题是,它不会考虑时间信息被附加到段中。您可以对清理策略使用基于时间和大小的日志保留,以获得您需要的平衡。首先达到哪个阈值会触发清理。
要在从系统中删除分段文件前添加时间延迟,您可以在所有主题的代理级别使用 log.segment.delete.delay.ms
:
# ... log.segment.delete.delay.ms=60000 # ...
或在主题级别配置 file.delete.delay.ms
。
您可以设置检查日志进行清理的频率(以毫秒为单位):
# ... log.retention.check.interval.ms=300000 # ...
调整与日志保留设置相关的日志保留检查间隔。较小的保留大小可能需要更频繁地检查。清理的频率通常足以管理磁盘空间,但通常不会影响代理的性能。
使用紧凑策略保留最新消息
当您通过设置 cleanup.policy=compact
来为主题启用日志压缩时,Kafka 使用日志清理程序作为后台线程来执行压缩。紧凑策略确保保留每个消息键的最新消息,从而有效地清理旧版本的记录。当消息值可改变时,该策略是合适的,您想要保留最新的更新。
如果为日志压缩设置了清理策略,则日志 头 作为标准 Kafka 日志运行,并按顺序写入新消息。在紧凑的日志的 尾部 中,日志清理程序运行,如果稍后日志中发生具有相同键的另一个记录,则会删除记录。具有 null 值的消息也会被删除。要使用压缩,您必须有识别相关消息的密钥,因为 Kafka 保证保留每个密钥的最新信息,但它不能保证整个紧凑的日志不会包含重复。
图 3.1. 在压缩前显示带有偏移位置的键值写入的日志
使用键识别信息,Kafka 压缩会保留日志尾部针对特定消息键中存在的最新消息(具有最高偏移),最终丢弃具有相同键的早期消息。其最新状态的消息始终可用,在日志清理程序运行时最终会删除该特定消息的任何过时的记录。您可以将消息恢复到以前的状态。即使周围记录被删除,记录也会保留其原始偏移量。因此,tail 可以具有非连续偏移。当消耗在 tail 中可用偏移时,会发现带有下一个偏移的记录。
图 3.2. 压缩后日志
如果适当,您可以在压缩过程中添加一个延迟:
# ... log.cleaner.delete.retention.ms=86400000 # ...
删除的数据保留周期提供了在数据被不可避免删除前注意到数据的时间。
要删除与特定密钥相关的所有消息,制作者可以发送 tombstone 消息。tombstone 有一个 null 值,并作为标记来通知消费者删除该键的对应消息。一段时间后,只会保留 tombstone 标记。假设新消息继续进入,标记会保留在 log.cleaner.delete.retention.ms
指定的持续时间内,以便消费者有足够的时间识别删除。
如果没有要清理的日志,您还可以设置时间(以毫秒为单位),以将清理器放在待机中:
# ... log.cleaner.backoff.ms=15000 # ...
使用组合紧凑和删除策略
如果您只选择紧凑策略,您的日志仍然可以变为任意的。在这种情况下,您可以将主题的清理策略设置为紧凑和删除日志。Kafka 应用日志压缩,删除旧版本的记录,并只保留每个密钥的最新版本。Kafka 还根据指定的基于时间的日志保留设置或基于大小的日志保留设置删除记录。
例如,在下图中,特定消息键的最新消息(具有最高偏移量)被保留到压缩点。如果保留点上有任何记录,则它们会被删除。在这种情况下,压缩过程将删除所有重复。
图 3.3. 日志保留点和压缩点