48.5. 异步响应
48.5.1. 服务器上的异步处理
概述
在服务器端异步处理调用的目的是,可以更有效地使用线程,并最终避免客户端连接尝试被拒绝的情况,因为所有服务器的请求线程都被阻止。异步处理调用时,请求线程将立即释放。
请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应为止。如果要在客户端中看到异步行为,您必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”。
用于异步处理的基本模型
图 48.1 “同步处理的线程模型” 显示了在服务器端异步处理的基本模型概述。
图 48.1. 同步处理的线程模型
简而言之,请求在异步模型中按如下方式进行处理:
-
在请求线程中调用异步资源方法(并接收对
AsyncResponse
对象的引用,稍后需要发送响应)。 -
资源方法将暂停的请求封装在
Runnable
对象中,其中包含处理请求所需的所有信息和处理逻辑。 - 资源方法将 Runnable 对象推送到 executor 线程池的阻塞队列中。
- 资源方法现在可以返回,从而释放请求线程。
-
当
Runnable
对象进入队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的AsyncResponse
对象将响应发回到客户端。
使用 Java executor 实现的线程池
java.util.concurrent
API 是一个强大的 API,可让您非常轻松地创建完整的线程池实现。在 Java 并发 API 的术语中,线程池称为 executor。它只需要一行代码来创建完整的工作线程池,包括工作线程和提供它们的阻塞队列。
例如,要创建一个完整的工作线程池,如 图 48.1 “同步处理的线程模型” 中显示的 Executor Thread Pool,请创建一个 java.util.concurrent.Executor
实例,如下所示:
Executor executor = new ThreadPoolExecutor( 5, // Core pool size 5, // Maximum pool size 0, // Keep-alive time TimeUnit.SECONDS, // Time unit new ArrayBlockingQueue<Runnable>(10) // Blocking queue );
此构造器创建一个具有五个线程的新线程池,由单个阻塞队列 fed 处理,最多可保存 10 个 可运行的
对象。要将任务提交到线程池,请调用 executor.execute
方法,传递对 Runnable
对象(封装异步任务)的引用。
定义异步资源方法
要定义异步的资源方法,请使用 @Suspended
注释注入 javax.ws.rs.container.AsyncResponse
类型的参数,并确保方法返回 void
。例如:
// Java ... import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("{id}") public void handleRequestInPool(@PathParam("id") String id, @Suspended AsyncResponse response) { ... } ... }
请注意,资源方法必须返回 void
,因为注入的 AsyncResponse
对象将在以后返回响应。
AsyncResponse 类
javax.ws.rs.container.AsyncResponse
类在传入客户端连接上提供了一个抽象处理。当 AsyncResponse
对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停状态。稍后,当您准备好返回响应时,您可以通过调用 AsyncResponse
实例在 AsyncResponse 实例中重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,您可以在
AsyncResponse
实例上调用 cancel
。
将暂停请求封装为 Runnable
在 图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您将暂停的请求推送到队列,从中可以在稍后由专用线程池处理它。但是,为了实现这种方法,您需要有某种方式 在对象中封装 暂停的请求。暂停的请求对象需要封装以下事项:
- 传入请求中的参数(若有)。
-
AsyncResponse
对象,它为传入的客户端连接提供句柄,以及发回响应的方法。 - 调用的逻辑。
封装这些事项的一种便捷方法是定义一个 可运行的
类来代表暂停的请求,其中 Runnable.run ()
方法封装了调用的逻辑。执行此操作的最简单方法是实现 Runnable 作为
本地类,如下例所示。
异步处理示例
要实现异步处理场景,资源方法的实现必须将 Runnable
对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将 Runnable
类定义为本地类,如下例所示:
// Java package org.apache.cxf.systest.jaxrs; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.ConnectionCallback; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; import org.apache.cxf.phase.PhaseInterceptorChain; @Path("/bookstore") public class BookContinuationStore { private Map<String, String> books = new HashMap<String, String>(); private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); public BookContinuationStore() { init(); } ... @GET @Path("{id}") public void handleRequestInPool(final @PathParam("id") String id, final @Suspended AsyncResponse response) { executor.execute(new Runnable() { public void run() { // Retrieve the book data for 'id' // which is presumed to be a very slow, blocking operation // ... bookdata = ... // Re-activate the client connection with 'resume' // and send the 'bookdata' object as the response response.resume(bookdata); } }); } ... }
请注意,资源方法参数 id
和 response
如何直接传递给 Runnable
本地类的定义。这个特殊语法可让您直接在 Runnable.run ()
方法中使用资源方法参数,而无需在本地类中定义对应的字段。
要使这种特殊语法正常工作,资源方法参数 必须 声明为 最终
(这意味着在方法实施中不得更改这些资源)。
48.5.2. 超时和超时处理程序
概述
异步处理模型还支持在 REST 调用时进行超时。默认情况下,超时会导致 HTTP 错误响应发送到客户端。但是,您也可以选择注册超时处理程序回调,供您自定义超时事件的响应。
在没有处理程序的情况下设置超时示例
要定义简单的调用超时,而不指定超时处理程序,在 AsyncResponse
对象中调用 setTimeout
方法,如下例所示:
// Java // Java ... import java.util.concurrent.TimeUnit; ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/defaulttimeout") public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); // Optionally, send request to executor queue for processing // ... } ... }
请注意,您可以使用 java.util.concurrent.TimeUnit
类中的任何时间单位指定超时值。前面的示例没有显示将请求发送到 executor 线程池的代码。如果您只想测试超时的行为,您可以在资源方法正文中包含对 async.SetTimeout
的调用,并且每次调用时都会触发超时。
AsyncResponse.NO_TIMEOUT
值代表一个无限超时。
默认超时行为
默认情况下,如果触发了调用超时,JAX-RS 运行时会引发 ServiceUnavailableException
异常,并发回一个状态为 503
的 HTTP 错误响应。
TimeoutHandler 接口
如果要自定义超时行为,您必须通过实施 TimeoutHandler
接口来定义超时处理器:
// Java package javax.ws.rs.container; public interface TimeoutHandler { public void handleTimeout(AsyncResponse asyncResponse); }
当您在实现类中覆盖 handleTimeout
方法时,您可以选择以下方法来处理超时:
-
通过调用
asyncResponse.cancel
方法取消响应。 -
使用响应值调用
asyncResponse.resume
方法来发送响应。 -
通过调用
asyncResponse.setTimeout
方法来扩展等待周期。(例如,若要等待 10 秒,您可以调用asyncResponse.setTimeout (10, TimeUnit.SECONDS)
)。
使用处理程序设置超时示例
要使用超时处理程序定义调用超时,在 AsyncResponse
对象中调用 setTimeout
方法和 setTimeoutHandler
方法,如下例所示:
// Java ... import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("/books/cancel") public void getBookDescriptionWithCancel(@PathParam("id") String id, @Suspended AsyncResponse async) { async.setTimeout(2000, TimeUnit.MILLISECONDS); async.setTimeoutHandler(new CancelTimeoutHandlerImpl()); // Optionally, send request to executor queue for processing // ... } ... }
其中,本例注册 CancelTimeoutHandlerImpl
超时处理程序的实例,以处理调用超时。
使用超时处理程序取消响应
CancelTimeoutHandlerImpl
超时处理程序定义如下:
// Java ... import javax.ws.rs.container.AsyncResponse; ... import javax.ws.rs.container.TimeoutHandler; @Path("/bookstore") public class BookContinuationStore { ... private class CancelTimeoutHandlerImpl implements TimeoutHandler { @Override public void handleTimeout(AsyncResponse asyncResponse) { asyncResponse.cancel(); } } ... }
在 AsyncResponse
对象上调用 cancel
的影响是向客户端发送 HTTP 503 (服务不可用
)错误响应。您可以选择为 cancel
方法指定一个参数( int
或 java.util.Date
值),它将用于在响应消息中设置 Retry-After:
HTTP 标头。但是,客户端通常会忽略 Retry-After:
标头。
在 Runnable 实例中处理取消的响应
如果您已将暂停请求封装为 Runnable
实例,该实例在 executor 线程池中排队以处理,您可能会发现,当线程池要处理请求时,您可能会发现 AsyncResponse
已取消。因此,您需要在 Runnable 实例中添加一些代码,这样就可以
使用取消的 AsyncResponse
对象来应对它。例如:
// Java ... @Path("/bookstore") public class BookContinuationStore { ... private void sendRequestToThreadPool(final String id, final AsyncResponse response) { executor.execute(new Runnable() { public void run() { if ( !response.isCancelled() ) { // Process the suspended request ... // ... } } }); } ... }
48.5.3. 处理丢弃的连接
概述
您可以添加回调来处理客户端连接丢失的情况。
ConnectionCallback 接口
要为丢弃的连接添加回调,您必须实施 javax.ws.rs.container.ConnectionCallback
接口,其定义如下:
// Java package javax.ws.rs.container; public interface ConnectionCallback { public void onDisconnect(AsyncResponse disconnected); }
注册连接回调
在实施连接回调后,必须通过调用其中一个寄存器方法来将其注册到当前的 AsyncResponse
对象。例如,要注册类型为
MyConnectionCallback
的连接回调:
asyncResponse.register(new MyConnectionCallback());
连接回调的典型场景
通常,实施连接回调的主要原因是释放与丢弃客户端连接关联的资源(您可以使用 AsyncResponse
实例作为键来识别需要释放的资源)。
48.5.4. 注册回调
概述
您可以选择将回调添加到 AsyncResponse
实例,以便在调用完成后获得通知。当调用此回调时,处理中有两个替代点,其中之一:
- 请求处理完成后,响应已发送到客户端,或者
-
请求处理完成后,一个未映射的
Throwable
会被传播到托管 I/O 容器。
CompletionCallback 接口
要添加完成回调,您必须实施 javax.ws.rs.container.CompletionCallback
接口,其定义如下:
// Java package javax.ws.rs.container; public interface CompletionCallback { public void onComplete(Throwable throwable); }
通常,可丢弃
的参数为 null
。但是,如果请求处理导致异常异常,则 抛出
包含未映射的异常实例。
注册完成回调
在实施完成回调后,您必须通过调用其中一个寄存器方法来将其注册到当前的 AsyncResponse
对象。例如,要注册类型为
MyCompletionCallback
的完成回调:
asyncResponse.register(new MyCompletionCallback());