48.5. 非同期応答


48.5.1. サーバー上の非同期処理

概要

サーバー側での呼び出しの非同期処理の目的は、スレッドをより効率的に使用できるようにすることで、最終的に、サーバーの要求スレッドがブロックされていることが原因でクライアントの接続試行が拒否されるというシナリオを回避することです。呼び出しが非同期的に処理されると、要求スレッドはほぼすぐに解放されます。

注記

サーバー側で非同期処理が有効になっている場合でも、サーバーから応答を受信するまでクライアントは、ブロックされたまま になることに注意してください。クライアント側で非同期動作を実行するには、クライアント側の非同期処理を実装する必要があります。「クライアントの非同期処理」 を参照してください。

非同期処理の基本モデル

図48.1「非同期処理のスレッドモデル」 は、サーバー側での非同期処理の基本モデルの概要を示しています。

図48.1 非同期処理のスレッドモデル

asyncresponse 01

つまり、リクエストは非同期モデルで以下のように処理されます。

  1. 非同期リソースメソッドはリクエストスレッド内で呼び出されます (その後応答の送信に必要な AsyncResponse オブジェクトへの参照を受け取ります) 。
  2. リソースメソッドは、Runnable オブジェクトで一時停止されたリクエストをカプセル化します。これには、リクエストを処理するのに必要なすべての情報および処理ロジックが含まれます。
  3. リソースメソッドは、Runnable オブジェクトをエグゼキュータースレッドプールのブロッキングキューにプッシュします。
  4. リソースメソッドが返されるようになったため、要求スレッドが解放されます。
  5. Runnable オブジェクトがキューの先頭に到達すると、エグゼキュータースレッドプールのスレッドの 1 つによって処理されます。カプセル化された AsyncResponse オブジェクトは、応答をクライアントに送り返すために使用されます。

Java エグゼキューターを使用したスレッドプールの実装

java.util.concurrent API は、完全なスレッドプールの実装を非常に簡単に作成できる強力な API です。Java 同時実行 API の用語では、スレッドプールは エグゼキューター と呼ばれます。作業スレッドやそれを提供するブロッキングキューなど、完全な作業スレッドプールを作成するには、コード 1 行のみが必要です。

たとえば、図48.1「非同期処理のスレッドモデル」 に記載されている Executor Thread Pool のような完全な作業スレッドプールを作成するには、以下のように java.util.concurrent.Executor インスタンスを作成します。

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

このコンストラクタは、5 つのスレッドを持つ新しいスレッドプールを作成し、最大 10 個の Runnable オブジェクトを保持できる単一のブロッキングキューによって供給されます。スレッドプールにタスクを送信するには、executor.execute メソッドを呼び出して、Runnable オブジェクト (非同期タスクをカプセル化する) への参照を渡します。

非同期リソースメソッドの定義

非同期であるリソースメソッドを定義するには、@Suspended アノテーションを使用して型 javax.ws.rs.container.AsyncResponse の引数を注入し、メソッドが void を返すことを確認します。以下に例を示します。

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

注入された AsyncResponse オブジェクトは後でレスポンスを返すために使用されるため、リソースメソッドは void を返す必要があることに注意してください。

AsyncResponse クラス

javax.ws.rs.container.AsyncResponse クラスは、受信クライアントコネクションにおける抽象ハンドルを提供します。AsyncResponse オブジェクトがリソースメソッドに注入されると、ベースとなる TCP クライアントコネクションは最初は 一時停止 状態になります。後で応答を返す準備ができたら、AsyncResponse インスタンスで resume を呼び出すことで、基礎となる TCP クライアントコネクションを再度アクティブにし、レスポンスを返すことができます。または、呼び出しを中止する必要がある場合は、AsyncResponse インスタンスで cancel を呼び出すことができます。

一時停止した要求の Runnable としてのカプセル化

図48.1「非同期処理のスレッドモデル」 の非同期処理シナリオでは、一時停止された要求をキューにプッシュします。そのキューから、後で専用のスレッドプールで処理できます。ただし、このアプローチを機能させるには、オブジェクトで一時停止された要求を カプセル化 する方法が必要です。一時停止された要求オブジェクトは以下をカプセル化する必要があります。

  • 受信要求からのパラメーター (存在する場合)。
  • AsyncResponse オブジェクト。受信クライアントコネクションのハンドルと、レスポンスの返信方法を提供します。
  • 呼び出しのロジック。

これらをカプセル化する便利な方法は、Runnable クラスを定義して一時停止されたリクエストを表します。ここで、Runnable.run() メソッドは呼び出しのロジックをカプセル化します。これを実行する最も一般的な方法は、以下の例のように Runnable をローカルクラスとして実装することです。

非同期処理の例

非同期処理シナリオを実装するには、リソースメソッドの実装は Runnable オブジェクト (一時停止されたリクエストを表します) をエグゼキュータースレッドプールに渡す必要があります。Java 7 および 8 では、次の例に示すように、いくつかの新しい構文を利用して、Runnable クラスをローカルクラスとして定義できます。

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

リソースメソッド引数 id および response が、Runnable ローカルクラスの定義に直接渡される方法に注意してください。この特別な構文を使用すると、ローカルクラスの対応するフィールドを定義しなくても、Runnable.run() メソッドでリソースメソッドの引数を直接使用できます。

重要

この特別な構文を機能させるには、リソースメソッドパラメーターを final として宣言する 必要 があります (メソッド実装で変更できないことを意味します)。

48.5.2. タイムアウトおよびタイムアウトハンドラー

概要

非同期処理モデルには、REST 呼び出しにタイムアウトを指定するサポートがあります。デフォルトでは、タイムアウトになると HTTP エラー応答がクライアントに送信されます。ただし、タイムアウトハンドラーコールバックを登録するオプションもあります。これにより、タイムアウトイベントへの応答をカスタマイズできます。

ハンドラーなしでタイムアウトを設定する例

タイムアウトハンドラーを指定せずに単純な呼び出しタイムアウトを定義するには、以下の例のように AsyncResponse オブジェクトで setTimeout メソッドを呼び出します。

// Java
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/defaulttimeout")
    public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

java.util.concurrent.TimeUnit クラスの任意の時間単位を使用して、タイムアウト値を指定できることに注意してください。上記の例では、要求をエグゼキュータースレッドプールに送信するコードは提示されていません。タイムアウトの動作をテストするだけであれば、リソースメソッド本文に async.SetTimeout への呼び出しのみを含めると、タイムアウトは呼び出しごとにトリガーされます。

AsyncResponse.NO_TIMEOUT の値は無限のタイムアウトを表します。

デフォルトのタイムアウト動作

デフォルトでは、呼び出しタイムアウトがトリガーされると、JAX-RS ランタイムが ServiceUnavailableException 例外を発生させ、ステータス 503 で HTTP エラーの応答を返します。

TimeoutHandler インターフェイス

タイムアウトの動作をカスタマイズする場合は、TimeoutHandler インターフェイスを実装してタイムアウトハンドラーを定義する必要があります。

// Java
package javax.ws.rs.container;

public interface TimeoutHandler {
    public void handleTimeout(AsyncResponse asyncResponse);
}

実装クラスで handleTimeout メソッドを上書きする場合は、タイムアウトを処理する次の方法のいずれかを選択できます。

  • asyncResponse.cancel メソッドを呼び出すことで、レスポンスを取り消します。
  • レスポンス値で asyncResponse.resume メソッドを呼び出すことで、レスポンスを送信します。
  • asyncResponse.setTimeout メソッドを呼び出すことで、待機期間を延長します。たとえば、さらに 10 秒間待つには、asyncResponse.setTimeout(10, TimeUnit.SECONDS) を呼び出しできます。

ハンドラーでタイムアウトを設定する例

タイムアウトハンドラーで呼び出しタイムアウトを定義するには、以下の例のように AsyncResponse オブジェクトの setTimeout メソッドと setTimeoutHandler メソッドの両方を呼び出します。

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/cancel")
    public void getBookDescriptionWithCancel(@PathParam("id") String id,
                                             @Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

この例では、呼び出しタイムアウトを処理するために CancelTimeoutHandlerImpl タイムアウトハンドラーのインスタンスを登録します。

タイムアウトハンドラーを使用した応答の取り消し

CancelTimeoutHandlerImpl タイムアウトハンドラーは以下のように定義されます。

// Java
...
import javax.ws.rs.container.AsyncResponse;
...
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    private class CancelTimeoutHandlerImpl implements TimeoutHandler {

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.cancel();
        }

    }
    ...
}

AsyncResponse オブジェクト上で cancel を呼び出す効果は、クライアントに HTTP 503 (Service unavailable) エラーの応答を送信することです。任意で、cancel メソッド (int または java.util.Date の値) の引数を指定できます。これは応答メッセージで Retry-After: HTTP ヘッダーを設定するために使用されます。ただし、クライアントは多くの場合で Retry-After: ヘッダーを無視します。

Runnable インスタンスでの取り消し済みの応答への対応

エグゼキュータースレッドプールで処理のためにキューに格納された Runnable インスタンスとして、一時停止されたリクエストをカプセル化した場合、スレッドプールがリクエストを処理するまでに AsyncResponse がキャンセルされる可能性があります。このため、Runnable インスタンスにコードを追加する必要があります。これにより、キャンセルされた AsyncResponse オブジェクトに対応できるようになります。以下に例を示します。

// Java
...
@Path("/bookstore")
public class BookContinuationStore {
    ...
    private void sendRequestToThreadPool(final String id, final AsyncResponse response) {

        executor.execute(new Runnable() {
            public void run() {
                if ( !response.isCancelled() ) {
                    // Process the suspended request ...
                    // ...
                }
            }
        });

    }
    ...
}

48.5.3. 切断された接続の処理

概要

クライアント接続が失われた場合に対処するためにコールバックを追加することが可能です。

ConnectionCallback インターフェイス

切断されたコネクションのコールバックを追加するには、以下のように javax.ws.rs.container.ConnectionCallback インターフェイスを実装する必要があります。

// Java
package javax.ws.rs.container;

public interface ConnectionCallback {
    public void onDisconnect(AsyncResponse disconnected);
}

接続コールバックの登録

接続コールバックを実装したら、register メソッドの 1 つを呼び出して、現在の AsyncResponse オブジェクトに登録する必要があります。たとえば、型 MyConnectionCallback の接続コールバックを登録するには、次のコマンドを実行します。

asyncResponse.register(new MyConnectionCallback());

接続コールバックの一般的なシナリオ

通常、接続コールバックを実装する主な理由は、切断されたクライアントコネクション (解放が必要なリソースを識別するためのキーとして AsyncResponse インスタンスを使える) に関連付けられたリソースを解放することです。

48.5.4. コールバックの登録

概要

オプションで、呼び出しが完了したときに通知されるように、AsyncResponse インスタンスにコールバックを追加できます。このコールバックの呼び出しが可能な場合の処理には、代替えポイントが 2 つあります。

  • 要求の処理が完了し、応答がすでにクライアントに送信された後。
  • リクエスト処理を終了して、マッピングされていない Throwable がホスト I/O コンテナーに伝播された後。

CompletionCallback インターフェイス

完了コールバックを追加するには、以下のように定義された javax.ws.rs.container.CompletionCallback インターフェイスを実装する必要があります。

// Java
package javax.ws.rs.container;

public interface CompletionCallback {
    public void onComplete(Throwable throwable);
}

通常、throwable 引数は null です。ただし、リクエスト処理によってマッピングされていない例外が発生した場合は、throwable にはマッピングされていない例外インスタンスが含まれます。

完了コールバックの登録

完了コールバックを実装したら、register メソッドの 1 つを呼び出して、現在の AsyncResponse オブジェクトに登録する必要があります。たとえば、型 MyCompletionCallback の完了コールバックを登録するには、次のコマンドを実行します。

asyncResponse.register(new MyCompletionCallback());
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.