2.21. 扩展 RESTEasy 支持异步请求处理和 Reactive Return 类型
扩展 RESTEasy 支持仅作为技术预览提供。技术预览功能不包括在红帽生产服务级别协议(SLA)中,且其功能可能并不完善。因此,红帽不建议在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。
如需有关 技术预览功能支持范围 的信息,请参阅红帽客户门户网站中的技术预览功能支持范围。
2.21.1. 可插拔响应类型 复制链接链接已复制到粘贴板!
Jakarta RESTful Web Services 2.1 可扩展,可支持各种被动库。RESTEasy 的可选模块 resteasy-rxjava2 支持以下被动类型:
-
io.reactivex.Single:与CompletionStage类似,因为它最多有一个潜在值。 -
io.reactivex.Flowable:Implementsio.reactivex.Publisher. -
io.reactivex.Observable:与 Flowable类似,只是不支持反压缩,订阅者可以通过调用Subscription.request()来控制从生产者收到的负载。
如果您导入 resteasy-rxjava2,您可以从服务器端的资源方法返回这些被动类型,并在客户端端接收它们。
resteasy-rxjava2 模块支持以下三个类分别访问客户端一侧的 Singles、Oservables 和 Flowables:
-
org.jboss.resteasy.rxjava2.SingleRxInvoker -
org.jboss.resteasy.rxjava2.FlowableRxInvoker -
org.jboss.resteasy.rxjava2.ObservableRxInvoker
2.21.2. 其他活跃类的扩展 复制链接链接已复制到粘贴板!
RESTEasy 实施框架,支持其他被动类的扩展。在服务器端,当资源方法返回 CompletionStage 类型时,RESTEasy 会使用 org.jboss.resteasy.core.AsyncResponseConsumer 类订阅它。当 CompletionStage 完成后,它会调用 CompletionStageResponseConsumer.accept(),它会将结果发回到客户端。
RESTEasy 内置了对 CompletionStage 的支持。我们可以通过提供一种将单机转换为完成阶段的机制,将支持扩展到 单 类 。在 resteasy-rxjava2 模块中,org.jboss.resteasy.rxjava2.SingleProvider 实施 org.jboss.resteasy.spi.AsyncResponseProvider<Single<?>> 接口提供此机制:
public interface AsyncResponseProvider<T> {
public CompletionStage toCompletionStage(T asyncResponse);
}
假定为 SingleProvider 类,RESTEasy 可以单选一个 单 级,将它转换为 CompletionStage ResponseConsumer,然后使用 CompletionStageResponseConsumer 来处理 单 台最终值。类似地,当资源方法返回流的被动类(如 Flowable )时,RESTEasy 订阅它,接收数据元素流并将其发送到客户端。AsyncResponseConsumer 具有多个支持类,各自实施不同的流模式。
例如,AsyncResponseConsumer.Async GeneralStreamingSseResponseConsumer 处理常规流和 SSE 流。订阅通过调用 org.reactivestreams.Publisher.subscribe() 完成,因此需要一种机制来 将流向 发布 程序,例如:也就是说,调用 org.jboss.resteasy.spi.AsyncStreamProvider<Flowable> 的实现,其定义 AsyncStreamProvider,如下例所示:
public interface AsyncStreamProvider<T> {
public Publisher toAsyncStream(T asyncResponse);
}
在 resteasy-rxjava2 模块中,org.jboss.resteasy.FlowableProvider 为 Flowable 提供这种机制。
这意味着,在服务器端,您可以通过为流或 AsyncResponse Provider 接口声明对流或 注释来添加对其他被动类型的支持。这两个接口都有一个方法,可将新的被动类型分别转换为 ProviderAsyncResponseProvider 接口的 @Publisher 或 CompletionStage (流)或单个值。
在客户端,Jkarta RESTful Web Services 2.1 提出了两个支持被动类的要求:
-
支持
CompletionStage作为javax.ws.rs.client.CompletionStageRxInvoker接口的实施。 - 支持注册实施的供应商的可扩展性:
public interface RxInvokerProvider<T extends RxInvoker> {
public boolean isProviderFor(Class<T> clazz);
public T getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService);
}
注册 RxInvokerProvider 后,您可以通过调用 javax.ws.rs.client.Invocation.Builder 方法来请求 RxInvoker:
public <T extends RxInvoker> T rx(Class<T> clazz);
您可以使用 RxInvoker 进行调用,以返回适当的被动类。例如:
FlowableRxInvoker invoker = client.target(generateURL("/get/string")).request().rx(FlowableRxInvoker.class);
Flowable<String> flowable = (Flowable<String>) invoker.get();
RESTEasy 为实施 RxInvokers 提供部分支持。例如,上面提到的 SingleProvider 还实施了 org.jboss.resteasy.spi.AsyncClientResponseProvider<Single<?>>,其中 AsyncClientResponseProvider 定义为以下内容:
public interface AsyncClientResponseProvider<T> {
public T fromCompletionStage(CompletionStage<?> completionStage);
}
2.21.3. 被动客户端 API 复制链接链接已复制到粘贴板!
RESTEasy 定义名为 RxInvoker 的新调用者类型,以及此类型的默认实施,名为 CompletionStageRxInvoker。CompletionStageRxInvoker 实施 Java 8 的界面 完成阶段。此接口声明了大量专门用于管理异步计算的方法。
2.21.4. 异步过滤器 复制链接链接已复制到粘贴板!
如果必须暂停执行过滤器直到有特定资源可用,您可以将其转换为异步过滤器。关闭请求异步不需要对您的资源方法声明或额外的过滤器声明进行任何更改。
要将过滤器的执行异步切换,您必须进行 cast:
-
ContainerRequestContext toSuspendableContainerRequestContext用于 pre 和 post 请求过滤器。 -
ContainerResponseContext转换为用于响应过滤器的 SuspendableContainerResponseContext。
这些上下文对象可以通过调用 suspend() 方法将当前过滤器的执行转换为异步。异步后,过滤器链会被暂停,只有在上下文对象中调用以下方法之一后才会恢复:
-
abortWith(Response):终止过滤器链,将给定的 Response 返回给客户端。这只适用于 ContainerRequestFilter。 -
restore():通过调用下一个过滤器来恢复过滤器链的执行。 -
Restore(可浏览):通过抛出给定异常来中止执行过滤器链。这的行为就像过滤器是同步的,并且三个给出的异常。
2.21.5. 代理 复制链接链接已复制到粘贴板!
代理是 RESTEasy 扩展,支持直观的编程风格,用特定于应用的接口调用来取代通用 Jakarta RESTful Web 服务调用者调用。代理框架扩展为包括 CompletionStage 和 RxJava2 类型 Single、Observable 和 Flowable。以下示例演示了 RESTEasy 代理如何工作:
示例 1:
@Path("")
public interface RxCompletionStageResource {
@GET
@Path("get/string")
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<String> getString();
}
@Path("")
public class RxCompletionStageResourceImpl {
@GET
@Path("get/string")
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<String> getString() { .... }
}
public class RxCompletionStageProxyTest {
private static ResteasyClient client;
private static RxCompletionStageResource proxy;
static {
client = new ResteasyClientBuilder().build();
proxy = client.target(generateURL("/")).proxy(RxCompletionStageResource.class);
}
@Test
public void testGet() throws Exception {
CompletionStage<String> completionStage = proxy.getString();
Assert.assertEquals("x", completionStage.toCompletableFuture().get());
}
}
示例 2:
public interface Rx2FlowableResource {
@GET
@Path("get/string")
@Produces(MediaType.TEXT_PLAIN)
@Stream
public Flowable<String> getFlowable();
}
@Path("")
public class Rx2FlowableResourceImpl {
@GET
@Path("get/string")
@Produces(MediaType.TEXT_PLAIN)
@Stream
public Flowable<String> getFlowable() { ... }
}
public class Rx2FlowableProxyTest {
private static ResteasyClient client;
private static Rx2FlowableResource proxy;
static {
client = new ResteasyClientBuilder().build();
proxy = client.target(generateURL("/")).proxy(Rx2FlowableResource.class);
}
@Test
public void testGet() throws Exception {
Flowable<String> flowable = proxy.getFlowable();
flowable.subscribe(
(String o) -> stringList.add(o),
(Throwable t) -> errors.incrementAndGet(),
() -> latch.countDown());
boolean waitResult = latch.await(30, TimeUnit.SECONDS);
Assert.assertTrue("Waiting for event to be delivered has timed out.", waitResult);
Assert.assertEquals(0, errors.get());
Assert.assertEquals(xStringList, stringList);
}
}