提交 11ed847a 编写于 作者: A Arjen Poutsma

AbstractRequestBodyPublisher improvements

Reactored Servlet 3.1 and Undertow request support
(AbstractResponseBodySubscriber) to use an internal state machine,
making thread-safity a lot easier.
上级 3a681fba
......@@ -16,16 +16,21 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ReadListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.util.BackpressureUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.Assert;
/**
* Abstract base class for {@code Publisher} implementations that bridge between
......@@ -38,171 +43,287 @@ import org.springframework.util.Assert;
*/
abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private ResponseBodySubscription subscription;
protected final Log logger = LogFactory.getLog(getClass());
private volatile boolean stalled;
private final AtomicReference<State> state =
new AtomicReference<>(State.UNSUBSCRIBED);
private final AtomicLong demand = new AtomicLong();
private Subscriber<? super DataBuffer> subscriber;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
Objects.requireNonNull(subscriber);
Assert.state(this.subscription == null, "Only a single subscriber allowed");
this.subscription = new ResponseBodySubscription(subscriber);
subscriber.onSubscribe(this.subscription);
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " subscribe: " + subscriber);
}
this.state.get().subscribe(this, subscriber);
}
/**
* Publishes the given signal to the subscriber.
* @param dataBuffer the signal to publish
* @see Subscriber#onNext(Object)
* Called via a listener interface to indicate that reading is possible.
* @see ReadListener#onDataAvailable()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void publishOnNext(DataBuffer dataBuffer) {
Assert.state(this.subscription != null);
this.subscription.publishOnNext(dataBuffer);
protected final void onDataAvailable() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onDataAvailable");
}
this.state.get().onDataAvailable(this);
}
/**
* Publishes the given error to the subscriber.
* @param t the error to publish
* @see Subscriber#onError(Throwable)
* Called via a listener interface to indicate that all data has been read.
* @see ReadListener#onAllDataRead()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void publishOnError(Throwable t) {
if (this.subscription != null) {
this.subscription.publishOnError(t);
protected final void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onAllDataRead");
}
this.state.get().onAllDataRead(this);
}
/**
* Publishes the complete signal to the subscriber.
* @see Subscriber#onComplete()
* Called by a listener interface to indicate that as error has occured.
* @param t the error
* @see ReadListener#onError(Throwable)
*/
protected final void publishOnComplete() {
if (this.subscription != null) {
this.subscription.publishOnComplete();
protected final void onError(Throwable t) {
if (this.logger.isErrorEnabled()) {
this.logger.error(this.state + " onError: " + t, t);
}
this.state.get().onError(this, t);
}
/**
* Returns true if the {@code Subscriber} associated with this {@code Publisher} has
* cancelled its {@code Subscription}.
* @return {@code true} if a subscriber has been registered and its subscription has
* been cancelled; {@code false} otherwise
* @see ResponseBodySubscription#isCancelled()
* @see Subscription#cancel()
* Reads and publishes data buffers from the input. Continues till either there is no
* more demand, or till there is no more data to be read.
* @return {@code true} if there is more data to be read; {@code false} otherwise
*/
protected final boolean isSubscriptionCancelled() {
return (this.subscription != null && this.subscription.isCancelled());
private boolean readAndPublish() {
try {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
BackpressureUtils.getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
}
else {
return false;
}
}
return true;
}
catch (IOException ex) {
onError(ex);
return false;
}
}
/**
* Checks the subscription for demand, and marks this publisher as "stalled" if there
* is none. The next time the subscriber {@linkplain Subscription#request(long)
* requests} more events, the {@link #noLongerStalled()} method is called.
* @return {@code true} if there is demand; {@code false} otherwise
* Reads a data buffer from the input, if possible. Returns {@code null} if a buffer
* could not be read.
* @return the data buffer that was read; or {@code null}
*/
protected final boolean checkSubscriptionForDemand() {
if (this.subscription == null || !this.subscription.hasDemand()) {
this.stalled = true;
return false;
}
else {
return true;
}
}
protected abstract DataBuffer read() throws IOException;
/**
* Abstract template method called when this publisher is no longer "stalled". Used in
* sub-classes to resume reading from the request.
* Closes the input.
*/
protected abstract void noLongerStalled();
protected abstract void close();
private final class ResponseBodySubscription implements Subscription {
private boolean hasDemand() {
return this.demand.get() > 0;
}
private final Subscriber<? super DataBuffer> subscriber;
private boolean changeState(AbstractRequestBodyPublisher.State oldState,
AbstractRequestBodyPublisher.State newState) {
return this.state.compareAndSet(oldState, newState);
}
private final AtomicLong demand = new AtomicLong();
private static final class RequestBodySubscription implements Subscription {
private boolean cancelled;
private final AbstractRequestBodyPublisher publisher;
public ResponseBodySubscription(Subscriber<? super DataBuffer> subscriber) {
Assert.notNull(subscriber, "'subscriber' must not be null");
public RequestBodySubscription(AbstractRequestBodyPublisher publisher) {
this.publisher = publisher;
}
this.subscriber = subscriber;
@Override
public final void request(long n) {
if (this.publisher.logger.isTraceEnabled()) {
this.publisher.logger.trace(state() + " request: " + n);
}
state().request(this.publisher, n);
}
@Override
public final void cancel() {
this.cancelled = true;
if (this.publisher.logger.isTraceEnabled()) {
this.publisher.logger.trace(state() + " cancel");
}
state().cancel(this.publisher);
}
/**
* Indicates whether this subscription has been cancelled.
* @see #cancel()
*/
protected final boolean isCancelled() {
return this.cancelled;
private AbstractRequestBodyPublisher.State state() {
return this.publisher.state.get();
}
@Override
public final void request(long n) {
if (!isCancelled() && BackpressureUtils.checkRequest(n, this.subscriber)) {
long demand = BackpressureUtils.addAndGet(this.demand, n);
}
if (stalled && demand > 0) {
stalled = false;
noLongerStalled();
}
}
}
/**
* Represents a state for the {@link Publisher} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
*
* <pre>
* UNSUBSCRIBED
* |
* v
* DATA_UNAVAILABLE <---> DATA_AVAILABLE
* | |
* v v
* COMPLETED
* </pre>
* Refer to the individual states for more information.
*/
private enum State {
/**
* Indicates whether this subscription has demand.
* @see #request(long)
* The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractRequestBodyPublisher, Subscriber)} by
* changing state to {@link #DATA_UNAVAILABLE}.
*/
protected final boolean hasDemand() {
return this.demand.get() > 0;
}
UNSUBSCRIBED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, DATA_UNAVAILABLE)) {
Subscription subscription = new RequestBodySubscription(
publisher);
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
}
else {
throw new IllegalStateException(toString());
}
}
},
/**
* Publishes the given signal to the subscriber wrapped by this subscription, if
* it has not been cancelled. If there is {@linkplain #hasDemand() no demand} for
* the signal, an exception will be thrown.
* @param dataBuffer the signal to publish
* @see Subscriber#onNext(Object)
* State that gets entered when there is no data to be read. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and
* responds to {@link #onDataAvailable(AbstractRequestBodyPublisher)} by
* reading the available data and changing state to {@link #DATA_AVAILABLE} if
* there continues to be more data available after the demand has been satisfied.
*/
protected final void publishOnNext(DataBuffer dataBuffer) {
if (!isCancelled()) {
if (hasDemand()) {
BackpressureUtils.getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
DATA_UNAVAILABLE {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
}
else {
throw new IllegalStateException("No demand for: " + dataBuffer);
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
boolean dataAvailable = publisher.readAndPublish();
if (dataAvailable) {
publisher.changeState(this, DATA_AVAILABLE);
}
}
}
},
/**
* Publishes the given error to the subscriber wrapped by this subscription, if it
* has not been cancelled.
* @param t the error to publish
* @see Subscriber#onError(Throwable)
* State that gets entered when there is data to be read. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and
* by reading the available data and changing state to {@link #DATA_UNAVAILABLE}
* if there is no more data available.
*/
protected final void publishOnError(Throwable t) {
if (!isCancelled()) {
this.subscriber.onError(t);
DATA_AVAILABLE {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
boolean dataAvailable = publisher.readAndPublish();
if (!dataAvailable) {
publisher.changeState(this, DATA_UNAVAILABLE);
}
}
}
}
},
/**
* Publishes the complete signal to the subscriber wrapped by this subscription,
* if it has not been cancelled.
* @see Subscriber#onComplete()
* The terminal completed state. Does not respond to any events.
*/
protected final void publishOnComplete() {
if (!isCancelled()) {
this.subscriber.onComplete();
COMPLETED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
// ignore
}
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
// ignore
}
@Override
void cancel(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
throw new IllegalStateException(toString());
}
void request(AbstractRequestBodyPublisher publisher, long n) {
throw new IllegalStateException(toString());
}
void cancel(AbstractRequestBodyPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
}
}
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
throw new IllegalStateException(toString());
}
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
if (publisher.subscriber != null) {
publisher.subscriber.onComplete();
}
}
}
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
if (publisher.subscriber != null) {
publisher.subscriber.onError(t);
}
}
}
}
}
......@@ -141,10 +141,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class);
private final RequestBodyReadListener readListener =
new RequestBodyReadListener();
private final RequestBodyPublisher.RequestBodyReadListener readListener =
new RequestBodyPublisher.RequestBodyReadListener();
private final ServletAsyncContextSynchronizer synchronizer;
......@@ -165,76 +163,50 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
@Override
protected void noLongerStalled() {
try {
this.readListener.onDataAvailable();
}
catch (IOException ex) {
this.readListener.onError(ex);
protected DataBuffer read() throws IOException {
ServletInputStream input = this.synchronizer.getRequest().getInputStream();
if (input.isReady()) {
int read = input.read(this.buffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
dataBuffer.write(this.buffer, 0, read);
return dataBuffer;
}
}
return null;
}
@Override
protected void close() {
this.synchronizer.readComplete();
}
private class RequestBodyReadListener implements ReadListener {
@Override
public void onDataAvailable() throws IOException {
if (isSubscriptionCancelled()) {
return;
}
logger.trace("onDataAvailable");
ServletInputStream input =
RequestBodyPublisher.this.synchronizer.getRequest()
.getInputStream();
while (true) {
if (!checkSubscriptionForDemand()) {
break;
}
boolean ready = input.isReady();
logger.trace(
"Input ready: " + ready + " finished: " + input.isFinished());
if (!ready) {
break;
}
int read = input.read(RequestBodyPublisher.this.buffer);
logger.trace("Input read:" + read);
if (read == -1) {
break;
}
else if (read > 0) {
DataBuffer dataBuffer =
RequestBodyPublisher.this.dataBufferFactory
.allocateBuffer(read);
dataBuffer.write(RequestBodyPublisher.this.buffer, 0, read);
publishOnNext(dataBuffer);
}
}
RequestBodyPublisher.this.onDataAvailable();
}
@Override
public void onAllDataRead() throws IOException {
logger.trace("All data read");
RequestBodyPublisher.this.synchronizer.readComplete();
publishOnComplete();
RequestBodyPublisher.this.onAllDataRead();
}
@Override
public void onError(Throwable t) {
logger.trace("RequestBodyReadListener Error", t);
RequestBodyPublisher.this.synchronizer.readComplete();
public void onError(Throwable throwable) {
RequestBodyPublisher.this.onError(throwable);
publishOnError(t);
}
}
}
private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber {
private final ResponseBodyWriteListener writeListener =
......
......@@ -57,7 +57,6 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
this.dataBufferFactory = dataBufferFactory;
}
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
......@@ -72,7 +71,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
responseBody.registerListener();
ServerHttpResponse response =
new UndertowServerHttpResponse(exchange, responseChannel,
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)),
publisher -> Mono
.from(subscriber -> publisher.subscribe(responseBody)),
this.dataBufferFactory);
this.delegate.handle(request, response).subscribe(new Subscriber<Void>() {
......@@ -90,7 +90,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onError(Throwable ex) {
if (exchange.isResponseStarted() || exchange.getStatusCode() > 500) {
logger.error("Error from request handling. Completing the request.", ex);
logger.error("Error from request handling. Completing the request.",
ex);
}
else {
exchange.setStatusCode(500);
......@@ -107,10 +108,11 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class);
private final ChannelListener<StreamSourceChannel> readListener =
new ReadListener();
private final ChannelListener<StreamSourceChannel> listener =
new RequestBodyListener();
private final ChannelListener<StreamSourceChannel> closeListener =
new CloseListener();
private final StreamSourceChannel requestChannel;
......@@ -127,11 +129,31 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
public void registerListener() {
this.requestChannel.getReadSetter().set(this.listener);
this.requestChannel.getReadSetter().set(this.readListener);
this.requestChannel.getCloseSetter().set(this.closeListener);
this.requestChannel.resumeReads();
}
private void close() {
@Override
protected DataBuffer read() throws IOException {
ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();
int read = this.requestChannel.read(byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
byteBuffer.flip();
return this.dataBufferFactory.wrap(byteBuffer);
}
else if (read == -1) {
onAllDataRead();
}
return null;
}
@Override
protected void close() {
if (this.pooledByteBuffer != null) {
IoUtils.safeClose(this.pooledByteBuffer);
}
......@@ -140,54 +162,21 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
}
@Override
protected void noLongerStalled() {
this.listener.handleEvent(this.requestChannel);
private class ReadListener implements ChannelListener<StreamSourceChannel> {
@Override
public void handleEvent(StreamSourceChannel channel) {
onDataAvailable();
}
}
private class RequestBodyListener
implements ChannelListener<StreamSourceChannel> {
private class CloseListener implements ChannelListener<StreamSourceChannel> {
@Override
public void handleEvent(StreamSourceChannel channel) {
if (isSubscriptionCancelled()) {
return;
}
logger.trace("handleEvent");
ByteBuffer byteBuffer =
RequestBodyPublisher.this.pooledByteBuffer.getBuffer();
try {
while (true) {
if (!checkSubscriptionForDemand()) {
break;
}
int read = channel.read(byteBuffer);
logger.trace("Input read:" + read);
if (read == -1) {
publishOnComplete();
close();
break;
}
else if (read == 0) {
// input not ready, wait until we are invoked again
break;
}
else {
byteBuffer.flip();
DataBuffer dataBuffer =
RequestBodyPublisher.this.dataBufferFactory
.wrap(byteBuffer);
publishOnNext(dataBuffer);
}
}
}
catch (IOException ex) {
publishOnError(ex);
}
onAllDataRead();
}
}
}
private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber {
......@@ -296,5 +285,4 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册