From 7a5f8e03bc53ad78f76be82ecdd5fc8b8e86797a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 4 Dec 2018 21:17:44 -0500 Subject: [PATCH] Refine check for multiple subscribers Commit #c187cb2 introduced proactive rejection of multiple subscribers in ReactorClientHttpResponse, instead of hanging indefinitely as per https://github.com/reactor/reactor-netty/issues/503. However FluxReceive also rejects subsequent subscribers if the response is consumed fully, as opposed to being canceled, e.g. as with bodyToMono(Void.class). In that case, a subsequent subscriber causes two competing error signals to be sent, and one gets dropped and logged by reactor-core. This fix ensures that a rejection is raised in ReactorClientHttpResponse only after a cancel() was detected. Issue: SPR-17564 --- .../reactive/ReactorClientHttpResponse.java | 19 +++++++++++++------ .../function/client/DefaultWebClient.java | 5 +++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 03d275e02e..26c93f9878 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -29,7 +29,6 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; -import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -49,7 +48,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; - private final AtomicBoolean bodyConsumed = new AtomicBoolean(); + private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { @@ -62,10 +61,18 @@ class ReactorClientHttpResponse implements ClientHttpResponse { @Override public Flux getBody() { return this.inbound.receive() - .doOnSubscribe(s -> - // See https://github.com/reactor/reactor-netty/issues/503 - Assert.state(this.bodyConsumed.compareAndSet(false, true), - "The client response body can only be consumed once.")) + .doOnSubscribe(s -> { + if (this.rejectSubscribers.get()) { + throw new IllegalStateException("The client response body can only be consumed once."); + } + }) + .doOnCancel(() -> { + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to intercept and reject them in that case. + this.rejectSubscribers.set(true); + }) .map(byteBuf -> { byteBuf.retain(); return this.bufferFactory.wrap(byteBuf); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index e66398fdfd..d2ad4f1f90 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -447,8 +447,9 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") private Mono drainBody(ClientResponse response, Throwable ex) { // Ensure the body is drained, even if the StatusHandler didn't consume it, - // but ignore errors in case it did consume it. - return (Mono) response.bodyToMono(Void.class).onErrorMap(ex2 -> ex).thenReturn(ex); + // but ignore exception, in case the handler did consume. + return (Mono) response.bodyToMono(Void.class) + .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex); } private static Mono createResponseException(ClientResponse response) { -- GitLab