提交 8eeda746 编写于 作者: R Rossen Stoyanchev

Merge pull request #1194 from violetagg/async-timeout

......@@ -120,7 +120,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
}
private void cancel() {
protected void cancel() {
this.subscription.cancel();
}
......
......@@ -18,6 +18,8 @@ package org.springframework.http.server.reactive;
import java.io.IOException;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
......@@ -89,6 +91,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
servletRequest, this.dataBufferFactory, this.bufferSize);
ServletServerHttpResponse response = new ServletServerHttpResponse(
servletResponse, this.dataBufferFactory, this.bufferSize);
asyncContext.addListener(new ErrorHandlingAsyncListener(request, response));
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(asyncContext);
this.handler.handle(request, response).subscribe(resultSubscriber);
}
......@@ -127,4 +130,47 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
}
private static final class ErrorHandlingAsyncListener implements AsyncListener {
private final ServletServerHttpRequest request;
private final ServletServerHttpResponse response;
public ErrorHandlingAsyncListener(ServletServerHttpRequest request,
ServletServerHttpResponse response) {
this.request = request;
this.response = response;
}
@Override
public void onTimeout(AsyncEvent event) {
Throwable ex = event.getThrowable();
if (ex == null) {
ex = new IllegalStateException("Async operation timeout.");
}
this.request.handleAsyncListenerError(ex);
this.response.handleAsyncListenerError(ex);
}
@Override
public void onError(AsyncEvent event) {
this.request.handleAsyncListenerError(event.getThrowable());
this.response.handleAsyncListenerError(event.getThrowable());
}
@Override
public void onStartAsync(AsyncEvent event) {
// no op
}
@Override
public void onComplete(AsyncEvent event) {
// no op
}
}
}
......@@ -170,6 +170,13 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
}
/** Handle a timeout/error callback from the Servlet container */
void handleAsyncListenerError(Throwable ex) {
if (this.bodyPublisher != null) {
this.bodyPublisher.onError(ex);
}
}
private RequestBodyPublisher createBodyPublisher() throws IOException {
RequestBodyPublisher bodyPublisher = new RequestBodyPublisher(
this.request.getInputStream(), this.dataBufferFactory, this.bufferSize);
......
......@@ -54,6 +54,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private volatile ResponseBodyProcessor bodyProcessor;
private volatile ResponseBodyFlushProcessor bodyFlushProcessor;
public ServletServerHttpResponse(HttpServletResponse response,
DataBufferFactory dataBufferFactory, int bufferSize) throws IOException {
......@@ -116,8 +118,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
@Override
protected Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor() {
Processor<Publisher<DataBuffer>, Void> processor = new ResponseBodyFlushProcessor();
ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor();
registerListener();
bodyFlushProcessor = processor;
return processor;
}
......@@ -151,6 +154,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
/** Handle a timeout/error callback from the Servlet container */
void handleAsyncListenerError(Throwable ex) {
if (this.bodyFlushProcessor != null) {
this.bodyFlushProcessor.cancel();
this.bodyFlushProcessor.onError(ex);
}
if (this.bodyProcessor != null) {
this.bodyProcessor.cancel();
this.bodyProcessor.onError(ex);
}
}
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册