diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index 2102b5f30e34ee9ba2c8cca869f42b12fee6021a..072e24a7337e49470d14cb00e3c3d14b111e72ae 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -248,11 +248,25 @@ public interface DataBuffer { /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are - * shared between the returned stream and this data buffer. + * shared between the returned stream and this data buffer. The underlying buffer will + * not be {@linkplain DataBufferUtils#release(DataBuffer) released} when the + * input stream is {@linkplain InputStream#close() closed}. * @return this data buffer as an input stream + * @see #asInputStream(boolean) */ InputStream asInputStream(); + /** + * Expose this buffer's data as an {@link InputStream}. Both data and read position are + * shared between the returned stream and this data buffer. + * @param releaseOnClose whether the underlying buffer will be + * {@linkplain DataBufferUtils#release(DataBuffer) released} when the input stream is + * {@linkplain InputStream#close() closed}. + * @return this data buffer as an input stream + * @since 5.0.4 + */ + InputStream asInputStream(boolean releaseOnClose); + /** * Expose this buffer's data as an {@link OutputStream}. Both data and write position are * shared between the returned stream and this data buffer. diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index 704ca10083ed590c5017969d2650b6f3109c1047..699e74f110e294c9d4c19ea0fcf82bd1634d93e5 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -365,6 +365,11 @@ public class DefaultDataBuffer implements DataBuffer { return new DefaultDataBufferInputStream(); } + @Override + public InputStream asInputStream(boolean releaseOnClose) { + return new DefaultDataBufferInputStream(); + } + @Override public OutputStream asOutputStream() { return new DefaultDataBufferOutputStream(); diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index ee41d4db89c6a476caab5bbbf9af051f15df8bad..cde0e9a03ecd9254e02b6e24324aa8da0fb858b5 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -250,6 +250,11 @@ public class NettyDataBuffer implements PooledDataBuffer { return new ByteBufInputStream(this.byteBuf); } + @Override + public InputStream asInputStream(boolean releaseOnClose) { + return new ByteBufInputStream(this.byteBuf, releaseOnClose); + } + @Override public OutputStream asOutputStream() { return new ByteBufOutputStream(this.byteBuf); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index cdb8adf97a9e112763260e3bc186fb5626e14d12..851e2aa59a5da48de46caec702bd2ce7094266e3 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -182,6 +182,27 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { release(buffer); } + @Test + public void inputStreamReleaseOnClose() throws IOException { + DataBuffer buffer = createDataBuffer(3); + byte[] bytes = {'a', 'b', 'c'}; + buffer.write(bytes); + + InputStream inputStream = buffer.asInputStream(true); + + try { + byte[] result = new byte[3]; + int len = inputStream.read(result); + assertEquals(3, len); + assertArrayEquals(bytes, result); + } finally { + inputStream.close(); + } + + // AbstractDataBufferAllocatingTestCase.LeakDetector will verify the buffer's release + + } + @Test public void outputStream() throws IOException { DataBuffer buffer = createDataBuffer(4); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 045b98e547c06c8a648b6431b79c459ebd0360c8..66b0992d8e3514a91436fb4a6333e871bba9db3b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -35,6 +35,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; @@ -210,8 +211,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public boolean release() { - this.pooledByteBuffer.close(); - return this.pooledByteBuffer.isOpen(); + boolean result; + try { + result = DataBufferUtils.release(this.dataBuffer); + } + finally { + this.pooledByteBuffer.close(); + } + return result && this.pooledByteBuffer.isOpen(); } @Override @@ -338,6 +345,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { return this.dataBuffer.asInputStream(); } + @Override + public InputStream asInputStream(boolean releaseOnClose) { + return this.dataBuffer.asInputStream(releaseOnClose); + } + @Override public OutputStream asOutputStream() { return this.dataBuffer.asOutputStream();