提交 d2fce24e 编写于 作者: A Arjen Poutsma

Introduced ResponseBodyWriteResultPublisher

Refactored Publisher<Void> in AbstractResponseBodyProcessor into
separate ResponseBodyWriteResultPublisher.
上级 e906a78e
...@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; ...@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor; import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.core.io.buffer.FlushingDataBuffer;
...@@ -46,80 +45,57 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -46,80 +45,57 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
private final AtomicReference<SubscriberState> subscriberState = private final ResponseBodyWriteResultPublisher publisherDelegate =
new AtomicReference<>(SubscriberState.UNSUBSCRIBED); new ResponseBodyWriteResultPublisher();
private final AtomicReference<PublisherState> publisherState = private final AtomicReference<State> state =
new AtomicReference<>(PublisherState.UNSUBSCRIBED); new AtomicReference<>(State.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer; private volatile DataBuffer currentBuffer;
private volatile boolean subscriberCompleted; private volatile boolean subscriberCompleted;
private volatile boolean publisherCompleted;
private volatile Throwable publisherError;
private Subscription subscription; private Subscription subscription;
private Subscriber<? super Void> subscriber;
// Subscriber // Subscriber
@Override @Override
public final void onSubscribe(Subscription subscription) { public final void onSubscribe(Subscription subscription) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onSubscribe: " + subscription); logger.trace(this.state + " onSubscribe: " + subscription);
} }
this.subscriberState.get().onSubscribe(this, subscription); this.state.get().onSubscribe(this, subscription);
} }
@Override @Override
public final void onNext(DataBuffer dataBuffer) { public final void onNext(DataBuffer dataBuffer) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onNext: " + dataBuffer); logger.trace(this.state + " onNext: " + dataBuffer);
} }
this.subscriberState.get().onNext(this, dataBuffer); this.state.get().onNext(this, dataBuffer);
} }
@Override @Override
public final void onError(Throwable t) { public final void onError(Throwable t) {
if (logger.isErrorEnabled()) { if (logger.isErrorEnabled()) {
logger.error("SUB " + this.subscriberState + " publishError: " + t, t); logger.error(this.state + " onError: " + t, t);
} }
this.subscriberState.get().onError(this, t); this.state.get().onError(this, t);
} }
@Override @Override
public final void onComplete() { public final void onComplete() {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("SUB " + this.subscriberState + " onComplete"); logger.trace(this.state + " onComplete");
} }
this.subscriberState.get().onComplete(this); this.state.get().onComplete(this);
} }
// Publisher // Publisher
@Override @Override
public final void subscribe(Subscriber<? super Void> subscriber) { public final void subscribe(Subscriber<? super Void> subscriber) {
if (logger.isTraceEnabled()) { this.publisherDelegate.subscribe(subscriber);
logger.trace("PUB " + this.publisherState + " subscribe: " + subscriber);
}
this.publisherState.get().subscribe(this, subscriber);
}
private void publishComplete() {
if (logger.isTraceEnabled()) {
logger.trace("PUB " + this.publisherState + " publishComplete");
}
this.publisherState.get().publishComplete(this);
}
private void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
logger.trace("PUB " + this.publisherState + " publishError: " + t);
}
this.publisherState.get().publishError(this, t);
} }
// listener methods // listener methods
...@@ -130,7 +106,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -130,7 +106,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* @see org.xnio.ChannelListener#handleEvent(Channel) * @see org.xnio.ChannelListener#handleEvent(Channel)
*/ */
protected final void onWritePossible() { protected final void onWritePossible() {
this.subscriberState.get().onWritePossible(this); this.state.get().onWritePossible(this);
} }
/** /**
...@@ -140,16 +116,6 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -140,16 +116,6 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
protected void receiveBuffer(DataBuffer dataBuffer) { protected void receiveBuffer(DataBuffer dataBuffer) {
Assert.state(this.currentBuffer == null); Assert.state(this.currentBuffer == null);
this.currentBuffer = dataBuffer; this.currentBuffer = dataBuffer;
checkOnWritePossible();
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
*/
protected void checkOnWritePossible() {
// no-op
} }
/** /**
...@@ -164,6 +130,23 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -164,6 +130,23 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
this.currentBuffer = null; this.currentBuffer = null;
} }
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
*/
private void writeIfPossible() {
if (isWritePossible()) {
onWritePossible();
}
}
/**
* Called via a listener interface to determine whether writing is possible.
*/
protected boolean isWritePossible() {
return false;
}
/** /**
* Writes the given data buffer to the output, indicating if the entire buffer was * Writes the given data buffer to the output, indicating if the entire buffer was
* written. * written.
...@@ -178,49 +161,8 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -178,49 +161,8 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
*/ */
protected abstract void flush() throws IOException; protected abstract void flush() throws IOException;
/** private boolean changeState(State oldState, State newState) {
* Closes the output. return this.state.compareAndSet(oldState, newState);
*/
protected abstract void close();
private boolean changeSubscriberState(SubscriberState oldState,
SubscriberState newState) {
return this.subscriberState.compareAndSet(oldState, newState);
}
private boolean changePublisherState(PublisherState oldState,
PublisherState newState) {
return this.publisherState.compareAndSet(oldState, newState);
}
private static final class ResponseBodySubscription implements Subscription {
private final AbstractResponseBodyProcessor processor;
public ResponseBodySubscription(AbstractResponseBodyProcessor processor) {
this.processor = processor;
}
@Override
public final void request(long n) {
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace("PUB " + state() + " request: " + n);
}
state().request(this.processor, n);
}
@Override
public final void cancel() {
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace("PUB " + state() + " cancel");
}
state().cancel(this.processor);
}
private PublisherState state() {
return this.processor.publisherState.get();
}
} }
/** /**
...@@ -241,7 +183,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -241,7 +183,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* </pre> * </pre>
* Refer to the individual states for more information. * Refer to the individual states for more information.
*/ */
private enum SubscriberState { private enum State {
/** /**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by * The initial unsubscribed state. Will respond to {@code onSubscribe} by
...@@ -253,7 +195,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -253,7 +195,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
void onSubscribe(AbstractResponseBodyProcessor processor, void onSubscribe(AbstractResponseBodyProcessor processor,
Subscription subscription) { Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null"); Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeSubscriberState(this, REQUESTED)) { if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription; processor.subscription = subscription;
subscription.request(1); subscription.request(1);
} }
...@@ -271,17 +213,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -271,17 +213,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
REQUESTED { REQUESTED {
@Override @Override
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
if (processor.changeSubscriberState(this, RECEIVED)) { if (processor.changeState(this, RECEIVED)) {
processor.receiveBuffer(dataBuffer); processor.receiveBuffer(dataBuffer);
processor.writeIfPossible();
} }
} }
@Override @Override
void onComplete(AbstractResponseBodyProcessor processor) { void onComplete(AbstractResponseBodyProcessor processor) {
if (processor.changeSubscriberState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
processor.close(); processor.publisherDelegate.publishComplete();
processor.publishComplete();
} }
} }
}, },
...@@ -297,7 +239,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -297,7 +239,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
RECEIVED { RECEIVED {
@Override @Override
void onWritePossible(AbstractResponseBodyProcessor processor) { void onWritePossible(AbstractResponseBodyProcessor processor) {
if (processor.changeSubscriberState(this, WRITING)) { if (processor.changeState(this, WRITING)) {
DataBuffer dataBuffer = processor.currentBuffer; DataBuffer dataBuffer = processor.currentBuffer;
try { try {
boolean writeCompleted = processor.write(dataBuffer); boolean writeCompleted = processor.write(dataBuffer);
...@@ -307,18 +249,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -307,18 +249,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
} }
processor.releaseBuffer(); processor.releaseBuffer();
if (!processor.subscriberCompleted) { if (!processor.subscriberCompleted) {
processor.changeSubscriberState(WRITING, REQUESTED); processor.changeState(WRITING, REQUESTED);
processor.subscription.request(1); processor.subscription.request(1);
} }
else { else {
processor.changeSubscriberState(WRITING, COMPLETED); processor.changeState(WRITING, COMPLETED);
processor.close(); processor.publisherDelegate.publishComplete();
processor.publishComplete();
} }
} }
else { else {
processor.changeSubscriberState(WRITING, RECEIVED); processor.changeState(WRITING, RECEIVED);
processor.checkOnWritePossible(); processor.writeIfPossible();
} }
} }
catch (IOException ex) { catch (IOException ex) {
...@@ -371,115 +312,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -371,115 +312,17 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
} }
void onError(AbstractResponseBodyProcessor processor, Throwable t) { void onError(AbstractResponseBodyProcessor processor, Throwable t) {
if (processor.changeSubscriberState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publishError(t); processor.publisherDelegate.publishError(t);
} }
} }
void onComplete(AbstractResponseBodyProcessor processor) { void onComplete(AbstractResponseBodyProcessor processor) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
void onWritePossible(AbstractResponseBodyProcessor processor) { void onWritePossible(AbstractResponseBodyProcessor processor) {
// ignore // ignore
} }
}
private enum PublisherState {
UNSUBSCRIBED {
@Override
void subscribe(AbstractResponseBodyProcessor processor,
Subscriber<? super Void> subscriber) {
Objects.requireNonNull(subscriber);
if (processor.changePublisherState(this, SUBSCRIBED)) {
Subscription subscription = new ResponseBodySubscription(processor);
processor.subscriber = subscriber;
subscriber.onSubscribe(subscription);
if (processor.publisherCompleted) {
processor.publishComplete();
}
else if (processor.publisherError != null) {
processor.publishError(processor.publisherError);
}
}
else {
throw new IllegalStateException(toString());
}
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
processor.publisherCompleted = true;
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
processor.publisherError = t;
}
},
SUBSCRIBED {
@Override
void request(AbstractResponseBodyProcessor processor, long n) {
BackpressureUtils.checkRequest(n, processor.subscriber);
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
if (processor.changePublisherState(this, COMPLETED)) {
processor.subscriber.onComplete();
}
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
if (processor.changePublisherState(this, COMPLETED)) {
processor.subscriber.onError(t);
}
}
},
COMPLETED {
@Override
void request(AbstractResponseBodyProcessor processor, long n) {
// ignore
}
@Override
void cancel(AbstractResponseBodyProcessor processor) {
// ignore
}
@Override
void publishComplete(AbstractResponseBodyProcessor processor) {
// ignore
}
@Override
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
// ignore
}
};
void subscribe(AbstractResponseBodyProcessor processor,
Subscriber<? super Void> subscriber) {
throw new IllegalStateException(toString());
}
void request(AbstractResponseBodyProcessor processor, long n) {
throw new IllegalStateException(toString());
}
void cancel(AbstractResponseBodyProcessor processor) {
processor.changePublisherState(this, COMPLETED);
}
void publishComplete(AbstractResponseBodyProcessor processor) {
throw new IllegalStateException(toString());
}
void publishError(AbstractResponseBodyProcessor processor, Throwable t) {
throw new IllegalStateException(toString());
}
} }
......
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
/**
* Publisher returned from {@link ServerHttpResponse#writeWith(Publisher)}.
* @author Arjen Poutsma
*/
class ResponseBodyWriteResultPublisher implements Publisher<Void> {
private static final Log logger =
LogFactory.getLog(ResponseBodyWriteResultPublisher.class);
private final AtomicReference<State> state =
new AtomicReference<>(State.UNSUBSCRIBED);
private Subscriber<? super Void> subscriber;
private volatile boolean publisherCompleted;
private volatile Throwable publisherError;
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " subscribe: " + subscriber);
}
this.state.get().subscribe(this, subscriber);
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
/**
* Publishes the complete signal to the subscriber of this publisher.
*/
public void publishComplete() {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " publishComplete");
}
this.state.get().publishComplete(this);
}
/**
* Publishes the given error signal to the subscriber of this publisher.
*/
public void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " publishError: " + t);
}
this.state.get().publishError(this, t);
}
private static final class ResponseBodyWriteResultSubscription
implements Subscription {
private final ResponseBodyWriteResultPublisher publisher;
public ResponseBodyWriteResultSubscription(
ResponseBodyWriteResultPublisher publisher) {
this.publisher = publisher;
}
@Override
public final void request(long n) {
if (logger.isTraceEnabled()) {
logger.trace(state() + " request: " + n);
}
state().request(this.publisher, n);
}
@Override
public final void cancel() {
if (logger.isTraceEnabled()) {
logger.trace(state() + " cancel");
}
state().cancel(this.publisher);
}
private State state() {
return this.publisher.state.get();
}
}
private enum State {
UNSUBSCRIBED {
@Override
void subscribe(ResponseBodyWriteResultPublisher publisher,
Subscriber<? super Void> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, SUBSCRIBED)) {
Subscription subscription =
new ResponseBodyWriteResultSubscription(publisher);
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
if (publisher.publisherCompleted) {
publisher.publishComplete();
}
else if (publisher.publisherError != null) {
publisher.publishError(publisher.publisherError);
}
}
else {
throw new IllegalStateException(toString());
}
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
publisher.publisherCompleted = true;
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
publisher.publisherError = t;
}
},
SUBSCRIBED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
BackpressureUtils.checkRequest(n, publisher.subscriber);
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onComplete();
}
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onError(t);
}
}
},
COMPLETED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
// ignore
}
@Override
void cancel(ResponseBodyWriteResultPublisher publisher) {
// ignore
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
// ignore
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(ResponseBodyWriteResultPublisher publisher,
Subscriber<? super Void> subscriber) {
throw new IllegalStateException(toString());
}
void request(ResponseBodyWriteResultPublisher publisher, long n) {
throw new IllegalStateException(toString());
}
void cancel(ResponseBodyWriteResultPublisher publisher) {
publisher.changeState(this, COMPLETED);
}
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
throw new IllegalStateException(toString());
}
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
throw new IllegalStateException(toString());
}
}
}
...@@ -248,14 +248,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet { ...@@ -248,14 +248,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
} }
@Override @Override
protected void checkOnWritePossible() { protected boolean isWritePossible() {
try { try {
if (outputStream().isReady()) { return outputStream().isReady();
onWritePossible();
}
} }
catch (IOException ex) { catch (IOException ex) {
onError(ex); onError(ex);
return false;
} }
} }
...@@ -307,11 +306,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { ...@@ -307,11 +306,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
} }
@Override
protected void close() {
this.synchronizer.writeComplete();
}
private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { private int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
InputStream input = dataBuffer.asInputStream(); InputStream input = dataBuffer.asInputStream();
ServletOutputStream output = outputStream(); ServletOutputStream output = outputStream();
......
...@@ -30,8 +30,6 @@ import io.undertow.server.handlers.CookieImpl; ...@@ -30,8 +30,6 @@ import io.undertow.server.handlers.CookieImpl;
import io.undertow.util.HttpString; import io.undertow.util.HttpString;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.xnio.ChannelListener; import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.channels.StreamSinkChannel; import org.xnio.channels.StreamSinkChannel;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
...@@ -206,23 +204,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse ...@@ -206,23 +204,6 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
this.byteBuffer = null; this.byteBuffer = null;
} }
@Override
protected void close() {
try {
this.responseChannel.shutdownWrites();
if (!this.responseChannel.flush()) {
this.responseChannel.getWriteSetter().set(ChannelListeners
.flushingChannelListener(
o -> IoUtils.safeClose(this.responseChannel),
ChannelListeners.closingChannelExceptionHandler()));
this.responseChannel.resumeWrites();
}
}
catch (IOException ignored) {
}
}
private class WriteListener implements ChannelListener<StreamSinkChannel> { private class WriteListener implements ChannelListener<StreamSinkChannel> {
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册