第 7 章 Kafka Streams 配置属性


application.id

type: string
Importance: high

流处理应用的标识符。在 Kafka 集群中必须是唯一的。它被用作默认 client-id 前缀,即 2)用于成员资格管理的 group-id,3) changelog 主题前缀。

bootstrap.servers

type: list
Importance: high

用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器与此处为引导指定的服务器无关 - 此列表仅影响用于发现完整服务器集的初始主机。此列表的格式应为 host1:port1,host2:port2,…​。由于这些服务器仅用于初始连接来发现完整的群集成员身份(可能会动态更改),因此此列表不需要包含整组服务器(但是如果服务器停机,您可能需要多个服务器)。

num.standby.replicas

Type: int
Default: 0
Importance: high

每个任务的待机副本数。

state.dir

type: string
Default: /tmp/kafka-streams
Importance: high

状态存储的目录位置。这个路径对于共享同一底层文件系统的每个流实例都必须是唯一的。

acceptable.recovery.lag

type: long
Default: 10000
Valid Values: [0,…​]
Importance: medium

可接受的最大滞后(要捕获的偏移数),客户端被视为足以接收活跃的任务分配。在分配后,它仍然会在处理前恢复更改日志的其余部分。为了避免在重新平衡过程中暂停处理,这个配置应该在一个给定工作负载的一分钟内与恢复时间相对应。必须至少为 0。

cache.max.bytes.buffering

type: long
Default: 10485760
Valid Values: [0,…​]
Importance: medium

在所有线程之间用于缓冲的最大内存字节数。

client.id

Type: string
Default: ""
Importance: medium

用于内部消费者、生成者和 restore-consumer 的客户端 ID 的 ID 前缀字符串,特征为 < client.id>-StreamThread-<threadSequenceNumber$gt;-<consumer|producer|restore-consumer >。

default.deserialization.exception.handler

type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium

实现 org.apache.kafka.streams.errors.DeserializationExceptionHandler 接口的异常处理类。

default.key.serde

Type: class
Default: null
Importance: medium

用于实现 org.apache.kafka.common.serialization.Serde 接口的密钥的默认序列化器/反序列化器类。请注意,当使用窗口的 serde 类时,还需要设置实现 org.apache.kafka.common.serialization.Serde 接口(通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner')的 inner serde 类。

default.list.key.serde.inner

Type: class
Default: null
Importance: medium

默认内部类是实施 org.apache.kafka.common.serialization.Serde 接口的关键。只有在 default.key.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 时,才会读取此配置。

default.list.key.serde.type

Type: class
Default: null
Importance: medium

用于实施 java.util.List 接口的密钥的默认类。当使用列表 serde 类时,只有 default.key.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde s.ListSerde,才会读取此配置,它需要设置实现 org.apache.kafka.common.serialization.Serde 接口的 inner serde 类。

default.list.value.serde.inner

Type: class
Default: null
Importance: medium

默认内部类是实施 org.apache.kafka.common.serialization.Serde 接口的值。只有在 default.value.serde 配置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 时,才会读取此配置。

default.list.value.serde.type

Type: class
Default: null
Importance: medium

用于实现 java.util.List 接口的值的默认类。只有在 default.value.serde 配置且只有这个设置被设置为 org.apache.kafka.common.serialization.Serdes.ListSerde 是才会从这个配置中读取。请注意,当使用 list serde 类时,一个需要设置 inner serde 类,它实现了 org.apache.kafka.common.serialization.Serde 接口(通过 'default.list.value.serde.inner')。

default.production.exception.handler

type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium

实现 org.apache.kafka.streams.errors.ProductionExceptionHandler 接口的异常处理类。

default.timestamp.extractor

type: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium

实现 org.apache.kafka.streams.processor.TimestampExtractor 接口的默认时间戳提取器类。

default.value.serde

Type: class
Default: null
Importance: medium

默认序列化器/反序列化器类用于实现 org.apache.kafka.common.serialization.Serde 接口的值。请注意,当使用窗口的 serde 类时,还需要设置实现 org.apache.kafka.common.serialization.Serde 接口(通过 'default.windowed.key.serde.inner' 或 'default.windowed.value.serde.inner')的 inner serde 类。

max.task.idle.ms

Type: long
Default: 0
Importance: medium

此配置控制加入和合并是否可能会产生超出顺序的结果。当流任务完全在某些(但不全部)输入分区上等待生成者发送额外记录并避免在多个输入流间处理时,配置值是流任务的最大时间(以毫秒为单位)。默认(零)不会等待生成者发送更多记录,但它会等待代理上已存在的数据。此默认值意味着对于已在代理上存在的记录,流将以时间戳顺序处理它们。设置为 -1 可完全禁用闲置并完全处理任何本地可用数据,即使这样做可能会生成没有顺序处理。

max.warmup.replicas

type: int
Default: 2
Valid Values: [1,…​]
Importance: medium

一次可以一次分配的 warmup 副本的最大数量(除配置的 num.standbys 之外),保持任务在一个实例上可用,同时将其重新分配给另一个实例。用于节流额外的代理流量和集群状态可用于高可用性。必须至少为 1.Note,一个 warmup 副本对应于一个流任务。另外,请注意,每个温副本只能在重新平衡期间提升到活跃任务(通常在所谓的探测重新平衡过程中,这会在 probing.rebalance.interval.ms 配置指定的频率时发生)。这意味着,活跃任务可以从一个 Kafka Streams 实例迁移到另一个实例的最大速率由(max.warmup.replicas / probing.rebalance.interval.ms)决定。

num.stream.threads

Type: int
Default: 1
Importance: medium

执行流处理的线程数量。

processing.guarantee

Type: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium

处理保证应使用。可能的值有 at_least_once (默认)和 exactly_once_v2 (需要代理版本 2.5 或更高版本)。弃用的选项为 exactly_once (需要代理版本 0.11.0 或更高版本)和 exactly_once_beta (需要代理版本 2.5 或更高版本)。请注意,完全处理需要至少三个代理的集群,这是生产的建议设置;通过调整代理设置 transaction.state.log.replication.factortransaction.state.log.min.isr

rack.aware.assignment.non_overlap_cost

type: int
Default: null
Importance: medium

与从现有分配移动任务相关的成本。这个 config 和 rack.aware.assignment.assignment.traffic_cost 控制优化算法是否首选最小化跨机架流量,或最小化现有分配中的任务移动。如果设置较大的值 org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor 将优化以维护现有分配。默认值为 null,这意味着它将在不同的分配器中使用默认的 non_overlap 成本值。

rack.aware.assignment.strategy

Type: string
Default: none
Valid Values: [none, min_traffic]
Importance: medium

我们用于机架感知分配的策略。在分配任务以最小化跨机架流量时,机架感知分配将把 client.rackracks 纳入考量。有效设置是 : none (默认),它将禁用机架感知分配; min_traffic,它将计算最小跨机架流量分配。

rack.aware.assignment.tags

type: list
Default: ""
Valid Values: List contains maximum 5 elements
Importance: medium

用于在 Kafka Streams 实例之间分发备用副本的客户端标签密钥列表。配置后,Kafka Streams 将有一个 best-effort 来在每个客户端标签维度上分发待机任务。

rack.aware.assignment.traffic_cost

type: int
Default: null
Importance: medium

与跨机架流量相关的成本。此 config 和 rack.aware.assignment.non_overlap_cost 控制优化算法是否首选最小化跨机架流量,或最小化现有分配中的任务移动。如果设置较大的值 org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor 将优化以最小化跨机架流量。默认值为 null,这意味着它将在不同的分配器中使用默认流量成本值。

replication.factor

type: int
Default: -1
Importance: medium

更改由流处理应用程序创建的日志主题和重新分区主题的复制因素。默认值 -1 (主题:使用代理默认复制因素)需要代理版本 2.4 或更新版本。

security.protocol

Type: string
Default: PLAINTEXT
Valid Values: (case insensitive)[SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT]
Importance: medium

用于与代理通信的协议。有效值为: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。

statestore.cache.max.bytes

type: long
Default: 10485760 (10 mebibytes)
Valid Values: [0,…​]
Importance: medium

在所有线程中用于 statestore 缓存的最大内存字节数。

task.timeout.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: medium

任务因为内部错误而停止的最大时间(以毫秒为单位),并重试直到引发错误为止。对于超时为 0ms,任务会引发第一个内部错误的错误。对于大于 0ms 的任何超时,任务将在引发错误前至少重试一次。

topology.optimization

Type: string
Default: none
Valid Values: org.apache.kafka.streams.StreamsConfig$$Lambda/0x00007fe89800a400@31cefde0
Importance: medium

如果应该优化拓扑以及要应用的优化,则告知 Kafka Streams 的配置。可接受值为:"NO_OPTIMIZATION", "OPTIMIZE", "OPTIMIZE",或以逗号分隔的特定优化列表:"REUSE_KTABLE_SOURCE_TOPICS", "MERGE_REPARTITION_TOPICS" + "SINGLE_STORE_SELF_JOIN+")."NO_OPTIMIZATION", "MERGE_REPARTITION_TOPATION"

application.server

Type: string
Default: ""
Importance: low

host:port 对指向用户定义的端点,可用于此 KafkaStreams 实例上的状态存储发现和交互式查询。

auto.include.jmx.reporter

Type: boolean
Default: true
Importance: low

已弃用。是否自动包含 JmxReporter,即使它在 metric.reporters 中未列出。此配置将在 Kafka 4.0 中删除,用户应在 metric.reporters 中包含 org.apache.kafka.common.metrics.JmxReporter,以启用 JmxReporter。

buffered.records.per.partition

type: int
Default: 1000
Importance: low

每个分区要缓冲的最大记录数。

built.in.metrics.version

Type: string
Default: latest
Valid Values: [latest]
Importance: low

要使用的内置指标的版本。

commit.interval.ms

type: long
Default: 30000 (30 秒)
Valid Values: [0,…​]
Importance: low

提交处理进度的频率(以毫秒为单位)。对于 at-least-once 处理,提交方法可以保存处理器的位置(如偏移)。对于精确处理,需要提交包含保存位置的事务,并使输出主题中的提交数据对具有隔离级别 read_committed 的用户可见。(请注意,如果 processing.guarantee 设置为 exactly_once_v2, exactly_once,则默认值为 100,否则默认值为 30000

connections.max.idle.ms

Type: long
Default: 540000 (9 minutes)
Importance: low

在这个配置指定的毫秒数后关闭闲置连接。

default.client.supplier

Type: class
Default: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
Importance: low

实现 org.apache.kafka.streams.KafkaClientSupplier 接口的客户端供应商类。

default.dsl.store

type: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low

DSL 运算符使用的默认状态存储类型。

metadata.max.age.ms

Type: long
Default: 300000 (5 minutes)
Valid Values: [0,…​]
Importance: low

即使我们未看到任何分区领导力更改来主动发现任何新的代理或分区,我们才会强制刷新元数据的时间(以毫秒为单位)。

metric.reporters

Type: list
Default: ""
Importance: low

用作指标报告器的类列表。实施 org.apache.kafka.common.metrics.MetricsReporter 接口,允许插入将收到新指标创建通知的类。总是包括 JmxReporter 来注册 JMX 统计信息。

metrics.num.samples

type: int
Default: 2
Valid Values: [1,…​]
Importance: low

为计算指标维护的示例数量。

metrics.recording.level

Type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low

指标的最高记录级别。

metrics.sample.window.ms

type: long
Default: 30000 (30 秒)
Valid Values: [0,…​]
Importance: low

计算指标示例的时间窗口。

poll.ms

Type: long
Default: 100
Importance: low

阻止等待输入的时间(以毫秒为单位)。

probing.rebalance.interval.ms

Type: long
Default: 600000 (10 minutes)
Valid Values: [60000,…​]
Importance: low

触发重新平衡以探测到探测到回收完成并准备好激活的温副本前等待的时间(以毫秒为单位)。在分配平衡前,将继续触发重新平衡。必须至少为 1 分钟。

receive.buffer.bytes

type: int
Default: 32768 (32 kibibytes)
Valid Values: [-1,…​]
Importance: low

读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用操作系统默认值。

reconnect.backoff.max.ms

type: long
Default: 1000 (1 second)
Valid Values: [0,…​]
Importance: low

重新连接到重复连接失败的代理时等待的最大时间(以毫秒为单位)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,直到最高值。在计算 backoff 增长后,添加了 20% 的随机 jitter,以避免连接状况。

reconnect.backoff.ms

type: long
Default: 50
Valid Values: [0,…​]
Importance: low

尝试重新连接到给定主机前等待的基本时间。这可避免在紧密循环中重复连接到主机。此 backoff 应用到客户端到代理的所有连接尝试。

repartition.purge.interval.ms

type: long
Default: 30000 (30 秒)
Valid Values: [0,…​]
Importance: low

从重新分区主题中删除完全消耗的记录的频率(以毫秒为单位)。清除将在最后一次清除以来至少在这个值后进行,但可能会延迟直到之后为止。(请注意,与 commit.interval.ms 不同,当 处理时,这个值的默认值保持不变。guarantee 被设置为 exactly_once_v2

request.timeout.ms

type: int
Default: 40000 (40 seconds)
Valid Values: [0,…​]
Importance: low

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在需要时重新发送请求(如果重试用时失败)。

retries

type: int
Default: 0
Valid Values: [0,…​,2147483647]
Importance: low

设置大于零的值将导致客户端重新发送失败并可能出现临时错误的请求。建议将值设为 0 或 MAX_VALUE,并使用对应的超时参数来控制客户端应重试请求的时长。

retry.backoff.ms

Type: long
Default: 100
Valid Values: [0,…​]
Importance: low

尝试重试对给定主题分区失败的请求前等待的时间。这可避免在某些故障场景中的紧密循环中重复发送请求。

rocksdb.config.setter

Type: class
Default: null
Importance: low

实现 org.apache.kafka.streams.state.RocksDBConfigSetter 接口的 Rocks DB 配置 setter 类或类名称。

send.buffer.bytes

type: int
Default: 131072 (128 kibibytes)
Valid Values: [-1,…​]
Importance: low

发送数据时要使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用操作系统默认值。

state.cleanup.delay.ms

Type: long
Default: 600000 (10 minutes)
Importance: low

分区迁移时,在删除状态前等待的时间(以毫秒为单位)。只有尚未修改至少 状态的目录。cleanup.delay.ms 将被删除。

upgrade.from

Type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4]
Importance: low

允许以向后兼容的方式进行升级。从 [0.10.0.0, 1.1] 升级到 2.0+,或者从 [2.0, 2.3] 升级到 2.4+ 时需要。当从 3.3 升级到更新的版本时,不需要指定此配置。默认为 null。接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3", "3.4", "3.4"

window.size.ms

Type: long
Default: null
Importance: low

为反序列化器设置窗口大小,以计算窗口结束时间。

windowed.inner.class.serde

Type: string
Default: null
Importance: low

窗口记录的内类的默认序列化器/反序列化器。必须实施 org.apache.kafka.common.serialization.Serde 接口。请注意,在 KafkaStreams 应用程序中设置此配置会导致错误,因为它只用于从 Plain consumer 客户端中使用。

windowstore.changelog.additional.retention.ms

type: long
Default: 86400000 (1 day)
Importance: low

向窗口添加 maintainMs,以确保不会预先从日志中删除数据。允许时钟偏移。默认值为 1 天。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.