附录 C. 使用者配置参数
key.deserializer
type: class
Importance: high
实现
org.apache.kafka.common.serialization.Deserializer
接口的密钥的 deserializer 类。value.deserializer
type: class
Importance: high
实现
org.apache.kafka.common.serialization.Deserializer
接口的值的 deserializer 类。bootstrap.servers
type: list
Default: ""
Valid Values: non-null string
Importance: high
用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用与此处为 bootstrapping 指定的服务器一样的所有服务器,此列表只会影响用于发现整套服务器的初始主机。这个列表应该采用
host1:port1,host2:port2,…
格式。由于这些服务器仅用于初始连接来发现完整的集群成员身份(可能会动态更改),因此该列表不需要包含完整的服务器集合(尽管,如果服务器停机,您可能希望多台服务器)。fetch.min.bytes
type: int
Default: 1
Valid Values: [0,…]
Importance: high
服务器为获取请求返回的最小数据量。如果数据不足,请求将等待大量数据积累,然后再回答请求。1 字节的默认设置意味着,一旦有单个数据可用或获取请求超时等待数据到达时,便会立即回答获取请求。将它设置为大于 1 的内容将导致服务器等待更大数量的数据积累,从而以一些额外的延迟为代价提高服务器吞吐量。
group.id
type: string
Default: null
Importance: high
个唯一字符串,用于标识该消费者所属的消费者组。如果消费者使用
subscribe(topic)
或基于 Kafka 的偏移管理策略使用组管理功能,则需要此属性。heartbeat.interval.ms
type: int
Default: 3000(3 秒)
Importance: high
在使用 Kafka 的组管理设施时,心跳与消费者协调员之间的预计时间。心跳用于确保消费者的会话保持活跃,并促进新消费者加入或离开组时的重新平衡。该值必须小于
session.timeout.ms
,但通常应不高于该值的 1/3。它甚至可以调整,以控制正常重新平衡的预期时间。max.partition.fetch.bytes
type: int
Default: 1048576(1 mebibyte)
Valid Values: [0,…]
Importance: high
服务器将返回的每分区的最大数据量。记录由使用者批量获取。如果获取的第一个非空分区中的第一个记录批处理大于此限制,则仍会返回批处理以确保消费者能够进行进度。代理接受的最大记录批处理大小通过
message.max.bytes
(broker config)或max.message.bytes
(主题配置)定义。请参阅 fetch.max.bytes 以限制消费者请求大小。session.timeout.ms
type: int
Default: 10000(10 秒)
Importance: high
在使用 Kafka 的组管理功能时用于检测客户端故障的超时时间。客户端会定期发送心跳,以向代理表明其存活度。如果代理没有在此会话超时前收到心跳,代理会从组中删除此客户端并启动重新平衡。请注意,该值必须处于允许范围内,如
group.min.session.timeout.ms
和group.max.session.timeout.ms
代理配置中所配置。ssl.key.password
type: password
Default: null
Importance: high
密钥存储文件中私钥的密码,或者在 'ssl.keystore.key 中指定的 PEM 密钥的密码。客户端只有在配置了双向身份验证时才需要此参数。
ssl.keystore.certificate.chain
type: password
Default: null
Importance: high
以 'ssl.keystore.type" 指定的格式的证书链。默认 SSL 引擎工厂仅支持 PEM 格式及 X.509 证书列表。
ssl.keystore.key
type: password
Default: null
Importance: high
以 'ssl.keystore.type" 指定的格式的私钥。默认 SSL 引擎工厂仅支持使用 PKCS#8 密钥的 PEM 格式。如果密钥加密,则必须使用 'ssl.key.password' 指定密钥密码。
ssl.keystore.location
type: string
Default: null
Importance: high
密钥存储文件的位置。这对于客户端而言是可选的,可用于进行客户端的双向身份验证。
ssl.keystore.password
type: password
Default: null
Importance: high
存储密钥存储文件的密码。这对客户端是可选的,并且仅在配置了 'ssl.keystore.location' 时才需要。PEM 格式不支持密钥存储密码。
ssl.truststore.certificates
type: password
Default: null
Importance: high
以 'ssl.truststore.type" 指定的格式的可信证书。默认 SSL 引擎工厂仅支持具有 X.509 证书的 PEM 格式。
ssl.truststore.location
type: string
Default: null
Importance: high
信任存储文件的位置。
ssl.truststore.password
type: password
Default: null
Importance: high
信任存储文件的密码。如果未设置密码,仍然将使用配置的信任存储文件,但完整性检查将被禁用。PEM 格式不支持信任存储密码。
allow.auto.create.topics
type: boolean
Default: true
Importance: medium
在订阅或分配主题时,允许代理上自动创建主题。只有在代理允许使用
auto.create.topics.enable
代理配置时,才会自动创建订阅的主题。当使用超过 0.11.0 的代理时,此配置必须设置为false
。auto.offset.reset
type: string
Default: latest
Valid Values: [latest, early, none]
Importance: medium
当 Kafka 中没有初始偏移,或者当当前偏移在服务器上不再存在时(例如,因为该数据已被删除),应该执行什么操作:
- 最早:自动将偏移重置为最早的偏移
- latest:自动将偏移重置为最新偏移
- none:如果没有找到消费者组以前的偏移,则将异常抛出给消费者
- 任何其他内容:向消费者提出例外。
client.dns.lookup
type: string
Default: use_all_dns_ips
Valid Values: [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
Importance: media
控制客户端如何使用 DNS 查找。如果设置为
use_all_dns_ips
,按顺序连接到每个返回的 IP 地址,直到建立成功连接为止。断开连接后使用下一个 IP。所有 IP 都使用一次后,客户端会再次从主机名解析 IP(JVM 和 OS 缓存 DNS 名称查找)。如果设置为resolve_canonical_bootstrap_servers_only
,请将每个 bootstrap 地址解析为规范名称列表。bootstrap 阶段后,其行为与use_all_dns_ips
相同。如果设置为default
(已弃用),请尝试连接到查询返回的第一个 IP 地址,即使查找返回多个 IP 地址。connections.max.idle.ms
type: long
Default: 540000(9 分钟)
Importance: medium
在此配置指定的毫秒数后关闭空闲连接。
default.api.timeout.ms
type: int
Default: 60000(1 minute)
Valid Values: [0,…]
Importance: medium
指定客户端 API 的超时时间(以毫秒为单位)。这个配置被用作没有指定
timeout
参数的所有客户端操作的默认超时时间。enable.auto.commit
type: boolean
Default: true
Importance: medium
如果为 true,消费者的偏移将在后台定期提交。
exclude.internal.topics
type: boolean
Default: true
Importance: medium
是否应该从订阅中排除与订阅模式匹配的内部主题。始终可以明确订阅内部主题。
fetch.max.bytes
type: int
Default: 52428800(50 mebibytes)
Valid Values: [0,…]
Importance: media
服务器为获取请求返回的最大数据量。记录由使用者批量获取,如果获取的第一个非空分区中的第一个记录批处理大于此值,则仍会返回记录批处理以确保消费者能够进行进度。因此,这不是绝对的最大值。代理接受的最大记录批处理大小通过
message.max.bytes
(broker config)或max.message.bytes
(主题配置)定义。请注意,使用者并行执行多次获取。group.instance.id
type: string
Default: null
Importance: medium
由最终用户提供的消费者实例的唯一标识符。仅允许非空字符串。如果设置,使用者将被视为静态成员,这意味着使用者在任何时候都仅允许具有此 ID 的实例。这可与更大的会话超时结合使用,以避免临时不可用(如进程重启)导致的组重新平衡。如果未设置,使用者将作为动态成员加入该组,这是传统行为。
isolation.level
type: string
Default: read_uncomsigned
Valid Values: [read_committed, read_uncomcommit]
Importance: medium
控制如何读取事务编写的消息。如果设置为
read_committed
,sd.poll()将只返回已提交的事务信息。如果设置为read_uncommitted
(默认值),sd.poll()将返回所有消息,即使已中止的事务信息也是如此。非事务性消息将以任一模式无条件返回。消息将始终以偏移顺序返回。因此,在
read_committed
模式中, consumer.poll()只会返回到最后稳定偏移(LSO)的消息,它比第一个打开事务的偏移小。在相关事务完成之前,属于当前事务的消息之后出现的任何消息都将被拒绝。因此,read_committed
用户将无法在飞航交易中读取高水位线。Further, when in `read_committed` the seekToEnd method will return the LSO.
max.poll.interval.ms
type: int
Default: 300000(5 分钟)
Valid Values: [1,…]
Importance: medium
使用使用者组管理时,在调用 poll()之间的最大延迟。这会将一个上限放在消费者在获取更多记录之前可以闲置的时间。如果在此超时过期之前未调用 poll(),则使用者将被视为失败,并且组将重新平衡以将分区重新分配给另一成员。对于使用非空
group.instance.id
且达到这个超时的消费者,不会立即重新分配分区。相反,消费者将停止发送心跳,并在session.timeout.ms
过期后重新分配分区。这反映了已关闭的静态使用者的行为。max.poll.records
type: int
Default: 500
Valid Values: [1,…]
Importance: medium
单个调用中返回到 poll()的最大记录数。请注意,
max.poll.records
不会影响底层获取的行为。消费者将缓存每个获取请求中的记录,并以递增方式从每个轮询返回它们。partition.assignment.strategy
type: list
Default: class org.apache.kafka.clients.consumer.RangeAssignor
Valid Values: non-null string
Importance: media
使用组管理时客户端将用于在使用者实例之间分配分区所有权的受支持分区分配策略的类名称或类类型列表。可用选项有:
-
org.apache.kafka.clients.consumer.RangeAssignor
:默认的分配器,以每个主题为基础工作。 -
org.apache.kafka.clients.consumer.RoundRobinAssignor
:以轮循方式将分区分配给消费者。 -
org.apache.kafka.clients.consumer.StickyAssignor
:保证分配达到最大平衡,同时保留尽可能多的现有分区分配。 org.apache.kafka.clients.consumer.CooperativeStickyAssignor
: 遵循相同的粘滞分配器逻辑,但允许合作重新平衡。实现
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
接口允许您插入自定义分配策略。
-
receive.buffer.bytes
type: int
Default: 65536(64 kibibytes)
Valid Values: [-1,…]
Importance: media
读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,将使用 OS 默认值。
request.timeout.ms
type: int
Default: 30000(30 秒)
Valid Values: [0,…]
Importance: medium
配置控制客户端等待请求响应的最大时间。如果在超时前未收到响应,客户端将在必要时重新发送请求,或者在重试结束时失败请求。
sasl.client.callback.handler.class
type: class
Default: null
Importance: medium
实施 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称。
sasl.jaas.config
type: password
Default: null
Importance: medium
用于 SASL 连接的 JAAS 登录上下文参数,采用 JAAS 配置文件使用的格式。JAAS 配置文件格式 如下所述。该值的格式为:
loginModuleClass controlFlag (optionName=optionValue)*;
。对于代理,配置必须加上监听器前缀和 SASL 机制名称(小写)。例如,listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule 需要;。sasl.kerberos.service.name
type: string
Default: null
Importance: medium
Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的 config 中定义。
sasl.login.callback.handler.class
type: class
Default: null
Importance: medium
实施 AuthenticateCallbackHandler 接口的 SASL 登录回调处理程序类的完全限定名称。对于代理,登录回调处理程序配置必须以小写的监听器前缀和 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler。
sasl.login.class
type: class
Default: null
Importance: medium
实施登录接口的类的完全限定名称。对于代理,登录配置必须加上监听器前缀和 SASL 机制名称(小写)。For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin.
sasl.mechanism
type: string
Default: GSSAPI
Importance: medium
用于客户端连接的 SASL 机制.这可能是提供安全提供程序的任何机制。GSSAPI 是默认机制。
security.protocol
type: string
Default: PLAINTEXT
Importance: medium
用于与代理通信的协议.有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL.
send.buffer.bytes
type: int
Default: 131072(128 Kbibytes)
Valid Values: [-1,…]
Importance: media
发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,将使用 OS 默认值。
socket.connection.setup.timeout.max.ms
type: long
Default: 30000(30 秒)
Importance: medium
客户端在套接字连接建立前等待的最大时间。连接设置超时将为每个连续连接失败的指数级增长,最高为此最大值。为避免连接风暴,超时将应用 0.2 的随机因数,因此在计算值之上的 20% 到 20% 之间的随机范围。
socket.connection.setup.timeout.ms
type: long
Default: 10000(10 秒)
Importance: medium
客户端等待套接字连接建立的时间。如果连接没有在超时前构建,客户端会关闭套接字频道。
ssl.enabled.protocols
type: list
Default: TLSv1.2,TLSv1.3
Importance: medium
为 SSL 连接启用的协议列表。使用 Java 11 或更新版本运行时,默认为 'TLSv1.2,TLSv1.3',否则为 'TLSv1.2'。使用 Java 11 的默认值时,如果支持 TLSv1.3,否则客户端和服务器都支持 TLSv1.3,否则客户端和服务器都将首选 TLSv1.2(假设至少支持 TLSv1.2)。对于大多数情况,此默认值应当适用。另请参阅
ssl.protocol
的配置文档。ssl.keystore.type
type: string
Default: JKS
Importance: medium
密钥存储文件的文件格式。这是客户端的可选选项。
ssl.protocol
type: string
Default: TLSv1.3
Importance: medium
用于生成 SSLContext 的 SSL 协议。使用 Java 11 或更新版本运行时,默认为 'TLSv1.3',否则为 'TLSv1.2'。这个值应该适用于大多数用例。最近 JVM 中的允许值是 'TLSv1.2' 和 'TLSv1.3'。'TLS'、'TLSv1.1'、'SSL'、'SSLv2' 和 'SSLv3' 可能会在旧的 JVM 中受到支持,但会因为已知的安全漏洞而鼓励使用它们。使用这个配置的默认值和 'ssl.enabled.protocols',如果服务器不支持 'TLSv1.3',客户端将降级为 'TLSv1.2'。如果此配置被设置为 'TLSv1.2',客户端将不会使用 'TLSv1.3',即使它是 ssl.enabled.protocols 中的值之一,服务器只支持 'TLSv1.3'。
ssl.provider
type: string
Default: null
Importance: medium
用于 SSL 连接的安全提供商名称。默认值是 JVM 的默认安全提供程序。
ssl.truststore.type
type: string
Default: JKS
Importance: medium
信任存储文件的文件格式。
auto.commit.interval.ms
type: int
Default: 5000(5 秒)
Valid Values: [0,…]
Importance: low
如果
enable.auto.commit
设为true
,则使用者偏移数以毫秒为单位被自动分配给 Kafka。check.crcs
type: boolean
Default: true
Importance: low
自动检查所消耗记录的 CRC32。这样可确保不会对消息发生任何磁盘上或磁盘上损坏。此检查会增加一些开销,因此在寻求极高的性能时可能会禁用它。
client.id
type: string
Default: ""
Importance: low
在发出请求时要传递给服务器的 id 字符串。这样做的目的是通过允许逻辑应用程序名称包含在服务器端请求日志记录中,跟踪请求源,而不仅仅是 ip/port。
client.rack
type: string
Default: ""
Importance: low
此客户端的机架标识符。这可以是指明此客户端实际位置的任何字符串值。它与代理配置 'broker.rack' 对应。
fetch.max.wait.ms
type: int
Default: 500
Valid Values: [0,…]
Importance: low
如果没有足够的数据立即满足 fetch.min.bytes 给出的要求,则服务器在回答获取请求前将停止的最大时间。
interceptor.classes
type: list
Default: ""
Valid Values: non-null string
Importance: low
用作拦截器的类的列表。通过实施
org.apache.kafka.clients.consumer.ConsumerInterceptor
接口,您可以截获使用者收到的(可能为 mutate)记录。默认情况下,没有拦截器。metadata.max.age.ms
type: long
Default: 300000(5 分钟)
Valid Values: [0,…]
Importance: low
在经过这一时间之后,我们强制刷新元数据,即使我们尚未看到任何分区领导变化来主动发现任何新的代理或分区。
metric.reporters
type: list
Default: ""
Valid Values: non-null string
Importance: low
用作指标报告器的类的列表。实施
org.apache.kafka.common.metrics.MetricsReporter
接口允许在类中插入,这些类将通知新指标创建。JmxReporter 始终包含在内,用于注册 JMX 统计数据。metrics.num.samples
type: int
Default: 2
Valid Values: [1,…]
Importance: low
维护到计算指标的示例数量。
metrics.recording.level
type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low
指标的最高记录级别。
metrics.sample.window.ms
type: long
Default: 30000(30 秒)
Valid Values: [0,…]
Importance: low
指标样本计算完的时间窗口。
reconnect.backoff.max.ms
type: long
Default: 1000(1 second)
Valid Values: [0,…]
Importance: low
重新连接到一个代理时需要等待的时间上限(以毫秒为单位),且该代理重复无法连接。如果提供,每个主机的 backoff 都会为每个连续连接失败呈指数级增长,最高可达此最大值。计算 backoff 后,添加 20% 的随机 jitter,以避免连接风暴。
reconnect.backoff.ms
type: long
Default: 50
Valid Values: [0,…]
Importance: low
尝试重新连接到给定主机前等待的基本时间。这可避免在紧凑循环中重复连接主机。这个后端适用于客户端对代理的所有连接尝试。
retry.backoff.ms
type: long
Default: 100
Valid Values: [0,…]
Importance: low
尝试将失败的请求重试到给定主题分区前等待的时间。这可避免在某些故障情况下在紧凑循环中重复发送请求。
sasl.kerberos.kinit.cmd
type: string
Default: /usr/bin/kinit
Importance: low
Kerberos kinit 命令路径.
sasl.kerberos.min.time.before.relogin
type: long
Default: 60000
Importance: low
刷新尝试之间的登录线程睡眠时间.
sasl.kerberos.ticket.renew.jitter
type: double
Default: 0.05
Importance: low
添加到续订时间的随机 jitter 百分比.
sasl.kerberos.ticket.renew.window.factor
type: double
Default: 0.8
Importance: low
登录线程将休眠,直到达到上次刷新到票据过期时间的指定窗口因素,此时将尝试续订票据。
sasl.login.refresh.buffer.seconds
type: short
Default: 300
Valid Values: [0,…,3600]
Importance: low
刷新凭证时凭据过期前的缓冲时间(以秒为单位)。否则,如果刷新的时间比缓冲区秒数接近到期,则将移动刷新以尽可能保留缓冲区时间。法律值介于 0 到 3600(1 小时)之间;如果没有指定值,则使用默认值 300(5 分钟)。如果总和超过凭证剩余生命周期,则这个值和 sasl.login.refresh.min.period.seconds 都会被忽略。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.min.period.seconds
type: short
Default: 60
Valid Values: [0,…,900]
Importance: low
登录刷新线程在刷新凭据前所需的最短时间(以秒为单位)。法律值介于 0 到 900(15 分钟)之间;如果没有指定值,则使用默认值 60(1 分钟)。如果值和 sasl.login.refresh.buffer.seconds 的剩余生命周期超过其剩余生命周期,则该值和 sasl.login.refresh.buffer.seconds 都会被忽略。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.window.factor
type: double
Default: 0.8
Valid Values: [0.5,…,1.0]
Importance: low
登录刷新线程将休眠,直到达到相对于凭证生命周期的指定窗口因子,此时将尝试刷新凭证。法律值介于 0.5(50%)和 1.0(100%)之间;如果没有指定值,则使用默认值 0.8(80%)。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.window.jitter
type: double
Default: 0.05
Valid Values: [0.0,…,0.25]
Importance: low
与凭证生命周期相关的最大随机 jitter 数量,添加到登录刷新线程的睡眠时间。法律值介于 0 到 0.25(25%)之间;如果没有指定值,则使用默认值 0.05(5%)。目前仅适用于 OAUTHBEARER。
security.providers
type: string
Default: null
Importance: low
可配置创建者列表每个返回实施安全算法的供应商。这些类应该实施
org.apache.kafka.common.security.auth.SecurityProviderCreator
接口。ssl.cipher.suites
type: list
Default: null
Importance: low
密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于使用 TLS 或 SSL 网络协议协商网络连接的安全设置。默认情况下,支持所有可用的密码套件。
ssl.endpoint.identification.algorithm
type: string
Default: https
Importance: low
使用服务器证书验证服务器主机名的端点识别算法。
ssl.engine.factory.class
type: class
Default: null
Importance: low
kind org.apache.kafka.common.security.auth.SslEngineFactory 的类型,以提供 SSLEngine 对象。Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.
ssl.keymanager.algorithm
type: string
Default: SunX509
Importance: low
密钥管理器工厂用于 SSL 连接的算法。默认值为为 Java 虚拟机配置的关键管理器工厂算法。
ssl.secure.random.implementation
type: string
Default: null
Importance: low
用于 SSL 加密操作的 SecureRandom PRNG 实施。
ssl.trustmanager.algorithm
type: string
Default: PKIX
Importance: low
信任管理器用于 SSL 连接的算法。默认值为为 Java 虚拟机配置的信任管理器工厂算法。