3.6. 削除および圧縮ポリシーを使用した Kafka ログの管理


Kafka はログに依存してメッセージデータを保存します。ログは一連のセグメントで設定され、各セグメントはオフセットベースおよびタイムスタンプベースのインデックスに関連付けられます。新しいメッセージは アクティブ なセグメントに書き込まれ、その後変更されません。コンシューマーからのフェッチ要求を処理するときに、セグメントが読み取られます。定期的に、アクティブセグメントが ロール されて読み取り専用になり、それを置き換えるために新しいアクティブセグメントが作成されます。トピックパーティションごとに 1 つのアクティブなセグメントのみがあります。古いセグメントは、削除の対象となるまで保持されます。

ブローカーレベルの設定により、ログセグメントの最大サイズ (バイト単位) とアクティブなセグメントがロールされるまでの時間 (ミリ秒単位) が決まります。

# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...

これらの設定は、segment.bytes および segment.ms を使用してトピックレベルでオーバーライドできます。これらの値を下げるか上げるかの選択は、セグメント削除のポリシーによって異なります。サイズが大きいほど、アクティブセグメントに含まれるメッセージが多くなり、ロールされる頻度が少なくなります。セグメントが削除の対象となる頻度も低くなります。

Kafka では、ログクリーンアップポリシーによってログデータの管理方法が決まります。ほとんどの場合、クラスターレベルでデフォルト設定を変更する必要はないため、delete クリーンアップポリシーが指定され、compact クリーンアップポリシーで使用されるログクリーナーが有効になります。

# ...
log.cleanup.policy=delete
log.cleaner.enable=true
# ...
Delete クリーンアップポリシー
Delete クリーンアップポリシーは、全トピックを対象にするデフォルトのクラスター全体のポリシーです。このポリシーは、特定のトピックレベルのポリシーが設定されていないトピックに適用されます。Kafka は、時間ベースまたはサイズベースのログ保持制限に基づいて古いセグメントを削除します。
Compact クリーンアップポリシー
Compact クリーンアップポリシーは通常、トピックレベルのポリシー (cleanup.policy=compact) として設定されます。Kafka のログクリーナーは、特定のトピックに圧縮を適用し、トピック内のキーの最新の値のみを保持します。両方のポリシー (cleanup.policy=compact,delete) を使用するようにトピックを設定することもできます。

削除ポリシーの保持制限の設定

Delete クリーンアップポリシーは、データを保持したログの管理に対応します。このポリシーは、データを永久に保持する必要がない場合に適しています。時間ベースまたはサイズベースのログ保持ポリシーとクリーンアップポリシーを確立して、ログを制限した状態に保つことができます。

ログ保持ポリシーが使用される場合、保持制限に達すると、アクティブではないログセグメントが削除されます。古いセグメントを削除すると、ディスク容量の超過を防ぐことができます。

期間ベースのログ保持の場合は、時間、分、またミリ秒ベースで保持期間を設定します。

# ...
log.retention.ms=1680000
# ...

保持期間は、メッセージがセグメントに追加された時間に基づいています。Kafka は、セグメント内の最新のメッセージのタイムスタンプを使用して、そのセグメントの有効期限が切れているかどうかを判断します。ミリ秒設定は分設定よりも優先され、分設定は時間設定おりも優先されます。分とミリ秒の設定はデフォルトで null ですが、3 つのオプションにより、保持するデータを実質的に制御できます。動的に更新できるのは 3 つのプロパティーの 1 つだけであるため、ミリ秒設定を優先する必要があります。

log.retention.ms が -1 に設定されている場合には、ログ保持には時間制限が適用されないため、すべてのログが保持されます。ただし、この設定は、ディスクがいっぱいになるなど、修正の困難な問題が発生する可能性があるため、通常は推奨されません。

サイズベースのログ保持の場合、最小ログサイズ (バイト単位) を指定します。

# ...
log.retention.bytes=1073741824
# ...

これは、Kafka により、少なくとも指定された量の利用可能なログデータを常に存在させることができます。

たとえば、log.retention.bytes を 1000 に設定し、log.segment.bytes を 300 に設定した場合、Kafka は 4 セグメントとアクティブセグメントを保持し、少なくとも 1000 バイトを使用できるようにします。アクティブなセグメントがいっぱいになり、新しいセグメントが作成されると、最も古いセグメントが削除されます。この時点で、ディスク上のサイズは指定された 1000 バイトを超える可能性があり、1200 ~ 1500 バイトの範囲になる可能性があります (インデックスファイルを除く)。

ログサイズの使用に関する潜在的な問題は、メッセージがセグメントに追加された時刻が考慮されていないことです。クリーンアップポリシーに時間ベースおよびサイズベースのログ保持を使用して、必要なバランスをとることができます。どちらのしきい値に最初に到達しても、クリーンアップがトリガーされます。

セグメントファイルがシステムから削除されるまでの遅延時間を追加するには、すべてのトピックに対してブローカーレベルで log.segment.delete.lay.ms を使用できます。

# ...
log.segment.delete.delay.ms=60000
# ...

または、トピックレベルで file.delete.lay.ms を設定します。

ログのクリーンアップをチェックする頻度をミリ秒単位で設定します。

# ...
log.retention.check.interval.ms=300000
# ...

ログ保持設定に関連して、ログ保持チェックの間隔を調整します。保持サイズが小さいほど、より頻繁なチェックが必要になる場合があります。クリーンアップの頻度は、ディスクスペースを管理するのに十分な頻度である必要がありますが、ブローカーのパフォーマンスに影響を与えるほど頻度を上げてはなりません。

コンパクトポリシーを使用して最新のメッセージの保持

cleanup.policy=compact を設定してトピックのログ圧縮を有効にすると、Kafka はログクリーナーをバックグラウンドスレッドとして使用して圧縮を実行します。圧縮ポリシーは、各メッセージキーの最新のメッセージが保持されるようにし、古いバージョンのレコードを効果的に消去します。このポリシーは、メッセージ値が変更可能であり、最新の更新を保持する場合に適しています。

ログコンパクションにクリーンアップポリシーが設定されている場合、ログの 先頭 は標準の Kafka ログとして機能し、新しいメッセージへの書き込みが順番に追加されます。ログクリーナーが動作する圧縮ログの 末尾 で、ログの後半でキーが同じ別のレコードが発生した場合、レコードは削除されます。null 値を持つメッセージも削除されます。Kafka では、各キーの最新メッセージが保持されるようにしますが、圧縮されたログ全体に重複が含まれないようにすることができないので、圧縮を使用するには、関連するメッセージを識別するためのキーが必要です。

図3.1 コンパクション前のオフセットの位置によるキー値の書き込みを示すログ

キー値の書き込みを示す圧縮のイメージ

キーを使用してメッセージを識別する Kafka 圧縮では、特定のメッセージキーのログ末尾に存在する最新のメッセージ (オフセットが最も大きい) が保持され、最終的にはキーが同じ、以前のメッセージが破棄されます。最新状態のメッセージは常に利用可能であり、その特定の以前のメッセージレコードは、ログクリーナーの実行時に最終的に削除されます。メッセージを以前の状態に復元できます。周囲のレコードが削除されても、レコードは元のオフセットを保持します。その結果、末尾は連続しないオフセットを持つ可能性があります。末尾で使用できなくなったオフセットを消費すると、次に高いオフセットを持つレコードが見つかります。

図3.2 コンパクション後のログ

ログのクリーンアップ後の圧縮のイメージ

必要に応じて、圧縮プロセスに遅延を追加できます。

# ...
log.cleaner.delete.retention.ms=86400000
# ...

削除されたデータの保持期間は、データが完全に削除される前に、データが削除されたことに気付く時間を確保します。

特定のキーに関連するすべてのメッセージを削除するために、プロデューサーは廃棄 (tombstone) メッセージを送信できます。tombstone には null 値があり、そのキーの対応するメッセージが削除されたことをコンシューマーに知らせるマーカーとして機能します。しばらくすると、tombstone マーカーのみが保持されます。新しいメッセージが引き続き受信されると仮定すると、コンシューマーが削除を認識するのに十分な時間を確保できるように、マーカーは log.cleaner.delete.retention.ms で指定された期間保持されます。

クリーニングするログがない場合にクリーナーをスタンバイにする時間をミリ秒単位で設定することもできます。

# ...
log.cleaner.backoff.ms=15000
# ...

圧縮ポリシーと削除ポリシーを組み合わせて使用する

圧縮ポリシーのみを選択すると、ログが任意に大きくなる可能性があります。このような場合、ログを圧縮して削除するようにトピックのクリーンアップポリシーを設定できます。Kafka はログ圧縮を適用し、古いバージョンのレコードを削除し、各キーの最新バージョンのみを保持します。Kafka は、指定された時間ベースまたはサイズベースのログ保持設定に基づいてレコードも削除します。

たとえば、次の図では、特定のメッセージキーの最新のメッセージ (オフセットが最も大きい) のみが圧縮ポイントまで保持されます。保持ポイントまでのレコードが残っている場合は、削除されます。この場合、圧縮プロセスではすべての重複が削除されます。

図3.3 ログ保持ポイントおよびコンパクションポイント

保持ポイントを使用した圧縮のイメージ
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.