Kafka 配置调整
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 Kafka 调整概述
使用配置属性优化 Kafka 代理、生产者和消费者的性能。您可以在 OCP 和 RHEL 上为 AMQ Streams 指定配置属性。
需要最小配置属性集合,但您可以添加或调整属性来更改生产者和用户如何与 Kafka 代理交互。例如,您可以调整消息的延迟和吞吐量,以便客户端实时响应数据。
您可以通过分析指标来衡量初始配置的位置,然后进行增量更改并进一步比较指标,直到您具有您需要的配置。
有关 Apache Kafka 配置属性的更多信息,请参阅 Apache Kafka 文档。
1.1. 映射属性和值
如何指定配置属性取决于部署的类型。如果您在 OCP 上部署了 AMQ Streams,您可以使用 Kafka
资源通过 config
属性为 Kafka 代理添加配置。在 RHEL 上使用 AMQ Streams,您将配置作为环境变量添加到属性文件中。
当您向自定义资源添加 config
属性时,您可以使用冒号(':')来映射属性和值。
自定义资源中的配置示例
num.partitions:1
当您将属性添加为环境变量时,您可以使用等号('=')来映射属性和值。
作为环境变量的示例配置
num.partitions=1
1.2. 有助于调整的工具
以下工具有助于 Kafka 调整:
- Cruise Control 生成优化建议,可用于评估和实施集群重新平衡
- Kafka 静态配额插件在代理上设置限制
- 机架配置将代理分区分散到机架中,并允许消费者从最接近的副本中获取数据
有关这些工具的更多信息,请参阅以下指南:
第 2 章 受管代理配置
在 OpenShift 上部署 AMQ Streams 时,您可以通过 Kafka
自定义资源的 config
属性来指定代理配置。但是,某些代理配置选项由 AMQ Streams 直接管理。
因此,如果您在 OpenShift 中使用 AMQ Streams,则无法配置以下选项:
-
broker.id
指定 Kafka 代理的 ID -
log.dirs
目录用于日志数据 -
zookeeper.connect
配置,使用 ZooKeeper 连接 Kafka -
将 Kafka 集群公开给客户端
的监听程序
-
允许或拒绝用户执行操作的
授权机制
-
证明需要访问 Kafka 的用户的身份的
验证机制
代理 ID 从 0 (零)开始,对应于代理副本数量。根据 Kafka
自定义资源中的 spec.kafka.storage
配置,日志目录被挂载到 /var/lib/kafka/data/kafka-logIDX
。IDX 是 Kafka 代理 pod 索引。
有关排除的列表,请参阅 KafkaClusterSpec
模式参考。
在 RHEL 上使用 AMQ Streams 时,这些排除并不适用。在这种情况下,您需要在基本代理配置中添加这些属性来识别代理并提供安全访问。
RHEL 上 AMQ Streams 的代理配置示例
# ... broker.id = 1 log.dirs = /var/lib/kafka zookeeper.connect = zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181 listeners = internal-1://:9092 authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer ssl.truststore.location = /path/to/truststore.jks ssl.truststore.password = 123456 ssl.client.auth = required # ...
第 3 章 Kafka 代理配置调整
使用配置属性优化 Kafka 代理的性能。您可以使用标准 Kafka 代理配置选项,但 AMQ Streams 直接管理的属性除外。
3.1. 基本代理配置
典型的代理配置将包括与主题、线程和日志相关的属性的设置。
基本代理配置属性
# ... num.partitions=1 default.replication.factor=3 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 num.network.threads=3 num.io.threads=8 num.recovery.threads.per.data.dir=1 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 group.initial.rebalance.delay.ms=0 zookeeper.connection.timeout.ms=6000 # ...
3.2. 为高可用性复制主题
基本主题属性设置主题的默认分区数和复制因素,这些主题将应用到在没有显式设置这些属性的情况下创建的主题,包括在自动创建主题时。
# ... num.partitions=1 auto.create.topics.enable=false default.replication.factor=3 min.insync.replicas=2 replica.fetch.max.bytes=1048576 # ...
对于高可用性环境,建议将复制因素增加到至少 3 个主题,并将最小同步副本的数量设置为比复制因素少 1 个。
auto.create.topics.enable
属性默认启用,以便在生产者和消费者需要时自动创建不存在的主题。如果您使用自动主题创建,您可以使用 num.partitions
设置主题的默认分区数。但是,这个属性会被禁用,以便在通过显式主题创建时对主题进行更多的控制。
为了实现数据持久性,需要在 topic 配置中设置 min.insync.replicas
,并在 producer 配置中使用 acks=all
来发送提交确认。
使用 replica.fetch.max.bytes
来设置复制领导分区的每个后续消息的最大大小(以字节为单位)。根据平均消息大小和吞吐量更改此值。当考虑读/写缓冲所需的内存分配总量时,可用内存还必须能够在与所有跟随者乘以最大复制的消息大小时容纳最大复制的消息大小。
delete.topic.enable
属性被默认启用,以允许删除主题。在生产环境中,您应该禁用此属性以避免意外删除主题,从而导致数据丢失。但是,您可以临时启用它并删除主题,然后再次禁用它。
在 OpenShift 上运行 AMQ Streams 时,主题 Operator 可以提供 operator 风格的主题管理。您可以使用 KafkaTopic
资源来创建主题。对于使用 KafkaTopic
资源创建的主题,复制因素是使用 spec.replicas
设置的。如果启用了 delete.topic.enable
,您还可以使用 KafkaTopic
资源删除主题。
# ... auto.create.topics.enable=false delete.topic.enable=true # ...
3.3. 事务和提交的内部主题设置
如果您使用事务 来启用从生产者对分区的原子写入,事务的状态将存储在内部 __transaction_state
主题中。默认情况下,代理使用复制因子(3)和最少 2 个同步副本进行配置,这意味着 Kafka 集群中至少需要三个代理。
# ... transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 # ...
类似地,内部 __consumer_offsets
主题存储消费者状态,具有分区和复制因子数量的默认设置。
# ... offsets.topic.num.partitions=50 offsets.topic.replication.factor=3 # ...
不要在生产环境中减少这些设置。在生产环境中,您可以提高设置。作为例外,您可能希望在单代理 测试 环境中减少设置。
3.4. 通过增加 I/O 线程改进请求处理吞吐量
网络线程处理对 Kafka 集群的请求,如生成并从客户端应用程序获取请求。生成请求放置在请求队列中。响应放置在响应队列中。
每个监听器的网络线程数量应该反映复制因素以及客户端制作者和与 Kafka 集群交互的用户的活动级别。如果您要有大量请求,您可以增加线程数量,使用时间线程数来确定何时添加更多线程。
要减少拥塞和规范请求流量,您可以限制请求队列中允许的请求数量。当请求队列已满时,所有传入的流量都会被阻断。
I/O 线程从请求队列获取请求来处理它们。添加更多线程可以提高吞吐量,但 CPU 内核和磁盘带宽的数量会带来实际的上限。至少,I/O 线程数量应该等于存储卷的数量。
# ... num.network.threads=3 1 queued.max.requests=500 2 num.io.threads=8 3 num.recovery.threads.per.data.dir=4 4 # ...
所有代理的线程池的配置更新可能会在集群级别动态发生。这些更新仅限于当前大小为一半和当前大小的两倍。
以下 Kafka 代理指标可帮助处理所需的线程数量:
-
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
在平均时间网络线程上提供指标作为百分比。 -
kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
提供平均时间 I/O 线程闲置的指标。
如果有 0% 空闲时间,则所有资源都使用,这意味着添加更多线程可能会很有用。当闲置时间低于 30% 时,性能可能会开始下降。
如果线程因为磁盘数量缓慢或限制,您可以尝试增加网络请求的缓冲区大小来提高吞吐量:
# ... replica.socket.receive.buffer.bytes=65536 # ...
另外,增加 Kafka 可以接收的最大字节数:
# ... socket.request.max.bytes=104857600 # ...
3.5. 增加高延迟连接的带宽
Kafka 批处理数据,通过从 Kafka 到客户端(如数据中心之间的连接)实现合理的吞吐量。但是,如果高延迟是问题,您可以增加用于发送和接收消息的缓冲区的大小。
# ... socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 # ...
您可以使用 bandwidth-delay 产品 计算来估算缓冲区的最佳大小,这将乘以链路的最大带宽(以字节为单位/秒),以提供保持最大吞吐量所需的大型缓冲区。
3.6. 使用数据保留策略管理日志
Kafka 使用日志来存储消息数据。日志是与各种索引关联的一系列片段。新消息被写入 active 片段,之后永远不会修改。在服务从消费者获取请求时读取片段。有时候,活跃段会滚动到变为只读,一个新的活跃段会被创建来替代它。次只有一个活跃段。旧段将被保留,直到它们有资格删除。
代理级别的配置设置日志片段的最大大小(以字节为单位),以及推出活跃段前的时间(毫秒):
# ... log.segment.bytes=1073741824 log.roll.ms=604800000 # ...
您可以使用 segment.bytes
和 segment.ms
在主题级别上覆盖这些设置。无论您需要降低还是提升这些值,都取决于删除片段的策略。较大的大小表示活跃片段包含更多消息,且更频繁地推出。段还具有更短的删除条件。
您可以设置基于时间的日志保留和清理策略,以便保留日志。根据您的要求,您可以使用日志保留配置来删除旧的片段。如果使用日志保留策略,则在达到保留限制时会删除非活跃日志片段。删除旧的片段会绑定日志所需的存储空间,这样您不会超过磁盘容量。
对于基于时间的日志保留,您可以根据小时、分钟和毫秒设置保留周期。保留周期基于将时间信息附加到片段中。
毫秒的配置具有超过分钟的时间,其优先级持续数小时。默认情况下,分钟和毫秒配置为空,但三个选项提供对您要保留的数据进行大量控制。首选应提供给毫秒配置,因为它是唯一可动态更新的三个属性之一。
# ... log.retention.ms=1680000 # ...
如果 log.retention.ms
设为 -1,则不会将时间限制应用到日志保留,因此所有日志都会被保留。应始终监控磁盘用量,但通常不建议 -1 设置,因为它可能会导致完整磁盘出现问题,这可能会难以对磁盘进行重新处理。
对于基于大小的日志保留,您可以以字节为单位设置最大日志大小(日志中的所有片段):
# ... log.retention.bytes=1073741824 # ...
换句话说,日志通常会有大约 log.retention.bytes/log.segment.bytes 片段,当它达到稳定状态。当达到最大日志大小时,会删除旧的片段。
使用最大日志大小的一个潜在问题是不会考虑将时间消息附加到段中。您可以对清理策略使用基于时间和大小的日志保留,以获得所需的平衡。达到哪个阈值首先触发清理。
如果要在从系统中删除段文件前添加时间延迟,您可以使用在代理级别或 file.delete.delay.ms
中特定主题的 log.segment.delete.delay.ms
来添加延迟。
# ... log.segment.delete.delay.ms=60000 # ...
3.7. 使用清理策略删除日志数据
删除旧日志数据的方法由日志 清理 配置决定。
默认情况下,代理启用了 log cleaner:
# ... log.cleaner.enable=true # ...
如果您使用日志压缩清理策略,则需要启用日志清理程序。您可以在主题或代理级别设置清理策略。代理级配置是没有设置策略的主题的默认设置。
您可以设置策略来删除日志、压缩日志或同时操作:
# ... log.cleanup.policy=compact,delete # ...
删除策略
与使用数据保留策略管理日志对应。当不需要永久保留数据时,它非常适合。紧凑
策略保证为每个消息键保留最新的消息。日志压缩适合消息值可更改的位置,并且您想要保留最新的更新。
如果将清理策略设置为删除日志,则根据日志保留限制删除旧的片段。否则,如果没有启用日志清理程序,且没有日志保留限制,日志将继续增长。
如果为日志压缩设置了 cleanup 策略,日志 头 作为标准 Kafka 日志运行,并按顺序写入新信息。在紧凑日志的尾部中,如果稍后在日志中发生具有相同键的记录,则日志清理将被删除。也会删除具有 null 值的消息。如果您不使用密钥,则无法使用压缩,因为需要使用密钥来识别相关信息。虽然 Kafka 确保保留每个密钥的最新信息,但它不能保证整个压缩日志不会包含重复。
图 3.1. 在压缩前显示使用偏移位置写入的键值日志

使用密钥识别信息,Kafka 压缩为特定消息键保留最新消息(具有最高偏移量),最终丢弃具有相同键的早期消息。换句话说,其最新状态的消息始终可用,在日志清理运行时最终会删除特定消息的最新记录。您可以将消息恢复回以前的状态。
即使删除周围的记录,记录也会保留其原始偏移值。因此,tail 可能会有非连续的偏移。当使用尾部中不再提供的偏移量时,会找到带有下一个高偏移的记录。
图 3.2. 压缩后的日志

如果您只选择紧凑策略,日志仍可能会变得非常大。在这种情况下,您可以将策略设置为紧凑和删除日志。如果您选择压缩和删除,则首先压缩日志数据,使用日志头中的键删除记录。之后,在日志保留阈值被删除前的数据。
图 3.3. 日志保留点和压缩点

您以毫秒为单位设置日志检查的频率:
# ... log.retention.check.interval.ms=300000 # ...
调整与日志保留设置相关的日志保留检查间隔。较小的保留大小可能需要更频繁的检查。
清理的频率应该足以管理磁盘空间,但通常它影响到主题的性能。
如果没有日志清理,您也可以以毫秒为单位设置一个时间,将清理设置为待机:
# ... log.cleaner.backoff.ms=15000 # ...
如果您选择删除旧的日志数据,您可以在清除前设置以毫秒为单位的周期来保留删除的数据:
# ... log.cleaner.delete.retention.ms=86400000 # ...
删除的数据保留周期提供了在删除数据前注意数据的时间。
若要删除与特定密钥相关的所有消息,生产者可以发送一个 tombstone 消息。tombstone 有一个 null 值,并充当一个标记来告知值已删除的消费者。压缩后,只会保留 tombstone,这必须足够长的时间才能使消费者知道消息被删除。删除旧的消息时,没有值,则 tombstone 键也会从分区中删除。
3.8. 管理磁盘利用率
还有很多与日志清理相关的其他配置设置,但特定的重要性是内存分配。
deduplication 属性指定在所有日志清理线程中清理的总内存。您可以对通过缓冲区负载因素使用的内存百分比设置上限。
# ... log.cleaner.dedupe.buffer.size=134217728 log.cleaner.io.buffer.load.factor=0.9 # ...
每个日志条目正好使用 24 字节,因此您可以在单个运行中处理缓冲区可以处理的日志条目,并相应地调整设置。
如果可能,如果您想减少日志清理时间,请考虑增加日志清理程序线程数量:
# ... log.cleaner.threads=8 # ...
如果您遇到 100% 磁盘带宽使用情况的问题,您可以节流日志清理 I/O,以便读/写操作的总和小于执行操作的磁盘功能的指定的双倍值:
# ... log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 # ...
3.9. 处理大型消息大小
消息的默认批处理大小为 1MB,这是大多数用例中的最大吞吐量的最佳选择。如果有足够的磁盘容量,Kafka 可以在较低的吞吐量上容纳更大的批处理。
以四种方式处理大型消息大小:
- 生产端消息压缩 将压缩消息写入日志中。
- 基于参考的消息传递仅向存储在消息值中的其他系统中的数据发送引用。
- 内联消息传递将信息拆分为使用相同键的块,然后使用 Kafka Streams 等流处理器将输出合并。
- 为处理更大消息大小而构建的代理和生产者/使用者应用程序配置。
建议使用基于参考的消息传递和消息压缩选项,并涵盖大多数情况。对于其中任何一个选项,必须小心以避免出现性能问题。
制作者压缩
对于制作者配置,您可以指定 compression.type
,如 Gzip,然后应用到制作者生成的批处理数据。使用代理配置 compression.type=producer
,代理会保留使用制作者的任何压缩。每当生成者和主题压缩不匹配时,代理必须在将批处理附加到日志中之前再次压缩,这会影响代理性能。
压缩还会增加消费者上的生产者的额外处理开销,但包括批处理中的更多数据,因此在消息数据压缩时通常很有用处。
将生成者压缩与批处理大小进行微调相结合,以促进最佳吞吐量。使用指标有助于衡量所需的平均批处理大小。
基于参考的消息传递
当您不知道消息的大容量时,基于参考的消息传递对数据复制非常有用。外部数据存储必须快速、持久且高度可用,才能使此配置正常工作。数据被写入数据存储,并返回对数据的引用。制作者发送一条消息,其中包含对 Kafka 的引用。使用者从消息获取引用,并使用它来从数据存储中获取数据。
图 3.4. 基于参考的消息传递流

当消息传递需要更多时,端到端延迟将会增加。这个方法的另一个显著缺陷是,在清理 Kafka 消息时,外部系统中没有自动清理数据。混合方法是仅向数据存储和进程标准消息发送大型消息。
内联消息传递
内联消息传递比较复杂,但它没有额外的开销,具体取决于基于参考的消息传递等外部系统。
生成客户端应用程序必须序列化,并在消息太大时对数据进行块。然后,制作者使用 Kafka ByteArraySerializer
或与发送每个块前再次序列化。消费者会跟踪消息和缓冲块,直到它有完整的消息。消耗的客户端应用程序接收块,这些块在反序列化前编译。完整的消息会传送到其余消耗应用程序,以根据每组块消息集合的第一个或最后一个块的偏移来。针对偏移元数据检查完整消息,以避免重新平衡期间重复。
图 3.5. 内联消息传递流

由于需要缓冲区,内联消息传递对消费者而言具有性能开销,特别是在并行处理一系列大型消息时。大消息的块可能会变得交错,因此如果缓冲区中另一个大消息的块不完整,并不总是能够提交消息的所有块。因此,缓冲通常是通过持久消息块或实施提交逻辑来支持的。
配置以处理更大消息
如果无法避免较大的消息,并避免在消息流的任何点上阻止,您可以提高消息限值。为此,请在主题级别上配置 message.max.bytes
,以设置各个主题的最大记录批处理大小。如果您在代理级别设置了 message.max.bytes
,则所有主题都允许较大的消息。
代理将拒绝任何大于使用 message.max.bytes
设置的限制的消息。生产者(max.request.size
)和使用者(message.max.bytes
)的缓冲区大小必须能够容纳较大的消息。
3.10. 控制消息数据的日志冲刷
通常,建议不要设置显式清空阈值,并让操作系统使用其默认设置执行后台刷新。分区复制比写入任何单个磁盘提供更大的数据持久性,因为失败的代理可以从其同步副本中恢复。
日志清除属性控制定期将缓存消息数据写入磁盘。调度程序以毫秒为单位指定日志缓存检查的频率:
# ... log.flush.scheduler.interval.ms=2000 # ...
您可以在写入磁盘前根据消息保存的最大时间以及日志中的最大信息数来控制 flush 的频率:
# ... log.flush.interval.ms=50000 log.flush.interval.messages=100000 # ...
刷新之间的等待包括执行清除前的检查和指定间隔的时间。增加清空的频率可能会影响吞吐量。
如果您使用应用程序清除管理,如果您使用更快的磁盘,则设置较低冲刷阈值可能是合适的。
3.11. 分区重新平衡可用性
可在代理间复制分区以进行容错。对于给定分区,一个代理被选举领导机,并处理所有生成请求(写入日志)。在其他代理上,分区遵循 在出现领导故障时复制分区领导分区数据以达到数据可靠性。
跟随者通常不服务客户端,虽然 机架
配置允许使用者在 Kafka 集群跨越多个数据中心时使用来自最接近的副本的消息。跟随者只操作只从分区领导中复制消息,并允许恢复(leader)失败。恢复需要同步的后续程序。跟随者通过发送获取请求到领导请求来保持同步,这将向后续消息返回消息。如果后续者已与领导上最近提交的消息捕获出来,则后续者被视为不同步。领导机通过查看后续者请求的最后一个偏移来进行检查。如果当前领导领导失败,则一个不同步的跟随者通常不符合资格,除非 允许不干净的领导选举机制。
您可以在后续时间没有同步前调整滞后的时间:
# ... replica.lag.time.max.ms=30000 # ...
滞后时间对将消息复制到所有同步副本的时间施加上限,以及制作者必须等待确认的时长。如果后续者无法制作获取请求,并在指定的滞后时间内捕获最新的消息,则会从同步副本中删除。您可以减少滞后时间检测失败的副本,但这样做可能会增加不必要的同步的跟随者数量。正确的滞后时间值取决于网络延迟和代理磁盘带宽。
当领导分区不再可用时,会选择其中一个同步副本作为新的领导。分区列表中的第一个代理称为 首选的 领导。默认情况下,Kafka 会根据定期检查领导发行版为自动分区领导重新平衡启用。也就是说,Kafka 会检查首选领导是否是 当前的 领导。重新平衡可确保领导在代理和代理间平均分布不会被过载。
您可以使用 Cruise Control for AMQ Streams 来找出副本分配到代理,该代理会在集群间平均平衡负载。其计算考虑了领导和后续者遇到的不同负载。失败的领导会影响 Kafka 集群的平衡,因为剩余的代理会获得领先额外分区的额外工作。
对于 Cruise Control 所发现的分配,实际处于均衡状态,需要分区由首选领导。Kafka 可以自动确保使用首选领导(在可能的情况下),根据需要更改当前的领导。这样可确保集群保持在 Cruise Control 找到的 balanced 状态。
您可以控制重新平衡检查的频率(以秒为单位),以及触发重新平衡前允许代理的最大 imbalance 百分比。
#... auto.leader.rebalance.enable=true leader.imbalance.check.interval.seconds=300 leader.imbalance.per.broker.percentage=10 #...
代理的领导中断百分比是代理当前领导分区数与首选领导分区数之间的比例。您可以将百分比设为零,以确保首选领导始终被选择,假设它们同步。
如果检查重新平衡需要更多控制,您可以禁用自动重新平衡。然后,您可以选择何时使用 kafka-leader-election.sh
命令行工具触发重新平衡。
AMQ Streams 提供的 Grafana 仪表板显示没有活跃领导的分区和分区的指标。
3.12. unclean leader 选举机制
对 in-sync 副本的领导选举被视为干净,因为它保证不会丢失数据。这是默认情况。但是,如果没有同步副本来接管领导,情况会怎样?当领导的磁盘被禁止时,ISR (同步副本)只会包括领导。如果没有设置最小同步副本数,且其硬盘失败时没有跟随与分区领导同步,则数据已经丢失。不仅如此,新的领导产品也无法被选举,因为没有同步的跟随者。
您可以配置 Kafka 如何处理领导失败:
# ... unclean.leader.election.enable=false # ...
在默认情况下,未清除的领导选举机制被禁用,这意味着不同步的副本不能成为领导。对于干净的领导选举机制,如果在旧的领导丢失时,如果其他代理没有 ISR,则 Kafka 会等待到领导重新在线,然后才能写入或读取信息。Unclean leader 选举意味着不同步副本可能会成为领导,但可能会丢失消息。您做出的选择取决于您的要求是否首选使用可用性还是持久性。
您可以在主题级别覆盖特定主题的默认配置。如果无法负担数据丢失的风险,请保留默认配置。
3.13. 避免不必要的消费者组重新平衡
对于加入新消费者组的消费者,您可以添加延迟,以避免对代理进行不必要的重新平衡:
# ... group.initial.rebalance.delay.ms=3000 # ...
延迟是协调器等待成员加入的时间长度。延迟越长,很可能是所有成员将一次加入并避免重新平衡。但是,延迟也会阻止组消耗到周期结束为止。
第 4 章 Kafka 使用者配置调整
使用带有针对特定用例量身定制的可选属性的基本消费者配置。
对消费者进行调优时,您的主要问题将确保它们与数据数量高效进行。与生产者调优一样,准备进行增量更改,直到使用者按预期运行。
4.1. 基本消费者配置
每个消费者都需要 connection 和 deserializer 属性。通常,为跟踪添加客户端 ID 是不错的做法。
在消费者配置中,无论后续配置是什么:
- 消费者从给定的偏移中获取并按顺序使用消息,除非偏移被更改为跳过或重新读取消息。
- 代理不知道消费者是否处理响应,即使向 Kafka 提交偏移,因为偏移可能会发送到集群中的不同代理。
基本消费者配置属性
# ... bootstrap.servers=localhost:9092 1 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 2 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 3 client.id=my-client 4 group.id=my-group-id 5 # ...
- 1
- (必需)告诉消费者使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。消费者使用地址来发现并连接到集群中的所有代理。如果服务器停机,请使用逗号分隔的列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。如果您使用 loadbalancer 服务来公开 Kafka 集群,则只需要该服务的地址,因为可用性是由 loadbalancer 处理的。
- 2
- (必需) Deserializer 将从 Kafka 代理获取的字节转换为消息键。
- 3
- (必需) Deserializer 将从 Kafka 代理获取的字节转换为消息值。
- 4
- (可选)客户端的逻辑名称,用于日志和指标来识别请求源。id 也可用于根据处理时间配额对消费者进行节流。
- 5
- (条件) 需要一个 组 id,以便消费者能够加入消费者组。
4.2. 使用消费者组扩展数据消耗
消费者组共享由给定主题中的一个或多个制作者生成的大型数据流。消费者使用 group.id
属性分组,允许消息分散到成员中。组中的一个消费者选择领导,并决定如何为组中的消费者分配分区。每个分区只能分配给一个消费者。
如果您还没有作为分区的用户数量,您可以通过添加更多具有相同 group.id
的消费者实例来扩展数据消耗。向组中添加比分区更多的消费者不会帮助吞吐量,但这意味着处于待机状态的用户应该有一个停止工作。如果您可以使用较少的用户达到吞吐量目标,则可以节省资源。
同一使用者组中的消费者将偏移提交和心跳发送到同一代理。因此,组中的使用者数量越大,代理上的请求负载越高。
# ...
group.id=my-group-id 1
# ...
- 1
- 使用组 id 将使用者添加到消费者组中。
4.3. 消息排序保证
Kafka 代理从请求代理从请求代理从主题、分区和偏移位置列表发送信息的用户接收请求。
消费者以与它们提交代理相同的顺序观察单个分区中的信息,这意味着 Kafka 只 为单一分区中的信息提供排序保证。相反,如果消费者消耗来自多个分区的消息,则消费者观察到的不同分区中的消息顺序不一定反映它们发送的顺序。
如果您希望从一个主题对信息进行严格的排序,请为每个消费者使用一个分区。
4.4. 优化消费者进行吞吐量和延迟
控制客户端应用程序调用 KafkaConsumer.poll ()
时返回的消息数量。
使用 fetch.max.wait.ms
和 fetch.min.bytes
属性来增加由 Kafka 代理使用者获取的最小数据量。基于时间的批处理是使用 fetch.max.wait.ms
进行配置的,基于大小的批处理则使用 fetch.min.bytes
进行配置。
如果消费者或代理中的 CPU 使用率很高,这可能是因为来自消费者的请求太多。您可以调整 fetch.max.wait.ms
和 fetch.min.bytes
属性,以便请求数量较少,消息就会在较大的批处理中交付。通过调整更高的吞吐量,以一定的延迟成本提高吞吐量。如果生成的数据量较低,您也可以调整更高的值。
例如,如果您将 fetch.max.wait.ms
设置为 500ms,并将 fetch.min.bytes
设置为 16384 字节,当 Kafka 收到从消费者获取请求时,它将在达到其中一个阈值时响应。
相反,您可以调整 fetch.max.wait.ms
和 fetch.min.bytes
属性,以改进端到端延迟。
# ... fetch.max.wait.ms=500 1 fetch.min.bytes=16384 2 # ...
通过增加获取请求大小来降低延迟
使用 fetch.max.bytes
和 max.partition.fetch.bytes
属性来增加 Kafka 代理使用者获取的最大数据量。
fetch.max.bytes
属性一次对从代理获取的数据量设置一个最大限制(以字节为单位)。
max.partition.fetch.bytes
会以字节为单位设置每个分区返回的最大限制,每个分区必须始终大于代理或主题配置中设置的 max.message.bytes
字节数。
客户端可消耗的最大内存量大约为:
NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes
如果内存用量可以容纳它,您可以增加这两个属性的值。通过在每个请求中允许更多数据,当获取请求较少时,会改进延迟。
# ... fetch.max.bytes=52428800 1 max.partition.fetch.bytes=1048576 2 # ...
4.5. 在提交偏移时避免数据丢失或重复
Kafka auto-commit 机制 允许使用者自动提交消息偏移。如果启用,消费者将提交从以 5000ms 间隔轮询代理接收的偏移量。
auto-commit 机制很方便,但会带来数据丢失和重复的风险。如果消费者获取并转换了多个消息,但系统在执行自动提交时会在消费者缓冲区中崩溃处理的消息,则该数据将会丢失。如果系统在处理消息后崩溃,但在执行 auto-commit 之前,数据会在重新平衡后在另一个消费者实例上重复。
只有在下一次轮询代理或消费者关闭前处理所有信息时,自动提交才能避免数据丢失。
要最小化数据丢失或重复的可能性,您可以将 enable.auto.commit
设置为 false
,并开发客户端应用来更好地控制提交偏移。或者,您可以使用 auto.commit.interval.ms
来降低提交之间的间隔。
# ...
enable.auto.commit=false 1
# ...
- 1
- 自动提交设置为 false,以提供对提交偏移的更多控制。
通过将 enable.auto.commit
设置为 false
,您可以在 执行所有 处理后提交偏移,并且消息已被使用。例如,您可以设置应用程序来调用 Kafka commitSync
和 commitAsync
提交 API。
commitSync
API 在从轮询返回的消息批处理中提交偏移量。完成处理批处理后,您将调用 API。如果使用 commitSync
API,则应用程序不会轮询新消息,直到批处理中的最后一个偏移提交为止。如果这个负面影响吞吐量,您可以更频繁提交,也可以使用 commitAsync
API。commitAsync
API 不会等待代理响应提交请求,但风险在重新平衡时创建更多重复。一个常见的方法是,将应用程序中的提交 API 与仅在关闭消费者或重新平衡前使用的 commitSync
API 组合,以确保最终提交成功。
4.5.1. 控制事务消息
考虑在制作者一侧使用事务 ID 和启用 idempotence (enable.idempotence=true
)来保证完全传输。然后,您可以使用 isolation.level
属性来控制使用者读取事务消息的方式。
isolation.level
属性有两个有效值:
-
read_committed
-
read_uncommitted
(默认)
使用 read_committed
来确保只有提交的事务消息才会被消费者读取。但是,这将增加端到端延迟,因为消费者将无法返回消息,直到代理编写了记录事务结果的事务标记(提交 或 中止)。
# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
- 1
- 设置
read_committed
,以便只有提交的消息才会被消费者读取。
4.6. 恢复失败以避免数据丢失
使用 session.timeout.ms
和 heartbeat.interval.ms
属性配置检查和从消费者组中的消费者故障中恢复所需的时间。
session.timeout.ms
属性指定消费者组中的消费者在被视为不活动前无法 使用代理 的最长时间(毫秒)。当组重新平衡时,分区会被重新分配给组成员。
heartbeat
.interval.ms 属性指定对消费者组协调器的心跳检查间隔(以毫秒为单位),以指示消费者处于活动状态并连接。心跳间隔必须较低(通常由第三个间隔),而不是会话超时间隔。
如果您设置的 session.timeout.ms
属性较低,则先检测到失败的用户,并且重新平衡可能会更快。但是,要小心设置超时,从而使代理无法及时接收心跳信号,并触发不必要的重新平衡。
减少心跳间隔可减少意外重新平衡的可能性,但更频繁的心跳会增加代理资源的开销。
4.7. 管理偏移策略
使用 auto.offset.reset
属性来控制消费者在没有提交偏移时的行为方式,或者提交的偏移不再有效或删除。
假设您第一次部署消费者应用,并且从现有主题读取消息。由于第一次使用 group.id
,因此 __consumer_offsets
主题不包含此应用的任何偏移信息。新应用可以开始处理日志开始时的所有现有消息,或者仅处理新消息。默认的重置值是 latest
,它从分区的末尾启动,因此会遗漏了一些消息。为避免数据丢失,但要增加处理量,将 auto.offset.reset
设置为 earliest
以在分区的头部开始。
另外,还要考虑使用 earliest
选项,以避免在为代理配置的偏移保留周期(offsets.retention.minutes
)时丢失消息。如果消费者组或独立使用者不活跃,且在保留期间提交没有偏移,则之前提交的偏移将从 __consumer_offsets
中删除。
# ... heartbeat.interval.ms=3000 1 session.timeout.ms=45000 2 auto.offset.reset=earliest 3 # ...
如果单个获取请求中返回的数据量较大,则使用者处理请求前可能会发生超时。在这种情况下,您可以降低 max.partition.fetch.bytes
或增加 session.timeout.ms
。
4.8. 最小化重新平衡的影响
在组中的活跃消费者之间重新平衡分区是需要的时间:
- 消费者提交其偏移量
- 要组的新消费者组
- 为组成员分配分区的组领导
- 组中的消费者接收其分配并开始获取
显然,这个过程会增加服务的停机时间,特别是在在消费者组集群滚动重启过程中重复发生时。
在这种情况下,您可以使用 静态成员资格 的概念来减少重新平衡数量。重新平衡会在消费者组成员之间均匀分配主题分区。静态成员资格使用持久性,以便在会话超时后在重启后识别消费者实例。
消费者组协调器可以使用通过 group.instance.id
属性指定的唯一 ID 来识别新的消费者实例。在重启过程中,消费者被分配一个新的成员 ID,但作为静态成员,它会继续使用相同的实例 ID,并且创建相同的主题分区分配。
如果消费者应用没有调用轮询至少每个 max.poll.interval.ms
毫秒,则消费者被视为失败,从而导致重新平衡。如果应用程序无法处理一段时间内从轮询返回的所有记录,您可以使用 max.poll.interval.ms
属性指定来自消费者的新消息的轮询间隔(毫秒)。或者,您可以使用 max.poll.records
属性设置从消费者缓冲区返回的记录数的最大限制,允许应用程序处理 max.poll.interval.ms
限制中的记录数。
# ... group.instance.id=UNIQUE-ID 1 max.poll.interval.ms=300000 2 max.poll.records=500 3 # ...
第 5 章 Kafka 制作者配置调整
使用带有针对特定用例量身定制的可选属性的基本制作者配置。
调整您的配置以最大化吞吐量可能会增加延迟,反之亦然。您需要试验并调优制作者配置,以获取所需的平衡。
5.1. 基本制作者配置
每个制作者都需要连接和序列化属性。通常,为跟踪添加客户端 ID,并在生成者上使用压缩来减少请求中的批处理大小。
在基本制作者配置中:
- 分区中的信息顺序不能保证。
- 确认到达代理的消息不能保证持久性。
基本制作者配置属性
# ... bootstrap.servers=localhost:9092 1 key.serializer=org.apache.kafka.common.serialization.StringSerializer 2 value.serializer=org.apache.kafka.common.serialization.StringSerializer 3 client.id=my-client 4 compression.type=gzip 5 # ...
- 1
- (必需)告诉制作者,使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。制作者使用地址来发现和连接到集群中的所有代理。如果服务器停机,请使用逗号分隔的列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。
- 2
- (必需) Serializer 将每个消息的密钥转换为字节,然后再将其发送到代理。
- 3
- (必需) Serializer 将每个消息的值转换为字节,然后再将其发送到代理。
- 4
- (可选)客户端的逻辑名称,用于日志和指标来识别请求源。
- 5
- (可选)压缩消息的 codec,这些消息以压缩格式存储,然后在到达消费者时进行解压缩。压缩可用于提高吞吐量和降低存储负载,但可能不适用于压缩或解压缩成本的低延迟应用程序。
5.2. 数据持久性
邮件发送确认可最大程度降低消息丢失的可能性。默认情况下,使用 acks=all
设置的 acks
属性启用确认。
确认消息交付
# ...
acks=all 1
# ...
- 1
acks=all
强制领导副本将消息复制到特定数量的后续者,然后再确认成功收到消息请求。
acks=all
设置提供了最强的保证,但它会增加制作者发送消息和接收确认之间的延迟。如果您不要求此类强大的保证,则设置 acks=0
或 acks=1
不提供交付保证,或者只确认领导副本已将记录写入其日志。
使用 acks=all
时,领导机会等待所有同步副本确认消息发送。主题的 min.insync.replicas
配置设置最低要求的 in-sync 副本确认数。确认次数包括领导和跟随者。
典型的起点是使用以下配置:
制作者配置:
-
acks=all
(默认)
-
主题复制的代理配置:
-
default.replication.factor=3
(default =1
) -
min.insync.replicas=2
(default =1
)
-
在创建主题时,您可以覆盖默认的复制因素。您还可以在主题配置中的主题级别覆盖 min.insync.replicas
。
AMQ Streams 在示例配置文件中使用此配置进行 Kafka 的多节点部署。
下表描述了此配置如何根据复制领导副本的遵循者来运行。
可用和同步的跟随者数量 | 致谢 | 制作者可以发送消息? |
---|---|---|
2 | 领导等待 2 个跟随者的确认 | 是 |
1 | 领导等待 1 个跟随者的确认 | 是 |
0 | 领导会引发异常 | 否 |
3 的主题复制因素会创建一个领导副本和两个后续者。在此配置中,如果单个后续者不可用,则制作者可以继续。有些延迟可能会使从同步副本中删除失败的代理或创建新领导。如果第二个后续程序也不可用,消息发送将无法成功。领导机向生产者发送错误(不是足够副本),而不是确认成功发送的消息。生产者引发一个对等的异常。使用 重试
配置时,生产者可以重新发送失败的消息请求。
如果系统失败,则缓冲区中不会丢失数据的风险。
5.3. 排序交付
幂等的制作者避免重复,因为消息准确传送一次。ID 和序列号分配给消息,以确保交付的顺序,即使出现失败时也是如此。如果您将 acks=all
用于数据一致性,使用 idempotency 有助于排序交付。默认情况下,为制作者启用幂等性。启用 idempotency 后,您可以将并发内请求数量设置为最多 5 个,以便保留消息排序。
使用 idempotency 排序交付
# ... enable.idempotence=true 1 max.in.flight.requests.per.connection=5 2 acks=all 3 retries=2147483647 4 # ...
如果您选择不使用 acks=all
,并且由于性能成本而禁用幂等性,请将 in-flight (忽略)请求的数量设置为 1 以保持排序。否则,只有在 Message-B 已写入代理后 Message-A 才会成功。
订购没有幂等的交付
# ... enable.idempotence=false 1 max.in.flight.requests.per.connection=1 2 retries=2147483647 # ...
5.4. 可靠性保证
幂等性对于精确写入单个分区很有用。事务与 idempotence 一起使用时,精确允许在多个分区间写入一次。
使用相同事务 ID 发送的事务消息只生成一次,因此只会所有都成功写入到相应的日志,或所有都没有写入。
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=UNIQUE-ID 1 transaction.timeout.ms=900000 2 # ...
为保持事务保证,选择 transactional.id
非常重要。每个事务 ID 都应该用于一组唯一的主题分区。例如,这可以通过使用主题分区名称外部映射到事务 ID 来实现的,也可以使用避免冲突的功能从主题分区名称计算事务 ID。
5.5. 优化制作者以实现吞吐量和延迟
通常,系统的要求是满足特定吞吐量目标,以实现在给定延迟内对消息的比例进行比例。例如,在 2 秒内被确认为每秒 500,000条消息,其中 95% 的消息被确认。
您的生产者的消息传递语义(消息顺序和持久性)可能由应用程序的要求定义。例如,您可能没有使用 acks=0
或 acks=1
的选项,而不破坏某些重要属性或保证由您的应用程序提供。
代理重启对高百分比的统计信息有显著影响。例如,在一个长时间内,99th percentile 的延迟是由代理重启的行为所代表的。这在设计基准测试或将性能数量与生产中性能数进行比较时需要考虑。
根据您的目标,Kafka 提供了多个配置参数,以及用于调优吞吐量和延迟的制作者性能的技术。
- 邮件批处理(
linger.ms
和batch.size
) -
消息批量发送延迟,希望发送更多用于相同代理的消息,允许将它们批处理到单个生成请求中。批处理是实现更高吞吐量的返回延迟之间的折衷。使用
linger.ms
配置基于时间的批处理,并且基于大小的批处理则使用batch.size
进行配置。 - 压缩(
压缩.type
) -
消息压缩会增加制作者的延迟(CPU 时间用于压缩消息),但会发出请求(以及可能磁盘写入)更小,这可能会提高吞吐量。无论压缩是否值得使用,以及要使用的最佳压缩将取决于所发送的消息。压缩发生于调用
KafkaProducer.send ()
的线程上,因此,如果此方法的延迟对于应用程序而言,您应该考虑使用更多线程。 - pipelining (
max.in.flight.requests.per.connection
) - pipelining 意味着在收到对之前请求的响应前发送更多请求。通常,更多的 pipelining 意味着更好的吞吐量,达到了其他影响的阈值(如更糟糕的批处理)开始对吞吐量的影响反击。
降低延迟
当应用程序调用 KafkaProducer.send ()
时,信息是:
- 由任何拦截器处理
- serialized
- 分配给分区
- 压缩
- 添加到每个分区队列中的批处理消息
在哪一个点上,发送()
方法返回。因此,时间 send ()
被阻止由以下决定:
- 拦截器、序列化器和分区器中花费的时间
- 使用的压缩算法
- 等待缓冲区用于压缩所需的时间
批处理将保留在队列中,直到出现以下情况之一:
-
批处理已满(根据
batch.size
) -
linger.ms
引入的延迟已经通过 - 发件人是将其他分区的消息批处理发送到同一代理,也可以添加此批处理
- 制作者正在刷新或关闭
查看批处理和缓冲的配置,以减轻 send ()
阻塞延迟的影响。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
增加吞吐量
通过调整消息发送前等待的最长时间来改进消息请求的吞吐量,并完成发送请求。
您还可以通过编写自定义分区来将消息定向到指定的分区,以替换默认值。
# ... delivery.timeout.ms=120000 1 partitioner.class=my-custom-partitioner 2 # ...
附录 A. 使用您的订阅
AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 Zip 和 Tar 文件
要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 在 INTEGRATION AND AUTOMATION 目录中找到 AMQ Streams for Apache Kafka 项。
- 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
使用 DNF 安装软件包
要安装软件包以及所有软件包的依赖软件包,请使用:
dnf install <package_name>
要从本地目录中安装之前下载的软件包,请使用:
dnf install <path_to_download_package>
更新于 2023-05-19