48.5. 非同期応答
48.5.1. サーバー上の非同期処理
概要
サーバー側での呼び出しの非同期処理の目的は、スレッドをより効率的に使用できるようにすることで、最終的に、サーバーの要求スレッドがブロックされていることが原因でクライアントの接続試行が拒否されるというシナリオを回避することです。呼び出しが非同期的に処理されると、要求スレッドはほぼすぐに解放されます。
サーバー側で非同期処理が有効になっている場合でも、サーバーから応答を受信するまでクライアントは、ブロックされたまま になることに注意してください。クライアント側で非同期動作を実行するには、クライアント側の非同期処理を実装する必要があります。「クライアントの非同期処理」 を参照してください。
非同期処理の基本モデル
図48.1「非同期処理のスレッドモデル」 は、サーバー側での非同期処理の基本モデルの概要を示しています。
図48.1 非同期処理のスレッドモデル
つまり、リクエストは非同期モデルで以下のように処理されます。
-
非同期リソースメソッドはリクエストスレッド内で呼び出されます (その後応答の送信に必要な
AsyncResponse
オブジェクトへの参照を受け取ります) 。 -
リソースメソッドは、
Runnable
オブジェクトで一時停止されたリクエストをカプセル化します。これには、リクエストを処理するのに必要なすべての情報および処理ロジックが含まれます。 - リソースメソッドは、Runnable オブジェクトをエグゼキュータースレッドプールのブロッキングキューにプッシュします。
- リソースメソッドが返されるようになったため、要求スレッドが解放されます。
-
Runnable
オブジェクトがキューの先頭に到達すると、エグゼキュータースレッドプールのスレッドの 1 つによって処理されます。カプセル化されたAsyncResponse
オブジェクトは、応答をクライアントに送り返すために使用されます。
Java エグゼキューターを使用したスレッドプールの実装
java.util.concurrent
API は、完全なスレッドプールの実装を非常に簡単に作成できる強力な API です。Java 同時実行 API の用語では、スレッドプールは エグゼキューター と呼ばれます。作業スレッドやそれを提供するブロッキングキューなど、完全な作業スレッドプールを作成するには、コード 1 行のみが必要です。
たとえば、図48.1「非同期処理のスレッドモデル」 に記載されている Executor Thread Pool のような完全な作業スレッドプールを作成するには、以下のように java.util.concurrent.Executor
インスタンスを作成します。
Executor executor = new ThreadPoolExecutor( 5, // Core pool size 5, // Maximum pool size 0, // Keep-alive time TimeUnit.SECONDS, // Time unit new ArrayBlockingQueue<Runnable>(10) // Blocking queue );
このコンストラクタは、5 つのスレッドを持つ新しいスレッドプールを作成し、最大 10 個の Runnable
オブジェクトを保持できる単一のブロッキングキューによって供給されます。スレッドプールにタスクを送信するには、executor.execute
メソッドを呼び出して、Runnable
オブジェクト (非同期タスクをカプセル化する) への参照を渡します。
非同期リソースメソッドの定義
非同期であるリソースメソッドを定義するには、@Suspended
アノテーションを使用して型 javax.ws.rs.container.AsyncResponse
の引数を注入し、メソッドが void
を返すことを確認します。以下に例を示します。
// Java ... import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("{id}") public void handleRequestInPool(@PathParam("id") String id, @Suspended AsyncResponse response) { ... } ... }
注入された AsyncResponse
オブジェクトは後でレスポンスを返すために使用されるため、リソースメソッドは void
を返す必要があることに注意してください。
AsyncResponse クラス
javax.ws.rs.container.AsyncResponse
クラスは、受信クライアントコネクションにおける抽象ハンドルを提供します。AsyncResponse
オブジェクトがリソースメソッドに注入されると、ベースとなる TCP クライアントコネクションは最初は 一時停止 状態になります。後で応答を返す準備ができたら、AsyncResponse
インスタンスで resume
を呼び出すことで、基礎となる TCP クライアントコネクションを再度アクティブにし、レスポンスを返すことができます。または、呼び出しを中止する必要がある場合は、AsyncResponse
インスタンスで cancel
を呼び出すことができます。
一時停止した要求の Runnable としてのカプセル化
図48.1「非同期処理のスレッドモデル」 の非同期処理シナリオでは、一時停止された要求をキューにプッシュします。そのキューから、後で専用のスレッドプールで処理できます。ただし、このアプローチを機能させるには、オブジェクトで一時停止された要求を カプセル化 する方法が必要です。一時停止された要求オブジェクトは以下をカプセル化する必要があります。
- 受信要求からのパラメーター (存在する場合)。
-
AsyncResponse
オブジェクト。受信クライアントコネクションのハンドルと、レスポンスの返信方法を提供します。 - 呼び出しのロジック。
これらをカプセル化する便利な方法は、Runnable
クラスを定義して一時停止されたリクエストを表します。ここで、Runnable.run()
メソッドは呼び出しのロジックをカプセル化します。これを実行する最も一般的な方法は、以下の例のように Runnable
をローカルクラスとして実装することです。
非同期処理の例
非同期処理シナリオを実装するには、リソースメソッドの実装は Runnable
オブジェクト (一時停止されたリクエストを表します) をエグゼキュータースレッドプールに渡す必要があります。Java 7 および 8 では、次の例に示すように、いくつかの新しい構文を利用して、Runnable
クラスをローカルクラスとして定義できます。
// Java package org.apache.cxf.systest.jaxrs; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.ConnectionCallback; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; import org.apache.cxf.phase.PhaseInterceptorChain; @Path("/bookstore") public class BookContinuationStore { private Map<String, String> books = new HashMap<String, String>(); private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); public BookContinuationStore() { init(); } ... @GET @Path("{id}") public void handleRequestInPool(final @PathParam("id") String id, final @Suspended AsyncResponse response) { executor.execute(new Runnable() { public void run() { // Retrieve the book data for 'id' // which is presumed to be a very slow, blocking operation // ... bookdata = ... // Re-activate the client connection with 'resume' // and send the 'bookdata' object as the response response.resume(bookdata); } }); } ... }
リソースメソッド引数 id
および response
が、Runnable
ローカルクラスの定義に直接渡される方法に注意してください。この特別な構文を使用すると、ローカルクラスの対応するフィールドを定義しなくても、Runnable.run()
メソッドでリソースメソッドの引数を直接使用できます。
この特別な構文を機能させるには、リソースメソッドパラメーターを final
として宣言する 必要 があります (メソッド実装で変更できないことを意味します)。
48.5.2. タイムアウトおよびタイムアウトハンドラー
概要
非同期処理モデルには、REST 呼び出しにタイムアウトを指定するサポートがあります。デフォルトでは、タイムアウトになると HTTP エラー応答がクライアントに送信されます。ただし、タイムアウトハンドラーコールバックを登録するオプションもあります。これにより、タイムアウトイベントへの応答をカスタマイズできます。
ハンドラーなしでタイムアウトを設定する例
タイムアウトハンドラーを指定せずに単純な呼び出しタイムアウトを定義するには、以下の例のように AsyncResponse
オブジェクトで setTimeout
メソッドを呼び出します。
// Java // Java ... import java.util.concurrent.TimeUnit; ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/defaulttimeout") public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); // Optionally, send request to executor queue for processing // ... } ... }
java.util.concurrent.TimeUnit
クラスの任意の時間単位を使用して、タイムアウト値を指定できることに注意してください。上記の例では、要求をエグゼキュータースレッドプールに送信するコードは提示されていません。タイムアウトの動作をテストするだけであれば、リソースメソッド本文に async.SetTimeout
への呼び出しのみを含めると、タイムアウトは呼び出しごとにトリガーされます。
AsyncResponse.NO_TIMEOUT
の値は無限のタイムアウトを表します。
デフォルトのタイムアウト動作
デフォルトでは、呼び出しタイムアウトがトリガーされると、JAX-RS ランタイムが ServiceUnavailableException
例外を発生させ、ステータス 503
で HTTP エラーの応答を返します。
TimeoutHandler インターフェイス
タイムアウトの動作をカスタマイズする場合は、TimeoutHandler
インターフェイスを実装してタイムアウトハンドラーを定義する必要があります。
// Java package javax.ws.rs.container; public interface TimeoutHandler { public void handleTimeout(AsyncResponse asyncResponse); }
実装クラスで handleTimeout
メソッドを上書きする場合は、タイムアウトを処理する次の方法のいずれかを選択できます。
-
asyncResponse.cancel
メソッドを呼び出すことで、レスポンスを取り消します。 -
レスポンス値で
asyncResponse.resume
メソッドを呼び出すことで、レスポンスを送信します。 -
asyncResponse.setTimeout
メソッドを呼び出すことで、待機期間を延長します。たとえば、さらに 10 秒間待つには、asyncResponse.setTimeout(10, TimeUnit.SECONDS)
を呼び出しできます。
ハンドラーでタイムアウトを設定する例
タイムアウトハンドラーで呼び出しタイムアウトを定義するには、以下の例のように AsyncResponse
オブジェクトの setTimeout
メソッドと setTimeoutHandler
メソッドの両方を呼び出します。
// Java ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/cancel") public void getBookDescriptionWithCancel(@PathParam("id") String id, @Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); async.setTimeoutHandler(new CancelTimeoutHandlerImpl()); // Optionally, send request to executor queue for processing // ... } ... }
この例では、呼び出しタイムアウトを処理するために CancelTimeoutHandlerImpl
タイムアウトハンドラーのインスタンスを登録します。
タイムアウトハンドラーを使用した応答の取り消し
CancelTimeoutHandlerImpl
タイムアウトハンドラーは以下のように定義されます。
// Java ... import javax.ws.rs.container.AsyncResponse; ... import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... private class CancelTimeoutHandlerImpl implements TimeoutHandler { @Override public void handleTimeout(AsyncResponse asyncResponse) { asyncResponse.cancel(); } } ... }
AsyncResponse
オブジェクト上で cancel
を呼び出す効果は、クライアントに HTTP 503 (Service unavailable
) エラーの応答を送信することです。任意で、cancel
メソッド (int
または java.util.Date
の値) の引数を指定できます。これは応答メッセージで Retry-After:
HTTP ヘッダーを設定するために使用されます。ただし、クライアントは多くの場合で Retry-After:
ヘッダーを無視します。
Runnable インスタンスでの取り消し済みの応答への対応
エグゼキュータースレッドプールで処理のためにキューに格納された Runnable
インスタンスとして、一時停止されたリクエストをカプセル化した場合、スレッドプールがリクエストを処理するまでに AsyncResponse
がキャンセルされる可能性があります。このため、Runnable
インスタンスにコードを追加する必要があります。これにより、キャンセルされた AsyncResponse
オブジェクトに対応できるようになります。以下に例を示します。
// Java ... @Path("/bookstore") public class BookContinuationStore { ... private void sendRequestToThreadPool(final String id, final AsyncResponse response) { executor.execute(new Runnable() { public void run() { if ( !response.isCancelled() ) { // Process the suspended request ... // ... } } }); } ... }
48.5.3. 切断された接続の処理
概要
クライアント接続が失われた場合に対処するためにコールバックを追加することが可能です。
ConnectionCallback インターフェイス
切断されたコネクションのコールバックを追加するには、以下のように javax.ws.rs.container.ConnectionCallback
インターフェイスを実装する必要があります。
// Java package javax.ws.rs.container; public interface ConnectionCallback { public void onDisconnect(AsyncResponse disconnected); }
接続コールバックの登録
接続コールバックを実装したら、register
メソッドの 1 つを呼び出して、現在の AsyncResponse
オブジェクトに登録する必要があります。たとえば、型 MyConnectionCallback
の接続コールバックを登録するには、次のコマンドを実行します。
asyncResponse.register(new MyConnectionCallback());
接続コールバックの一般的なシナリオ
通常、接続コールバックを実装する主な理由は、切断されたクライアントコネクション (解放が必要なリソースを識別するためのキーとして AsyncResponse
インスタンスを使える) に関連付けられたリソースを解放することです。
48.5.4. コールバックの登録
概要
オプションで、呼び出しが完了したときに通知されるように、AsyncResponse
インスタンスにコールバックを追加できます。このコールバックの呼び出しが可能な場合の処理には、代替えポイントが 2 つあります。
- 要求の処理が完了し、応答がすでにクライアントに送信された後。
-
リクエスト処理を終了して、マッピングされていない
Throwable
がホスト I/O コンテナーに伝播された後。
CompletionCallback インターフェイス
完了コールバックを追加するには、以下のように定義された javax.ws.rs.container.CompletionCallback
インターフェイスを実装する必要があります。
// Java package javax.ws.rs.container; public interface CompletionCallback { public void onComplete(Throwable throwable); }
通常、throwable
引数は null
です。ただし、リクエスト処理によってマッピングされていない例外が発生した場合は、throwable
にはマッピングされていない例外インスタンスが含まれます。
完了コールバックの登録
完了コールバックを実装したら、register
メソッドの 1 つを呼び出して、現在の AsyncResponse
オブジェクトに登録する必要があります。たとえば、型 MyCompletionCallback
の完了コールバックを登録するには、次のコマンドを実行します。
asyncResponse.register(new MyCompletionCallback());