提交 1e62ad86 编写于 作者: R Rob Winch 提交者: Rossen Stoyanchev

Add beforeConcurrentHandling support

Previously CallableProcessingInterceptor and
DeferredResultProcessingInterceptor did not have support for capturing
the state of the original Thread just prior to processing. This made it
difficult to transfer the state of one Thread (i.e. ThreadLocal) to the
Thread used to process the Callable.

This commit adds a new method to CallableProcessingInterceptor and
DeferredResultProcessingInterceptor named beforeConcurrentHandling
which will be invoked on the original Thread used to submit the Callable
or DeferredResult. This means the state of the original Thread can be
captured in beforeConcurrentHandling and transfered to the new Thread
in preProcess.

Issue: SPR-10052
上级 0d73d199
......@@ -26,6 +26,7 @@ import org.springframework.web.context.request.NativeWebRequest;
* Assists with the invocation of {@link CallableProcessingInterceptor}'s.
*
* @author Rossen Stoyanchev
* @author Rob Winch
* @since 3.2
*/
class CallableInterceptorChain {
......@@ -41,6 +42,12 @@ class CallableInterceptorChain {
this.interceptors = interceptors;
}
public void applyBeforeConcurrentHandling(NativeWebRequest request, Callable<?> task) throws Exception {
for (CallableProcessingInterceptor interceptor : this.interceptors) {
interceptor.beforeConcurrentHandling(request, task);
}
}
public void applyPreProcess(NativeWebRequest request, Callable<?> task) throws Exception {
for (CallableProcessingInterceptor interceptor : this.interceptors) {
interceptor.preProcess(request, task);
......
......@@ -39,6 +39,7 @@ import org.springframework.web.context.request.NativeWebRequest;
* can select a value to be used to resume processing.
*
* @author Rossen Stoyanchev
* @author Rob Winch
* @since 3.2
*/
public interface CallableProcessingInterceptor {
......@@ -47,6 +48,24 @@ public interface CallableProcessingInterceptor {
static final Object RESPONSE_HANDLED = new Object();
/**
* Invoked <em>before</em> the start of concurrent handling in the original
* thread in which the {@code Callable} is submitted for concurrent handling.
*
* <p>
* This is useful for capturing the state of the current thread just prior to
* invoking the {@link Callable}. Once the state is captured, it can then be
* transfered to the new {@link Thread} in
* {@link #preProcess(NativeWebRequest, Callable)}. Capturing the state of
* Spring Security's SecurityContextHolder and migrating it to the new Thread
* is a concrete example of where this is useful.
* </p>
*
* @param request the current request
* @param task the task for the current async request
* @throws Exception in case of errors
*/
<T> void beforeConcurrentHandling(NativeWebRequest request, Callable<T> task) throws Exception;
/**
* Invoked <em>after</em> the start of concurrent handling in the async
......
......@@ -24,10 +24,17 @@ import org.springframework.web.context.request.NativeWebRequest;
* for simplified implementation of individual methods.
*
* @author Rossen Stoyanchev
* @author Rob Winch
* @since 3.2
*/
public abstract class CallableProcessingInterceptorAdapter implements CallableProcessingInterceptor {
/**
* This implementation is empty.
*/
public <T> void beforeConcurrentHandling(NativeWebRequest request, Callable<T> task) throws Exception {
}
/**
* This implementation is empty.
*/
......
......@@ -40,6 +40,12 @@ class DeferredResultInterceptorChain {
this.interceptors = interceptors;
}
public void applyBeforeConcurrentHandling(NativeWebRequest request, DeferredResult<?> deferredResult) throws Exception {
for (DeferredResultProcessingInterceptor interceptor : this.interceptors) {
interceptor.beforeConcurrentHandling(request, deferredResult);
}
}
public void applyPreProcess(NativeWebRequest request, DeferredResult<?> deferredResult) throws Exception {
for (DeferredResultProcessingInterceptor interceptor : this.interceptors) {
interceptor.preProcess(request, deferredResult);
......
......@@ -36,10 +36,22 @@ import org.springframework.web.context.request.NativeWebRequest;
* method can set the {@code DeferredResult} in order to resume processing.
*
* @author Rossen Stoyanchev
* @author Rob Winch
* @since 3.2
*/
public interface DeferredResultProcessingInterceptor {
/**
* Invoked immediately before the start of concurrent handling, in the same
* thread that started it. This method may be used to capture state just prior
* to the start of concurrent processing with the given {@code DeferredResult}.
*
* @param request the current request
* @param deferredResult the DeferredResult for the current request
* @throws Exception in case of errors
*/
<T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception;
/**
* Invoked immediately after the start of concurrent handling, in the same
* thread that started it. This method may be used to detect the start of
......
......@@ -22,10 +22,17 @@ import org.springframework.web.context.request.NativeWebRequest;
* interface for simplified implementation of individual methods.
*
* @author Rossen Stoyanchev
* @author Rob Winch
* @since 3.2
*/
public abstract class DeferredResultProcessingInterceptorAdapter implements DeferredResultProcessingInterceptor {
/**
* This implementation is empty.
*/
public <T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) throws Exception {
}
/**
* This implementation is empty.
*/
......
......@@ -249,12 +249,13 @@ public final class WebAsyncManager {
* @param callable a unit of work to be executed asynchronously
* @param processingContext additional context to save that can be accessed
* via {@link #getConcurrentResultContext()}
* @throws Exception If concurrent processing failed to start
*
* @see #getConcurrentResult()
* @see #getConcurrentResultContext()
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void startCallableProcessing(final Callable<?> callable, Object... processingContext) {
public void startCallableProcessing(final Callable<?> callable, Object... processingContext) throws Exception {
Assert.notNull(callable, "Callable must not be null");
startCallableProcessing(new WebAsyncTask(callable), processingContext);
}
......@@ -267,8 +268,9 @@ public final class WebAsyncManager {
* @param webAsyncTask an WebAsyncTask containing the target {@code Callable}
* @param processingContext additional context to save that can be accessed
* via {@link #getConcurrentResultContext()}
* @throws Exception If concurrent processing failed to start
*/
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) {
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
......@@ -306,6 +308,8 @@ public final class WebAsyncManager {
}
});
interceptorChain.applyBeforeConcurrentHandling(asyncWebRequest, callable);
startAsyncProcessing(processingContext);
this.taskExecutor.submit(new Runnable() {
......@@ -356,12 +360,13 @@ public final class WebAsyncManager {
* @param deferredResult the DeferredResult instance to initialize
* @param processingContext additional context to save that can be accessed
* via {@link #getConcurrentResultContext()}
* @throws Exception If concurrent processing failed to start
*
* @see #getConcurrentResult()
* @see #getConcurrentResultContext()
*/
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) {
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
......@@ -395,6 +400,8 @@ public final class WebAsyncManager {
}
});
interceptorChain.applyBeforeConcurrentHandling(asyncWebRequest, deferredResult);
startAsyncProcessing(processingContext);
try {
......
......@@ -66,7 +66,7 @@ public class WebAsyncManagerTests {
}
@Test
public void startAsyncProcessingWithoutAsyncWebRequest() {
public void startAsyncProcessingWithoutAsyncWebRequest() throws Exception {
WebAsyncManager manager = WebAsyncUtils.getAsyncManager(new MockHttpServletRequest());
try {
......@@ -118,6 +118,7 @@ public class WebAsyncManagerTests {
Callable<Object> task = new StubCallable(concurrentResult);
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor.preProcess(this.asyncWebRequest, task);
interceptor.postProcess(this.asyncWebRequest, task, new Integer(concurrentResult));
replay(interceptor);
......@@ -140,6 +141,7 @@ public class WebAsyncManagerTests {
Callable<Object> task = new StubCallable(concurrentResult);
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor.preProcess(this.asyncWebRequest, task);
interceptor.postProcess(this.asyncWebRequest, task, concurrentResult);
replay(interceptor);
......@@ -155,6 +157,34 @@ public class WebAsyncManagerTests {
verify(interceptor, this.asyncWebRequest);
}
@Test
public void startCallableProcessingBeforeConcurrentHandlingException() throws Exception {
Callable<Object> task = new StubCallable(21);
Exception exception = new Exception();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, task);
expectLastCall().andThrow(exception);
replay(interceptor);
this.asyncWebRequest.addTimeoutHandler((Runnable) notNull());
this.asyncWebRequest.addCompletionHandler((Runnable) notNull());
replay(this.asyncWebRequest);
this.asyncManager.registerCallableInterceptor("interceptor", interceptor);
try {
this.asyncManager.startCallableProcessing(task);
fail("Expected Exception");
}catch(Exception e) {
assertEquals(exception, e);
}
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest, interceptor);
}
@Test
public void startCallableProcessingPreProcessException() throws Exception {
......@@ -162,6 +192,7 @@ public class WebAsyncManagerTests {
Exception exception = new Exception();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor.preProcess(this.asyncWebRequest, task);
expectLastCall().andThrow(exception);
replay(interceptor);
......@@ -184,6 +215,7 @@ public class WebAsyncManagerTests {
Exception exception = new Exception();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor.preProcess(this.asyncWebRequest, task);
interceptor.postProcess(this.asyncWebRequest, task, 21);
expectLastCall().andThrow(exception);
......@@ -207,11 +239,13 @@ public class WebAsyncManagerTests {
Exception exception = new Exception();
CallableProcessingInterceptor interceptor1 = createMock(CallableProcessingInterceptor.class);
interceptor1.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor1.preProcess(this.asyncWebRequest, task);
interceptor1.postProcess(this.asyncWebRequest, task, 21);
replay(interceptor1);
CallableProcessingInterceptor interceptor2 = createMock(CallableProcessingInterceptor.class);
interceptor2.beforeConcurrentHandling(this.asyncWebRequest, task);
interceptor2.preProcess(this.asyncWebRequest, task);
interceptor2.postProcess(this.asyncWebRequest, task, 21);
expectLastCall().andThrow(exception);
......@@ -231,7 +265,7 @@ public class WebAsyncManagerTests {
}
@Test
public void startCallableProcessingWithAsyncTask() {
public void startCallableProcessingWithAsyncTask() throws Exception {
AsyncTaskExecutor executor = createMock(AsyncTaskExecutor.class);
expect(executor.submit((Runnable) notNull())).andReturn(null);
......@@ -251,7 +285,7 @@ public class WebAsyncManagerTests {
}
@Test
public void startCallableProcessingNullInput() {
public void startCallableProcessingNullInput() throws Exception {
try {
this.asyncManager.startCallableProcessing((Callable<?>) null);
fail("Expected exception");
......@@ -268,6 +302,7 @@ public class WebAsyncManagerTests {
String concurrentResult = "abc";
DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
interceptor.preProcess(this.asyncWebRequest, deferredResult);
interceptor.postProcess(asyncWebRequest, deferredResult, concurrentResult);
replay(interceptor);
......@@ -284,6 +319,36 @@ public class WebAsyncManagerTests {
verify(this.asyncWebRequest, interceptor);
}
@Test
public void startDeferredResultProcessingBeforeConcurrentHandlingException() throws Exception {
DeferredResult<Integer> deferredResult = new DeferredResult<Integer>();
Exception exception = new Exception();
DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
expectLastCall().andThrow(exception);
replay(interceptor);
this.asyncWebRequest.addTimeoutHandler((Runnable) notNull());
this.asyncWebRequest.addCompletionHandler((Runnable) notNull());
replay(this.asyncWebRequest);
this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor);
try {
this.asyncManager.startDeferredResultProcessing(deferredResult);
fail("Expected Exception");
}
catch(Exception success) {
assertEquals(exception, success);
}
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest, interceptor);
}
@Test
public void startDeferredResultProcessingPreProcessException() throws Exception {
......@@ -291,6 +356,7 @@ public class WebAsyncManagerTests {
Exception exception = new Exception();
DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
interceptor.preProcess(this.asyncWebRequest, deferredResult);
expectLastCall().andThrow(exception);
replay(interceptor);
......@@ -313,6 +379,7 @@ public class WebAsyncManagerTests {
Exception exception = new Exception();
DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
interceptor.preProcess(this.asyncWebRequest, deferredResult);
interceptor.postProcess(this.asyncWebRequest, deferredResult, 25);
expectLastCall().andThrow(exception);
......@@ -330,7 +397,7 @@ public class WebAsyncManagerTests {
}
@Test
public void startDeferredResultProcessingNullInput() {
public void startDeferredResultProcessingNullInput() throws Exception {
try {
this.asyncManager.startDeferredResultProcessing((DeferredResult<?>) null);
fail("Expected exception");
......
......@@ -80,6 +80,7 @@ public class WebAsyncManagerTimeoutTests {
StubCallable callable = new StubCallable();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable);
expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andReturn(RESULT_NONE);
interceptor.afterCompletion(this.asyncWebRequest, callable);
replay(interceptor);
......@@ -123,6 +124,7 @@ public class WebAsyncManagerTimeoutTests {
StubCallable callable = new StubCallable();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable);
expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andReturn(22);
replay(interceptor);
......@@ -145,6 +147,7 @@ public class WebAsyncManagerTimeoutTests {
Exception exception = new Exception();
CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable);
expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andThrow(exception);
replay(interceptor);
......@@ -166,6 +169,7 @@ public class WebAsyncManagerTimeoutTests {
DeferredResult<Integer> deferredResult = new DeferredResult<Integer>();
DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class);
interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult);
interceptor.preProcess(this.asyncWebRequest, deferredResult);
expect(interceptor.handleTimeout(this.asyncWebRequest, deferredResult)).andReturn(true);
interceptor.afterCompletion(this.asyncWebRequest, deferredResult);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册