diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 321795ed8c7b2afdcd156924c7e2026e642a018d..18c67606ff016bcbc4151a66553092d36bd972c3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -17,11 +17,9 @@ package org.springframework.http.server.reactive; import java.io.IOException; -import java.nio.channels.Channel; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.ReadListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,15 +30,17 @@ import reactor.core.publisher.Operators; /** * Abstract base class for {@code Publisher} implementations that bridge between - * event-listener read APIs and Reactive Streams. Specifically, a base class for - * reading from the HTTP request body with Servlet 3.1 and Undertow as well as - * handling incoming WebSocket messages with JSR-356, Jetty, and Undertow. + * event-listener read APIs and Reactive Streams. + * + *

Specifically a base class for reading from the HTTP request body with + * Servlet 3.1 non-blocking I/O and Undertow XNIO as well as handling incoming + * WebSocket messages with standard Java WebSocket (JSR-356), Jetty, and + * Undertow. * * @author Arjen Poutsma * @author Violeta Georgieva + * @author Rossen Stoyanchev * @since 5.0 - * @see ServletServerHttpRequest - * @see UndertowHttpHandlerAdapter */ public abstract class AbstractListenerReadPublisher implements Publisher { @@ -53,6 +53,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { private Subscriber subscriber; + // Publisher implementation... + @Override public void subscribe(Subscriber subscriber) { if (this.logger.isTraceEnabled()) { @@ -61,10 +63,11 @@ public abstract class AbstractListenerReadPublisher implements Publisher { this.state.get().subscribe(this, subscriber); } + + // Listener delegation methods... + /** - * Called via a listener interface to indicate that reading is possible. - * @see ReadListener#onDataAvailable() - * @see org.xnio.ChannelListener#handleEvent(Channel) + * Listeners can call this to notify when reading is possible. */ public final void onDataAvailable() { if (this.logger.isTraceEnabled()) { @@ -74,9 +77,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } /** - * Called via a listener interface to indicate that all data has been read. - * @see ReadListener#onAllDataRead() - * @see org.xnio.ChannelListener#handleEvent(Channel) + * Listeners can call this to notify when all data has been read. */ public void onAllDataRead() { if (this.logger.isTraceEnabled()) { @@ -86,9 +87,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } /** - * Called by a listener interface to indicate that as error has occurred. - * @param t the error - * @see ReadListener#onError(Throwable) + * Listeners can call this to notify when a read error has occurred. */ public final void onError(Throwable t) { if (this.logger.isErrorEnabled()) { @@ -97,9 +96,19 @@ public abstract class AbstractListenerReadPublisher implements Publisher { this.state.get().onError(this, t); } + + protected abstract void checkOnDataAvailable(); + /** - * Reads and publishes data from the input. Continues till either there is no - * more demand, or till there is no more data to be read. + * Reads a data from the input, if possible. + * @return the data that was read; or {@code null} + */ + protected abstract T read() throws IOException; + + + /** + * Read and publish data from the input. Continue till there is no more + * demand or there is no more data to be read. * @return {@code true} if there is more demand; {@code false} otherwise */ private boolean readAndPublish() throws IOException { @@ -117,11 +126,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } /** - * Concurrent substraction bound to 0 and Long.MAX_VALUE. + * Concurrent subscription bound to 0 and Long.MAX_VALUE. * Any concurrent write will "happen" before this operation. - * * @param sequence current atomic to update - * @param toSub delta to sub + * @param toSub delta to sub * @return value before subscription, 0 or Long.MAX_VALUE */ private static long getAndSub(AtomicLong sequence, long toSub) { @@ -138,16 +146,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { return r; } - - protected abstract void checkOnDataAvailable(); - - /** - * Reads a data from the input, if possible. Returns {@code null} if a data - * could not be read. - * @return the data that was read; or {@code null} - */ - protected abstract T read() throws IOException; - private boolean hasDemand() { return (this.demand.get() > 0); } @@ -161,6 +159,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { private final AbstractListenerReadPublisher publisher; + public ReadSubscription(AbstractListenerReadPublisher publisher) { this.publisher = publisher; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index 35440503ed63ef30bd56916a2d385ee89c4dda43..78c537e152e3bcfdfb886637d8ec844d17ae62b8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -48,18 +48,18 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH } @Override - protected final Mono writeAndFlushWithInternal(Publisher> body) { + protected final Mono writeAndFlushWithInternal( + Publisher> body) { + if (this.writeCalled.compareAndSet(false, true)) { - Processor, Void> bodyProcessor = createBodyFlushProcessor(); + Processor, Void> processor = createBodyFlushProcessor(); return Mono.from(subscriber -> { - body.subscribe(bodyProcessor); - bodyProcessor.subscribe(subscriber); + body.subscribe(processor); + processor.subscribe(subscriber); }); } - else { - return Mono.error(new IllegalStateException( - "writeWith() or writeAndFlushWith() has already been called")); - } + return Mono.error(new IllegalStateException( + "writeWith() or writeAndFlushWith() has already been called")); } /** diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java similarity index 70% rename from spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java rename to spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index b4d259e943e4267f4fbdf7dff71b88b5c856f48c..4053fd30100b045501cee31bc94d42a0c87d32da 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -28,18 +28,16 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; /** - * Abstract base class for {@code Processor} implementations that bridge between - * event-listener APIs and Reactive Streams. Specifically, base class for the - * Servlet 3.1 and Undertow support. + * An alternative to {@link AbstractListenerWriteProcessor} but instead writing + * a {@code Publisher>} with flush boundaries enforces after + * the completion of each nested Publisher. * * @author Arjen Poutsma * @author Violeta Georgieva + * @author Rossen Stoyanchev * @since 5.0 - * @see ServletServerHttpRequest - * @see UndertowHttpHandlerAdapter - * @see ServerHttpResponse#writeAndFlushWith(Publisher) */ -public abstract class AbstractListenerFlushProcessor implements Processor, Void> { +public abstract class AbstractListenerWriteFlushProcessor implements Processor, Void> { protected final Log logger = LogFactory.getLog(getClass()); @@ -52,7 +50,7 @@ public abstract class AbstractListenerFlushProcessor implements Processor implements Processor subscriber) { @@ -96,12 +94,20 @@ public abstract class AbstractListenerFlushProcessor implements Processor createBodyProcessor(); + protected void cancel() { + this.subscription.cancel(); + } + + + /** + * Create a new processor for subscribing to the next flush boundary. + */ + protected abstract Processor createWriteProcessor(); /** - * Flushes the output. + * Flush the output. */ protected abstract void flush() throws IOException; @@ -115,11 +121,6 @@ public abstract class AbstractListenerFlushProcessor implements Processor implements Processor void onSubscribe(AbstractListenerFlushProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerWriteFlushProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; @@ -142,16 +143,16 @@ public abstract class AbstractListenerFlushProcessor implements Processor void onNext(AbstractListenerFlushProcessor processor, Publisher chunk) { + public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher chunk) { if (processor.changeState(this, RECEIVED)) { - Processor chunkProcessor = processor.createBodyProcessor(); + Processor chunkProcessor = processor.createWriteProcessor(); chunk.subscribe(chunkProcessor); chunkProcessor.subscribe(new WriteSubscriber(processor)); } } @Override - public void onComplete(AbstractListenerFlushProcessor processor) { + public void onComplete(AbstractListenerWriteFlushProcessor processor) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } @@ -160,7 +161,7 @@ public abstract class AbstractListenerFlushProcessor implements Processor void writeComplete(AbstractListenerFlushProcessor processor) { + public void writeComplete(AbstractListenerWriteFlushProcessor processor) { try { processor.flush(); } @@ -182,58 +183,59 @@ public abstract class AbstractListenerFlushProcessor implements Processor void onComplete(AbstractListenerFlushProcessor processor) { + public void onComplete(AbstractListenerWriteFlushProcessor processor) { processor.subscriberCompleted = true; } }, COMPLETED { @Override - public void onNext(AbstractListenerFlushProcessor processor, + public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { // ignore } @Override - public void onError(AbstractListenerFlushProcessor processor, Throwable t) { + public void onError(AbstractListenerWriteFlushProcessor processor, Throwable t) { // ignore } @Override - public void onComplete(AbstractListenerFlushProcessor processor) { + public void onComplete(AbstractListenerWriteFlushProcessor processor) { // ignore } }; - public void onSubscribe(AbstractListenerFlushProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerWriteFlushProcessor processor, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractListenerFlushProcessor processor, Publisher publisher) { + public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { throw new IllegalStateException(toString()); } - public void onError(AbstractListenerFlushProcessor processor, Throwable ex) { + public void onError(AbstractListenerWriteFlushProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } } - public void onComplete(AbstractListenerFlushProcessor processor) { + public void onComplete(AbstractListenerWriteFlushProcessor processor) { throw new IllegalStateException(toString()); } - public void writeComplete(AbstractListenerFlushProcessor processor) { + public void writeComplete(AbstractListenerWriteFlushProcessor processor) { // ignore } private static class WriteSubscriber implements Subscriber { - private final AbstractListenerFlushProcessor processor; + private final AbstractListenerWriteFlushProcessor processor; + - public WriteSubscriber(AbstractListenerFlushProcessor processor) { + public WriteSubscriber(AbstractListenerWriteFlushProcessor processor) { this.processor = processor; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index fb3b12a5fcad0f9fc267ce7bccbd7e759b652f89..55ea82967a41f955b384c6ef8883222e9b625213 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -17,15 +17,12 @@ package org.springframework.http.server.reactive; import java.io.IOException; -import java.nio.channels.Channel; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.WriteListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -33,16 +30,16 @@ import org.springframework.util.Assert; /** * Abstract base class for {@code Processor} implementations that bridge between - * event-listener write APIs and Reactive Streams. Specifically, base class for - * writing to the HTTP response body with Servlet 3.1 and Undertow support as - * well for writing WebSocket messages with JSR-356, Jetty, and Undertow. + * event-listener write APIs and Reactive Streams. + * + *

Specifically a base class for writing to the HTTP response body with + * Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket + * messages through the Java WebSocket API (JSR-356), Jetty, and Undertow. * * @author Arjen Poutsma * @author Violeta Georgieva + * @author Rossen Stoyanchev * @since 5.0 - * @see ServletServerHttpRequest - * @see UndertowHttpHandlerAdapter - * @see ServerHttpResponse#writeWith(Publisher) */ public abstract class AbstractListenerWriteProcessor implements Processor { @@ -59,7 +56,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor subscriber) { @@ -102,20 +99,25 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor { + + @Override + protected Processor createWriteProcessor() { + try { + ServletOutputStream outputStream = response.getOutputStream(); + bodyProcessor = new ResponseBodyProcessor(outputStream, bufferSize); + return bodyProcessor; + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + protected void flush() throws IOException { + if (logger.isTraceEnabled()) { + logger.trace("flush"); + } + ServletServerHttpResponse.this.flush(); + } + } + private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { private final ServletOutputStream outputStream; private final int bufferSize; + public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { this.outputStream = outputStream; this.bufferSize = bufferSize; @@ -275,45 +302,4 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - - private class ResponseBodyWriteListener implements WriteListener { - - @Override - public void onWritePossible() throws IOException { - if (bodyProcessor != null) { - bodyProcessor.onWritePossible(); - } - } - - @Override - public void onError(Throwable ex) { - if (bodyProcessor != null) { - bodyProcessor.cancel(); - bodyProcessor.onError(ex); - } - } - } - - private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor { - - @Override - protected Processor createBodyProcessor() { - try { - bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize); - return bodyProcessor; - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } - } - - @Override - protected void flush() throws IOException { - if (logger.isTraceEnabled()) { - logger.trace("flush"); - } - ServletServerHttpResponse.this.flush(); - } - } - } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 005adbe337e153a986b233fee7654fbc8244ef52..d43c4cb266585c0177cdc9d4cf9e29025c33c207 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -157,6 +157,11 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon this.channel.resumeWrites(); } + @Override + protected boolean isWritePossible() { + return false; + } + @Override protected boolean write(DataBuffer dataBuffer) throws IOException { if (this.byteBuffer == null) { @@ -208,10 +213,10 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } } - private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor { + private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor { @Override - protected Processor createBodyProcessor() { + protected Processor createWriteProcessor() { return UndertowServerHttpResponse.this.createBodyProcessor(); }