
Chapter 7. Kafka Streams configuration properties


application.id

Type: string
Importance: high

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.

bootstrap.servers

Type: list
Importance: high

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,…​. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

num.standby.replicas

Type: int
Default: 0
Importance: high

The number of standby replicas for each task.

state.dir

Type: string
Default: /tmp/kafka-streams
Importance: high

Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem. Note that if not configured, then the default location will be different in each environment as it is computed using System.getProperty("java.io.tmpdir").
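As a rough illustration, the four high-importance properties above could be collected into a plain java.util.Properties object, as in the sketch below. The class name BaseStreamsConfig, the standby count of 1, and all argument values are hypothetical; only the property keys come from this chapter. The sketch deliberately avoids the Kafka client library, so it only assembles strings and does not contact a cluster.

```java
import java.util.Properties;

/** Sketch: the high-importance settings as plain string properties. */
final class BaseStreamsConfig {

    // Builds the properties from caller-supplied values (all hypothetical).
    static Properties build(String applicationId, String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        // Must be unique within the Kafka cluster.
        props.setProperty("application.id", applicationId);
        // Comma-separated host:port pairs used only for the initial connection.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // One standby replica per task; the documented default is 0.
        props.setProperty("num.standby.replicas", "1");
        // Must be unique per Streams instance sharing the same filesystem.
        props.setProperty("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("orders-aggregator", "host1:9092,host2:9092",
                "/var/lib/kafka-streams/orders-aggregator");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting state.dir explicitly, as here, avoids relying on the java.io.tmpdir-derived default that differs between environments.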

acceptable.recovery.lag

Type: long
Default: 10000
Valid Values: [0,…​]
Importance: medium

The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment. Upon assignment, it will still restore the rest of the changelog before processing. To avoid a pause in processing during rebalances, this config should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

cache.max.bytes.buffering

Type: long
Default: 10485760
Valid Values: [0,…​]
Importance: medium

Maximum number of memory bytes to be used for buffering across all threads.

client.id

Type: string
Default: ""
Importance: medium

An ID prefix string used for the client IDs of internal (main, restore, and global) consumers, producers, and admin clients with pattern <client.id>-[Global]StreamThread[-<threadSequenceNumber>]-<consumer|producer|restore-consumer|global-consumer>.
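To make the pattern above easier to read, the following sketch formats IDs according to it. It is not the code Kafka Streams uses internally; the class ClientIdPattern and its method are hypothetical, and the square brackets in the pattern are interpreted here as optional parts.

```java
/** Sketch: formats a client ID following the documented pattern. */
final class ClientIdPattern {

    /**
     * @param clientIdPrefix       value of client.id
     * @param global               true to emit "GlobalStreamThread"
     * @param threadSequenceNumber optional sequence number, or null to omit it
     * @param role                 one of consumer, producer, restore-consumer, global-consumer
     */
    static String format(String clientIdPrefix, boolean global,
                         Integer threadSequenceNumber, String role) {
        StringBuilder sb = new StringBuilder(clientIdPrefix).append('-');
        if (global) {
            sb.append("Global");
        }
        sb.append("StreamThread");
        if (threadSequenceNumber != null) {
            sb.append('-').append(threadSequenceNumber);
        }
        return sb.append('-').append(role).toString();
    }

    public static void main(String[] args) {
        System.out.println(format("orders", false, 1, "consumer"));
        System.out.println(format("orders", true, null, "global-consumer"));
    }
}
```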

default.deserialization.exception.handler

Type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium

Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface.

default.key.serde

Type: class
Default: null
Importance: medium

Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface. Note that when a windowed serde class is used, you must also set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner'.

default.list.key.serde.inner

Type: class
Default: null
Importance: medium

Default inner class of list serde for key that implements the org.apache.kafka.common.serialization.Serde interface. This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde.

default.list.key.serde.type

Type: class
Default: null
Importance: medium

Default class for key that implements the java.util.List interface. This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde. Note that when the list serde class is used, you must also set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.list.key.serde.inner'.

default.list.value.serde.inner

Type: class
Default: null
Importance: medium

Default inner class of list serde for value that implements the org.apache.kafka.common.serialization.Serde interface. This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde.

default.list.value.serde.type

Type: class
Default: null
Importance: medium

Default class for value that implements the java.util.List interface. This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde. Note that when the list serde class is used, you must also set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.list.value.serde.inner'.
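The list serde properties above only take effect together, so a combined sketch may help. The example below sets a list serde for keys and values using plain string properties. The choice of java.util.ArrayList and the string serde as the inner serde is an assumption for illustration, as is the class name ListSerdeDefaults. Nested classes are written with the $ binary-name form, in line with the dsl.store.suppliers.class default shown elsewhere in this chapter.

```java
import java.util.Properties;

/** Sketch: default list serdes for keys and values (illustrative choices). */
final class ListSerdeDefaults {

    private static final String LIST_SERDE =
            "org.apache.kafka.common.serialization.Serdes$ListSerde";
    private static final String STRING_SERDE =
            "org.apache.kafka.common.serialization.Serdes$StringSerde";

    static Properties build() {
        Properties props = new Properties();
        // Keys: a list serde, backed by ArrayList, with string elements.
        props.setProperty("default.key.serde", LIST_SERDE);
        props.setProperty("default.list.key.serde.type", "java.util.ArrayList");
        props.setProperty("default.list.key.serde.inner", STRING_SERDE);
        // Values: the same combination, configured independently of the key.
        props.setProperty("default.value.serde", LIST_SERDE);
        props.setProperty("default.list.value.serde.type", "java.util.ArrayList");
        props.setProperty("default.list.value.serde.inner", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```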

default.production.exception.handler

Type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium

Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface.

default.timestamp.extractor

Type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium

Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface.

default.value.serde

Type: class
Default: null
Importance: medium

Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface. Note that when a windowed serde class is used, you must also set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner'.

max.task.idle.ms

Type: long
Default: 0
Importance: medium

This config controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams. The default (zero) does not wait for producers to send more records, but it does wait to fetch data that is already present on the brokers. This default means that for records that are already present on the brokers, Streams will process them in timestamp order. Set to -1 to disable idling entirely and process any locally available data, even though doing so may produce out-of-order processing.
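The three behaviours described above can be summarised in a small classifier. This is only a reading aid under the assumption that the description is exhaustive for -1, 0, and positive values; the class TaskIdleMode and its enum are hypothetical and not part of Kafka Streams.

```java
/** Sketch: classifies a max.task.idle.ms value per the description above. */
final class TaskIdleMode {

    enum Mode {
        IDLING_DISABLED,          // -1: process any locally available data
        WAIT_FOR_BROKER_DATA,     // 0: wait only to fetch data already on the brokers
        WAIT_FOR_PRODUCERS,       // > 0: additionally wait up to that many ms for new records
        UNSPECIFIED               // other negatives are not described in this chapter
    }

    static Mode classify(long maxTaskIdleMs) {
        if (maxTaskIdleMs == -1L) {
            return Mode.IDLING_DISABLED;
        }
        if (maxTaskIdleMs == 0L) {
            return Mode.WAIT_FOR_BROKER_DATA;
        }
        if (maxTaskIdleMs > 0L) {
            return Mode.WAIT_FOR_PRODUCERS;
        }
        return Mode.UNSPECIFIED;
    }

    public static void main(String[] args) {
        for (long value : new long[] {-1L, 0L, 500L}) {
            System.out.println(value + " -> " + classify(value));
        }
    }
}
```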

max.warmup.replicas

Type: int
Default: 2
Valid Values: [1,…​]
Importance: medium

The maximum number of warmup replicas (extra standbys beyond the configured num.standby.replicas) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker traffic and cluster state can be used for high availability. Must be at least 1. Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active task during a rebalance (normally during a so-called probing rebalance, which occurs at a frequency specified by the probing.rebalance.interval.ms config). This means that the maximum rate at which active tasks can be migrated from one Kafka Streams instance to another can be determined by (max.warmup.replicas / probing.rebalance.interval.ms).
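As a worked example of the rate formula above, the sketch below converts max.warmup.replicas / probing.rebalance.interval.ms into migrations per hour. The class and method names are hypothetical. With the documented defaults (2 warmup replicas, 600000 ms interval), the upper bound works out to 12 task migrations per hour.

```java
/** Sketch: upper bound on active-task migrations implied by the formula above. */
final class WarmupMigrationRate {

    private static final double MS_PER_HOUR = 3_600_000.0;

    // Returns max.warmup.replicas / probing.rebalance.interval.ms, scaled to one hour.
    static double maxMigrationsPerHour(int maxWarmupReplicas, long probingRebalanceIntervalMs) {
        if (maxWarmupReplicas < 1 || probingRebalanceIntervalMs < 60_000L) {
            // Both bounds are the documented valid ranges for these settings.
            throw new IllegalArgumentException("values outside the documented valid ranges");
        }
        return maxWarmupReplicas * MS_PER_HOUR / probingRebalanceIntervalMs;
    }

    public static void main(String[] args) {
        // Documented defaults: 2 replicas every 10 minutes.
        System.out.println(maxMigrationsPerHour(2, 600_000L));
    }
}
```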

num.stream.threads

Type: int
Default: 1
Importance: medium

The number of threads to execute stream processing.

processing.guarantee

Type: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium

The processing guarantee that should be used. Possible values are at_least_once (default) and exactly_once_v2 (requires brokers version 2.5 or higher). Deprecated options are exactly_once (requires brokers version 0.11.0 or higher) and exactly_once_beta (requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default, which is the recommended setting for production; for development, you can change this by adjusting the broker settings transaction.state.log.replication.factor and transaction.state.log.min.isr.

rack.aware.assignment.non_overlap_cost

Type: int
Default: null
Importance: medium

Cost associated with moving tasks from the existing assignment. This config and rack.aware.assignment.traffic_cost control whether the optimization algorithm favors minimizing cross-rack traffic or minimizing the movement of tasks in the existing assignment. If set to a larger value, org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor will optimize to maintain the existing assignment. The default value is null, which means the default non_overlap cost values of the different assignors are used.

rack.aware.assignment.strategy

Type: string
Default: none
Valid Values: [none, min_traffic, balance_subtopology]
Importance: medium

The strategy used for rack-aware assignment. Rack-aware assignment takes client.rack and the racks of each TopicPartition into account when assigning tasks, to minimize cross-rack traffic. Valid settings are: none (default), which disables rack-aware assignment; min_traffic, which computes the assignment with minimum cross-rack traffic; and balance_subtopology, which computes minimum cross-rack traffic and tries to balance the tasks of the same subtopologies across different clients.

rack.aware.assignment.tags

Type: list
Default: ""
Valid Values: List containing maximum of 5 elements
Importance: medium

List of client tag keys used to distribute standby replicas across Kafka Streams instances. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over each client tag dimension.

rack.aware.assignment.traffic_cost

Type: int
Default: null
Importance: medium

Cost associated with cross-rack traffic. This config and rack.aware.assignment.non_overlap_cost control whether the optimization algorithm favors minimizing cross-rack traffic or minimizing the movement of tasks in the existing assignment. If set to a larger value, org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor will optimize for minimizing cross-rack traffic. The default value is null, which means the default traffic cost values of the different assignors are used.

replication.factor

Type: int
Default: -1
Importance: medium

The replication factor for change log topics and repartition topics created by the stream processing application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer.

security.protocol

Type: string
Default: PLAINTEXT
Valid Values: (case insensitive) [SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT]
Importance: medium

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

statestore.cache.max.bytes

Type: long
Default: 10485760 (10 mebibytes)
Valid Values: [0,…​]
Importance: medium

Maximum number of memory bytes to be used for statestore cache across all threads.

task.assignor.class

Type: string
Default: null
Importance: medium

A task assignor class or class name implementing the org.apache.kafka.streams.processor.assignment.TaskAssignor interface. Defaults to the HighAvailabilityTaskAssignor class.

task.timeout.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: medium

The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0ms, a task would raise an error for the first internal error. For any timeout larger than 0ms, a task will retry at least once before an error is raised.

topology.optimization

Type: string
Default: none
Valid Values: [all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join]
Importance: medium

A configuration telling Kafka Streams whether it should optimize the topology and which optimizations to apply. Acceptable values are none (StreamsConfig constant NO_OPTIMIZATION), all (constant OPTIMIZE), or a comma-separated list of specific optimizations: reuse.ktable.source.topics (REUSE_KTABLE_SOURCE_TOPICS), merge.repartition.topics (MERGE_REPARTITION_TOPICS), and single.store.self.join (SINGLE_STORE_SELF_JOIN). The default is none.

application.server

Type: string
Default: ""
Importance: low

A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.

auto.include.jmx.reporter

Type: boolean
Default: true
Importance: low

Deprecated. Whether to automatically include JmxReporter even if it’s not listed in metric.reporters. This configuration will be removed in Kafka 4.0; users should instead include org.apache.kafka.common.metrics.JmxReporter in metric.reporters in order to enable the JmxReporter.

buffered.records.per.partition

Type: int
Default: 1000
Importance: low

Maximum number of records to buffer per partition.

built.in.metrics.version

Type: string
Default: latest
Valid Values: [latest]
Importance: low

Version of the built-in metrics to use.

commit.interval.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

The frequency in milliseconds with which to commit processing progress. For at-least-once processing, committing means saving the position (i.e., offsets) of the processor. For exactly-once processing, it means committing the transaction, which includes saving the position and making the committed data in the output topic visible to consumers with isolation level read_committed. (Note: if processing.guarantee is set to exactly_once_v2 or exactly_once, the default value is 100; otherwise the default value is 30000.)
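The dependency of the commit.interval.ms default on processing.guarantee can be expressed as a small lookup. The sketch below follows only the note above, which names exactly_once_v2 and exactly_once; how exactly_once_beta is treated is not stated in this chapter, so the sketch falls through to the general default for it. The class name CommitIntervalDefault is hypothetical.

```java
/** Sketch: default commit.interval.ms as a function of processing.guarantee. */
final class CommitIntervalDefault {

    static long defaultCommitIntervalMs(String processingGuarantee) {
        switch (processingGuarantee) {
            case "exactly_once_v2":
            case "exactly_once":
                // Exactly-once guarantees named in the note: 100 ms.
                return 100L;
            default:
                // Everything else, including at_least_once: 30 seconds.
                return 30_000L;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCommitIntervalMs("at_least_once"));
        System.out.println(defaultCommitIntervalMs("exactly_once_v2"));
    }
}
```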

connections.max.idle.ms

Type: long
Default: 540000 (9 minutes)
Importance: low

Close idle connections after the number of milliseconds specified by this config.

default.client.supplier

Type: class
Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
Importance: low

Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface.

default.dsl.store

Type: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low

The default state store type used by DSL operators.

dsl.store.suppliers.class

Type: class
Default: org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
Importance: low

Defines which store implementations to plug in to DSL operators. Must implement the org.apache.kafka.streams.state.DslStoreSuppliers interface.

enable.metrics.push

Type: boolean
Default: true
Importance: low

Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients. The cluster must have a client metrics subscription which corresponds to a client.

metadata.max.age.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: low

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

metric.reporters

Type: list
Default: ""
Importance: low

A list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

metrics.num.samples

Type: int
Default: 2
Valid Values: [1,…​]
Importance: low

The number of samples maintained to compute metrics.

metrics.recording.level

Type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

The highest recording level for metrics.

metrics.sample.window.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

The window of time a metrics sample is computed over.

poll.ms

Type: long
Default: 100
Importance: low

The amount of time in milliseconds to block waiting for input.

probing.rebalance.interval.ms

Type: long
Default: 600000 (10 minutes)
Valid Values: [60000,…​]
Importance: low

The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.

receive.buffer.bytes

Type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…​]
Importance: low

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.

reconnect.backoff.max.ms

Type: long
Default: 1000 (1 second)
Valid Values: [0,…​]
Importance: low

The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

reconnect.backoff.ms

Type: long
Default: 50
Valid Values: [0,…​]
Importance: low

The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the reconnect.backoff.max.ms value.
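The interaction between reconnect.backoff.ms and reconnect.backoff.max.ms can be sketched as follows. The text above only says the backoff grows exponentially up to the maximum and that 20% random jitter is added; the doubling factor and the reading of the jitter as plus or minus 20% are assumptions of this sketch, not statements about the client's actual implementation. All names are hypothetical.

```java
/** Sketch: exponential reconnect backoff capped at a maximum (assumed factor 2). */
final class ReconnectBackoff {

    // Backoff before jitter after the given number of consecutive failures.
    static long backoffMs(long baseMs, long maxMs, int consecutiveFailures) {
        // Assumption: the backoff doubles with each consecutive failure.
        double raw = baseMs * Math.pow(2, consecutiveFailures);
        return (long) Math.min(raw, (double) maxMs);
    }

    // Assumed jitter bounds: 20% below and above the computed backoff.
    static long[] jitterRangeMs(long backoffMs) {
        return new long[] {Math.round(backoffMs * 0.8), Math.round(backoffMs * 1.2)};
    }

    public static void main(String[] args) {
        // Documented defaults: base 50 ms, maximum 1000 ms.
        for (int failures = 0; failures <= 5; failures++) {
            long backoff = backoffMs(50L, 1000L, failures);
            long[] range = jitterRangeMs(backoff);
            System.out.println(failures + " failures: " + backoff
                    + " ms (jitter " + range[0] + ".." + range[1] + " ms)");
        }
    }
}
```

Under these assumptions and the documented defaults, the cap of 1000 ms is reached after five consecutive failures.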

repartition.purge.interval.ms

Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…​]
Importance: low

The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging will occur after at least this value since the last purge, but may be delayed until later. (Note, unlike commit.interval.ms, the default for this value remains unchanged when processing.guarantee is set to exactly_once_v2).

request.timeout.ms

Type: int
Default: 40000 (40 seconds)
Valid Values: [0,…​]
Importance: low

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

retries

Type: int
Default: 0
Valid Values: [0,…​,2147483647]
Importance: low

Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or MAX_VALUE and use corresponding timeout parameters to control how long a client should retry a request.

retry.backoff.ms

Type: long
Default: 100
Valid Values: [0,…​]
Importance: low

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, up to the retry.backoff.max.ms value.

rocksdb.config.setter

Type: class
Default: null
Importance: low

A RocksDB config setter class or class name that implements the org.apache.kafka.streams.state.RocksDBConfigSetter interface.

send.buffer.bytes

Type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…​]
Importance: low

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.

state.cleanup.delay.ms

Type: long
Default: 600000 (10 minutes)
Importance: low

The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed.

upgrade.from

Type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7]
Importance: low

Allows upgrading in a backward compatible way. This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. When upgrading from 3.3 to a newer version it is not required to specify this config. Default is null. Accepted values are "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.5", "3.6", "3.7" (for upgrading from the corresponding old version).
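The two upgrade ranges named above can be checked mechanically. The sketch below is a hypothetical helper, not part of Kafka Streams: it compares only the first two components of each version string and returns whether upgrade.from is needed according to the rules stated above.

```java
/** Sketch: decides whether upgrade.from is needed, per the two ranges above. */
final class UpgradeFromCheck {

    // Maps "major.minor[.patch]" to a comparable number, e.g. "0.10.2" -> 10, "2.4" -> 2004.
    static int rank(String version) {
        String[] parts = version.split("\\.");
        return Integer.parseInt(parts[0]) * 1000 + Integer.parseInt(parts[1]);
    }

    static boolean needsUpgradeFrom(String fromVersion, String toVersion) {
        int from = rank(fromVersion);
        int to = rank(toVersion);
        // Rule 1: from [0.10.0, 1.1] to 2.0 or newer.
        boolean preTwoToTwoPlus = from <= rank("1.1") && to >= rank("2.0");
        // Rule 2: from [2.0, 2.3] to 2.4 or newer.
        boolean earlyTwoToTwoFourPlus =
                from >= rank("2.0") && from <= rank("2.3") && to >= rank("2.4");
        return preTwoToTwoPlus || earlyTwoToTwoFourPlus;
    }

    public static void main(String[] args) {
        System.out.println(needsUpgradeFrom("0.10.2", "2.8")); // rule 1 applies
        System.out.println(needsUpgradeFrom("2.2", "3.7"));    // rule 2 applies
        System.out.println(needsUpgradeFrom("3.3", "3.7"));    // neither rule applies
    }
}
```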

window.size.ms

Type: long
Default: null
Importance: low

Sets window size for the deserializer in order to calculate window end times.

windowed.inner.class.serde

Type: string
Default: null
Importance: low

Default serializer / deserializer for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. Note that setting this config in a Kafka Streams application results in an error, as it is meant to be used only from a plain consumer client.

windowstore.changelog.additional.retention.ms

Type: long
Default: 86400000 (1 day)
Importance: low

Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day.
