12.4. 调整客户端配置
使用配置属性优化 Kafka 制作者和使用者的性能。
需要最小的配置属性集合,但您可以添加或调整属性以更改生产者和使用者如何与 Kafka 交互。例如,对于制作者,您可以调整消息的延迟和吞吐量,以便客户端能够实时响应数据。或者您可以更改配置以提供更强大的消息持久性保证。
首先,您可以分析客户端指标来测量进行初始配置的位置,然后进行增量更改和进一步比较,直到您拥有所需的配置。
12.4.1. Kafka 生成器配置调整
使用基本制作者配置,以及为特定用例量身定制的可选属性。
调整配置以最大化吞吐量可能会增加延迟,反之亦然。您将需要试验并调优制作者配置,以获得所需的平衡。
12.4.1.1. 基本制作者配置
每个制作者都需要连接和序列化程序属性。通常而言,最好是添加客户端 ID 以进行跟踪,并对制作者使用压缩来减少请求中的批处理大小。
在基本制作者配置中:
- 无法保证分区中消息的顺序。
- 确认到达代理的消息不能保证持久性。
# ... bootstrap.servers=localhost:9092 1 key.serializer=org.apache.kafka.common.serialization.StringSerializer 2 value.serializer=org.apache.kafka.common.serialization.StringSerializer 3 client.id=my-client 4 compression.type=gzip 5 # ...
- 1
- (必需)告诉制作者使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。制作者使用该地址来发现和连接集群中的所有代理。在服务器停机时使用逗号分隔列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。
- 2
- (必需)在将每条消息的密钥发送到代理前将其转换为字节。
- 3
- (必需)在将每个消息发送到代理前将每条消息的值转换为字节。
- 4
- (可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。
- 5
- (可选)压缩消息的编码器(发送并可能以压缩格式存储),然后在到达消费者时解压缩。压缩对于提高吞吐量和减少存储负载非常有用,但可能不适用于低延迟应用程序,因为压缩或解压成本可能过高。
12.4.1.2. 数据持久性
您可以使用消息发送确认,应用更大的数据持久性,以最大程度降低消息丢失的可能性。
# ...
acks=all 1
# ...
- 1
- 指定
acks=all
会强制分区领导将消息复制到一定数量的跟随者,然后确认消息请求已成功收到。由于附加检查,acks=all
会增加生产者发送消息和接收确认之间的延迟。
在将确认发送到生产者之前,需要将消息附加至其日志中的代理数量由主题的 min.insync.replicas
配置决定。典型的起点是将主题复制因数为 3,其他代理上有两个内联副本。在这种配置中,如果单个代理不可用,生产者可以继续不受影响。如果第二个代理不可用,生产者将不会收到确认并且无法生成更多消息。
支持 acks=all 的主题
配置
# ...
min.insync.replicas=2 1
# ...
- 1
- 使用
2
内同步的副本。默认值为1
。
如果系统失败,则缓冲区中存在不正确的数据丢失的风险。
12.4.1.3. 订购交付
幂等制作者避免重复,因为消息只发送一次。为消息分配了 ID 和序列号,以确保传送顺序,即使出现故障也是如此。如果您使用 acks=all
来实现数据一致性,则启用幂等性对有序交付有利。
使用幂等方式订购交付
# ... enable.idempotence=true 1 max.in.flight.requests.per.connection=5 2 acks=all 3 retries=2147483647 4 # ...
如果您没有由于性能成本而使用 acks=all
和 幂等性,请将待机(未确认)请求数设置为 1 以保持排序。否则,只有在 Message- B 已写入代理后 Message- A 可能无法成功。
在没有幂等的情况下订购交付
# ... enable.idempotence=false 1 max.in.flight.requests.per.connection=1 2 retries=2147483647 # ...
12.4.1.4. 可靠性保证
仅对写入单个分区一次,Idempotence 非常有用。事务处理与幂等性一起使用时,允许在多个分区间只写入一次。
事务可确保使用相同事务 ID 的消息只生成一次,并且将 所有 消息都成功写入到对应的日志中,或者其中任何消息 都不是.
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=UNIQUE-ID 1 transaction.timeout.ms=900000 2 # ...
transaction .id
的选择对于保持事务保证非常重要。每个事务 id 都应该用于一组唯一的主题分区。例如,这可以通过外部映射主题分区名称到事务 id 来实现,或者通过使用避免冲突的功能计算主题分区名称中的事务 ID。
12.4.1.5. 优化吞吐量和延迟
通常,系统的要求是满足给定延迟内一定比例消息的特定吞吐量目标。例如,以每秒 500,000条消息为目标,95% 的消息会在 2 秒内得到确认。
生产者的消息传递语义(消息排序和持久性)很可能根据您的应用程序的要求进行定义。例如,您可能没有选项在不破坏某些重要属性的情况下使用 acks=0
或 acks=1
,或者无法保证应用程序提供。
Broker 重新启动对高百分比统计数据有显著影响。例如,在很长一段时间内,99 百分点延迟由围绕代理重启的行为占据主导地位。在设计基准测试时,或比较基准测试与生产中显示的性能数字时,需要考虑这一点。
根据您的目标,Kafka 提供了多个配置参数和技术来调节吞吐量和延迟的性能。
- 消息批处理(ling
er.ms
和batch.size
) -
消息批处理会延迟发送消息,希望将发送更多目标为同一代理的消息,允许它们批处理到单个生成请求。批处理是在高吞吐量时返回的更高延迟之间的妥协。基于时间的批处理使用 linger
.ms
配置,而基于大小的批处理则使用batch.size
配置。 压缩(压缩.type
)-
消息压缩增加了制作者延迟(CPU 时间用于压缩消息),但会更小(可能进行磁盘写入),这可以提高吞吐量。压缩是否必要,以及要使用的最佳压缩程度取决于所发送的消息。压缩发生在调用
KafkaProducer.send()的
线程上,因此如果此方法的延迟与您需要使用更多线程的应用程序相关。 - pipelining(
max.in.flight.requests.per.connection
) - pipelining 意味着在收到对上一个请求的响应前发送更多请求。通常,更多流水线意味着更好的吞吐量,最高是一个阈值,达到其他效果(例如更糟糕的批处理)开始消除对吞吐量的影响。
降低延迟
当您的应用程序调用 KafkaProducer.send()
时,消息为:
- 由任何拦截器处理
- serialized
- 分配给分区
- 已压缩
- 添加到每个分区队列中的批量消息
send()
方法返回的时间点。因此, send()
的时间由以下方法决定:
- 拦截器、序列程序和分区器花费的时间
- 使用的压缩算法
- 等待缓冲区用于压缩所需的时间
批处理将保留在队列中,直到出现以下情况之一:
-
批处理已满(根据
batch.size
) -
linger
.ms
引入的延迟已过 - 发送方即将向同一代理发送其他分区的消息批处理,也可以添加此批处理
- 生产者被清空或关闭
查看批处理和缓冲的配置,以减轻 send()
阻止对延迟的影响。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
增加吞吐量
通过调整消息传输和完成发送请求前等待的最长时间,提高消息请求的吞吐量。
您还可以通过编写自定义分区程序来替换默认分区,将消息定向到指定分区。
# ... delivery.timeout.ms=120000 1 partitioner.class=my-custom-partitioner 2 # ...