27.12. Spring Boot Auto-Configuration
Spring Boot で kafka を使用する場合は、自動設定をサポートするために、以下の Maven 依存関係を必ず使用してください。
コンポーネントは、以下に記載される 105 のオプションをサポートします。
名前 | 説明 | デフォルト | タイプ |
---|---|---|---|
camel.component.kafka.additional-properties | kafka コンシューマーまたは kafka プロデューサーに追加のプロパティーを設定します。Camel 設定に直接設定できない場合は(例:Camel 設定に反映されていない新しい Kafka プロパティー)、プロパティーの前に additionalProperties を付ける必要があります。たとえば、additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro です。 | マップ | |
camel.component.kafka.allow-manual-commit | KafkaManualCommit による手動コミットを許可するかどうか。このオプションを有効にすると、KafkaManualCommit のインスタンスが Exchange メッセージヘッダーに保存されます。これにより、エンドユーザーはこの API にアクセスし、Kafka コンシューマーを介して手動オフセットコミットを実行できます。 | false | ブール値 |
camel.component.kafka.auto-commit-enable | true の場合、コンシューマーによってすでにフェッチされているメッセージのオフセットを ZooKeeper に定期的にコミットします。このコミットされたオフセットは、新しいコンシューマーが開始する位置としてプロセスが失敗すると使用されます。 | true | ブール値 |
camel.component.kafka.auto-commit-interval-ms | コンシューマーオフセットが zookeeper にコミットされる頻度(ミリ秒単位)。 | 5000 | 整数 |
camel.component.kafka.auto-commit-on-stop | コンシューマーが最後に使用されたメッセージからのコミットを確実にするためにコンシューマーが停止したときに明示的な自動コミットを実行するかどうか。これには、autoCommitEnable オプションをオンにする必要があります。可能な値は、sync、async、または none です。および sync がデフォルト値です。 | sync | 文字列 |
camel.component.kafka.auto-offset-reset | ZooKeeper に初期オフセットがない場合や、オフセットが範囲外である場合に何を行うか:最も早いオフセットにオフセットを自動的にリセットします。オフセットを最新のオフセットに自動的にリセットします。失敗:例外をコンシューマーに出力します。 | latest | 文字列 |
camel.component.kafka.autowired-enabled | 自動ワイヤリングが有効になっているかどうか。これは、コンポーネントで設定される一致するタイプのインスタンスが 1 つあるかどうかを検出するためにレジストリーを検索することで、自動ワイアリングオプションに使用されます (オプションは自動ワイアとマーク付けされる必要があります)。これは、JDBC データソース、JMS 接続ファクトリー、AWS クライアントなどの自動設定に使用できます。 | true | ブール値 |
camel.component.kafka.break-on-first-error | このオプションは、コンシューマーがエクスチェンジを処理して失敗するときに何が起こるかを制御します。オプションが false の場合、コンシューマーは次のメッセージに進み、処理します。オプションが true の場合、コンシューマーは破損し、失敗の原因となったメッセージのオフセットに戻り、このメッセージの処理を再度試行します。ただし、これにより、ポイズメッセージなど、バインドが毎回失敗すると、同じメッセージの無限処理が発生する可能性があります。そのため、Camel のエラーハンドラーを使用してそれを処理することをお勧めします。 | false | ブール値 |
camel.component.kafka.bridge-error-handler | コンシューマーの Camel ルーティングエラーハンドラーへのブリッジを許可します。よって、コンシューマーが受信メッセージなどの取得を試行している間に発生した例外は、メッセージとして処理され、ルーティングエラーハンドラーによって処理されます。デフォルトでは、コンシューマーは org.apache.camel.spi.ExceptionHandler を使用して例外に対応し、WARN または ERROR レベルでログに記録され、無視されます。 | false | ブール値 |
camel.component.kafka.brokers | 使用する Kafka ブローカーの URL。形式は host1:port1,host2:port2 で、一覧はブローカーのサブセットまたはブローカーのサブセットを参照する VIP にすることができます。このオプションは、Kafka ドキュメントの bootstrap.servers と呼ばれます。 | 文字列 | |
camel.component.kafka.buffer-memory-size | プロデューサーが、サーバーへの送信を待機しているレコードをバッファーリングするために使用できるメモリーの合計バイト数。レコードがサーバーへ配信できるよりも速く送信されると、プロデューサーは block.on.buffer.full で指定された設定に基づいて例外をブロックまたは出力します。この設定は、プロデューサーが使用するメモリーの合計にほぼ対応しますが、プロデューサーが使用するすべてのメモリーが使用されるため、ハードバウンドではありません。一部の追加メモリーは、圧縮 (圧縮が有効な場合) やインフライトリクエストの維持に使用されます。 | 33554432 | 整数 |
camel.component.kafka.check-crcs | 消費されたレコードの CRC32 を自動的に確認します。これにより、メッセージのネットワーク上またはディスク上の破損が発生しなくなります。このチェックはオーバーヘッドを追加するため、極端なパフォーマンスを求める場合は無効になる可能性があります。 | true | ブール値 |
camel.component.kafka.client-id | クライアント ID は、トレース呼び出しに役立つ各リクエストで送信されるユーザー指定の文字列です。リクエストを行うアプリケーションを論理的に特定する必要があります。 | 文字列 | |
camel.component.kafka.commit-timeout-ms | 同期コミットが完了するまでコードが待機する最大時間(ミリ秒単位)。オプションは java.lang.Long 型です。 | 5000 | Long |
camel.component.kafka.compression-codec | このパラメーターを使用すると、このプロデューサーによって生成されたすべてのデータの圧縮コーデックを指定できます。有効な値は none、gzip、snappy です。 | none | 文字列 |
camel.component.kafka.configuration | エンドポイントが再利用する一般的なオプションで Kafka コンポーネントを事前に設定できます。オプションは org.apache.camel.component.kafka.KafkaConfiguration タイプです。 | KafkaConfiguration | |
camel.component.kafka.connection-max-idle-ms | この設定で指定された期間 (ミリ秒単位) の後にアイドル状態の接続を閉じます。 | 540000 | 整数 |
camel.component.kafka.consumer-request-timeout-ms | この設定は、クライアントの要求の応答を待つ最大時間を制御します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信します。または、再試行が使い切られるとリクエストが失敗します。 | 40000 | 整数 |
camel.component.kafka.consumers-count | kafka サーバーに接続するコンシューマーの数。各コンシューマーは、受信データを取得して処理する個別のスレッドで実行されます。 | 1 | 整数 |
camel.component.kafka.delivery-timeout-ms | send() の呼び出しが返された後、成功または失敗を報告する時間の上限。これにより、送信前にレコードが遅延する合計時間、ブローカーから確認応答を待つ時間 (予想される場合)、および再試行可能な送信の失敗に許容される時間が制限されます。 | 120000 | 整数 |
camel.component.kafka.enable-idempotence | 'true' に設定すると、プロデューサーは、プロデューサーは各メッセージのコピーが 1 つだけストリームに書き込まれるようにします。'false' の場合、プロデューサーの再試行によって再試行されたメッセージの重複がストリームに書き込まれる可能性があります。このオプションを true に設定すると、max.in.flight.requests.per.connection を 1 に設定する必要があり、再試行はゼロにできず、さらに 'all' に設定する必要があります。 | false | ブール値 |
camel.component.kafka.enabled | kafka コンポーネントの自動設定を有効にするかどうか。これはデフォルトで有効になっています。 | ブール値 | |
camel.component.kafka.fetch-max-bytes | サーバーがフェッチリクエストに対して返す必要のあるデータの最大量。これは絶対最大値ではありません。フェッチの最初の空ではないパーティションの最初のメッセージがこの値よりも大きい場合でも、コンシューマーが確実に進行できるようにメッセージが返されます。ブローカーによって許可されるメッセージの最大サイズは、message.max.bytes (ブローカー設定)または max.message.bytes (トピック設定)で定義されます。コンシューマーは複数のフェッチを並行して実行することに注意してください。 | 52428800 | 整数 |
camel.component.kafka.fetch-min-bytes | サーバーがフェッチ要求に対して返す必要のあるデータの最小量。利用可能なデータが不十分な場合、リクエストは、リクエストに応答する前に、十分なデータが蓄積されるのを待ちます。 | 1 | 整数 |
camel.component.kafka.fetch-wait-max-ms | fetch.min.bytes をすぐに満たすのに十分なデータがない場合に、サーバーがフェッチリクエストに応答するまでにブロックする最大時間。 | 500 | 整数 |
camel.component.kafka.group-id | このコンシューマーが属するコンシューマープロセスのグループを一意に識別する文字列。複数のプロセスが同じグループ ID を設定すると、それらがすべて同じコンシューマーグループの一部であることを示します。このオプションはコンシューマーに必要です。 | 文字列 | |
camel.component.kafka.group-instance-id | エンドユーザーが提供するコンシューマーインスタンスの一意の識別子。non-empty strings のみが許可されます。設定されている場合、コンシューマーは静的メンバーとして扱われます。つまり、常に、この ID を持つ 1 つのインスタンスのみがコンシューマーグループで許可されます。これは、より大きなセッションタイムアウトと組み合わせて使用して、一時的な利用不可 (プロセス再起動など) によるグループのリバランスを回避します。設定しないと、コンシューマーは従来の動作である動的メンバーとしてグループに参加します。 | 文字列 | |
camel.component.kafka.header-deserializer | カスタム KafkaHeaderDeserializer を使用して kafka ヘッダーの値をデシリアライズするには、以下を行います。オプションは org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer タイプです。 | KafkaHeaderDeserializer | |
camel.component.kafka.header-filter-strategy | カスタムの HeaderFilterStrategy を使用して、Camel メッセージとの間でヘッダーをフィルタリングします。このオプションは org.apache.camel.spi.HeaderFilterStrategy タイプです。 | HeaderFilterStrategy | |
camel.component.kafka.header-serializer | カスタム KafkaHeaderSerializer を使用して kafka ヘッダー値をシリアライズするには、以下を行います。オプションは org.apache.camel.component.kafka.serde.KafkaHeaderSerializer タイプです。 | KafkaHeaderSerializer | |
camel.component.kafka.heartbeat-interval-ms | Kafka のグループ管理機能を使用する場合の、ハートビートからコンシューマーコーディネーター間の想定される時間。ハートビートは、コンシューマーのセッションがアクティブな状態を維持し、新しいコンシューマーがグループに参加したり離脱したりする際のリバランスを促進するために使用されます。この値は session.timeout.ms よりも低く設定する必要がありますが、通常はその値の 1/3 以下に設定する必要があります。さらに低く調整することで、通常のリバランスの予想時間を制御することもできます。 | 3000 | 整数 |
camel.component.kafka.interceptor-classes | プロデューサーまたはコンシューマーのインターセプターを設定します。プロデューサーインターセプターは、org.apache.kafka.clients.producer.ProducerInterceptor コンシューマーインターセプターを実装するクラスである必要があります。プロデューサーインターセプターは、org.apache.kafka.clients.consumer.ConsumerInterceptor を実装するクラスである必要があります。コンシューマーで Producer インターセプターを使用すると、ランタイムでクラスキャスト例外が発生します。 | 文字列 | |
camel.component.kafka.kafka-client-factory | org.apache.kafka.clients.consumer.KafkaConsumer および org.apache.kafka.clients.producer.KafkaProducer インスタンスの作成に使用するファクトリー。これにより、vanilla Kafka クライアントを拡張するロジックを持つインスタンスを作成するようにカスタムファクトリーを設定できます。オプションは org.apache.camel.component.kafka.KafkaClientFactory タイプです。 | KafkaClientFactory | |
camel.component.kafka.kafka-manual-commit-factory | KafkaManualCommit インスタンスの作成に使用するファクトリー。これにより、カスタムファクトリーがカスタム KafkaManualCommit インスタンスを作成するためにプラグインしてカスタム KafkaManualCommit インスタンスを作成できます。オプションは org.apache.camel.component.kafka.KafkaManualCommitFactory タイプです。 | KafkaManualCommitFactory | |
camel.component.kafka.kerberos-before-relogin-min-time | 更新試行間のログインスレッドのスリープ時間。 | 60000 | 整数 |
camel.component.kafka.kerberos-init-cmd | Kerberos kinit コマンドパス。デフォルトは /usr/bin/kinit です。 | /usr/bin/kinit | 文字列 |
camel.component.kafka.kerberos-principal-to-local-rules | プリンシパル名から短縮名 (通常はオペレーティングシステムのユーザー名) にマッピングするためのルールの一覧です。ルールは順番に評価され、プリンシパル名と一致する最初のルールは、これを短縮名にマップするために使用されます。一覧の後続のルールは無視されます。デフォルトでは、{username}/{hostname}{REALM} 形式のプリンシパル名は {username} にマッピングされます。形式の詳細は、セキュリティー承認および acls ドキュメントを参照してください。複数の値はコンマで区切ることができます。 | DEFAULT | 文字列 |
camel.component.kafka.kerberos-renew-jitter | 更新時間に追加されたランダムなジッターの割合。 | double | |
camel.component.kafka.kerberos-renew-window-factor | ログインスレッドは、最後の更新からチケットの有効期限までの指定された時間のウィンドウファクターに達するまでスリープし、その時点でチケットの更新を試みます。 | double | |
camel.component.kafka.key | レコードキー (キーが指定されていない場合は null)。このオプションが設定されている場合、ヘッダー KafkaConstants#KEY よりも優先されます。 | 文字列 | |
camel.component.kafka.key-deserializer | Deserializer インターフェイスを実装するキーのデシリアライザークラス。 | org.apache.kafka.common.serialization.StringDeserializer | 文字列 |
camel.component.kafka.key-serializer | キーのシリアライザークラス(指定がない場合は、デフォルトでメッセージと同じになります)。 | org.apache.kafka.common.serialization.StringSerializer | 文字列 |
camel.component.kafka.lazy-start-producer | 最初のメッセージでプロデューサーをレイジーに起動すべきかどうか。レイジーに起動することで、起動時にプロデューサーが失敗し、それによりルートが失敗する可能性がある状況で、CamelContext およびルートの起動を許可します。レイジーな起動を延期すると、Camel のルーティングエラーハンドラー経由でメッセージのルーティング中に起動の失敗を処理できます。最初のメッセージが処理されるときに、プロデューサーの作成および起動に若干時間がかかり、合計処理時間が長くなる可能性があることに注意してください。 | false | ブール値 |
camel.component.kafka.linger-ms | プロデューサーは、リクエストの送信の間に到着したレコードを 1 つのバッチリクエストにグループ化します。通常、これは、レコードが送信できるよりも早く到着した場合に、負荷がかかった状態でのみ発生します。ただし、状況によっては、中程度の負荷がかかっている場合でも、クライアントがリクエストの数を減らしたい場合があります。この設定は、少量の人為的な遅延を追加することでこれを実現します。つまり、レコードをすぐに送信するのではなく、プロデューサーは指定された遅延まで待機して他のレコードを送信できるようにし、送信をまとめてバッチ処理できるようにします。これは、TCP の Nagle アルゴリズムに類似するものと考えることができます。この設定は、バッチ処理の遅延の上限を提供します。あるパーティションで batch.size 相当のレコードを取得すると、この設定に関係なくすぐに送信されますが、このパーティションで蓄積されたバイト数がこれより少ない場合は、指定された時間の間、さらにレコードが取得されるのを待つことになります。デフォルトは 0 (つまり遅延なし) に設定されます。たとえば、linger.ms=5 を設定すると、送信されるリクエストの数が減りますが、負荷がない状態で送信されるレコードに最大 5 ミリ秒のレイテンシーが追加されます。 | 0 | 整数 |
camel.component.kafka.max-block-ms | この設定は、kafka への送信がブロックする時間を制御します。これらのメソッドは、複数の理由でブロックされる可能性があります。たとえば、バッファーが満杯、メタデータは利用できません。この設定は、メタデータの取得、キーと値のシリアライズ、send ()の実行時にバッファーメモリーのシリアライズ、バッファーメモリーのシリアライズに費やされた合計時間に最大制限を課します。partitionsFor ()の場合、この設定はメタデータを待機している最大時間しきい値を課します。 | 60000 | 整数 |
camel.component.kafka.max-in-flight-request | ブロックする前にクライアントが 1 つの接続で送信する確認されていないリクエストの最大数。この設定を 1 よりも大きい値に設定され、送信が失敗した場合には、再試行(再試行が有効な場合)によるメッセージの並べ替えのリスクがあることに注意してください。 | 5 | 整数 |
camel.component.kafka.max-partition-fetch-bytes | サーバーが返すパーティションごとのデータの最大量。リクエストに使用される最大合計メモリーは #partitions max.partition.fetch.bytes になります。このサイズは、サーバーが許可する最大メッセージサイズ以上である必要があります。そうしないと、プロデューサーがフェッチできるサイズよりも大きいメッセージを送信できます。この場合、コンシューマーは特定のパーティションで大きなメッセージの取得を試みてスタックする可能性があります。 | 1048576 | 整数 |
camel.component.kafka.max-poll-interval-ms | コンシューマーグループ管理を使用する場合の poll() の呼び出し間の最大遅延。これにより、コンシューマーがさらにレコードをフェッチする前にアイドル状態になることができる時間に上限が設定されます。このタイムアウトの期限が切れる前に poll() が呼び出されない場合、コンシューマーは失敗とみなされ、グループはパーティションを別のメンバーに再割り当てするためにリバランスします。オプションは java.lang.Long 型です。 | Long | |
camel.component.kafka.max-poll-records | poll() への単一の呼び出しで返される最大レコード数。 | 500 | 整数 |
camel.component.kafka.max-request-size | 要求の最大サイズ。これは事実上、最大レコードサイズの上限でもあります。サーバーには、レコードサイズに独自の上限があり、これとは異なる可能性があることに注意してください。この設定により、プロデューサーが 1 回のリクエストで送信するレコードバッチの数が制限され、大量のリクエストが送信されないようになります。 | 1048576 | 整数 |
camel.component.kafka.metadata-max-age-ms | 新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がない場合でも、メタデータの更新を強制するまでの期間 (ミリ秒単位)。 | 300000 | 整数 |
camel.component.kafka.metric-reporters | メトリクスレポーターとして使用するクラスの一覧。MetricReporter インターフェイスを実装すると、新しいメトリクスの作成が通知されるクラスでプラグインが可能になります。JmxReporter は、JMX 統計を登録するために常に含まれます。 | 文字列 | |
camel.component.kafka.metrics-sample-window-ms | メトリクスを計算するために保持されるサンプルの数。 | 30000 | 整数 |
camel.component.kafka.no-of-metrics-sample | メトリクスを計算するために保持されるサンプルの数。 | 2 | 整数 |
camel.component.kafka.offset-repository | トピックの各パーティションのオフセットをローカルで保存するために使用するオフセットリポジトリー。定義すると、自動コミットが無効になります。オプションは org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String> タイプです。 | StateRepository | |
camel.component.kafka.partition-assignor | グループ管理が使用される場合に、クライアントがコンシューマーインスタンス間でパーティションの所有権を分散するために使用するパーティション割り当てストラテジーのクラス名。 | org.apache.kafka.clients.consumer.RangeAssignor | 文字列 |
camel.component.kafka.partition-key | レコードの送信先となるパーティション(パーティションが指定されていない場合は null)。このオプションが設定されている場合、ヘッダー KafkaConstants#PARTITION_KEY よりも優先されます。 | 整数 | |
camel.component.kafka.partitioner | サブトピック間でメッセージを分割するための partitioner クラス。デフォルトのパーティショナーは、キーのハッシュに基づいています。 | org.apache.kafka.clients.producer.internals.DefaultPartitioner | 文字列 |
camel.component.kafka.poll-exception-strategy | コンシューマーでカスタムストラテジーを使用して、メッセージのプーリング中に Kafka ブローカーから発生した例外を処理する方法を制御します。オプションは org.apache.camel.component.kafka.PollExceptionStrategy タイプです。 | PollExceptionStrategy | |
camel.component.kafka.poll-on-error | 新しいメッセージのポーリング中に、kafka が例外を出力した場合のアクション。エンドポイントレベルで明示的な値が設定されていない限り、デフォルトでコンポーネント設定からの値を使用します。DISCARD はメッセージを破棄し、次のメッセージのポーリングを続行します。ERROR_HANDLER は Camel のエラーハンドラーを使用して例外を処理し、その後、次のメッセージのポーリングを続行します。RECONNECT はコンシューマーを再接続し、再度 RETRY をポーリングしようとすると、同じメッセージをポーリングして STOP を再度試行します(コンシューマーが再度メッセージを消費できる必要がある場合は、手動で起動/再起動する必要があります)。 | PollOnError | |
camel.component.kafka.poll-timeout-ms | KafkaConsumer をポーリングするときに使用されるタイムアウト。オプションは java.lang.Long 型です。 | 5000 | Long |
camel.component.kafka.producer-batch-size | 複数のレコードが同じパーティションに送信されるときは常に、プロデューサーはレコードをまとめてより少ない要求にバッチ処理しようとします。これにより、クライアントとサーバーの両方でパフォーマンスが向上します。この設定では、デフォルトのバッチサイズをバイト単位で制御します。このサイズより大きいレコードをバッチ処理することはありません。ブローカーに送信されたリクエストには複数のバッチが含まれます。各パーティションに 1 つずつ送信できるデータを持ちます。バッチサイズが小さいと、バッチ処理が一般的になり、スループットが減少する可能性があります(バッチサイズがゼロの場合はバッチを完全に無効にします)。バッチサイズが非常に大きい場合は、追加のレコードを想定して、常に指定のバッチサイズのバッファーを割り当てるため、メモリーを多少無駄に使用する可能性があります。 | 16384 | 整数 |
camel.component.kafka.queue-buffering-max-messages | 非同期モードを使用している場合にプロデューサーをキューに入れることができる未送信メッセージの最大数。プロデューサーがブロックされるか、またはデータをドロップする必要があります。 | 10000 | 整数 |
camel.component.kafka.receive-buffer-bytes | データの読み取り時に使用する TCP 受信バッファー (SO_RCVBUF) のサイズ。 | 65536 | 整数 |
camel.component.kafka.reconnect-backoff-max-ms | 接続に繰り返し失敗したブローカーへの再接続時に待機する最大時間 (ミリ秒単位)。これが指定されている場合、ホストごとのバックオフは、連続して接続に失敗するたびに、この最大値まで指数関数的に増加します。バックオフの増加を計算した後、コネクションストームを回避するために 20% のランダムなジッターが追加されます。 | 1000 | 整数 |
camel.component.kafka.reconnect-backoff-ms | 特定のホストへの再接続を試みるまで待機する時間。これにより、タイトなループでホストに繰り返し接続することを回避します。このバックオフは、コンシューマーによってブローカーに送信されるすべてのリクエストに適用されます。 | 50 | 整数 |
camel.component.kafka.record-metadata | プロデューサーが RecordMetadata の結果を Kafka に送信すべきかどうか。結果は RecordMetadata メタデータが含まれる List に保存されます。このリストは、KafkaConstants#KAFKA_RECORDMETA キーを持つヘッダーに保存されます。 | true | ブール値 |
camel.component.kafka.request-required-acks | リクエストが完了したと見なす前に、プロデューサーがリーダーに受け取ったことを要求する確認の数。これは、送信されるレコードの耐久性を制御します。以下の設定は一般的です。acks=0 をゼロに設定すると、プロデューサーはサーバーからの確認をまったく待ちません。レコードは直ちにソケットバッファーに追加され、送信済みと見なされます。この場合、サーバーがレコードを受信したかどうかは保証されず、retries の設定は有効になりません (クライアントは通常、失敗を知ることができないからです)。各レコードで返されるオフセットは常に -1 に設定されます。これは、リーダーがレコードをローカルログに書き込みますが、すべてのフォロワーからの完全な確認を待たずに応答します。この場合、リーダーはレコードを承認した直後に失敗し、フォロワーがレコードを複製する前に失敗すると、レコードが失われます。acks=all これは、リーダーが In-Sync レプリカの完全なセットを待機してレコードを確認するのを待ちます。これにより、少なくとも 1 つの In-Sync レプリカが動作している限り、レコードが失われないことが保証されます。これは利用可能な最強の保証になります。 | 1 | 文字列 |
camel.component.kafka.request-timeout-ms | ブローカーが request.required.acks 要件を満たすまで待機してから、エラーをクライアントに送り返す時間。 | 30000 | 整数 |
camel.component.kafka.resume-strategy | このオプションを使用すると、ユーザーはカスタムの再開ストラテジーを設定できます。再開ストラテジーは、パーティションが割り当てられているときに実行されます(接続または再接続時など)。これにより、実装は操作を再開する方法をカスタマイズし、seekTo および offsetRepository メカニズムよりも柔軟な代替手段として機能します。実装の詳細については、KafkaConsumerResumeStrategy を参照してください。このオプションは、自動コミット設定には影響しません。この設定を使用する実装は、これと共に manual commit オプションを使用して評価する必要がある可能性もあります。オプションは org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy タイプです。 | KafkaConsumerResumeStrategy | |
camel.component.kafka.retries | ゼロより大きい値を設定すると、クライアントは、一時的なエラーの可能性により送信に失敗したレコードを再送信します。この再試行は、クライアントがエラーを受信したときにレコードを再送した場合と同じであることに注意してください。再試行を許可すると、レコードの順序が変わる可能性があります。2 つのレコードが 1 つのパーティションに送信され、最初のレコードが失敗して再試行され、2 番目に成功すると、2 番目のレコードが最初に表示される可能性があるためです。 | 0 | 整数 |
camel.component.kafka.retry-backoff-ms | 再試行する前に、プロデューサーは関連するトピックのメタデータを更新し、新しいリーダーが選出されているかどうかを確認します。リーダーエレクションは少しかかるため、このプロパティーは、メタデータを更新する前にプロデューサーが待機する時間を指定します。 | 100 | 整数 |
camel.component.kafka.sasl-jaas-config | kafka sasl.jaas.config パラメーターを公開します(例:org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;)。 | 文字列 | |
camel.component.kafka.sasl-kerberos-service-name | Kafka が実行される Kerberos プリンシパル名。これは、Kafka の JAAS 設定または Kafka の設定で定義できます。 | 文字列 | |
camel.component.kafka.sasl-mechanism | 使用される Simple Authentication and Security Layer(SASL) メカニズム。有効な値は、を参照してください。 | GSSAPI | 文字列 |
camel.component.kafka.schema-registry-u-r-l | 使用する Confluent Platform スキーマレジストリーサーバーの URL。形式は host1:port1,host2:port2 です。これは、Confluent Platform ドキュメントの schema.registry.url として知られています。このオプションは、Confluent Platform (標準の Apache Kafka ではない) でのみ使用できます。 | 文字列 | |
camel.component.kafka.security-protocol | ブローカーとの通信に使用されるプロトコル。SASL_PLAINTEXT、PLAINTEXT、および SSL がサポートされています。 | PLAINTEXT | 文字列 |
camel.component.kafka.seek-to | KafkaConsumer が起動時に開始または終了する場合に設定します。最初から読み取る : read from end (終了から読み込んだ)は、以前のプロパティー seekToBeginning に代わるものです。 | 文字列 | |
camel.component.kafka.send-buffer-bytes | ソケット書き込みバッファーサイズ。 | 131072 | 整数 |
camel.component.kafka.session-timeout-ms | Kafka のグループ管理機能の使用時に障害を検出するために使用されるタイムアウト。 | 10000 | 整数 |
camel.component.kafka.shutdown-timeout | コンシューマーまたはプロデューサーがそのワーカースレッドを正常にシャットダウンして終了するまでのタイムアウト(ミリ秒単位)。 | 30000 | 整数 |
camel.component.kafka.specific-avro-reader | これにより、Confluent Platform スキーマレジストリーおよび io.confluent.kafka.serializers.KafkaAvroDeserializer で使用する特定の Avro リーダーを使用できます。このオプションは、Confluent Platform (標準の Apache Kafka ではない) でのみ使用できます。 | false | ブール値 |
camel.component.kafka.ssl-cipher-suites | 暗号化スイートの一覧。これは、TLS または SSL ネットワークプロトコルを使用してネットワーク接続のセキュリティー設定をネゴシエートするために使用される認証、暗号化、MAC、および鍵交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされます。 | 文字列 | |
camel.component.kafka.ssl-context-parameters | Camel SSLContextParameters オブジェクトを使用した SSL 設定。設定されている場合、これは他の SSL エンドポイントパラメーターの前に適用されます。注記:Kafka はファイルの場所からのキーストアの読み込みのみをサポートします。そのため、KeyStoreParameters.resource オプションで file: の接頭辞を付けます。オプションは org.apache.camel.support.jsse.SSLContextParameters タイプです。 | SSLContextParameters | |
camel.component.kafka.ssl-enabled-protocols | SSL 接続で有効なプロトコルの一覧。TLSv1.2、TLSv1.1、および TLSv1 はデフォルトで有効になっています。 | 文字列 | |
camel.component.kafka.ssl-endpoint-algorithm | サーバー証明書を使用してサーバーのホスト名を検証するエンドポイント識別アルゴリズム。 | https | 文字列 |
camel.component.kafka.ssl-key-password | キーストアファイルの秘密鍵のパスワード。これはクライアントにとってオプションになります。 | 文字列 | |
camel.component.kafka.ssl-keymanager-algorithm | SSL 接続のキーマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたキーマネージャーファクトリーアルゴリズムです。 | SunX509 | 文字列 |
camel.component.kafka.ssl-keystore-location | キーストアファイルの場所。これはクライアントではオプションで、クライアントの双方向認証に使用できます。 | 文字列 | |
camel.component.kafka.ssl-keystore-password | キーストアファイルのストアパスワード。これはクライアントのオプションであり、ssl.keystore.location が設定されている場合にのみ必要です。 | 文字列 | |
camel.component.kafka.ssl-keystore-type | キーストアファイルのファイル形式。これはクライアントにとってオプションになります。デフォルト値は JKS です。 | JKS | 文字列 |
camel.component.kafka.ssl-protocol | SSLContext の生成に使用される SSL プロトコル。デフォルト設定は TLS で、ほとんどの場合で問題ありません。最近の JVM で許可される値は TLS、TLSv1.1、および TLSv1.2 です。SSL、SSLv2、SSLv3 は古い JVM でサポートされる場合がありますが、既知のセキュリティー脆弱性のために使用は推奨されません。 | 文字列 | |
camel.component.kafka.ssl-provider | SSL 接続に使用されるセキュリティープロバイダーの名前。デフォルト値は JVM のデフォルトのセキュリティープロバイダーです。 | 文字列 | |
camel.component.kafka.ssl-trustmanager-algorithm | SSL 接続のトラストマネージャーファクトリーによって使用されるアルゴリズム。デフォルト値は、Java 仮想マシンに設定されたトラストマネージャーファクトリーアルゴリズムです。 | PKIX | 文字列 |
camel.component.kafka.ssl-truststore-location | トラストストアファイルの場所。 | 文字列 | |
camel.component.kafka.ssl-truststore-password | トラストストアファイルのパスワード。 | 文字列 | |
camel.component.kafka.ssl-truststore-type | トラストストアファイルのファイル形式。デフォルト値は JKS です。 | JKS | 文字列 |
camel.component.kafka.synchronous | 同期処理を厳密に使用するかどうかを設定します。 | false | ブール値 |
camel.component.kafka.topic-is-pattern | トピックがパターン(正規表現)であるかどうか。これは、パターンに一致するトピックの動的数にサブスクライブするために使用できます。 | false | ブール値 |
camel.component.kafka.use-global-ssl-context-parameters | グローバル SSL コンテキストパラメーターの使用を有効にします。 | false | ブール値 |
camel.component.kafka.value-deserializer | Deserializer インターフェイスを実装する値のデシリアライザークラス。 | org.apache.kafka.common.serialization.StringDeserializer | 文字列 |
camel.component.kafka.value-serializer | メッセージのシリアライザークラス。 | org.apache.kafka.common.serialization.StringSerializer | 文字列 |
camel.component.kafka.worker-pool | kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、カスタムワーカープールを使用して、ルーティングエクスチェンジを継続します。このオプションを使用する場合、プールが不要になったときにシャットダウンするには、スレッドプールのライフサイクルを処理する必要があります。オプションは java.util.concurrent.ExecutorService タイプです。 | executorService | |
camel.component.kafka.worker-pool-core-size | kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、ワーカープールのコアスレッドの数。 | 10 | 整数 |
camel.component.kafka.worker-pool-max-size | kafka サーバーが非同期の非ブロッキング処理を使用して KafkaProducer から送信されたメッセージを確認した後に、ルーティングエクスチェンジを継続するためのワーカープールのスレッドの最大数。 | 20 | 整数 |