付録G Kafka Streams 設定パラメーター


application.id

タイプ: string
重要度:

ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。これは、1) デフォルトのクライアントIDのプレフィックス、2) メンバーシップ管理のためのグループID、3) Changelog のトピックのプレフィックスとして使用されます。

bootstrap.servers

タイプ: list
重要度:

Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアの一覧。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。この一覧は、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。この一覧は、host1:port1,host2:port2,…​ の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。

replication.factor

タイプ: int
デフォルト: 1
重大度:

ストリーム処理アプリケーションによって作成されたログトピックおよびパーティショントピックを変更するためのレプリケーション係数。

state.dir

タイプ: string
デフォルト: /tmp/kafka-streams
重要度:

状態ストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに一意である必要があります。

acceptable.recovery.lag

型: long
デフォルト: 10000
有効な値: [0,…​]
重要度:

クライアントがアクティブなタスクに対して取り戻したと見なされる最大許容遅延 (補うためのオフセット数) です。特定のワークロードに対して、1 分間でリカバリー時間に十分に対応できるはずです。0 以上である必要があります。

cache.max.bytes.buffering

型: long
デフォルト: 10485760
有効な値: [0,…​]
重要度:

すべてのスレッドでバッファーするのに使用されるメモリーバイトの最大数。

client.id

タイプ: string
デフォルト: ""
重要度:

内部コンシューマー、プロデューサー、および復元コンシューマーのクライアントIDに使用される ID プレフィックス。パターンは、'<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>' です。

default.deserialization.exception.handler

タイプ: class
デフォルト: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
重要度:

org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを実装する例外処理クラス。

default.key.serde

タイプ: class
デフォルト: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
重大度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serdeインターフェースを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

default.production.exception.handler

タイプ: class
デフォルト: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
重要度:

org.apache.kafka.streams.errors.ProductionExceptionHandler インターフェイスを実装する例外処理クラス。

default.timestamp.extractor

タイプ: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
重要度:

org.apache.kafka.streams.processor.TimestampExtractor インターフェイスを実装した、デフォルトのタイムスタンプ抽出クラスです。

default.value.serde

タイプ: class
デフォルト: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
重大度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装する値のデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serde インターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

max.task.idle.ms

型: long
デフォルト: 0
重要度:

複数の入力ストリームで順不同のレコード処理を防ぐために、すべてのパーティションバッファーにレコードが含まれない場合に、ストリームタスクがアイドル状態になる最大時間。

max.warmup.replicas

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

タスクが再割り当てされたあるインスタンスでウォームアップしている間に、別のインスタンスでタスクを利用できるようにするために、一度に割り当てることができるウォームアップレプリカ (設定されたnum.standbysを超える追加のスタンドバイ) の最大数です。高可用性を確保するために追加のブローカートラフィックとクラスターの状態を調整するのに使用されます。1 以上でなければなりません。

num.standby.replicas

タイプ: int
デフォルト: 0
重大度:

各タスクのスタンバイレプリカ数。

num.stream.threads

型: int
デフォルト: 1
重要度:

ストリーム処理を実行するスレッドの数。

processing.guarantee

タイプ: string
デフォルト: at_least_once
有効な値: [at_least_once, exactly_once, exactly_once_beta]
重大度:

使用されるべき処理保証です。使用可能な値は、at_least_once(デフォルト)、exactly_once(ブローカーバージョン 0.11.0 以降が必要)、および exactly_once_beta(ブローカーバージョン 2.5 以降が必要)です。正確に1回の処理には、デフォルトで少なくとも3つのブローカーのクラスターが必要であることに注意してください。これは、実稼働環境に推奨される設定です。開発の場合、ブローカー設定transaction.state.log.replication.factorおよびtransaction.state.log.min.isrを調整することにより、これを変更できます。

security.protocol

型: string
デフォルト: PLAINTEXT
重要度:

ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

topology.optimization

タイプ: string
デフォルト: none
有効な値: [none, all]
重要度:

デフォルトでは、トポロジーを最適化する必要がある場合に Kafka Streams に指示する設定。

application.server

タイプ: string
デフォルト: ""
重要度:

この KafkaStreams インスタンスでの状態ストア検出とインタラクティブなクエリーに使用できるユーザー定義のエンドポイントを参照する host:port ペア。

buffered.records.per.partition

型: int
デフォルト: 1000
重要度:

パーティションごとにバッファーを行う最大レコード数。

built.in.metrics.version

タイプ: string
デフォルト: latest
有効な値: [0.10.0-2.4, latest]
重大度:

使用する組み込みメトリクスのバージョン。

commit.interval.ms

型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…​]
重要度:

プロセッサーの位置を保存する頻度。(なお、processing.guaranteeexactly_onceに設定されている場合はデフォルトで100、そうでない場合はデフォルトで30000となっています。

connections.max.idle.ms

型: long
デフォルト: 540000 (9 分)
重要度: low

この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。

metadata.max.age.ms

型: long
デフォルト: 300000 (5 分)
有効な値: [0,…​]
重要度: low

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。

metric.reporters

タイプ: list
デフォルト: ""
重要度:

メトリクスレポーターとして使用するクラスの一覧。org.apache.kafka.common.metrics.MetricsReporter インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。

metrics.num.samples

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

メトリクスを計算するために保持されるサンプルの数。

metrics.recording.level

型: string
デフォルト: INFO
有効な値: [INFO, DEBUG]
重要度:

メトリクスの最も高い記録レベル。

metrics.sample.window.ms

型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…​]
重要度:

メトリクスサンプルが計算される時間枠。

partition.grouper

タイプ: class
デフォルト: org.apache.kafka.streams.processor.DefaultPartitionGrouper
重大度:

org.apache.kafka.streams.processor.PartitionGrouper インターフェースを実装するパーティショングループクラス。警告: この設定は非推奨になり、3.0.0 リリースで削除されます。

poll.ms

型: long
デフォルト: 100
重要度:

入力を待つためにブロックする時間をミリ秒単位で指定します。

probing.rebalance.interval.ms

型: long
デフォルト: 600000 (10 分)
有効な値: [60000,…​]
重要度: low

準備が完了し、アクティブになる準備が整っているウォームアップレプリカのリバランスをトリガーする前に待機する最大時間。プローブリバランスは、割り当てが分散されるまで引き続きトリガーされます。1 分以上でなければなりません。

receive.buffer.bytes

型: int
デフォルト: 32768 (32 キビバイト)
有効な値: [-1,…​]
重要度: low

データの読み取り時に使用する TCP 受信バッファー(SO_RCVBUF)のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

reconnect.backoff.max.ms

型: long
デフォルト: 1000 (1 秒)
有効な値: [0,…​]
重要度:

接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。

reconnect.backoff.ms

型: long
デフォルト: 50
有効な値: [0,…​]
重要度:

特定のホストへの再接続を試みる前に待機するベース時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。

request.timeout.ms

型: int
デフォルト: 40000 (40 秒)
有効な値: [0,…​]
重要度:

この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。

retries

型: int
デフォルト: 0
有効な値: [0,…​,2147483647]
重要度:

ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性がある失敗した要求を再送信します。

retry.backoff.ms

型: long
デフォルト: 100
有効な値: [0,…​]
重要度:

特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。

rocksdb.config.setter

タイプ: class
デフォルト: null
重要度:

org.apache.kafka.streams.state.RocksDBConfigSetter インターフェイスを実装する Rocks DB 設定セッタークラスまたはクラス名。

send.buffer.bytes

型: int
デフォルト: 131072 (128 キビバイト)
有効な値: [-1,…​]
重要度: low

データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

state.cleanup.delay.ms

型: long
デフォルト: 600000 (10 分)
重要度: low

パーティションが移行されたときに状態を削除するまで待機する時間 (ミリ秒単位)。少なくとも state.cleanup.delay.ms の間変更されていない state ディレクトリーのみが削除されます。

upgrade.from

タイプ: string
デフォルト: null
有効な値: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
重要度:

後方互換性のある方法でのアップグレードを許可します。これは、[0.10.0, 1.1] から 2.0+ にアップグレード、または [2.0, 2.3] から 2.4+ にアップグレードする際に必要です。2.4 から新しいバージョンにアップグレードする場合は、この設定を指定する必要はありません。デフォルトは null です。許可される値は 0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1、2.0、2.1、2.2、2.3 (対応する旧バージョンからのアップグレード用) です。

windowstore.changelog.additional.retention.ms

型: long
デフォルト: 86400000 (1 日)
重要度:

windows maintainMs に追加され、データがログから早期に削除されないようにします。クロックドリフトを許可します。デフォルトは 1 日です。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.