48.5. 异步响应


48.5.1. 服务器上的异步处理

概述

在服务器端异步处理调用的目的是,可以更有效地利用线程,并最终避免客户端连接尝试所拒绝的情况,因为所有服务器的请求线程都会被阻断。异步处理调用时,该请求线程几乎会立即释放。

注意

请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应。如果您想在客户端中看到异步行为,则必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”

异步处理的基本模型

图 48.1 “同步处理的线程模型” 显示了服务器端异步处理的基本模型概述。

图 48.1. 同步处理的线程模型

asyncresponse 01

在提纲中,请求在异步模型中处理如下:

  1. 在请求线程内调用异步资源方法(并接收对 AsyncResponse 对象的引用),稍后需要发回响应。
  2. 资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
  3. 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
  4. 资源方法现在可以返回,从而释放请求线程。
  5. Runnable 对象到达队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的 AsyncResponse 对象将响应发回到客户端。

使用 Java executor 实现线程池

java.util.concurrent API 是一个功能强大的 API,可让您非常轻松地创建完整的线程池实施。在 Java 并发 API 的术语中,线程池被称为 executor。它只需要单行代码来创建完整的工作线程池,包括工作线程和源源的块队列。

例如,要创建类似 图 48.1 “同步处理的线程模型” 可执行文件的 Executor Thread 池的完整工作池,请创建一个 java.util.concurrent.Executor 实例,如下所示:

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

此构造器创建具有五个线程的新线程池,由单个块队列来容纳最多 10 个可运行的对象。若要将任务提交到线程池,请调用 executor.execute 方法,传入对可 Runnable 对象的引用(封装异步任务)。

定义异步资源方法

要定义异步的资源类型,请使用 @Suspended 注释注入类型为 javax.ws.rs.container.AsyncResponse 的参数,并确保方法返回 void。例如:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

请注意,资源方法必须返回 void,因为注入的 AsyncResponse 对象将在以后返回响应。

AsyncResponse 类

javax.ws.rs.container.AsyncResponse 类在传入客户端连接中提供抽象句柄。当 AsyncResponse 对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停 状态。稍后,当您准备好返回响应时,您可以通过在 AsyncResponse 实例中调用 恢复 来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse 实例上调用 取消

将暂停请求封装为可运行

图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您可以将暂停的请求推送到队列,稍后可在其中被专用线程池处理。但是,为了使这种方法能够工作,您需要以某种方式 封装 对象中暂停的请求。暂停的请求对象需要封装以下事项:

  • 来自传入请求的参数(若有)。
  • AsyncResponse 对象,它在传入客户端连接上提供一个句柄,以及发回响应的方法。
  • 调用的逻辑。

封装这些内容的便捷方法是定义一个可运行的类来代表暂停请求,其中 Runnable .run () 方法封装了调用的逻辑。执行此操作的最最好方法是将 Runnable 作为本地类实施,如下例所示。

异步处理示例

要实施异步处理场景,资源方法的实现必须将可运行的对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将 Runnable 类定义为本地类,如下例所示:

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

注意资源方法参数、id响应 如何直接传递到可运行本地类的定义。这个特殊语法可让您直接在 Runnable.run () 方法中使用资源方法参数,而无需在本地类中定义对应的字段。

重要

要使这种特殊语法发挥作用,必须声明资源方法参数作为 最终性 (这意味着在方法实施中必须不更改它们)。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.