From d56fedc226288ebca3afb248679591c4c9dbb070 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 17 Jul 2017 12:25:25 +0200 Subject: [PATCH] Methods for reading a Resource in DataBufferUtils Currently ResourceEncoder and ResourceRegionEncoder use DataBufferUtils to read resource with an AsynchronousFileChannel if it is a file or otherwise fallback on getting the channel from the resource. The same is now required in other places where a Resource needs to be read and is also generally useful. Issue: SPR-15773 --- .../core/codec/ResourceEncoder.java | 25 +------- .../core/codec/ResourceRegionEncoder.java | 32 +--------- .../core/io/buffer/DataBufferUtils.java | 61 +++++++++++++++++++ .../core/io/buffer/DataBufferUtilsTests.java | 30 +++++++++ 4 files changed, 95 insertions(+), 53 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java index 07a16af953..499245598e 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java @@ -16,11 +16,6 @@ package org.springframework.core.codec; -import java.io.File; -import java.io.IOException; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.ReadableByteChannel; -import java.nio.file.StandardOpenOption; import java.util.Map; import reactor.core.publisher.Flux; @@ -70,25 +65,7 @@ public class ResourceEncoder extends AbstractSingleValueEncoder { protected Flux encode(Resource resource, DataBufferFactory dataBufferFactory, ResolvableType type, @Nullable MimeType mimeType, @Nullable Map hints) { - try { - if (resource.isFile()) { - File file = resource.getFile(); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize); - } - } - catch (IOException ignore) { - // fallback to resource.readableChannel(), below - } - - try { - ReadableByteChannel channel = resource.readableChannel(); - return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize); - } - catch (IOException ex) { - return Flux.error(ex); - } + return DataBufferUtils.read(resource, dataBufferFactory, this.bufferSize); } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java index 3c1ab64e3c..f4f7fd63b3 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java @@ -16,13 +16,9 @@ package org.springframework.core.codec; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.StandardOpenOption; import java.util.Map; import java.util.OptionalLong; @@ -118,32 +114,10 @@ public class ResourceRegionEncoder extends AbstractEncoder { } private Flux writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) { - Flux in = readResourceRegion(region, bufferFactory); - return DataBufferUtils.takeUntilByteCount(in, region.getCount()); - } - - private Flux readResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) { Resource resource = region.getResource(); - try { - if (resource.isFile()) { - File file = region.getResource().getFile(); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - return DataBufferUtils.read(channel, region.getPosition(), - bufferFactory, this.bufferSize); - } - } - catch (IOException ignore) { - // fallback to resource.readableChannel(), below - } - try { - ReadableByteChannel channel = resource.readableChannel(); - Flux in = DataBufferUtils.read(channel, bufferFactory, this.bufferSize); - return DataBufferUtils.skipUntilByteCount(in, region.getPosition()); - } - catch (IOException ex) { - return Flux.error(ex); - } + long position = region.getPosition(); + Flux in = DataBufferUtils.read(resource, position, bufferFactory, this.bufferSize); + return DataBufferUtils.takeUntilByteCount(in, region.getCount()); } private Flux getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 6d47cfeed6..0c0fa71915 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -16,6 +16,7 @@ package org.springframework.core.io.buffer; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -26,6 +27,7 @@ import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -38,6 +40,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.SynchronousSink; +import org.springframework.core.io.Resource; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -134,6 +137,64 @@ public abstract class DataBufferUtils { }); } + /** + * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s. + *

If the resource is a file, it is read into an + * {@code AsynchronousFileChannel} and turned to {@code Flux} via + * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else + * fall back on {@link #read(InputStream, DataBufferFactory, int)} closes + * the channel when the flux is terminated. + * @param resource the resource to read from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux read(Resource resource, + DataBufferFactory dataBufferFactory, int bufferSize) { + + return read(resource, 0, dataBufferFactory, bufferSize); + } + + /** + * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s + * starting at the given position. + *

If the resource is a file, it is read into an + * {@code AsynchronousFileChannel} and turned to {@code Flux} via + * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else + * fall back on {@link #read(InputStream, DataBufferFactory, int)}. Closes + * the channel when the flux is terminated. + * @param resource the resource to read from + * @param position the position to start reading from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux read(Resource resource, long position, + DataBufferFactory dataBufferFactory, int bufferSize) { + + try { + if (resource.isFile()) { + File file = resource.getFile(); + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); + return DataBufferUtils.read(channel, position, dataBufferFactory, bufferSize); + } + } + catch (IOException ignore) { + // fallback to resource.readableChannel(), below + } + + try { + ReadableByteChannel channel = resource.readableChannel(); + Flux in = DataBufferUtils.read(channel, dataBufferFactory, bufferSize); + return DataBufferUtils.skipUntilByteCount(in, position); + } + catch (IOException ex) { + return Flux.error(ex); + } + } + + //--------------------------------------------------------------------- // Writing //--------------------------------------------------------------------- diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 41b679dab5..ba6e9e164f 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -34,6 +34,9 @@ import org.junit.Test; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + import static org.junit.Assert.*; /** @@ -119,6 +122,33 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(Duration.ofSeconds(5)); } + @Test + public void readResource() throws Exception { + Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass()); + Flux flux = DataBufferUtils.read(resource, this.bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + public void readResourcePosition() throws Exception { + Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass()); + Flux flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + @Test public void writeOutputStream() throws Exception { DataBuffer foo = stringBuffer("foo"); -- GitLab