第 6 章 管理 Kafka


使用额外的配置属性来维护 AMQ Streams 的部署。您可以添加和调整设置,以响应 AMQ Streams 的性能。例如,您可以引入其他配置来提高吞吐量和数据可靠性。

6.1. 调整 Kafka 配置

使用配置属性优化 Kafka 代理、生产者和使用者的性能。

需要最小的配置属性集合,但您可以添加或调整属性以更改生产者和使用者如何与 Kafka 代理交互。例如,您可以调整消息的延迟和吞吐量,以便客户端能够实时响应数据。

首先,您可以分析指标来测量进行初始配置的位置,然后逐步更改并进一步比较指标,直到您拥有所需的配置。

其它资源

6.1.1. Kafka 代理配置调整

使用配置属性优化 Kafka 代理的性能。您可以使用标准 Kafka 代理配置选项,但由 AMQ Streams 管理的属性除外。

6.1.1.1. 基本代理配置

基本配置将包括以下属性来识别代理并提供安全访问:

  • broker.id 是 Kafka 代理的 ID
  • log.dirs 是日志数据的目录
  • zookeeper.connect 是用于将 Kafka 与 ZooKeeper 连接的配置
  • listener 向客户端公开 Kafka 集群
  • authorization 机制允许或拒绝用户执行的操作
  • authentication 机制可证明需要访问 Kafka 的用户的身份

您可以在 配置 Kafka 中找到更多有关基本配置选项的详细信息。

典型的代理配置还包括与主题、线程和日志相关的属性设置。

基本代理配置属性

# ...
num.partitions=1
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
group.initial.rebalance.delay.ms=0
zookeeper.connection.timeout.ms=6000
# ...

6.1.1.2. 复制高可用性主题

基本主题属性为主题设置默认分区和复制因子,它们将应用到没有明确设置这些属性的主题,包括在自动创建主题时。

# ...
num.partitions=1
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
replica.fetch.max.bytes=1048576
# ...

auto.create.topics.enable 属性默认启用,以便生产者和消费者需要时自动创建不存在的主题。如果您使用自动创建主题,可以使用 num.partitions 为主题设置默认分区数。但通常禁用此属性,以便通过显式创建主题来提供对主题的更多控制

对于高可用性环境,建议将主题的复制因素增加到至少 3,并将所需同步副本的最小数量设置为比复制因素少 1 个。

对于 数据持久性,您应该在主题配置中设置 min.insync.replicas,并在制作者配置中使用 acks=all 设置消息发送确认。

使用 replica.fetch.max.bytes 设置复制领导分区的每个后续程序获取的信息的最大大小(以字节为单位)。根据平均消息大小和吞吐量更改此值。在考虑读/写缓冲所需的总内存分配时,可用内存还必须能够适应所有跟随者乘以时的最大复制消息大小。大小必须大于 message.max.bytes,以便复制所有信息。

delete.topic.enable 属性默认启用,以允许删除主题。在生产环境中,您应该禁用此属性以避免意外删除主题,从而导致数据丢失。但是,您可以临时启用它并删除主题,然后再次禁用它。

# ...
auto.create.topics.enable=false
delete.topic.enable=true
# ...

6.1.1.3. 事务和提交的内部主题设置

如果您使用事务来 启用从生产者到分区的原子写入,事务状态会存储在内部 __transaction_state 主题中。默认情况下,代理被配置为有 3 个复制因素,这个主题最少需要 2 个 in-sync 副本,这意味着 Kafka 集群中至少需要三个代理。

# ...
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ...

同样,存储消费者状态的内部 __consumer_offsets 主题对分区数量和复制因素具有默认设置。

# ...
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
# ...

不要在生产环境中减少这些设置。您可以 在生产环境中 增加设置。作为例外,您可能要减少单broker 测试 环境中的设置。

6.1.1.4. 通过增加 I/O 线程改进请求处理吞吐量

网络线程处理对 Kafka 集群的请求,如从客户端应用程序生成和获取请求。将生成请求放入请求队列中。将响应放入响应队列中。

网络线程数量应当反映复制因素以及与 Kafka 集群交互的客户端生产者和消费者的活动级别。如果您要有大量请求,您可以增加线程数量,使用时间线程闲置来确定何时添加更多线程。

要减少拥塞并规范请求流量,您可以在阻止网络线程前限制请求队列中允许的请求数。

I/O 线程从请求队列获取请求来处理它们。添加更多线程可提高吞吐量,但 CPU 内核数和磁盘带宽会施加一个实际的上限。至少,I/O 线程数量应当等于存储卷的数量。

# ...
num.network.threads=3 1
queued.max.requests=500 2
num.io.threads=8 3
num.recovery.threads.per.data.dir=1 4
# ...
1
Kafka 集群的网络线程数量。
2
请求队列中允许的请求数。
3
Kafka 代理的 I/O 线程数量。
4
用于在启动时加载日志的线程数,在关机时用于清除日志。

所有代理的线程池的配置更新可能会在集群级别动态进行。这些更新限制为当前大小的一半和当前大小的两倍。

注意

Kafka 代理指标可帮助确定所需的线程数量。例如,平均时间网络线程闲置的指标(kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent)表示使用的资源百分比。如果有 0% 的空闲时间,则所有资源都在使用,这意味着添加更多线程可能很有用。

如果线程因为磁盘数量慢或受限,您可以尝试增加网络请求的缓冲大小以提高吞吐量:

# ...
replica.socket.receive.buffer.bytes=65536
# ...

另外,增加 bytes Kafka 可接收的最大字节数:

# ...
socket.request.max.bytes=104857600
# ...

6.1.1.5. 为高延迟连接增加带宽

Kafka 批量数据,以便在从 Kafka 到客户端的高延迟连接中实现合理的吞吐量,如数据中心之间的连接。但是,如果存在高延迟问题,您可以增加用于发送和接收消息的缓冲区的大小。

# ...
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# ...

您可以使用 带宽延迟(以字节/秒为单位)估算缓冲区的最大带宽,以估算保持最大 吞吐量所需的缓冲区的大小(以字节/为单位)。

6.1.1.6. 使用数据保留策略管理日志

Kafka 使用日志来存储消息数据。日志是与各种索引关联的一系列片段。新消息写入 活动 段,随后从不修改。服务片段从消费者获取请求时读取。活动片段定期被 滚动 为只读,并创建一个新的活跃网段来替换它。次只有一个部分处于活动状态。旧片段会保留,直到它们符合删除条件。

代理级别的配置设置日志片段的最大大小,以及推出活跃片段前以毫秒为单位的最大时间:

# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...

您可以使用 segment.bytessegment.ms 在主题级别覆盖这些设置。您是否需要降低还是提升这些值取决于片段删除策略。较大的大小意味着活跃片段包含更多信息,且不频繁地推出。段也不再频繁地符合删除条件。

您可以设置基于时间或基于大小的日志保留和清理策略,以便保持日志可被管理。根据您的要求,您可以使用日志保留配置来删除旧片段。如果使用日志保留策略,在达到保留限制时会删除非活跃日志片段。删除旧片段会绑定日志所需的存储空间,因此您不会超过磁盘容量。

对于基于时间的日志保留,您需要根据小时、分钟和毫秒设置保留期限。保留周期基于附加至网段的时间信息。

毫秒配置具有超过分钟的优先级,其优先级高于小时。默认情况下,分钟和毫秒配置为空,但这三个选项提供了对您要保留的数据的大量控制。应优先使用毫秒配置,因为它是唯一可动态更新的三个属性之一。

# ...
log.retention.ms=1680000
# ...

如果 log.retention.ms 设置为 -1,则不会对日志保留应用任何时间限制,因此所有日志都会保留。磁盘使用量应始终受到监控,但通常不建议 -1 设置,因为它可能会导致完整磁盘出现问题,而这可能难以修复。

对于基于大小的日志保留,您需要以字节为单位设置最大日志大小(日志中所有片段):

# ...
log.retention.bytes=1073741824
# ...

换句话说,当日志达到稳定状态后,通常大约有 log.retention.bytes/log.segment.bytes 片段。达到最大日志大小时,会删除旧的网段。

使用最大日志大小时潜在的问题在于它没有考虑消息附加到片段中的时间。您可以将基于时间和大小的日志保留用于清理策略,以获得所需的平衡。达到阈值时首先触发清理。

如果要在从系统中删除片段文件前添加时间延迟,您可以为代理级别的所有主题使用 log.segment.delete.delay.msfile.delete.delay.ms 为主题配置中的特定主题添加延迟。

# ...
log.segment.delete.delay.ms=60000
# ...

6.1.1.7. 使用清理策略删除日志数据

删除较旧日志数据的方法由 日志清理 配置决定。

默认情况下,为代理启用日志清理功能:

# ...
log.cleaner.enable=true
# ...

您可以在主题或代理级别设置清理策略。代理级配置是尚未设置策略的主题的默认设置。

您可以将策略设置为删除日志、紧凑日志,或同时执行这两者:

# ...
log.cleanup.policy=compact,delete
# ...

delete 策略与使用数据保留策略管理日志对应。当不需要永久保留数据时,它非常适合。compact 策略保证为每个消息键保留最新的消息。日志紧凑适合更改消息值,您想要保留最新的更新。

如果将清理策略设置为删除日志,则会根据日志保留限制删除旧的片段。否则,如果没有启用日志清理功能,且没有日志保留限制,日志将继续增加。

如果为日志紧凑设置了清理策略,日志 的头 将作为一个标准 Kafka 日志运行,按顺序为新消息写入操作。在紧凑日志的 尾部 (日志清理操作),如果日志中稍后出现具有相同密钥的另一条记录,则将删除记录。带有 null 值的消息也会被删除。如果不使用键,就无法使用紧凑,因为需要键来识别相关消息。虽然 Kafka 保证每个密钥的最新消息将被保留,但它不能保证整个紧凑的日志不会包含重复项。

图 6.1. 在压缩前以偏移位置显示键值写入的日志

Image of compaction showing key value writes

使用密钥来识别信息,Kafka 紧凑会保留特定消息键的最新消息(带有最高偏移值),最终丢弃之前具有相同密钥的消息。换句话说,其最新状态的消息始终可用,当日志安全运行时,该特定消息的任何过期记录最终都会被删除。您可以将消息恢复到以前的状态。

即使记录被删除,记录也会保留原始偏移。因此,尾部可能有非连续偏移。当消耗在尾部中不再可用的偏移时,会找到具有下一个高偏移量的记录。

图 6.2. 紧凑后日志

Image of compaction after log cleanup

如果您只选择紧凑的策略,您的日志仍然可以变得任意大。在这种情况下,您可以将策略设置为压缩 和删除 日志。如果您选择压缩和删除,则首先压缩日志数据,从而删除日志头带有键的记录。之后,在删除日志保留阈值之前发布的数据。

图 6.3. 日志保留点和压缩点

Image of compaction with retention point

您需要设置以毫秒为单位检查清理的频率:

# ...
log.retention.check.interval.ms=300000
# ...

调整与日志保留设置相关的日志保留检查间隔。较小的保留大小可能需要更频繁地检查。

清理的频率应该足以管理磁盘空间,但经常影响某个主题的性能。

您还可以以毫秒为单位设置一个时间,以便在没有要清理的日志时将清理干净到备用中:

# ...
log.cleaner.backoff.ms=15000
# ...

如果您选择删除旧的日志数据,您可以在清除数据前设置一个毫秒来保留删除的数据:

# ...
log.cleaner.delete.retention.ms=86400000
# ...

删除的数据保留期限给予了时间,可以注意到数据在不可删除之前已丢失。

若要删除与特定密钥相关的所有消息,制作者可以发送 tomb stone 消息。am bstone 具有 null 值,充当标记来告知消费者删除其值。紧凑后,只保留了 tombstone,它必须持续足够长的时间,使消费者能够知道该消息被删除。删除旧消息后,没有值,也会从分区中删除 tombstone 密钥。

6.1.1.8. 管理磁盘使用率

还有许多其他与日志清理相关的配置设置,但特别重要的是内存分配。

deduplication 属性指定在所有日志清理线程中清理的总内存。您可以设置通过缓冲区负载因数使用的内存百分比的上限。

# ...
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9
# ...

每个日志条目都使用正好 24 字节,因此您可以计算一次缓冲区可以处理的日志条目数量,并相应地调整设置。

如果您希望减少日志清理时间,请考虑增加日志清理线程数量:

# ...
log.cleaner.threads=8
# ...

如果您遇到 100% 磁盘带宽使用情况的问题,您可以限制日志清理 I/O,根据执行操作的磁盘功能,读写操作的总和小于指定的双值:

# ...
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
# ...

6.1.1.9. 处理大消息大小

消息的默认批处理大小为 1MB,这是大多数用例中最大吞吐量的最佳选择。如果有足够的磁盘容量,Kafka 能够以更低的吞吐量来容纳较大的批处理。

处理较大的消息的大小有四种方式:

  1. 生产者侧消息压缩 将压缩的消息写入日志中。
  2. 基于参考的消息传递仅发送对消息值中某个其他系统中存储数据的引用。
  3. 内联消息传递将消息分割为使用相同键的块,然后通过 Kafka Streams 等流处理器将其组合在输出上。
  4. 为处理更大消息大小而构建的代理和制作者/消费者客户端应用配置.

建议使用基于参考的消息和消息压缩选项,并覆盖大多数情况。对于这些选项,必须小心谨慎以避免引入性能问题。

生产者侧压缩

对于制作者配置,您可以指定一个 compression.type,如 Gzip,然后应用到生产者生成的数据的批处理。使用代理配置 compression.type=producer,代理会保留使用的任何压缩程序。每当生产者和主题压缩不匹配时,代理必须在将批处理附加到日志中,这会影响代理性能。

压缩还会增加制作者的额外处理开销和消费者的解压缩开销,但在批量中包含更多数据,因此在消息数据压缩不错时,通常对吞吐量很有用。

将制作者侧压缩与批处理大小的微调相结合,促进最佳吞吐量。使用指标有助于测量所需的平均批处理大小。

基于参考的消息传递

当您不知道消息的大程度时,基于参考的消息传递对于数据复制非常有用。外部数据存储必须快速、持久且高度可用,此配置才能发挥作用。将数据写入数据存储,并返回对数据的引用。制作者发送一条包含对 Kafka 引用的消息。使用者从消息中获取引用,并使用它来从数据存储中获取数据。

图 6.4. 基于参考的消息传递流

Image of reference-based messaging flow

由于消息传递需要更多行程,端到端延迟会增加。这种方法的另一个显著缺点是,在清理 Kafka 消息时,没有自动清理外部系统中的数据。混合方法是仅向数据存储直接发送大型消息,并直接处理标准大小的消息。

内联消息传递

内联消息传递很复杂,但开销不会依赖于基于参考的消息传递等外部系统。

如果消息太大,产生客户端应用必须按顺序进行序列化,然后对数据进行块。然后,制作者使用 Kafka ByteArraySerializer 或者类似地再次对每个块进行序列化,然后再发送它。使用者跟踪消息和缓冲块,直到消息完整消息为止。消耗的客户端应用程序接收区块,这些区块在反序列化之前组装。根据每组区块消息的第一块或最后一个块的偏移,将完整消息传送到正在使用的应用的其余部分。针对偏移元数据检查成功发送完整消息,以避免在重新平衡期间重复。

图 6.5. 内联消息传递流

Image of inline messaging flow

由于需要缓冲,内联消息传递在消费者一侧具有性能开销,特别是在并行处理一系列大型消息时。大型消息的块可能会相互交集,因此,如果缓冲区中另一条大消息的块不完整,则始终无法在消息的所有块被使用时提交。因此,缓冲区通常由持久消息区块或实施提交逻辑来支持。

配置以处理较大的信息

如果无法避免较大的消息,并且为了避免在消息流的任意点上阻止,您可以增加消息限值。为此,请在主题级别配置 message.max.bytes,为各个主题设置最大记录批处理大小。如果您在代理级别设置了 message.max.bytes,则允许所有主题显示更大的信息。

代理将拒绝任何大于 message.max.bytes 设置的限制信息。生产者(max.request.size)和使用者(message.max.bytes)的缓冲大小必须能够容纳较大的信息。

6.1.1.10. 控制消息数据的日志清除

日志刷新属性控制缓存的消息数据的定期写入磁盘。调度程序以毫秒为单位指定日志缓存中的检查频率:

# ...
log.flush.scheduler.interval.ms=2000
# ...

您可以根据信息保存内存的最大时间以及日志中的信息在写入磁盘前的最大信息数来控制刷新的频率:

# ...
log.flush.interval.ms=50000
log.flush.interval.messages=100000
# ...

清空之间的等待包括检查的时间和执行清除前的指定间隔。增加清空的频率可能会影响吞吐量。

通常,建议不要设置明确的清空阈值,并让操作系统使用其默认设置执行后台刷新。分区复制比写入单个磁盘的数据持久性要高,因为故障代理可以从其同步副本中恢复。

如果您使用应用程序清空管理,如果使用更快的磁盘,则可能最好设置较低的清空阈值。

6.1.1.11. 对可用性进行分区重新平衡

分区可以在代理之间复制,以实现容错。对于给定的分区,一个代理被选为领导,并处理所有生成的请求(写入到日志)。在出现领导故障时,其他代理的分区跟踪者复制分区领导者的分区数据,以实现数据可靠性。

跟随者通常不为客户端服务,但 broker.rack 允许使用者在 Kafka 集群跨越多个数据中心时消耗来自最接近的副本的消息。跟随者只从分区领导机复制消息,并在领导机失败时允许恢复。恢复需要一个同步跟踪器。跟随者通过向领导发送获取请求来保持同步,后者按顺序向跟随者返回消息。如果跟踪者发现了领导上最近提交的邮件,则被视为同步。领导机通过查看跟随者请求的最后偏移来检查这一点。除非允许未清理的领导选举,否则无法同步跟进者作为当前的领导机 故障。

您可以调整跟随者被视为不同步前的滞后时间:

# ...
replica.lag.time.max.ms=30000
# ...

滞后时间为将消息复制到所有同步副本的时间施加了上限,以及生产者必须等待确认的时间。如果跟踪器无法发出获取请求并在指定滞后时间内跟上最新的消息,则会从同步副本中删除它。您可以缩短检测失败副本的滞后时间,但这样做可能会增加无用同步的后续者的数量。正确的滞后时间值取决于网络延迟和代理磁盘带宽。

当领导分区不再可用时,会选择同步内的副本之一作为新领导。分区副本列表中的第一个代理称为 首选 的领导。默认情况下,Kafka 会根据对领导发行的定期检查来为自动分区领导重新平衡启用 Kafka。也就是说,Kafka 检查首选的领导是否是 当前的 领导机。重新平衡可确保领先者在代理和代理之间均匀分布,不会过载。

您可以使用 AMQ Streams 的 Cruise Control 来找出副本分配,以便代理在整个集群中平均平衡负载。其计算考虑了领导者和追随者所经历的不同负载。失败的领导机会影响 Kafka 集群的平衡,因为剩余的代理会获得前导额外分区的额外工作。

要使 Cruise Control 发现的分配真正达到平衡,分区必须要由首选领导领导。Kafka 可以自动确保使用首选的领导机(在可能的情况下),根据需要更改当前的领导机。这样可确保集群保持 Cruise Control 找到的均衡状态。

您可以在触发重新平衡前,以秒为单位控制重新平衡检查的频率和代理允许的最大发生率。

#...
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
#...

代理的领导地位百分比是代理作为当前领导者的分区数量与首选领导者分区数量之间的比例。您可以将百分比设置为零,以确保始终选中首选领导者,假设他们同步。

如果重新平衡的检查需要更多控制,您可以禁用自动重新平衡。然后,您可以选择何时使用 kafka-leader-election.sh 命令行工具触发重新平衡。

注意

随 AMQ Streams 提供的 Grafana 仪表板显示没有活跃领导的复制分区和分区的指标。

6.1.1.12. 未清理领导选举机制

同步副本的领导选举被视为干净,因为它保证不会丢失数据。这是默认情况下会发生的情况。但是,如果没有可以承担领导地位的同步副本,情况会怎样?或许 ISR(同步副本)仅在领导磁盘终止时包含领导机。如果没有设置最少的同步副本数,且其硬盘驱动器出现故障时没有跟随分区领导器同步,则数据已经丢失。不仅 不能这种情况,而且不能选择新的领导,因为不存在同步的追随者。

您可以配置 Kafka 如何处理领导故障:

# ...
unclean.leader.election.enable=false
# ...

Unclean leader 选举默认被禁用,这意味着不同步的副本不能成为领导。使用干净的领导选举机制时,如果没有其他代理在旧领导丢失时处于 ISR 中,Kafka 会等待该领导者重新在线,然后再写入或读取消息。unclean leader 选举机制意味着不同步的副本可能会成为领导者,但您会面临丢失消息的风险。您选择的选择取决于您的需求是否有利于可用性或持久性。

您可以覆盖主题级别上特定主题的默认配置。如果无法承担数据丢失的风险,请保留默认配置。

6.1.1.13. 避免不必要的消费者组重新平衡

对于加入新消费者组的消费者,您可以添加延迟,以避免不必要的重新平衡到代理:

# ...
group.initial.rebalance.delay.ms=3000
# ...

延迟是协调会等待会员加入的时间。延迟时间越长,所有成员越有可能加入并避免重新平衡。但是,这一延迟也会阻止该组在句点结束前被消耗掉。

6.1.2. Kafka 生成器配置调整

使用基本制作者配置,以及为特定用例量身定制的可选属性。

调整配置以最大化吞吐量可能会增加延迟,反之亦然。您将需要试验并调优制作者配置,以获得所需的平衡。

6.1.2.1. 基本制作者配置

每个制作者都需要连接和序列化程序属性。通常而言,最好是添加客户端 ID 以进行跟踪,并对制作者使用压缩来减少请求中的批处理大小。

在基本制作者配置中:

  • 无法保证分区中消息的顺序。
  • 确认到达代理的消息不能保证持久性。

基本制作者配置属性

# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...

1
(必需)告诉制作者使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。制作者使用该地址来发现和连接集群中的所有代理。在服务器停机时使用逗号分隔列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。
2
(必需)在将每条消息的密钥发送到代理前将其转换为字节。
3
(必需)在将每个消息发送到代理前将每条消息的值转换为字节。
4
(可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。
5
(可选)压缩消息的编码器(发送并可能以压缩格式存储),然后在到达消费者时解压缩。压缩对于提高吞吐量和减少存储负载非常有用,但可能不适用于低延迟应用程序,因为压缩或解压成本可能过高。

6.1.2.2. 数据持久性

您可以使用消息发送确认,应用更大的数据持久性,以最大程度降低消息丢失的可能性。

# ...
acks=all 1
# ...
1
指定 acks=all 在确认消息请求已成功接收前,强制分区领导将信息复制到一定数量的跟随者。由于附加检查,acks=all 会增加制作者发送消息和接收确认之间的延迟。

在将确认发送到生产者之前,需要将信息附加到日志的代理数量由主题的 min.insync.replicas 配置决定。典型的起点是将主题复制因数为 3,其他代理上有两个内联副本。在这种配置中,如果单个代理不可用,生产者可以继续不受影响。如果第二个代理不可用,生产者将不会收到确认并且无法生成更多消息。

要支持的主题配置 acks=all

# ...
min.insync.replicas=2 1
# ...

1
使用同步副本 2。默认值为 1
注意

如果系统失败,则缓冲区中存在不正确的数据丢失的风险。

6.1.2.3. 订购交付

幂等制作者避免重复,因为消息只发送一次。为消息分配了 ID 和序列号,以确保传送顺序,即使出现故障也是如此。如果您使用 acks=all 以实现数据一致性,启用幂等性对有序交付有利。

使用幂等方式订购交付

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
设置为 true 以启用幂等生成器。
2
通过幂等发送,即时请求数可能大于 1,同时仍然提供消息排序保证。默认值为 5 个 in-flight 请求。
3
acks 设置为 all
4
设置重新发送失败消息请求的尝试次数。

如果您没有因为性能成本而使用 acks=all 和幂等性,请将提示内(未确认)请求数设置为 1 来保持排序。否则,只有在 Message- B 已写入代理后 Message- A 可能无法成功。

在没有幂等的情况下订购交付

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
设置为 false 以禁用幂等生成器。
2
将 in-flight 请求数设置为正好 1

6.1.2.4. 可靠性保证

仅对写入单个分区一次,Idempotence 非常有用。事务处理与幂等性一起使用时,允许在多个分区间只写入一次。

事务可确保使用相同事务 ID 的消息只生成一次,并且将 所有 消息都成功写入到对应的日志中,或者其中任何消息 都不是.

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
指定唯一的事务 ID。
2
在返回超时错误前,设置以毫秒为单位进行事务的最大允许时间。默认值为 900000 或 15 分钟。

选择 transactional.id 非常重要,以便保持事务保证。每个事务 id 都应该用于一组唯一的主题分区。例如,这可以通过外部映射主题分区名称到事务 id 来实现,或者通过使用避免冲突的功能计算主题分区名称中的事务 ID。

6.1.2.5. 优化吞吐量和延迟

通常,系统的要求是满足给定延迟内一定比例消息的特定吞吐量目标。例如,以每秒 500,000条消息为目标,95% 的消息会在 2 秒内得到确认。

生产者的消息传递语义(消息排序和持久性)很可能根据您的应用程序的要求进行定义。例如,您可以选择在不破坏某些重要属性的情况下使用 acks=0acks=1 或者应用程序提供保证。

Broker 重新启动对高百分比统计数据有显著影响。例如,在很长一段时间内,99 百分点延迟由围绕代理重启的行为占据主导地位。在设计基准测试时,或比较基准测试与生产中显示的性能数字时,需要考虑这一点。

根据您的目标,Kafka 提供了多个配置参数和技术来调节吞吐量和延迟的性能。

消息批处理(linger.msbatch.size
消息批处理会延迟发送消息,希望将发送更多目标为同一代理的消息,允许它们批处理到单个生成请求。批处理是在高吞吐量时返回的更高延迟之间的妥协。使用 linger.ms 配置基于时间的批处理,并使用 batch.size 配置基于大小的批处理。
压缩(compression.type)
消息压缩增加了制作者延迟(CPU 时间用于压缩消息),但会更小(可能进行磁盘写入),这可以提高吞吐量。压缩是否必要,以及要使用的最佳压缩程度取决于所发送的消息。压缩发生在调用 KafkaProducer.send() 的线程中,因此如果此方法的延迟与您需要使用更多线程的应用程序相关。
pipelining(max.in.flight.requests.per.connection)
pipelining 意味着在收到对上一个请求的响应前发送更多请求。通常,更多流水线意味着更好的吞吐量,最高是一个阈值,达到其他效果(例如更糟糕的批处理)开始消除对吞吐量的影响。

降低延迟

当您的应用程序调用 KafkaProducer.send() 时,信息为:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 已压缩
  • 添加到每个分区队列中的批量消息

send() 方法返回的时间点。send() 被阻塞的时间由以下方法决定:

  • 拦截器、序列程序和分区器花费的时间
  • 使用的压缩算法
  • 等待缓冲区用于压缩所需的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(根据 batch.size
  • linger.ms 引入的延迟已过
  • 发送方即将向同一代理发送其他分区的消息批处理,也可以添加此批处理
  • 生产者被清空或关闭

查看批处理和缓冲的配置,以减轻 send() 阻塞对延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性添加了一个毫秒的延迟,以便在请求中累积和发送更大规模的消息。默认值为 0'.
2
如果使用最大 batch.size (以字节为单位),则会在达到最大值时发送请求,或者信息排队的时间超过 linger.ms (以较快者为准)。添加延迟可让批处理积累消息到批处理大小。
3
缓冲区的大小必须至少与批处理大小相同,并且能够适应缓冲区、压缩和内向请求。

增加吞吐量

通过调整消息传输和完成发送请求前等待的最长时间,提高消息请求的吞吐量。

您还可以通过编写自定义分区程序来替换默认分区,将消息定向到指定分区。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间,以毫秒为单位。您可以将值设置为 MAX_LONG 来委派给 Kafka 无限期重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。

6.1.3. Kafka 使用者配置调整

使用基本使用者配置,以及根据特定用例量身定制的可选属性。

调优您的消费者时,您的主要顾虑是确保它们能高效地应对被窃取的数据量。与制作者调优一样,准备好进行增量更改,直到消费者按预期工作。

6.1.3.1. 基本使用者配置

每个消费者都需要连接和反序列化器属性。通常,最好添加客户端 ID 进行跟踪。

在使用者配置中,无论后续配置如何:

  • 消费者从给定的偏移获取并按顺序使用消息,除非偏差被更改为跳过或重新读取消息。
  • 代理不知道消费者是否处理了响应,即使对 Kafka 提交偏移也是如此,因为偏移可能会发送到集群中的不同代理。

基本使用者配置属性

# ...
bootstrap.servers=localhost:9092 1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  2
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer  3
client.id=my-client 4
group.id=my-group-id 5
# ...

1
(必需)告诉使用者使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。使用者使用该地址来发现并连接到集群中的所有代理。在服务器停机时使用逗号分隔列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。如果您使用负载均衡器服务公开 Kafka 集群,则只需要该服务的地址,因为负载均衡器处理其可用性。
2
(必需)Deserializer 将从 Kafka 代理获取的字节转换为消息密钥。
3
(必需)Deserializer 将从 Kafka 代理获取的字节转换为消息值。
4
(可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。id 也可用于根据处理时间配额对消费者进行限流。
5
(条件) 用户需要 组 ID 才能加入消费者组。

6.1.3.2. 使用消费者组扩展数据消耗

消费者组共享一个由一个或多个生产者从给定主题生成的大型数据流。用户使用 group.id 属性分组,允许信息分散到成员中。组中的一个消费者被选为领导,并决定如何将该分区分配给组中的使用者。每个分区只能分配给一个消费者。

如果您还没有多少消费者作为分区,可以通过添加相同 group.id 的更多消费者实例来扩展数据消耗。将更多的消费者添加到组中,超过现有分区,但这意味着如果一个分区停止工作,就表示待机使用者处于待机状态。如果您能够以更少的消费者达到吞吐量目标,就可以节省资源。

同一消费者组中的消费者发送偏移提交和心跳到同一代理。因此组中的消费者数量越多,代理上的请求负载就越大。

# ...
group.id=my-group-id 1
# ...
1
使用组 ID 将使用者添加到消费者组中。

6.1.3.3. 消息排序保证

Kafka 代理从客户接收请求,要求代理从主题、分区和偏移位置列表中发送消息。

用户会按照提交至代理的顺序以单个分区中观察消息,这意味着 Kafka 仅在 单一分区中为消息提供排序保证。相反,如果消费者使用来自多个分区的消息,则使用者观察到的不同分区中消息的顺序不一定反映它们的发送顺序。

如果您需要严格排序一个主题的消息,请为每个使用者使用一个分区。

6.1.3.4. 优化吞吐量和延迟

控制客户端应用调用 KafkaConsumer.poll() 时返回的消息数量。

使用 fetch.max.wait.msfetch.min.bytes 属性增加消费者从 Kafka 代理获取的最小数据量。使用 fetch.max.wait.ms 配置基于时间的批处理,并使用 fetch.min.bytes 配置基于大小的批处理。

如果消费者或代理中的 CPU 使用率较高,则可能是因为消费者的请求太多。您可以调整 fetch.max.wait.msfetch.min.bytes 属性,以便在更大的批处理中发送的请求和信息较少。通过调整较高的吞吐量,可以降低延迟成本。如果产生的数据量较低,您也可以调整更高的值。

例如:如果您将 fetch.max.wait.ms 设置为 500ms,fetch.min.bytes 设置为 16384 字节,当 Kafka 收到来自消费者的获取请求时,它会在达到任一阈值的第一个阈值时响应。

相反,您可以降低 fetch.max.wait.msfetch.min.bytes 属性调整,以改进端到端延迟。

# ...
fetch.max.wait.ms=500 1
fetch.min.bytes=16384 2
# ...
1
代理在完成获取请求前将等待的最长时间,以毫秒为单位。默认值为 500 毫秒。
2
如果使用的最小批处理大小(以字节为单位),则会在达到最小值时发送请求,或者消息排队的时间超过 fetch.max.wait.ms (以较快者为准)。添加延迟可让批处理积累消息到批处理大小。

通过增大获取请求大小来降低延迟

使用 fetch.max.bytesmax.partition.fetch.bytes 属性增加使用者从 Kafka 代理获取的最大数据量。

fetch.max.bytes 属性设置一次从代理获取的数据量上限,以字节为单位。

max.partition.fetch.bytes 为每个分区返回的数据量以字节为单位设置的最大限制,必须始终大于代理或 max.message.bytes 主题配置中设置的字节数。

客户端可消耗的最大内存量计算如下:

NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes

如果内存用量可以容纳它,您可以增加这两个属性的值。通过在每个请求中允许更多数据,可以提高延迟,因为获取请求的数量较少。

# ...
fetch.max.bytes=52428800 1
max.partition.fetch.bytes=1048576 2
# ...
1
为获取请求返回的最大数据量,以字节为单位。
2
每个分区返回的最大数据量,以字节为单位。

6.1.3.5. 在提交偏移时避免数据丢失或重复

Kafka 自动提交机制 允许使用者自动提交消息偏移。如果启用,消费者将以 5000ms 间隔提交从轮询代理收到的偏移。

自动提交机制很方便,但会带来数据丢失和复制风险。如果使用者已获取并转换了大量消息,但执行自动提交时,系统会对消费者缓冲区中已处理的消息崩溃,该数据将会丢失。如果系统在处理消息后崩溃,但在执行自动使用前,数据会在重新平衡后在另一个消费者实例上重复。

仅当在下一次轮询到代理或消费者关闭之前处理所有消息时,自动使用才可以避免数据丢失。

要最小化数据丢失或重复的可能性,您可以将 enable.auto.commit 设置为 false,并开发您的客户端应用程序,使其对提交的偏移拥有更多控制权。或者您可以使用 auto.commit.interval.ms 减少提交之间的间隔。

# ...
enable.auto.commit=false 1
# ...
1
自动提交设为 false,以提供对提交偏移的更多控制。

通过将 enable.auto.commit 设置为 false,您可以在执行了 所有 处理并且消息已被使用后提交偏移。例如,您可以将应用程序设置为调用 Kafka commitSynccommitAsync 提交 API。

commitSync API 在从轮询返回的消息批处理中提交偏移。完成批处理中的所有消息后,您将调用 API。如果您使用 commitSync API,在提交批处理中的最后一个偏移前,应用不会轮询新信息。如果这会对吞吐量造成负面影响,您可以降低提交的频率,也可以使用 commitAsync API。commitAsync API 不等待代理响应提交请求,但会在重新平衡时造成更多的重复。种常见的做法是将应用中的两个提交 API 与刚刚在关闭使用者或重新平衡前使用的 commitSync API 合并,以确保最终提交成功。

6.1.3.5.1. 控制事务性消息

考虑在制作者一侧使用事务 id 并启用幂等性(enable.idempotence=true)来保证准确交付。在消费者方面,您可以使用 isolation.level 属性控制消费者如何读取事务性消息。

isolation.level 属性有两个有效值:

  • read_committed
  • read_uncommitted (默认)

使用 read_committed 确保消费者只读取提交的事务信息。但是,这会导致端到端延迟增加,因为在代理写入了记录事务结果的事务标记(已承诺 或中止)之前,消费者将无法返回消息

# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
1
设置为 read_committed,以便消费者只读取提交的信息。

6.1.3.6. 从失败中恢复,以避免数据丢失

使用 session.timeout.msheartbeat.interval.ms 属性配置检查并恢复消费者组中的消费者故障的时间。

session.timeout.ms 属性指定在被视为不活跃并在组中活跃消费者之间触发 重新平衡 前,消费者组中的最大时间(毫秒)可以无法与代理联系。当组重新平衡时,这些分区将重新分配给组的成员。

heartbeat.interval.ms 属性指定心跳与消费者组协调器之间的 心跳 之间的间隔,以指示消费者活跃并连接。heartbeat 间隔必须小于会话超时间隔,通常为第三个。

如果您将 session.timeout.ms 属性设置为较低,则之前检测到失败的使用者,重新平衡可以更快地进行。但是,请不要设置超时时间,以便代理无法及时收到心跳,并触发不必要的重新平衡。

减少心跳间隔降低了意外重新平衡的可能性,但更频繁的心跳会增加对代理资源的开销。

6.1.3.7. 管理偏移策略

使用 auto.offset.reset 属性控制当没有提交偏移时消费者的行为方式,或者提交偏移无效或删除。

假设您第一次部署使用者应用,并且它从现有主题读取消息。因为这是第一次使用 group.id,所以 __consumer_offsets 主题不包含这个应用程序的任何偏移信息。新应用可以从日志开始时开始处理所有现有消息,或者仅处理新消息。默认重置值为 latest,它从分区末尾开始,因此表示丢失了一些信息。要避免数据丢失,但增加处理量,将 auto.offset.reset 设置为 earliest 以从分区开头开始。

另请考虑使用 earliest 选项避免在为代理配置的偏移保留周期(offsets.retention.minutes)终止时丢失消息。如果消费者组或独立消费者不活跃,且在保留周期内未提交偏移,则之前提交的偏移会从 __consumer_offsets 中删除。

# ...
heartbeat.interval.ms=3000 1
session.timeout.ms=10000 2
auto.offset.reset=earliest 3
# ...
1
根据预期的重新平衡,调整心跳间隔越低。
2
如果 Kafka 代理在超时期限到期前未收到 heartbeat,则消费者会从消费者组中移除,并启动重新平衡。如果代理配置有 group.min.session.timeout.msgroup.max.session.timeout.ms,会话超时值必须在那个范围内。
3
设置为 earliest 以返回到分区的开头,并在未提交偏移时避免数据丢失。

如果单个获取请求中返回的数据量较大,则使用者处理数据之前可能会发生超时。在这种情况下,您可以降低 max.partition.fetch.bytes 或增加 session.timeout.ms

6.1.3.8. 最小化重新平衡的影响

在组中活跃使用者之间重新平衡分区是以下时间:

  • 消费者提交偏移
  • 要成立的新消费者组
  • 将分区分配给组成员的组领导
  • 组中的消费者接收其分配并开始获取

显然,这个过程会增加服务的停机时间,特别是在客户组群集滚动重启期间重复发生时。

在这种情况下,您可以使用 静态成员资格 的概念来减少重新平衡的数量。重新平衡使用者组成员之间均匀分配主题分区。静态成员资格使用持久性,以便在会话超时后在重启期间识别使用者实例。

用户组协调可以使用使用 group.instance.id 属性指定的唯一 ID 来识别新的消费者实例。在重启期间,会为消费者分配一个新成员 ID,但作为静态成员,它将继续使用相同的实例 ID,并分配相同的主题分区。

如果消费者应用程序没有调用来至少轮询每个 max.poll.interval.ms 毫秒,则消费者会被视为失败,从而导致重新平衡。如果应用无法及时处理轮询返回的所有记录,您可以使用 max.poll.interval.ms 属性为消费者的新消息指定轮询间隔,以避免重新平衡。或者您可以使用 max.poll.records 属性设置从消费者缓冲区返回的记录数上限,允许您的应用程序处理较少的 max.poll.interval.ms 限制中的记录。

# ...
group.instance.id=_UNIQUE-ID_ 1
max.poll.interval.ms=300000 2
max.poll.records=500 3
# ...
1
唯一的实例 ID 可确保新使用者实例接收相同的主题分区分配。
2
设置检查消费者是否继续处理消息的时间间隔。
3
设置从使用者返回的已处理记录数。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.