27.3. コンポーネントオプション


Kafka コンポーネントは、以下に示す 104 オプションをサポートします。

Expand
名前説明デフォルトタイプ

additionalProperties (共通)

kafka コンシューマーまたは kafka プロデューサーに追加のプロパティーを設定します。Camel 設定に直接設定できない場合は(例:Camel 設定に反映されていない新しい Kafka プロパティー)、プロパティーの前に additionalProperties を付ける必要があります。たとえば、additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro です。

 

マップ

brokers (共通)

使用する Kafka ブローカーの URL。形式は host1:port1,host2:port2 で、一覧はブローカーのサブセットまたはブローカーのサブセットを参照する VIP にすることができます。このオプションは、Kafka ドキュメントの bootstrap.servers と呼ばれます。

 

文字列

clientId (共通)

クライアント ID は、トレース呼び出しに役立つ各リクエストで送信されるユーザー指定の文字列です。リクエストを行うアプリケーションを論理的に特定する必要があります。

 

文字列

configuration (共通)

エンドポイントが再利用する一般的なオプションで Kafka コンポーネントを事前に設定できます。

 

KafkaConfiguration

headerFilterStrategy (共通)

カスタムの HeaderFilterStrategy を使用して、Camel メッセージとの間でヘッダーをフィルタリングします。

 

HeaderFilterStrategy

reconnectBackoffMaxMs (共通)

接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。

1000

整数

shutdownTimeout (共通)

コンシューマーまたはプロデューサーがそのワーカースレッドを正常にシャットダウンして終了するまでのタイムアウト(ミリ秒単位)。

30000

int

allowManualCommit (コンシューマー)

KafkaManualCommit による手動コミットを許可するかどうか。このオプションを有効にすると、KafkaManualCommit のインスタンスが Exchange メッセージヘッダーに保存されます。これにより、エンドユーザーはこの API にアクセスし、Kafka コンシューマーを介して手動オフセットコミットを実行できます。

false

boolean

autoCommitEnable (コンシューマー)

true の場合、コンシューマーによってすでにフェッチされているメッセージのオフセットを ZooKeeper に定期的にコミットします。このコミットされたオフセットは、新しいコンシューマーが開始する位置としてプロセスが失敗すると使用されます。

true

ブール値

autoCommitIntervalMs (コンシューマー)

コンシューマーオフセットが zookeeper にコミットされる頻度(ミリ秒単位)。

5000

整数

autoCommitOnStop (コンシューマー)

コンシューマーが最後に使用されたメッセージからのコミットを確実にするためにコンシューマーが停止したときに明示的な自動コミットを実行するかどうか。これには、autoCommitEnable オプションをオンにする必要があります。可能な値は、sync、async、または none です。および sync がデフォルト値です。

列挙値:

  • sync
  • async
  • none

sync

文字列

autoOffsetReset (consumer)

ZooKeeper に初期オフセットがない場合や、オフセットが範囲外である場合に何を行うか:最も早いオフセットにオフセットを自動的にリセットします。オフセットを最新のオフセットに自動的にリセットします。失敗:例外をコンシューマーに出力します。

列挙値:

  • latest
  • earliest
  • none

latest

文字列

breakOnFirstError (コンシューマー)

このオプションは、コンシューマーがエクスチェンジを処理して失敗するときに何が起こるかを制御します。オプションが false の場合、コンシューマーは次のメッセージに進み、処理します。オプションが true の場合、コンシューマーは破損し、失敗の原因となったメッセージのオフセットに戻り、このメッセージの処理を再度試行します。ただし、これにより、ポイズメッセージなど、バインドが毎回失敗すると、同じメッセージの無限処理が発生する可能性があります。そのため、Camel のエラーハンドラーを使用してそれを処理することをお勧めします。

false

boolean

bridgeErrorHandler (コンシューマー)

コンシューマーの Camel ルーティングエラーハンドラーへのブリッジを許可します。よって、コンシューマーが受信メッセージなどの取得を試行している間に発生した例外は、メッセージとして処理され、ルーティングエラーハンドラーによって処理されます。デフォルトでは、コンシューマーは org.apache.camel.spi.ExceptionHandler を使用して例外に対応し、WARN または ERROR レベルでログに記録され、無視されます。

false

boolean

checkCrcs (コンシューマー)

消費されたレコードの CRC32 を自動的に確認します。これにより、メッセージのネットワーク上またはディスク上の破損が発生しなくなります。このチェックはオーバーヘッドを追加するため、極端なパフォーマンスを求める場合は無効になる可能性があります。

true

ブール値

commitTimeoutMs (consumer)

同期コミットが完了するまでコードが待機する最大時間(ミリ秒単位)。

5000

Long

consumerRequestTimeoutMs (consumer)

この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。

40000

整数

consumersCount (コンシューマー)

kafka サーバーに接続するコンシューマーの数。各コンシューマーは、受信データを取得して処理する個別のスレッドで実行されます。

1

int

fetchMaxBytes (コンシューマー)

サーバーがフェッチリクエストに対して返す必要のあるデータの最大量。これは絶対最大値ではありません。フェッチの最初の空ではないパーティションの最初のメッセージがこの値よりも大きい場合でも、コンシューマーが確実に進行できるようにメッセージが返されます。ブローカーによって許可されるメッセージの最大サイズは、message.max.bytes (ブローカー設定)または max.message.bytes (トピック設定)で定義されます。コンシューマーは複数のフェッチを並行して実行することに注意してください。

52428800

整数

fetchMinBytes (コンシューマー)

サーバーがフェッチ要求に対して返す必要のあるデータの最小量。利用可能なデータが不十分な場合、リクエストは、リクエストに応答する前に、十分なデータが蓄積されるのを待ちます。

1

整数

fetchWaitMaxMs (コンシューマー)

fetch.min.bytes をすぐに満たすのに十分なデータがない場合に、サーバーがフェッチリクエストに応答するまでにブロックする最大時間。

500

整数

groupId (コンシューマー)

このコンシューマーが属するコンシューマープロセスのグループを一意に識別する文字列。複数のプロセスが同じグループ ID を設定すると、それらがすべて同じコンシューマーグループの一部であることを示します。このオプションはコンシューマーに必要です。

 

文字列

groupInstanceId (コンシューマー)

エンドユーザーが提供するコンシューマーインスタンスの一意の識別子。non-empty strings のみが許可されます。設定されている場合、コンシューマーは静的メンバーとして扱われます。つまり、常に、この ID を持つ 1 つのインスタンスのみがコンシューマーグループで許可されます。これは、より大きなセッションタイムアウトと組み合わせて使用して、一時的な利用不可 (プロセス再起動など) によるグループのリバランスを回避します。設定しないと、コンシューマーは従来の動作である動的メンバーとしてグループに参加します。

 

文字列

headerDeserializer (コンシューマー)

カスタム KafkaHeaderDeserializer を使用して kafka ヘッダーの値をデシリアライズするには、以下を行います。

 

KafkaHeaderDeserializer

heartbeatIntervalMs (コンシューマー)

Kafka のグループ管理機能を使用する場合の、ハートビートからコンシューマーコーディネーター間の想定される時間。ハートビートは、コンシューマーのセッションがアクティブな状態を維持し、新しいコンシューマーがグループに参加したり離脱したりする際のリバランスを促進するために使用されます。この値は session.timeout.ms よりも低く設定する必要がありますが、通常はその値の 1/3 以下に設定する必要があります。さらに低く調整することで、通常のリバランスの予想時間を制御することもできます。

3000

整数

keyDeserializer (コンシューマー)

Deserializer インターフェイスを実装するキーのデシリアライザークラス。

org.apache.kafka.common.serialization.StringDeserializer

文字列

maxPartitionFetchBytes (コンシューマー)

サーバーが返すパーティションごとのデータの最大量。リクエストに使用される最大合計メモリーは #partitions max.partition.fetch.bytes になります。このサイズは、サーバーが許可する最大メッセージサイズ以上である必要があります。そうしないと、プロデューサーがフェッチできるサイズよりも大きいメッセージを送信できます。この場合、コンシューマーは特定のパーティションで大きなメッセージの取得を試みてスタックする可能性があります。

1048576

整数

maxPollIntervalMs (コンシューマー)

コンシューマーグループ管理を使用する場合の poll() の呼び出し間の最大遅延。これにより、コンシューマーがさらにレコードをフェッチする前にアイドル状態になることができる時間に上限が設定されます。このタイムアウトの期限が切れる前に poll() が呼び出されない場合、コンシューマーは失敗とみなされ、グループはパーティションを別のメンバーに再割り当てするためにリバランスします。

 

Long

maxPollRecords (コンシューマー)

poll() への単一の呼び出しで返される最大レコード数。

500

整数

offsetRepository (コンシューマー)

トピックの各パーティションのオフセットをローカルで保存するために使用するオフセットリポジトリー。定義すると、自動コミットが無効になります。

 

StateRepository

partitionAssignor (コンシューマー)

グループ管理が使用される場合に、クライアントがコンシューマーインスタンス間でパーティションの所有権を分散するために使用するパーティション割り当てストラテジーのクラス名。

org.apache.kafka.clients.consumer.RangeAssignor

文字列

pollOnError (コンシューマー)

新しいメッセージのポーリング中に、kafka が例外を出力した場合のアクション。エンドポイントレベルで明示的な値が設定されていない限り、デフォルトでコンポーネント設定からの値を使用します。DISCARD はメッセージを破棄し、次のメッセージのポーリングを続行します。ERROR_HANDLER は Camel のエラーハンドラーを使用して例外を処理し、その後、次のメッセージのポーリングを続行します。RECONNECT はコンシューマーを再接続し、再度 RETRY をポーリングしようとすると、同じメッセージをポーリングして STOP を再度試行します(コンシューマーが再度メッセージを消費できる必要がある場合は、手動で起動/再起動する必要があります)。

列挙値:

  • DISCARD
  • ERROR_HANDLER
  • RECONNECT
  • RETRY
  • STOP

ERROR_HANDLER

PollOnError

pollTimeoutMs (コンシューマー)

KafkaConsumer をポーリングするときに使用されるタイムアウト。

5000

Long

resumeStrategy (コンシューマー)

このオプションを使用すると、ユーザーはカスタムの再開ストラテジーを設定できます。再開ストラテジーは、パーティションが割り当てられているときに実行されます(接続または再接続時など)。これにより、実装は操作を再開する方法をカスタマイズし、seekTo および offsetRepository メカニズムよりも柔軟な代替手段として機能します。実装の詳細については、KafkaConsumerResumeStrategy を参照してください。このオプションは、自動コミット設定には影響しません。この設定を使用する実装は、これと共に manual commit オプションを使用して評価する必要がある可能性もあります。

 

KafkaConsumerResumeStrategy

seekTo (コンシューマー)

KafkaConsumer が起動時に開始または終了する場合に設定します。最初から読み取る : read from end (終了から読み込んだ)は、以前のプロパティー seekToBeginning に代わるものです。

列挙値:

  • 開始
  • end
 

文字列

sessionTimeoutMs (コンシューマー)

Kafka のグループ管理機能の使用時に障害を検出するために使用されるタイムアウト。

10000

整数

specificAvroReader (コンシューマー)

これにより、Confluent Platform スキーマレジストリーおよび io.confluent.kafka.serializers.KafkaAvroDeserializer で使用する特定の Avro リーダーを使用できます。このオプションは、Confluent Platform (標準の Apache Kafka ではない) でのみ使用できます。

false

boolean

topicIsPattern (コンシューマー)

トピックがパターン(正規表現)であるかどうか。これは、パターンに一致するトピックの動的数にサブスクライブするために使用できます。

false

boolean

valueDeserializer (コンシューマー)

Deserializer インターフェイスを実装する値のデシリアライザークラス。

org.apache.kafka.common.serialization.StringDeserializer

文字列

kafkaManualCommitFactory (コンシューマー (上級))

KafkaManualCommit インスタンスの作成に使用する Autowired Factory。これにより、カスタムファクトリーがカスタム KafkaManualCommit インスタンスを作成するためにプラグインしてカスタム KafkaManualCommit インスタンスを作成できます。

 

KafkaManualCommitFactory

pollExceptionStrategy (コンシューマー (上級))

Autowired: コンシューマーでカスタムストラテジーを使用し、メッセージのプーリング中に Kafka ブローカーから発生した例外を処理する方法を制御します。

 

PollExceptionStrategy

bufferMemorySize (プロデューサー)

プロデューサーが、サーバーへの送信を待機しているレコードをバッファーリングするために使用できるメモリーの合計バイト数。レコードがサーバーへ配信できるよりも速く送信されると、プロデューサーは block.on.buffer.full で指定された設定に基づいて例外をブロックまたは出力します。この設定は、プロデューサーが使用するメモリーの合計にほぼ対応しますが、プロデューサーが使用するすべてのメモリーが使用されるため、ハードバウンドではありません。一部の追加メモリーは、圧縮 (圧縮が有効な場合) やインフライトリクエストの維持に使用されます。

33554432

整数

compressionCodec (producer)

このパラメーターを使用すると、このプロデューサーによって生成されたすべてのデータの圧縮コーデックを指定できます。有効な値は none、gzip、snappy です。

列挙値:

  • none
  • gzip
  • snappy
  • lz4

none

文字列

connectionMaxIdleMs (プロデューサー)

この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。

540000

整数

deliveryTimeoutMs (producer)

send() の呼び出しが返された後、成功または失敗を報告する時間の上限。これにより、送信前にレコードが遅延する合計時間、ブローカーから確認応答を待つ時間 (予想される場合)、および再試行可能な送信の失敗に許容される時間が制限されます。

120000

整数

enableIdempotence (プロデューサー)

'true' に設定すると、プロデューサーは、プロデューサーは各メッセージのコピーが 1 つだけストリームに書き込まれるようにします。'false' の場合、プロデューサーの再試行によって再試行されたメッセージの重複がストリームに書き込まれる可能性があります。このオプションを true に設定すると、max.in.flight.requests.per.connection を 1 に設定する必要があり、再試行はゼロにできず、さらに 'all' に設定する必要があります。

false

boolean

headerSerializer (プロデューサー)

カスタム KafkaHeaderSerializer を使用して kafka ヘッダー値をシリアライズするには、以下を行います。

 

KafkaHeaderSerializer

key (プロデューサー)

レコードキー (キーが指定されていない場合は null)。このオプションが設定されている場合、ヘッダー KafkaConstants#KEY よりも優先されます。

 

文字列

keySerializer (プロデューサー)

キーのシリアライザークラス(指定がない場合は、デフォルトでメッセージと同じになります)。

org.apache.kafka.common.serialization.StringSerializer

文字列

lazyStartProducer (producer)

最初のメッセージでプロデューサーをレイジーに起動すべきかどうか。レイジーに起動することで、起動時にプロデューサーが失敗し、それによりルートが失敗する可能性がある状況で、CamelContext およびルートの起動を許可します。レイジーな起動を延期すると、Camel のルーティングエラーハンドラー経由でメッセージのルーティング中に起動の失敗を処理できます。最初のメッセージが処理されるときに、プロデューサーの作成および起動に若干時間がかかり、合計処理時間が長くなる可能性があることに注意してください。

false

boolean

lingerMs (プロデューサー)

プロデューサーは、リクエストの送信の間に到着したレコードを 1 つのバッチリクエストにグループ化します。通常、これは、レコードが送信できるよりも早く到着した場合に、負荷がかかった状態でのみ発生します。ただし、状況によっては、中程度の負荷がかかっている場合でも、クライアントがリクエストの数を減らしたい場合があります。この設定は、少量の人為的な遅延を追加することでこれを実現します。つまり、レコードをすぐに送信するのではなく、プロデューサーは指定された遅延まで待機して他のレコードを送信できるようにし、送信をまとめてバッチ処理できるようにします。これは、TCP の Nagle アルゴリズムに類似するものと考えることができます。この設定は、バッチ処理の遅延の上限を提供します。あるパーティションで batch.size 相当のレコードを取得すると、この設定に関係なくすぐに送信されますが、このパーティションで蓄積されたバイト数がこれより少ない場合は、指定された時間の間、さらにレコードが取得されるのを待つことになります。デフォルトは 0 (つまり遅延なし) に設定されます。たとえば、linger.ms=5 を設定すると、送信されるリクエストの数が減りますが、負荷がない状態で送信されるレコードに最大 5 ミリ秒のレイテンシーが追加されます。

0

整数

maxBlockMs (プロデューサー)

この設定は、kafka への送信がブロックする時間を制御します。これらのメソッドは、複数の理由でブロックされる可能性があります。たとえば、バッファーが満杯、メタデータは利用できません。この設定は、メタデータの取得、キーと値のシリアライズ、send ()の実行時にバッファーメモリーのシリアライズ、バッファーメモリーのシリアライズに費やされた合計時間に最大制限を課します。partitionsFor ()の場合、この設定はメタデータを待機している最大時間しきい値を課します。

60000

整数

maxInFlightRequest (プロデューサー)

ブロックする前にクライアントが 1 つの接続で送信する確認されていないリクエストの最大数。この設定を 1 よりも大きい値に設定され、送信が失敗した場合には、再試行(再試行が有効な場合)によるメッセージの並べ替えのリスクがあることに注意してください。

5

整数

maxRequestSize (プロデューサー)

要求の最大サイズ。これは事実上、最大レコードサイズの上限でもあります。サーバーには、レコードサイズに独自の上限があり、これとは異なる可能性があることに注意してください。この設定により、プロデューサーが 1 回のリクエストで送信するレコードバッチの数が制限され、大量のリクエストが送信されないようになります。

1048576

整数

metadataMaxAgeMs (プロデューサー)

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。

300000

整数

metricReporters (プロデューサー)

メトリクスレポーターとして使用するクラスの一覧。MetricReporter インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスでプラグインが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。

 

文字列

metricsSampleWindowMs (producer)

メトリクスを計算するために保持されるサンプルの数。

30000

整数

noOfMetricsSample (producer)

メトリクスを計算するために保持されるサンプルの数。

2

整数

partitioner (プロデューサー)

サブトピック間でメッセージを分割するための partitioner クラス。デフォルトのパーティショナーは、キーのハッシュに基づいています。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

文字列

partitionKey (プロデューサー)

レコードの送信先となるパーティション(パーティションが指定されていない場合は null)。このオプションが設定されている場合、ヘッダー KafkaConstants#PARTITION_KEY よりも優先されます。

 

整数

producerBatchSize (プロデューサー)

複数のレコードが同じパーティションに送信されるときは常に、プロデューサーはレコードをまとめてより少ない要求にバッチ処理しようとします。これにより、クライアントとサーバーの両方でパフォーマンスが向上します。この設定では、デフォルトのバッチサイズをバイト単位で制御します。このサイズより大きいレコードをバッチ処理することはありません。ブローカーに送信されたリクエストには複数のバッチが含まれます。各パーティションに 1 つずつ送信できるデータを持ちます。バッチサイズが小さいと、バッチ処理が一般的になり、スループットが減少する可能性があります(バッチサイズがゼロの場合はバッチを完全に無効にします)。バッチサイズが非常に大きい場合は、追加のレコードを想定して、常に指定のバッチサイズのバッファーを割り当てるため、メモリーを多少無駄に使用する可能性があります。

16384

整数

queueBufferingMaxMessages (producer)

非同期モードを使用している場合にプロデューサーをキューに入れることができる未送信メッセージの最大数。プロデューサーがブロックされるか、またはデータをドロップする必要があります。

10000

整数

receiveBufferBytes (プロデューサー)

データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。

65536

整数

reconnectBackoffMs (プロデューサー)

特定のホストへの再接続を試みるまで待機する時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、コンシューマーによってブローカーに送信されるすべてのリクエストに適用されます。

50

整数

recordMetadata (producer)

プロデューサーが RecordMetadata の結果を Kafka に送信すべきかどうか。結果は RecordMetadata メタデータが含まれる List に保存されます。このリストは、KafkaConstants#KAFKA_RECORDMETA キーを持つヘッダーに保存されます。

true

boolean

requestRequiredAcks (プロデューサー)

リクエストが完了したと見なす前に、プロデューサーがリーダーに受け取ったことを要求する確認の数。これは、送信されるレコードの耐久性を制御します。以下の設定は一般的です。acks=0 をゼロに設定すると、プロデューサーはサーバーからの確認をまったく待ちません。レコードは直ちにソケットバッファーに追加され、送信済みと見なされます。この場合、サーバーがレコードを受信したかどうかは保証されず、retries の設定は有効になりません (クライアントは通常、失敗を知ることができないからです)。各レコードで返されるオフセットは常に -1 に設定されます。これは、リーダーがレコードをローカルログに書き込みますが、すべてのフォロワーからの完全な確認を待たずに応答します。この場合、リーダーはレコードを承認した直後に失敗し、フォロワーがレコードを複製する前に失敗すると、レコードが失われます。acks=all これは、リーダーが In-Sync レプリカの完全なセットを待機してレコードを確認するのを待ちます。これにより、少なくとも 1 つの In-Sync レプリカが動作している限り、レコードが失われないことが保証されます。これは利用可能な最強の保証になります。

列挙値:

  • -1
  • 0
  • 1
  • all

1

文字列

requestTimeoutMs (プロデューサー)

ブローカーが request.required.acks 要件を満たすまで待機してから、エラーをクライアントに送り返す時間。

30000

整数

retries (プロデューサー)

ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性により送信に失敗したレコードを再送信します。この再試行は、クライアントがエラーを受信したときにレコードを再送した場合と同じであることに注意してください。再試行を許可すると、レコードの順序が変わる可能性があります。2 つのレコードが 1 つのパーティションに送信され、最初のレコードが失敗して再試行され、2 番目に成功すると、2 番目のレコードが最初に表示される可能性があるためです。

0

整数

retryBackoffMs (プロデューサー)

再試行する前に、プロデューサーは関連するトピックのメタデータを更新し、新しいリーダーが選出されているかどうかを確認します。リーダーエレクションは少しかかるため、このプロパティーは、メタデータを更新する前にプロデューサーが待機する時間を指定します。

100

整数

sendBufferBytes (プロデューサー)

ソケット書き込みバッファーサイズ。

131072

整数

valueSerializer (プロデューサー)

メッセージのシリアライザークラス。

org.apache.kafka.common.serialization.StringSerializer

文字列

workerPool (プロデューサー)

kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、カスタムワーカープールを使用して、ルーティングエクスチェンジを継続します。このオプションを使用する場合、プールが不要になったときにシャットダウンするには、スレッドプールのライフサイクルを処理する必要があります。

 

executorService

workerPoolCoreSize (プロデューサー)

kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、ワーカープールのコアスレッドの数。

10

整数

workerPoolMaxSize (プロデューサー)

kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、ルーティングエクスチェンジを継続するためのワーカープールのスレッドの最大数。

20

整数

autowiredEnabled (上級)

自動ワイヤリングが有効になっているかどうか。これは、コンポーネントで設定される一致するタイプのインスタンスが 1 つあるかどうかを検出するためにレジストリーを検索することで、自動ワイアリングオプションに使用されます (オプションは自動ワイアとマーク付けされる必要があります)。これは、JDBC データソース、JMS 接続ファクトリー、AWS クライアントなどの自動設定に使用できます。

true

boolean

kafkaClientFactory (上級)

org.apache.kafka.clients.consumer.KafkaConsumer および org.apache.kafka.clients.producer.KafkaProducer インスタンスの作成に使用する Autowired Factory。これにより、vanilla Kafka クライアントを拡張するロジックを持つインスタンスを作成するようにカスタムファクトリーを設定できます。

 

KafkaClientFactory

synchronous (上級)

同期処理を厳密に使用するかどうかを設定します。

false

boolean

schemaRegistryURL (confluent)

使用する Confluent Platform スキーマレジストリーサーバーの URL。形式は host1:port1,host2:port2 です。これは、Confluent Platform ドキュメントの schema.registry.url として知られています。このオプションは、Confluent Platform (標準の Apache Kafka ではない) でのみ使用できます。

 

文字列

interceptorClasses (モニタリング)

プロデューサーまたはコンシューマーのインターセプターを設定します。プロデューサーインターセプターは、org.apache.kafka.clients.producer.ProducerInterceptor コンシューマーインターセプターを実装するクラスである必要があります。プロデューサーインターセプターは、org.apache.kafka.clients.consumer.ConsumerInterceptor を実装するクラスである必要があります。コンシューマーで Producer インターセプターを使用すると、ランタイムでクラスキャスト例外が発生します。

 

文字列

kerberosBeforeReloginMinTime (セキュリティー)

更新試行間のログインスレッドのスリープ時間。

60000

整数

kerberosInitCmd (セキュリティー)

Kerberos kinit コマンドパス。デフォルトは /usr/bin/kinit です。

/usr/bin/kinit

文字列

kerberosPrincipalToLocalRules (セキュリティー)

プリンシパル名から短縮名 (通常はオペレーティングシステムのユーザー名) にマッピングするためのルールの一覧です。ルールは順番に評価され、プリンシパル名と一致する最初のルールは、これを短縮名にマップするために使用されます。一覧の後続のルールは無視されます。デフォルトでは、{username}/{hostname}{REALM} 形式のプリンシパル名は {username} にマッピングされます。形式の詳細は、セキュリティー承認および acls ドキュメントを参照してください。複数の値はコンマで区切ることができます。

DEFAULT

文字列

kerberosRenewJitter (セキュリティー)

更新時間に追加されたランダムなジッターの割合。

0.05

double

kerberosRenewWindowFactor (security)

ログインスレッドは、最後の更新からチケットの有効期限までの指定された時間のウィンドウファクターに達するまでスリープし、その時点でチケットの更新を試みます。

0.8

double

saslJaasConfig (セキュリティー)

kafka sasl.jaas.config パラメーターを公開します(例:org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;)。

 

文字列

saslKerberosServiceName (security)

Kafka が実行される Kerberos プリンシパル名。これは、Kafka の JAAS 設定または Kafka の設定で定義できます。

 

文字列

saslMechanism (セキュリティー)

使用される Simple Authentication and Security Layer(SASL) メカニズム。有効な値は、を参照してください。

GSSAPI

文字列

securityProtocol (セキュリティー)

ブローカーとの通信に使用されるプロトコル。SASL_PLAINTEXT、PLAINTEXT、および SSL がサポートされています。

PLAINTEXT

文字列

sslCipherSuites (セキュリティー)

暗号化スイートの一覧。これは、TLS または SSL ネットワークプロトコルを使用してネットワーク接続のセキュリティー設定をネゴシエートするために使用される認証、暗号化、MAC、および鍵交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされます。

 

文字列

sslContextParameters (security)

Camel SSLContextParameters オブジェクトを使用した SSL 設定。設定されている場合、これは他の SSL エンドポイントパラメーターの前に適用されます。注記:Kafka はファイルの場所からのキーストアの読み込みのみをサポートします。そのため、KeyStoreParameters.resource オプションで file: の接頭辞を付けます。

 

SSLContextParameters

sslEnabledProtocols (security)

SSL 接続で有効なプロトコルの一覧。TLSv1.2、TLSv1.1、および TLSv1 はデフォルトで有効になっています。

 

文字列

sslEndpointAlgorithm (セキュリティー)

サーバー証明書を使用してサーバーのホスト名を検証するエンドポイント識別アルゴリズム。

https

文字列

sslKeymanagerAlgorithm (セキュリティー)

SSL 接続のキーマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたキーマネージャーファクトリーアルゴリズムです。

SunX509

文字列

sslKeyPassword (security)

キーストアファイルの秘密鍵のパスワード。これはクライアントにとってオプションになります。

 

文字列

sslKeystoreLocation (セキュリティー)

キーストアファイルの場所。これはクライアントではオプションで、クライアントの双方向認証に使用できます。

 

文字列

sslKeystorePassword (security)

キーストアファイルのストアパスワード。これはクライアントのオプションであり、ssl.keystore.location が設定されている場合にのみ必要です。

 

文字列

sslKeystoreType (セキュリティー)

キーストアファイルのファイル形式。これはクライアントにとってオプションになります。デフォルト値は JKS です。

JKS

文字列

sslProtocol (セキュリティー)

SSLContext の生成に使用される SSL プロトコル。デフォルト設定は TLS で、ほとんどの場合で問題ありません。最近の JVM で許可される値は TLS、TLSv1.1、および TLSv1.2 です。SSL、SSLv2、SSLv3 は古い JVM でサポートされる場合がありますが、既知のセキュリティー脆弱性のために使用は推奨されません。

 

文字列

sslProvider (セキュリティー)

SSL 接続に使用されるセキュリティープロバイダーの名前。デフォルト値は JVM のデフォルトのセキュリティープロバイダーです。

 

文字列

sslTrustmanagerAlgorithm (セキュリティー)

SSL 接続のトラストマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたトラストマネージャーファクトリーアルゴリズムです。

PKIX

文字列

sslTruststoreLocation (セキュリティー)

トラストストアファイルの場所。

 

文字列

sslTruststorePassword (security)

トラストストアファイルのパスワード。

 

文字列

sslTruststoreType (セキュリティー)

トラストストアファイルのファイル形式。デフォルト値は JKS です。

JKS

文字列

useGlobalSslContextParameters (security)

グローバル SSL コンテキストパラメーターの使用を有効にします。

false

boolean

Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2026 Red Hat
トップに戻る