附录 G. Kafka Streams 配置参数


application.id

type: string
Importance: high

流处理应用的标识符。在 Kafka 集群中必须是唯一的。它用作 1)默认的 client-id 前缀 2,即用于成员资格管理的 group-id 的 group-id,即 changelog 主题前缀 3。

bootstrap.servers

type: list
Importance: high

用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用与此处为 bootstrapping 指定的服务器一样的所有服务器,此列表只会影响用于发现整套服务器的初始主机。这个列表应该采用 host1:port1,host2:port2,…​ 格式。由于这些服务器仅用于初始连接来发现完整的集群成员身份(可能会动态更改),因此该列表不需要包含完整的服务器集合(尽管,如果服务器停机,您可能希望多台服务器)。

replication.factor

type: int
Default: 1
Importance: high

用于更改日志主题和流处理应用程序创建的重新分区主题的复制因素。如果您的代理集群使用版本 2.4 或更新的版本,您可以将 -1 设置为使用代理默认复制因素。

state.dir

type: string
Default: /tmp/kafka-streams
Importance: high

状态存储的目录位置.对于共享同一底层文件系统的每个流实例,此路径必须是唯一的。

acceptable.recovery.lag

type: long
Default: 10000
Valid Values: [0,…​]
Importance: medium

客户端被视为活动任务捕捉到最高可接受滞后(偏移数)的延时。这与给定工作负载在一分钟之内的恢复时间对应。必须至少为 0。

cache.max.bytes.buffering

type: long
Default: 10485760
Valid Values: [0,…​]
Importance: medium

用于在所有线程间缓冲的最大内存字节数。

client.id

type: string
Default: ""
Importance: medium

用于内部消费者、生产者和 restore-consumer 的客户端 ID 的 ID 前缀字符串,其模式为 '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'。

default.deserialization.exception.handler

type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: media

实施 org.apache.kafka.streams.errors.DeserializationExceptionHandler 接口的异常处理类。

default.key.serde

type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium

实现 org.apache.kafka.common.serialization.Serde 接口的密钥的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实施 org.apache.kafka.common.serialization.Serde 接口的内部serde 类。

default.production.exception.handler

type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: media

实施 org.apache.kafka.streams.errors.ProductionExceptionHandler 接口的异常处理类。

default.timestamp.extractor

type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: media

实施 org.apache.kafka.streams.processor.TimestampExtractor 接口的默认时间戳提取器类。

default.value.serde

type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium

实现 org.apache.kafka.common.serialization.Serde 接口的值的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner' 实施 org.apache.kafka.common.serialization.Serde 接口的内部serde 类。

default.windowed.key.serde.inner

type: class
Default: null
Importance: medium

窗口密钥内类的默认序列化器/反序列器。必须实施 org.apache.kafka.common.serialization.Serde 接口。

default.windowed.value.serde.inner

type: class
Default: null
Importance: medium

用于窗口值内类的默认序列化器/反序列器。必须实施 org.apache.kafka.common.serialization.Serde 接口。

max.task.idle.ms

type: long
Default: 0
Importance: medium

如果并非所有分区缓冲区都包含 记录,则流任务的最大时间(毫秒)将保持空闲状态,以避免在多个输入流间处理序列不足的记录。

max.warmup.replicas

type: int
Default: 2
Valid Values: [1,…​]
Importance: medium

可以一次分配的最大温副本数(在配置的 num.standbys 之外备用),以便在其中一个实例上保持任务可用,同时在它被重新分配给的另一实例上保持其可用。用于调整有多少额外的代理流量和集群状态可用于高可用性。必须至少为 1。

num.standby.replicas

type: int
Default: 0
Importance: medium

每个任务的待机副本数。

num.stream.threads

类型: int
Default: 1
Importance: medium

执行流处理的线程数量。

processing.guarantee

type: string
Default: at_least_once
Valid Values: [at_least_once, exact_once_beta]
Importance: media

处理确保 应使用。可能的值有 at_least_once (默认)、exactly_once (需要 0.11.0 或更高版本)和 exactly_once_beta (需要代理版本 2.5 或更高版本)。请注意,正好一次的处理需要在默认情况下,一个包含至少三个代理的集群,这是生产环境的建议设置。对于开发,您可以调整代理设置 transaction.state.log.replication.factortransaction.state.log.min.isr

security.protocol

type: string
Default: PLAINTEXT
Importance: medium

用于与代理通信的协议.有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL.

task.timeout.ms

type: long
Default: 300000(5 分钟)
Valid Values: [0,…​]
Importance: medium

任务最多的时间(毫秒)可能会因为内部错误和重试而停止,直到引发错误。对于超时为 0ms,任务可能会引发第一个内部错误的错误。对于大于 0ms 的任何超时,任务将至少在引发错误前重试一次。

topology.optimization

type: string
Default: none
Valid Values: [none, all]
Importance: medium

告知 Kafka Streams 的配置是否应该优化拓扑,默认为禁用。

application.server

type: string
Default: ""
Importance: low

host:port 对指向用户定义的端点,可用于此 KafkaStreams 实例上的状态存储发现和交互式查询。

buffered.records.per.partition

type: int
Default: 1000
Importance: low

每个分区要缓冲的最多记录数.

built.in.metrics.version

type: string
Default: latest
Valid Values: [0.10.0-2.4, latest]
Importance: low

要使用的内置指标的版本。

commit.interval.ms

type: long
Default: 30000(30 秒)
Valid Values: [0,…​]
Importance: low

保存处理器位置的频率,以毫秒为单位。(请注意,如果 processing.guarantee 设为 exactly_once,则默认值为 100,否则默认值为 30000。)

connections.max.idle.ms

type: long
Default: 540000(9 分钟)
Importance: low

在此配置指定的毫秒数后关闭空闲连接。

metadata.max.age.ms

type: long
Default: 300000(5 分钟)
Valid Values: [0,…​]
Importance: low

在经过这一时间之后,我们强制刷新元数据,即使我们尚未看到任何分区领导变化来主动发现任何新的代理或分区。

metric.reporters

type: list
Default: ""
Importance: low

用作指标报告器的类的列表。实施 org.apache.kafka.common.metrics.MetricsReporter 接口允许在类中插入,这些类将通知新指标创建。JmxReporter 始终包含在内,用于注册 JMX 统计数据。

metrics.num.samples

type: int
Default: 2
Valid Values: [1,…​]
Importance: low

维护到计算指标的示例数量。

metrics.recording.level

type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

指标的最高记录级别。

metrics.sample.window.ms

type: long
Default: 30000(30 秒)
Valid Values: [0,…​]
Importance: low

指标样本计算完的时间窗口。

partition.grouper

type: class
Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper
Importance: low

实现 org.apache.kafka.streams.processor.PartitionGrouper 接口的分区分组器类。警告:此配置已弃用,并将在 3.0.0 发行版本中删除。

poll.ms

type: long
Default: 100
Importance: low

阻止等待输入的时间(毫秒为单位)。

probing.rebalance.interval.ms

type: long
Default: 600000(10 minutes)
Valid Values: [60000,…​]
Importance: low

在触发重新平衡到探测完成争用并且准备好激活的温备份副本前,等待的最长时间(以毫秒为单位)。探测重新平衡将继续触发,直到分配均衡为止。至少 1 分钟。

receive.buffer.bytes

type: int
Default: 32768(32 kibibytes)
Valid Values: [-1,…​]
Importance: low

读取数据时使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,将使用 OS 默认值。

reconnect.backoff.max.ms

type: long
Default: 1000(1 second)
Valid Values: [0,…​]
Importance: low

重新连接到一个代理时需要等待的时间上限(以毫秒为单位),且该代理重复无法连接。如果提供,每个主机的 backoff 都会为每个连续连接失败呈指数级增长,最高可达此最大值。计算 backoff 后,添加 20% 的随机 jitter,以避免连接风暴。

reconnect.backoff.ms

type: long
Default: 50
Valid Values: [0,…​]
Importance: low

尝试重新连接到给定主机前等待的基本时间。这可避免在紧凑循环中重复连接主机。这个后端适用于客户端对代理的所有连接尝试。

request.timeout.ms

type: int
Default: 40000(40 秒)
Valid Values: [0,…​]
Importance: low

配置控制客户端等待请求响应的最大时间。如果在超时前未收到响应,客户端将在必要时重新发送请求,或者在重试结束时失败请求。

retries

type: int
Default: 0
Valid Values: [0,…​,2147483647]
Importance: low

设置大于零的值将导致客户端重新发送任何失败且出现可能瞬时错误的请求。建议将值设置为零或 MAX_VALUE,并使用对应的超时参数来控制客户端重试请求的时间。

retry.backoff.ms

type: long
Default: 100
Valid Values: [0,…​]
Importance: low

尝试将失败的请求重试到给定主题分区前等待的时间。这可避免在某些故障情况下在紧凑循环中重复发送请求。

rocksdb.config.setter

type: class
Default: null
Importance: low

实现 org.apache.kafka.streams.state.RocksDBConfigSetter 接口的 Rocks DB 配置集类或类名称。

send.buffer.bytes

type: int
Default: 131072(128 kibibytes)
Valid Values: [-1,…​]
Importance: low

发送数据时使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,将使用 OS 默认值。

state.cleanup.delay.ms

type: long
Default: 600000(10 分钟)
Importance: low

分区迁移后,在删除状态前等待的时间(毫秒)。只有尚未修改 state.cleanup.delay.ms 的状态目录才会被删除。

upgrade.from

type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low

允许以向后兼容的方式升级.从 [0.10.0、1.1] 升级到 2.0+ 或者从 [2.0, 2.3] 升级到 2.4+ 时需要此项。当从 2.4 升级到更新的版本时,不需要指定此配置。默认值为 null。可接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.2", "2.3"(用于从对应的旧版本升级)。

window.size.ms

type: long
Default: null
Importance: low

设置 deserializer 的窗口大小,以便计算窗口结束时间。

windowstore.changelog.additional.retention.ms

type: long
Default: 86400000(1 天)
Importance: low

添加至窗口维护M,以确保不会从日志中永久删除数据。允许时钟偏移.默认值为 1 天。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.