提交 c7a15260 编写于 作者: A Arjen Poutsma

Various DataBuffer improvements

This commit introduces various improvements in DataBuffer:

- DataBuffer now exposes its read and write position, as well as its
capacity and writable byte count.
- Added DataBuffer.asByteBuffer(int, int)
- DataBufferUtils.read now reads directly into a DataBuffer, rather than
copying a ByteBuffer into a DataBuffer
- TomcatHttpHandler now reads directly into a DataBuffer

Issues: SPR-16068 SPR-16070
上级 d8a7b96b
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -24,8 +24,29 @@ import java.util.function.IntPredicate;
/**
* Basic abstraction over byte buffers.
*
* <p>{@code DataBuffer}s has a separate {@linkplain #readPosition() read} and
* {@linkplain #writePosition() write} position, as opposed to {@code ByteBuffer}'s single
* {@linkplain ByteBuffer#position() position}. As such, the {@code DataBuffer} does not require
* a {@linkplain ByteBuffer#flip() flip} to read after writing. In general, the following invariant
* holds for the read and write positions, and the capacity:
*
* <blockquote>
* <tt>0</tt> <tt>&lt;=</tt>
* <i>readPosition</i> <tt>&lt;=</tt>
* <i>writePosition</i> <tt>&lt;=</tt>
* <i>capacity</i>
* </blockquote>
*
* <p>The {@linkplain #capacity() capacity} of a {@code DataBuffer} is expanded on demand,
* similar to {@code StringBuilder}.
*
* <p>The main purpose of the {@code DataBuffer} abstraction is provide a convenient wrapper around
* {@link ByteBuffer}, similar to Netty's {@link io.netty.buffer.ByteBuf}, that can also be used on
* non-Netty platforms (i.e. Servlet).
*
* @author Arjen Poutsma
* @since 5.0
* @see DataBufferFactory
*/
public interface DataBuffer {
......@@ -61,6 +82,63 @@ public interface DataBuffer {
*/
int readableByteCount();
/**
* Return the number of bytes that can be written to this data buffer.
* @return the writable byte count
* @since 5.0.1
*/
int writableByteCount();
/**
* Return the number of bytes that this buffer can contain.
* @return the capacity
* @since 5.0.1
*/
int capacity();
/**
* Sets the number of bytes that this buffer can contain. If the new capacity is lower than
* the current capacity, the contents of this buffer will be truncated. If the new capacity
* is higher than the current capacity, it will be expanded.
* @param capacity the new capacity
* @return this buffer
*/
DataBuffer capacity(int capacity);
/**
* Return the position from which this buffer will read.
* @return the read position
* @since 5.0.1
*/
int readPosition();
/**
* Set the position from which this buffer will read.
* @param readPosition the new read position
* @return this buffer
* @throws IndexOutOfBoundsException if {@code readPosition} is smaller than 0 or greater than
* {@link #writePosition()}
* @since 5.0.1
*/
DataBuffer readPosition(int readPosition);
/**
* Return the position to which this buffer will write.
* @return the write position
* @since 5.0.1
*/
int writePosition();
/**
* Set the position to which this buffer will write.
* @param writePosition the new write position
* @return this buffer
* @throws IndexOutOfBoundsException if {@code writePosition} is smaller than
* {@link #readPosition()} or greater than {@link #capacity()}
* @since 5.0.1
*/
DataBuffer writePosition(int writePosition);
/**
* Read a single byte from the current reading position of this data buffer.
* @return the byte at this buffer's current reading position
......@@ -146,6 +224,18 @@ public interface DataBuffer {
*/
ByteBuffer asByteBuffer();
/**
* Expose a subsequence of this buffer's bytes as a {@link ByteBuffer}. Data between this
* {@code DataBuffer} and the returned {@code ByteBuffer} is shared; though
* changes in the returned buffer's {@linkplain ByteBuffer#position() position}
* will not be reflected in the reading nor writing position of this data buffer.
* @param index the index at which to start the byte buffer
* @param length the length of the returned byte buffer
* @return this data buffer as a byte buffer
* @since 5.0.1
*/
ByteBuffer asByteBuffer(int index, int length);
/**
* Expose this buffer's data as an {@link InputStream}. Both data and position are
* shared between the returned stream and this data buffer.
......
......@@ -71,7 +71,6 @@ public abstract class DataBufferUtils {
DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(inputStream, "InputStream must not be null");
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
ReadableByteChannel channel = Channels.newChannel(inputStream);
return read(channel, dataBufferFactory, bufferSize);
......@@ -90,6 +89,7 @@ public abstract class DataBufferUtils {
Assert.notNull(channel, "ReadableByteChannel must not be null");
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
return Flux.generate(() -> channel,
new ReadableByteChannelGenerator(dataBufferFactory, bufferSize),
......@@ -125,15 +125,17 @@ public abstract class DataBufferUtils {
Assert.notNull(channel, "'channel' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
return Flux.create(sink -> {
sink.onDispose(() -> closeChannel(channel));
AsynchronousFileChannelReadCompletionHandler completionHandler =
new AsynchronousFileChannelReadCompletionHandler(sink, position,
dataBufferFactory, byteBuffer);
channel.read(byteBuffer, position, channel, completionHandler);
CompletionHandler<Integer, DataBuffer> completionHandler =
new AsynchronousFileChannelReadCompletionHandler(channel, sink, position,
dataBufferFactory, bufferSize);
channel.read(byteBuffer, position, dataBuffer, completionHandler);
});
}
......@@ -414,32 +416,25 @@ public abstract class DataBufferUtils {
private final DataBufferFactory dataBufferFactory;
private final ByteBuffer byteBuffer;
private final int bufferSize;
public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int chunkSize) {
public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int bufferSize) {
this.dataBufferFactory = dataBufferFactory;
this.byteBuffer = ByteBuffer.allocate(chunkSize);
this.bufferSize = bufferSize;
}
@Override
public ReadableByteChannel apply(ReadableByteChannel channel, SynchronousSink<DataBuffer> sub) {
public ReadableByteChannel apply(ReadableByteChannel channel,
SynchronousSink<DataBuffer> sub) {
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
try {
int read;
if ((read = channel.read(this.byteBuffer)) >= 0) {
this.byteBuffer.flip();
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
try {
dataBuffer.write(this.byteBuffer);
release = false;
sub.next(dataBuffer);
}
finally {
if (release) {
release(dataBuffer);
}
}
this.byteBuffer.clear();
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
if ((read = channel.read(byteBuffer)) >= 0) {
dataBuffer.writePosition(read);
release = false;
sub.next(dataBuffer);
}
else {
sub.complete();
......@@ -448,62 +443,64 @@ public abstract class DataBufferUtils {
catch (IOException ex) {
sub.error(ex);
}
finally {
if (release) {
release(dataBuffer);
}
}
return channel;
}
}
private static class AsynchronousFileChannelReadCompletionHandler
implements CompletionHandler<Integer, AsynchronousFileChannel> {
implements CompletionHandler<Integer, DataBuffer> {
private final FluxSink<DataBuffer> sink;
private final AsynchronousFileChannel channel;
private final ByteBuffer byteBuffer;
private final FluxSink<DataBuffer> sink;
private final DataBufferFactory dataBufferFactory;
private long position;
private final int bufferSize;
private AtomicLong position;
private AsynchronousFileChannelReadCompletionHandler(FluxSink<DataBuffer> sink,
long position, DataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
private AsynchronousFileChannelReadCompletionHandler(
AsynchronousFileChannel channel, FluxSink<DataBuffer> sink,
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
this.channel = channel;
this.sink = sink;
this.position = position;
this.position = new AtomicLong(position);
this.dataBufferFactory = dataBufferFactory;
this.byteBuffer = byteBuffer;
this.bufferSize = bufferSize;
}
@Override
public void completed(Integer read, AsynchronousFileChannel channel) {
public void completed(Integer read, DataBuffer dataBuffer) {
if (read != -1) {
this.position += read;
this.byteBuffer.flip();
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
try {
dataBuffer.write(this.byteBuffer);
release = false;
this.sink.next(dataBuffer);
}
finally {
if (release) {
release(dataBuffer);
}
}
this.byteBuffer.clear();
long pos = this.position.addAndGet(read);
dataBuffer.writePosition(read);
this.sink.next(dataBuffer);
if (!this.sink.isCancelled()) {
channel.read(this.byteBuffer, this.position, channel, this);
DataBuffer newDataBuffer =
this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(newByteBuffer, pos, newDataBuffer, this);
}
}
else {
release(dataBuffer);
closeChannel(this.channel);
this.sink.complete();
closeChannel(channel);
}
}
@Override
public void failed(Throwable exc, AsynchronousFileChannel channel) {
public void failed(Throwable exc, DataBuffer dataBuffer) {
release(dataBuffer);
closeChannel(this.channel);
this.sink.error(exc);
closeChannel(channel);
}
}
......@@ -558,9 +555,9 @@ public abstract class DataBufferUtils {
@Override
public void completed(Integer written, ByteBuffer byteBuffer) {
this.position.addAndGet(written);
long pos = this.position.addAndGet(written);
if (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
this.channel.write(byteBuffer, pos, byteBuffer, this);
return;
}
......
......@@ -22,17 +22,20 @@ import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Default implementation of the {@link DataBuffer} interface that uses a
* {@link ByteBuffer} internally, with separate read and write positions.
* {@link ByteBuffer} internally. with separate read and write positions.
* Constructed using the {@link DefaultDataBufferFactory}.
*
* <p>Inspired by Netty's {@code ByteBuf}. Introduced so that non-Netty runtimes (i.e. Servlet)
* do not require Netty on the classpath.
*
* @author Arjen Poutsma
* @author Juergen Hoeller
* @since 5.0
......@@ -53,32 +56,29 @@ public class DefaultDataBuffer implements DataBuffer {
private int writePosition;
private int capacity;
/**
* Create a new {@code DefaultDataBuffer} based on the given
* {@code ByteBuffer}. Both reading and writing position of this buffer are
* based on the current {@linkplain
* ByteBuffer#position() position} of the given buffer.
* @param byteBuffer the buffer to base this buffer on
*/
DefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory) {
this(byteBuffer, byteBuffer.position(), byteBuffer.position(), dataBufferFactory);
private DefaultDataBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.notNull(byteBuffer, "'byteBuffer' must not be null");
this.dataBufferFactory = dataBufferFactory;
ByteBuffer slice = byteBuffer.slice();
this.byteBuffer = slice;
this.capacity = slice.remaining();
}
DefaultDataBuffer(ByteBuffer byteBuffer, int readPosition, int writePosition,
DefaultDataBufferFactory dataBufferFactory) {
static DefaultDataBuffer fromFilledByteBuffer(DefaultDataBufferFactory dataBufferFactory,
ByteBuffer byteBuffer) {
Assert.notNull(byteBuffer, "'byteBuffer' must not be null");
Assert.isTrue(readPosition >= 0, "'readPosition' must be 0 or higher");
Assert.isTrue(writePosition >= 0, "'writePosition' must be 0 or higher");
Assert.isTrue(readPosition <= writePosition,
"'readPosition' must be smaller than or equal to 'writePosition'");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
DefaultDataBuffer dataBuffer = new DefaultDataBuffer(dataBufferFactory, byteBuffer);
dataBuffer.writePosition(byteBuffer.remaining());
return dataBuffer;
}
this.byteBuffer = byteBuffer;
this.readPosition = readPosition;
this.writePosition = writePosition;
this.dataBufferFactory = dataBufferFactory;
static DefaultDataBuffer fromEmptyByteBuffer(DefaultDataBufferFactory dataBufferFactory,
ByteBuffer byteBuffer) {
return new DefaultDataBuffer(dataBufferFactory, byteBuffer);
}
......@@ -90,6 +90,11 @@ public class DefaultDataBuffer implements DataBuffer {
return this.byteBuffer;
}
private void setNativeBuffer(ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
this.capacity = byteBuffer.remaining();
}
@Override
public DefaultDataBufferFactory factory() {
return this.dataBufferFactory;
......@@ -132,60 +137,151 @@ public class DefaultDataBuffer implements DataBuffer {
return this.writePosition - this.readPosition;
}
@Override
public int writableByteCount() {
return this.capacity - this.writePosition;
}
@Override
public int readPosition() {
return this.readPosition;
}
@Override
public DataBuffer readPosition(int readPosition) {
assertIndex(readPosition >= 0, "'readPosition' %d must be >= 0", readPosition);
assertIndex(readPosition <= this.writePosition, "'readPosition' %d must be <= %d",
readPosition, this.writePosition);
this.readPosition = readPosition;
return this;
}
@Override
public int writePosition() {
return this.writePosition;
}
@Override
public DataBuffer writePosition(int writePosition) {
assertIndex(writePosition >= this.readPosition, "'writePosition' %d must be >= %d",
writePosition, this.readPosition);
assertIndex(writePosition <= this.capacity, "'writePosition' %d must be <= %d",
writePosition, this.capacity);
this.writePosition = writePosition;
return this;
}
@Override
public int capacity() {
return this.capacity;
}
@Override
public DataBuffer capacity(int newCapacity) {
Assert.isTrue(newCapacity > 0,
String.format("'newCapacity' %d must be higher than 0", newCapacity));
int readPosition = readPosition();
int writePosition = writePosition();
int oldCapacity = capacity();
if (newCapacity > oldCapacity) {
ByteBuffer oldBuffer = this.byteBuffer;
ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
((Buffer) oldBuffer).position(0).limit(oldBuffer.capacity());
((Buffer) newBuffer).position(0).limit(oldBuffer.capacity());
newBuffer.put(oldBuffer);
newBuffer.clear();
setNativeBuffer(newBuffer);
}
else if (newCapacity < oldCapacity) {
ByteBuffer oldBuffer = this.byteBuffer;
ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
if (readPosition < newCapacity) {
if (writePosition > newCapacity) {
writePosition = newCapacity;
writePosition(writePosition);
}
((Buffer) oldBuffer).position(readPosition).limit(writePosition);
((Buffer) newBuffer).position(readPosition).limit(writePosition);
newBuffer.put(oldBuffer);
newBuffer.clear();
}
else {
readPosition(newCapacity);
writePosition(newCapacity);
}
setNativeBuffer(newBuffer);
}
return this;
}
private static ByteBuffer allocate(int capacity, boolean direct) {
return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
}
@Override
public byte read() {
return readInternal(ByteBuffer::get);
assertIndex(this.readPosition <= this.writePosition - 1, "readPosition %d must be <= %d",
this.readPosition, this.writePosition - 1);
int pos = this.readPosition;
byte b = this.byteBuffer.get(pos);
this.readPosition = pos + 1;
return b;
}
@Override
public DefaultDataBuffer read(byte[] destination) {
Assert.notNull(destination, "'destination' must not be null");
readInternal(b -> b.get(destination));
read(destination, 0, destination.length);
return this;
}
@Override
public DefaultDataBuffer read(byte[] destination, int offset, int length) {
Assert.notNull(destination, "'destination' must not be null");
readInternal(b -> b.get(destination, offset, length));
return this;
}
assertIndex(this.readPosition <= this.writePosition - length,
"readPosition %d and length %d should be smaller than writePosition %d",
this.readPosition, length, this.writePosition);
/**
* Internal read method that keeps track of the {@link #readPosition} before and after
* applying the given function on {@link #byteBuffer}.
*/
private <T> T readInternal(Function<ByteBuffer, T> function) {
// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
((Buffer) this.byteBuffer).position(this.readPosition);
try {
return function.apply(this.byteBuffer);
}
finally {
this.readPosition = this.byteBuffer.position();
}
ByteBuffer tmp = this.byteBuffer.duplicate();
int limit = this.readPosition + length;
((Buffer) tmp).clear().position(this.readPosition).limit(limit);
tmp.get(destination, offset, length);
this.readPosition += length;
return this;
}
@Override
public DefaultDataBuffer write(byte b) {
ensureExtraCapacity(1);
writeInternal(buffer -> buffer.put(b));
ensureCapacity(1);
int pos = this.writePosition;
this.byteBuffer.put(pos, b);
this.writePosition = pos + 1;
return this;
}
@Override
public DefaultDataBuffer write(byte[] source) {
Assert.notNull(source, "'source' must not be null");
ensureExtraCapacity(source.length);
writeInternal(buffer -> buffer.put(source));
write(source, 0, source.length);
return this;
}
@Override
public DefaultDataBuffer write(byte[] source, int offset, int length) {
Assert.notNull(source, "'source' must not be null");
ensureExtraCapacity(length);
writeInternal(buffer -> buffer.put(source, offset, length));
ensureCapacity(length);
ByteBuffer tmp = this.byteBuffer.duplicate();
int limit = this.writePosition + length;
((Buffer) tmp).clear().position(this.writePosition).limit(limit);
tmp.put(source, offset, length);
this.writePosition += length;
return this;
}
......@@ -203,30 +299,24 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public DefaultDataBuffer write(ByteBuffer... byteBuffers) {
Assert.notEmpty(byteBuffers, "'byteBuffers' must not be empty");
int extraCapacity = Arrays.stream(byteBuffers).mapToInt(ByteBuffer::remaining).sum();
ensureExtraCapacity(extraCapacity);
Arrays.stream(byteBuffers)
.forEach(byteBuffer -> writeInternal(buffer -> buffer.put(byteBuffer)));
int capacity = Arrays.stream(byteBuffers).mapToInt(ByteBuffer::remaining).sum();
ensureCapacity(capacity);
Arrays.stream(byteBuffers).forEach(this::write);
return this;
}
/**
* Internal write method that keeps track of the {@link #writePosition} before and
* after applying the given function on {@link #byteBuffer}.
*/
private <T> T writeInternal(Function<ByteBuffer, T> function) {
// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
((Buffer) this.byteBuffer).position(this.writePosition);
try {
return function.apply(this.byteBuffer);
}
finally {
this.writePosition = this.byteBuffer.position();
}
private void write(ByteBuffer source) {
int length = source.remaining();
ByteBuffer tmp = this.byteBuffer.duplicate();
int limit = this.writePosition + source.remaining();
((Buffer) tmp).clear().position(this.writePosition).limit(limit);
tmp.put(source);
this.writePosition += length;
}
@Override
public DataBuffer slice(int index, int length) {
checkIndex(index, length);
int oldPosition = this.byteBuffer.position();
// Explicit access via Buffer base type for compatibility
// with covariant return type on JDK 9's ByteBuffer...
......@@ -236,7 +326,7 @@ public class DefaultDataBuffer implements DataBuffer {
ByteBuffer slice = this.byteBuffer.slice();
// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
((Buffer) slice).limit(length);
return new SlicedDefaultDataBuffer(slice, 0, length, this.dataBufferFactory);
return new SlicedDefaultDataBuffer(slice, this.dataBufferFactory, length);
}
finally {
buffer.position(oldPosition);
......@@ -245,13 +335,20 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public ByteBuffer asByteBuffer() {
return asByteBuffer(this.readPosition, readableByteCount());
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
checkIndex(index, length);
ByteBuffer duplicate = this.byteBuffer.duplicate();
// Explicit access via Buffer base type for compatibility
// with covariant return type on JDK 9's ByteBuffer...
Buffer buffer = duplicate;
buffer.position(this.readPosition);
buffer.limit(this.writePosition);
return duplicate;
buffer.position(index);
buffer.limit(index + length);
return duplicate.slice();
}
@Override
......@@ -264,11 +361,12 @@ public class DefaultDataBuffer implements DataBuffer {
return new DefaultDataBufferOutputStream();
}
private void ensureExtraCapacity(int extraCapacity) {
int neededCapacity = calculateCapacity(this.writePosition + extraCapacity);
if (neededCapacity > this.byteBuffer.capacity()) {
grow(neededCapacity);
private void ensureCapacity(int length) {
if (length <= writableByteCount()) {
return;
}
int newCapacity = calculateCapacity(this.writePosition + length);
capacity(newCapacity);
}
/**
......@@ -299,24 +397,6 @@ public class DefaultDataBuffer implements DataBuffer {
}
}
void grow(int capacity) {
ByteBuffer oldBuffer = this.byteBuffer;
ByteBuffer newBuffer =
(oldBuffer.isDirect() ? ByteBuffer.allocateDirect(capacity) :
ByteBuffer.allocate(capacity));
final int remaining = readableByteCount();
// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
((Buffer) oldBuffer).position(this.readPosition).limit(this.writePosition);
newBuffer.put(oldBuffer);
this.byteBuffer = newBuffer;
this.readPosition = 0;
this.writePosition = remaining;
// Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
((Buffer) oldBuffer).clear();
}
@Override
public boolean equals(Object obj) {
......@@ -339,67 +419,78 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public String toString() {
return this.byteBuffer.toString();
return String.format("DefaultDataBuffer (r: %d, w %d, c %d)", this.readPosition,
this.writePosition, this.capacity);
}
private void checkIndex(int index, int length) {
assertIndex(index >= 0, "index %d must be >= 0", index);
assertIndex(length >= 0, "length %d must be >= 0", index);
assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity);
assertIndex(length <= this.capacity, "length %d must be <= %d", index, this.capacity);
}
private static void assertIndex(boolean expression, String format, Object... args) {
if (!expression) {
String message = String.format(format, args);
throw new IndexOutOfBoundsException(message);
}
}
private class DefaultDataBufferInputStream extends InputStream {
@Override
public int available() throws IOException {
public int available() {
return readableByteCount();
}
@Override
public int read() {
return readInternal(
buffer -> readableByteCount() > 0 ? buffer.get() & 0xFF : -1);
return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1;
}
@Override
public int read(byte[] bytes, int off, int len) throws IOException {
return readInternal(buffer -> {
int count = readableByteCount();
if (count > 0) {
int minLen = Math.min(len, count);
buffer.get(bytes, off, minLen);
return minLen;
}
else {
return -1;
}
});
int available = available();
if (available > 0) {
len = Math.min(len, available);
DefaultDataBuffer.this.read(bytes, off, len);
return len;
}
else {
return -1;
}
}
}
private class DefaultDataBufferOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
ensureExtraCapacity(1);
writeInternal(buffer -> buffer.put((byte) b));
DefaultDataBuffer.this.write((byte) b);
}
@Override
public void write(byte[] bytes, int off, int len) throws IOException {
ensureExtraCapacity(len);
writeInternal(buffer -> buffer.put(bytes, off, len));
DefaultDataBuffer.this.write(bytes, off, len);
}
}
private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {
SlicedDefaultDataBuffer(ByteBuffer byteBuffer, int readPosition,
int writePosition, DefaultDataBufferFactory dataBufferFactory) {
super(byteBuffer, readPosition, writePosition, dataBufferFactory);
SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory,
int length) {
super(dataBufferFactory, byteBuffer);
writePosition(length);
}
@Override
void grow(int capacity) {
public DataBuffer capacity(int newCapacity) {
throw new UnsupportedOperationException(
"Growing the capacity of a sliced buffer is not supported");
"Changing the capacity of a sliced buffer is not supported");
}
}
......
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -83,21 +83,23 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
@Override
public DefaultDataBuffer allocateBuffer(int initialCapacity) {
return (this.preferDirect ?
new DefaultDataBuffer(ByteBuffer.allocateDirect(initialCapacity), this) :
new DefaultDataBuffer(ByteBuffer.allocate(initialCapacity), this));
ByteBuffer byteBuffer = (this.preferDirect ?
ByteBuffer.allocateDirect(initialCapacity) :
ByteBuffer.allocate(initialCapacity));
return DefaultDataBuffer.fromEmptyByteBuffer(this, byteBuffer);
}
@Override
public DefaultDataBuffer wrap(ByteBuffer byteBuffer) {
ByteBuffer sliced = byteBuffer.slice();
return new DefaultDataBuffer(sliced, 0, byteBuffer.remaining(), this);
return DefaultDataBuffer.fromFilledByteBuffer(this, sliced);
}
@Override
public DataBuffer wrap(byte[] bytes) {
ByteBuffer wrapper = ByteBuffer.wrap(bytes);
return new DefaultDataBuffer(wrapper, 0, bytes.length, this);
return DefaultDataBuffer.fromFilledByteBuffer(this, wrapper);
}
@Override
......
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -99,6 +99,44 @@ public class NettyDataBuffer implements PooledDataBuffer {
return this.byteBuf.readableBytes();
}
@Override
public int writableByteCount() {
return this.byteBuf.writableBytes();
}
@Override
public int readPosition() {
return this.byteBuf.readerIndex();
}
@Override
public DataBuffer readPosition(int readPosition) {
this.byteBuf.readerIndex(readPosition);
return this;
}
@Override
public int writePosition() {
return this.byteBuf.writerIndex();
}
@Override
public DataBuffer writePosition(int writePosition) {
this.byteBuf.writerIndex(writePosition);
return this;
}
@Override
public int capacity() {
return this.byteBuf.capacity();
}
@Override
public DataBuffer capacity(int capacity) {
this.byteBuf.capacity(capacity);
return this;
}
@Override
public byte read() {
return this.byteBuf.readByte();
......@@ -194,6 +232,11 @@ public class NettyDataBuffer implements PooledDataBuffer {
return this.byteBuf.nioBuffer();
}
@Override
public ByteBuffer asByteBuffer(int index, int length) {
return this.byteBuf.nioBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return new ByteBufInputStream(this.byteBuf);
......
......@@ -80,8 +80,8 @@ public abstract class AbstractDataBufferAllocatingTestCase {
return dataBuffer -> {
String value =
DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
assertEquals(expected, value);
DataBufferUtils.release(dataBuffer);
assertEquals(expected, value);
};
}
......
......@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.junit.Test;
......@@ -33,8 +32,107 @@ import static org.junit.Assert.*;
public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void writeAndRead() {
public void byteCountsAndPositions() {
DataBuffer buffer = createDataBuffer(2);
assertEquals(0, buffer.readPosition());
assertEquals(0, buffer.writePosition());
assertEquals(0, buffer.readableByteCount());
assertEquals(2, buffer.writableByteCount());
assertEquals(2, buffer.capacity());
buffer.write((byte) 'a');
assertEquals(0, buffer.readPosition());
assertEquals(1, buffer.writePosition());
assertEquals(1, buffer.readableByteCount());
assertEquals(1, buffer.writableByteCount());
assertEquals(2, buffer.capacity());
buffer.write((byte) 'b');
assertEquals(0, buffer.readPosition());
assertEquals(2, buffer.writePosition());
assertEquals(2, buffer.readableByteCount());
assertEquals(0, buffer.writableByteCount());
assertEquals(2, buffer.capacity());
buffer.read();
assertEquals(1, buffer.readPosition());
assertEquals(2, buffer.writePosition());
assertEquals(1, buffer.readableByteCount());
assertEquals(0, buffer.writableByteCount());
assertEquals(2, buffer.capacity());
buffer.read();
assertEquals(2, buffer.readPosition());
assertEquals(2, buffer.writePosition());
assertEquals(0, buffer.readableByteCount());
assertEquals(0, buffer.writableByteCount());
assertEquals(2, buffer.capacity());
release(buffer);
}
@Test
public void readPositionSmallerThanZero() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.readPosition(-1);
fail("IndexOutOfBoundsException expected");
}
catch (IndexOutOfBoundsException ignored) {
}
finally {
release(buffer);
}
}
@Test
public void readPositionGreaterThanWritePosition() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.readPosition(1);
fail("IndexOutOfBoundsException expected");
}
catch (IndexOutOfBoundsException ignored) {
}
finally {
release(buffer);
}
}
@Test
public void writePositionSmallerThanReadPosition() {
DataBuffer buffer = createDataBuffer(2);
try {
buffer.write((byte) 'a');
buffer.read();
buffer.writePosition(0);
fail("IndexOutOfBoundsException expected");
}
catch (IndexOutOfBoundsException ignored) {
}
finally {
release(buffer);
}
}
@Test
public void writePositionGreaterThanCapacity() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.writePosition(2);
fail("IndexOutOfBoundsException expected");
}
catch (IndexOutOfBoundsException ignored) {
}
finally {
release(buffer);
}
}
@Test
public void writeAndRead() {
DataBuffer buffer = createDataBuffer(5);
buffer.write(new byte[]{'a', 'b', 'c'});
......@@ -54,33 +152,32 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void inputStream() throws IOException {
byte[] data = new byte[]{'a', 'b', 'c', 'd', 'e'};
DataBuffer buffer = createDataBuffer(4);
buffer.write(data);
buffer.read(); // readIndex++
buffer.write(new byte[]{'a', 'b', 'c', 'd', 'e'});
buffer.readPosition(1);
InputStream inputStream = buffer.asInputStream();
int available = inputStream.available();
assertEquals(4, available);
assertEquals(4, inputStream.available());
int result = inputStream.read();
assertEquals('b', result);
available = inputStream.available();
assertEquals(3, available);
assertEquals(3, inputStream.available());
byte[] bytes = new byte[2];
int len = inputStream.read(bytes);
assertEquals(2, len);
assertArrayEquals(new byte[]{'c', 'd'}, bytes);
assertEquals(1, inputStream.available());
Arrays.fill(bytes, (byte) 0);
len = inputStream.read(bytes);
assertEquals(1, len);
assertArrayEquals(new byte[]{'e', (byte) 0}, bytes);
assertEquals(0, inputStream.available());
assertEquals(-1, inputStream.read());
assertEquals(-1, inputStream.read(bytes));
release(buffer);
}
......@@ -91,7 +188,8 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
buffer.write((byte) 'a');
OutputStream outputStream = buffer.asOutputStream();
outputStream.write(new byte[]{'b', 'c', 'd'});
outputStream.write('b');
outputStream.write(new byte[]{'c', 'd'});
buffer.write((byte) 'e');
......@@ -106,21 +204,60 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
public void expand() {
DataBuffer buffer = createDataBuffer(1);
buffer.write((byte) 'a');
assertEquals(1, buffer.capacity());
buffer.write((byte) 'b');
byte[] result = new byte[2];
buffer.read(result);
assertArrayEquals(new byte[]{'a', 'b'}, result);
assertTrue(buffer.capacity() > 1);
buffer.write(new byte[]{'c', 'd'});
release(buffer);
}
result = new byte[2];
buffer.read(result);
assertArrayEquals(new byte[]{'c', 'd'}, result);
@Test
public void increaseCapacity() {
DataBuffer buffer = createDataBuffer(1);
assertEquals(1, buffer.capacity());
buffer.capacity(2);
assertEquals(2, buffer.capacity());
release(buffer);
}
@Test
public void decreaseCapacityLowReadPosition() {
DataBuffer buffer = createDataBuffer(2);
buffer.writePosition(2);
buffer.capacity(1);
assertEquals(1, buffer.capacity());
release(buffer);
}
@Test
public void decreaseCapacityHighReadPosition() {
DataBuffer buffer = createDataBuffer(2);
buffer.writePosition(2);
buffer.readPosition(2);
buffer.capacity(1);
assertEquals(1, buffer.capacity());
release(buffer);
}
@Test
public void capacityLessThanZero() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.capacity(-1);
fail("IllegalArgumentException expected");
}
catch (IllegalArgumentException ignored) {
}
finally {
release(buffer);
}
}
@Test
public void writeByteBuffer() {
DataBuffer buffer1 = createDataBuffer(1);
......@@ -176,16 +313,74 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
buffer.read(); // skip a
ByteBuffer result = buffer.asByteBuffer();
assertEquals(2, result.capacity());
buffer.write((byte) 'd');
assertEquals(2, result.remaining());
byte[] resultBytes = new byte[2];
buffer.read(resultBytes);
result.get(resultBytes);
assertArrayEquals(new byte[]{'b', 'c'}, resultBytes);
release(buffer);
}
@Test
public void asByteBufferIndexLength() {
DataBuffer buffer = createDataBuffer(3);
buffer.write(new byte[]{'a', 'b'});
ByteBuffer result = buffer.asByteBuffer(1, 2);
assertEquals(2, result.capacity());
buffer.write((byte) 'c');
assertEquals(2, result.remaining());
byte[] resultBytes = new byte[2];
result.get(resultBytes);
assertArrayEquals(new byte[]{'b', 'c'}, resultBytes);
release(buffer);
}
@Test
public void byteBufferContainsDataBufferChanges() {
DataBuffer dataBuffer = createDataBuffer(1);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, 1);
dataBuffer.write((byte) 'a');
assertEquals(1, byteBuffer.limit());
byte b = byteBuffer.get();
assertEquals('a', b);
release(dataBuffer);
}
@Test
public void dataBufferContainsByteBufferChanges() {
DataBuffer dataBuffer = createDataBuffer(1);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, 1);
byteBuffer.put((byte) 'a');
dataBuffer.writePosition(1);
byte b = dataBuffer.read();
assertEquals('a', b);
release(dataBuffer);
}
@Test
public void emptyAsByteBuffer() {
DataBuffer buffer = createDataBuffer(1);
ByteBuffer result = buffer.asByteBuffer();
assertEquals(0, result.capacity());
release(buffer);
}
@Test
public void indexOf() {
DataBuffer buffer = createDataBuffer(3);
......@@ -244,7 +439,7 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
assertEquals(2, slice.readableByteCount());
try {
slice.write((byte) 0);
fail("IndexOutOfBoundsException expected");
fail("Exception expected");
}
catch (Exception ignored) {
}
......@@ -267,22 +462,4 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
}
@Test
public void growDataBuffer() {
DataBuffer buffer = stringBuffer("Hello World!");
byte[] bytes = new byte[5];
buffer.read(bytes);
assertArrayEquals("Hello".getBytes(StandardCharsets.UTF_8), bytes);
buffer.write("!!".getBytes(StandardCharsets.UTF_8));
bytes = new byte[9];
buffer.read(bytes);
assertArrayEquals(" World!!!".getBytes(StandardCharsets.UTF_8), bytes);
release(buffer);
}
}
\ No newline at end of file
......@@ -19,8 +19,10 @@ package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
......@@ -30,6 +32,7 @@ import java.time.Duration;
import java.util.stream.Collectors;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
......@@ -37,6 +40,8 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
/**
* @author Arjen Poutsma
......@@ -291,4 +296,56 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
// AbstractDataBufferAllocatingTestCase.LeakDetector will assert the release of the buffers
}
@Test
public void SPR16070() throws Exception {
ReadableByteChannel channel = mock(ReadableByteChannel.class);
when(channel.read(any()))
.thenAnswer(putByte(1))
.thenAnswer(putByte(2))
.thenAnswer(putByte(3))
.thenReturn(-1);
Flux<DataBuffer> read = DataBufferUtils.read(channel, this.bufferFactory, 1);
StepVerifier.create(
read.reduce(DataBuffer::write)
.map(this::dataBufferToBytes)
.map(this::encodeHexString)
)
.expectNext("010203")
.verifyComplete();
}
private Answer<Integer> putByte(int b) {
return invocation -> {
ByteBuffer buffer = invocation.getArgument(0);
buffer.put((byte) b);
return 1;
};
}
private byte[] dataBufferToBytes(DataBuffer buffer) {
try {
int byteCount = buffer.readableByteCount();
byte[] bytes = new byte[byteCount];
buffer.read(bytes);
return bytes;
}
finally {
release(buffer);
}
}
private String encodeHexString(byte[] data) {
StringBuilder builder = new StringBuilder();
for (byte b : data) {
builder.append((0xF0 & b) >>> 4);
builder.append(0x0F & b);
}
return builder.toString();
}
}
......@@ -30,6 +30,7 @@ import org.apache.catalina.connector.CoyoteOutputStream;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
/**
* {@link ServletHttpHandlerAdapter} extension that uses Tomcat APIs for reading
......@@ -67,21 +68,32 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected DataBuffer readFromInputStream() throws IOException {
DataBuffer buffer = getDataBufferFactory().allocateBuffer(getBufferSize());
ByteBuffer byteBuffer = buffer.asByteBuffer();
byteBuffer.limit(byteBuffer.capacity());
ServletRequest request = getNativeRequest();
int read = ((CoyoteInputStream) request.getInputStream()).read(byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
boolean release = true;
int capacity = getBufferSize();
DataBuffer dataBuffer = getDataBufferFactory().allocateBuffer(capacity);
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, capacity);
ServletRequest request = getNativeRequest();
int read = ((CoyoteInputStream) request.getInputStream()).read(byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
if (read > 0) {
dataBuffer.writePosition(read);
release = false;
return dataBuffer;
}
else {
return null;
}
}
if (read > 0) {
return getDataBufferFactory().wrap(byteBuffer);
finally {
if (release) {
DataBufferUtils.release(dataBuffer);
}
}
return null;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册