提交 383375c3 编写于 作者: V Violeta Georgieva 提交者: Rossen Stoyanchev

Use separate reactive HttpHandler for Tomcat/Jetty

Introduce separate adapters TomcatHttpHandlerAdapter/JettyHttpHandlerAdapter
so that each adapter can use the Tomcat/Jetty APIs for reading/writing
with ByteBuffer.
上级 ea67a637
...@@ -779,6 +779,7 @@ project("spring-web") { ...@@ -779,6 +779,7 @@ project("spring-web") {
optional("javax.xml.ws:jaxws-api:${jaxwsVersion}") optional("javax.xml.ws:jaxws-api:${jaxwsVersion}")
optional("javax.mail:javax.mail-api:${javamailVersion}") optional("javax.mail:javax.mail-api:${javamailVersion}")
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
optional("org.apache.tomcat:tomcat-catalina:${tomcatVersion}")
testCompile(project(":spring-context-support")) // for JafMediaTypeFactory testCompile(project(":spring-context-support")) // for JafMediaTypeFactory
testCompile("io.projectreactor.addons:reactor-test") testCompile("io.projectreactor.addons:reactor-test")
testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") { testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") {
......
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpOutput;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
/**
* Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async
* support and Servlet 3.1 non-blocking I/O. Use Jetty API for writing with
* ByteBuffer.
*
* @author Violeta Georgieva
* @since 5.0
*/
@WebServlet(asyncSupported = true)
public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
public JettyHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler);
}
public JettyHttpHandlerAdapter(Map<String, HttpHandler> handlerMap) {
super(handlerMap);
}
@Override
protected ServerHttpResponse createServletServerHttpResponse(
HttpServletResponse response, AsyncContext asyncContext) throws IOException {
return new JettyServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize());
}
private static final class JettyServerHttpResponse extends ServletServerHttpResponse {
public JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException {
super(response, asyncContext, bufferFactory, bufferSize);
}
@Override
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = getServletResponse().getOutputStream();
ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining();
if (outputStream.isReady() && len > 0) {
((HttpOutput) outputStream).write(input);
}
return len;
}
}
}
\ No newline at end of file
...@@ -98,16 +98,28 @@ public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport impleme ...@@ -98,16 +98,28 @@ public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport impleme
// Start async before Read/WriteListener registration // Start async before Read/WriteListener registration
AsyncContext asyncContext = request.startAsync(); AsyncContext asyncContext = request.startAsync();
ServerHttpRequest httpRequest = new ServletServerHttpRequest( ServerHttpRequest httpRequest = createServletServerHttpRequest(
((HttpServletRequest) request), asyncContext, getDataBufferFactory(), getBufferSize()); ((HttpServletRequest) request), asyncContext);
ServerHttpResponse httpResponse = new ServletServerHttpResponse( ServerHttpResponse httpResponse = createServletServerHttpResponse(
((HttpServletResponse) response), asyncContext, getDataBufferFactory(), getBufferSize()); ((HttpServletResponse) response), asyncContext);
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext);
getHttpHandler().handle(httpRequest, httpResponse).subscribe(subscriber); getHttpHandler().handle(httpRequest, httpResponse).subscribe(subscriber);
} }
protected ServerHttpRequest createServletServerHttpRequest(HttpServletRequest request,
AsyncContext asyncContext) throws IOException {
return new ServletServerHttpRequest(
request, asyncContext, getDataBufferFactory(), getBufferSize());
}
protected ServerHttpResponse createServletServerHttpResponse(HttpServletResponse response,
AsyncContext asyncContext) throws IOException {
return new ServletServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize());
}
// Other Servlet methods... // Other Servlet methods...
@Override @Override
......
...@@ -32,6 +32,8 @@ import javax.servlet.http.HttpServletRequest; ...@@ -32,6 +32,8 @@ import javax.servlet.http.HttpServletRequest;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpCookie; import org.springframework.http.HttpCookie;
...@@ -58,6 +60,12 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { ...@@ -58,6 +60,12 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final Object cookieLock = new Object(); private final Object cookieLock = new Object();
private final DataBufferFactory bufferFactory;
private final byte[] buffer;
protected final Log logger = LogFactory.getLog(getClass());
public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException { DataBufferFactory bufferFactory, int bufferSize) throws IOException {
...@@ -68,12 +76,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { ...@@ -68,12 +76,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
Assert.isTrue(bufferSize > 0, "'bufferSize' must be higher than 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be higher than 0");
this.request = request; this.request = request;
this.bufferFactory = bufferFactory;
this.buffer = new byte[bufferSize];
asyncContext.addListener(new RequestAsyncListener()); asyncContext.addListener(new RequestAsyncListener());
// Tomcat expects ReadListener registration on initial thread // Tomcat expects ReadListener registration on initial thread
ServletInputStream inputStream = request.getInputStream(); ServletInputStream inputStream = request.getInputStream();
this.bodyPublisher = new RequestBodyPublisher(inputStream, bufferFactory, bufferSize); this.bodyPublisher = new RequestBodyPublisher(inputStream);
this.bodyPublisher.registerReadListener(); this.bodyPublisher.registerReadListener();
} }
...@@ -169,6 +179,21 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { ...@@ -169,6 +179,21 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.bodyPublisher); return Flux.from(this.bodyPublisher);
} }
protected DataBuffer readDataBuffer() throws IOException {
int read = this.request.getInputStream().read(this.buffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read);
dataBuffer.write(this.buffer, 0, read);
return dataBuffer;
}
return null;
}
private final class RequestAsyncListener implements AsyncListener { private final class RequestAsyncListener implements AsyncListener {
...@@ -193,21 +218,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { ...@@ -193,21 +218,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
} }
} }
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> { private class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final ServletInputStream inputStream; private final ServletInputStream inputStream;
private final DataBufferFactory bufferFactory;
private final byte[] buffer;
public RequestBodyPublisher(ServletInputStream inputStream, public RequestBodyPublisher(ServletInputStream inputStream) {
DataBufferFactory bufferFactory, int bufferSize) {
this.inputStream = inputStream; this.inputStream = inputStream;
this.bufferFactory = bufferFactory;
this.buffer = new byte[bufferSize];
} }
public void registerReadListener() throws IOException { public void registerReadListener() throws IOException {
...@@ -224,16 +242,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { ...@@ -224,16 +242,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
protected DataBuffer read() throws IOException { protected DataBuffer read() throws IOException {
if (this.inputStream.isReady()) { if (this.inputStream.isReady()) {
int read = this.inputStream.read(this.buffer); return readDataBuffer();
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read);
dataBuffer.write(this.buffer, 0, read);
return dataBuffer;
}
} }
return null; return null;
} }
......
...@@ -132,6 +132,19 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -132,6 +132,19 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return processor; return processor;
} }
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = response.getOutputStream();
InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1;
while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead;
}
return bytesWritten;
}
private void flush() throws IOException { private void flush() throws IOException {
ServletOutputStream outputStream = this.response.getOutputStream(); ServletOutputStream outputStream = this.response.getOutputStream();
if (outputStream.isReady()) { if (outputStream.isReady()) {
...@@ -215,7 +228,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -215,7 +228,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
protected Processor<? super DataBuffer, Void> createWriteProcessor() { protected Processor<? super DataBuffer, Void> createWriteProcessor() {
try { try {
ServletOutputStream outputStream = response.getOutputStream(); ServletOutputStream outputStream = response.getOutputStream();
bodyProcessor = new ResponseBodyProcessor(outputStream, bufferSize); bodyProcessor = new ResponseBodyProcessor(outputStream);
return bodyProcessor; return bodyProcessor;
} }
catch (IOException ex) { catch (IOException ex) {
...@@ -236,12 +249,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -236,12 +249,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private final ServletOutputStream outputStream; private final ServletOutputStream outputStream;
private final int bufferSize;
public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { public ResponseBodyProcessor(ServletOutputStream outputStream) {
this.outputStream = outputStream; this.outputStream = outputStream;
this.bufferSize = bufferSize;
} }
@Override @Override
...@@ -288,18 +298,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -288,18 +298,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return false; return false;
} }
} }
private int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1;
while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
this.outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead;
}
return bytesWritten;
}
} }
} }
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.connector.CoyoteInputStream;
import org.apache.catalina.connector.CoyoteOutputStream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
/**
* Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async
* support and Servlet 3.1 non-blocking I/O. Use Tomcat API for
* reading/writing with ByteBuffer.
*
* @author Violeta Georgieva
* @since 5.0
*/
@WebServlet(asyncSupported = true)
public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
public TomcatHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler);
}
public TomcatHttpHandlerAdapter(Map<String, HttpHandler> handlerMap) {
super(handlerMap);
}
@Override
protected ServerHttpRequest createServletServerHttpRequest(
HttpServletRequest request, AsyncContext asyncContext) throws IOException {
return new TomcatServerHttpRequest(
request, asyncContext, getDataBufferFactory(), getBufferSize());
}
@Override
protected ServerHttpResponse createServletServerHttpResponse(
HttpServletResponse response, AsyncContext asyncContext) throws IOException {
return new TomcatServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize());
}
private final class TomcatServerHttpRequest extends ServletServerHttpRequest {
public TomcatServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException {
super(request, asyncContext, bufferFactory, bufferSize);
}
@Override
protected DataBuffer readDataBuffer() throws IOException {
DataBuffer buffer = getDataBufferFactory().allocateBuffer(getBufferSize());
ByteBuffer byteBuffer = buffer.asByteBuffer();
byteBuffer.limit(byteBuffer.capacity());
int read = ((CoyoteInputStream) getServletRequest().getInputStream()).read(
byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
return getDataBufferFactory().wrap(byteBuffer);
}
return null;
}
}
private static final class TomcatServerHttpResponse extends ServletServerHttpResponse {
public TomcatServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException {
super(response, asyncContext, bufferFactory, bufferSize);
}
@Override
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = getServletResponse().getOutputStream();
ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining();
if (outputStream.isReady() && len > 0) {
((CoyoteOutputStream) outputStream).write(input);
}
return len;
}
}
}
\ No newline at end of file
...@@ -22,6 +22,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler; ...@@ -22,6 +22,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.server.reactive.JettyHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; import org.springframework.http.server.reactive.ServletHttpHandlerAdapter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
...@@ -56,11 +57,11 @@ public class JettyHttpServer extends HttpServerSupport implements HttpServer, In ...@@ -56,11 +57,11 @@ public class JettyHttpServer extends HttpServerSupport implements HttpServer, In
private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() { private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() {
if (getHttpHandlerMap() != null) { if (getHttpHandlerMap() != null) {
return new ServletHttpHandlerAdapter(getHttpHandlerMap()); return new JettyHttpHandlerAdapter(getHttpHandlerMap());
} }
else { else {
Assert.notNull(getHttpHandler()); Assert.notNull(getHttpHandler());
return new ServletHttpHandlerAdapter(getHttpHandler()); return new JettyHttpHandlerAdapter(getHttpHandler());
} }
} }
......
...@@ -24,6 +24,7 @@ import org.apache.catalina.startup.Tomcat; ...@@ -24,6 +24,7 @@ import org.apache.catalina.startup.Tomcat;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; import org.springframework.http.server.reactive.ServletHttpHandlerAdapter;
import org.springframework.http.server.reactive.TomcatHttpHandlerAdapter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
...@@ -75,11 +76,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I ...@@ -75,11 +76,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() { private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() {
if (getHttpHandlerMap() != null) { if (getHttpHandlerMap() != null) {
return new ServletHttpHandlerAdapter(getHttpHandlerMap()); return new TomcatHttpHandlerAdapter(getHttpHandlerMap());
} }
else { else {
Assert.notNull(getHttpHandler()); Assert.notNull(getHttpHandler());
return new ServletHttpHandlerAdapter(getHttpHandler()); return new TomcatHttpHandlerAdapter(getHttpHandler());
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册