提交 257318eb 编写于 作者: R Rossen Stoyanchev

Minor refactoring + polish in server reactive package

Renamed:
AbstractListenerFlushProcessor -> AbstractListenerWriteFlushProcessor
上级 4738a61e
......@@ -17,11 +17,9 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ReadListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -32,15 +30,17 @@ import reactor.core.publisher.Operators;
/**
* Abstract base class for {@code Publisher} implementations that bridge between
* event-listener read APIs and Reactive Streams. Specifically, a base class for
* reading from the HTTP request body with Servlet 3.1 and Undertow as well as
* handling incoming WebSocket messages with JSR-356, Jetty, and Undertow.
* event-listener read APIs and Reactive Streams.
*
* <p>Specifically a base class for reading from the HTTP request body with
* Servlet 3.1 non-blocking I/O and Undertow XNIO as well as handling incoming
* WebSocket messages with standard Java WebSocket (JSR-356), Jetty, and
* Undertow.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
*/
public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
......@@ -53,6 +53,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
private Subscriber<? super T> subscriber;
// Publisher implementation...
@Override
public void subscribe(Subscriber<? super T> subscriber) {
if (this.logger.isTraceEnabled()) {
......@@ -61,10 +63,11 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
this.state.get().subscribe(this, subscriber);
}
// Listener delegation methods...
/**
* Called via a listener interface to indicate that reading is possible.
* @see ReadListener#onDataAvailable()
* @see org.xnio.ChannelListener#handleEvent(Channel)
* Listeners can call this to notify when reading is possible.
*/
public final void onDataAvailable() {
if (this.logger.isTraceEnabled()) {
......@@ -74,9 +77,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
/**
* Called via a listener interface to indicate that all data has been read.
* @see ReadListener#onAllDataRead()
* @see org.xnio.ChannelListener#handleEvent(Channel)
* Listeners can call this to notify when all data has been read.
*/
public void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
......@@ -86,9 +87,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
/**
* Called by a listener interface to indicate that as error has occurred.
* @param t the error
* @see ReadListener#onError(Throwable)
* Listeners can call this to notify when a read error has occurred.
*/
public final void onError(Throwable t) {
if (this.logger.isErrorEnabled()) {
......@@ -97,9 +96,19 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
this.state.get().onError(this, t);
}
protected abstract void checkOnDataAvailable();
/**
* Reads and publishes data from the input. Continues till either there is no
* more demand, or till there is no more data to be read.
* Reads a data from the input, if possible.
* @return the data that was read; or {@code null}
*/
protected abstract T read() throws IOException;
/**
* Read and publish data from the input. Continue till there is no more
* demand or there is no more data to be read.
* @return {@code true} if there is more demand; {@code false} otherwise
*/
private boolean readAndPublish() throws IOException {
......@@ -117,9 +126,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
/**
* Concurrent substraction bound to 0 and Long.MAX_VALUE.
* Concurrent subscription bound to 0 and Long.MAX_VALUE.
* Any concurrent write will "happen" before this operation.
*
* @param sequence current atomic to update
* @param toSub delta to sub
* @return value before subscription, 0 or Long.MAX_VALUE
......@@ -138,16 +146,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
return r;
}
protected abstract void checkOnDataAvailable();
/**
* Reads a data from the input, if possible. Returns {@code null} if a data
* could not be read.
* @return the data that was read; or {@code null}
*/
protected abstract T read() throws IOException;
private boolean hasDemand() {
return (this.demand.get() > 0);
}
......@@ -161,6 +159,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
private final AbstractListenerReadPublisher<?> publisher;
public ReadSubscription(AbstractListenerReadPublisher<?> publisher) {
this.publisher = publisher;
}
......
......@@ -48,19 +48,19 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
}
@Override
protected final Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) {
protected final Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
if (this.writeCalled.compareAndSet(false, true)) {
Processor<? super Publisher<? extends DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
return Mono.from(subscriber -> {
body.subscribe(bodyProcessor);
bodyProcessor.subscribe(subscriber);
body.subscribe(processor);
processor.subscribe(subscriber);
});
}
else {
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
}
}
/**
* Abstract template method to create a {@code Processor<Publisher<DataBuffer>, Void>}
......
......@@ -28,18 +28,16 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* Abstract base class for {@code Processor} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the
* Servlet 3.1 and Undertow support.
* An alternative to {@link AbstractListenerWriteProcessor} but instead writing
* a {@code Publisher<Publisher<T>>} with flush boundaries enforces after
* the completion of each nested Publisher.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeAndFlushWith(Publisher)
*/
public abstract class AbstractListenerFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {
public abstract class AbstractListenerWriteFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {
protected final Log logger = LogFactory.getLog(getClass());
......@@ -52,7 +50,7 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
private Subscription subscription;
// Subscriber
// Subscriber implementation...
@Override
public final void onSubscribe(Subscription subscription) {
......@@ -87,7 +85,7 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
}
// Publisher
// Publisher implementation...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
......@@ -96,12 +94,20 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
/**
* Creates a new processor for subscribing to a body chunk.
* Listeners can call this method to cancel further writing.
*/
protected abstract Processor<? super T, Void> createBodyProcessor();
protected void cancel() {
this.subscription.cancel();
}
/**
* Create a new processor for subscribing to the next flush boundary.
*/
protected abstract Processor<? super T, Void> createWriteProcessor();
/**
* Flushes the output.
* Flush the output.
*/
protected abstract void flush() throws IOException;
......@@ -115,11 +121,6 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
logger.trace(this.state + " writeComplete");
}
this.state.get().writeComplete(this);
}
protected void cancel() {
this.subscription.cancel();
}
......@@ -128,7 +129,7 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
UNSUBSCRIBED {
@Override
public <T> void onSubscribe(AbstractListenerFlushProcessor<T> processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription;
......@@ -142,16 +143,16 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
REQUESTED {
@Override
public <T> void onNext(AbstractListenerFlushProcessor<T> processor, Publisher<? extends T> chunk) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> chunk) {
if (processor.changeState(this, RECEIVED)) {
Processor<? super T, Void> chunkProcessor = processor.createBodyProcessor();
Processor<? super T, Void> chunkProcessor = processor.createWriteProcessor();
chunk.subscribe(chunkProcessor);
chunkProcessor.subscribe(new WriteSubscriber(processor));
}
}
@Override
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
......@@ -160,7 +161,7 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
RECEIVED {
@Override
public <T> void writeComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
try {
processor.flush();
}
......@@ -182,58 +183,59 @@ public abstract class AbstractListenerFlushProcessor<T> implements Processor<Pub
}
@Override
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
COMPLETED {
@Override
public <T> void onNext(AbstractListenerFlushProcessor<T> processor,
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor,
Publisher<? extends T> publisher) {
// ignore
}
@Override
public <T> void onError(AbstractListenerFlushProcessor<T> processor, Throwable t) {
public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable t) {
// ignore
}
@Override
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}
};
public <T> void onSubscribe(AbstractListenerFlushProcessor<T> processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
subscription.cancel();
}
public <T> void onNext(AbstractListenerFlushProcessor<T> processor, Publisher<? extends T> publisher) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
throw new IllegalStateException(toString());
}
public <T> void onError(AbstractListenerFlushProcessor<T> processor, Throwable ex) {
public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex);
}
}
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
throw new IllegalStateException(toString());
}
public <T> void writeComplete(AbstractListenerFlushProcessor<T> processor) {
public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}
private static class WriteSubscriber implements Subscriber<Void> {
private final AbstractListenerFlushProcessor<?> processor;
private final AbstractListenerWriteFlushProcessor<?> processor;
public WriteSubscriber(AbstractListenerFlushProcessor<?> processor) {
public WriteSubscriber(AbstractListenerWriteFlushProcessor<?> processor) {
this.processor = processor;
}
......
......@@ -17,15 +17,12 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.WriteListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
......@@ -33,16 +30,16 @@ import org.springframework.util.Assert;
/**
* Abstract base class for {@code Processor} implementations that bridge between
* event-listener write APIs and Reactive Streams. Specifically, base class for
* writing to the HTTP response body with Servlet 3.1 and Undertow support as
* well for writing WebSocket messages with JSR-356, Jetty, and Undertow.
* event-listener write APIs and Reactive Streams.
*
* <p>Specifically a base class for writing to the HTTP response body with
* Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket
* messages through the Java WebSocket API (JSR-356), Jetty, and Undertow.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeWith(Publisher)
*/
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
......@@ -59,7 +56,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private Subscription subscription;
// Subscriber
// Subscriber implementation...
@Override
public final void onSubscribe(Subscription subscription) {
......@@ -94,7 +91,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Publisher
// Publisher implementation...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
......@@ -102,20 +99,25 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// listener methods
// Listener delegation methods...
/**
* Called via a listener interface to indicate that writing is possible.
* @see WriteListener#onWritePossible()
* @see org.xnio.ChannelListener#handleEvent(Channel)
* Listeners can call this to notify when writing is possible.
*/
public final void onWritePossible() {
this.state.get().onWritePossible(this);
}
/**
* Called when a data is received via {@link Subscriber#onNext(Object)}
* @param data the data that was received.
* Listeners can call this method to cancel further writing.
*/
public void cancel() {
this.subscription.cancel();
}
/**
* Called when a data item is received via {@link Subscriber#onNext(Object)}
*/
protected void receiveData(T data) {
Assert.state(this.currentData == null);
......@@ -123,46 +125,39 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
/**
* Called when the current data should be released.
* Called when the current received data item can be released.
*/
protected abstract void releaseData();
protected abstract boolean isDataEmpty(T data);
/**
* Called when a data is received via {@link Subscriber#onNext(Object)}
* or when only partial data was written.
* Whether the given data item contains any actual data to be processed.
*/
private void writeIfPossible() {
if (isWritePossible()) {
onWritePossible();
}
}
protected abstract boolean isDataEmpty(T data);
/**
* Called via a listener interface to determine whether writing is possible.
* Whether writing is possible.
*/
protected boolean isWritePossible() {
return false;
}
protected abstract boolean isWritePossible();
/**
* Writes the given data to the output, indicating if the entire data was
* written.
* Writes the given data to the output.
* @param data the data to write
* @return {@code true} if the data was fully written and a new data
* can be requested; {@code false} otherwise
* @return whether the data was fully written (true)and new data can be
* requested or otherwise (false)
*/
protected abstract boolean write(T data) throws IOException;
public void cancel() {
this.subscription.cancel();
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
private void writeIfPossible() {
if (isWritePossible()) {
onWritePossible();
}
}
/**
* Represents a state for the {@link Subscriber} to be in. The following figure
......
......@@ -48,17 +48,15 @@ import org.springframework.util.Assert;
*/
public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener();
private final HttpServletResponse response;
private final int bufferSize;
private volatile boolean flushOnNext;
private volatile ResponseBodyFlushProcessor bodyFlushProcessor;
private volatile ResponseBodyProcessor bodyProcessor;
private volatile ResponseBodyFlushProcessor bodyFlushProcessor;
private volatile boolean flushOnNext;
public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
......@@ -76,7 +74,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
asyncContext.addListener(new ResponseAsyncListener());
// Tomcat expects WriteListener registration on initial thread
registerWriteListener();
response.getOutputStream().setWriteListener(new ResponseBodyWriteListener());
}
......@@ -134,21 +132,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return processor;
}
private void registerWriteListener() {
try {
outputStream().setWriteListener(this.writeListener);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
private ServletOutputStream outputStream() throws IOException {
return this.response.getOutputStream();
}
private void flush() throws IOException {
ServletOutputStream outputStream = outputStream();
ServletOutputStream outputStream = this.response.getOutputStream();
if (outputStream.isReady()) {
try {
outputStream.flush();
......@@ -206,12 +191,54 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
private class ResponseBodyWriteListener implements WriteListener {
@Override
public void onWritePossible() throws IOException {
if (bodyProcessor != null) {
bodyProcessor.onWritePossible();
}
}
@Override
public void onError(Throwable ex) {
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
}
}
}
private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor<DataBuffer> {
@Override
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
try {
ServletOutputStream outputStream = response.getOutputStream();
bodyProcessor = new ResponseBodyProcessor(outputStream, bufferSize);
return bodyProcessor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
protected void flush() throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("flush");
}
ServletServerHttpResponse.this.flush();
}
}
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private final ServletOutputStream outputStream;
private final int bufferSize;
public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) {
this.outputStream = outputStream;
this.bufferSize = bufferSize;
......@@ -275,45 +302,4 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
private class ResponseBodyWriteListener implements WriteListener {
@Override
public void onWritePossible() throws IOException {
if (bodyProcessor != null) {
bodyProcessor.onWritePossible();
}
}
@Override
public void onError(Throwable ex) {
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
}
}
}
private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor<DataBuffer> {
@Override
protected Processor<? super DataBuffer, Void> createBodyProcessor() {
try {
bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize);
return bodyProcessor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
protected void flush() throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("flush");
}
ServletServerHttpResponse.this.flush();
}
}
}
......@@ -157,6 +157,11 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
this.channel.resumeWrites();
}
@Override
protected boolean isWritePossible() {
return false;
}
@Override
protected boolean write(DataBuffer dataBuffer) throws IOException {
if (this.byteBuffer == null) {
......@@ -208,10 +213,10 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
}
private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor<DataBuffer> {
private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor<DataBuffer> {
@Override
protected Processor<? super DataBuffer, Void> createBodyProcessor() {
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
return UndertowServerHttpResponse.this.createBodyProcessor();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册