付録G Kafka Streams 設定パラメーター
application.id
type: string
Importance: high
ストリーム処理アプリケーションの ID。Kafka クラスター内で一意である必要があります。これは、デフォルトの client-id プレフィックスである 1)として使用されます。2)メンバーシップ管理の group-id、および changelog トピックプレフィックスは 3 です。
bootstrap.servers
type: list
Importance: high
Kafka クラスターへの最初の接続を確立するために使用するホストとポートのペアの一覧。クライアントは、ここで指定するすべてのサーバーの使用を行います。このリストは、サーバーのフルセットを検出するために使用される初期ホストにのみ影響します。このリストは
host1:port1,host2:port2,…
の形式にする必要があります。これらのサーバーは、クラスターの全メンバーシップを検出するために初期接続(動的に変更されている可能性がある)にだけ使用されているため、この一覧にはサーバーのフルセットを含める必要はありません(サーバーがダウンした場合の場合もあります)。replication.factor
type: int
デフォルト: 1
の修正: high
変更ログトピックおよびストリーム処理アプリケーションによって作成されるトピックの再パーティショントピックのレプリケーション係数。ブローカークラスターがバージョン 2.4 以降にある場合は、-1 を設定してブローカーのデフォルトのレプリケーション係数を使用できます。
state.dir
type: string
Default: /tmp/kafka-streams
Importance: high
ステートストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに固有でなければなりません。
acceptable.recovery.lag
type: long
Default: 10000
Valid Values: [0,…]
Importance: medium
アクティブなタスクでクライアントがキャッチされる最大ラグ(オフセットの数)。指定のワークロードの 1 分未満のリカバリー時間に対応します。0 以上でなければなりません。
cache.max.bytes.buffering
type: long
デフォルト: 10485760
有効な値: [0,…]
Importance: medium
すべてのスレッドでバッファーに使用されるメモリーバイトの最大数。
client.id
type: string
デフォルト: ""
Importance: medium
内部コンシューマー、プロデューサー、および restore-consumer のクライアント ID に使用される ID プレフィックス文字列。パターン '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'。
default.deserialization.exception.handler
type: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium
org.apache.kafka.streams.errors.DeserializationExceptionHandler
インターフェースを実装する例外処理クラス。default.key.serde
type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
org.apache.kafka.common.serialization.Serde
インターフェースを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスが使用されている場合は、'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' でもorg.apache.kafka.common.serialization.Serde
インターフェースを実装する内部クラスを設定する必要があります。default.production.exception.handler
type: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium
org.apache.kafka.streams.errors.ProductionExceptionHandler
インターフェースを実装する例外処理クラス。default.timestamp.extractor
Type: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium
org.apache.kafka.streams.processor.TimestampExtractor
インターフェースを実装するデフォルトのタイムスタンプ抽出。default.value.serde
type: class
Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
Importance: medium
org.apache.kafka.common.serialization.Serde
インターフェースを実装する値のデフォルトシリアライザー/デシリアライザークラス。windowed serde クラスが使用されている場合は、'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' でもorg.apache.kafka.common.serialization.Serde
インターフェースを実装する内部クラスを設定する必要があります。default.windowed.key.serde.inner
type: class
Default: null
Importance: medium
ウィンドウ化されたキーの内部クラスのデフォルトのシリアライザー/デシリアライザー。
org.apache.kafka.common.serialization.Serde
インターフェースを実装する必要があります。default.windowed.value.serde.inner
type: class
Default: null
Importance: medium
ウィンドウ化された値の内部クラスのデフォルトのシリアライザー/デシリアライザー。
org.apache.kafka.common.serialization.Serde
インターフェースを実装する必要があります。max.task.idle.ms
タイプ: long
デフォルト: 0
のインポート: 中
パーティションバッファーにすべてのパーティションバッファーにレコードが含まれていなければ、ストリームタスクがアイドル状態になる最大時間(ミリ秒単位)。これにより、複数の入力ストリーム間で順序不足のレコード処理の可能性を回避します。
max.warmup.replicas
type: int
Default: 2
Valid Values: [1,…]
Importance: medium
ウォームアップレプリカの最大数(設定された num.standbys 以上の期間で 1 つのインスタンスがタスクを利用可能にする目的で 1 度に割り当てることができるウォームスタンバイ)の最大数。別のインスタンスでこのタスクを利用できる状態にしておく一方で、再割り当てされたことになります。高可用性に使用できる追加のブローカートラフィックとクラスター状態のサイズを調整するために使用されます。1 以上でなければなりません。
num.standby.replicas
type: int
Default: 0
Importance: medium
各タスクのスタンバイレプリカ数。
num.stream.threads
type: int
Default: 1
Importance: medium
ストリーム処理を実行するスレッドの数。
processing.guarantee
type: string
デフォルト: at_least_once
有効な値: [at_least_once, exactly_once_beta]
Importance: medium
処理により、使用するべき確実性があります。使用できる値は、
at_least_once
(デフォルト)、exactly_once
(ブローカーバージョン 0.11.0 以降が必要)、およびexactly_once_beta
(ブローカーバージョン 2.5 以降が必要)です。完全に 1 回実行処理には、デフォルトで、実稼働に推奨される設定である 3 つ以上のブローカークラスターが必要です。開発の場合は、ブローカーの設定transaction.state.log.replication.factor
およびtransaction.state.log.min.isr
を調整することで変更できます。security.protocol
type: string
Default: PLAINTEXT
Importance: medium
ブローカーとの通信に使用されるプロトコル。有効な値は PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
task.timeout.ms
type: long
デフォルト: 300000(5 分)
有効な値: [0,…]
Importance: medium
内部エラーが原因でタスクが停止している可能性があり、エラーが発生するまで再試行する可能性がある最大時間(ミリ秒単位)。0ms のタイムアウトでは、タスクは最初の内部エラーに対してエラーを出しました。0ms を超えるタイムアウトの場合には、エラーが発生する前に、タスクは少なくとも 1 回再試行します。
topology.optimization
type: string
Default: none
Valid Values: [none, all]
Importance: medium
Kafka Streams に指示する設定は、デフォルトで無効化され、トポロジーを最適化する必要があります。
application.server
type: string
デフォルト: ""
Importance: low
この KafkaStreams インスタンスのステートストア検出と対話型クエリーに使用できる、ユーザー定義のエンドポイントを参照する host:port ペア。
buffered.records.per.partition
type: int
Default: 1000
Importance: low
パーティションごとにバッファーする最大レコード数。
built.in.metrics.version
type: string
Default: latest
Valid Values: [0.10.0-2.4, latest]
Importance: low
使用する組み込みメトリクスのバージョン。
commit.interval.ms
type: long
デフォルト: 30000(30 秒)
有効値: [0,…]
Importance: low
プロセッサーの場所を保存する頻度(ミリ秒単位)。(
processing.guarantee
がexactly_once
に設定された場合、デフォルト値は100
です。それ以外の場合は、デフォルト値は30000
です。connections.max.idle.ms
type: long
デフォルト: 540000(9 分)
Importance: low
この設定によって指定された期間(ミリ秒単位)の後にアイドル状態の接続を閉じます。
metadata.max.age.ms
type: long
デフォルト: 300000(5 分)
有効な値: [0,…]
Importance: low
パーティションリーダーが変更されておらず、新しいブローカーやパーティションをプロアクティブに検出するようにしていても、メタデータの更新を強制する期間(ミリ秒単位)。
metric.reporters
type: list
デフォルト: ""
Importance: low
メトリクスレポーターとして使用するクラスの一覧。
org.apache.kafka.common.metrics.MetricsReporter
インターフェースを実装すると、新しいメトリクス作成の通知となるクラスにプラグインすることができます。JmxReporter は、JMX 統計を登録するために常に含まれます。metrics.num.samples
type: int
Default: 2
Valid Values: [1,…]
Importance: low
メトリクスを計算するために保持されるサンプルの数。
metrics.recording.level
type: string
Default: INFO
Valid Values: [INFO, DEBUG, TRACE]
Importance: low
メトリックの最大記録レベル。
metrics.sample.window.ms
type: long
デフォルト: 30000(30 秒)
有効値: [0,…]
Importance: low
メトリクスサンプルが計算される期間。
partition.grouper
type: class
Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper
Importance: low
org.apache.kafka.streams.processor.PartitionGrouper
インターフェースを実装するパーティション分類クラス。警告: この設定は非推奨となり、3.0.0 リリースで削除される予定です。poll.ms
タイプ: long
デフォルト: 100
修正: low
入力を待機する期間(ミリ秒単位)。
probing.rebalance.interval.ms
type: long
デフォルト: 600000(10 分)
有効な値: [60000,…]
Importance: low
ウォームアップを終了し、アクティブになるレプリカに対するリバランスをトリガーするまで待機する最大時間(ミリ秒単位)。割り当てがバランスになるまで、リバランスはトリガーを継続します。1 分以上でなければなりません。
receive.buffer.bytes
type: int
Default: 32768(32 kibibytes)
Valid Values: [-1,…]
Importance: low
データの読み取り時に使用する TCP 受信バッファー(SO_RCVBUF)のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
reconnect.backoff.max.ms
type: long
デフォルト: 1000(1秒)
有効値: [0,…]
Importance: low
繰り返し接続に失敗したブローカーに再接続するまで待機する最大時間(ミリ秒単位)。指定された場合、ホストごとのバックオフは連続した接続障害ごとに指数関数的に増加し、最大接続数までします。バックオフの増加を計算すると、接続のフレッターを回避するために 20% のランダムジッターが追加されます。
reconnect.backoff.ms
type: long
デフォルト: 50
有効な値: [0,…]
インポートランス: low
指定のホストに再接続を試みる前に待機する時間。これにより、密接なループでホストへ繰り返し接続することはできません。このバックオフは、クライアントがブローカーへのすべての接続試行に適用されます。
request.timeout.ms
type: int
デフォルト: 40000(40秒)
有効値: [0,…]
Importance: low
この設定では、クライアントがリクエストの応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントがリクエストを再送するか、再試行した場合はリクエストが失敗します。
retries
type: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
ゼロよりも大きい値を設定すると、クライアントは一時的なエラーで失敗したリクエストを再送信します。値をゼロまたは
MAX_VALUE
に設定し、対応するタイムアウトパラメーターを使用してクライアントの再試行期間を制御することが推奨されます。retry.backoff.ms
type: long
デフォルト: 100
有効な値: [0,…]
インポートランス: low
特定のトピックパーティションへの失敗した要求を再試行するまでの待機時間。これにより、障害シナリオによっては、1 回目のループでリクエストを繰り返し送信しないようにします。
rocksdb.config.setter
type: class
Default: null
Importance: low
org.apache.kafka.streams.state.RocksDBConfigSetter
インターフェースを実装する Rocks DB 設定セッタークラスまたはクラス名。send.buffer.bytes
type: int
Default: 131072(128 kibibytes)
Valid Values: [-1,…]
Importance: low
データ送信時に使用する TCP 送信バッファー(SO_SNDBUF)のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
state.cleanup.delay.ms
type: long
デフォルト: 600000(10 分)
Importance: low
パーティションが移行されたときに状態を削除するまでの待機時間(ミリ秒単位)。少なくとも
state.cleanup.delay.ms
で編集されていない状態ディレクトリーのみが削除されます。upgrade.from
type: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low
後方互換性がある方法でのアップグレードを可能にします。これは、[0.10.0, 1.1] から 2.0 以降にアップグレードする場合や、[2.0、2.3] から 2.4+ にアップグレードする際に必要になります。2.4 から新しいバージョンにアップグレードする場合、この設定を指定する必要はありません。デフォルトは
null
です。許可される値は "0.10.0"、"0.10.1"、"0.10.2"、"0.11.0"、""1.0"、"1.1"、"2.0"、"2.1"、"2.2"、"2.3" です(対応する古いバージョンからアップグレードする場合)。window.size.ms
タイプ: long
デフォルト: null
インポートランス: low
ウィンドウの終了時間を計算するために、デシリアライザーのウィンドウサイズを設定します。
windowstore.changelog.additional.retention.ms
type: long
デフォルト: 86400000(1 日)
Importance: low
データがログ早期から削除されないように、ウィンドウメンテナンス Ms に追加されました。クロックドリフトを許可します。デフォルトは 1 日です。