附录 G. Kafka Streams 配置参数
application.id
type: string
Importance: high
流处理应用的标识符。在 Kafka 集群中必须是唯一的。它用作 1)默认的 client-id 前缀 2,即用于成员资格管理的 group-id 的 group-id,即 changelog 主题前缀 3。
bootstrap.servers
type: list
Importance: high
用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用与此处为 bootstrapping 指定的服务器一样的所有服务器,此列表只会影响用于发现整套服务器的初始主机。这个列表应该采用
host1:port1,host2:port2,…
格式。由于这些服务器仅用于初始连接来发现完整的集群成员身份(可能会动态更改),因此该列表不需要包含完整的服务器集合(尽管,如果服务器停机,您可能希望多台服务器)。replication.factor
type: int
Default: 1
Importance: high
用于更改日志主题和流处理应用程序创建的重新分区主题的复制因素。如果您的代理集群使用版本 2.4 或更新的版本,您可以将 -1 设置为使用代理默认复制因素。
state.dir
type: string
Default: /tmp/kafka-streams
Importance: high
状态存储的目录位置.对于共享同一底层文件系统的每个流实例,此路径必须是唯一的。
acceptable.recovery.lag
type: long
Default: 10000
Valid Values: [0,…]
Importance: medium
客户端被视为活动任务捕捉到最高可接受滞后(偏移数)的延时。这与给定工作负载在一分钟之内的恢复时间对应。必须至少为 0。
cache.max.bytes.buffering
type: long
Default: 10485760
Valid Values: [0,…]
Importance: medium
用于在所有线程间缓冲的最大内存字节数。
client.id
type: string
Default: ""
Importance: medium
用于内部消费者、生产者和 restore-consumer 的客户端 ID 的 ID 前缀字符串,其模式为 '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'。
default.deserialization.exception.handler
type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: media
实施
org.apache.kafka.streams.errors.DeserializationExceptionHandler
接口的异常处理类。default.key.serde
type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
实现
org.apache.kafka.common.serialization.Serde
接口的密钥的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实施org.apache.kafka.common.serialization.Serde
接口的内部serde 类。default.production.exception.handler
type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: media
实施
org.apache.kafka.streams.errors.ProductionExceptionHandler
接口的异常处理类。default.timestamp.extractor
type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: media
实施
org.apache.kafka.streams.processor.TimestampExtractor
接口的默认时间戳提取器类。default.value.serde
type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
实现
org.apache.kafka.common.serialization.Serde
接口的值的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实施org.apache.kafka.common.serialization.Serde
接口的内部serde 类。default.windowed.key.serde.inner
type: class
Default: null
Importance: medium
窗口密钥内类的默认序列化器/反序列器。必须实施
org.apache.kafka.common.serialization.Serde
接口。default.windowed.value.serde.inner
type: class
Default: null
Importance: medium
用于窗口值内类的默认序列化器/反序列器。必须实施
org.apache.kafka.common.serialization.Serde
接口。max.task.idle.ms
type: long
Default: 0
Importance: medium
如果并非所有分区缓冲区都包含 记录,则流任务的最大时间(毫秒)将保持空闲状态,以避免在多个输入流间处理序列不足的记录。
max.warmup.replicas
type: int
Default: 2
Valid Values: [1,…]
Importance: medium
可以一次分配的最大温副本数(在配置的 num.standbys 之外备用),以便在其中一个实例上保持任务可用,同时在它被重新分配给的另一实例上保持其可用。用于调整有多少额外的代理流量和集群状态可用于高可用性。必须至少为 1。
num.standby.replicas
type: int
Default: 0
Importance: medium
每个任务的待机副本数。
num.stream.threads
类型: int
Default: 1
Importance: medium
执行流处理的线程数量。
processing.guarantee
type: string
Default: at_least_once
Valid Values: [at_least_once, exact_once_beta]
Importance: media
处理确保 应使用。可能的值有
at_least_once
(默认)、exactly_once
(需要 0.11.0 或更高版本)和exactly_once_beta
(需要代理版本 2.5 或更高版本)。请注意,正好一次的处理需要在默认情况下,一个包含至少三个代理的集群,这是生产环境的建议设置。对于开发,您可以调整代理设置transaction.state.log.replication.factor
和transaction.state.log.min.isr
。security.protocol
type: string
Default: PLAINTEXT
Importance: medium
用于与代理通信的协议.有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL.
task.timeout.ms
type: long
Default: 300000(5 分钟)
Valid Values: [0,…]
Importance: medium
任务最多的时间(毫秒)可能会因为内部错误和重试而停止,直到引发错误。对于超时为 0ms,任务可能会引发第一个内部错误的错误。对于大于 0ms 的任何超时,任务将至少在引发错误前重试一次。
topology.optimization
type: string
Default: none
Valid Values: [none, all]
Importance: medium
告知 Kafka Streams 的配置是否应该优化拓扑,默认为禁用。
application.server
type: string
Default: ""
Importance: low
host:port 对指向用户定义的端点,可用于此 KafkaStreams 实例上的状态存储发现和交互式查询。
buffered.records.per.partition
type: int
Default: 1000
Importance: low
每个分区要缓冲的最多记录数.
built.in.metrics.version
type: string
Default: latest
Valid Values: [0.10.0-2.4, latest]
Importance: low
要使用的内置指标的版本。
commit.interval.ms
type: long
Default: 30000(30 秒)
Valid Values: [0,…]
Importance: low
保存处理器位置的频率,以毫秒为单位。(请注意,如果
processing.guarantee
设为exactly_once
,则默认值为100
,否则默认值为30000
。)connections.max.idle.ms
type: long
Default: 540000(9 分钟)
Importance: low
在此配置指定的毫秒数后关闭空闲连接。
metadata.max.age.ms
type: long
Default: 300000(5 分钟)
Valid Values: [0,…]
Importance: low
在经过这一时间之后,我们强制刷新元数据,即使我们尚未看到任何分区领导变化来主动发现任何新的代理或分区。
metric.reporters
type: list
Default: ""
Importance: low
用作指标报告器的类的列表。实施
org.apache.kafka.common.metrics.MetricsReporter
接口允许在类中插入,这些类将通知新指标创建。JmxReporter 始终包含在内,用于注册 JMX 统计数据。metrics.num.samples
type: int
Default: 2
Valid Values: [1,…]
Importance: low
维护到计算指标的示例数量。
metrics.recording.level
type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low
指标的最高记录级别。
metrics.sample.window.ms
type: long
Default: 30000(30 秒)
Valid Values: [0,…]
Importance: low
指标样本计算完的时间窗口。
partition.grouper
type: class
Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper
Importance: low
实现
org.apache.kafka.streams.processor.PartitionGrouper
接口的分区分组器类。警告:此配置已弃用,并将在 3.0.0 发行版本中删除。poll.ms
type: long
Default: 100
Importance: low
阻止等待输入的时间(毫秒为单位)。
probing.rebalance.interval.ms
type: long
Default: 600000(10 minutes)
Valid Values: [60000,…]
Importance: low
在触发重新平衡到探测完成争用并且准备好激活的温备份副本前,等待的最长时间(以毫秒为单位)。探测重新平衡将继续触发,直到分配均衡为止。至少 1 分钟。
receive.buffer.bytes
type: int
Default: 32768(32 kibibytes)
Valid Values: [-1,…]
Importance: low
读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,将使用 OS 默认值。
reconnect.backoff.max.ms
type: long
Default: 1000(1 second)
Valid Values: [0,…]
Importance: low
重新连接到一个代理时需要等待的时间上限(以毫秒为单位),且该代理重复无法连接。如果提供,每个主机的 backoff 都会为每个连续连接失败呈指数级增长,最高可达此最大值。计算 backoff 后,添加 20% 的随机 jitter,以避免连接风暴。
reconnect.backoff.ms
type: long
Default: 50
Valid Values: [0,…]
Importance: low
尝试重新连接到给定主机前等待的基本时间。这可避免在紧凑循环中重复连接主机。这个后端适用于客户端对代理的所有连接尝试。
request.timeout.ms
type: int
Default: 40000(40 秒)
Valid Values: [0,…]
Importance: low
配置控制客户端等待请求响应的最大时间。如果在超时前未收到响应,客户端将在必要时重新发送请求,或者在重试结束时失败请求。
retries
type: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
设置大于零的值将导致客户端重新发送任何失败且出现可能瞬时错误的请求。建议将值设置为零或
MAX_VALUE
,并使用对应的超时参数来控制客户端重试请求的时间。retry.backoff.ms
type: long
Default: 100
Valid Values: [0,…]
Importance: low
尝试将失败的请求重试到给定主题分区前等待的时间。这可避免在某些故障情况下在紧凑循环中重复发送请求。
rocksdb.config.setter
type: class
Default: null
Importance: low
实现
org.apache.kafka.streams.state.RocksDBConfigSetter
接口的 Rocks DB 配置集类或类名称。send.buffer.bytes
type: int
Default: 131072(128 kibibytes)
Valid Values: [-1,…]
Importance: low
发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,将使用 OS 默认值。
state.cleanup.delay.ms
type: long
Default: 600000(10 分钟)
Importance: low
分区迁移后,在删除状态前等待的时间(毫秒)。只有尚未修改
state.cleanup.delay.ms
的状态目录才会被删除。upgrade.from
type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low
允许以向后兼容的方式升级.从 [0.10.0、1.1] 升级到 2.0+ 或者从 [2.0, 2.3] 升级到 2.4+ 时需要此项。当从 2.4 升级到更新的版本时,不需要指定此配置。默认值为
null
。可接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.2", "2.3"(用于从对应的旧版本升级)。window.size.ms
type: long
Default: null
Importance: low
设置 deserializer 的窗口大小,以便计算窗口结束时间。
windowstore.changelog.additional.retention.ms
type: long
Default: 86400000(1 天)
Importance: low
添加至窗口维护M,以确保不会从日志中永久删除数据。允许时钟偏移.默认值为 1 天。