48.5. 异步响应
48.5.1. 服务器上的异步处理
概述
在服务器端异步处理调用的目的是,可以更有效地利用线程,并最终避免客户端连接尝试所拒绝的情况,因为所有服务器的请求线程都会被阻断。异步处理调用时,该请求线程几乎会立即释放。
请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应。如果您想在客户端中看到异步行为,则必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”。
异步处理的基本模型
图 48.1 “同步处理的线程模型” 显示了服务器端异步处理的基本模型概述。
图 48.1. 同步处理的线程模型
在提纲中,请求在异步模型中处理如下:
-
在请求线程内调用异步资源方法(并接收对
AsyncResponse
对象的引用),稍后需要发回响应。 -
资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
- 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
- 资源方法现在可以返回,从而释放请求线程。
-
当
Runnable
对象到达队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的AsyncResponse
对象将响应发回到客户端。
使用 Java executor 实现线程池
java.util.concurrent
API 是一个功能强大的 API,可让您非常轻松地创建完整的线程池实施。在 Java 并发 API 的术语中,线程池被称为 executor。它只需要单行代码来创建完整的工作线程池,包括工作线程和源源的块队列。
例如,要创建类似 图 48.1 “同步处理的线程模型” 可执行文件的 Executor Thread 池的完整工作池,请创建一个 java.util.concurrent.Executor
实例,如下所示:
Executor executor = new ThreadPoolExecutor( 5, // Core pool size 5, // Maximum pool size 0, // Keep-alive time TimeUnit.SECONDS, // Time unit new ArrayBlockingQueue<Runnable>(10) // Blocking queue );
此构造器创建具有五个线程的新线程池,由单个块队列来容纳最多 10 个可运行的对象。若要将任务提交到线程池,请调用
executor.execute
方法,传入对可 Runnable
对象的引用(封装异步任务)。
定义异步资源方法
要定义异步的资源类型,请使用 @Suspended
注释注入类型为 javax.ws.rs.container.AsyncResponse
的参数,并确保方法返回 void
。例如:
// Java ... import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("{id}") public void handleRequestInPool(@PathParam("id") String id, @Suspended AsyncResponse response) { ... } ... }
请注意,资源方法必须返回 void
,因为注入的 AsyncResponse
对象将在以后返回响应。
AsyncResponse 类
javax.ws.rs.container.AsyncResponse
类在传入客户端连接中提供抽象句柄。当 AsyncResponse
对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停 状态。稍后,当您准备好返回响应时,您可以通过在 AsyncResponse
实例中调用 恢复
来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse
实例上调用 取消
。
将暂停请求封装为可运行
在 图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您可以将暂停的请求推送到队列,稍后可在其中被专用线程池处理。但是,为了使这种方法能够工作,您需要以某种方式 封装 对象中暂停的请求。暂停的请求对象需要封装以下事项:
- 来自传入请求的参数(若有)。
-
AsyncResponse
对象,它在传入客户端连接上提供一个句柄,以及发回响应的方法。 - 调用的逻辑。
封装这些内容的便捷方法是定义一个可运行的类来代表暂停请求,其中 Runnable
.run ()
方法封装了调用的逻辑。执行此操作的最最好方法是将 Runnable
作为本地类实施,如下例所示。
异步处理示例
要实施异步处理场景,资源方法的实现必须将可运行的对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将
Runnable
类定义为本地类,如下例所示:
// Java package org.apache.cxf.systest.jaxrs; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.ConnectionCallback; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; import org.apache.cxf.phase.PhaseInterceptorChain; @Path("/bookstore") public class BookContinuationStore { private Map<String, String> books = new HashMap<String, String>(); private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); public BookContinuationStore() { init(); } ... @GET @Path("{id}") public void handleRequestInPool(final @PathParam("id") String id, final @Suspended AsyncResponse response) { executor.execute(new Runnable() { public void run() { // Retrieve the book data for 'id' // which is presumed to be a very slow, blocking operation // ... bookdata = ... // Re-activate the client connection with 'resume' // and send the 'bookdata' object as the response response.resume(bookdata); } }); } ... }
注意资源方法参数、id
和 响应
如何直接传递到可运行本地类的定义。这个特殊语法可让您直接在
Runnable.run ()
方法中使用资源方法参数,而无需在本地类中定义对应的字段。
要使这种特殊语法发挥作用,必须声明资源方法参数作为 最终性
(这意味着在方法实施中必须不更改它们)。
48.5.2. 超时和超时处理程序
概述
异步处理模型还支持在 REST 调用时实现超时。默认情况下,超时会导致将 HTTP 错误响应发送到客户端。但是,您还可以选择注册超时处理器回调,它可让您自定义对超时事件的响应。
设置没有处理程序的超时示例
要定义简单的调用超时,但不指定 timeout 处理程序,在 AsyncResponse
对象中调用 setTimeout
方法,如下例所示:
// Java // Java ... import java.util.concurrent.TimeUnit; ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/defaulttimeout") public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); // Optionally, send request to executor queue for processing // ... } ... }
请注意,您可以使用 java.util.concurrent.TimeUnit
类中的任何单元来指定超时值。前面的示例没有显示将请求发送到 executor 线程池的代码。如果您只想测试超时行为,可以在资源方法正文中包含对 async.SetTimeout
的调用,每次调用时会触发超时。
AsyncResponse.NO_TIMEOUT
值代表无限超时。
默认超时
默认情况下,如果触发调用超时,JAX-RS 运行时会引发 ServiceUnavailableException
异常,并发送 HTTP 错误响应,其状态为 503
。
TimeoutHandler 接口
如果要自定义超时行为,您必须通过实施 TimeoutHandler
接口来定义超时处理程序:
// Java package javax.ws.rs.container; public interface TimeoutHandler { public void handleTimeout(AsyncResponse asyncResponse); }
当您覆盖实现类中的 handleTimeout
方法时,您可以选择以下方法来处理超时:
-
通过调用
asyncResponse.cancel
方法取消响应。 -
通过用响应值调用
asyncResponse.resume
方法来发送响应。 -
通过调用
asyncResponse.setTimeout
方法来扩展等待的时间。(例如,要等待 10 秒,您可以调用asyncResponse.setTimeout (10, TimeUnit.SECONDS)
)。
使用处理程序设置超时示例
要使用超时处理程序定义调用超时,请在 AsyncResponse
对象中调用 setTimeout
Handler 方法和 setTimeoutHandler
方法,如下例所示:
// Java ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/cancel") public void getBookDescriptionWithCancel(@PathParam("id") String id, @Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); async.setTimeoutHandler(new CancelTimeoutHandlerImpl()); // Optionally, send request to executor queue for processing // ... } ... }
在本例中注册一个 CancelTimeoutHandlerImpl
timeout 处理程序实例,以处理调用超时。
使用超时处理程序取消响应
CancelTimeoutHandlerImpl
timeout 处理程序定义如下:
// Java ... import javax.ws.rs.container.AsyncResponse; ... import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... private class CancelTimeoutHandlerImpl implements TimeoutHandler { @Override public void handleTimeout(AsyncResponse asyncResponse) { asyncResponse.cancel(); } } ... }
在 AsyncResponse
对象上调用 取消
的效果是为客户端发送 HTTP 503 (服务不可用
)错误响应。您可以选择为 cancel
方法指定参数( int
或 java.util.Date
值),该值将用于在响应消息中设置 Retry-After:
HTTP 标头。但是,客户端通常忽略 Retry-After:
标头。
在可运行的实例中处理取消响应
如果您将暂停请求封装为可运行的实例,它会在 executor 线程池中排队处理,您可能会发现线程池围绕处理请求时已取消 AsyncResponse
已取消。因此,您很难将一些代码添加到可运行的实例,使它能处理已取消的
AsyncResponse
对象。例如:
// Java ... @Path("/bookstore") public class BookContinuationStore { ... private void sendRequestToThreadPool(final String id, final AsyncResponse response) { executor.execute(new Runnable() { public void run() { if ( !response.isCancelled() ) { // Process the suspended request ... // ... } } }); } ... }
48.5.3. 处理丢弃连接
概述
可以添加回调来处理客户端连接丢失的情况。
ConnectionCallback 接口
要为丢弃的连接添加回调,您必须实施 javax.ws.rs.container.ConnectionCallback
接口,该接口定义如下:
// Java package javax.ws.rs.container; public interface ConnectionCallback { public void onDisconnect(AsyncResponse disconnected); }
注册连接回调
在实施了连接回调后,必须通过调用其中一个寄存器方法,将其注册到
当前的 AsyncResponse
对象。例如,注册类型为 MyConnectionCallback
的连接回调:
asyncResponse.register(new MyConnectionCallback());
连接回调的典型场景
通常,实施连接回调的主要原因是释放与丢弃客户端连接关联的资源(您可以使用 AsyncResponse
实例作为密钥来识别需要释放的资源)。
48.5.4. 注册回调
概述
您可以选择将回调添加到 AsyncResponse
实例,以便在调用完成时获得通知。当可以调用这个回调时,可以调用以下两种替代点:
- 请求处理完成后,响应已发回到客户端,或者
-
请求处理完成后,会将未映射的
Throwable
传播到托管 I/O 容器。
CompletionCallback 接口
要添加完成回调,您必须实施 javax.ws.rs.container.CompletionCallback
接口,该接口定义如下:
// Java package javax.ws.rs.container; public interface CompletionCallback { public void onComplete(Throwable throwable); }
通常,可调参数为
null
。但是,如果请求处理导致未映射异常,则会 抛出
包含未映射异常实例。
注册完成回调
实施完成回调后,必须通过调用其中一个寄存器方法,将其注册到
当前的 AsyncResponse
对象。例如,注册类型为 MyCompletionCallback
的完成回调:
asyncResponse.register(new MyCompletionCallback());