附录 D. 生成者配置参数


key.serializer

type: class
Importance: high

serializer 类用于实现 org.apache.kafka.common.serialization.Serializer 接口的密钥。

value.serializer

type: class
Importance: high

Serializer 类用于实现 org.apache.kafka.common.serialization.Serializer 接口的值。

bootstrap.servers

type: list
Default: ""
Valid Values: non-null string
Importance: high

用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器与此处为引导指定的服务器无关 - 此列表仅影响用于发现完整服务器集的初始主机。此列表的格式应为 host1:port1,host2:port2,…​。由于这些服务器仅用于初始连接来发现完整的群集成员身份(可能会动态更改),因此此列表不需要包含整组服务器(但是如果服务器停机,您可能需要多个服务器)。

buffer.memory

type: long
Default: 33554432
Valid Values: [0,…​]
Importance: high

制作者可用于缓冲记录等待发送到服务器的内存总量。如果发送记录速度快于可以发送到服务器,则生成者将阻止 max.block.ms 引发异常。

此设置应当与生成者将使用的内存总量对应,但不是硬绑定,因为生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩),以及维护动态请求。

compression.type

Type: string
Default: none
Importance: high

producer 生成的所有数据的压缩类型。默认为 none (例如,没有压缩)。有效值为 none,gzip,snappy,lz4, 或 zstd。压缩是整个数据的批处理,因此批处理的效率也会影响压缩率(更多批处理意味着更好的压缩)。

retries

type: int
Default: 2147483647
Valid Values: [0,…​,2147483647]
Importance: high

设置大于零的值将导致客户端重新发送发送失败的记录,并显示潜在的临时错误。请注意,这个重试与在收到错误时重新记录不同的是不同的。在不设置 max.in.flight.requests.per.connection 到 1 的重试的情况下,可能会更改记录顺序,因为如果两个批处理发送到单个分区,第一个批处理失败,但第二个批处理中的记录可能会首先出现。请注意,如果在通过 delivery.timeout.ms 配置超时前,生成请求将失败,然后再成功确认前过期。用户通常应不设置此配置,而是使用 delivery.timeout.ms 来控制重试行为。

ssl.key.password

Type: password
Default: null
Importance: high

密钥存储文件或者 'ssl.keystore.key' 中指定的 PEM 密钥的密码。只有在配置了双向身份验证时,客户端才需要这样做。

ssl.keystore.certificate.chain

Type: password
Default: null
Importance: high

使用由 'ssl.keystore.type" 指定的格式的证书链。默认 SSL 引擎工厂仅支持使用 X.509 证书列表的 PEM 格式。

ssl.keystore.key

Type: password
Default: null
Importance: high

使用 'ssl.keystore.type" 指定的格式的私钥。默认 SSL 引擎工厂只支持使用 PKCS#8 密钥的 PEM 格式。如果密钥加密,必须使用 'ssl.key.password' 指定密钥密码。

ssl.keystore.location

Type: string
Default: null
Importance: high

密钥存储文件的位置。这对客户端是可选的,可用于客户端进行双向身份验证。

ssl.keystore.password

Type: password
Default: null
Importance: high

密钥存储文件的存储密码。这对客户端是可选的,只有在配置了 'ssl.keystore.location' 时才需要。PEM 格式不支持密钥存储密码。

ssl.truststore.certificates

Type: password
Default: null
Importance: high

可信证书,格式为 'ssl.truststore.type'。默认 SSL 引擎工厂仅支持使用 X.509 证书使用 PEM 格式。

ssl.truststore.location

Type: string
Default: null
Importance: high

信任存储文件的位置。

ssl.truststore.password

Type: password
Default: null
Importance: high

信任存储文件的密码。如果没有设置密码,则仍然使用配置的信任存储文件,但禁用完整性检查。PEM 格式不支持信任存储密码。

batch.size

type: int
Default: 16384
Valid Values: [0,…​]
Importance: medium

每当将多个记录发送到同一分区时,生成者将尝试将记录一起批处理到较少的请求。这有助于客户端和服务器的性能。此配置控制默认批处理大小(以字节为单位)。

不会尝试批处理记录大于这个大小。

发送到代理的请求将包含多个批处理,每个分区对应一个数据可以发送。

小批处理大小会减少批处理频率,并可能会降低吞吐量(零的批处理将完全禁用批处理)。非常大的批处理大小可能会更严重地使用内存,因为我们将始终在附加记录下分配指定批处理大小的缓冲。

client.dns.lookup

Type: string
Default: use_all_dns_ips
Valid Values: [use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
Importance: medium

控制客户端如何使用 DNS 查找。如果设置为 use_all_dns_ips,请按顺序连接到每个返回的 IP 地址,直到成功建立连接。断开连接后,会使用下一个 IP。当所有 IP 都被一次使用后,客户端会再次解析主机名(JVM 和 OS 缓存 DNS 名称查找)中的 IP。如果设置为 resolve_canonical_bootstrap_servers_only,请将每个 bootstrap 地址解析为规范名称列表。bootstrap 阶段后,它的行为与 use_all_dns_ips 相同。

client.id

Type: string
Default: ""
Importance: medium

在发出请求时传递给服务器的 id 字符串。这样做的目的是通过允许逻辑应用程序名称包含在服务器端请求日志记录中,从而跟踪除 ip/port 以外的请求源。

connections.max.idle.ms

Type: long
Default: 540000 (9 minutes)
Importance: medium

在这个配置指定的毫秒数后关闭闲置连接。

delivery.timeout.ms

type: int
Default: 120000 (2 minutes)
Valid Values: [0,…​]
Importance: medium

在调用 send () 返回后报告成功或失败的上限。这限制了发送前记录总时间、从代理等待确认时间(如果预期),以及可重新发送失败的时间。如果遇到了无法恢复的错误,则生成者可能报告无法发送记录,或者将记录添加到达到早期交付过期期限的批处理中。此配置的值应大于或等于 request.timeout.mslinger.ms 的总和。

linger.ms

type: long
Default: 0
Valid Values: [0,…​]
Importance: medium

生产者将请求传输之间到达单个批处理请求的任何记录组合在一起。通常,这只在记录到达速度快于发送时发生。然而,在某些情况下,客户端可能希望减少请求的数量,即使负载低下也是如此。此设置通过添加少量人工延迟来实现此目的,即,不立即发送记录,而不是立即发送记录,以便发送其他记录,以便发送其他记录。这可能被视为与 TCP 中的 Nagle 算法类似。此设置提供批处理延迟的上限:一旦我们获得 批处理。 无论此设置如何,无论此设置如何,它都会立即发送一次记录,但是如果我们为此分区累积了这个字节,我们将"linger"用于等待更多记录显示。此设置默认为 0 (例如,无延迟)。例如,设置 linger.ms=5 会影响减少发送的请求数,但会在没有负载的情况下向发送的记录添加最多 5ms 的延迟。

max.block.ms

type: long
Default: 60000 (1 minute)
Valid Values: [0,…​]
Importance: medium

这个配置控制 KafkaProducer’s `send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() 方法将被阻塞多长时间。对于 send (),这个超时会绑定了元数据获取和缓冲区分配的总时间(在用户提供的 serializers 或 partitioner 中不会计算这个超时)。对于 partitionsFor (),这个超时会绑定等待元数据的时间(如果其不可用)。与事务相关的方法始终阻止,但如果事务协调器无法发现或没有在超时时间内响应,则可能会超时。

max.request.size

type: int
Default: 1048576
Valid Values: [0,…​]
Importance: medium

请求的最大大小(以字节为单位)。此设置将限制制作者将在单个请求中发送的记录批处理数量,以避免发送大量请求。这也实际上是最大未压缩的记录批处理大小的上限。请注意,服务器在记录批处理大小上具有自己的上限(如果启用了压缩,则进行压缩后),这可能与此不同。

partitioner.class

type: class
Default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
Importance: medium

实施 org.apache.kafka.clients.producer.Partitioner 接口的 partitioner 类。

receive.buffer.bytes

type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…​]
Importance: medium

读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用操作系统默认值。

request.timeout.ms

type: int
Default: 30000 (30 秒)
Valid Values: [0,…​]
Importance: medium

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。这应该大于 replica.lag.time.max.ms (代理配置),以减少因为不必要的制作者重试而出现消息重复的可能性。

sasl.client.callback.handler.class

Type: class
Default: null
Importance: medium

实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称。

sasl.jaas.config

Type: password
Default: null
Importance: medium

用于 SASL 连接的 JAAS 登录上下文参数,其格式供 JAAS 配置文件使用。JAAS 配置文件格式描述 在此处。该值的格式是: loginModuleClass controlFlag (optionName=optionValue)*;。对于代理,配置必须在小写中带有监听前缀和 SASL 机制名称前缀。例如: listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;。

sasl.kerberos.service.name

Type: string
Default: null
Importance: medium

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 配置中定义。

sasl.login.callback.handler.class

Type: class
Default: null
Importance: medium

实现 AuthenticateCallbackHandler 接口的 SASL 登录回调处理程序类的完全限定名称。对于代理,登录回调处理器配置必须以监听器前缀和 SASL 机制名称作为前缀作为前缀。例如: listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler。

sasl.login.class

Type: class
Default: null
Importance: medium

实现登录接口的类的完全限定名称。对于代理,登录配置必须在小写中带有监听前缀和 SASL 机制名称前缀。For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin.

sasl.mechanism

Type: string
Default: GSSAPI
Importance: medium

用于客户端连接的 SASL 机制。这可能是提供安全提供程序的任何机制。GSSAPI 是默认机制。

security.protocol

Type: string
Default: PLAINTEXT
Importance: medium

用于与代理通信的协议。有效值为: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。

send.buffer.bytes

type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…​]
Importance: medium

发送数据时要使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用操作系统默认值。

socket.connection.setup.timeout.max.ms

type: long
Default: 30000 (30 秒)
Importance: medium

客户端等待套接字连接建立的最大时间。当每个连续连接失败时,连接设置超时会达到这个最大值。为避免连接差异,将把一个随机化因值应用于超时,导致计算值高于 20% 的随机范围。

socket.connection.setup.timeout.ms

type: long
Default: 10000 (10 seconds)
Importance: medium

客户端等待套接字连接建立的时间长度。如果在超时时间前没有构建连接,客户端将关闭套接字频道。

ssl.enabled.protocols

type: list
Default: TLSv1.2,TLSv1.3
Importance: medium

为 SSL 连接启用的协议列表。在使用 Java 11 或更新版本时,默认为 'TLSv1.2,TLSv1.3',否则 'TLSv1.2'。使用 Java 11 的默认值,如果客户端和服务器同时支持 TLSv1.3,则客户端和服务器将首选 TLSv1.3,否则将回退到 TLSv1.2(假设两者都至少支持 TLSv1.2)。对于大多数情况,这个默认值应该可以正常工作。另请参阅 ssl.protocol 的配置文档。

ssl.keystore.type

Type: string
Default: JKS
Importance: medium

密钥存储文件的文件格式。这对客户端是可选的。

ssl.protocol

type: string
Default: TLSv1.3
Importance: medium

用于生成 SSLContext 的 SSL 协议。使用 Java 11 或更新版本运行时,默认为 'TLSv1.3',否则 'TLSv1.2'。对于大多数用例,这个值应该可以正常工作。最近的 JVM 中允许的值为 'TLSv1.2' 和 'TLSv1.3'。旧的 JVM 可以支持 'TLS', 'TLSv1.1', 'SSLv2', 'SSLv2' 和 'SSLv3',但由于已知的安全漏洞,不建议使用它们。使用这个配置和 'ssl.enabled.protocols' 的默认值,如果服务器不支持 'TLSv1.3',客户端将降级为 'TLSv1.2'。如果此配置被设置为 'TLSv1.2',客户端将不会使用 'TLSv1.3',即使它是 ssl.enabled.protocols 中的值之一,服务器只支持 'TLSv1.3'。

ssl.provider

Type: string
Default: null
Importance: medium

用于 SSL 连接的安全供应商的名称。默认值是 JVM 的默认安全提供程序。

ssl.truststore.type

Type: string
Default: JKS
Importance: medium

信任存储文件的文件格式。

acks

Type: string
Default: all
Valid Values: [all, -1, 0, 1]
Importance: low

在考虑请求完成前,生成者需要收到的确认数量。这控制发送的记录的持久性。允许以下设置:

  • acks=0 如果设为零,则生成者不会等待来自服务器的任何确认。记录将立即添加到套接字缓冲区中并被视为发送。无法保证服务器已收到记录,重试 配置不会生效(因为客户端通常不知道任何失败)。每个记录给出的偏移始终设置为 -1
  • acks=1 意味着领导人将记录写入其本地日志,但不会等待所有后续人的完全确认。在这种情况下,领导机在确认记录后马上失败,但在后续者复制之前,记录将会丢失。
  • acks=all 意味着领导机将等待一整组同步副本确认记录。这样可保证记录在至少一个同步副本仍然处于活动状态时不会丢失。这是最强的可用保证。这等同于 acks=-1 设置。
enable.idempotence

Type: boolean
Default: true
Importance: low

当设置为 'true' 时,生成者将确保每个消息的确切副本在流中写入。如果 'false',则因代理失败而重试的制作者可能会在流中写入重试消息的副本。请注意,启用 idempotence 需要 max.in.flight.requests.per.connection 小于或等于 5 (为任何允许值保留消息排序),重试 大于 0,ack s 必须是 'all'。如果用户没有明确设置这些值,则会选择合适的值。如果设置了不兼容的值,则会抛出 ConfigException

interceptor.classes

type: list
Default: ""
Valid Values: non-null string
Importance: low

用作拦截器的类列表。通过实施 org.apache.kafka.clients.producer.ProducerInterceptor 接口,您可以在生成者发布到 Kafka 集群前截获(并可能修改)制作者收到的记录。默认情况下,没有拦截器。

max.in.flight.requests.per.connection

type: int
Default: 5
Valid Values: [1,…​]
Importance: low

客户端在阻止前在单个连接上发送的最大未确认请求数。请注意,如果将此配置设置为大于 1,并且 enable.idempotence 被设置为 false,则因为重试失败(例如,如果启用了重试)后消息重新排序的风险(例如,如果启用了重试)。

metadata.max.age.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: low

即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。

metadata.max.idle.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [5000,…​]
Importance: low

控制制作者将缓存闲置主题的元数据的时长。如果自上次生成了一个主题后经过的时间超过元数据空闲持续时间,则主题的元数据会被忘记,下一次访问将强制进行元数据获取请求。

metric.reporters

type: list
Default: ""
Valid Values: non-null string
Importance: low

用作指标报告器的类列表。实施 org.apache.kafka.common.metrics.MetricsReporter 接口,允许插入将收到新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。

metrics.num.samples

type: int
Default: 2
Valid Values: [1,…​]
Importance: low

为计算指标维护的示例数量。

metrics.recording.level

Type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

指标的最高记录级别。

metrics.sample.window.ms

type: long
Default: 30000 (30 秒)
Valid Values: [0,…​]
Importance: low

计算指标示例的时间窗口。

reconnect.backoff.max.ms

type: long
Default: 1000 (1 second)
Valid Values: [0,…​]
Importance: low

重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。

reconnect.backoff.ms

type: long
Default: 50
Valid Values: [0,…​]
Importance: low

尝试重新连接到给定主机前等待的基本时间。这可避免在紧密循环中重复连接到主机。此 backoff 应用到客户端到代理的所有连接尝试。

retry.backoff.ms

Type: long
Default: 100
Valid Values: [0,…​]
Importance: low

尝试重试对给定主题分区失败的请求前等待的时间。这可避免在某些故障场景中的紧密循环中重复发送请求。

sasl.kerberos.kinit.cmd

type: string
Default: /usr/bin/kinit
Importance: low

Kerberos kinit 命令路径。

sasl.kerberos.min.time.before.relogin

Type: long
Default: 60000
Importance: low

刷新尝试之间的登录线程睡眠时间。

sasl.kerberos.ticket.renew.jitter

type: double
Default: 0.05
Importance: low

添加到续订时间的随机 jitter 的百分比。

sasl.kerberos.ticket.renew.window.factor

Type: double
Default: 0.8
Importance: low

登录线程将处于睡眠状态,直到达到最后一次刷新到票据到期的时间因素前处于睡眠状态,此时将尝试续订票据。

sasl.login.refresh.buffer.seconds

Type: short
Default: 300
Valid Values: [0,…​,3600]
Importance: low

在刷新凭证时凭证过期前的缓冲时间(以秒为单位)。如果刷新情况比缓冲区秒数接近过期,则刷新将移动,以尽可能多地维护缓冲区时间。法律值介于 0 到 3600 (1 小时)之间;如果没有指定值,则使用默认值 300 (5 分钟)。如果这个值和 sasl.login.refresh.min.period.seconds 超过了凭证剩余的生命周期,则忽略这个值。目前仅适用于 OAUTHBEARER。

sasl.login.refresh.min.period.seconds

Type: short
Default: 60
Valid Values: [0,…​,900]
Importance: low

登录刷新线程在刷新凭证前等待的时间(以秒为单位)。法律值介于 0 到 900 (15 分钟)之间;如果没有指定值,则使用默认值 60 (1 分钟)。如果这个值和 sasl.login.refresh.buffer.seconds 都会被忽略,如果它们的总和超过凭证的剩余生命周期。目前仅适用于 OAUTHBEARER。

sasl.login.refresh.window.factor

type: double
Default: 0.8
Valid Values: [0.5,…​,1.0]
Importance: low

登录刷新线程将休眠到与凭证生命周期相关的指定窗口因素,此时将尝试刷新凭证。法律值介于 0.5 (50%)和 1.0 (100%)之间,如果没有指定值,则使用默认值 0.8 (80%)。目前仅适用于 OAUTHBEARER。

sasl.login.refresh.window.jitter

type: double
Default: 0.05
Valid Values: [0.0,…​,0.25]
Importance: low

与凭证的生命周期相关的最大随机 jitter 量添加到登录刷新线程的睡眠时间中。法律值介于 0 到 0.25(25%)之间,如果没有指定值,则使用默认值 0.05(5%)。目前仅适用于 OAUTHBEARER。

security.providers

Type: string
Default: null
Importance: low

每个返回供应商实施安全算法的可配置创建者类列表。这些类应实施 org.apache.kafka.common.security.auth.SecurityProviderCreator 接口。

ssl.cipher.suites

Type: list
Default: null
Importance: low

密码套件列表。这是用来使用 TLS 或 SSL 网络协议协商网络连接的安全设置的身份验证、加密、MAC 和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。

ssl.endpoint.identification.algorithm

Type: string
Default: https
Importance: low

使用服务器证书验证服务器主机名的端点标识算法。

ssl.engine.factory.class

Type: class
Default: null
Importance: low

org.apache.kafka.common.security.auth.SslEngineFactory 类型的类提供 SSLEngine 对象。默认值为 org.apache.kafka.common.security.ssl.DefaultSslEngineFactory。

ssl.keymanager.algorithm

Type: string
Default: SunX509
Importance: low

密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。

ssl.secure.random.implementation

Type: string
Default: null
Importance: low

用于 SSL 加密操作的 SecureRandom PRNG 实施。

ssl.trustmanager.algorithm

Type: string
Default: PKIX
Importance: low

信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。

transaction.timeout.ms

type: int
Default: 60000 (1 minute)
Importance: low

事务协调器在主动中止持续事务处理前等待事务状态更新的最长时间。如果这个值大于代理中的 transaction.max.timeout.ms 设置,请求将失败,并显示 InvalidTxnTimeoutException 错误。

transactional.id

Type: string
Default: null
Valid Values: non-empty string
Importance: low

用于事务发送的 TransactionalId。这可实现跨越多个制作者会话的可靠性语义,因为它允许客户端保证在启动任何新事务前使用相同的 TransactionalId 事务已完成。如果没有提供 TransactionalId,则生成者将限制为幂等交付。如果配置了 TransactionalId,则代表 enable.idempotence。默认情况下,TransactionId 没有配置,这意味着无法使用事务。请注意,默认情况下,事务需要至少三个代理集群,这是生产环境的建议设置;对于开发,您可以通过调整代理设置 transaction.state.log.replication.factor

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部