提交 7ee821d3 编写于 作者: R Rossen Stoyanchev

Add ability to handle a timeout to DeferredResult

When a controller returns a DeferredResult, the underlying async
request will eventually time out. Until now the default behavior was
to send a 503 (SERVICE_UNAVAILABLE). However, this is not desirable
in all cases. For example if waiting on an event, a timeout simply
means there is no new information to send.

To handle those cases a DeferredResult now accespts a timeout result
Object in its constructor. If the timeout occurs before the
DeferredResult is set, the timeout result provided to the constructor
is used instead.

Issue: SPR-8617
上级 f37efb42
......@@ -143,7 +143,7 @@ public final class AsyncExecutionChain {
}
private Callable<Object> buildChain() {
Assert.state(this.callable != null, "The callable field is required to complete the chain");
Assert.state(this.callable != null, "The last callable is required to build the async chain");
this.delegatingCallables.add(new StaleAsyncRequestCheckingCallable(asyncWebRequest));
Callable<Object> result = this.callable;
for (int i = this.delegatingCallables.size() - 1; i >= 0; i--) {
......@@ -165,25 +165,39 @@ public final class AsyncExecutionChain {
* the threading model, i.e. whether a TaskExecutor is used.
* @see DeferredResult
*/
public void startDeferredResultProcessing(DeferredResult deferredResult) {
Assert.notNull(deferredResult, "A DeferredResult is required");
public void startDeferredResultProcessing(final DeferredResult deferredResult) {
Assert.notNull(deferredResult, "DeferredResult is required");
startAsync();
deferredResult.setValueProcessor(new DeferredResultHandler() {
public void handle(Object value) {
deferredResult.init(new DeferredResultHandler() {
public void handle(Object result) {
if (asyncWebRequest.isAsyncCompleted()) {
throw new StaleAsyncWebRequestException("Async request processing already completed");
}
setCallable(getSimpleCallable(value));
setCallable(new PassThroughCallable(result));
new AsyncExecutionChainRunnable(asyncWebRequest, buildChain()).run();
}
});
if (deferredResult.canHandleTimeout()) {
this.asyncWebRequest.setTimeoutHandler(new Runnable() {
public void run() {
deferredResult.handleTimeout();
}
});
}
}
private Callable<Object> getSimpleCallable(final Object value) {
return new Callable<Object>() {
public Object call() throws Exception {
return value;
}
};
private static class PassThroughCallable implements Callable<Object> {
private final Object value;
public PassThroughCallable(Object value) {
this.value = value;
}
public Object call() throws Exception {
return this.value;
}
}
}
......@@ -49,7 +49,6 @@ public class AsyncExecutionChainRunnable implements Runnable {
public AsyncExecutionChainRunnable(AsyncWebRequest asyncWebRequest, Callable<?> callable) {
Assert.notNull(asyncWebRequest, "An AsyncWebRequest is required");
Assert.notNull(callable, "A Callable is required");
Assert.state(asyncWebRequest.isAsyncStarted(), "Not an async request");
this.asyncWebRequest = asyncWebRequest;
this.callable = callable;
}
......
......@@ -35,6 +35,12 @@ public interface AsyncWebRequest extends NativeWebRequest {
*/
void setTimeout(Long timeout);
/**
* Invoked on a timeout to complete the response instead of the default
* behavior that sets the status to 503 (SERVICE_UNAVAILABLE).
*/
void setTimeoutHandler(Runnable runnable);
/**
* Mark the start of async request processing for example ensuring the
* request remains open in order to be completed in a separate thread.
......
......@@ -16,72 +16,135 @@
package org.springframework.web.context.request.async;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.util.Assert;
/**
* DeferredResult provides an alternative to using a Callable to complete async
* request processing. Whereas with a Callable the framework manages a thread on
* behalf of the application through an {@link AsyncTaskExecutor}, with a
* DeferredResult the application can produce a value using a thread of its choice.
* DeferredResult provides an alternative to using a Callable for async request
* processing. With a Callable the framework manages a thread on behalf of the
* application through an {@link AsyncTaskExecutor}. With a DeferredResult the
* application sets the result in a thread of its choice.
*
* <p>The following sequence describes typical use of a DeferredResult:
* <p>The following sequence describes the intended use scenario:
* <ol>
* <li>Application method (e.g. controller method) returns a DeferredResult instance
* <li>The framework completes initialization of the returned DeferredResult in the same thread
* <li>The application calls {@link DeferredResult#set(Object)} from another thread
* <li>The framework completes request processing in the thread in which it is invoked
* <li>thread-1: framework calls application method
* <li>thread-1: application method returns a DeferredResult
* <li>thread-1: framework initializes DeferredResult
* <li>thread-2: application calls {@link #set(Object)}
* <li>thread-2: framework completes async processing with given result
* </ol>
*
* <p><strong>Note:</strong> {@link DeferredResult#set(Object)} will block if
* called before the DeferredResult is fully initialized (by the framework).
* Application code should never create a DeferredResult and set it immediately:
*
* <pre>
* DeferredResult value = new DeferredResult();
* value.set(1); // blocks
* </pre>
* <p>If the application calls {@link #set(Object)} in thread-2 before the
* DeferredResult is initialized by the framework in thread-1, then thread-2
* will block and wait for the initialization to complete. Therefore an
* application should never create and set the DeferredResult in the same
* thread because the initialization will never complete.</p>
*
* @author Rossen Stoyanchev
* @since 3.2
*/
public final class DeferredResult {
private final AtomicReference<Object> value = new AtomicReference<Object>();
private final static Object TIMEOUT_RESULT_NONE = new Object();
private Object result;
private final Object timeoutResult;
private DeferredResultHandler resultHandler;
private final CountDownLatch readySignal = new CountDownLatch(1);
private final ReentrantLock timeoutLock = new ReentrantLock();
/**
* Create a new instance.
*/
public DeferredResult() {
this(TIMEOUT_RESULT_NONE);
}
/**
* Create a new instance and also provide a default result to use if a
* timeout occurs before {@link #set(Object)} is called.
*/
public DeferredResult(Object timeoutResult) {
this.timeoutResult = timeoutResult;
}
private final BlockingQueue<DeferredResultHandler> handlers = new ArrayBlockingQueue<DeferredResultHandler>(1);
boolean canHandleTimeout() {
return this.timeoutResult != TIMEOUT_RESULT_NONE;
}
/**
* Provide a value to use to complete async request processing.
* This method should be invoked only once and usually from a separate
* thread to allow the framework to fully initialize the created
* DeferrredValue. See the class level documentation for more details.
* Complete async processing with the given result. If the DeferredResult is
* not yet fully initialized, this method will block and wait for that to
* occur before proceeding. See the class level javadoc for more details.
*
* @throws StaleAsyncWebRequestException if the underlying async request
* ended due to a timeout or an error before the value was set.
* has already timed out or ended due to a network error.
*/
public void set(Object value) throws StaleAsyncWebRequestException {
Assert.isNull(this.value.get(), "Value already set");
this.value.set(value);
public void set(Object result) throws StaleAsyncWebRequestException {
if (this.timeoutLock.tryLock() && (this.result != this.timeoutResult)) {
try {
handle(result);
}
finally {
this.timeoutLock.unlock();
}
}
else {
// A timeout is in progress
throw new StaleAsyncWebRequestException("Async request already timed out");
}
}
/**
* Invoked to complete async processing when a timeout occurs before
* {@link #set(Object)} is called. Or if {@link #set(Object)} is already in
* progress, this method blocks, waits for it to complete, and then returns.
*/
void handleTimeout() {
Assert.state(canHandleTimeout(), "Can't handle timeout");
this.timeoutLock.lock();
try {
if (this.result == null) {
handle(this.timeoutResult);
}
}
finally {
this.timeoutLock.unlock();
}
}
private void handle(Object result) throws StaleAsyncWebRequestException {
Assert.isNull(this.result, "A deferred result can be set once only");
this.result = result;
try {
this.handlers.take().handle(value);
this.readySignal.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new IllegalStateException("Failed to process deferred return value: " + value, e);
throw new IllegalStateException(
"Gave up on waiting for DeferredResult to be initialized. " +
"Are you perhaps creating and setting a DeferredResult in the same thread? " +
"The DeferredResult must be fully initialized before you can set it. " +
"See the class javadoc for more details");
}
this.resultHandler.handle(result);
}
void setValueProcessor(DeferredResultHandler handler) {
this.handlers.add(handler);
void init(DeferredResultHandler handler) {
this.resultHandler = handler;
this.readySignal.countDown();
}
/**
* Puts the set value through processing wiht the async execution chain.
* Completes processing when {@link DeferredResult#set(Object)} is called.
*/
interface DeferredResultHandler {
......
......@@ -39,6 +39,9 @@ public class NoOpAsyncWebRequest extends ServletWebRequest implements AsyncWebRe
public void setTimeout(Long timeout) {
}
public void setTimeoutHandler(Runnable runnable) {
}
public boolean isAsyncStarted() {
return false;
}
......
......@@ -48,6 +48,8 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
private AtomicBoolean asyncCompleted = new AtomicBoolean(false);
private Runnable timeoutHandler;
public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
super(request, response);
}
......@@ -64,6 +66,10 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
return this.asyncCompleted.get();
}
public void setTimeoutHandler(Runnable timeoutHandler) {
this.timeoutHandler = timeoutHandler;
}
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
......@@ -111,8 +117,13 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
// ---------------------------------------------------------------------
public void onTimeout(AsyncEvent event) throws IOException {
if (this.timeoutHandler == null) {
getResponse().sendError(HttpStatus.SERVICE_UNAVAILABLE.value());
}
else {
this.timeoutHandler.run();
}
completeInternal();
getResponse().sendError(HttpStatus.SERVICE_UNAVAILABLE.value());
}
public void onError(AsyncEvent event) throws IOException {
......
......@@ -123,7 +123,7 @@ public class AsyncExecutionChainTests {
fail("Expected exception");
}
catch (IllegalStateException ex) {
assertThat(ex.getMessage(), containsString("The callable field is required"));
assertThat(ex.getMessage(), containsString("last callable is required"));
}
}
......@@ -171,7 +171,7 @@ public class AsyncExecutionChainTests {
fail("Expected exception");
}
catch (IllegalArgumentException ex) {
assertThat(ex.getMessage(), containsString("A DeferredResult is required"));
assertThat(ex.getMessage(), containsString("DeferredResult is required"));
}
}
......@@ -186,6 +186,10 @@ public class AsyncExecutionChainTests {
super(request, response);
}
public void setTimeout(Long timeout) { }
public void setTimeoutHandler(Runnable runnable) { }
public void startAsync() {
this.asyncStarted = true;
}
......@@ -194,8 +198,6 @@ public class AsyncExecutionChainTests {
return this.asyncStarted;
}
public void setTimeout(Long timeout) { }
public void complete() {
this.asyncStarted = false;
this.asyncCompleted = true;
......
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.context.request.async;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
/**
* DeferredResult tests.
*
* @author Rossen Stoyanchev
*/
public class DeferredResultTests {
@Test
public void canHandleTimeout() {
assertFalse(new DeferredResult().canHandleTimeout());
assertTrue(new DeferredResult("foo").canHandleTimeout());
}
@Test
public void set() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
DeferredResult deferredResult = new DeferredResult();
deferredResult.init(resultHandler);
resultHandler.handle("foo");
replay(resultHandler);
deferredResult.set("foo");
verify(resultHandler);
}
@Test
public void handleTimeout() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
DeferredResult deferredResult = new DeferredResult("foo");
deferredResult.init(resultHandler);
resultHandler.handle("foo");
replay(resultHandler);
deferredResult.handleTimeout();
verify(resultHandler);
}
@Test(expected=IllegalStateException.class)
public void handleTimeout_timeoutResultNone() {
new DeferredResult().handleTimeout();
}
@Test
public void setAfterHandleTimeout() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
DeferredResult deferredResult = new DeferredResult("foo");
deferredResult.init(resultHandler);
resultHandler.handle("foo");
replay(resultHandler);
deferredResult.handleTimeout();
verify(resultHandler);
try {
deferredResult.set("foo");
fail("Expected exception");
}
catch (StaleAsyncWebRequestException ex) {
// expected
}
}
@Test
public void setBeforeHandleTimeout() {
DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class);
DeferredResult deferredResult = new DeferredResult("foo");
deferredResult.init(resultHandler);
resultHandler.handle("foo");
replay(resultHandler);
deferredResult.set("foo");
verify(resultHandler);
reset(resultHandler);
replay(resultHandler);
deferredResult.handleTimeout();
verify(resultHandler);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册