第 7 章 Kafka Streams 配置属性
application.idtype: string
Importance: high
流处理应用的标识符。在 Kafka 集群中必须是唯一的。它被用作 1,默认的 client-id 前缀 2)用于成员资格管理(3)的 group-id 主题前缀。
bootstrap.serverstype: list
Importance: high
用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用此处为 bootstrap 指定哪些服务器的所有服务器 - 此列表仅影响用于发现完整服务器的初始主机。此列表的格式应为
host1:port1,host2:port2,…。由于这些服务器仅用于发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整的服务器集合(但是,在服务器停机时可能需要多个服务器)。num.standby.replicasType: int
Default: 0
Importance: high
每个任务的待机副本数。
state.dirtype: string
Default: /tmp/kafka-streams
Importance: high
状态存储的目录位置。这个路径对于共享相同的底层文件系统的流实例都必须是唯一的。
acceptable.recovery.lagType: long
Default: 10000
Valid Values: [0,…]
Importance: medium
要捕获客户端的最大可接受滞后(要捕获的偏移数)以想被耗尽,以接收活动任务分配。分配后,它仍然会在处理前恢复更改的其余部分。为了避免在重新平衡过程中暂停处理,此配置应该在给定工作负载的一分钟下与恢复时间相对应的恢复时间。必须至少为 0。
cache.max.bytes.bufferingType: long
Default: 10485760
Valid Values: [0,…]
Importance: medium
用于在所有线程间缓冲的最大内存字节数。
client.idtype: string
Default: ""
Importance: medium
用于内部消费者、生成者和恢复消费者的客户端 ID 的 ID 前缀字符串,其模式为 <
client.id>-StreamThread-<threadSequenceNumber$gt;-<consumer|producer|restore-consumer>。default.deserialization.exception.handlertype: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium
实现
org.apache.kafka.streams.errors.DeserializationExceptionHandler接口的异常处理类。default.key.serdeType: class
Default: null
Importance: medium
用于实现
org.apache.kafka.common.serialization.Serde接口的密钥的默认序列化器 / deserializer 类。请注意,当使用窗口的 serde 类时,需要一个设置 inner serde 类,该类也通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实现org.apache.kafka.common.serialization.Serde接口。default.list.key.serde.innerType: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde接口的关键。只有在default.key.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde时才会读取此配置。default.list.key.serde.typeType: class
Default: null
Importance: medium
用于实施
java.util.List接口的密钥的默认类。只有在default.key.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerdes.ListSerde 时,才会读取此配置,当使用 list serde 类时,您需要设置 inner serde 类,该类通过 'default.list.key.serde.inner' 实现org.apache.kafka.common.serialization.Serde接口。default.list.value.serde.innerType: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde接口的值。只有在default.value.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde时才会读取此配置。default.list.value.serde.typeType: class
Default: null
Importance: medium
用于实现
java.util.List接口的值的默认类。只有在default.value.serde配置且只有这个设置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde是才会从这个配置中读取。请注意,当使用 list serde 类时,一个需要设置 inner serde 类,它实现了org.apache.kafka.common.serialization.Serde接口(通过 'default.list.value.serde.inner')。default.production.exception.handlertype: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium
实现
org.apache.kafka.streams.errors.ProductionExceptionHandler接口的异常处理类。default.timestamp.extractortype: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium
实现
org.apache.kafka.streams.processor.TimestampExtractor接口的默认时间戳提取程序类。default.value.serdeType: class
Default: null
Importance: medium
默认序列化器 / deserializer 类用于实现
org.apache.kafka.common.serialization.Serde接口的值。请注意,当使用窗口的 serde 类时,需要一个设置 inner serde 类,该类也通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实现org.apache.kafka.common.serialization.Serde接口。max.task.idle.msType: long
Default: 0
Importance: medium
此配置控制加入和合并可能会产生不是顺序的结果。config 值是当一个流完全处理某些(而非全部)输入分区等待制作者发送额外记录时,流任务将保持闲置的最长时间(毫秒)。默认(零)不会等待制作者发送更多记录,但它会等待代理上已存在的数据。此默认意味着对于代理中已存在的记录,流将按时间戳顺序处理它们。设置为 -1 可完全禁用闲置,并处理任何本地可用的数据,即使这样做可能会产生一定的顺序处理。
max.warmup.replicastype: int
Default: 2
Valid Values: [1,…]
Importance: medium
最多温副本数(超过配置的 num.standbys 之外的额外待机),这些副本数可以一次性分配,以便在一个实例上保持任务,同时它在其他实例上被重新分配。用于节流可用于高可用性的额外代理流量和集群状态。必须至少为 1.Note,一个 warmup 副本对应于一个流任务。另外,请注意,每个温副本只能在重新平衡期间提升到活跃任务(通常在所谓的探测重新平衡过程中,这会在
probing.rebalance.interval.ms配置指定的频率时发生)。这意味着,活跃任务可以从一个 Kafka Streams 实例迁移到另一个实例的最大速率由(max.warmup.replicas/probing.rebalance.interval.ms)决定。num.stream.threadsType: int
Default: 1
Importance: medium
执行流处理的线程数量。
processing.guaranteetype: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium
处理保证应使用。可能的值有
at_least_once(默认)和exactly_once_v2(需要代理版本 2.5 或更高版本)。弃用的选项为exactly_once(需要代理版本 0.11.0 或更高版本)和exactly_once_beta(需要代理版本 2.5 或更高版本)。请注意,完全处理需要至少三个代理的集群,这是生产的建议设置;通过调整代理设置transaction.state.log.replication.factor和transaction.state.log.min.isr。rack.aware.assignment.tagstype: list
Default: ""
Valid Values: List contains maximum of 5 element
Importance: medium
用于在 Kafka Streams 实例之间分发备用副本的客户端标签键列表。配置后,Kafka Streams 将最好地将待机任务分发到每个客户端标签维度上。
replication.factortype: int
Default: -\":\"Importance: medium
更改日志主题和重新分区流处理应用程序创建的复制因素。默认值
-1(这代表:使用代理默认复制因素)需要代理版本 2.4 或更高版本。security.protocolType: string
Default: PLAINTEXT
Valid Values: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
Importance: medium
用于与代理通信的协议。有效值为: PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
statestore.cache.max.bytesType: long
Default: 10485760 (10 mebibytes)
Valid Values: [0,…]
Importance: medium
在所有线程中用于 statestore 缓存的最大内存字节数。
task.timeout.msType: long
Default: 300000 (5 minutes)
Valid Values: [0,…]
Importance: medium
因内部错误并重试直到引发错误前,任务可能会停滞的最长时间(以毫秒为单位)。如果超时为 0ms,任务会引发第一个内部错误的错误。对于大于 0ms 的超时时间,任务将在引发错误前至少重试一次。
topology.optimizationType: string
Default: none
Valid Values: org.apache.kafka.streams.StreamsConfig$$Lambda$27/0x0000000840079840@6ed3ef1
Importance: medium
配置会告知 Kafka Streams (如果应该优化拓扑)以及要应用的优化。可接受的值有:"NO_OPTIMIZATION"、"OPTIMIZE"或以逗号分隔的特定优化列表:("REUSE_KTABLE_SOURCE_TOPICS", "MERGE_REPARTITION_TOPICS" + "SINGLE_STORE_SELF_JOIN+")."NO_OPTIMIZATION"。
application.servertype: string
Default: ""
Importance: low
host:port 对指向用户定义的端点,可用于在此 KafkaStreams 实例上状态存储发现和交互式查询。
auto.include.jmx.reporterType: boolean
Default: true
Importance: low
已弃用。即使没有在
metric.reporters中列出,也自动包含 JmxReporter。此配置将在 Kafka 4.0 中删除,用户应改为将org.apache.kafka.common.metrics.JmxReporter包含在metric.reporters中,以便启用 JmxReporter。buffered.records.per.partitionType: int
Default: 1000
Importance: low
每个分区缓冲区的最大记录数。
built.in.metrics.versiontype: string
Default: latest
Valid Values: [latest]
Importance: low
要使用的内置指标版本。
commit.interval.msType: long
Default: 30000 (30 seconds)
Valid Values: [0,…]
Importance: low
提交处理进度的频率(毫秒)。对于 at-least-once 处理,提交意味着保存处理器的位置(如偏移)。对于完全处理,这意味着提交包含保存位置的事务,并在输出中提交的数据对具有隔离级别 read_committed 的用户可见。(请注意,如果
processing.guarantee设置为exactly_once_v2,exactly_once,则默认值为100,否则默认值为30000。connections.max.idle.msType: long
Default: 540000 (9 分钟)
Importance: low
在此配置指定的毫秒数后关闭闲置连接。
default.client.supplierType: class
Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
Importance: low
实现
org.apache.kafka.streams.KafkaClientSupplier接口的客户端供应商类。default.dsl.storetype: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low
DSL operator 使用的默认状态存储类型。
metadata.max.age.msType: long
Default: 300000 (5 minutes)
Valid Values: [0,…]
Importance: low
我们强制刷新元数据的时间(以毫秒为单位),即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。
metric.reporterstype: list
Default: ""
Importance: low
用作指标报告器的类列表。实施
org.apache.kafka.common.metrics.MetricsReporter接口允许插入将在创建新指标创建通知的类中。JmxReporter 始终被包含以注册 JMX 统计信息。metrics.num.samplestype: int
Default: 2
Valid Values: [1,…]
Importance: low
为计算指标维护的示例数量。
metrics.recording.leveltype: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low
指标的最大记录级别。
metrics.sample.window.msType: long
Default: 30000 (30 seconds)
Valid Values: [0,…]
Importance: low
计算指标示例的时间窗口。
poll.msType: long
Default: 100
Importance: low
阻止等待输入的时间(毫秒)。
probing.rebalance.interval.msType: long
Default: 600000 (10 minutes)
Valid Values: [60000,…]
Importance: low
在触发重新平衡前等待的最长时间(毫秒)以探测到已完成温并准备好激活的温副本。探测重新平衡将继续触发,直到分配平衡为止。必须至少为 1 分钟。
receive.buffer.bytestype: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…]
Importance: low
读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用 OS 默认。
reconnect.backoff.max.msType: long
Default: 1000 (1 second)
Valid Values: [0,…]
Importance: low
当重新连接到重复无法连接的代理时,等待的最大时间(毫秒)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。计算 backoff 后,会添加 20% 随机 jitter 以避免连接停滞。
reconnect.backoff.msType: long
Default: 50
Valid Values: [0,…]
Importance: low
尝试重新连接到给定主机前等待的时间。这可避免在严格的循环中重复连接到主机。此 backoff 适用于客户端到代理的所有连接尝试。
repartition.purge.interval.msType: long
Default: 30000 (30 seconds)
Valid Values: [0,…]
Importance: low
从重新分区主题中删除完全消耗的记录的频率(毫秒)。清除将在自上次清除后至少有这个值后进行,但可能会延迟直到稍后为止。(注意,与
commit.interval.ms不同,在processing.guarantee被设置为exactly_once_v2时,这个值的默认值不会改变。request.timeout.mstype: int
Default: 40000 (40 seconds)
Valid Values: [0,…]
Importance: low
配置控制客户端等待请求响应的最长时间。如果在超时超时前未收到响应,如果需要,或者如果重试耗尽,则请求将重新发送。
retriestype: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
设置大于零的值将导致客户端重新发送任何失败的请求,并显示潜在的临时错误。建议将值设为零或
MAX_VALUE,并使用对应的超时参数来控制客户端应重试请求的时长。retry.backoff.msType: long
Default: 100
Valid Values: [0,…]
Importance: low
尝试重试失败的请求到给定主题分区前等待的时间。这可避免在某些故障场景中重复在严格的循环中发送请求。
rocksdb.config.setterType: class
Default: null
Importance: low
实现
org.apache.kafka.streams.state.RocksDBConfigSetter接口的 Rocks DB config setter 类或类名称。send.buffer.bytestype: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…]
Importance: low
发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用 OS 默认。
state.cleanup.delay.msType: long
Default: 600000 (10 分钟)
Importance: low
当分区迁移时,在删除状态前等待的时间(毫秒)。只有尚未修改至少
state.cleanup.delay.ms的状态目录才会被删除。upgrade.fromType: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4]
Importance: low
允许以向后兼容的方式进行升级。从 [0.10.0.0, 1.1] 升级到 2.0+,或者从 [2.0, 2.3] 升级到 2.4+ 时需要。当从 3.3 升级到更新的版本时,不需要指定此配置。默认为
null。接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.4"window.size.msType: long
Default: null
Importance: low
为反序列化器设置窗口大小,以计算窗口结束时间。
windowed.inner.class.serdeType: string
Default: null
Importance: low
默认序列化器 / deserializer 用于窗口记录的内部类。必须实施
org.apache.kafka.common.serialization.Serde接口。请注意,在 KafkaStreams 应用程序中设置此配置会导致错误,因为它只从 Plain consumer 客户端使用。windowstore.changelog.additional.retention.msType: long
Default: 86400000 (1 day)
Importance: low
添加到窗口 MaintenanceMs 中,以确保不会预先从日志中删除数据。允许时钟偏移。默认为 1 天。