第 7 章 Kafka Streams 配置属性
application.id
type: string
Importance: high
流处理应用的标识符。在 Kafka 集群中必须是唯一的。它被用作默认 client-id 前缀,即 2)用于成员资格管理的 group-id,3) changelog 主题前缀。
bootstrap.servers
type: list
Importance: high
用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器与此处为引导指定的服务器无关 - 此列表仅影响用于发现完整服务器集的初始主机。此列表的格式应为
host1:port1,host2:port2,…
。由于这些服务器仅用于初始连接来发现完整的群集成员身份(可能会动态更改),因此此列表不需要包含整组服务器(但是如果服务器停机,您可能需要多个服务器)。num.standby.replicas
Type: int
Default: 0
Importance: high
每个任务的待机副本数。
state.dir
type: string
Default: /tmp/kafka-streams
Importance: high
状态存储的目录位置。这个路径对于共享同一底层文件系统的每个流实例都必须是唯一的。
acceptable.recovery.lag
type: long
Default: 10000
Valid Values: [0,…]
Importance: medium
可接受的最大滞后(要捕获的偏移数),客户端被视为足以接收活跃的任务分配。在分配后,它仍然会在处理前恢复更改日志的其余部分。为了避免在重新平衡过程中暂停处理,这个配置应该在一个给定工作负载的一分钟内与恢复时间相对应。必须至少为 0。
cache.max.bytes.buffering
type: long
Default: 10485760
Valid Values: [0,…]
Importance: medium
在所有线程之间用于缓冲的最大内存字节数。
client.id
Type: string
Default: ""
Importance: medium
用于内部消费者、生成者和 restore-consumer 的客户端 ID 的 ID 前缀字符串,特征为 <
client.id>-StreamThread-<threadSequenceNumber$gt;-<consumer|producer|restore-consumer
>。default.deserialization.exception.handler
type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium
实现
org.apache.kafka.streams.errors.DeserializationExceptionHandler
接口的异常处理类。default.key.serde
Type: class
Default: null
Importance: medium
用于实现
org.apache.kafka.common.serialization.Serde
接口的密钥的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置实现org.apache.kafka.common.serialization.Serde
接口(通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner')的 inner serde 类。default.list.key.serde.inner
Type: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde
接口的关键。只有在default.key.serde
配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde
时,才会读取此配置。default.list.key.serde.type
Type: class
Default: null
Importance: medium
用于实施
java.util.List
接口的密钥的默认类。当使用列表 serde 类时,只有default.key.serde
配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde
s.ListSerde,才会读取此配置,它需要设置实现org.apache.kafka.common.serialization.Serde
接口的 inner serde 类。default.list.value.serde.inner
Type: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde
接口的值。只有在default.value.serde
配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde
时,才会读取此配置。default.list.value.serde.type
Type: class
Default: null
Importance: medium
用于实现
java.util.List
接口的值的默认类。只有在default.value.serde
配置且只有这个设置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde
是才会从这个配置中读取。请注意,当使用 list serde 类时,一个需要设置 inner serde 类,它实现了org.apache.kafka.common.serialization.Serde
接口(通过 'default.list.value.serde.inner')。default.production.exception.handler
type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium
实现
org.apache.kafka.streams.errors.ProductionExceptionHandler
接口的异常处理类。default.timestamp.extractor
type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium
实现
org.apache.kafka.streams.processor.TimestampExtractor
接口的默认时间戳提取器类。default.value.serde
Type: class
Default: null
Importance: medium
默认序列化器/反序列化器类用于实现
org.apache.kafka.common.serialization.Serde
接口的值。请注意,当使用窗口的 serde 类时,还需要设置实现org.apache.kafka.common.serialization.Serde
接口(通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner')的 inner serde 类。max.task.idle.ms
Type: long
Default: 0
Importance: medium
此配置控制加入和合并是否可能会产生超出顺序的结果。当流任务完全在某些(但不全部)输入分区上等待生成者发送额外记录并避免在多个输入流间处理时,配置值是流任务的最大时间(以毫秒为单位)。默认(零)不会等待生成者发送更多记录,但它会等待代理上已存在的数据。此默认值意味着对于已在代理上存在的记录,流将以时间戳顺序处理它们。设置为 -1 可完全禁用闲置并完全处理任何本地可用数据,即使这样做可能会生成没有顺序处理。
max.warmup.replicas
type: int
Default: 2
Valid Values: [1,…]
Importance: medium
一次可以一次分配的 warmup 副本的最大数量(除配置的 num.standbys 之外),保持任务在一个实例上可用,同时将其重新分配给另一个实例。用于节流额外的代理流量和集群状态可用于高可用性。必须至少为 1.Note,一个 warmup 副本对应于一个流任务。另外,请注意,每个温副本只能在重新平衡期间提升到活跃任务(通常在所谓的探测重新平衡过程中,这会在
probing.rebalance.interval.ms
配置指定的频率时发生)。这意味着,活跃任务可以从一个 Kafka Streams 实例迁移到另一个实例的最大速率由(max.warmup.replicas
/probing.rebalance.interval.ms
)决定。num.stream.threads
Type: int
Default: 1
Importance: medium
执行流处理的线程数量。
processing.guarantee
Type: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium
处理保证应使用。可能的值有
at_least_once
(默认)和exactly_once_v2
(需要代理版本 2.5 或更高版本)。弃用的选项为exactly_once
(需要代理版本 0.11.0 或更高版本)和exactly_once_beta
(需要代理版本 2.5 或更高版本)。请注意,完全处理需要至少三个代理的集群,这是生产的建议设置;通过调整代理设置transaction.state.log.replication.factor
和transaction.state.log.min.isr
。rack.aware.assignment.non_overlap_cost
type: int
Default: null
Importance: medium
与从现有分配移动任务相关的成本。这个 config 和
rack.aware.assignment.assignment.traffic_cost
控制优化算法是否首选最小化跨机架流量,或最小化现有分配中的任务移动。如果设置较大的值org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
将优化以维护现有分配。默认值为 null,这意味着它将在不同的分配器中使用默认的 non_overlap 成本值。rack.aware.assignment.strategy
Type: string
Default: none
Valid Values: [none, min_traffic]
Importance: medium
我们用于机架感知分配的策略。在分配任务以最小化跨机架流量时,机架感知分配将把
client.rack
和racks
纳入考量。有效设置是 :
none
(默认),它将禁用机架感知分配;min_traffic
,它将计算最小跨机架流量分配。rack.aware.assignment.tags
type: list
Default: ""
Valid Values: List contains maximum 5 elements
Importance: medium
用于在 Kafka Streams 实例之间分发备用副本的客户端标签密钥列表。配置后,Kafka Streams 将有一个 best-effort 来在每个客户端标签维度上分发待机任务。
rack.aware.assignment.traffic_cost
type: int
Default: null
Importance: medium
与跨机架流量相关的成本。此 config 和
rack.aware.assignment.non_overlap_cost
控制优化算法是否首选最小化跨机架流量,或最小化现有分配中的任务移动。如果设置较大的值org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
将优化以最小化跨机架流量。默认值为 null,这意味着它将在不同的分配器中使用默认流量成本值。replication.factor
type: int
Default: -1
Importance: medium
更改由流处理应用程序创建的日志主题和重新分区主题的复制因素。默认值
-1
(主题:使用代理默认复制因素)需要代理版本 2.4 或更新版本。security.protocol
Type: string
Default: PLAINTEXT
Valid Values: (case insensitive)[SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT]
Importance: medium
用于与代理通信的协议。有效值为: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
statestore.cache.max.bytes
type: long
Default: 10485760 (10 mebibytes)
Valid Values: [0,…]
Importance: medium
在所有线程中用于 statestore 缓存的最大内存字节数。
task.timeout.ms
Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…]
Importance: medium
任务因为内部错误而停止的最大时间(以毫秒为单位),并重试直到引发错误为止。对于超时为 0ms,任务会引发第一个内部错误的错误。对于大于 0ms 的任何超时,任务将在引发错误前至少重试一次。
topology.optimization
Type: string
Default: none
Valid Values: org.apache.kafka.streams.StreamsConfig$$Lambda/0x00007fe89800a400@31cefde0
Importance: medium
如果应该优化拓扑以及要应用的优化,则告知 Kafka Streams 的配置。可接受值为:"NO_OPTIMIZATION", "OPTIMIZE", "OPTIMIZE",或以逗号分隔的特定优化列表:"REUSE_KTABLE_SOURCE_TOPICS", "MERGE_REPARTITION_TOPICS" + "SINGLE_STORE_SELF_JOIN+")."NO_OPTIMIZATION", "MERGE_REPARTITION_TOPATION"
application.server
Type: string
Default: ""
Importance: low
host:port 对指向用户定义的端点,可用于此 KafkaStreams 实例上的状态存储发现和交互式查询。
auto.include.jmx.reporter
Type: boolean
Default: true
Importance: low
已弃用。是否自动包含 JmxReporter,即使它在
metric.reporters
中未列出。此配置将在 Kafka 4.0 中删除,用户应在metric.reporters
中包含org.apache.kafka.common.metrics.JmxReporter
,以启用 JmxReporter。buffered.records.per.partition
type: int
Default: 1000
Importance: low
每个分区要缓冲的最大记录数。
built.in.metrics.version
Type: string
Default: latest
Valid Values: [latest]
Importance: low
要使用的内置指标的版本。
commit.interval.ms
type: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
提交处理进度的频率(以毫秒为单位)。对于 at-least-once 处理,提交方法可以保存处理器的位置(如偏移)。对于精确处理,需要提交包含保存位置的事务,并使输出主题中的提交数据对具有隔离级别 read_committed 的用户可见。(请注意,如果
processing.guarantee
设置为exactly_once_v2
,exactly_once
,则默认值为100
,否则默认值为30000
。connections.max.idle.ms
Type: long
Default: 540000 (9 minutes)
Importance: low
在这个配置指定的毫秒数后关闭闲置连接。
default.client.supplier
Type: class
Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
Importance: low
实现
org.apache.kafka.streams.KafkaClientSupplier
接口的客户端供应商类。default.dsl.store
type: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low
DSL 运算符使用的默认状态存储类型。
metadata.max.age.ms
Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…]
Importance: low
即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。
metric.reporters
Type: list
Default: ""
Importance: low
用作指标报告器的类列表。实施
org.apache.kafka.common.metrics.MetricsReporter
接口,允许插入将收到新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。metrics.num.samples
type: int
Default: 2
Valid Values: [1,…]
Importance: low
为计算指标维护的示例数量。
metrics.recording.level
Type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low
指标的最高记录级别。
metrics.sample.window.ms
type: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
计算指标示例的时间窗口。
poll.ms
Type: long
Default: 100
Importance: low
阻止等待输入的时间(以毫秒为单位)。
probing.rebalance.interval.ms
Type: long
Default: 600000 (10 minutes)
Valid Values: [60000,…]
Importance: low
触发重新平衡以探测到探测到回收完成并准备好激活的温副本前等待的时间(以毫秒为单位)。在分配平衡前,将继续触发重新平衡。必须至少为 1 分钟。
receive.buffer.bytes
type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…]
Importance: low
读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用操作系统默认值。
reconnect.backoff.max.ms
type: long
Default: 1000 (1 second)
Valid Values: [0,…]
Importance: low
重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。
reconnect.backoff.ms
type: long
Default: 50
Valid Values: [0,…]
Importance: low
尝试重新连接到给定主机前等待的基本时间。这可避免在紧密循环中重复连接到主机。此 backoff 应用到客户端到代理的所有连接尝试。
repartition.purge.interval.ms
type: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
从重新分区主题中删除完全消耗的记录的频率(以毫秒为单位)。清除将在最后一次清除以来至少在这个值后进行,但可能会延迟直到之后为止。(请注意,与
commit.interval.ms
不同,当处理时,这个值的默认值保持不变。guarantee
被设置为exactly_once_v2
。request.timeout.ms
type: int
Default: 40000 (40 seconds)
Valid Values: [0,…]
Importance: low
配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。
retries
type: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
设置大于零的值将导致客户端重新发送失败并可能出现临时错误的请求。建议将值设为 0 或
MAX_VALUE
,并使用对应的超时参数来控制客户端应重试请求的时长。retry.backoff.ms
Type: long
Default: 100
Valid Values: [0,…]
Importance: low
尝试重试对给定主题分区失败的请求前等待的时间。这可避免在某些故障场景中的紧密循环中重复发送请求。
rocksdb.config.setter
Type: class
Default: null
Importance: low
实现
org.apache.kafka.streams.state.RocksDBConfigSetter
接口的 Rocks DB 配置 setter 类或类名称。send.buffer.bytes
type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…]
Importance: low
发送数据时要使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用操作系统默认值。
state.cleanup.delay.ms
Type: long
Default: 600000 (10 minutes)
Importance: low
分区迁移时,在删除状态前等待的时间(以毫秒为单位)。只有尚未修改至少
状态的目录。cleanup.delay.ms
将被删除。upgrade.from
Type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4]
Importance: low
允许以向后兼容的方式进行升级。从 [0.10.0.0, 1.1] 升级到 2.0+,或者从 [2.0, 2.3] 升级到 2.4+ 时需要。当从 3.3 升级到更新的版本时,不需要指定此配置。默认为
null
。接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.4"window.size.ms
Type: long
Default: null
Importance: low
为反序列化器设置窗口大小,以计算窗口结束时间。
windowed.inner.class.serde
Type: string
Default: null
Importance: low
窗口记录的内类的默认序列化器/反序列化器。必须实施
org.apache.kafka.common.serialization.Serde
接口。请注意,在 KafkaStreams 应用程序中设置此配置会导致错误,因为它只用于从 Plain consumer 客户端中使用。windowstore.changelog.additional.retention.ms
type: long
Default: 86400000 (1 day)
Importance: low
向窗口添加 maintainMs,以确保不会预先从日志中删除数据。允许时钟偏移。默认值为 1 天。