4.6. オフセットをコミットする際のデータ損失または重複の回避
Kafka の 自動コミットメカニズム により、コンシューマーはメッセージのオフセットを自動的にコミットできます。有効にすると、コンシューマーはブローカーをポーリングして受信したオフセットを 5000ms 間隔でコミットします。
自動コミットのメカニズムは便利ですが、データ損失と重複のリスクが発生します。コンシューマーが多くのメッセージを取得および変換し、自動コミットの実行時にコンシューマーバッファーに処理されたメッセージがある状態でシステムがクラッシュすると、そのデータは失われます。メッセージの処理後、自動コミットの実行前にシステムがクラッシュした場合、リバランス後に別のコンシューマーインスタンスでデータが複製されます。
ブローカーへの次のポーリングの前またはコンシューマーが閉じられる前に、すべてのメッセージが処理された場合は、自動コミットによるデータの損失を回避できます。
データの損失や重複の可能性を最小限に抑えるには、enable.auto.commit
を false
に設定し、クライアントアプリケーションを開発して、オフセットのコミットをより詳細に制御できるようにします。または、auto.commit.interval.ms
を使用してコミットの間隔を減らすことができます。
# ...
enable.auto.commit=false 1
# ...
- 1
- 自動コミットを false に設定すると、オフセットのコミットの制御が強化されます。
enable.auto.commit
を false
に設定すると、すべての 処理が実行され、メッセージが消費された後にオフセットをコミットできます。たとえば、Kafka commitSync
および commitAsync
コミット API を呼び出すようにアプリケーションを設定できます。
commitSync
API は、ポーリングから返されるメッセージバッチのオフセットをコミットします。バッチのメッセージすべての処理が完了したら API を呼び出します。commitSync
API を使用する場合、アプリケーションはバッチの最後のオフセットがコミットされるまで新しいメッセージをポーリングしません。これがスループットに悪影響する場合は、コミットする頻度が低いか、commitAsync
API を使用できます。commitAsync
API はブローカーがコミットリクエストにレスポンスするまで待機しませんが、リバランス時にさらに重複が発生するリスクがあります。一般的な方法として、両方のコミット API をアプリケーションで組み合わせ、コンシューマーをシャットダウンまたはリバランスの直前に commitSync
API を使用し、最終コミットが正常に実行されるようにします。
4.6.1. トランザクションメッセージの制御
プロデューサー側でトランザクション ID を使用し、冪等性 (enable.idempotence=true
) を有効にすることを検討してください。これにより、1 回限りの配信を保証します。コンシューマー側で、isolation.level
プロパティーを使用して、コンシューマーによってトランザクションメッセージが読み取られる方法を制御できます。
isolation.level
プロパティーには、有効な 2 つの値があります。
-
read_committed
-
read_uncommitted
(デフォルト)
read_committed
を使用して、コミットされたトランザクションメッセージのみがコンシューマーによって読み取られるようにします。ただし、これによりトランザクションの結果を記録するトランザクションマーカー (committed または aborted) がブローカーによって書き込まれるまで、コンシューマーはメッセージを返すことができないため、エンドツーエンドのレイテンシーが長くなります。
# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
- 1
- コミットされたメッセージのみがコンシューマーによって読み取られるように、
read_committed
に設定します。