diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 8d12cb8297caa64d4b673bca0273fa4949d6a7a5..cba8f1a4732ab2ee3d33b2f69b4e92b66b9b7841 100644 --- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -36,6 +36,13 @@ import org.springframework.core.io.buffer.DataBufferFactory; */ public interface ReactiveHttpOutputMessage extends HttpMessage { + /** + * Return a {@link DataBufferFactory} that can be used for creating the body. + * @return a buffer factory + * @see #writeWith(Publisher) + */ + DataBufferFactory bufferFactory(); + /** * Register an action to be applied just before the message is committed. * @param action the action @@ -45,7 +52,6 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { /** * Use the given {@link Publisher} to write the body of the message to the underlying * HTTP layer. - * * @param body the body content publisher * @return a publisher that indicates completion or error. */ @@ -55,19 +61,11 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { * Use the given {@link Publisher} of {@code Publishers} to write the body of the * message to the underlying HTTP layer, flushing after each * {@code Publisher}. - * * @param body the body content publisher * @return a publisher that indicates completion or error. */ Mono writeAndFlushWith(Publisher> body); - /** - * Returns a {@link DataBufferFactory} that can be used for creating the body. - * @return a buffer factory - * @see #writeWith(Publisher) - */ - DataBufferFactory bufferFactory(); - /** * Indicate that message handling is complete, allowing for any cleanup or * end-of-processing tasks to be performed such as applying header changes diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index b02666f7927947bf80095dd1874a681389067be4..f929f45391a80624753c6eab8333ab88f20ee469 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -46,13 +46,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher { protected final Log logger = LogFactory.getLog(getClass()); - private final AtomicReference state = - new AtomicReference<>(State.UNSUBSCRIBED); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); private final AtomicLong demand = new AtomicLong(); private Subscriber subscriber; + @Override public void subscribe(Subscriber subscriber) { if (this.logger.isTraceEnabled()) { @@ -126,14 +126,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher { protected abstract DataBuffer read() throws IOException; private boolean hasDemand() { - return this.demand.get() > 0; + return (this.demand.get() > 0); } - private boolean changeState(AbstractRequestBodyPublisher.State oldState, - AbstractRequestBodyPublisher.State newState) { + private boolean changeState(State oldState, State newState) { return this.state.compareAndSet(oldState, newState); } + private static final class RequestBodySubscription implements Subscription { private final AbstractRequestBodyPublisher publisher; @@ -158,12 +158,12 @@ abstract class AbstractRequestBodyPublisher implements Publisher { state().cancel(this.publisher); } - private AbstractRequestBodyPublisher.State state() { + private State state() { return this.publisher.state.get(); } - } + /** * Represents a state for the {@link Publisher} to be in. The following figure * indicate the four different states that exist, and the relationships between them. @@ -182,8 +182,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher { * * Refer to the individual states for more information. */ - private enum State { + /** * The initial unsubscribed state. Will respond to {@link * #subscribe(AbstractRequestBodyPublisher, Subscriber)} by @@ -191,12 +191,10 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ UNSUBSCRIBED { @Override - void subscribe(AbstractRequestBodyPublisher publisher, - Subscriber subscriber) { + void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); if (publisher.changeState(this, NO_DEMAND)) { - Subscription subscription = new RequestBodySubscription( - publisher); + Subscription subscription = new RequestBodySubscription(publisher); publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); } @@ -205,6 +203,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } }, + /** * State that gets entered when there is no demand. Responds to {@link * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, @@ -222,6 +221,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } }, + /** * State that gets entered when there is demand. Responds to * {@link #onDataAvailable(AbstractRequestBodyPublisher)} by @@ -237,15 +237,18 @@ abstract class AbstractRequestBodyPublisher implements Publisher { if (demandAvailable) { publisher.changeState(READING, DEMAND); publisher.checkOnDataAvailable(); - } else { + } + else { publisher.changeState(READING, NO_DEMAND); } - } catch (IOException ex) { + } + catch (IOException ex) { publisher.onError(ex); } } } }, + READING { @Override void request(AbstractRequestBodyPublisher publisher, long n) { @@ -254,34 +257,30 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } }, + /** * The terminal completed state. Does not respond to any events. */ COMPLETED { - @Override void request(AbstractRequestBodyPublisher publisher, long n) { // ignore } - @Override void cancel(AbstractRequestBodyPublisher publisher) { // ignore } - @Override void onAllDataRead(AbstractRequestBodyPublisher publisher) { // ignore } - @Override void onError(AbstractRequestBodyPublisher publisher, Throwable t) { // ignore } }; - void subscribe(AbstractRequestBodyPublisher publisher, - Subscriber subscriber) { + void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { throw new IllegalStateException(toString()); } @@ -312,6 +311,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } } } - } + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpRequest.java index 9f8666df1bdfb1e813904dad2101cee10def211f..1853008e947acc95f6514dd9aeb958cf6cce733c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.http.server.reactive; import java.net.URI; @@ -60,13 +61,6 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest { return this.uri; } - /** - * Initialize a URI that represents the request. Invoked lazily on the first - * call to {@link #getURI()} and then cached. - * @throws URISyntaxException - */ - protected abstract URI initUri() throws URISyntaxException; - @Override public MultiValueMap getQueryParams() { if (this.queryParams == null) { @@ -99,12 +93,6 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest { return this.headers; } - /** - * Initialize the headers from the underlying request. Invoked lazily on the - * first call to {@link #getHeaders()} and then cached. - */ - protected abstract HttpHeaders initHeaders(); - @Override public MultiValueMap getCookies() { if (this.cookies == null) { @@ -113,9 +101,24 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest { return this.cookies; } + + /** + * Initialize a URI that represents the request. + *

Invoked lazily on the first call to {@link #getURI()} and then cached. + * @throws URISyntaxException + */ + protected abstract URI initUri() throws URISyntaxException; + + /** + * Initialize the headers from the underlying request. + *

Invoked lazily on the first call to {@link #getHeaders()} and then cached. + */ + protected abstract HttpHeaders initHeaders(); + /** - * Initialize the cookies from the underlying request. Invoked lazily on the - * first access to cookies via {@link #getHeaders()} and then cached. + * Initialize the cookies from the underlying request. + *

Invoked lazily on the first access to cookies via {@link #getHeaders()} + * and then cached. */ protected abstract MultiValueMap initCookies(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index f6ad85ada905d5940e93f04f5f9f3183f0057bb0..88009d805902702a13df2b0f89a68dc3f8cff64c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -36,7 +36,6 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; - /** * Base class for {@link ServerHttpResponse} implementations. * @@ -46,8 +45,6 @@ import org.springframework.util.MultiValueMap; */ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { - private Log logger = LogFactory.getLog(getClass()); - private static final int STATE_NEW = 1; private static final int STATE_COMMITTING = 2; @@ -55,10 +52,9 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { private static final int STATE_COMMITTED = 3; - private final DataBufferFactory dataBufferFactory; - + private final Log logger = LogFactory.getLog(getClass()); - private HttpStatus statusCode; + private final DataBufferFactory dataBufferFactory; private final HttpHeaders headers; @@ -68,6 +64,8 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { private final AtomicInteger state = new AtomicInteger(STATE_NEW); + private HttpStatus statusCode; + public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory) { Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); @@ -83,7 +81,6 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { return this.dataBufferFactory; } - @Override public boolean setStatusCode(HttpStatus statusCode) { Assert.notNull(statusCode); @@ -139,6 +136,11 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { .then(() -> writeAndFlushWithInternal(writePublisher))); } + @Override + public Mono setComplete() { + return applyBeforeCommit(); + } + protected Mono applyBeforeCommit() { Mono mono = Mono.empty(); if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) { @@ -160,6 +162,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { return mono; } + /** * Implement this method to write to the underlying the response. * @param body the publisher to write with @@ -171,8 +174,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { * each {@code Publisher}. * @param body the publisher to write and flush with */ - protected abstract Mono writeAndFlushWithInternal( - Publisher> body); + protected abstract Mono writeAndFlushWithInternal(Publisher> body); /** * Implement this method to write the status code to the underlying response. @@ -192,10 +194,4 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { */ protected abstract void writeCookies(); - - @Override - public Mono setComplete() { - return applyBeforeCommit(); - } - } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index d001cb6f4b919ea46fb728a5c9302a1ec07f6b22..5020038cb278dceb06aa75c7e291ec9f587995d9 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.http.server.reactive; import java.util.function.Function; @@ -42,17 +43,18 @@ public class ChannelSendOperator extends MonoSource { private final Function, Publisher> writeFunction; - public ChannelSendOperator(Publisher source, - Function, Publisher> writeFunction) { + public ChannelSendOperator(Publisher source, Function, Publisher> writeFunction) { super(source); this.writeFunction = writeFunction; } + @Override public void subscribe(Subscriber s) { - source.subscribe(new WriteWithBarrier(s)); + this.source.subscribe(new WriteWithBarrier(s)); } + private class WriteWithBarrier extends Operators.SubscriberAdapter implements Publisher { /** @@ -77,17 +79,14 @@ public class ChannelSendOperator extends MonoSource { /** The actual writeSubscriber vs the downstream completion subscriber */ private Subscriber writeSubscriber; - public WriteWithBarrier(Subscriber subscriber) { super(subscriber); } - @Override protected void doOnSubscribe(Subscription subscription) { super.doOnSubscribe(subscription); - super.upstream() - .request(1); // bypass doRequest + super.upstream().request(1); // bypass doRequest } @Override @@ -156,7 +155,7 @@ public class ChannelSendOperator extends MonoSource { @Override public void subscribe(Subscriber writeSubscriber) { synchronized (this) { - Assert.isNull(this.writeSubscriber, "Only one writeSubscriber supported."); + Assert.isNull(this.writeSubscriber, "Only one writeSubscriber supported"); this.writeSubscriber = writeSubscriber; if (this.error != null || this.completed) { @@ -210,6 +209,7 @@ public class ChannelSendOperator extends MonoSource { } } + private class DownstreamBridge implements Subscriber { private final Subscriber downstream; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/HttpHandler.java b/spring-web/src/main/java/org/springframework/http/server/reactive/HttpHandler.java index b1952110cae5f70fd6fca3d0f84ec08285ffef55..7788399bb8faf9665087ce9ec5cfb1043f5398a5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/HttpHandler.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/HttpHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,10 +28,9 @@ public interface HttpHandler { /** * Handle the given request and generate a response. - * - * @param request current HTTP request. - * @param response current HTTP response. - * @return {@code Mono} to indicate when request handling is complete. + * @param request current HTTP request + * @param response current HTTP response + * @return {@code Mono} to indicate when request handling is complete */ Mono handle(ServerHttpRequest request, ServerHttpResponse response); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java index 8648004e98e5ab32ab6f700967385913cedfb880..4a59fdcee9e0c6604572ec0cb00f32387ac3797b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java @@ -34,11 +34,9 @@ import reactor.core.publisher.Operators; */ class ResponseBodyWriteResultPublisher implements Publisher { - private static final Log logger = - LogFactory.getLog(ResponseBodyWriteResultPublisher.class); + private static final Log logger = LogFactory.getLog(ResponseBodyWriteResultPublisher.class); - private final AtomicReference state = - new AtomicReference<>(State.UNSUBSCRIBED); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); private Subscriber subscriber; @@ -46,6 +44,7 @@ class ResponseBodyWriteResultPublisher implements Publisher { private volatile Throwable publisherError; + @Override public final void subscribe(Subscriber subscriber) { if (logger.isTraceEnabled()) { @@ -78,13 +77,12 @@ class ResponseBodyWriteResultPublisher implements Publisher { this.state.get().publishError(this, t); } - private static final class ResponseBodyWriteResultSubscription - implements Subscription { + + private static final class ResponseBodyWriteResultSubscription implements Subscription { private final ResponseBodyWriteResultPublisher publisher; - public ResponseBodyWriteResultSubscription( - ResponseBodyWriteResultPublisher publisher) { + public ResponseBodyWriteResultSubscription(ResponseBodyWriteResultPublisher publisher) { this.publisher = publisher; } @@ -107,10 +105,11 @@ class ResponseBodyWriteResultPublisher implements Publisher { private State state() { return this.publisher.state.get(); } - } + private enum State { + UNSUBSCRIBED { @Override void subscribe(ResponseBodyWriteResultPublisher publisher, @@ -132,62 +131,55 @@ class ResponseBodyWriteResultPublisher implements Publisher { throw new IllegalStateException(toString()); } } - @Override void publishComplete(ResponseBodyWriteResultPublisher publisher) { publisher.publisherCompleted = true; } - @Override void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { publisher.publisherError = t; } }, + SUBSCRIBED { @Override void request(ResponseBodyWriteResultPublisher publisher, long n) { Operators.checkRequest(n, publisher.subscriber); } - @Override void publishComplete(ResponseBodyWriteResultPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { publisher.subscriber.onComplete(); } } - @Override void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { publisher.subscriber.onError(t); } } - }, + COMPLETED { @Override void request(ResponseBodyWriteResultPublisher publisher, long n) { // ignore } - @Override void cancel(ResponseBodyWriteResultPublisher publisher) { // ignore } - @Override void publishComplete(ResponseBodyWriteResultPublisher publisher) { // ignore } - @Override void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { // ignore } }; - void subscribe(ResponseBodyWriteResultPublisher publisher, - Subscriber subscriber) { + void subscribe(ResponseBodyWriteResultPublisher publisher, Subscriber subscriber) { throw new IllegalStateException(toString()); } @@ -206,8 +198,6 @@ class ResponseBodyWriteResultPublisher implements Publisher { void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { throw new IllegalStateException(toString()); } - } - } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java index f25fd6248b14d33f7534f380b600c8e9d2a6aefc..8f550b623edf7d04701559480a9addb5c7d46f45 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java @@ -45,9 +45,9 @@ public interface ServerHttpResponse extends ReactiveHttpOutputMessage { */ HttpStatus getStatusCode(); - /** - * Return a mutable map with the cookies to send to the server. - */ + /** + * Return a mutable map with the cookies to send to the server. + */ MultiValueMap getCookies(); /** diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index e64a44997bd7e1889656a0ca5651a02d862e4dd1..de1248347a65ca2269e751cf2c7ff8854dff721c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -47,6 +47,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private static final int DEFAULT_BUFFER_SIZE = 8192; + private static Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class); private final HttpHandler handler; @@ -57,15 +58,17 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private int bufferSize = DEFAULT_BUFFER_SIZE; + /** * Create a new {@code ServletHttpHandlerAdapter} with the given HTTP handler. * @param handler the handler */ public ServletHttpHandlerAdapter(HttpHandler handler) { - Assert.notNull(handler, "'handler' must not be null"); + Assert.notNull(handler, "HttpHandler must not be null"); this.handler = handler; } + public void setDataBufferFactory(DataBufferFactory dataBufferFactory) { Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); this.dataBufferFactory = dataBufferFactory; @@ -76,26 +79,21 @@ public class ServletHttpHandlerAdapter extends HttpServlet { this.bufferSize = bufferSize; } + @Override - protected void service(HttpServletRequest servletRequest, - HttpServletResponse servletResponse) throws ServletException, IOException { + protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse) + throws ServletException, IOException { AsyncContext asyncContext = servletRequest.startAsync(); - - ServletServerHttpRequest request = - new ServletServerHttpRequest(servletRequest, this.dataBufferFactory, - this.bufferSize); - - ServletServerHttpResponse response = - new ServletServerHttpResponse(servletResponse, this.dataBufferFactory, - this.bufferSize); - - HandlerResultSubscriber resultSubscriber = - new HandlerResultSubscriber(asyncContext); - + ServletServerHttpRequest request = new ServletServerHttpRequest( + servletRequest, this.dataBufferFactory, this.bufferSize); + ServletServerHttpResponse response = new ServletServerHttpResponse( + servletResponse, this.dataBufferFactory, this.bufferSize); + HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(asyncContext); this.handler.handle(request, response).subscribe(resultSubscriber); } + private static class HandlerResultSubscriber implements Subscriber { private final AsyncContext asyncContext; @@ -117,8 +115,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @Override public void onError(Throwable ex) { logger.error("Error from request handling. Completing the request.", ex); - HttpServletResponse response = - (HttpServletResponse) this.asyncContext.getResponse(); + HttpServletResponse response = (HttpServletResponse) this.asyncContext.getResponse(); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); this.asyncContext.complete(); } @@ -129,5 +126,4 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } } - -} \ No newline at end of file +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index c1e49b61e5bbe8b415e3694adacc530c29fa8a53..6ad1848ce47e4bf13031f2991d10d6860192da04 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -49,27 +49,29 @@ import org.springframework.util.StringUtils; */ public class ServletServerHttpRequest extends AbstractServerHttpRequest { - private final Object bodyPublisherMonitor = new Object(); - - private volatile RequestBodyPublisher bodyPublisher; - private final HttpServletRequest request; private final DataBufferFactory dataBufferFactory; private final int bufferSize; + private final Object bodyPublisherMonitor = new Object(); + + private volatile RequestBodyPublisher bodyPublisher; + + public ServletServerHttpRequest(HttpServletRequest request, DataBufferFactory dataBufferFactory, int bufferSize) { - Assert.notNull(request, "'request' must not be null."); - Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); - Assert.isTrue(bufferSize > 0); + Assert.notNull(request, "HttpServletRequest must not be null"); + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); + Assert.isTrue(bufferSize > 0, "Buffer size must be higher than 0"); this.request = request; this.dataBufferFactory = dataBufferFactory; this.bufferSize = bufferSize; } + public HttpServletRequest getServletRequest() { return this.request; } @@ -151,7 +153,8 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { synchronized (this.bodyPublisherMonitor) { bodyPublisher = this.bodyPublisher; if (bodyPublisher == null) { - this.bodyPublisher = bodyPublisher = createBodyPublisher(); + bodyPublisher = createBodyPublisher(); + this.bodyPublisher = bodyPublisher; } } } @@ -163,13 +166,13 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } private RequestBodyPublisher createBodyPublisher() throws IOException { - RequestBodyPublisher bodyPublisher = - new RequestBodyPublisher(request.getInputStream(), this.dataBufferFactory, - this.bufferSize); + RequestBodyPublisher bodyPublisher = new RequestBodyPublisher( + this.request.getInputStream(), this.dataBufferFactory, this.bufferSize); bodyPublisher.registerListener(); return bodyPublisher; } + private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { private final RequestBodyPublisher.RequestBodyReadListener readListener = @@ -183,6 +186,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { public RequestBodyPublisher(ServletInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { + this.inputStream = inputStream; this.dataBufferFactory = dataBufferFactory; this.buffer = new byte[bufferSize]; @@ -216,6 +220,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { return null; } + private class RequestBodyReadListener implements ReadListener { @Override @@ -235,4 +240,5 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } } } + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 9b6324f82c4c6f5003f3b3ee93df72bdaec56041..01b7b55ba08a44ea8a031c1868b97c09fd9cba79 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -46,23 +46,22 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); - private volatile ResponseBodyProcessor bodyProcessor; - private final HttpServletResponse response; private final int bufferSize; private volatile boolean flushOnNext; + private volatile ResponseBodyProcessor bodyProcessor; + public ServletServerHttpResponse(HttpServletResponse response, DataBufferFactory dataBufferFactory, int bufferSize) throws IOException { super(dataBufferFactory); - Assert.notNull(response, "'response' must not be null"); - Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); - Assert.isTrue(bufferSize > 0); - + Assert.notNull(response, "HttpServletResponse must not be null"); + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); + Assert.isTrue(bufferSize > 0, "Buffer size must be higher than 0"); this.response = response; this.bufferSize = bufferSize; } @@ -159,13 +158,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final int bufferSize; - public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { this.outputStream = outputStream; this.bufferSize = bufferSize; } - @Override protected boolean isWritePossible() { return this.outputStream.isReady(); @@ -179,13 +176,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } flush(); } - boolean ready = this.outputStream.isReady(); - if (this.logger.isTraceEnabled()) { this.logger.trace("write: " + dataBuffer + " ready: " + ready); } - if (ready) { int total = dataBuffer.readableByteCount(); int written = writeDataBuffer(dataBuffer); @@ -202,21 +196,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { InputStream input = dataBuffer.asInputStream(); - int bytesWritten = 0; byte[] buffer = new byte[this.bufferSize]; int bytesRead = -1; - while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { this.outputStream.write(buffer, 0, bytesRead); bytesWritten += bytesRead; } - return bytesWritten; } - } + private class ResponseBodyWriteListener implements WriteListener { @Override @@ -235,6 +226,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } + private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { @Override @@ -255,7 +247,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } ServletServerHttpResponse.this.flush(); } - } -} \ No newline at end of file +}