提交 3714d0e4 编写于 作者: A Alex Feigin 提交者: Arjen Poutsma

Expose future response in new AsyncServerResponse

This commit introduces AsyncServerResponse, an extension of
ServerResponse that is returned from ServerResponse.async and that
allows users to get the future response by calling the block method.
This is particularly useful for testing purposes.
上级 227d85a6
......@@ -16,161 +16,61 @@
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.request.async.AsyncWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.servlet.ModelAndView;
/**
* Implementation of {@link ServerResponse} based on a {@link CompletableFuture}.
* Asynchronous subtype of {@link ServerResponse} that exposes the future
* response.
*
* @author Arjen Poutsma
* @since 5.3
* @since 5.3.2
* @see ServerResponse#async(Object)
*/
final class AsyncServerResponse extends ErrorHandlingServerResponse {
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", AsyncServerResponse.class.getClassLoader());
private final CompletableFuture<ServerResponse> futureResponse;
@Nullable
private final Duration timeout;
private AsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
this.futureResponse = futureResponse;
this.timeout = timeout;
}
@Override
public HttpStatus statusCode() {
return delegate(ServerResponse::statusCode);
}
@Override
public int rawStatusCode() {
return delegate(ServerResponse::rawStatusCode);
}
@Override
public HttpHeaders headers() {
return delegate(ServerResponse::headers);
}
@Override
public MultiValueMap<String, Cookie> cookies() {
return delegate(ServerResponse::cookies);
public interface AsyncServerResponse extends ServerResponse {
/**
* Blocks indefinitely until the future response is obtained.
*/
ServerResponse block();
// Static creation methods
/**
* Create a {@code AsyncServerResponse} with the given asynchronous response.
* Parameter {@code asyncResponse} can be a
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
* asynchronous producer of a single {@code ServerResponse} that can be
* adapted via the {@link ReactiveAdapterRegistry}).
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
* {@code Publisher<ServerResponse>}
* @return the asynchronous response
*/
static AsyncServerResponse create(Object asyncResponse) {
return DefaultAsyncServerResponse.create(asyncResponse, null);
}
private <R> R delegate(Function<ServerResponse, R> function) {
ServerResponse response = this.futureResponse.getNow(null);
if (response != null) {
return function.apply(response);
}
else {
throw new IllegalStateException("Future ServerResponse has not yet completed");
}
/**
* Create a (built) response with the given asynchronous response.
* Parameter {@code asyncResponse} can be a
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
* asynchronous producer of a single {@code ServerResponse} that can be
* adapted via the {@link ReactiveAdapterRegistry}).
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
* {@code Publisher<ServerResponse>}
* @param timeout maximum time period to wait for before timing out
* @return the asynchronous response
*/
static AsyncServerResponse create(Object asyncResponse, Duration timeout) {
return DefaultAsyncServerResponse.create(asyncResponse, timeout);
}
@Nullable
@Override
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
throws ServletException, IOException {
writeAsync(request, response, createDeferredResult());
return null;
}
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
throws ServletException, IOException {
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncManager.setAsyncWebRequest(asyncWebRequest);
try {
asyncManager.startDeferredResultProcessing(deferredResult);
}
catch (IOException | ServletException ex) {
throw ex;
}
catch (Exception ex) {
throw new ServletException("Async processing failed", ex);
}
}
private DeferredResult<ServerResponse> createDeferredResult() {
DeferredResult<ServerResponse> result;
if (this.timeout != null) {
result = new DeferredResult<>(this.timeout.toMillis());
}
else {
result = new DeferredResult<>();
}
this.futureResponse.handle((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
}
result.setErrorResult(ex);
}
else {
result.setResult(value);
}
return null;
});
return result;
}
@SuppressWarnings({"unchecked"})
public static ServerResponse create(Object o, @Nullable Duration timeout) {
Assert.notNull(o, "Argument to async must not be null");
if (o instanceof CompletableFuture) {
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
return new AsyncServerResponse(futureResponse, timeout);
}
else if (reactiveStreamsPresent) {
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
if (publisherAdapter != null) {
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
if (futureAdapter != null) {
CompletableFuture<ServerResponse> futureResponse =
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
return new AsyncServerResponse(futureResponse, timeout);
}
}
}
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
}
}
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.function;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.request.async.AsyncWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.servlet.ModelAndView;
/**
* Default {@link AsyncServerResponse} implementation.
*
* @author Arjen Poutsma
* @since 5.3.2
*/
final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse implements AsyncServerResponse {
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", DefaultAsyncServerResponse.class.getClassLoader());
private final CompletableFuture<ServerResponse> futureResponse;
@Nullable
private final Duration timeout;
private DefaultAsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
this.futureResponse = futureResponse;
this.timeout = timeout;
}
@Override
public ServerResponse block() {
try {
if (this.timeout != null) {
return this.futureResponse.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
}
else {
return this.futureResponse.get();
}
}
catch (InterruptedException | ExecutionException | TimeoutException ex) {
throw new IllegalStateException("Failed to get future response", ex);
}
}
@Override
public HttpStatus statusCode() {
return delegate(ServerResponse::statusCode);
}
@Override
public int rawStatusCode() {
return delegate(ServerResponse::rawStatusCode);
}
@Override
public HttpHeaders headers() {
return delegate(ServerResponse::headers);
}
@Override
public MultiValueMap<String, Cookie> cookies() {
return delegate(ServerResponse::cookies);
}
private <R> R delegate(Function<ServerResponse, R> function) {
ServerResponse response = this.futureResponse.getNow(null);
if (response != null) {
return function.apply(response);
}
else {
throw new IllegalStateException("Future ServerResponse has not yet completed");
}
}
@Nullable
@Override
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
throws ServletException, IOException {
writeAsync(request, response, createDeferredResult());
return null;
}
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
throws ServletException, IOException {
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncManager.setAsyncWebRequest(asyncWebRequest);
try {
asyncManager.startDeferredResultProcessing(deferredResult);
}
catch (IOException | ServletException ex) {
throw ex;
}
catch (Exception ex) {
throw new ServletException("Async processing failed", ex);
}
}
private DeferredResult<ServerResponse> createDeferredResult() {
DeferredResult<ServerResponse> result;
if (this.timeout != null) {
result = new DeferredResult<>(this.timeout.toMillis());
}
else {
result = new DeferredResult<>();
}
this.futureResponse.handle((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
}
result.setErrorResult(ex);
}
else {
result.setResult(value);
}
return null;
});
return result;
}
@SuppressWarnings({"unchecked"})
public static AsyncServerResponse create(Object o, @Nullable Duration timeout) {
Assert.notNull(o, "Argument to async must not be null");
if (o instanceof CompletableFuture) {
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
return new DefaultAsyncServerResponse(futureResponse, timeout);
}
else if (reactiveStreamsPresent) {
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
if (publisherAdapter != null) {
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
if (futureAdapter != null) {
CompletableFuture<ServerResponse> futureResponse =
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
return new DefaultAsyncServerResponse(futureResponse, timeout);
}
}
}
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
}
}
......@@ -208,7 +208,7 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
return new CompletionStageEntityResponse(this.status, this.headers, this.cookies,
completionStage, this.entityType);
}
else if (AsyncServerResponse.reactiveStreamsPresent) {
else if (DefaultAsyncServerResponse.reactiveStreamsPresent) {
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(this.entity.getClass());
if (adapter != null) {
Publisher<T> publisher = adapter.toPublisher(this.entity);
......@@ -362,7 +362,7 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
Context context) throws ServletException, IOException {
DeferredResult<?> deferredResult = createDeferredResult(servletRequest, servletResponse, context);
AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult);
DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult);
return null;
}
......@@ -410,7 +410,7 @@ final class DefaultEntityResponseBuilder<T> implements EntityResponse.Builder<T>
Context context) throws ServletException, IOException {
DeferredResult<?> deferredResult = new DeferredResult<>();
AsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult);
DefaultAsyncServerResponse.writeAsync(servletRequest, servletResponse, deferredResult);
entity().subscribe(new DeferredResultSubscriber(servletRequest, servletResponse, context, deferredResult));
return null;
......
......@@ -236,7 +236,7 @@ public interface ServerResponse {
* @since 5.3
*/
static ServerResponse async(Object asyncResponse) {
return AsyncServerResponse.create(asyncResponse, null);
return DefaultAsyncServerResponse.create(asyncResponse, null);
}
/**
......@@ -257,7 +257,7 @@ public interface ServerResponse {
* @since 5.3.2
*/
static ServerResponse async(Object asyncResponse, Duration timeout) {
return AsyncServerResponse.create(asyncResponse, timeout);
return DefaultAsyncServerResponse.create(asyncResponse, timeout);
}
/**
......
......@@ -89,7 +89,7 @@ final class SseServerResponse extends AbstractServerResponse {
result = new DeferredResult<>();
}
AsyncServerResponse.writeAsync(request, response, result);
DefaultAsyncServerResponse.writeAsync(request, response, result);
this.sseConsumer.accept(new DefaultSseBuilder(response, context, result));
return null;
}
......
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.function;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Arjen Poutsma
*/
class DefaultAsyncServerResponseTests {
@Test
void block() {
ServerResponse wrappee = ServerResponse.ok().build();
CompletableFuture<ServerResponse> future = CompletableFuture.completedFuture(wrappee);
AsyncServerResponse response = AsyncServerResponse.create(future);
assertThat(response.block()).isSameAs(wrappee);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册