diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 03d275e02ed69187760569fb5186bc2e4ee35b2f..26c93f9878df6607af2b3333cf5c2b8e7ea8c565 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -29,7 +29,6 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; -import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -49,7 +48,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; - private final AtomicBoolean bodyConsumed = new AtomicBoolean(); + private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { @@ -62,10 +61,18 @@ class ReactorClientHttpResponse implements ClientHttpResponse { @Override public Flux getBody() { return this.inbound.receive() - .doOnSubscribe(s -> - // See https://github.com/reactor/reactor-netty/issues/503 - Assert.state(this.bodyConsumed.compareAndSet(false, true), - "The client response body can only be consumed once.")) + .doOnSubscribe(s -> { + if (this.rejectSubscribers.get()) { + throw new IllegalStateException("The client response body can only be consumed once."); + } + }) + .doOnCancel(() -> { + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to intercept and reject them in that case. + this.rejectSubscribers.set(true); + }) .map(byteBuf -> { byteBuf.retain(); return this.bufferFactory.wrap(byteBuf); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index e66398fdfdf25ff43eae3b1969649f6399a700a4..d2ad4f1f901966aa9d7fe8f97c442b909339fcd2 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -447,8 +447,9 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") private Mono drainBody(ClientResponse response, Throwable ex) { // Ensure the body is drained, even if the StatusHandler didn't consume it, - // but ignore errors in case it did consume it. - return (Mono) response.bodyToMono(Void.class).onErrorMap(ex2 -> ex).thenReturn(ex); + // but ignore exception, in case the handler did consume. + return (Mono) response.bodyToMono(Void.class) + .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex); } private static Mono createResponseException(ClientResponse response) {