Appendix G. Kafka Streams configuration parameters
application.id
Type: string
Importance: high
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
bootstrap.servers
Type: list
Importance: high
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form
host1:port1,host2:port2,…
. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).replication.factor
Type: int
Default: 1
Importance: high
The replication factor for change log topics and repartition topics created by the stream processing application.
state.dir
Type: string
Default: /tmp/kafka-streams
Importance: high
Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.
acceptable.recovery.lag
Type: long
Default: 10000
Valid Values: [0,…]
Importance: medium
The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up for an active task.Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
cache.max.bytes.buffering
Type: long
Default: 10485760
Valid Values: [0,…]
Importance: medium
Maximum number of memory bytes to be used for buffering across all threads.
client.id
Type: string
Default: ""
Importance: medium
An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.
default.deserialization.exception.handler
Type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium
Exception handling class that implements the
org.apache.kafka.streams.errors.DeserializationExceptionHandler
interface.default.key.serde
Type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
Default serializer / deserializer class for key that implements the
org.apache.kafka.common.serialization.Serde
interface. Note when windowed serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serde
interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well.default.production.exception.handler
Type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium
Exception handling class that implements the
org.apache.kafka.streams.errors.ProductionExceptionHandler
interface.default.timestamp.extractor
Type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium
Default timestamp extractor class that implements the
org.apache.kafka.streams.processor.TimestampExtractor
interface.default.value.serde
Type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
Default serializer / deserializer class for value that implements the
org.apache.kafka.common.serialization.Serde
interface. Note when windowed serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serde
interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well.max.task.idle.ms
Type: long
Default: 0
Importance: medium
Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.
max.warmup.replicas
Type: int
Default: 2
Valid Values: [1,…]
Importance: medium
The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker traffic and cluster state can be used for high availability. Must be at least 1.
num.standby.replicas
Type: int
Default: 0
Importance: medium
The number of standby replicas for each task.
num.stream.threads
Type: int
Default: 1
Importance: medium
The number of threads to execute stream processing.
processing.guarantee
Type: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta]
Importance: medium
The processing guarantee that should be used. Possible values are
at_least_once
(default),exactly_once
(requires brokers version 0.11.0 or higher), andexactly_once_beta
(requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker settingtransaction.state.log.replication.factor
andtransaction.state.log.min.isr
.security.protocol
Type: string
Default: PLAINTEXT
Importance: medium
Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
topology.optimization
Type: string
Default: none
Valid Values: [none, all]
Importance: medium
A configuration telling Kafka Streams if it should optimize the topology, disabled by default.
application.server
Type: string
Default: ""
Importance: low
A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.
buffered.records.per.partition
Type: int
Default: 1000
Importance: low
Maximum number of records to buffer per partition.
built.in.metrics.version
Type: string
Default: latest
Valid Values: [0.10.0-2.4, latest]
Importance: low
Version of the built-in metrics to use.
commit.interval.ms
Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…]
Importance: low
The frequency with which to save the position of the processor. (Note, if
processing.guarantee
is set toexactly_once
, the default value is100
, otherwise the default value is30000
.connections.max.idle.ms
Type: long
Default: 540000 (9 minutes)
Importance: low
Close idle connections after the number of milliseconds specified by this config.
metadata.max.age.ms
Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…]
Importance: low
The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.
metric.reporters
Type: list
Default: ""
Importance: low
A list of classes to use as metrics reporters. Implementing the
org.apache.kafka.common.metrics.MetricsReporter
interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.metrics.num.samples
Type: int
Default: 2
Valid Values: [1,…]
Importance: low
The number of samples maintained to compute metrics.
metrics.recording.level
Type: string
Default: INFO
Valid Values: [INFO, DEBUG]
Importance: low
The highest recording level for metrics.
metrics.sample.window.ms
Type: long
Default: 30000 (30 seconds)
Valid Values: [0,…]
Importance: low
The window of time a metrics sample is computed over.
partition.grouper
Type: class
Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper
Importance: low
Partition grouper class that implements the
org.apache.kafka.streams.processor.PartitionGrouper
interface. WARNING: This config is deprecated and will be removed in 3.0.0 release.poll.ms
Type: long
Default: 100
Importance: low
The amount of time in milliseconds to block waiting for input.
probing.rebalance.interval.ms
Type: long
Default: 600000 (10 minutes)
Valid Values: [60000,…]
Importance: low
The maximum time to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.
receive.buffer.bytes
Type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…]
Importance: low
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
reconnect.backoff.max.ms
Type: long
Default: 1000 (1 second)
Valid Values: [0,…]
Importance: low
The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
reconnect.backoff.ms
Type: long
Default: 50
Valid Values: [0,…]
Importance: low
The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
request.timeout.ms
Type: int
Default: 40000 (40 seconds)
Valid Values: [0,…]
Importance: low
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
retries
Type: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.
retry.backoff.ms
Type: long
Default: 100
Valid Values: [0,…]
Importance: low
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
rocksdb.config.setter
Type: class
Default: null
Importance: low
A Rocks DB config setter class or class name that implements the
org.apache.kafka.streams.state.RocksDBConfigSetter
interface.send.buffer.bytes
Type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…]
Importance: low
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
state.cleanup.delay.ms
Type: long
Default: 600000 (10 minutes)
Importance: low
The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least
state.cleanup.delay.ms
will be removed.upgrade.from
Type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low
Allows upgrading in a backward compatible way. This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. When upgrading from 2.4 to a newer version it is not required to specify this config. Default is
null
. Accepted values are "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3" (for upgrading from the corresponding old version).windowstore.changelog.additional.retention.ms
Type: long
Default: 86400000 (1 day)
Importance: low
Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day.