提交 7b3a72f4 编写于 作者: R Rossen Stoyanchev

Warn when SimpleAsyncTaskExecutor is used

Issue: SPR-16203
上级 1b1bc7f5
......@@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
/**
* The central class for managing asynchronous request processing, mainly intended
......@@ -61,6 +63,9 @@ public final class WebAsyncManager {
private static final Object RESULT_NONE = new Object();
private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR =
new SimpleAsyncTaskExecutor(WebHttpHandlerBuilder.class.getSimpleName());
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
private static final CallableProcessingInterceptor timeoutCallableInterceptor =
......@@ -69,10 +74,12 @@ public final class WebAsyncManager {
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
new TimeoutDeferredResultProcessingInterceptor();
private static Boolean taskExecutorWarning = true;
private AsyncWebRequest asyncWebRequest;
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName());
private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR;
private volatile Object concurrentResult = RESULT_NONE;
......@@ -277,6 +284,9 @@ public final class WebAsyncManager {
if (executor != null) {
this.taskExecutor = executor;
}
else {
logExecutorWarning();
}
List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(webAsyncTask.getInterceptor());
......@@ -330,6 +340,27 @@ public final class WebAsyncManager {
}
}
@SuppressWarnings("ConstantConditions")
private void logExecutorWarning() {
if (taskExecutorWarning && logger.isWarnEnabled()) {
synchronized (DEFAULT_TASK_EXECUTOR) {
AsyncTaskExecutor executor = this.taskExecutor;
if (taskExecutorWarning &&
(executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) {
String executorTypeName = executor.getClass().getSimpleName();
logger.warn("\n!!!\n" +
"An Executor is required to handle java.util.concurrent.Callable return values.\n" +
"Please, configure a TaskExecutor in the MVC config under \"async support\".\n" +
"The " + executorTypeName + " currently in use is not suitable under load.\n" +
"-------------------------------\n" +
"Request URI: '" + formatRequestUri() + "'\n" +
"!!!");
taskExecutorWarning = false;
}
}
}
}
private String formatRequestUri() {
HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
return request != null ? request.getRequestURI() : "servlet container";
......
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -115,7 +115,7 @@ public class WebAsyncManagerTests {
verifyDefaultAsyncScenario();
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, task);
verify(interceptor).preProcess(this.asyncWebRequest, task);
verify(interceptor).postProcess(this.asyncWebRequest, task, new Integer(concurrentResult));
verify(interceptor).postProcess(this.asyncWebRequest, task, concurrentResult);
}
@Test
......@@ -161,9 +161,9 @@ public class WebAsyncManagerTests {
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
verify(this.asyncWebRequest).addErrorHandler(notNull());
verify(this.asyncWebRequest).addCompletionHandler(notNull());
}
@Test
......@@ -303,9 +303,9 @@ public class WebAsyncManagerTests {
assertFalse(this.asyncManager.hasConcurrentResult());
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
verify(this.asyncWebRequest).addErrorHandler(notNull());
verify(this.asyncWebRequest).addCompletionHandler(notNull());
}
@Test
......@@ -353,7 +353,7 @@ public class WebAsyncManagerTests {
@Test
public void startDeferredResultProcessingNullInput() throws Exception {
try {
this.asyncManager.startDeferredResultProcessing((DeferredResult<?>) null);
this.asyncManager.startDeferredResultProcessing(null);
fail("Expected exception");
}
catch (IllegalArgumentException ex) {
......@@ -368,9 +368,9 @@ public class WebAsyncManagerTests {
@SuppressWarnings("unchecked")
private void verifyDefaultAsyncScenario() {
verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull());
verify(this.asyncWebRequest).addErrorHandler((Consumer<Throwable>) notNull());
verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull());
verify(this.asyncWebRequest).addTimeoutHandler(notNull());
verify(this.asyncWebRequest).addErrorHandler(notNull());
verify(this.asyncWebRequest).addCompletionHandler(notNull());
verify(this.asyncWebRequest).startAsync();
verify(this.asyncWebRequest).dispatch();
}
......
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -24,10 +24,10 @@ import java.util.concurrent.Callable;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.DeferredResultProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncTask;
/**
* Helps with configuring options for asynchronous request processing.
......@@ -49,16 +49,15 @@ public class AsyncSupportConfigurer {
/**
* Set the default {@link AsyncTaskExecutor} to use when a controller method
* returns a {@link Callable}. Controller methods can override this default on
* a per-request basis by returning a {@link WebAsyncTask}.
* <p>By default a {@link SimpleAsyncTaskExecutor} instance is used, and it's
* highly recommended to change that default in production since the simple
* executor does not re-use threads.
* <p>As of 5.0 this executor is also used when a controller returns a reactive
* type that does streaming (e.g. "text/event-stream" or
* "application/stream+json") for the blocking writes to the
* {@link javax.servlet.ServletOutputStream}.
* The provided task executor is used to:
* <ol>
* <li>Handle {@link Callable} controller method return values.
* <li>Perform blocking writes when streaming to the response
* through a reactive (e.g. Reactor, RxJava) controller method return value.
* </ol>
* <p>By default only a {@link SimpleAsyncTaskExecutor} is used. However when
* using the above two use cases, it's recommended to configure an executor
* backed by a thread pool such as {@link ThreadPoolTaskExecutor}.
* @param taskExecutor the task executor instance to use by default
*/
public AsyncSupportConfigurer setTaskExecutor(AsyncTaskExecutor taskExecutor) {
......
......@@ -35,6 +35,7 @@ import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.MediaType;
......@@ -79,6 +80,8 @@ class ReactiveTypeHandler {
private final TaskExecutor taskExecutor;
private Boolean taskExecutorWarning;
private final ContentNegotiationManager contentNegotiationManager;
......@@ -92,6 +95,7 @@ class ReactiveTypeHandler {
Assert.notNull(manager, "ContentNegotiationManager is required");
this.reactiveRegistry = registry;
this.taskExecutor = executor;
this.taskExecutorWarning = executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor;
this.contentNegotiationManager = manager;
}
......@@ -127,16 +131,19 @@ class ReactiveTypeHandler {
if (adapter.isMultiValue()) {
if (mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) ||
ServerSentEvent.class.isAssignableFrom(elementClass)) {
logExecutorWarning(returnType);
SseEmitter emitter = new SseEmitter(STREAMING_TIMEOUT_VALUE);
new SseEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
return emitter;
}
if (CharSequence.class.isAssignableFrom(elementClass)) {
logExecutorWarning(returnType);
ResponseBodyEmitter emitter = getEmitter(mediaType.orElse(MediaType.TEXT_PLAIN));
new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
return emitter;
}
if (mediaTypes.stream().anyMatch(MediaType.APPLICATION_STREAM_JSON::includes)) {
logExecutorWarning(returnType);
ResponseBodyEmitter emitter = getEmitter(MediaType.APPLICATION_STREAM_JSON);
new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
return emitter;
......@@ -171,6 +178,27 @@ class ReactiveTypeHandler {
};
}
@SuppressWarnings("ConstantConditions")
private void logExecutorWarning(MethodParameter returnType) {
if (this.taskExecutorWarning && logger.isWarnEnabled()) {
synchronized (this) {
if (this.taskExecutorWarning) {
String executorTypeName = this.taskExecutor.getClass().getSimpleName();
logger.warn("\n!!!\n" +
"Streaming through a reactive type requires an Executor to write to the response.\n" +
"Please, configure a TaskExecutor in the MVC config under \"async support\".\n" +
"The " + executorTypeName + " currently in use is not suitable under load.\n" +
"-------------------------------\n" +
"Controller:\t" + returnType.getContainingClass().getName() + "\n" +
"Method:\t\t" + returnType.getMethod().getName() + "\n" +
"Returning:\t" + ResolvableType.forMethodParameter(returnType).toString() + "\n" +
"!!!");
this.taskExecutorWarning = false;
}
}
}
}
private abstract static class AbstractEmitterSubscriber implements Subscriber<Object>, Runnable {
......
......@@ -81,7 +81,8 @@ public class ReactiveTypeHandlerTests {
ContentNegotiationManagerFactoryBean factoryBean = new ContentNegotiationManagerFactoryBean();
factoryBean.afterPropertiesSet();
ContentNegotiationManager manager = factoryBean.getObject();
this.handler = new ReactiveTypeHandler(ReactiveAdapterRegistry.getSharedInstance(), new SyncTaskExecutor(), manager);
ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
this.handler = new ReactiveTypeHandler(adapterRegistry, new SyncTaskExecutor(), manager);
resetRequest();
}
......@@ -90,8 +91,8 @@ public class ReactiveTypeHandlerTests {
this.servletResponse = new MockHttpServletResponse();
this.webRequest = new ServletWebRequest(this.servletRequest, this.servletResponse);
AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse);
WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(asyncWebRequest);
AsyncWebRequest webRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse);
WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(webRequest);
this.servletRequest.setAsyncSupported(true);
}
......@@ -122,7 +123,8 @@ public class ReactiveTypeHandlerTests {
// RxJava 1 Single
AtomicReference<SingleEmitter<String>> ref = new AtomicReference<>();
Single<String> single = Single.fromEmitter(ref::set);
testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onSuccess("foo"), "foo");
testDeferredResultSubscriber(single, Single.class, forClass(String.class),
() -> ref.get().onSuccess("foo"), "foo");
// RxJava 2 Single
AtomicReference<io.reactivex.SingleEmitter<String>> ref2 = new AtomicReference<>();
......
......@@ -3629,14 +3629,13 @@ Spring MVC supports Reactor and RxJava through the
`spring-core` which allows it to adapt from multiple reactive libraries.
====
When streaming to the response via reactive types, Spring MVC supports reactive back
pressure, but still needs to use blocking I/O to perform actual writes. This is done
through the <<mvc-ann-async-configuration-spring-mvc,configured>> MVC `TaskExecutor` on
a separate thread in order to avoid blocking the upstream source (e.g. a `Flux` returned
from the `WebClient`). By default a `SyncTaskExecutor` is used which is not suitable for
production. https://jira.spring.io/browse/SPR-16203[SPR-16203] will provide better
defaults in Spring Framework 5.1. In the mean time please configure the executor through
the <<mvc-ann-async-configuration-spring-mvc,MVC config>>.
For streaming to the response, reactive back pressure is supported, but writes to the
response are still blocking, and are executed on a separate thread through the
<<mvc-ann-async-configuration-spring-mvc,configured>> `TaskExecutor` in order to avoid
blocking the upstream source (e.g. a `Flux` returned from the `WebClient`).
By default `SimpleAsyncTaskExecutor` is used for the blocking writes but that is not
suitable under load. If you plan to stream with a reactive type, please use the
<<mvc-ann-async-configuration-spring-mvc,MVC config>> to configure a task executor.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册