diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 71adf24dd16329a0822cf562d48fef6e1e114509..811cb78a96c80725956e4c3ce4c37a678eb3cb7d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -106,6 +106,10 @@ abstract class AbstractResponseBodyFlushProcessor */ protected abstract void flush() throws IOException; + private void cancel() { + this.subscription.cancel(); + } + private void writeComplete() { if (logger.isTraceEnabled()) { logger.trace(this.state + " writeComplete"); @@ -157,11 +161,12 @@ abstract class AbstractResponseBodyFlushProcessor else { try { processor.flush(); + processor.subscription.request(1); } catch (IOException ex) { + processor.cancel(); processor.onError(ex); } - processor.subscription.request(1); } } }, COMPLETED { @@ -231,6 +236,7 @@ abstract class AbstractResponseBodyFlushProcessor @Override public void onError(Throwable t) { + processor.cancel(); processor.onError(t); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index 1a0268cd07d80b0a0df1f82d23847d84c2f1821b..2463fb941438b74b565a368d8caf61621405b759 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -159,6 +159,10 @@ abstract class AbstractResponseBodyProcessor implements Processor