From 3a681fba890ab6044b54145c2f33bdea0922caa6 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 28 Jun 2016 11:17:57 +0200 Subject: [PATCH] AbstractResponseBodySubscriber improvements - AbstractResponseBodySubscriber now checks if the current state is expected before changing to a new state. - Included comments by @violetagg --- .../AbstractResponseBodySubscriber.java | 40 +++++++++++-------- .../reactive/ServletHttpHandlerAdapter.java | 8 ++-- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java index 7c174edd12..98d13c52e2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java @@ -139,8 +139,8 @@ abstract class AbstractResponseBodySubscriber implements Subscriber */ protected abstract void close(); - private void changeState(State oldState, State newState) { - this.state.compareAndSet(oldState, newState); + private boolean changeState(State oldState, State newState) { + return this.state.compareAndSet(oldState, newState); } /** @@ -171,8 +171,9 @@ abstract class AbstractResponseBodySubscriber implements Subscriber Subscription subscription) { if (BackpressureUtils.validate(subscriber.subscription, subscription)) { subscriber.subscription = subscription; - subscriber.changeState(this, REQUESTED); - subscription.request(1); + if (subscriber.changeState(this, REQUESTED)) { + subscription.request(1); + } } } }, @@ -186,15 +187,17 @@ abstract class AbstractResponseBodySubscriber implements Subscriber @Override void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) { - subscriber.changeState(this, RECEIVED); - subscriber.receiveBuffer(dataBuffer); + if (subscriber.changeState(this, RECEIVED)) { + subscriber.receiveBuffer(dataBuffer); + } } @Override void onComplete(AbstractResponseBodySubscriber subscriber) { - subscriber.subscriptionCompleted = true; - subscriber.changeState(this, COMPLETED); - subscriber.close(); + if (subscriber.changeState(this, COMPLETED)) { + subscriber.subscriptionCompleted = true; + subscriber.close(); + } } }, /** @@ -217,12 +220,14 @@ abstract class AbstractResponseBodySubscriber implements Subscriber subscriber.releaseBuffer(); boolean subscriptionCompleted = subscriber.subscriptionCompleted; if (!subscriptionCompleted) { - subscriber.changeState(this, REQUESTED); - subscriber.subscription.request(1); + if (subscriber.changeState(this, REQUESTED)) { + subscriber.subscription.request(1); + } } else { - subscriber.changeState(this, COMPLETED); - subscriber.close(); + if (subscriber.changeState(this, COMPLETED)) { + subscriber.close(); + } } } } @@ -258,7 +263,7 @@ abstract class AbstractResponseBodySubscriber implements Subscriber }; void onSubscribe(AbstractResponseBodySubscriber subscriber, Subscription s) { - throw new IllegalStateException(toString()); + s.cancel(); } void onNext(AbstractResponseBodySubscriber subscriber, DataBuffer dataBuffer) { @@ -266,9 +271,10 @@ abstract class AbstractResponseBodySubscriber implements Subscriber } void onError(AbstractResponseBodySubscriber subscriber, Throwable t) { - subscriber.changeState(this, COMPLETED); - subscriber.writeError(t); - subscriber.close(); + if (subscriber.changeState(this, COMPLETED)) { + subscriber.writeError(t); + subscriber.close(); + } } void onComplete(AbstractResponseBodySubscriber subscriber) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index aa1edf35cd..3f0f430424 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -269,7 +269,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet { onWritePossible(); } } - catch (IOException ignored) { + catch (IOException ex) { + onError(ex); } } @@ -277,13 +278,12 @@ public class ServletHttpHandlerAdapter extends HttpServlet { protected boolean write(DataBuffer dataBuffer) throws IOException { ServletOutputStream output = outputStream(); - boolean ready = output.isReady(); - if (this.flushOnNext) { flush(); - ready = output.isReady(); } + boolean ready = output.isReady(); + if (this.logger.isTraceEnabled()) { this.logger.trace("write: " + dataBuffer + " ready: " + ready); } -- GitLab