48.5. 异步响应


48.5.1. 服务器上的异步处理

概述

在服务器端异步处理调用的目的是,可以更有效地利用线程,并最终避免客户端连接尝试所拒绝的情况,因为所有服务器的请求线程都会被阻断。异步处理调用时,该请求线程几乎会立即释放。

注意

请注意,即使在服务器端启用了异步处理,客户端 仍会被阻止,直到它从服务器收到响应。如果您想在客户端中看到异步行为,则必须实施客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”

异步处理的基本模型

图 48.1 “同步处理的线程模型” 显示了服务器端异步处理的基本模型概述。

图 48.1. 同步处理的线程模型

asyncresponse 01

在提纲中,请求在异步模型中处理如下:

  1. 在请求线程内调用异步资源方法(并接收对 AsyncResponse 对象的引用),稍后需要发回响应。
  2. 资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
  3. 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
  4. 资源方法现在可以返回,从而释放请求线程。
  5. Runnable 对象到达队列的顶部时,它由 executor 线程池中的一个线程处理。然后,使用封装的 AsyncResponse 对象将响应发回到客户端。

使用 Java executor 实现线程池

java.util.concurrent API 是一个功能强大的 API,可让您非常轻松地创建完整的线程池实施。在 Java 并发 API 的术语中,线程池被称为 executor。它只需要单行代码来创建完整的工作线程池,包括工作线程和源源的块队列。

例如,要创建类似 图 48.1 “同步处理的线程模型” 可执行文件的 Executor Thread 池的完整工作池,请创建一个 java.util.concurrent.Executor 实例,如下所示:

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

此构造器创建具有五个线程的新线程池,由单个块队列来容纳最多 10 个可运行的对象。若要将任务提交到线程池,请调用 executor.execute 方法,传入对可 Runnable 对象的引用(封装异步任务)。

定义异步资源方法

要定义异步的资源类型,请使用 @Suspended 注释注入类型为 javax.ws.rs.container.AsyncResponse 的参数,并确保方法返回 void。例如:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

请注意,资源方法必须返回 void,因为注入的 AsyncResponse 对象将在以后返回响应。

AsyncResponse 类

javax.ws.rs.container.AsyncResponse 类在传入客户端连接中提供抽象句柄。当 AsyncResponse 对象注入资源方法时,底层 TCP 客户端连接最初处于 暂停 状态。稍后,当您准备好返回响应时,您可以通过在 AsyncResponse 实例中调用 恢复 来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse 实例上调用 取消

将暂停请求封装为可运行

图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您可以将暂停的请求推送到队列,稍后可在其中被专用线程池处理。但是,为了使这种方法能够工作,您需要以某种方式 封装 对象中暂停的请求。暂停的请求对象需要封装以下事项:

  • 来自传入请求的参数(若有)。
  • AsyncResponse 对象,它在传入客户端连接上提供一个句柄,以及发回响应的方法。
  • 调用的逻辑。

封装这些内容的便捷方法是定义一个可运行的类来代表暂停请求,其中 Runnable .run () 方法封装了调用的逻辑。执行此操作的最最好方法是将 Runnable 作为本地类实施,如下例所示。

异步处理示例

要实施异步处理场景,资源方法的实现必须将可运行的对象(代表暂停的请求)传递给 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将 Runnable 类定义为本地类,如下例所示:

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

注意资源方法参数、id响应 如何直接传递到可运行本地类的定义。这个特殊语法可让您直接在 Runnable.run () 方法中使用资源方法参数,而无需在本地类中定义对应的字段。

重要

要使这种特殊语法发挥作用,必须声明资源方法参数作为 最终性 (这意味着在方法实施中必须不更改它们)。

48.5.2. 超时和超时处理程序

概述

异步处理模型还支持在 REST 调用时实现超时。默认情况下,超时会导致将 HTTP 错误响应发送到客户端。但是,您还可以选择注册超时处理器回调,它可让您自定义对超时事件的响应。

设置没有处理程序的超时示例

要定义简单的调用超时,但不指定 timeout 处理程序,在 AsyncResponse 对象中调用 setTimeout 方法,如下例所示:

// Java
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/defaulttimeout")
    public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

请注意,您可以使用 java.util.concurrent.TimeUnit 类中的任何单元来指定超时值。前面的示例没有显示将请求发送到 executor 线程池的代码。如果您只想测试超时行为,可以在资源方法正文中包含对 async.SetTimeout 的调用,每次调用时会触发超时。

AsyncResponse.NO_TIMEOUT 值代表无限超时。

默认超时

默认情况下,如果触发调用超时,JAX-RS 运行时会引发 ServiceUnavailableException 异常,并发送 HTTP 错误响应,其状态为 503

TimeoutHandler 接口

如果要自定义超时行为,您必须通过实施 TimeoutHandler 接口来定义超时处理程序:

// Java
package javax.ws.rs.container;

public interface TimeoutHandler {
    public void handleTimeout(AsyncResponse asyncResponse);
}

当您覆盖实现类中的 handleTimeout 方法时,您可以选择以下方法来处理超时:

  • 通过调用 asyncResponse.cancel 方法取消响应。
  • 通过用响应值调用 asyncResponse.resume 方法来发送响应。
  • 通过调用 asyncResponse.setTimeout 方法来扩展等待的时间。(例如,要等待 10 秒,您可以调用 asyncResponse.setTimeout (10, TimeUnit.SECONDS))。

使用处理程序设置超时示例

要使用超时处理程序定义调用超时,请在 AsyncResponse 对象中调用 setTimeout Handler 方法和 setTimeoutHandler 方法,如下例所示:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/cancel")
    public void getBookDescriptionWithCancel(@PathParam("id") String id,
                                             @Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

在本例中注册一个 CancelTimeoutHandlerImpl timeout 处理程序实例,以处理调用超时。

使用超时处理程序取消响应

CancelTimeoutHandlerImpl timeout 处理程序定义如下:

// Java
...
import javax.ws.rs.container.AsyncResponse;
...
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    private class CancelTimeoutHandlerImpl implements TimeoutHandler {

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.cancel();
        }

    }
    ...
}

AsyncResponse 对象上调用 取消 的效果是为客户端发送 HTTP 503 (服务不可用)错误响应。您可以选择为 cancel 方法指定参数( intjava.util.Date 值),该值将用于在响应消息中设置 Retry-After: HTTP 标头。但是,客户端通常忽略 Retry-After: 标头。

在可运行的实例中处理取消响应

如果您将暂停请求封装为可运行的实例,它会在 executor 线程池中排队处理,您可能会发现线程池围绕处理请求时已取消 AsyncResponse 已取消。因此,您很难将一些代码添加到可运行的实例,使它能处理已取消的 AsyncResponse 对象。例如:

// Java
...
@Path("/bookstore")
public class BookContinuationStore {
    ...
    private void sendRequestToThreadPool(final String id, final AsyncResponse response) {

        executor.execute(new Runnable() {
            public void run() {
                if ( !response.isCancelled() ) {
                    // Process the suspended request ...
                    // ...
                }
            }
        });

    }
    ...
}

48.5.3. 处理丢弃连接

概述

可以添加回调来处理客户端连接丢失的情况。

ConnectionCallback 接口

要为丢弃的连接添加回调,您必须实施 javax.ws.rs.container.ConnectionCallback 接口,该接口定义如下:

// Java
package javax.ws.rs.container;

public interface ConnectionCallback {
    public void onDisconnect(AsyncResponse disconnected);
}

注册连接回调

在实施了连接回调后,必须通过调用其中一个寄存器方法,将其注册到 当前的 AsyncResponse 对象。例如,注册类型为 MyConnectionCallback 的连接回调:

asyncResponse.register(new MyConnectionCallback());

连接回调的典型场景

通常,实施连接回调的主要原因是释放与丢弃客户端连接关联的资源(您可以使用 AsyncResponse 实例作为密钥来识别需要释放的资源)。

48.5.4. 注册回调

概述

您可以选择将回调添加到 AsyncResponse 实例,以便在调用完成时获得通知。当可以调用这个回调时,可以调用以下两种替代点:

  • 请求处理完成后,响应已发回到客户端,或者
  • 请求处理完成后,会将未映射的 Throwable 传播到托管 I/O 容器。

CompletionCallback 接口

要添加完成回调,您必须实施 javax.ws.rs.container.CompletionCallback 接口,该接口定义如下:

// Java
package javax.ws.rs.container;

public interface CompletionCallback {
    public void onComplete(Throwable throwable);
}

通常,可调参数为 null。但是,如果请求处理导致未映射异常,则会 抛出 包含未映射异常实例。

注册完成回调

实施完成回调后,必须通过调用其中一个寄存器方法,将其注册到 当前的 AsyncResponse 对象。例如,注册类型为 MyCompletionCallback 的完成回调:

asyncResponse.register(new MyCompletionCallback());
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.