搜索

12.4. 调整客户端配置

download PDF

使用配置属性优化 Kafka 制作者和使用者的性能。

需要最小的配置属性集合,但您可以添加或调整属性以更改生产者和使用者如何与 Kafka 交互。例如,对于制作者,您可以调整消息的延迟和吞吐量,以便客户端能够实时响应数据。或者您可以更改配置以提供更强大的消息持久性保证。

首先,您可以分析客户端指标来测量进行初始配置的位置,然后进行增量更改和进一步比较,直到您拥有所需的配置。

12.4.1. Kafka 生成器配置调整

使用基本制作者配置,以及为特定用例量身定制的可选属性。

调整配置以最大化吞吐量可能会增加延迟,反之亦然。您将需要试验并调优制作者配置,以获得所需的平衡。

12.4.1.1. 基本制作者配置

每个制作者都需要连接和序列化程序属性。通常而言,最好是添加客户端 ID 以进行跟踪,并对制作者使用压缩来减少请求中的批处理大小。

在基本制作者配置中:

  • 无法保证分区中消息的顺序。
  • 确认到达代理的消息不能保证持久性。
# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...
1
(必需)告诉制作者使用 Kafka 代理的 host:port bootstrap 服务器地址连接到 Kafka 集群。制作者使用该地址来发现和连接集群中的所有代理。在服务器停机时使用逗号分隔列表来指定两个或三个地址,但不需要提供集群中所有代理的列表。
2
(必需)在将每条消息的密钥发送到代理前将其转换为字节。
3
(必需)在将每个消息发送到代理前将每条消息的值转换为字节。
4
(可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。
5
(可选)压缩消息的编码器(发送并可能以压缩格式存储),然后在到达消费者时解压缩。压缩对于提高吞吐量和减少存储负载非常有用,但可能不适用于低延迟应用程序,因为压缩或解压成本可能过高。

12.4.1.2. 数据持久性

您可以使用消息发送确认,应用更大的数据持久性,以最大程度降低消息丢失的可能性。

# ...
acks=all 1
# ...
1
指定 acks=all 会强制分区领导将消息复制到一定数量的跟随者,然后确认消息请求已成功收到。由于附加检查,acks=all 会增加生产者发送消息和接收确认之间的延迟。

在将确认发送到生产者之前,需要将消息附加至其日志中的代理数量由主题的 min.insync.replicas 配置决定。典型的起点是将主题复制因数为 3,其他代理上有两个内联副本。在这种配置中,如果单个代理不可用,生产者可以继续不受影响。如果第二个代理不可用,生产者将不会收到确认并且无法生成更多消息。

支持 acks=all 的主题配置

# ...
min.insync.replicas=2 1
# ...

1
使用 2 内同步的副本。默认值为 1
注意

如果系统失败,则缓冲区中存在不正确的数据丢失的风险。

12.4.1.3. 订购交付

幂等制作者避免重复,因为消息只发送一次。为消息分配了 ID 和序列号,以确保传送顺序,即使出现故障也是如此。如果您使用 acks=all 来实现数据一致性,则启用幂等性对有序交付有利。

使用幂等方式订购交付

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
设置为 true,以启用幂等制作者。
2
通过幂等发送,即时请求数可能大于 1,同时仍然提供消息排序保证。默认值为 5 个 in-flight 请求。
3
将一个cks 设置为 所有.
4
设置重新发送失败消息请求的尝试次数。

如果您没有由于性能成本而使用 acks=all 和 幂等性,请将待机(未确认)请求数设置为 1 以保持排序。否则,只有在 Message- B 已写入代理后 Message- A 可能无法成功。

在没有幂等的情况下订购交付

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
设置为 false,以禁用幂等制作者。
2
将 in-flight 请求数设置为正好 1

12.4.1.4. 可靠性保证

仅对写入单个分区一次,Idempotence 非常有用。事务处理与幂等性一起使用时,允许在多个分区间只写入一次。

事务可确保使用相同事务 ID 的消息只生成一次,并且将 所有 消息都成功写入到对应的日志中,或者其中任何消息 都不是.

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
指定唯一的事务 ID。
2
在返回超时错误前,设置以毫秒为单位进行事务的最大允许时间。默认值为 900000 或 15 分钟。

transaction .id 的选择对于保持事务保证非常重要。每个事务 id 都应该用于一组唯一的主题分区。例如,这可以通过外部映射主题分区名称到事务 id 来实现,或者通过使用避免冲突的功能计算主题分区名称中的事务 ID。

12.4.1.5. 优化吞吐量和延迟

通常,系统的要求是满足给定延迟内一定比例消息的特定吞吐量目标。例如,以每秒 500,000条消息为目标,95% 的消息会在 2 秒内得到确认。

生产者的消息传递语义(消息排序和持久性)很可能根据您的应用程序的要求进行定义。例如,您可能没有选项在不破坏某些重要属性的情况下使用 acks=0acks=1,或者无法保证应用程序提供。

Broker 重新启动对高百分比统计数据有显著影响。例如,在很长一段时间内,99 百分点延迟由围绕代理重启的行为占据主导地位。在设计基准测试时,或比较基准测试与生产中显示的性能数字时,需要考虑这一点。

根据您的目标,Kafka 提供了多个配置参数和技术来调节吞吐量和延迟的性能。

消息批处理(linger.msbatch.size
消息批处理会延迟发送消息,希望将发送更多目标为同一代理的消息,允许它们批处理到单个生成请求。批处理是在高吞吐量时返回的更高延迟之间的妥协。基于时间的批处理使用 linger .ms 配置,而基于大小的批处理则使用 batch.size 配置。
压缩(压缩.type
消息压缩增加了制作者延迟(CPU 时间用于压缩消息),但会更小(可能进行磁盘写入),这可以提高吞吐量。压缩是否必要,以及要使用的最佳压缩程度取决于所发送的消息。压缩发生在调用 KafkaProducer.send()的 线程上,因此如果此方法的延迟与您需要使用更多线程的应用程序相关。
pipelining(max.in.flight.requests.per.connection)
pipelining 意味着在收到对上一个请求的响应前发送更多请求。通常,更多流水线意味着更好的吞吐量,最高是一个阈值,达到其他效果(例如更糟糕的批处理)开始消除对吞吐量的影响。

降低延迟

当您的应用程序调用 KafkaProducer.send() 时,消息为:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 已压缩
  • 添加到每个分区队列中的批量消息

send() 方法返回的时间点。因此, send() 的时间由以下方法决定:

  • 拦截器、序列程序和分区器花费的时间
  • 使用的压缩算法
  • 等待缓冲区用于压缩所需的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(根据 batch.size
  • linger .ms 引入的延迟已过
  • 发送方即将向同一代理发送其他分区的消息批处理,也可以添加此批处理
  • 生产者被清空或关闭

查看批处理和缓冲的配置,以减轻 send() 阻止对延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性添加毫秒的延迟,以便在请求中累积和发送更大规模的消息。默认值为 0'。
2
如果使用最大 批处理.size,则在达到最大值时发送请求,或者消息已排队的时间超过 linger .ms (以较快者为准)。添加延迟可让批处理积累消息到批处理大小。
3
缓冲区的大小必须至少与批处理大小相同,并且能够适应缓冲区、压缩和内向请求。

增加吞吐量

通过调整消息传输和完成发送请求前等待的最长时间,提高消息请求的吞吐量。

您还可以通过编写自定义分区程序来替换默认分区,将消息定向到指定分区。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间,以毫秒为单位。您可以将值设为 MAX_LONG,以将值委派给 Kafka 重试次数的未定义次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.