第4章 プロデューサー設定プロパティー
key.serializer
型: class
重要度: 高
org.apache.kafka.common.serialization.Serializer
インターフェイスを実装するキーのシリアライザークラス。value.serializer
型: class
重要度: 高
org.apache.kafka.common.serialization.Serializer
インターフェイスを実装する値のシリアライザークラス。bootstrap.servers
型: list
デフォルト: ""
有効な値: null 以外の文字列
重要度: 高
Kafka クラスターへの最初の接続を確立するために使用されるホストとポートのペアのリスト。クライアントは、ブートストラップ用にここで指定されたサーバーに関係なく、すべてのサーバーを利用します。このリストは、サーバーのフルセットを検出するために使用される最初のホストにのみ影響します。このリストは、
host1:port1,host2:port2,…
の形式にする必要があります。これらのサーバーは、(動的に変更される可能性がある) 完全なクラスターメンバーシップを検出するための最初の接続にだけ使用されるため、このリストにはサーバーの完全なセットを含める必要はありません (ただし、サーバーがダウンした場合に備えて、複数のサーバーが必要になる場合があります)。buffer.memory
型: long
デフォルト: 33554432
有効な値: [0,…]
重要度: 高
producer が、サーバーへの送信を待機しているレコードをバッファリングするために使用できるメモリーの合計バイト数。レコードの送信がサーバーへの配信よりも速い場合、プロデューサーは
max.block.ms
の間ブロックし、その後例外を出力します。この設定は、プロデューサーが使用する合計メモリーにおおまかに対応する必要がありますが、プロデューサーが使用するすべてのメモリーがバッファリングに使用されるわけではないため、ハードバウンドではありません。一部の追加メモリーは、圧縮 (圧縮が有効な場合) やインフライトリクエストの維持に使用されます。
compression.type
型: string
デフォルト: none
有効な値: [none, gzip, snappy, lz4, zstd]
重要度: 高
プロデューサーによって生成されたすべてのデータの圧縮タイプ。デフォルトは none (圧縮なし) です。有効な値は、
none
、gzip
、snappy
、lz4
、またはzstd
です。圧縮はデータの完全なバッチであるため、バッチ処理の有効性は圧縮率にも影響します (バッチ処理が多いほど圧縮率が高くなります)。retries
型: int
デフォルト: 2147483647
有効な値: [0,…,2147483647]
重要度: 高
ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性により送信に失敗したレコードを再送信します。この再試行は、クライアントがエラーを受信したときにレコードを再送した場合と同じであることに注意してください。
delivery.timeout.ms
によって設定されたタイムアウトが、正常に確認される前に最初に期限切れになる場合、プロデュース要求は再試行回数が尽きる前に失敗します。通常は、この設定は未設定のままにし、代わりにdelivery.timeout.ms
を使用して再試行動作を制御します。べき等性を有効にするには、この設定値を 0 より大きくする必要があります。競合する設定が設定されており、べき等が明示的に有効になっていない場合、べき等は無効になります。
enable.idempotence
をfalse
に設定し、max.in.flight.requests.per.connection
を 1 より大きく設定して再試行を許可すると、レコードの順序が変更される可能性があります。2 番目のバッチが成功すると、2 番目のバッチのレコードが最初に表示される場合があります。ssl.key.password
型: password
デフォルト: null
重要度: 高
キーストアファイル内の秘密鍵、または ssl.keystore.key で指定された PEM キーのパスワード。
ssl.keystore.certificate.chain
型: password
デフォルト: null
重要度: 高
'ssl.keystore.type' で指定された形式の証明書チェーン。デフォルトの SSL エンジンファクトリーは、X.509 証明書のリストを含む PEM 形式のみをサポートします。
ssl.keystore.key
型: password
デフォルト: null
重要度: 高
ssl.keystore.type で指定された形式の秘密鍵。デフォルトの SSL エンジンファクトリーは、PKCS#8 キーを持つ PEM 形式のみをサポートします。鍵が暗号化されている場合は、'ssl.key.password' を使用して鍵のパスワードを指定する必要があります。
ssl.keystore.location
型: string
デフォルト: null
重要度: 高
キーストアファイルのロケーション。これはクライアントではオプションで、クライアントの双方向認証に使用できます。
ssl.keystore.password
型: password
デフォルト: null
重要度: 高
キーストアファイルのストアパスワード。これはクライアントではオプションで、'ssl.keystore.location' が設定されている場合にのみ必要です。キーストアのパスワードは PEM 形式ではサポートされません。
ssl.truststore.certificates
型: password
デフォルト: null
重要度: 高
'ssl.truststore.type' で指定された形式の信頼できる証明書。デフォルトの SSL エンジンファクトリーは、X.509 証明書を使用する PEM 形式のみをサポートします。
ssl.truststore.location
型: string
デフォルト: null
重要度: 高
トラストストアファイルのロケーション。
ssl.truststore.password
型: password
デフォルト: null
重要度: 高
トラストストアファイルのパスワード。パスワードが設定されていない場合、設定されたトラストストアファイルは引き続き使用されますが、整合性チェックは無効になります。トラストストアのパスワードは PEM 形式ではサポートされません。
batch.size
型: int
デフォルト: 16384
有効な値: [0,…]
重要度: 中
複数のレコードが同じパーティションに送信されるときは常に、producer はレコードをまとめてより少ない要求にバッチ処理しようとします。これにより、クライアントとサーバーの両方でパフォーマンスが向上します。この設定では、デフォルトのバッチサイズをバイト単位で制御します。
このサイズより大きいレコードをバッチ処理することはありません。
ブローカーに送信されるリクエストには、複数のバッチが含まれます (送信可能なデータがあるパーティションごとに 1 つ)。
バッチサイズを小さくすると、バッチ処理が少なくなり、スループットが低下する可能性があります (バッチサイズがゼロの場合、バッチ処理は完全に無効になります)。バッチサイズが非常に大きい場合は、追加のレコードを想定して、常に指定のバッチサイズのバッファーを割り当てるため、メモリーを多少無駄に使用する可能性があります。
注意: この設定は、送信されるバッチサイズの上限を示します。このパーティションに蓄積されたバイト数がこれより少ない場合は、
linger.ms
時間、より多くのレコードが表示されるのを待つために待機することになります。このlinger.ms
設定のデフォルトは 0 で、累積バッチサイズがこのbatch.size
設定より小さい場合でも、すぐにレコードを送信することを意味します。client.dns.lookup
型: string
デフォルト: use_all_dns_ips
有効な値: [use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
重要度: 中
クライアントが DNS ルックアップを使用する方法を制御します。
use_all_dns_ips
に設定すると、正常な接続が確立されるまで、返された各 IP アドレスに順番に接続します。切断後に、次の IP が使用されます。すべての IP が一度使用されると、クライアントはホスト名から IP を再度解決します (ただし、JVM と OS の両方は DNS 名の検索をキャッシュします)。resolve_canonical_bootstrap_servers_only
に設定すると、各ブートストラップアドレスを正規名のリストに解決します。ブートストラップフェーズ後、これはuse_all_dns_ips
と同じように動作します。client.id
型: string
デフォルト: ""
重要度: 中
要求の実行時にサーバーに渡す ID 文字列。この目的は、サーバー側の要求ロギングに論理アプリケーション名を含めることを許可することにより、ip/port の他にもリクエストのソースを追跡できるようにすることです。
connections.max.idle.ms
型: long
デフォルト: 540000 (9 分)
重要度: 中
この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。
delivery.timeout.ms
型: int
デフォルト: 120000 (2 分)
有効な値: [0,…]
重要度: 中
send()
の呼び出しが返された後、成功または失敗を報告する時間の上限。これにより、送信前にレコードが遅延する合計時間、ブローカーから確認応答を待つ時間 (予想される場合)、および再試行可能な送信の失敗に許容される時間が制限されます。プロデューサーは、回復不可能なエラーが発生した場合や再試行し尽くした場合、またはレコードが早い配信有効期限に達したバッチに追加された場合に、この設定よりも前にレコードの送信の失敗を報告する可能性があります。この設定の値は、request.timeout.ms
およびlinger.ms
の合計値以上である必要があります。linger.ms
型: long
デフォルト: 0
有効な値: [0,…]
重要度: 中
producer は、リクエストの送信の間に到着したレコードを 1 つのバッチリクエストにグループ化します。通常、これは、レコードが送信できるよりも早く到着した場合に、負荷がかかった状態でのみ発生します。ただし、状況によっては、中程度の負荷がかかっている場合でも、クライアントがリクエストの数を減らす場合があります。この設定は、少量の人為的な遅延を追加することでこれを実現します。つまり、レコードをすぐに送信するのではなく、プロデューサーは指定された遅延まで待機して他のレコードを送信できるようにし、送信をまとめてバッチ処理できるようにします。これは、TCP の Nagle アルゴリズムに類似するものと考えることができます。この設定は、バッチ処理の遅延の上限を提供します。あるパーティションで
batch.size
相当のレコードを取得すると、この設定に関係なくすぐに送信されますが、このパーティションで蓄積されたバイト数がこれより少ない場合は、指定された時間の間、さらにレコードが取得されるのを待つことになります。デフォルトは 0 (つまり遅延なし) に設定されます。たとえば、linger.ms=5
を設定すると、送信されるリクエストの数が減りますが、負荷がない状態で送信されるレコードに最大 5 ミリ秒のレイテンシーが追加されます。max.block.ms
型: long
デフォルト: 60000 (1 分)
有効な値: [0,…]
重要度: 中
この設定は、
KafkaProducer's `send()
、partitionsFor()
、initTransactions()
、sendOffsetsToTransaction()
、commitTransaction()
、およびabortTransaction()
メソッドがブロックされる期間を制御します。send()
の場合、このタイムアウトが、メタデータのフェッチとバッファーの割り当ての両方の待ち時間の合計を制限します (ユーザーが提供するシリアライザーやパーティショナーでのブロックは、このタイムアウトにはカウントされません)。partitionsFor()
の場合、このタイムアウトは、メタデータが利用できない場合の待ち時間を制限します。トランザクション関連のメソッドは常にブロックしますが、トランザクションコーディネーターが検出されなかった場合、またはタイムアウト内に応答しなかった場合は、タイムアウトになる可能性があります。max.request.size
型: int
デフォルト: 1048576
有効な値: [0,…]
重要度: 中
リクエストの最大サイズ (バイト単位)。この設定により、プロデューサーが 1 回のリクエストで送信するレコードバッチの数が制限され、大量のリクエストが送信されないようになります。これは事実上、圧縮されていないレコードの最大バッチサイズの上限でもあります。サーバーには、レコードバッチサイズ (圧縮が有効な場合は圧縮後) に独自の上限があり、これとは異なる可能性があることに注意してください。
partitioner.class
型: class
デフォルト: null
重要度: 中
レコードの生成時にレコードを送信するパーティションを決定します。利用可能なオプションは以下の通りです。
設定されていない場合、デフォルトのパーティショニングロジックが使用されます。この戦略は、少なくとも batch.size バイトがパーティションに生成されるまで、レコードをパーティションに送信します。それはストラテジーで動作します。
1) If no partition is specified but a key is present, choose a partition based on a hash of the key.
2) If no partition or key is present, choose the sticky partition that changes when at least batch.size bytes are produced to the partition. * `org.apache.kafka.clients.producer.RoundRobinPartitioner`: A partitioning strategy where each record in a series of consecutive records is sent to a different partition, regardless of whether the 'key' is provided or not, until partitions run out and the process starts over again. Note: There's a known issue that will cause uneven distribution when a new batch is created. See KAFKA-9965 for more detail.
org.apache.kafka.clients.producer.Partitioner
インターフェイスを実装すると、カスタムパーティショナーをプラグインできます。
partitioner.ignore.keys
型: boolean
デフォルト: false
重要度: 中
true に設定すると、プロデューサーはレコードキーを使用してパーティションを選択しません。false の場合、キーが存在する場合、プロデューサーはキーのハッシュに基づいてパーティションを選択します。注: カスタムパーティショナーが使用されている場合、この設定は効果がありません。
receive.buffer.bytes
型: int
デフォルト: 32768 (32 キビバイト)
有効な値: [-1,…]
重要度: 中
データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
request.timeout.ms
型: int
デフォルト: 30000 (30 秒)
有効な値: [0,…]
重要度: 中
この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。これは、不必要なプロデューサーの再試行によるメッセージの重複の可能性を減らすために、
replica.lag.time.max.ms
(ブローカーの設定) よりも大きくする必要があります。sasl.client.callback.handler.class
型: class
デフォルト: null
重要度: 中
AuthenticateCallbackHandler インターフェイスを実装する SASL クライアントコールバックハンドラークラスの完全修飾名。
sasl.jaas.config
型: password
デフォルト: null
重要度: 中
JAAS 設定ファイルで使用される形式の SASL 接続の JAAS ログインコンテキストパラメーター。JAAS 設定ファイルの形式は、こちら で説明されています。値の形式は
loginModuleClass controlFlag (optionName=optionValue)*;
です。ブローカーの場合、設定の前にリスナー接頭辞と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required; などです。sasl.kerberos.service.name
型: string
デフォルト: null
重要度: 中
Kafka が実行される Kerberos プリンシパル名。これは、Kafka の JAAS 設定または Kafka の設定で定義できます。
sasl.login.callback.handler.class
型: class
デフォルト: null
重要度: 中
AuthenticateCallbackHandler インターフェイスを実装する SASL ログインコールバックハンドラークラスの完全修飾名。ブローカーの場合、ログインコールバックハンドラー設定の前には、リスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler などです。
sasl.login.class
型: class
デフォルト: null
重要度: 中
Login インターフェイスを実装するクラスの完全修飾名。ブローカーの場合、ログイン設定の前にリスナー接頭辞 と SASL メカニズム名を小文字で指定する必要があります。たとえば、listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin です。
sasl.mechanism
型: string
デフォルト: GSSAPI
重要度: 中
クライアント接続に使用される SASL メカニズム。これは、セキュリティープロバイダーが利用可能な任意のメカニズムである可能性があります。GSSAPI はデフォルトのメカニズムです。
sasl.oauthbearer.jwks.endpoint.url
型: string
デフォルト: null
重要度: 中
プロバイダーの JWKS (JSON Web Key Set) を取得できる OAuth/OIDC プロバイダーの URL。URL は、HTTP(S) ベースまたはファイルベースにすることができます。URL が HTTP(S) ベースの場合、JWKS データは、ブローカーの起動時に設定された URL を介して OAuth/OIDC プロバイダーから取得されます。その時点で最新のすべてのキーは、受信リクエストのためにブローカーにキャッシュされます。まだキャッシュにない kid ヘッダークレーム値を含む JWT の認証リクエストを受信した場合、JWKS エンドポイントはオンデマンドで再度クエリーされます。ただし、ブローカーは、sasl.oauthbearer.jwks.endpoint.refresh.ms ミリ秒ごとに URL をポーリングして、それらを含む JWT リクエストが受信される前に、今後のキーでキャッシュを更新します。URL がファイルベースの場合、ブローカーは起動時に設定された場所から JWKS ファイルをロードします。JWT に JWKS ファイルにない kid ヘッダー値が含まれている場合、ブローカーは JWT を拒否し、認証は失敗します。
sasl.oauthbearer.token.endpoint.url
型: string
デフォルト: null
重要度: 中
OAuth/OIDCID プロバイダーの URL。URL が HTTP(S) ベースの場合、sasl.jaas.config の設定に基づいてログインリクエストが行われるのは、issuer のトークンエンドポイント URL です。URL がファイルベースの場合、認証に使用するために OAuth/OIDC ID プロバイダーによって発行されたアクセストークン (JWT シリアル化形式) を含むファイルを指定します。
security.protocol
型: string
デフォルト: PLAINTEXT
有効な値: (大文字と小文字の区別なし) [SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT]
重要度: 中
ブローカーとの通信に使用されるプロトコル。有効な値は、PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL です。
send.buffer.bytes
型: int
デフォルト: 131072 (128 キビバイト)
有効な値: [-1,…]
重要度: 中
データの送信時に使用する TCP 送信バッファー (SO_SNDBUF) のサイズ。値が -1 の場合、OS のデフォルトが使用されます。
socket.connection.setup.timeout.max.ms
型: long
デフォルト: 30000 (30 秒)
重要度: 中
ソケット接続が確立されるまでクライアントが待機する最大時間。接続設定のタイムアウトは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。接続ストームを回避するために、タイムアウトに 0.2 のランダム化ファクターが適用され、計算された値よりも 20% 未満と 20% 超の間にランダムな範囲が発生します。
socket.connection.setup.timeout.ms
型: long
デフォルト: 10000 (10 秒)
重要度: 中
ソケット接続が確立されるまでクライアントが待機する時間。タイムアウトが経過する前に接続が構築されない場合、クライアントはソケットチャネルを閉じます。この値は初期バックオフ値であり、連続して接続に失敗するたびに、最大で
socket.connection.setup.timeout.max.ms
の値まで指数関数的に増加します。ssl.enabled.protocols
型: list
デフォルト: TLSv1.2,TLSv1.3
重要度: 中
SSL 接続で有効なプロトコルの一覧。Java 11 以降で実行する場合、デフォルトは 'TLSv1.2,TLSv1.3' で、それ以外の場合は 'TLSv1.2' になります。Java 11 のデフォルト値では、クライアントとサーバーの両方が TLSv1.3 をサポートする場合は TLSv1.3 を優先し、そうでない場合は TLSv1.2 にフォールバックします (両方が少なくとも TLSv1.2 をサポートすることを前提とします)。ほとんどの場合、このデフォルトは問題ありません。
ssl.protocol
の設定ドキュメントも参照してください。ssl.keystore.type
型: string
デフォルト: JKS
重要度: 中
キーストアファイルのファイル形式。これはクライアントにとってオプションになります。デフォルトの
ssl.engine.factory.class
で現在サポートされている値は JKS、PKCS12、PEM です。ssl.protocol
型: string
デフォルト: TLSv1.3
重要度: 中
SSLContext の生成に使用される SSL プロトコル。Java 11 以降で実行する場合、デフォルトは 'TLSv1.3' になります。それ以外の場合は 'TLSv1.2' になります。この値は、ほとんどのユースケースで問題ありません。最近の JVM で許可される値は 'TLSv1.2' および 'TLSv1.3' です。'TLS'、'TLSv1.1'、'SSL'、'SSLv2'、および 'SSLv3' は古い JVM でサポートされる可能性がありますが、既知のセキュリティー脆弱性のために使用は推奨されません。この設定のデフォルト値および 'ssl.enabled.protocols' では、サーバーが 'TLSv1.3' に対応していない場合は、クライアントは 'TLSv1.2' にダウングレードされます。この設定が 'TLSv1.2' に設定されている場合、'TLSv1.3' が ssl.enabled.protocols の値の 1 つで、サーバーが 'TLSv1.3' のみをサポートする場合でも、クライアントは 'TLSv1.3' を使用しません。
ssl.provider
型: string
デフォルト: null
重要度: 中
SSL 接続に使用されるセキュリティープロバイダーの名前。デフォルト値は JVM のデフォルトのセキュリティープロバイダーです。
ssl.truststore.type
型: string
デフォルト: JKS
重要度: 中
トラストストアファイルのファイル形式。デフォルトの
ssl.engine.factory.class
で現在サポートされている値は JKS、PKCS12、PEM です。acks
型: string
デフォルト: all
有効な値: [all, -1, 0, 1]
重要度: 低
リクエストが完了したと見なす前に、producer がリーダーに受け取ったことを要求する確認の数。これは、送信されるレコードの耐久性を制御します。以下の設定が許可されます。
-
acks=0
ゼロに設定すると、プロデューサーはサーバーからの確認応答を一切待ちません。レコードは直ちにソケットバッファーに追加され、送信済みと見なされます。この場合、サーバーがレコードを受信したかどうかは保証されず、retries
の設定は有効になりません (クライアントは通常、失敗を知ることができないからです)。各レコードで返されるオフセットは、常に-1
に設定されます。 -
acks=1
これは、リーダーはその記録をローカルログに書き込みますが、全フォロワーからの完全な承認を待たずに応答します。この場合、リーダーがレコードを確認した直後に失敗し、これがフォロワーがレコードを複製する前であった場合は、レコードは失われます。 acks=all
リーダーは同期しているレプリカの完全セットがレコードを確認するのを待ちます。これにより、少なくとも 1 つの In-Sync レプリカが動作している限り、レコードが失われないことが保証されます。これは利用可能な最強の保証になります。これは acks=-1 設定に相当します。べき等性を有効にするには、この設定値を all にする必要があることに注意してください。競合する設定が設定されており、べき等が明示的に有効になっていない場合、べき等は無効になります。
-
auto.include.jmx.reporter
型: boolean
デフォルト: true
重要度: 低
非推奨。JmxReporter が
metric.reporters
にリストされていない場合でも、自動的に含めるかどうか。この設定は Kafka 4.0 で削除される予定です。JmxReporter を有効にするには、代わりにorg.apache.kafka.common.metrics.JmxReporter
をmetric.reporters
に含める必要があります。enable.idempotence
型: boolean
デフォルト: true
重要度: 低
'true' に設定すると、プロデューサーは各メッセージのコピーが 1 つだけストリームに書き込まれるようにします。'false' の場合、ブローカーの失敗などによるプロデューサーの再試行により、再試行されたメッセージの重複がストリームに書き込まれる可能性があります。べき等を有効にするには、
max.in.flight.requests.per.connection
は 5 以下 (メッセージの順序は任意の許容値で保持される) で、retries
は 0 よりも大きくなければならず、acks
は 'all' である必要がある点に注意してください。競合する設定が設定されていない場合、べき等性はデフォルトで有効になっています。競合する設定が設定されており、べき等が明示的に有効になっていない場合、べき等は無効になります。べき等が明示的に有効化されていて、競合する設定が設定されている場合、
ConfigException
が出力されます。enable.metrics.push
型: boolean
デフォルト: true
重要度: 低
クラスターにクライアントと一致するクライアントメトリクスサブスクリプションがある場合に、クラスターへのクライアントメトリクスのプッシュを有効にするかどうか。
interceptor.classes
型: list
デフォルト: ""
有効な値: null 以外の文字列
重要度: 低
インターセプターとして使用するクラスのリスト。
org.apache.kafka.clients.producer.ProducerInterceptor
インターフェイスを実装すると、プロデューサーが受信したレコードが Kafka クラスターに公開される前に、そのレコードをインターセプト (場合によってはミューテーションも可能) できます。デフォルトでは、インターセプターはありません。max.in.flight.requests.per.connection
型: int
デフォルト: 5
有効な値: [1,…]
重要度: 低
ブロックする前にクライアントが 1 つの接続で送信する、確認されていないリクエストの最大数。この設定が 1 よりも大きく、
enable.idempotence
が false に設定されている場合、再試行 (つまり、再試行が有効になっている場合) により、送信が失敗した後にメッセージが並べ替えられるリスクがあることに注意してください。再試行が無効になっている場合、またはenable.idempotence
が true に設定されている場合、順序は保持されます。さらに、べき等性を有効にするには、この設定の値を 5 以下にする必要があります。競合する設定が設定されており、べき等が明示的に有効になっていない場合、べき等は無効になります。metadata.max.age.ms
型: long
デフォルト: 300000 (5 分)
有効な値: [0,…]
重要度: 低
新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。
metadata.max.idle.ms
型: long
デフォルト: 300000 (5 分)
有効な値: [5000,…]
重要度: 低
アイドル状態のトピックのメタデータをプロデューサーがキャッシュする時間を制御します。トピックが最後に作成されてからの経過時間がメタデータのアイドル期間を超えた場合、トピックのメタデータは忘れられ、次にアクセスするとメタデータフェッチ要求が強制されます。
metric.reporters
型: list
デフォルト: ""
有効な値: null 以外の文字列
重要度: 低
メトリクスレポーターとして使用するクラスの一覧。
org.apache.kafka.common.metrics.MetricsReporter
インターフェイスを実装すると、新しいメトリックの作成が通知されるクラスのプラグが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。metrics.num.samples
型: int
デフォルト: 2
有効な値: [1,…]
重要度: 低
メトリクスを計算するために保持されるサンプルの数。
metrics.recording.level
型: string
デフォルト: INFO
有効な値: [INFO, DEBUG, TRACE]
重要度: 低
メトリックの最も高い記録レベル。
metrics.sample.window.ms
型: long
デフォルト: 30000 (30 秒)
有効な値: [0,…]
重要度: 低
メトリックサンプルが計算される時間枠。
partitioner.adaptive.partitioning.enable
型: boolean
デフォルト: true
重要度: 低
true に設定すると、プロデューサーはブローカーのパフォーマンスに適応し、より高速なブローカーでホストされているパーティションにより多くのメッセージを生成しようとします。false の場合、プロデューサーはメッセージを均一に配布しようとします。注: カスタムパーティショナーが使用されている場合、この設定は効果がありません。
partitioner.availability.timeout.ms
型: long
デフォルト: 0
有効な値: [0,…]
重要度: 低
ブローカーが
partitioner.availability.timeout.ms
の間、パーティションからの生成要求を処理できない場合、パーティショナーはそのパーティションを利用できないものとして扱います。値が 0 の場合、このロジックは無効になります。注記: カスタムパーティショナーが使用されているか、partitioner.adaptive.partitioning.enable
が false に設定されている場合、この設定は効果がありません。reconnect.backoff.max.ms
型: long
デフォルト: 1000 (1 秒)
有効な値: [0,…]
重要度: 低
接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。
reconnect.backoff.ms
型: long
デフォルト: 50
有効な値: [0,…]
重要度: 低
特定のホストへの再接続を試みる前の基本的な待機時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、クライアントによるブローカーへのすべての接続試行に適用されます。この値は初期バックオフ値であり、連続して接続に失敗するたびに、最大で
reconnect.backoff.max.ms
の値まで指数関数的に増加します。retry.backoff.max.ms
型: long
デフォルト: 1000 (1 秒)
有効な値: [0,…]
重要度: 低
繰り返し失敗したブローカーへの要求を再試行するときに待機する最大時間 (ミリ秒)。指定した場合、クライアントごとのバックオフは、要求が失敗するたびにこの最大値まで指数関数的に増加します。再試行時にすべてのクライアントが同期されないように、バックオフには係数 0.2 のランダム化されたジッターが適用されます。その結果、バックオフは計算値より 20% 低い値と 20% 高い値の間になります。
retry.backoff.ms
がretry.backoff.max.ms
よりも大きく設定されている場合は、指数関数的に増加することなく、最初からretry.backoff.max.ms
が定数バックオフとして使用されます。retry.backoff.ms
型: long
デフォルト: 100
有効な値: [0,…]
重要度: 低
特定のトピックパーティションに対して失敗したリクエストを再試行するまでの待機時間。これにより、一部の障害シナリオでタイトループでリクエストを繰り返し送信することを回避できます。この値は初期バックオフ値であり、要求が失敗するたびに、最大で
retry.backoff.max.ms
の値まで指数関数的に増加します。sasl.kerberos.kinit.cmd
型: string
デフォルト:/usr/bin/kinit
重要度: 低
Kerberos kinit コマンドパス。
sasl.kerberos.min.time.before.relogin
型: long
デフォルト: 60000
重要度: 低
更新試行間のログインスレッドのスリープ時間。
sasl.kerberos.ticket.renew.jitter
型: double
デフォルト: 0.05
重要度: 低
更新時間に追加されたランダムなジッターの割合。
sasl.kerberos.ticket.renew.window.factor
型: double
デフォルト: 0.8
重要度: 低
ログインスレッドは、最後の更新からチケットの有効期限までの指定された時間のウィンドウファクターに達するまでスリープし、その時点でチケットの更新を試みます。
sasl.login.connect.timeout.ms
型: int
デフォルト: null
重要度: 低
外部認証プロバイダーの接続タイムアウト用の (オプションの) 値 (ミリ秒単位)。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.read.timeout.ms
型: int
デフォルト: null
重要度: 低
外部認証プロバイダーの読み取りタイムアウト用の (オプションの) 値 (ミリ秒単位)。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.buffer.seconds
型: short
デフォルト: 300
有効な値: [0,…,3600]
重要度: 低
クレデンシャルを更新するときに維持するクレデンシャルの有効期限が切れるまでのバッファー時間 (秒単位)。更新が、バッファー秒数よりも有効期限に近いときに発生する場合、更新は、バッファー時間をできるだけ維持するために前倒しで行われます。設定可能な値は 0 から 3600 (1 時間) です。値を指定しない場合は、デフォルト値の 300 (5 分) が使用されます。この値と sasl.login.refresh.min.period.seconds の合計が、クレデンシャルの残りの有効期間を超える場合は、両方とも無視されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.min.period.seconds
型: short
デフォルト: 60
有効な値: [0,…,900]
重要度: 低
ログイン更新スレッドがクレデンシャルを更新する前に待機する最小時間 (秒単位)。設定可能な値は 0 から 900 (15 分) です。値を指定しない場合は、デフォルト値の 60 (1 分) が使用されます。この値と sasl.login.refresh.buffer.seconds の合計が、クレデンシャルの残りの有効期間を超える場合は、両方とも無視されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.window.factor
型: double
デフォルト: 0.8
有効な値: [0.5,…,1.0]
重要度: 低
ログイン更新スレッドは、クレデンシャルの有効期間との関連で指定の期間ファクターに達するまでスリープ状態になり、達成した時点でクレデンシャルの更新を試みます。設定可能な値は 0.5 (50%) から 1.0 (100%) までです。値が指定されていない場合、デフォルト値の 0.8 (80%) が使用されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.refresh.window.jitter
型: double
デフォルト: 0.05
有効な値: [0.0,…,0.25]
重要度: 低
ログイン更新スレッドのスリープ時間に追加されるクレデンシャルの存続期間に関連するランダムジッターの最大量。設定可能な値は 0 から 0.25 (25%) です。値が指定されていない場合は、デフォルト値の 0.05 (5%) が使用されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.retry.backoff.max.ms
型: long
デフォルト: 10000 (10 秒)
重要度: 低
外部認証プロバイダーへのログイン試行間における最大待機時間の (オプションの) 値 (ミリ秒単位)。ログインでは、指数関数バックオフアルゴリズムが使用され、初期待機時間は sasl.login.retry.backoff.ms 設定に基づき、試行間の待機時間は 2 倍になり、最大待機時間は sasl.login.retry.backoff.max.ms 設定に基づき、指定されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.login.retry.backoff.ms
型: long
デフォルト: 100
重要度: 低
外部認証プロバイダーへのログイン試行間における初期待機用の (オプションの) 値 (ミリ秒単位)。ログインでは、指数関数バックオフアルゴリズムが使用され、初期待機時間は sasl.login.retry.backoff.ms 設定に基づき、試行間の待機時間は 2 倍になり、最大待機時間は sasl.login.retry.backoff.max.ms 設定に基づき、指定されます。現在は、OAUTHBEARER にのみ適用されます。
sasl.oauthbearer.clock.skew.seconds
型: int
デフォルト: 30
重要度: 低
OAuth/OIDC ID プロバイダーとブローカーの時間の差を考慮した秒単位の (オプションの) 値。
sasl.oauthbearer.expected.audience
型: list
デフォルト: null
重要度: 低
JWT が想定される対象者の 1 つに対して発行されたことを確認するためにブローカーが使用する (オプションの) コンマ区切りの設定です。JWT は標準的な OAuth の aud クレームについて検査され、この値が設定されている場合、ブローカーは JWT の aud クレームから値が完全に一致するかどうかを確認するために照合します。一致するものがない場合、ブローカーは JWT を拒否し、認証は失敗します。
sasl.oauthbearer.expected.issuer
型: string
デフォルト: null
重要度: 低
想定される issuer によって JWT が作成されたことを確認するためにブローカーが使用する (オプションの) 設定。JWT は標準の OAuth iss クレームについて検査され、この値が設定されている場合、ブローカーは JWT の iss クレームにあるものと正確に照合します。一致するものがない場合、ブローカーは JWT を拒否し、認証は失敗します。
sasl.oauthbearer.jwks.endpoint.refresh.ms
型: long
デフォルト: 3600000 (1 時間)
重要度: 低
ブローカーが JWT の署名を検証するためのキーを含む JWKS (JSON Web Key Set) キャッシュを更新するまで待機するミリ秒単位の (オプションの) 値。
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms
型: long
デフォルト: 10000 (10 秒)
重要度: 低
外部認証プロバイダーから JWKS (JSON Web Key Set) を取得する試行間における最大待機時間の (オプションの) 値 (ミリ秒単位)。JWKS の取得では、sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 設定に基づく初期待機を伴う指数関数バックオフアルゴリズムが使用され、sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 設定で指定された最大待機時間まで試行間の待機時間が 2 倍になります。
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms
型: long
デフォルト: 100
重要度: 低
外部認証プロバイダーからの JWKS (JSON Web Key Set) の取得試行間における初期待機の (オプションの) 値 (ミリ秒単位)。JWKS の取得では、sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 設定に基づく初期待機を伴う指数関数バックオフアルゴリズムが使用され、sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 設定で指定された最大待機時間まで試行間の待機時間が 2 倍になります。
sasl.oauthbearer.scope.claim.name
型: string
デフォルト: scope
重要度: 低
スコープの OAuth クレームはスコープと呼ばれることがよくありますが、OAuth/OIDC プロバイダーがそのクレームに別の名前を使用する場合、この (オプションの) 設定は JWT ペイロードのクレームに含まれるスコープに使用する別の名前を提供できます。
sasl.oauthbearer.sub.claim.name
型: string
デフォルト: sub
重要度: 低
サブジェクトの OAuth クレームは sub と呼ばれることがよくありますが、この (オプションの) 設定は、OAuth/OIDC プロバイダーがそのクレームに別の名前を使用する場合、JWT ペイロードのクレームに含まれるサブジェクトに使用する別の名前を提供できます。
security.providers
型: string
デフォルト: null
重要度: 低
設定可能なクリエータークラスのリストで、それぞれがセキュリティーアルゴリズムを実装するプロバイダーを返します。これらのクラスは
org.apache.kafka.common.security.auth.SecurityProviderCreator
インターフェイスを実装する必要があります。ssl.cipher.suites
型: list
デフォルト: null
重要度: 低
暗号化スイートの一覧。これは、TLS または SSL ネットワークプロトコルを使用してネットワーク接続のセキュリティー設定をネゴシエートするために使用される認証、暗号化、MAC、および鍵交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされます。
ssl.endpoint.identification.algorithm
型: string
デフォルト: https
重要度: 低
サーバー証明書を使用してサーバーのホスト名を検証するエンドポイント識別アルゴリズム。
ssl.engine.factory.class
型: class
デフォルト: null
重要度: 低
SSLEngine オブジェクトを提供する org.apache.kafka.common.security.auth.SslEngineFactory タイプのクラス。デフォルト値は org.apache.kafka.common.security.ssl.DefaultSslEngineFactory です。あるいは、これを org.apache.kafka.common.security.ssl.CommonNameLoggingSslEngineFactory に設定すると、クライアントがいずれかのブローカーでの認証に使用する期限切れ SSL 証明書のコモンネームが、ログレベル INFO で記録されます。この場合、mTLS クライアントからブローカーへの新しい接続を確立する際に、クライアントが提供した証明書チェーンを調べるための追加コードが原因で、わずかに遅延が発生することに注意してください。さらに、実装では標準の Java トラストストアに基づくカスタムトラストストアが使用されます。したがって、標準のトラストストアほど成熟しておらず、セキュリティーリスクとみなされる可能性があることに注意してください。
ssl.keymanager.algorithm
型: string
デフォルト: SunX509
重要度: 低
SSL 接続のキーマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたキーマネージャーファクトリーアルゴリズムです。
ssl.secure.random.implementation
型: string
デフォルト: null
重要度: 低
SSL 暗号操作に使用する SecureRandom PRNG 実装。
ssl.trustmanager.algorithm
型: string
デフォルト: PKIX
重要度: 低
SSL 接続のトラストマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたトラストマネージャーファクトリーアルゴリズムです。
transaction.timeout.ms
型: int
デフォルト: 60000 (1 分)
重大度: 低
コーディネーターがトランザクションを積極的に中止するまで、トランザクションが開いたままになる最大時間 (ミリ秒単位)。トランザクションの開始は、最初のパーティションの追加時に設定されます。この値がブローカーの
transaction.max.timeout.ms
設定よりも大きい場合、リクエストはInvalidTxnTimeoutException
エラーで失敗します。transactional.id
型: string
デフォルト: null
有効な値: 空以外の文字列
重要度: 低
トランザクション配信に使用する TransactionalId。これにより、クライアントは、新しいトランザクションを開始する前に同じ TransactionalId を使用するトランザクションが完了したことを保証できるため、複数のプロデューサーセッションにまたがる信頼性セマンティクスが可能になります。TransactionalId が指定されていない場合、プロデューサーはべき等配信に制限されます。TransactionalId が設定されている場合は、暗黙的に
enable.idempotence
になります。デフォルトでは、TransactionId は設定されていません。つまり、トランザクションは使用できません。なお、デフォルトでは、トランザクションには少なくとも 3 つのブローカーのクラスターが必要で、これは本番環境では推奨される設定です。開発環境ではtransaction.state.log.replication.factor
設定を調整して変更できます。