第 7 章 Kafka Streams 配置属性


application.id

type: string
Importance: high

流处理应用的标识符。在 Kafka 集群中必须是唯一的。它被用作 1,默认的 client-id 前缀 2)用于成员资格管理(3)的 group-id 主题前缀。

bootstrap.servers

type: list
Importance: high

用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用此处为 bootstrap 指定哪些服务器的所有服务器 - 此列表仅影响用于发现完整服务器的初始主机。此列表的格式应为 host1:port1,host2:port2,…​。由于这些服务器仅用于发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整的服务器集合(但是,在服务器停机时可能需要多个服务器)。

num.standby.replicas

Type: int
Default: 0
Importance: high

每个任务的待机副本数。

state.dir

type: string
Default: /tmp/kafka-streams
Importance: high

状态存储的目录位置。这个路径对于共享相同的底层文件系统的流实例都必须是唯一的。

acceptable.recovery.lag

Type: long
Default: 10000
Valid Values: [0,…​]
Importance: medium

要捕获客户端的最大可接受滞后(要捕获的偏移数)以想被耗尽,以接收活动任务分配。分配后,它仍然会在处理前恢复更改的其余部分。为了避免在重新平衡过程中暂停处理,此配置应该在给定工作负载的一分钟下与恢复时间相对应的恢复时间。必须至少为 0。

cache.max.bytes.buffering

Type: long
Default: 10485760
Valid Values: [0,…​]
Importance: medium

用于在所有线程间缓冲的最大内存字节数。

client.id

type: string
Default: ""
Importance: medium

用于内部消费者、生成者和恢复消费者的客户端 ID 的 ID 前缀字符串,其模式为 < client.id>-StreamThread-<threadSequenceNumber$gt;-<consumer|producer|restore-consumer >。

default.deserialization.exception.handler

type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium

实现 org.apache.kafka.streams.errors.DeserializationExceptionHandler 接口的异常处理类。

default.key.serde

Type: class
Default: null
Importance: medium

用于实现 org.apache.kafka.common.serialization.Serde 接口的密钥的默认序列化器 / deserializer 类。请注意,当使用窗口的 serde 类时,需要一个设置 inner serde 类,该类也通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实现 org.apache.kafka.common.serialization.Serde 接口。

default.list.key.serde.inner

Type: class
Default: null
Importance: medium

默认内部类是实施 org.apache.kafka.common.serialization.Serde 接口的关键。只有在 default.key.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 时才会读取此配置。

default.list.key.serde.type

Type: class
Default: null
Importance: medium

用于实施 java.util.List 接口的密钥的默认类。只有在 default.key.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde s.ListSerde 时,才会读取此配置,当使用 list serde 类时,您需要设置 inner serde 类,该类通过 'default.list.key.serde.inner' 实现 org.apache.kafka.common.serialization.Serde 接口。

default.list.value.serde.inner

Type: class
Default: null
Importance: medium

默认内部类是实施 org.apache.kafka.common.serialization.Serde 接口的值。只有在 default.value.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 时才会读取此配置。

default.list.value.serde.type

Type: class
Default: null
Importance: medium

用于实现 java.util.List 接口的值的默认类。只有在 default.value.serde 配置且只有这个设置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 是才会从这个配置中读取。请注意,当使用 list serde 类时,一个需要设置 inner serde 类,它实现了 org.apache.kafka.common.serialization.Serde 接口(通过 'default.list.value.serde.inner')。

default.production.exception.handler

type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium

实现 org.apache.kafka.streams.errors.ProductionExceptionHandler 接口的异常处理类。

default.timestamp.extractor

type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium

实现 org.apache.kafka.streams.processor.TimestampExtractor 接口的默认时间戳提取程序类。

default.value.serde

Type: class
Default: null
Importance: medium

默认序列化器 / deserializer 类用于实现 org.apache.kafka.common.serialization.Serde 接口的值。请注意,当使用窗口的 serde 类时,需要一个设置 inner serde 类,该类也通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实现 org.apache.kafka.common.serialization.Serde 接口。

max.task.idle.ms

Type: long
Default: 0
Importance: medium

此配置控制加入和合并可能会产生不是顺序的结果。config 值是当一个流完全处理某些(而非全部)输入分区等待制作者发送额外记录时,流任务将保持闲置的最长时间(毫秒)。默认(零)不会等待制作者发送更多记录,但它会等待代理上已存在的数据。此默认意味着对于代理中已存在的记录,流将按时间戳顺序处理它们。设置为 -1 可完全禁用闲置,并处理任何本地可用的数据,即使这样做可能会产生一定的顺序处理。

max.warmup.replicas

type: int
Default: 2
Valid Values: [1,…​]
Importance: medium

最多温副本数(超过配置的 num.standbys 之外的额外待机),这些副本数可以一次性分配,以便在一个实例上保持任务,同时它在其他实例上被重新分配。用于节流可用于高可用性的额外代理流量和集群状态。必须至少为 1.Note,一个 warmup 副本对应于一个流任务。另外,请注意,每个温副本只能在重新平衡期间提升到活跃任务(通常在所谓的探测重新平衡过程中,这会在 probing.rebalance.interval.ms 配置指定的频率时发生)。这意味着,活跃任务可以从一个 Kafka Streams 实例迁移到另一个实例的最大速率由(max.warmup.replicas / probing.rebalance.interval.ms)决定。

num.stream.threads

Type: int
Default: 1
Importance: medium

执行流处理的线程数量。

processing.guarantee

type: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium

处理保证应使用。可能的值有 at_least_once (默认)和 exactly_once_v2 (需要代理版本 2.5 或更高版本)。弃用的选项为 exactly_once (需要代理版本 0.11.0 或更高版本)和 exactly_once_beta (需要代理版本 2.5 或更高版本)。请注意,完全处理需要至少三个代理的集群,这是生产的建议设置;通过调整代理设置 transaction.state.log.replication.factortransaction.state.log.min.isr

rack.aware.assignment.tags

type: list
Default: ""
Valid Values: List contains maximum of 5 element
Importance: medium

用于在 Kafka Streams 实例之间分发备用副本的客户端标签键列表。配置后,Kafka Streams 将最好地将待机任务分发到每个客户端标签维度上。

replication.factor

type: int
Default: -\":\"Importance: medium

更改日志主题和重新分区流处理应用程序创建的复制因素。默认值 -1 (这代表:使用代理默认复制因素)需要代理版本 2.4 或更高版本。

security.protocol

Type: string
Default: PLAINTEXT
Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
Importance: medium

用于与代理通信的协议。有效值为: PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。

statestore.cache.max.bytes

Type: long
Default: 10485760 (10 mebibytes)
Valid Values: [0,…​]
Importance: medium

在所有线程中用于 statestore 缓存的最大内存字节数。

task.timeout.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: medium

因内部错误并重试直到引发错误前,任务可能会停滞的最长时间(以毫秒为单位)。如果超时为 0ms,任务会引发第一个内部错误的错误。对于大于 0ms 的超时时间,任务将在引发错误前至少重试一次。

topology.optimization

Type: string
Default: none
Valid Values: org.apache.kafka.streams.StreamsConfig$$Lambda$27/0x0000000840079840@6ed3ef1
Importance: medium

配置会告知 Kafka Streams (如果应该优化拓扑)以及要应用的优化。可接受的值有:"NO_OPTIMIZATION"、"OPTIMIZE"或以逗号分隔的特定优化列表:("REUSE_KTABLE_SOURCE_TOPICS", "MERGE_REPARTITION_TOPICS" + "SINGLE_STORE_SELF_JOIN+")."NO_OPTIMIZATION"。

application.server

type: string
Default: ""
Importance: low

host:port 对指向用户定义的端点,可用于在此 KafkaStreams 实例上状态存储发现和交互式查询。

auto.include.jmx.reporter

Type: boolean
Default: true
Importance: low

已弃用。即使没有在 metric.reporters 中列出,也自动包含 JmxReporter。此配置将在 Kafka 4.0 中删除,用户应改为将 org.apache.kafka.common.metrics.JmxReporter 包含在 metric.reporters 中,以便启用 JmxReporter。

buffered.records.per.partition

Type: int
Default: 1000
Importance: low

每个分区缓冲区的最大记录数。

built.in.metrics.version

type: string
Default: latest
Valid Values: [latest]
Importance: low

要使用的内置指标版本。

commit.interval.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

提交处理进度的频率(毫秒)。对于 at-least-once 处理,提交意味着保存处理器的位置(如偏移)。对于完全处理,这意味着提交包含保存位置的事务,并在输出中提交的数据对具有隔离级别 read_committed 的用户可见。(请注意,如果 processing.guarantee 设置为 exactly_once_v2, exactly_once,则默认值为 100,否则默认值为 30000

connections.max.idle.ms

Type: long
Default: 540000 (9 分钟)
Importance: low

在此配置指定的毫秒数后关闭闲置连接。

default.client.supplier

Type: class
Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
Importance: low

实现 org.apache.kafka.streams.KafkaClientSupplier 接口的客户端供应商类。

default.dsl.store

type: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low

DSL operator 使用的默认状态存储类型。

metadata.max.age.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: low

我们强制刷新元数据的时间(以毫秒为单位),即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。

metric.reporters

type: list
Default: ""
Importance: low

用作指标报告器的类列表。实施 org.apache.kafka.common.metrics.MetricsReporter 接口允许插入将在创建新指标创建通知的类中。JmxReporter 始终被包含以注册 JMX 统计信息。

metrics.num.samples

type: int
Default: 2
Valid Values: [1,…​]
Importance: low

为计算指标维护的示例数量。

metrics.recording.level

type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

指标的最大记录级别。

metrics.sample.window.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

计算指标示例的时间窗口。

poll.ms

Type: long
Default: 100
Importance: low

阻止等待输入的时间(毫秒)。

probing.rebalance.interval.ms

Type: long
Default: 600000 (10 minutes)
Valid Values: [60000,…​]
Importance: low

在触发重新平衡前等待的最长时间(毫秒)以探测到已完成温并准备好激活的温副本。探测重新平衡将继续触发,直到分配平衡为止。必须至少为 1 分钟。

receive.buffer.bytes

type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…​]
Importance: low

读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用 OS 默认。

reconnect.backoff.max.ms

Type: long
Default: 1000 (1 second)
Valid Values: [0,…​]
Importance: low

当重新连接到重复无法连接的代理时,等待的最大时间(毫秒)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。计算 backoff 后,会添加 20% 随机 jitter 以避免连接停滞。

reconnect.backoff.ms

Type: long
Default: 50
Valid Values: [0,…​]
Importance: low

尝试重新连接到给定主机前等待的时间。这可避免在严格的循环中重复连接到主机。此 backoff 适用于客户端到代理的所有连接尝试。

repartition.purge.interval.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

从重新分区主题中删除完全消耗的记录的频率(毫秒)。清除将在自上次清除后至少有这个值后进行,但可能会延迟直到稍后为止。(注意,与 commit.interval.ms 不同,在 processing.guarantee 被设置为 exactly_once_v2时,这个值的默认值不会改变。

request.timeout.ms

type: int
Default: 40000 (40 seconds)
Valid Values: [0,…​]
Importance: low

配置控制客户端等待请求响应的最长时间。如果在超时超时前未收到响应,如果需要,或者如果重试耗尽,则请求将重新发送。

retries

type: int
Default: 0
Valid Values: [0,…​,2147483647]
Importance: low

设置大于零的值将导致客户端重新发送任何失败的请求,并显示潜在的临时错误。建议将值设为零或 MAX_VALUE,并使用对应的超时参数来控制客户端应重试请求的时长。

retry.backoff.ms

Type: long
Default: 100
Valid Values: [0,…​]
Importance: low

尝试重试失败的请求到给定主题分区前等待的时间。这可避免在某些故障场景中重复在严格的循环中发送请求。

rocksdb.config.setter

Type: class
Default: null
Importance: low

实现 org.apache.kafka.streams.state.RocksDBConfigSetter 接口的 Rocks DB config setter 类或类名称。

send.buffer.bytes

type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…​]
Importance: low

发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用 OS 默认。

state.cleanup.delay.ms

Type: long
Default: 600000 (10 分钟)
Importance: low

当分区迁移时,在删除状态前等待的时间(毫秒)。只有尚未修改至少 state.cleanup.delay.ms 的状态目录才会被删除。

upgrade.from

Type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4]
Importance: low

允许以向后兼容的方式进行升级。从 [0.10.0.0, 1.1] 升级到 2.0+,或者从 [2.0, 2.3] 升级到 2.4+ 时需要。当从 3.3 升级到更新的版本时,不需要指定此配置。默认为 null。接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.4"

window.size.ms

Type: long
Default: null
Importance: low

为反序列化器设置窗口大小,以计算窗口结束时间。

windowed.inner.class.serde

Type: string
Default: null
Importance: low

默认序列化器 / deserializer 用于窗口记录的内部类。必须实施 org.apache.kafka.common.serialization.Serde 接口。请注意,在 KafkaStreams 应用程序中设置此配置会导致错误,因为它只从 Plain consumer 客户端使用。

windowstore.changelog.additional.retention.ms

Type: long
Default: 86400000 (1 day)
Importance: low

添加到窗口 MaintenanceMs 中,以确保不会预先从日志中删除数据。允许时钟偏移。默认为 1 天。

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部