6.2.7. オフセットポリシーの管理
auto.offset.reset
プロパティーを使用して、オフセットをすべてコミットしなかった場合やコミットされたオフセットが有効でないまたは削除された場合の、コンシューマーの動作を制御します。
コンシューマーアプリケーションを初めてデプロイし、既存のトピックからメッセージを読み取る場合について考えてみましょう。group.id
が初めて使用されるため、__consumer_offsets
トピックには、このアプリケーションのオフセット情報は含まれません。新しいアプリケーションは、ログの始めからすべての既存メッセージの処理を開始するか、新しいメッセージのみ処理を開始できます。デフォルトのリセット値は、パーティションの最後から開始する latest
で、一部のメッセージは見逃されることを意味します。データの損失を回避し、処理量を増やすには、auto.offset.reset
を earliest
に設定し、パーティションの最初から開始します。
また、ブローカーに設定されたオフセットの保持期間 (offsets.retention.minutes
) が終了したときにメッセージが失われないようにするため、earliest
オプションを使用することも検討してください。コンシューマーグループまたはスタンドアロンコンシューマーが非アクティブで、保持期間中にオフセットをコミットしない場合、以前にコミットされたオフセットは __consumer_offsets
から削除されます。
# ... heartbeat.interval.ms=3000 1 session.timeout.ms=10000 2 auto.offset.reset=earliest 3 # ...
- 1
- 予想されるリバランスに応じて、ハートビートの間隔を短くして調整します。
- 2
- タイムアウトの期限が切れる前に Kafka ブローカーによってハートビートが受信されなかった場合、コンシューマーはコンシューマーグループから削除され、リバランスが開始されます。ブローカー設定に
group.min.session.timeout.ms
およびgroup.max.session.timeout.ms
がある場合は、セッションタイムアウト値はこの範囲内である必要があります。 - 3
- パーティションの最初に戻り、オフセットがコミットされなかった場合にデータの損失が発生しないようにするには、
earliest
に設定します。
1 つのフェッチリクエストで返されるデータ量が大きい場合、コンシューマーが処理する前にタイムアウトが発生することがあります。この場合は、max.partition.fetch.bytes
の値を低くするか、session.timeout.ms
の値を高くします。