検索

付録G Kafka Streams 設定パラメーター

download PDF
application.id

型: string
重要度:

ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。これは、1) デフォルトのクライアント ID の接頭辞、2) メンバーシップ管理のためのグループ ID、3) Changelog のトピックの接頭辞として使用されます。

bootstrap.servers

型: list
重要度:

Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアの一覧。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。この一覧は、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。この一覧は、host1:port1,host2:port2,…​ の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。

replication.factor

型: int
デフォルト: 1
重要度:

ストリーム処理アプリケーションによって作成されたログトピックおよびパーティショントピックを変更するためのレプリケーション係数。

state.dir

型: string
デフォルト: /tmp/kafka-streams
重要度:

状態ストアのディレクトリーの場所。このパスは、同じ基礎となるファイルシステムを共有するストリームインスタンスごとに一意である必要があります。

cache.max.bytes.buffering

型: long
デフォルト: 10485760
有効な値: [0,…​]
重要度:

すべてのスレッドでバッファーするのに使用されるメモリーバイトの最大数。

client.id

型: string
デフォルト: ""
重要度:

内部コンシューマー、プロデューサー、および復元コンシューマーのクライアント ID に使用される ID 接頭辞。パターンは、'<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>' です。

default.deserialization.exception.handler

型: class
デフォルト: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
重要度:

org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを実装する例外処理クラス。

default.key.serde

型: クラス
デフォルト: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装するキーのデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serde インターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

default.production.exception.handler

型: class
デフォルト: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
重要度:

org.apache.kafka.streams.errors.ProductionExceptionHandler インターフェイスを実装する例外処理クラス。

default.timestamp.extractor

型: class
デフォルト: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
重要度:

org.apache.kafka.streams.processor.TimestampExtractor インターフェイスを実装した、デフォルトのタイムスタンプ抽出クラスです。

default.value.serde

型: クラス
デフォルト: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
重要度:

org.apache.kafka.common.serialization.Serde インターフェイスを実装する値のデフォルトのシリアライザー/デシリアライザークラス。windowed serde クラスを使用する場合は、org.apache.kafka.common.serialization.Serde インターフェイスを実装した inner serde クラスを 'default.windowed.key.serde.inner' または 'default.windowed.value.serde.inner' で設定する必要があることに注意してください。

max.task.idle.ms

型: long
デフォルト: 0
重要度:

複数の入力ストリーム間で順不同のレコード処理を防ぐために、すべてのパーティションバッファーにレコードが含まれていないと、ストリームタスクがアイドル状態のままになる最大時間。

num.standby.replicas

型: int
デフォルト: 0
重要度:

各タスクのスタンバイレプリカ数。

num.stream.threads

型: int
デフォルト: 1
重要度:

ストリーム処理を実行するスレッドの数。

processing.guarantee

型: 文字列
デフォルト: at_least_once
有効な値: at_least_once、exact_once
重要度:

使用されるべき処理保証です。可能な値は at_least_once (デフォルト) と exact_once です。1 回だけの処理には、デフォルトで少なくとも 3 つのブローカーのクラスターが必要であることに注意してください。これは、実稼働環境に推奨される設定です。開発の場合、ブローカー設定 transaction.state.log.replication.factor および transaction.state.log.min.isr を調整することにより、これを変更できます。

security.protocol

型: string
デフォルト: PLAINTEXT
重要度:

ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。

topology.optimization

型: string
デフォルト: none
有効な値: [none, all]
重要度:

デフォルトでは、トポロジーを最適化する必要がある場合に Kafka Streams に指示する設定。

application.server

型: string
デフォルト: ""
重要度:

この KafkaStreams インスタンスでの状態ストア検出とインタラクティブなクエリーに使用できるユーザー定義のエンドポイントを参照する host:port ペア。

buffered.records.per.partition

型: int
デフォルト: 1000
重要度:

パーティションごとにバッファーを行う最大レコード数。

built.in.metrics.version

型: string
デフォルト: latest
有効な値: [0.10.0-2.4, latest]
重要度:

使用する組み込みメトリクスのバージョン。

commit.interval.ms

型: long
デフォルト: 30000
有効な値: [0,…​]
重要度:

プロセッサーの位置を保存する頻度。(なお、processing.guaranteeexactly_once に設定されている場合はデフォルトで 100、そうでない場合はデフォルトで 30000 となっています。

connections.max.idle.ms

型: long
デフォルト: 540000
重要度:

この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。

metadata.max.age.ms

型: long
デフォルト: 300000
有効な値: [0,…​]
重要度:

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。

metric.reporters

型: list
デフォルト: ""
重要度:

メトリクスレポーターとして使用するクラスの一覧。org.apache.kafka.common.metrics.MetricsReporter インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。

metrics.num.samples

型: int
デフォルト: 2
有効な値: [1,…​]
重要度:

メトリクスを計算するために保持されるサンプルの数。

metrics.recording.level

型: string
デフォルト: INFO
有効な値: [INFO, DEBUG]
重要度:

メトリクスの最も高い記録レベル。

metrics.sample.window.ms

型: long
デフォルト: 30000
有効な値: [0,…​]
重要度:

メトリクスサンプルが計算される時間枠。

partition.grouper

型: クラス
デフォルト: org.apache.kafka.streams.processor.DefaultPartitionGrouper
重要度:

org.apache.kafka.streams.processor.PartitionGrouper インターフェイスを実装するパーティショングループクラス。警告: この設定は非推奨になり、3.0.0 リリースで削除されます。

poll.ms

型: long
デフォルト: 100
重要度:

入力を待つためにブロックする時間をミリ秒単位で指定します。

receive.buffer.bytes

型: int
デフォルト: 32768
有効な値: [-1,…​]
重要度:

データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

reconnect.backoff.max.ms

型: long
デフォルト: 1000
有効な値: [0,…​]
重要度:

接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。

reconnect.backoff.ms

型: long
デフォルト: 50
有効な値: [0,…​]
重要度:

特定のホストへの再接続を試みる前に待機するベース時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。

request.timeout.ms

型: int
デフォルト: 40000
有効な値: [0,…​]
重要度:

この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。

retries

型: int
デフォルト: 0
有効な値: [0,…​,2147483647]
重要度:

ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性がある失敗した要求を再送信します。

retry.backoff.ms

型: long
デフォルト: 100
有効な値: [0,…​]
重要度:

特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。

rocksdb.config.setter

型: class
デフォルト: null
重要度:

org.apache.kafka.streams.state.RocksDBConfigSetter インターフェイスを実装する Rocks DB 設定セッタークラスまたはクラス名。

send.buffer.bytes

型: int
デフォルト: 131072
有効な値: [-1,…​]
重要度:

データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。

state.cleanup.delay.ms

型: long
デフォルト: 600000
重要度:

パーティションが移行されたときに状態を削除するまで待機する時間 (ミリ秒単位)。少なくとも state.cleanup.delay.ms の間変更されていない state ディレクトリーのみが削除されます。

upgrade.from

型: string
デフォルト: null
有効な値: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
重要度:

後方互換性のある方法でのアップグレードを許可します。これは、[0.10.0, 1.1] から 2.0+ にアップグレード、または [2.0, 2.3] から 2.4+ にアップグレードする際に必要です。2.4 から新しいバージョンにアップグレードする場合は、この設定を指定する必要はありません。デフォルトは null です。許可される値は 0.10.0、0.10.1、0.10.2、0.11.0、1.0、1.1、2.0、2.1、2.2、2.3 (対応する旧バージョンからのアップグレード用) です。

windowstore.changelog.additional.retention.ms

型: long
デフォルト: 86400000
重要度:

windows maintainMs に追加され、データがログから早期に削除されないようにします。クロックドリフトを許可します。デフォルトは 1 日です。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.