47.5. Asynchronous Response
47.5.1. Asynchronous Processing on the Server
Overview
The purpose of asynchronous processing of invocations on the server side is to enable more efficient use of threads and, ultimately, to avoid the scenario where client connection attempts are refused because all of the server's request threads are blocked. When an invocation is processed asynchronously, the request thread is freed up almost immediately.
Note
Note that even when asynchronous processing is enabled on the server side, a client will still remain blocked until it receives a response from the server. If you want to see asynchronous behaviour on the client side, you must implement client-side asynchronous processing. See ???.
Basic model for asynchronous processing
Figure 47.1, “Threading Model for Asynchronous Processing” shows an overview of the basic model for asynchronous processing on the server side.
Figure 47.1. Threading Model for Asynchronous Processing
In outline, a request is processed as follows in the asynchronous model:
- An asynchronous resource method is invoked within a request thread (and receives a reference to an AsyncResponse object, which will be needed later to send back the response).
- The resource method encapsulates the suspended request in a Runnable object, which contains all of the information and processing logic required to process the request.
- The resource method pushes the Runnable object onto the blocking queue of the executor thread pool.
- The resource method can now return, thus freeing up the request thread.
- When the Runnable object gets to the top of the queue, it is processed by one of the threads in the executor thread pool. The encapsulated AsyncResponse object is then used to send the response back to the client.
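The handoff described in the list above can be sketched with the java.util.concurrent API alone. This is a minimal illustration under stated assumptions, not the CXF runtime: the class name HandoffSketch and the method handle are hypothetical, and a CompletableFuture stands in for the AsyncResponse handle so that the sketch runs without a JAX-RS library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {

    // Stand-in for an asynchronous resource method. 'reply' plays the role
    // of the AsyncResponse handle (an assumption made for this sketch only).
    public static void handle(Executor pool, final String id,
                              final CompletableFuture<String> reply) {
        // Encapsulate the suspended request in a Runnable and queue it.
        pool.execute(new Runnable() {
            public void run() {
                // A pool thread does the slow work and sends the reply.
                reply.complete("book-" + id);
            }
        });
        // Returning here frees the calling (request) thread.
    }

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        try {
            CompletableFuture<String> reply = new CompletableFuture<>();
            handle(pool, "123", reply);
            // Only the waiting caller blocks here; 'handle' has already returned.
            System.out.println(reply.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The point of the sketch is that handle returns as soon as the Runnable is queued, while the reply is produced later on a different thread.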
Thread pool implementation with Java executor
The java.util.concurrent API enables you to create a complete thread pool implementation very easily. In the terminology of the Java concurrency API, a thread pool is called an executor. It requires only a single line of code to create a complete working thread pool, including the working threads and the blocking queue that feeds them.
For example, to create a complete working thread pool like the Executor Thread Pool shown in Figure 47.1, “Threading Model for Asynchronous Processing”, create a java.util.concurrent.Executor instance, as follows:

    Executor executor = new ThreadPoolExecutor(
        5,                                    // Core pool size
        5,                                    // Maximum pool size
        0,                                    // Keep-alive time
        TimeUnit.SECONDS,                     // Time unit
        new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
    );
This constructor creates a new thread pool with five threads, fed by a single blocking queue that can hold up to 10 Runnable objects. To submit a task to the thread pool, call the executor.execute method, passing in a reference to a Runnable object (which encapsulates the asynchronous task).
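One consequence of this configuration is worth seeing in code: with five threads and a queue of 10, the pool holds at most 15 tasks at once, and with the default rejection policy of ThreadPoolExecutor a further call to execute throws a RejectedExecutionException. The following sketch demonstrates this; the class name BoundedPoolSketch and the method submitBlockingTasks are hypothetical, and the latch exists only to keep every task blocked while the submissions are counted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    // Submits 'tasks' tasks that all block, and returns how many were accepted.
    public static int submitBlockingTasks(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        final CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(new Runnable() {
                        public void run() {
                            try {
                                release.await();   // simulate a slow, blocking task
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // All five threads are busy and the queue is full.
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + submitBlockingTasks(20));
    }
}
```

A resource method that submits work to a bounded pool therefore needs some policy for the rejected case, for example catching the exception and sending an error response.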
Defining an asynchronous resource method
To define a resource method that is asynchronous, inject an argument of type javax.ws.rs.container.AsyncResponse using the @Suspended annotation and make sure that the method returns void. For example:

    // Java
    ...
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.container.AsyncResponse;
    import javax.ws.rs.container.Suspended;

    @Path("/bookstore")
    public class BookContinuationStore {
        ...
        @GET
        @Path("{id}")
        public void handleRequestInPool(@PathParam("id") String id,
                                        @Suspended AsyncResponse response) {
            ...
        }
        ...
    }
Note that the resource method must return void, because the injected AsyncResponse object will be used to return the response at a later time.
AsyncResponse class
The javax.ws.rs.container.AsyncResponse class provides an abstract handle on an incoming client connection. When an AsyncResponse object is injected into a resource method, the underlying TCP client connection is initially in a suspended state. At a later time, when you are ready to return the response, you can re-activate the underlying TCP client connection and pass back the response, by calling resume on the AsyncResponse instance. Alternatively, if you need to abort the invocation, you can call cancel on the AsyncResponse instance.
Encapsulating a suspended request as a Runnable
In the asynchronous processing scenario shown in Figure 47.1, “Threading Model for Asynchronous Processing”, you push the suspended request onto a queue, from where it can be processed at a later time by a dedicated thread pool. In order for this approach to work, however, you need to have some way of encapsulating the suspended request in an object. The suspended request object needs to encapsulate the following things:
- Parameters from the incoming request (if any).
- The AsyncResponse object, which provides a handle on the incoming client connection and a way of sending back the response.
- The logic of the invocation.
A convenient way to encapsulate these things is to define a Runnable class to represent the suspended request, where the Runnable.run() method encapsulates the logic of the invocation. The most compact way to do this is to implement the Runnable as an anonymous inner class, as shown in the following example.
Example of asynchronous processing
To implement the asynchronous processing scenario, the implementation of the resource method must pass a Runnable object (representing the suspended request) to the executor thread pool. You can define the Runnable as an anonymous inner class directly inside the resource method, as shown in the following example:
    // Java
    package org.apache.cxf.systest.jaxrs;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import javax.ws.rs.GET;
    import javax.ws.rs.NotFoundException;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.Produces;
    import javax.ws.rs.container.AsyncResponse;
    import javax.ws.rs.container.CompletionCallback;
    import javax.ws.rs.container.ConnectionCallback;
    import javax.ws.rs.container.Suspended;
    import javax.ws.rs.container.TimeoutHandler;

    import org.apache.cxf.phase.PhaseInterceptorChain;

    @Path("/bookstore")
    public class BookContinuationStore {

        private Map<String, String> books = new HashMap<String, String>();
        private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                            new ArrayBlockingQueue<Runnable>(10));

        public BookContinuationStore() {
            init();
        }
        ...
        @GET
        @Path("{id}")
        public void handleRequestInPool(final @PathParam("id") String id,
                                        final @Suspended AsyncResponse response) {
            executor.execute(new Runnable() {
                public void run() {
                    // Retrieve the book data for 'id'
                    // which is presumed to be a very slow, blocking operation
                    // ...
                    bookdata = ...
                    // Re-activate the client connection with 'resume'
                    // and send the 'bookdata' object as the response
                    response.resume(bookdata);
                }
            });
        }
        ...
    }
Note how the resource method arguments, id and response, are used directly inside the definition of the anonymous Runnable class. This syntax enables you to use the resource method arguments directly in the Runnable.run() method, without having to define corresponding fields in the class.
Important
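In order for this syntax to work, the resource method parameters must be declared as final (which implies that they must not be changed in the method implementation). From Java 8 onwards, it is sufficient for the parameters to be effectively final, that is, never reassigned, but declaring them final makes the intent explicit.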
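On Java 8 or later, the same capture can be written as a lambda expression, and the captured parameters need only be effectively final (never reassigned) rather than explicitly declared final. The following is a minimal sketch of that variant, not code from the CXF example: CaptureSketch and handle are hypothetical names, and a CompletableFuture again stands in for the AsyncResponse handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class CaptureSketch {

    // 'id' and 'reply' are never reassigned, so they are effectively final
    // and the lambda may capture them without the 'final' keyword.
    public static void handle(Executor pool, String id,
                              CompletableFuture<String> reply) {
        pool.execute(() -> reply.complete("book-" + id));
        // Uncommenting the next line would make 'id' no longer effectively
        // final, and the lambda above would fail to compile:
        // id = id.trim();
    }

    public static void main(String[] args) {
        // A trivial executor that runs each task on the calling thread.
        Executor direct = Runnable::run;
        CompletableFuture<String> reply = new CompletableFuture<>();
        handle(direct, "7", reply);
        System.out.println(reply.join());
    }
}
```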
47.5.2. Timeouts and Timeout Handlers
Overview
The asynchronous processing model also provides support for imposing timeouts on REST invocations. By default, a timeout results in an HTTP error response being sent back to the client. You also have the option of registering a timeout handler callback, which enables you to customize the response to a timeout event.
Example of setting a timeout without a handler
To define a simple invocation timeout, without specifying a timeout handler, call the setTimeout method on the AsyncResponse object, as shown in the following example:

    // Java
    ...
    import java.util.concurrent.TimeUnit;
    ...
    import javax.ws.rs.GET;
    import javax.ws.rs.NotFoundException;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.Produces;
    import javax.ws.rs.container.AsyncResponse;
    import javax.ws.rs.container.Suspended;
    import javax.ws.rs.container.TimeoutHandler;

    @Path("/bookstore")
    public class BookContinuationStore {
        ...
        @GET
        @Path("/books/defaulttimeout")
        public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
            async.setTimeout(2000, TimeUnit.MILLISECONDS);
            // Optionally, send request to executor queue for processing
            // ...
        }
        ...
    }
Note that you can specify the timeout value using any time unit from the java.util.concurrent.TimeUnit class. The preceding example does not show the code for sending the request to the executor thread pool. If you just want to test the timeout behaviour, you can include only the call to async.setTimeout in the resource method body, and the timeout is then triggered on every invocation.
Tip
The AsyncResponse.NO_TIMEOUT value represents an infinite timeout.
Default timeout behaviour
By default, if the invocation timeout is triggered, the JAX-RS runtime raises a ServiceUnavailableException and sends back an HTTP error response with the status 503.
TimeoutHandler interface
If you want to customize the timeout behaviour, you must define a timeout handler, by implementing the TimeoutHandler interface:

    // Java
    package javax.ws.rs.container;

    public interface TimeoutHandler {
        public void handleTimeout(AsyncResponse asyncResponse);
    }
When you implement the handleTimeout method in your implementation class, you can choose between the following approaches to dealing with the timeout:
- Cancel the response, by calling the asyncResponse.cancel method.
- Send a response, by calling the asyncResponse.resume method with the response value.
- Extend the waiting period, by calling the asyncResponse.setTimeout method. (For example, to wait for a further 10 seconds, you could call asyncResponse.setTimeout(10, TimeUnit.SECONDS)).
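The options listed above can be combined in a single handler. The following sketch grants one 10-second extension and, if the timeout fires again, sends a fallback response. It is an illustration under stated assumptions: to keep it compilable without a JAX-RS library on the classpath, SuspendedCall is a hypothetical stand-in that mirrors only the resume, cancel, and setTimeout methods of AsyncResponse, and ExtendOnceHandler is a hypothetical name. In a real service the handler would implement javax.ws.rs.container.TimeoutHandler and receive an AsyncResponse.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExtendOnceSketch {

    // Hypothetical stand-in mirroring three AsyncResponse methods.
    public interface SuspendedCall {
        boolean resume(Object response);
        boolean cancel();
        boolean setTimeout(long time, TimeUnit unit);
    }

    // Grants one extension, then gives up with a custom response.
    public static class ExtendOnceHandler {
        private final AtomicBoolean extended = new AtomicBoolean(false);

        public void handleTimeout(SuspendedCall call) {
            if (extended.compareAndSet(false, true)) {
                // First timeout: wait for a further 10 seconds.
                call.setTimeout(10, TimeUnit.SECONDS);
            } else {
                // Second timeout: send a response instead of waiting longer.
                call.resume("Book lookup timed out");
            }
        }
    }
}
```

Because the handler keeps per-invocation state (the extended flag), this design assumes a new handler instance is registered for each suspended request.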
Example of setting a timeout with a handler
To define an invocation timeout with a timeout handler, call both the setTimeout method and the setTimeoutHandler method on the AsyncResponse object, as shown in the following example:

    // Java
    ...
    import java.util.concurrent.TimeUnit;
    ...
    import javax.ws.rs.GET;
    import javax.ws.rs.NotFoundException;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.Produces;
    import javax.ws.rs.container.AsyncResponse;
    import javax.ws.rs.container.Suspended;
    import javax.ws.rs.container.TimeoutHandler;

    @Path("/bookstore")
    public class BookContinuationStore {
        ...
        @GET
        @Path("/books/cancel")
        public void getBookDescriptionWithCancel(@PathParam("id") String id,
                                                 @Suspended AsyncResponse async) {
            async.setTimeout(2000, TimeUnit.MILLISECONDS);
            async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
            // Optionally, send request to executor queue for processing
            // ...
        }
        ...
    }
This example registers an instance of the CancelTimeoutHandlerImpl timeout handler to handle the invocation timeout.
Using a timeout handler to cancel the response
The CancelTimeoutHandlerImpl timeout handler is defined as follows:

    // Java
    ...
    import javax.ws.rs.container.AsyncResponse;
    ...
    import javax.ws.rs.container.TimeoutHandler;

    @Path("/bookstore")
    public class BookContinuationStore {
        ...
        private class CancelTimeoutHandlerImpl implements TimeoutHandler {

            @Override
            public void handleTimeout(AsyncResponse asyncResponse) {
                asyncResponse.cancel();
            }
        }
        ...
    }
The effect of calling cancel on the AsyncResponse object is to send an HTTP 503 (Service Unavailable) error response to the client. You can optionally specify an argument to the cancel method (either an int value, giving a delay in seconds, or a java.util.Date value), which is used to set a Retry-After: HTTP header in the response message. Clients often ignore the Retry-After: header, however.
Dealing with a cancelled response in the Runnable instance
If you have encapsulated a suspended request as a Runnable instance, which is queued for processing in an executor thread pool, you might find that the AsyncResponse has been cancelled by the time the thread pool gets around to processing the request. For this reason, you ought to add some code to your Runnable instance, which enables it to cope with a cancelled AsyncResponse object. For example:

    // Java
    ...
    @Path("/bookstore")
    public class BookContinuationStore {
        ...
        private void sendRequestToThreadPool(final String id, final AsyncResponse response) {
            executor.execute(new Runnable() {
                public void run() {
                    if ( !response.isCancelled() ) {
                        // Process the suspended request ...
                        // ...
                    }
                }
            });
        }
        ...
    }
47.5.3. Handling Dropped Connections
Overview
It is possible to add a callback to deal with the case where the client connection is lost.
ConnectionCallback interface
To add a callback for dropped connections, you must implement the javax.ws.rs.container.ConnectionCallback interface, which is defined as follows:

    // Java
    package javax.ws.rs.container;

    public interface ConnectionCallback {
        public void onDisconnect(AsyncResponse disconnected);
    }
Registering a connection callback
After implementing a connection callback, you must register it with the current AsyncResponse object, by calling one of the register methods. For example, to register a connection callback of type MyConnectionCallback:

    asyncResponse.register(new MyConnectionCallback());
Typical scenario for connection callback
Typically, the main reason for implementing a connection callback would be to free up resources associated with the dropped client connection (where you could use the AsyncResponse instance as the key to identify the resources that need to be freed).
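The keyed cleanup described above can be sketched as a small registry. This is a hedged illustration, not part of the JAX-RS API: DisconnectCleanupSketch, attach, and size are hypothetical names, and the key type is left generic so that the sketch runs without a JAX-RS library. In a real service the key type would be AsyncResponse and the class would implement ConnectionCallback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DisconnectCleanupSketch<K> {

    // Resources held on behalf of each suspended connection, keyed by its handle.
    private final Map<K, AutoCloseable> resources = new ConcurrentHashMap<>();

    // Record a resource that must be released if the connection is dropped.
    public void attach(K key, AutoCloseable resource) {
        resources.put(key, resource);
    }

    // Same shape as ConnectionCallback.onDisconnect(AsyncResponse disconnected).
    public void onDisconnect(K disconnected) {
        AutoCloseable resource = resources.remove(disconnected);
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                // A real implementation would log the failure here.
            }
        }
    }

    // Number of connections that still hold a resource.
    public int size() {
        return resources.size();
    }
}
```

Removing the entry before closing it means that a repeated notification for the same connection finds nothing to release.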
47.5.4. Registering Callbacks
Overview
You can optionally add a callback to an AsyncResponse instance, in order to be notified when the invocation has completed. There are two alternative points in the processing when this callback can be invoked, either:
- After the request processing is finished and the response has already been sent back to the client, or
- After the request processing is finished and an unmapped Throwable has been propagated to the hosting I/O container.
CompletionCallback interface
To add a completion callback, you must implement the javax.ws.rs.container.CompletionCallback interface, which is defined as follows:

    // Java
    package javax.ws.rs.container;

    public interface CompletionCallback {
        public void onComplete(Throwable throwable);
    }
Usually, the throwable argument is null. However, if the request processing resulted in an unmapped exception, throwable contains the unmapped exception instance.
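The null check on the throwable argument is the core of any completion callback. The following sketch counts successful and failed invocations; CompletionStatsSketch and its accessor methods are hypothetical names, and the class deliberately omits the implements clause so that it runs without a JAX-RS library. In a real service it would be declared as implementing javax.ws.rs.container.CompletionCallback.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CompletionStatsSketch {

    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Same signature as CompletionCallback.onComplete(Throwable).
    public void onComplete(Throwable throwable) {
        if (throwable == null) {
            succeeded.incrementAndGet();   // the response was sent normally
        } else {
            failed.incrementAndGet();      // an unmapped exception escaped
        }
    }

    public long succeeded() {
        return succeeded.get();
    }

    public long failed() {
        return failed.get();
    }

    public static void main(String[] args) {
        CompletionStatsSketch stats = new CompletionStatsSketch();
        stats.onComplete(null);
        stats.onComplete(new IllegalStateException("boom"));
        System.out.println(stats.succeeded() + " ok, " + stats.failed() + " failed");
    }
}
```

Atomic counters are used because the callback can be invoked from whichever thread completes the invocation.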
Registering a completion callback
After implementing a completion callback, you must register it with the current AsyncResponse object, by calling one of the register methods. For example, to register a completion callback of type MyCompletionCallback:

    asyncResponse.register(new MyCompletionCallback());