付録D プロデューサー設定パラメーター
key.serializer
型: class
重要度: 高
org.apache.kafka.common.serialization.Serializer
インターフェイスを実装するキーのシリアライザークラス。value.serializer
型: class
重要度: 高
org.apache.kafka.common.serialization.Serializer
インターフェイスを実装する値のシリアライザークラス。acks
型: 文字列
デフォルト: 1
有効な値: すべて、-1、0、1
重要度: 高
リクエストが完了したと見なす前に、プロデューサーがリーダーに受け取ったことを要求する確認の数。これは、送信されるレコードの耐久性を制御します。以下の設定が許可されます。
-
acks=0
ゼロに設定すると、プロデューサーはサーバーからの確認応答を一切待ちません。レコードは直ちにソケットバッファーに追加され、送信済みと見なされます。この場合、サーバーがレコードを受信したかどうかは保証されず、retries
の設定は有効になりません (クライアントは通常、失敗を知ることができないからです)。各レコードで返されるオフセットは、常に-1
に設定されます。 -
acks=1
これは、リーダーはその記録をローカルログに書き込みますが、全フォロワーからの完全な承認を待たずに応答します。この場合、リーダーがレコードを確認した直後に失敗し、これがフォロワーがレコードを複製する前であった場合は、レコードは失われます。 -
acks=all
リーダーは同期しているレプリカの完全セットがレコードを確認するのを待ちます。これにより、少なくとも 1 つの In-Sync レプリカが動作している限り、レコードが失われないことが保証されます。これは利用可能な最強の保証になります。これは acks=-1 設定に相当します。
-
bootstrap.servers
型: list
デフォルト: ""
有効な値: non-null string
重要度: 高
Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアの一覧。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。この一覧は、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。この一覧は、
host1:port1,host2:port2,…
の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。buffer.memory
型: long
デフォルト: 33554432
有効な値: [0,…]
重要度: 高
プロデューサーが、サーバーへの送信を待機しているレコードをバッファーリングするために使用できるメモリーの合計バイト数。レコードの送信がサーバーへの配信よりも速い場合、プロデューサーは
max.block.ms
の間ブロックし、その後例外を出力します。この設定は、プロデューサーが使用する合計メモリーにおおまかに対応する必要がありますが、プロデューサーが使用するすべてのメモリーがバッファーリングに使用されるわけではないため、ハードバウンドではありません。一部の追加メモリーは、圧縮 (圧縮が有効な場合) やインフライトリクエストの維持に使用されます。
compression.type
型: string
デフォルト: none
重要度: 高
プロデューサーによって生成されたすべてのデータの圧縮タイプ。デフォルトは none (圧縮なし) です。有効な値は、
none
、gzip
、snappy
、lz4
、またはzstd
です。圧縮はデータの完全なバッチであるため、バッチ処理の有効性は圧縮率にも影響します (バッチ処理が多いほど圧縮率が高くなります)。retries
型: int
デフォルト: 2147483647
有効な値: [0,…,2147483647]
重要度: 高
ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性により送信に失敗したレコードを再送信します。この再試行は、クライアントがエラーを受信したときにレコードを再送した場合と同じであることに注意してください。
max.in.flight.requests.per.connection
に 1 を設定せずに再試行を許可すると、レコードの順序が変わる可能性があります。なぜなら、2 つのバッチが 1 つのパーティションに送信され、最初のバッチが失敗して再試行され、2 番目のバッチが成功した場合、2 番目のバッチのレコードが最初に表示される可能性があるからです。さらに、確認が正常に実行される前にdelivery.timeout.ms
によって設定されたタイムアウトが期限切れになると、再試行の回数が指定数に達する前に行る前に生成の要求に失敗することに注意してください。通常は、この設定は未設定のままにし、代わりにdelivery.timeout.ms
を使用して再試行動作を制御します。ssl.key.password
型: password
デフォルト: null
重要度: 高
キーストアファイル内の秘密キーのパスワード。これはクライアントにとってオプションになります。
ssl.keystore.location
型: string
デフォルト: null
重要度: 高
キーストアファイルの場所。これはクライアントではオプションで、クライアントの双方向認証に使用できます。
ssl.keystore.password
型: password
デフォルト: null
重要度: 高
キーストアファイルのストアパスワード。これはクライアントではオプションで、ssl.keystore.location が設定されている場合にのみ必要です。
ssl.truststore.location
型: string
デフォルト: null
重要度: 高
トラストストアファイルの場所。
ssl.truststore.password
型: password
デフォルト: null
重要度: 高
トラストストアファイルのパスワード。パスワードが設定されていない場合でもトラストストアへのアクセスは可能ですが、整合性チェックは無効になっています。
batch.size
型: int
デフォルト: 16384
有効な値: [0,…]
重要度: 中
複数のレコードが同じパーティションに送信されるときは常に、プロデューサーはレコードをまとめてより少ない要求にバッチ処理しようとします。これにより、クライアントとサーバーの両方でパフォーマンスが向上します。この設定では、デフォルトのバッチサイズをバイト単位で制御します。
このサイズより大きいレコードをバッチ処理することはありません。
ブローカーに送信されるリクエストには、複数のバッチが含まれます (送信可能なデータがあるパーティションごとに 1 つ)。
バッチサイズを小さくすると、バッチ処理が少なくなり、スループットが低下する可能性があります (バッチサイズがゼロの場合、バッチ処理は完全に無効になります)。バッチサイズが非常に大きい場合は、追加のレコードを想定して、常に指定のバッチサイズのバッファーを割り当てるため、メモリーを多少無駄に使用する可能性があります。
client.dns.lookup
型: 文字列
デフォルト: デフォルト
有効な値: デフォルト、use_all_dns_ips、resolve_canonical_bootstrap_servers_only
重要度: 中
クライアントが DNS ルックアップを使用する方法を制御します。
use_all_dns_ips
に設定すると、ルックアップでホスト名に対して複数の IP アドレスが返された場合、接続に失敗する前にすべての IP アドレスへの接続が試行されます。ブートストラップサーバーとアドバタイズサーバーの両方に適用されます。値がresolve_canonical_bootstrap_servers_only
の場合、各エントリーは解決され、正規名のリストにデプロイメントされます。client.id
型: string
デフォルト: ""
重要度: 中
要求の実行時にサーバーに渡す ID 文字列。この目的は、サーバー側の要求ロギングに論理アプリケーション名を含めることを許可することにより、ip/port の他にもリクエストのソースを追跡できるようにすることです。
connections.max.idle.ms
型: long
デフォルト: 540000
重要度: 中
この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。
delivery.timeout.ms
型: int
デフォルト: 120000
有効な値: [0,…]
重要度: 中
send()
の呼び出しが返された後、成功または失敗を報告する時間の上限。これにより、送信前にレコードが遅延する合計時間、ブローカーから確認応答を待つ時間 (予想される場合)、および再試行可能な送信の失敗に許容される時間が制限されます。プロデューサーは、回復不可能なエラーが発生した場合や再試行し尽くした場合、またはレコードが早い配信有効期限に達したバッチに追加された場合に、この設定よりも前にレコードの送信の失敗を報告する可能性があります。この設定の値は、request.timeout.ms
およびlinger.ms
の合計値以上である必要があります。linger.ms
型: long
デフォルト: 0
有効な値: [0,…]
重要度: 中
プロデューサーは、リクエストの送信の間に到着したレコードを 1 つのバッチリクエストにグループ化します。通常、これは、レコードが送信できるよりも早く到着した場合に、負荷がかかった状態でのみ発生します。ただし、状況によっては、中程度の負荷がかかっている場合でも、クライアントがリクエストの数を減らしたい場合があります。この設定は、少量の人為的な遅延を追加することでこれを実現します。つまり、レコードをすぐに送信するのではなく、プロデューサーは指定された遅延まで待機して他のレコードを送信できるようにし、送信をまとめてバッチ処理できるようにします。これは、TCP の Nagle アルゴリズムに類似するものと考えることができます。この設定は、バッチ処理の遅延の上限を提供します。あるパーティションで
batch.size
相当のレコードを取得すると、この設定に関係なくすぐに送信されますが、このパーティションで蓄積されたバイト数がこれより少ない場合は、指定された時間の間、さらにレコードが取得されるのを待つことになります。デフォルトは 0 (つまり遅延なし) に設定されます。たとえば、linger.ms=5
を設定すると、送信されるリクエストの数が減りますが、負荷がない状態で送信されるレコードに最大 5 ミリ秒のレイテンシーが追加されます。max.block.ms
型: long
デフォルト: 60000
有効な値: [0,…]
重要度: 中
設定は、
KafkaProducer.send()
およびKafkaProducer.partitionsFor()
がブロックする時間を制御します。これらのメソッドは、バッファーがいっぱいであるか、メタデータが利用できないためにブロックされる可能性があります。ユーザー提供のシリアライザーまたはパーティショナーでのブロックは、このタイムアウトに対してカウントされません。.max.request.size
型: int
デフォルト: 1048576
有効な値: [0,…]
重要度: 中
リクエストの最大サイズ (バイト単位)。この設定により、プロデューサーが 1 回のリクエストで送信するレコードバッチの数が制限され、大量のリクエストが送信されないようになります。これは事実上、圧縮されていないレコードの最大バッチサイズの上限でもあります。サーバーには、レコードバッチサイズ (圧縮が有効な場合は圧縮後) に独自の上限があり、これとは異なる可能性があることに注意してください。
partitioner.class
型: class
デフォルト: org.apache.kafka.clients.producer.internals.DefaultPartitioner
重要度: 中
org.apache.kafka.clients.producer.Partitioner
インターフェイスを実装するパーティションクラス。receive.buffer.bytes
型: int
デフォルト: 32768
有効な値: [-1,…]
重要度: 中
データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
request.timeout.ms
型: int
デフォルト: 30000
有効な値: [0,…]
重要度: 中
この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。これは、不必要なプロデューサーの再試行によるメッセージの重複の可能性を減らすために、
replica.lag.time.max.ms
(ブローカーの設定) よりも大きくする必要があります。sasl.client.callback.handler.class
型: class
デフォルト: null
重要度: 中
AuthenticateCallbackHandler インターフェイスを実装する SASL クライアントコールバックハンドラークラスの完全修飾名。
sasl.jaas.config
型: password
デフォルト: null
重要度: 中
JAAS 設定ファイルで使用される形式の SASL 接続の JAAS ログインコンテキストパラメーター。JAAS 設定ファイルの形式は、こちら で説明されています。値の形式は 「loginModuleClass controlFlag (optionName=optionValue)*;」 です。ブローカーの場合、設定の前にリスナー接頭辞と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; などです。
sasl.kerberos.service.name
型: string
デフォルト: null
重要度: 中
Kafka が実行される Kerberos プリンシパル名。これは、Kafka の JAAS 設定または Kafka の設定で定義できます。
sasl.login.callback.handler.class
型: class
デフォルト: null
重要度: 中
AuthenticateCallbackHandler インターフェイスを実装する SASL ログインコールバックハンドラークラスの完全修飾名。ブローカーの場合、ログインコールバックハンドラー設定の前には、リスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler などです。
sasl.login.class
型: class
デフォルト: null
重要度: 中
Login インターフェイスを実装するクラスの完全修飾名。ブローカーの場合、ログイン設定の前にリスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin です。
sasl.mechanism
型: string
デフォルト: GSSAPI
重要度: 中
クライアント接続に使用される SASL メカニズム。これは、セキュリティープロバイダーが利用可能な任意のメカニズムである可能性がありますGSSAPI はデフォルトのメカニズムです。
security.protocol
型: string
デフォルト: PLAINTEXT
重要度: 中
ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
send.buffer.bytes
型: int
デフォルト: 131072
有効な値: [-1,…]
重要度: 中
データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
ssl.enabled.protocols
型: リスト
デフォルト: TLSv1.2
重要度: 中
SSL 接続で有効なプロトコルの一覧。
ssl.keystore.type
型: string
デフォルト: JKS
重要度: 中
キーストアファイルのファイル形式。これはクライアントにとってオプションになります。
ssl.protocol
型: string
デフォルト: TLSv1.2
重要度: 中
SSLContext の生成に使用される SSL プロトコル。デフォルト設定は TLSv1.2 で、ほとんどの場合に問題ありません。最近の JVM で許可される値は TLSv1.2 および TLSv1.3 です。TLS、TLSv1.1、SSL、SSLv2、および SSLv3 は古い JVM でサポートされる可能性がありますが、既知のセキュリティー脆弱性のために使用は推奨されません。
ssl.provider
型: string
デフォルト: null
重要度: 中
SSL 接続に使用されるセキュリティープロバイダーの名前。デフォルト値は JVM のデフォルトのセキュリティープロバイダーです。
ssl.truststore.type
型: string
デフォルト: JKS
重要度: 中
トラストストアファイルのファイル形式。
enable.idempotence
型: boolean
デフォルト: false
重要度: 低
'true' に設定すると、プロデューサーは、プロデューサーは各メッセージのコピーが 1 つだけストリームに書き込まれるようにします。'false' の場合、ブローカーの失敗などによるプロデューサーの再試行により、再試行されたメッセージの重複がストリームに書き込まれる可能性があります。べき等を有効にするには、
max.in.flight.requests.per.connection
は 5 以下、retries
は 0 よりも大きい必要があり、acks
は all でなければなりません。これらの値がユーザーによって明示的に設定されていない場合は、適切な値が選択されます。互換性のない値が設定された場合は、ConfigException
が出力されます。interceptor.classes
型: list
デフォルト: ""
有効な値: non-null string
重要度: 低
インターセプターとして使用するクラスの一覧。
org.apache.kafka.clients.producer.ProducerInterceptor
インターフェイスを実装すると、プロデューサーが受信したレコードが Kafka クラスターに公開される前に、そのレコードをインターセプト (場合によってはミューテーションも可能) できます。デフォルトでは、インターセプターはありません。max.in.flight.requests.per.connection
型: int
デフォルト: 5
有効な値: [1,…]
重要度: 低
ブロックする前にクライアントが 1 つの接続で送信する確認されていないリクエストの最大数。この設定が 1 より大きい値に設定されていて、送信に失敗した場合、再試行によりメッセージの順序が変更されるリスクがあることに注意してください (つまり、再試行が有効になっている場合)。
metadata.max.age.ms
型: long
デフォルト: 300000
有効な値: [0,…]
重要度: 低
新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。
metadata.max.idle.ms
型: long
デフォルト: 300000
有効な値: [5000,…]
重要度: 低
アイドル状態のトピックのメタデータをプロデューサーがキャッシュする時間を制御します。トピックが最後に作成されてからの経過時間がメタデータのアイドル期間を超えた場合、トピックのメタデータは忘れられ、次にアクセスするとメタデータフェッチ要求が強制されます。
metric.reporters
型: list
デフォルト: ""
有効な値: non-null string
重要度: 低
メトリクスレポーターとして使用するクラスの一覧。
org.apache.kafka.common.metrics.MetricsReporter
インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。metrics.num.samples
型: int
デフォルト: 2
有効な値: [1,…]
重要度: 低
メトリクスを計算するために保持されるサンプルの数。
metrics.recording.level
型: string
デフォルト: INFO
有効な値: [INFO, DEBUG]
重要度: 低
メトリクスの最も高い記録レベル。
metrics.sample.window.ms
型: long
デフォルト: 30000
有効な値: [0,…]
重要度: 低
メトリクスサンプルが計算される時間枠。
reconnect.backoff.max.ms
型: long
デフォルト: 1000
有効な値: [0,…]
重要度: 低
接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。
reconnect.backoff.ms
型: long
デフォルト: 50
有効な値: [0,…]
重要度: 低
特定のホストへの再接続を試みる前に待機するベース時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。
retry.backoff.ms
型: long
デフォルト: 100
有効な値: [0,…]
重要度: 低
特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。
sasl.kerberos.kinit.cmd
型: string
デフォルト: /usr/bin/kinit
重要度: 低
Kerberos kinit コマンドパス。
sasl.kerberos.min.time.before.relogin
型: long
デフォルト: 60000
重要度: 低
更新試行間のログインスレッドのスリープ時間。
sasl.kerberos.ticket.renew.jitter
型: double
デフォルト: 0.05
重要度: 低
更新時間に追加されたランダムなジッターの割合。
sasl.kerberos.ticket.renew.window.factor
型: double
デフォルト: 0.8
重要度: 低
ログインスレッドは、最後の更新からチケットの有効期限までの指定された時間のウィンドウファクターに達するまでスリープし、その時点でチケットの更新を試みます。
sasl.login.refresh.buffer.seconds
型: short
デフォルト: 300
有効な値: [0,…,3600]
重要度: 低
クレデンシャルを更新するときに維持するクレデンシャルの有効期限が切れるまでのバッファー時間 (秒単位)。更新が、バッファー秒数よりも有効期限に近いときに発生する場合、更新は、可能な限り多くのバッファー時間を維持するために上に移動されます。設定可能な値は 0 から 3600 (1 時間) です。値を指定しない場合は、デフォルト値の 300 (5 分) が使用されます。この値および sasl.login.refresh.min.period.seconds は、その合計がクレデンシャルの残りの有効期間を超える場合にどちらも無視されます。現在、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.min.period.seconds
型: short
デフォルト: 60
有効な値: [0,…,900]
重要度: 低
ログイン更新スレッドがクレデンシャルを更新する前に待機する最小時間 (秒単位)。設定可能な値は 0 から 900 (15 分) です。値を指定しない場合は、デフォルト値の 60 (1 分) が使用されます。この値および sasl.login.refresh.buffer.seconds は、その合計がクレデンシャルの残りの有効期間を超える場合に両方とも無視されます。現在、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.window.factor
型: double
デフォルト: 0.8
有効な値: [0.5,…,1.0]
重要度: 低
ログイン更新スレッドは、クレデンシャルの有効期間との関連で指定の期間ファクターに達するまでスリープ状態になり、達成した時点でクレデンシャルの更新を試みます。設定可能な値は 0.5(50%) から 1.0(100%) までです。値が指定されていない場合、デフォルト値の 0.8(80%) が使用されます。現在、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.window.jitter
型: double
デフォルト: 0.05
有効な値: [0.0,…,0.25]
重要度: 低
ログイン更新スレッドのスリープ時間に追加されるクレデンシャルの存続期間に関連するランダムジッターの最大量。設定可能な値は 0 から 0.25(25%) です。値が指定されていない場合は、デフォルト値の 0.05(5%) が使用されます。現在、OAUTHBEARER にのみ適用されます。
security.providers
型: string
デフォルト: null
重要度: 低
設定可能なクリエータークラスのリストで、それぞれがセキュリティーアルゴリズムを実装するプロバイダーを返します。これらのクラスは
org.apache.kafka.common.security.auth.SecurityProviderCreator
インターフェイスを実装する必要があります。ssl.cipher.suites
型: list
デフォルト: null
重要度: 低
暗号化スイートの一覧。これは、TLS または SSL ネットワークプロトコルを使用してネットワーク接続のセキュリティー設定をネゴシエートするために使用される認証、暗号化、MAC、および鍵交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされます。
ssl.endpoint.identification.algorithm
型: string
デフォルト: https
重要度: 低
サーバー証明書を使用してサーバーのホスト名を検証するエンドポイント識別アルゴリズム。
ssl.keymanager.algorithm
型: string
デフォルト: SunX509
重要度: 低
SSL 接続のキーマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたキーマネージャーファクトリーアルゴリズムです。
ssl.secure.random.implementation
型: string
デフォルト: null
重要度: 低
SSL 暗号操作に使用する SecureRandom PRNG 実装。
ssl.trustmanager.algorithm
型: string
デフォルト: PKIX
重要度: 低
SSL 接続のトラストマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたトラストマネージャーファクトリーアルゴリズムです。
transaction.timeout.ms
型: int
デフォルト: 60000
重要度: 低
トランザクションコーディネーターが、進行中のトランザクションを積極的に中止する前に、プロデューサーからのトランザクション状態の更新を待つ最大時間 (ミリ秒)。この値がブローカーの transaction.max.timeout.ms 設定より大きい場合、リクエストは
InvalidTransactionTimeout
エラーで失敗します。transactional.id
型: string
デフォルト: null
有効な値: non-empty string
重要度: 低
トランザクション配信に使用する TransactionalId。これにより、クライアントは、新しいトランザクションを開始する前に同じ TransactionalId を使用するトランザクションが完了したことを保証できるため、複数のプロデューサーセッションにまたがる信頼性セマンティクスが可能になります。TransactionalId が指定されていない場合、プロデューサーはべき等配信に制限されます。TransactionalId が設定されている場合は、
enable.idempotence
を有効にする必要があることに注意してください。デフォルトはnull
です。これは、トランザクションを使用できないことを意味します。なお、デフォルトでは、トランザクションには少なくとも 3 つのブローカーのクラスターが必要で、これは本番環境では推奨される設定です。開発環境ではtransaction.state.log.replication.factor
設定を調整して変更できます。