diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index bc97362c989ec78256813d4ebe65e1195c6e176d..9ac2d80b77d649153a492446af31e96c49189b74 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -506,7 +506,7 @@ public abstract class DataBufferUtils { private final AtomicBoolean reading = new AtomicBoolean(); - private final AtomicBoolean canceled = new AtomicBoolean(); + private final AtomicBoolean disposed = new AtomicBoolean(); public ReadCompletionHandler(AsynchronousFileChannel channel, FluxSink sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { @@ -519,7 +519,9 @@ public abstract class DataBufferUtils { } public void read() { - if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) { + if (this.sink.requestedFromDownstream() > 0 && + isNotDisposed() && + this.reading.compareAndSet(false, true)) { DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); @@ -528,34 +530,38 @@ public abstract class DataBufferUtils { @Override public void completed(Integer read, DataBuffer dataBuffer) { - this.reading.set(false); - if (!isCanceled()) { + if (isNotDisposed()) { if (read != -1) { this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); + this.reading.set(false); read(); } else { release(dataBuffer); closeChannel(this.channel); - this.sink.complete(); + if (this.disposed.compareAndSet(false, true)) { + this.sink.complete(); + } + this.reading.set(false); } } else { release(dataBuffer); closeChannel(this.channel); + this.reading.set(false); } } @Override public void failed(Throwable exc, DataBuffer dataBuffer) { - this.reading.set(false); release(dataBuffer); closeChannel(this.channel); - if (!isCanceled()) { + if (this.disposed.compareAndSet(false, true)) { this.sink.error(exc); } + this.reading.set(false); } public void request(long n) { @@ -563,15 +569,15 @@ public abstract class DataBufferUtils { } public void cancel() { - if (this.canceled.compareAndSet(false, true)) { + if (this.disposed.compareAndSet(false, true)) { if (!this.reading.get()) { closeChannel(this.channel); } } } - private boolean isCanceled() { - return this.canceled.get(); + private boolean isNotDisposed() { + return !this.disposed.get(); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 5e0692768747d33230bccbc254d051de19fa279c..2720fbb6eaf1d1625fa43b856a33ddf611cde884 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -18,7 +18,9 @@ package org.springframework.web.reactive.function.client; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.time.Duration; import java.util.Arrays; @@ -406,7 +408,6 @@ public class WebClientIntegrationTests { prepareResponse(response -> {}); Resource resource = new ClassPathResource("largeTextFile.txt", getClass()); - byte[] expected = Files.readAllBytes(resource.getFile().toPath()); Flux body = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096); Mono result = this.webClient.post() @@ -415,18 +416,21 @@ public class WebClientIntegrationTests { .retrieve() .bodyToMono(Void.class); - StepVerifier.create(result).verifyComplete(); + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); expectRequest(request -> { - ByteArrayOutputStream actual = new ByteArrayOutputStream(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { - request.getBody().copyTo(actual); + request.getBody().copyTo(bos); + String actual = bos.toString("UTF-8"); + String expected = new String(Files.readAllBytes(resource.getFile().toPath()), StandardCharsets.UTF_8); + assertEquals(expected, actual); } catch (IOException ex) { - throw new IllegalStateException(ex); + throw new UncheckedIOException(ex); } - assertEquals(expected.length, actual.size()); - assertEquals(hash(expected), hash(actual.toByteArray())); }); }