48.5. 异步响应


48.5.1. 服务器上的异步处理

概述

在服务器端异步处理调用的目的是,可以更有效地使用线程,并最终避免客户端连接尝试被拒绝的情况,因为所有服务器的请求线程都被阻止。异步处理调用时,请求线程将立即释放。

注意

请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应为止。如果要在客户端中看到异步行为,您必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”

用于异步处理的基本模型

图 48.1 “同步处理的线程模型” 显示了在服务器端异步处理的基本模型概述。

图 48.1. 同步处理的线程模型

asyncresponse 01

简而言之,请求在异步模型中按如下方式进行处理:

  1. 在请求线程中调用异步资源方法(并接收对 AsyncResponse 对象的引用,稍后需要发送响应)。
  2. 资源方法将暂停的请求封装在 Runnable 对象中,其中包含处理请求所需的所有信息和处理逻辑。
  3. 资源方法将 Runnable 对象推送到 executor 线程池的阻塞队列中。
  4. 资源方法现在可以返回,从而释放请求线程。
  5. Runnable 对象进入队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的 AsyncResponse 对象将响应发回到客户端。

使用 Java executor 实现的线程池

java.util.concurrent API 是一个强大的 API,可让您非常轻松地创建完整的线程池实现。在 Java 并发 API 的术语中,线程池称为 executor。它只需要一行代码来创建完整的工作线程池,包括工作线程和提供它们的阻塞队列。

例如,要创建一个完整的工作线程池,如 图 48.1 “同步处理的线程模型” 中显示的 Executor Thread Pool,请创建一个 java.util.concurrent.Executor 实例,如下所示:

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

此构造器创建一个具有五个线程的新线程池,由单个阻塞队列 fed 处理,最多可保存 10 个 可运行的 对象。要将任务提交到线程池,请调用 executor.execute 方法,传递对 Runnable 对象(封装异步任务)的引用。

定义异步资源方法

要定义异步的资源方法,请使用 @Suspended 注释注入 javax.ws.rs.container.AsyncResponse 类型的参数,并确保方法返回 void。例如:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

请注意,资源方法必须返回 void,因为注入的 AsyncResponse 对象将在以后返回响应。

AsyncResponse 类

javax.ws.rs.container.AsyncResponse 类在传入客户端连接上提供了一个抽象处理。当 AsyncResponse 对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停状态。稍后,当您准备好返回响应时,您可以通过调用 AsyncResponse 实例在 AsyncResponse 实例中重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,您可以在 AsyncResponse 实例上调用 cancel

将暂停请求封装为 Runnable

图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您将暂停的请求推送到队列,从中可以在稍后由专用线程池处理它。但是,为了实现这种方法,您需要有某种方式 在对象中封装 暂停的请求。暂停的请求对象需要封装以下事项:

  • 传入请求中的参数(若有)。
  • AsyncResponse 对象,它为传入的客户端连接提供句柄,以及发回响应的方法。
  • 调用的逻辑。

封装这些事项的一种便捷方法是定义一个 可运行的 类来代表暂停的请求,其中 Runnable.run () 方法封装了调用的逻辑。执行此操作的最简单方法是实现 Runnable 作为 本地类,如下例所示。

异步处理示例

要实现异步处理场景,资源方法的实现必须将 Runnable 对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将 Runnable 类定义为本地类,如下例所示:

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

请注意,资源方法参数 idresponse 如何直接传递给 Runnable 本地类的定义。这个特殊语法可让您直接在 Runnable.run () 方法中使用资源方法参数,而无需在本地类中定义对应的字段。

重要

要使这种特殊语法正常工作,资源方法参数 必须 声明为 最终 (这意味着在方法实施中不得更改这些资源)。

48.5.2. 超时和超时处理程序

概述

异步处理模型还支持在 REST 调用时进行超时。默认情况下,超时会导致 HTTP 错误响应发送到客户端。但是,您也可以选择注册超时处理程序回调,供您自定义超时事件的响应。

在没有处理程序的情况下设置超时示例

要定义简单的调用超时,而不指定超时处理程序,在 AsyncResponse 对象中调用 setTimeout 方法,如下例所示:

// Java
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/defaulttimeout")
    public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

请注意,您可以使用 java.util.concurrent.TimeUnit 类中的任何时间单位指定超时值。前面的示例没有显示将请求发送到 executor 线程池的代码。如果您只想测试超时的行为,您可以在资源方法正文中包含对 async.SetTimeout 的调用,并且每次调用时都会触发超时。

AsyncResponse.NO_TIMEOUT 值代表一个无限超时。

默认超时行为

默认情况下,如果触发了调用超时,JAX-RS 运行时会引发 ServiceUnavailableException 异常,并发回一个状态为 503 的 HTTP 错误响应。

TimeoutHandler 接口

如果要自定义超时行为,您必须通过实施 TimeoutHandler 接口来定义超时处理器:

// Java
package javax.ws.rs.container;

public interface TimeoutHandler {
    public void handleTimeout(AsyncResponse asyncResponse);
}

当您在实现类中覆盖 handleTimeout 方法时,您可以选择以下方法来处理超时:

  • 通过调用 asyncResponse.cancel 方法取消响应。
  • 使用响应值调用 asyncResponse.resume 方法来发送响应。
  • 通过调用 asyncResponse.setTimeout 方法来扩展等待周期。(例如,若要等待 10 秒,您可以调用 asyncResponse.setTimeout (10, TimeUnit.SECONDS))。

使用处理程序设置超时示例

要使用超时处理程序定义调用超时,在 AsyncResponse 对象中调用 setTimeout 方法和 setTimeoutHandler 方法,如下例所示:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/cancel")
    public void getBookDescriptionWithCancel(@PathParam("id") String id,
                                             @Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

其中,本例注册 CancelTimeoutHandlerImpl 超时处理程序的实例,以处理调用超时。

使用超时处理程序取消响应

CancelTimeoutHandlerImpl 超时处理程序定义如下:

// Java
...
import javax.ws.rs.container.AsyncResponse;
...
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    private class CancelTimeoutHandlerImpl implements TimeoutHandler {

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.cancel();
        }

    }
    ...
}

AsyncResponse 对象上调用 cancel 的影响是向客户端发送 HTTP 503 (服务不可用)错误响应。您可以选择为 cancel 方法指定一个参数( intjava.util.Date 值),它将用于在响应消息中设置 Retry-After: HTTP 标头。但是,客户端通常会忽略 Retry-After: 标头。

在 Runnable 实例中处理取消的响应

如果您已将暂停请求封装为 Runnable 实例,该实例在 executor 线程池中排队以处理,您可能会发现,当线程池要处理请求时,您可能会发现 AsyncResponse 已取消。因此,您需要在 Runnable 实例中添加一些代码,这样就可以 使用取消的 AsyncResponse 对象来应对它。例如:

// Java
...
@Path("/bookstore")
public class BookContinuationStore {
    ...
    private void sendRequestToThreadPool(final String id, final AsyncResponse response) {

        executor.execute(new Runnable() {
            public void run() {
                if ( !response.isCancelled() ) {
                    // Process the suspended request ...
                    // ...
                }
            }
        });

    }
    ...
}

48.5.3. 处理丢弃的连接

概述

您可以添加回调来处理客户端连接丢失的情况。

ConnectionCallback 接口

要为丢弃的连接添加回调,您必须实施 javax.ws.rs.container.ConnectionCallback 接口,其定义如下:

// Java
package javax.ws.rs.container;

public interface ConnectionCallback {
    public void onDisconnect(AsyncResponse disconnected);
}

注册连接回调

在实施连接回调后,必须通过调用其中一个寄存器方法来将其注册到当前的 AsyncResponse 对象。例如,要注册类型为 MyConnectionCallback 的连接回调:

asyncResponse.register(new MyConnectionCallback());

连接回调的典型场景

通常,实施连接回调的主要原因是释放与丢弃客户端连接关联的资源(您可以使用 AsyncResponse 实例作为键来识别需要释放的资源)。

48.5.4. 注册回调

概述

您可以选择将回调添加到 AsyncResponse 实例,以便在调用完成后获得通知。当调用此回调时,处理中有两个替代点,其中之一:

  • 请求处理完成后,响应已发送到客户端,或者
  • 请求处理完成后,一个未映射的 Throwable 会被传播到托管 I/O 容器。

CompletionCallback 接口

要添加完成回调,您必须实施 javax.ws.rs.container.CompletionCallback 接口,其定义如下:

// Java
package javax.ws.rs.container;

public interface CompletionCallback {
    public void onComplete(Throwable throwable);
}

通常,可丢弃 的参数为 null。但是,如果请求处理导致异常异常,则 抛出 包含未映射的异常实例。

注册完成回调

在实施完成回调后,您必须通过调用其中一个寄存器方法来将其注册到当前的 AsyncResponse 对象。例如,要注册类型为 MyCompletionCallback 的完成回调:

asyncResponse.register(new MyCompletionCallback());
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.