Commit d56fedc2 authored by Rossen Stoyanchev

Methods for reading a Resource in DataBufferUtils

Currently ResourceEncoder and ResourceRegionEncoder use DataBufferUtils
to read a resource with an AsynchronousFileChannel if it is a file, or
otherwise fall back on getting a readable channel from the resource.

The same logic is now required in other places where a Resource needs
to be read, and it is also generally useful.

Issue: SPR-15773
Parent ee91e52c
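
Before the diff, a usage sketch of the read(Resource, ...) overload this commit introduces. This is editorial illustration, not part of the commit; the file path, buffer size, and use of DefaultDataBufferFactory are assumptions.

import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

public class ResourceReadExample {

	public static void main(String[] args) {
		// Hypothetical file path and buffer size, chosen for illustration only.
		Resource resource = new FileSystemResource("/tmp/data.bin");
		DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();

		// Uses an AsynchronousFileChannel because the resource is file-based;
		// for non-file resources the same call falls back to the resource's channel.
		Flux<DataBuffer> buffers = DataBufferUtils.read(resource, bufferFactory, 4096);

		// Release each buffer after use and wait for completion.
		buffers.doOnNext(DataBufferUtils::release).blockLast();
	}
}
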
@@ -16,11 +16,6 @@
package org.springframework.core.codec;
import java.io.File;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import reactor.core.publisher.Flux;
@@ -70,25 +65,7 @@ public class ResourceEncoder extends AbstractSingleValueEncoder<Resource> {
protected Flux<DataBuffer> encode(Resource resource, DataBufferFactory dataBufferFactory,
ResolvableType type, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
try {
if (resource.isFile()) {
File file = resource.getFile();
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize);
}
}
catch (IOException ignore) {
// fallback to resource.readableChannel(), below
}
try {
ReadableByteChannel channel = resource.readableChannel();
return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize);
}
catch (IOException ex) {
return Flux.error(ex);
}
return DataBufferUtils.read(resource, dataBufferFactory, this.bufferSize);
}
}
@@ -16,13 +16,9 @@
package org.springframework.core.codec;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.OptionalLong;
@@ -118,32 +114,10 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
}
private Flux<DataBuffer> writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) {
Flux<DataBuffer> in = readResourceRegion(region, bufferFactory);
return DataBufferUtils.takeUntilByteCount(in, region.getCount());
}
private Flux<DataBuffer> readResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) {
Resource resource = region.getResource();
try {
if (resource.isFile()) {
File file = region.getResource().getFile();
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
return DataBufferUtils.read(channel, region.getPosition(),
bufferFactory, this.bufferSize);
}
}
catch (IOException ignore) {
// fallback to resource.readableChannel(), below
}
try {
ReadableByteChannel channel = resource.readableChannel();
Flux<DataBuffer> in = DataBufferUtils.read(channel, bufferFactory, this.bufferSize);
return DataBufferUtils.skipUntilByteCount(in, region.getPosition());
}
catch (IOException ex) {
return Flux.error(ex);
}
long position = region.getPosition();
Flux<DataBuffer> in = DataBufferUtils.read(resource, position, bufferFactory, this.bufferSize);
return DataBufferUtils.takeUntilByteCount(in, region.getCount());
}
private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
......
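
For context, a minimal sketch of the composition the refactored readResourceRegion now relies on: read from the region's start position, then cap the stream at the region's byte count. The resource name, region offsets, and buffer size below are made-up values for illustration.

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.support.ResourceRegion;
import reactor.core.publisher.Flux;

public class ResourceRegionReadExample {

	public static void main(String[] args) {
		// Hypothetical region: skip the first 1024 bytes, take the next 4096
		// (assumes "video.mp4" is available on the classpath).
		Resource resource = new ClassPathResource("video.mp4");
		ResourceRegion region = new ResourceRegion(resource, 1024, 4096);
		DataBufferFactory bufferFactory = new DefaultDataBufferFactory();

		// Read starting at the region position, then truncate to the region count.
		Flux<DataBuffer> in = DataBufferUtils.read(
				resource, region.getPosition(), bufferFactory, 8192);
		Flux<DataBuffer> regionBuffers =
				DataBufferUtils.takeUntilByteCount(in, region.getCount());

		regionBuffers.doOnNext(DataBufferUtils::release).blockLast();
	}
}
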
@@ -16,6 +16,7 @@
package org.springframework.core.io.buffer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -26,6 +27,7 @@ import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -38,6 +40,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SynchronousSink;
import org.springframework.core.io.Resource;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -134,6 +137,64 @@ public abstract class DataBufferUtils {
});
}
/**
* Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
* <p>If the resource is a file, it is read into an
* {@code AsynchronousFileChannel} and turned into a {@code Flux} via
* {@link #read(AsynchronousFileChannel, DataBufferFactory, int)}; otherwise
* it falls back on {@link #read(ReadableByteChannel, DataBufferFactory, int)}.
* Closes the channel when the flux is terminated.
* @param resource the resource to read from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given resource
*/
public static Flux<DataBuffer> read(Resource resource,
DataBufferFactory dataBufferFactory, int bufferSize) {
return read(resource, 0, dataBufferFactory, bufferSize);
}
/**
* Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
* starting at the given position.
* <p>If the resource is a file, it is read into an
* {@code AsynchronousFileChannel} and turned into a {@code Flux} via
* {@link #read(AsynchronousFileChannel, long, DataBufferFactory, int)};
* otherwise it falls back on
* {@link #read(ReadableByteChannel, DataBufferFactory, int)} and skips over
* bytes until the given position. Closes the channel when the flux is
* terminated.
* @param resource the resource to read from
* @param position the position to start reading from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given resource
*/
public static Flux<DataBuffer> read(Resource resource, long position,
DataBufferFactory dataBufferFactory, int bufferSize) {
try {
if (resource.isFile()) {
File file = resource.getFile();
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
return DataBufferUtils.read(channel, position, dataBufferFactory, bufferSize);
}
}
catch (IOException ignore) {
// fallback to resource.readableChannel(), below
}
try {
ReadableByteChannel channel = resource.readableChannel();
Flux<DataBuffer> in = DataBufferUtils.read(channel, dataBufferFactory, bufferSize);
return DataBufferUtils.skipUntilByteCount(in, position);
}
catch (IOException ex) {
return Flux.error(ex);
}
}
//---------------------------------------------------------------------
// Writing
//---------------------------------------------------------------------
......
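
A sketch of the fallback path shown in the method above, for illustration only: a ByteArrayResource is not file-based, so read(..) goes through the resource's readable channel and skips bytes until the requested position. The literal content and buffer size are assumptions chosen to mirror the tests below.

import java.nio.charset.StandardCharsets;

import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

public class FallbackReadExample {

	public static void main(String[] args) {
		// Not a file resource, so resource.isFile() is false and the fallback applies.
		Resource resource = new ByteArrayResource(
				"foobarbazqux".getBytes(StandardCharsets.UTF_8));
		DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();

		// Emits from byte offset 3 onward ("barbazqux"), in buffers of at most 3 bytes.
		Flux<DataBuffer> buffers = DataBufferUtils.read(resource, 3, bufferFactory, 3);

		buffers.doOnNext(DataBufferUtils::release).blockLast();
	}
}
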
@@ -34,6 +34,9 @@ import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import static org.junit.Assert.*;
/**
@@ -119,6 +122,33 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(Duration.ofSeconds(5));
}
@Test
public void readResource() throws Exception {
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void readResourcePosition() throws Exception {
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
StepVerifier.create(flux)
.consumeNextWith(stringConsumer("bar"))
.consumeNextWith(stringConsumer("baz"))
.consumeNextWith(stringConsumer("qux"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void writeOutputStream() throws Exception {
DataBuffer foo = stringBuffer("foo");
......