48.5. 异步响应
48.5.1. 服务器上的异步处理
概述
在服务器端异步处理调用的目的是,可以更有效地利用线程,并最终避免客户端连接尝试所拒绝的情况,因为所有服务器的请求线程都会被阻断。异步处理调用时,该请求线程几乎会立即释放。
请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应。如果您想在客户端中看到异步行为,则必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”。
异步处理的基本模型
图 48.1 “同步处理的线程模型” 显示了服务器端异步处理的基本模型概述。
图 48.1. 同步处理的线程模型
在提纲中,请求在异步模型中处理如下:
-
在请求线程内调用异步资源方法(并接收对
AsyncResponse
对象的引用),稍后需要发回响应。 -
资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
- 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
- 资源方法现在可以返回,从而释放请求线程。
-
当
Runnable
对象到达队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的AsyncResponse
对象将响应发回到客户端。
使用 Java executor 实现线程池
java.util.concurrent
API 是一个功能强大的 API,可让您非常轻松地创建完整的线程池实施。在 Java 并发 API 的术语中,线程池被称为 executor。它只需要单行代码来创建完整的工作线程池,包括工作线程和源源的块队列。
例如,要创建类似 图 48.1 “同步处理的线程模型” 可执行文件的 Executor Thread 池的完整工作池,请创建一个 java.util.concurrent.Executor
实例,如下所示:
Executor executor = new ThreadPoolExecutor( 5, // Core pool size 5, // Maximum pool size 0, // Keep-alive time TimeUnit.SECONDS, // Time unit new ArrayBlockingQueue<Runnable>(10) // Blocking queue );
此构造器创建具有五个线程的新线程池,由单个块队列来容纳最多 10 个可运行的对象。若要将任务提交到线程池,请调用
executor.execute
方法,传入对可 Runnable
对象的引用(封装异步任务)。
定义异步资源方法
要定义异步的资源类型,请使用 @Suspended
注释注入类型为 javax.ws.rs.container.AsyncResponse
的参数,并确保方法返回 void
。例如:
// Java ... import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; @Path("/bookstore") public class BookContinuationStore { ... @GET @Path("{id}") public void handleRequestInPool(@PathParam("id") String id, @Suspended AsyncResponse response) { ... } ... }
请注意,资源方法必须返回 void
,因为注入的 AsyncResponse
对象将在以后返回响应。
AsyncResponse 类
javax.ws.rs.container.AsyncResponse
类在传入客户端连接中提供抽象句柄。当 AsyncResponse
对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停 状态。稍后,当您准备好返回响应时,您可以通过在 AsyncResponse
实例中调用 恢复
来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse
实例上调用 取消
。
将暂停请求封装为可运行
在 图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您可以将暂停的请求推送到队列,稍后可在其中被专用线程池处理。但是,为了使这种方法能够工作,您需要以某种方式 封装 对象中暂停的请求。暂停的请求对象需要封装以下事项:
- 来自传入请求的参数(若有)。
-
AsyncResponse
对象,它在传入客户端连接上提供一个句柄,以及发回响应的方法。 - 调用的逻辑。
封装这些内容的便捷方法是定义一个可运行的类来代表暂停请求,其中 Runnable
.run ()
方法封装了调用的逻辑。执行此操作的最最好方法是将 Runnable
作为本地类实施,如下例所示。
异步处理示例
要实施异步处理场景,资源方法的实现必须将可运行的对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将
Runnable
类定义为本地类,如下例所示:
// Java package org.apache.cxf.systest.jaxrs; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.ConnectionCallback; import javax.ws.rs.container.Suspended; import javax.ws.rs.container.TimeoutHandler; import org.apache.cxf.phase.PhaseInterceptorChain; @Path("/bookstore") public class BookContinuationStore { private Map<String, String> books = new HashMap<String, String>(); private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)); public BookContinuationStore() { init(); } ... @GET @Path("{id}") public void handleRequestInPool(final @PathParam("id") String id, final @Suspended AsyncResponse response) { executor.execute(new Runnable() { public void run() { // Retrieve the book data for 'id' // which is presumed to be a very slow, blocking operation // ... bookdata = ... // Re-activate the client connection with 'resume' // and send the 'bookdata' object as the response response.resume(bookdata); } }); } ... }
注意资源方法参数、id
和 响应
如何直接传递到可运行本地类的定义。这个特殊语法可让您直接在
Runnable.run ()
方法中使用资源方法参数,而无需在本地类中定义对应的字段。
要使这种特殊语法发挥作用,必须声明资源方法参数作为 最终性
(这意味着在方法实施中必须不更改它们)。