第7章 Kafka ストリームの設定プロパティー


application.id

型: string
重要度:

ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。これは、1) デフォルトのクライアント ID の接頭辞、2) メンバーシップ管理のためのグループ ID、3) Changelog のトピックの接頭辞、として使用されます。

bootstrap.servers

型: list
重要度:

Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアのリスト。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。このリストは、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。このリストは、host1:port1,host2:port2,…​ の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。

num.standby.replicas

型: int
デフォルト: 0
重要度:

各タスクのスタンバイレプリカ数。

state.dir

型: string
デフォルト: /tmp/kafka-streams
重要度:

状態ストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに一意である必要があります。

acceptable.recovery.lag

型: long
デフォルト: 10000
有効な値: [0,…​]
重要度:

クライアントがアクティブなタスク割り当てを受け取るのに十分なほど追いついたと見なされる最大許容ラグ (追いつくまでのオフセット数)。割り当て時に、処理前に残りの変更ログを復元します。リバランス中の処理の一時停止を回避するために、この設定は、特定のワークロードの 1 分未満の回復時間に対応する必要があります。0 以上である必要があります。

cache.max.bytes.buffering

型: long
デフォルト: 10485760
有効な値: [0,…​]
重要度:

すべてのスレッドでバッファーするのに使用されるメモリーバイトの最大数。

client.id

型: string
デフォルト: ""
重要度:

内部 [main-|restore-|global-] コンシューマー、プロデューサー、および管理クライアントのクライアント ID に使用される ID 接頭辞文字列 (パターン <client.id>-[Global]StreamThread[-<threadSequenceNumber$gt;]-<consumer|producer|restore-consumer|global-consumer>)。

default.deserialization.exception.handler

型: class
デフォルト: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
重要度:

org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを実装する例外処理クラス。

default.key.serde

型: class
デフォルト: null
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serde インターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

default.list.key.serde.inner

型: class
デフォルト: null
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装するキーの list serde のデフォルトの内部クラス。この設定は、default.key.serde 設定が org.apache.kafka.common.serialization.Serdes.ListSerde に設定されている場合にのみ読み取られます。

default.list.key.serde.type

型: class
デフォルト: null
重要度:

java.util.List インターフェイスを実装するキーのデフォルトクラス。この設定は、default.key.serde 設定が org.apache.kafka.common.serialization.Serdes.ListSerde に設定されている場合にのみ読み取られます。リスト serde クラスが使用されている場合は、org.apache.kafka.common.serialization.Serde インターフェイスを 'default.list.key.serde.inner' 経由で実装する内部 serde クラスを設定する必要がある点に注意してください。

default.list.value.serde.inner

型: class
デフォルト: null
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装する値の list serde のデフォルトの内部クラス。この設定は、default.value.serde 設定が org.apache.kafka.common.serialization.Serdes.ListSerde に設定されている場合にのみ読み取られます。

default.list.value.serde.type

型: class
デフォルト: null
重要度:

java.util.List インターフェイスを実装する値のデフォルトクラス。この設定は、default.value.serde 設定が org.apache.kafka.common.serialization.Serdes.ListSerde に設定されている場合にのみ読み取られます。リスト serde クラスが使用されている場合は、org.apache.kafka.common.serialization.Serde インターフェイスを 'default.list.value.serde.inner' 経由で実装する内部 serde クラスを設定する必要がある点に注意してください。

default.production.exception.handler

型: class
デフォルト: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
重要度:

org.apache.kafka.streams.errors.ProductionExceptionHandler インターフェイスを実装する例外処理クラス。

default.timestamp.extractor

型: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
重要度:

org.apache.kafka.streams.processor.TimestampExtractor インターフェイスを実装した、デフォルトのタイムスタンプ抽出クラスです。

default.value.serde

型: class
デフォルト: null
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装する値のデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serde インターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

max.task.idle.ms

型: long
デフォルト: 0
重要度:

この設定は、結合とマージが順不同の結果を生成するかどうかを制御します。この設定値は、ストリームタスクが、一部の (すべてではない) 入力パーティションに完全に追いついたときに、プロデューサーが追加のレコードを送信するのを待ち、複数の入力ストリームにまたがるレコード処理の順序がずれる可能性を避けるためにアイドル状態を保つ最大時間 (ミリ秒単位) になります。デフォルト (ゼロ) は、プロデューサーがさらにレコードを送信するのを待ちませんが、ブローカーにすでに存在するデータをフェッチするのを待ちます。このデフォルトは、ブローカーにすでに存在するレコードの場合、Streams がそれらをタイムスタンプ順に処理することを意味します。-1 に設定すると、アイドリングを完全に無効にし、ローカルで利用可能なデータをすべて処理しますが、このように処理する場合でも、順序外の処理が発生する可能性があります。

max.warmup.replicas

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

タスクが再割り当てされたあるインスタンスでウォームアップしている間に、別のインスタンスでタスクを利用できるようにするために、一度に割り当てることができるウォームアップレプリカ (設定された num.standbys を超える追加のスタンドバイ) の最大数です。高可用性を確保するために追加のブローカートラフィックとクラスターの状態を調整するのに使用されます。少なくとも 1 である必要があります。1 つのウォームアップレプリカが 1 つのストリームタスクに対応することに注意してください。さらに、各ウォームアップレプリカは、リバランス中 (通常は、probing.rebalance.interval.ms 設定で指定された頻度で発生する、いわゆるプローブリバランス中) にのみアクティブタスクに昇格できることに注意してください。これは、アクティブなタスクを 1 つの Kafka Streams インスタンスから別のインスタンスに移行できる最大速度は (max.warmup.replicas / probing.rebalance.interval.ms) によって決定できることを意味します。

num.stream.threads

型: int
デフォルト: 1
重要度:

ストリーム処理を実行するスレッドの数。

processing.guarantee

型: string
デフォルト: at_least_once
有効な値: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
重要度:

使用されるべき処理保証です。使用できる値は、at_least_once (デフォルト) および exactly_once_v2 (ブローカーバージョン 2.5 以降が必要) です。非推奨のオプションは exactly_once (ブローカーバージョン 0.11.0 以降が必要) および exactly_once_beta (ブローカーバージョン 2.5 以降が必要) です。1 回だけの処理には、デフォルトで少なくとも 3 つのブローカーのクラスターが必要であることに注意してください。これは、実稼働環境に推奨される設定です。開発の場合、ブローカー設定 transaction.state.log.replication.factor および transaction.state.log.min.isr を調整することにより、これを変更できます。

rack.aware.assignment.non_overlap_cost

型: int
デフォルト: null
重要度:

既存の割り当てからのタスクの移動に関連するコスト。この設定と rack.aware.assignment.traffic_cost は、最適化アルゴリズムがラック間トラフィックの最小化を優先するか、既存の割り当てにおけるタスクの移動を最小化するかを制御します。より大きな値を設定すると、org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor は既存の割り当てを維持するように最適化します。デフォルト値は null で、異なるアサイナーでデフォルトの non_overlap コスト値を使用することを意味します。

rack.aware.assignment.strategy

型: string
デフォルト: none
有効な値: [none, min_traffic, balance_subtopology]
重要度:

ラック考慮割り当てに使用する戦略。ラック考慮割り当てでは、ラック間のトラフィックを最小限に抑えるために、タスクを割り当てるときに client.rackTopicPartitionracks が考慮されます。有効な設定は、ラック考慮割り当てを無効にする none (デフォルト)、最小ラック間トラフィック割り当てを計算する min_traffic、最小ラック間トラフィックを計算し、同じサブトポロジーのタスクを異なるクライアント間でバランスよく分散するよう試みる balance_subtopology です。

rack.aware.assignment.tags

型: list
デフォルト: ""
有効な値: 最大 5 つの要素を含むリスト
重要度:

Kafka Streams インスタンス間でスタンバイレプリカを配布するために使用されるクライアントタグキーのリスト。設定すると、Kafka ストリームは、各クライアントタグディメンションにスタンバイタスクを分散するために最善を尽くします。

rack.aware.assignment.traffic_cost

型: int
デフォルト: null
重要度:

ラック間トラフィックに関連するコスト。この設定と rack.aware.assignment.non_overlap_cost は、最適化アルゴリズムがラック間トラフィックの最小化を優先するか、既存の割り当てにおけるタスクの移動を最小化するかを制御します。より大きな値を設定すると、org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor はラック間トラフィックを最小化するように最適化します。デフォルト値は null で、異なるアサイナーでデフォルトのトラフィックコスト値が使用されることを意味します。

replication.factor

型: int
デフォルト: -1
重要度:

ストリーム処理アプリケーションによって作成されたログトピックおよびパーティショントピックを変更するためのレプリケーション係数。デフォルトの -1 (つまり、ブローカーのデフォルトレプリケーションファクターを使用) にはブローカーバージョン 2.4 以降が必要です。

security.protocol

型: string
デフォルト: PLAINTEXT
有効な値: (大文字と小文字の区別なし) [SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT]
重要度:

ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

statestore.cache.max.bytes

型: long
デフォルト: 10485760 (10 メビバイト)
有効な値: [0,…​]
重要度:

すべてのスレッドにわたるステートストアキャッシュに使用されるメモリーバイトの最大数。

task.timeout.ms

型: long
デフォルト: 300000 (5 分)
有効な値: [0,…​]
重要度:

内部エラーが原因でタスクが停止し、エラーが発生するまで再試行する最大時間 (ミリ秒単位)。タイムアウトが 0ms の場合、タスクは最初の内部エラーに対してエラーを発生させます。タイムアウトが 0ms を超える場合、タスクはエラーが発生する前に少なくとも 1 回再試行します。

topology.optimization

型: string
デフォルト: なし
有効な値: org.apache.kafka.streams.StreamsConfig$$Lambda$31/0x00007f269c00d000@7e32c033
重要度:

Kafka Streams にトポロジーを最適化する必要があるかどうか、およびどのような最適化を適用するかを指示する設定。使用できる値は、NO_OPTIMIZATION、OPTIMIZE、または特定の最適化のコンマ区切りリストです (REUSE_KTABLE_SOURCE_TOPICS、MERGE_REPARTITION_TOPICS + SINGLE_STORE_SELF_JOIN+)。デフォルトでは NO_OPTIMIZATION。

application.server

型: string
デフォルト: ""
重要度:

この KafkaStreams インスタンスでの状態ストア検出とインタラクティブなクエリーに使用できるユーザー定義のエンドポイントを参照する host:port ペア。

auto.include.jmx.reporter

型: boolean
デフォルト: true
重要度:

非推奨。JmxReporter が metric.reporters にリストされていない場合でも、自動的に含めるかどうか。この設定は Kafka 4.0 で削除される予定です。JmxReporter を有効にするには、代わりに org.apache.kafka.common.metrics.JmxReportermetric.reporters に含める必要があります。

buffered.records.per.partition

型: int
デフォルト: 1000
重要度:

パーティションごとにバッファーを行う最大レコード数。

built.in.metrics.version

型: string
デフォルト: latest
有効な値: [latest]
重要度:

使用する組み込みメトリックのバージョン。

commit.interval.ms

型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…​]
重要度:

処理の進行状況をコミットする頻度 (ミリ秒単位)。少なくとも 1 回の処理の場合、コミットとは、プロセッサーの位置 (つまり、オフセット) を保存することを意味します。1 回限りの処理の場合、トランザクションをコミットすることを意味します。これには、位置を保存し、出力トピック内のコミットされたデータを分離レベル read_committed でコンシューマーが表示できるようにすることが含まれます。(processing.guaranteeexactly_once_v2exactly_once に設定されている場合、デフォルト値は 100 になり、これ以外の場合のデフォルト値は 30000 になる点に注意してください。

connections.max.idle.ms

型: long
デフォルト: 540000 (9 分)
重要度:

この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。

default.client.supplier

型: クラス
デフォルト: org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
重要度:

org.apache.kafka.streams.KafkaClientSupplier インターフェイスを実装するクライアントサプライヤークラス。

default.dsl.store

型: string
デフォルト: rocksDB
有効な値: [rocksDB, in_memory]
重要度:

DSL Operator が使用するデフォルトの状態ストアタイプ。

dsl.store.suppliers.class

型: class
デフォルト: org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
重要度:

DSL Operator にプラグインするストア実装を定義します。org.apache.kafka.streams.state.DslStoreSuppliers インターフェイスを実装する必要があります。

enable.metrics.push

型: boolean
デフォルト: true
重要度:

クラスターにクライアントと一致するクライアントメトリクスサブスクリプションがある場合に、内部 [main-|restore-|global] コンシューマー、プロデューサー、および管理クライアントメトリクスのクラスターへのプッシュを有効にするかどうか。

metadata.max.age.ms

型: long
デフォルト: 300000 (5 分)
有効な値: [0,…​]
重要度:

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。

metric.reporters

型: list
デフォルト: ""
重要度:

メトリクスレポーターとして使用するクラスの一覧。org.apache.kafka.common.metrics.MetricsReporter インターフェイスを実装すると、新しいメトリックの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。

metrics.num.samples

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

メトリクスを計算するために保持されるサンプルの数。

metrics.recording.level

型: string
デフォルト: INFO
有効な値: [INFO, DEBUG, TRACE]
重要度:

メトリックの最も高い記録レベル。

metrics.sample.window.ms

型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…​]
重要度:

メトリックサンプルが計算される時間枠。

poll.ms

型: long
デフォルト: 100
重要度:

入力を待つためにブロックする時間をミリ秒単位で指定します。

probing.rebalance.interval.ms

型: long
デフォルト: 600000 (10 分)
有効な値: [60000,…​]
重要度:

ウォーミングアップが終了し、アクティブになる準備ができているウォームアップレプリカをプローブするためにリバランスをトリガーするまで待機する最大時間 (ミリ秒単位)。プローブリバランスは、割り当てが分散されるまで引き続きトリガーされます。1 分以上でなければなりません。

receive.buffer.bytes

型: int
デフォルト: 32768 (32 kibibytes)
有効な値: [-1,…​]
重大度:

データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

reconnect.backoff.max.ms

型: long
デフォルト: 1000 (1 秒)
有効な値: [0,…​]
重要度:

接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。

reconnect.backoff.ms

型: long
デフォルト: 50
有効な値: [0,…​]
重要度:

特定のホストへの再接続を試みる前の基本的な待機時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。この値は初期バックオフ値であり、連続して接続に失敗するたびに、最大で reconnect.backoff.max.ms の値まで指数関数的に増加します。

repartition.purge.interval.ms

型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…​]
重要度:

再分割トピックから完全に消費されたレコードを削除する頻度 (ミリ秒単位)。パージは、最後のパージから少なくともこの値の後に行われますが、それ以降まで遅れる場合があります。(commit.interval.ms とは異なり、processing.guaranteeexactly_once_v2 に設定されている場合、この値のデフォルトは変更されないことに注意してください)。

request.timeout.ms

型: int
デフォルト: 40000 (40 秒)
有効な値: [0,…​]
重要度:

この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。

retries

型: int
デフォルト: 0
有効な値: [0,…​,2147483647]
重要度:

ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性がある失敗した要求を再送信します。値をゼロまたは MAX_VALUE のいずれかに設定し、対応するタイムアウトパラメーターを使用して、クライアントがリクエストを再試行する期間を制御することが推奨されます。

retry.backoff.ms

型: long
デフォルト: 100
有効な値: [0,…​]
重要度:

特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。この値は初期バックオフ値であり、要求が失敗するたびに、最大で retry.backoff.max.ms の値まで指数関数的に増加します。

rocksdb.config.setter

型: class
デフォルト: null
重要度:

org.apache.kafka.streams.state.RocksDBConfigSetter インターフェイスを実装する Rocks DB 設定セッタークラスまたはクラス名。

send.buffer.bytes

型: int
デフォルト: 131072 (128 キビバイト)
有効な値: [-1,…​]
重要度:

データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

state.cleanup.delay.ms

型: long
デフォルト: 600000 (10 分)
重要度:

パーティションが移行されたときに状態を削除するまで待機する時間 (ミリ秒単位)。少なくとも state.cleanup.delay.ms の間変更されていない state ディレクトリーのみが削除されます。

upgrade.from

型: string
デフォルト: null
有効な値: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6]
重要度:

後方互換性のある方法でのアップグレードを許可します。これは、[0.10.0, 1.1] から 2.0+ にアップグレード、または [2.0, 2.3] から 2.4+ にアップグレードする際に必要です。3.3 から新しいバージョンにアップグレードする場合は、この設定を指定する必要はありません。デフォルトは null です。受け入れられる値は、0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1、2.0、2.1、2.2、2.3、2.4、2.5、2.6、2.7、2.8、3.0、3.1、3.2、3.3、3.4、3.5、3.6 です (対応する旧バージョンからのアップグレード用)。

window.size.ms

型: long
デフォルト: null
重要度:

ウィンドウの終了時間を計算するためにデシリアライザーのウィンドウサイズを設定します。

windowed.inner.class.serde

型: string
デフォルト: null
重要度:

ウィンドウ表示されたレコードの内部クラスのデフォルトのシリアライザー/デシリアライザー。org.apache.kafka.common.serialization.Serde インターフェイスを実装する必要があります。KafkaStreams アプリケーションでこの設定を設定すると、Plain コンシューマークライアントからのみ使用されることが意図されているため、エラーが発生することに注意してください。

windowstore.changelog.additional.retention.ms

型: long
デフォルト: 86400000 (1 日)
重要度:

windows maintainMs に追加され、データがログから早期に削除されないようにします。クロックドリフトを許可します。デフォルトは 1 日です。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.