提交 4219640f 编写于 作者: A alanb

6971825: (so) improve scatter/gather implementation

Reviewed-by: chegar, sherman
上级 ee7b3a88
...@@ -536,9 +536,11 @@ class DatagramChannelImpl ...@@ -536,9 +536,11 @@ class DatagramChannelImpl
} }
} }
private long read0(ByteBuffer[] bufs) throws IOException { public long read(ByteBuffer[] dsts, int offset, int length)
if (bufs == null) throws IOException
throw new NullPointerException(); {
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
synchronized (readLock) { synchronized (readLock) {
synchronized (stateLock) { synchronized (stateLock) {
ensureOpen(); ensureOpen();
...@@ -552,7 +554,7 @@ class DatagramChannelImpl ...@@ -552,7 +554,7 @@ class DatagramChannelImpl
return 0; return 0;
readerThread = NativeThread.current(); readerThread = NativeThread.current();
do { do {
n = IOUtil.read(fd, bufs, nd); n = IOUtil.read(fd, dsts, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -563,15 +565,6 @@ class DatagramChannelImpl ...@@ -563,15 +565,6 @@ class DatagramChannelImpl
} }
} }
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return read0(Util.subsequence(dsts, offset, length));
}
public int write(ByteBuffer buf) throws IOException { public int write(ByteBuffer buf) throws IOException {
if (buf == null) if (buf == null)
throw new NullPointerException(); throw new NullPointerException();
...@@ -599,9 +592,11 @@ class DatagramChannelImpl ...@@ -599,9 +592,11 @@ class DatagramChannelImpl
} }
} }
private long write0(ByteBuffer[] bufs) throws IOException { public long write(ByteBuffer[] srcs, int offset, int length)
if (bufs == null) throws IOException
throw new NullPointerException(); {
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
synchronized (writeLock) { synchronized (writeLock) {
synchronized (stateLock) { synchronized (stateLock) {
ensureOpen(); ensureOpen();
...@@ -615,7 +610,7 @@ class DatagramChannelImpl ...@@ -615,7 +610,7 @@ class DatagramChannelImpl
return 0; return 0;
writerThread = NativeThread.current(); writerThread = NativeThread.current();
do { do {
n = IOUtil.write(fd, bufs, nd); n = IOUtil.write(fd, srcs, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -626,15 +621,6 @@ class DatagramChannelImpl ...@@ -626,15 +621,6 @@ class DatagramChannelImpl
} }
} }
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return write0(Util.subsequence(srcs, offset, length));
}
protected void implConfigureBlocking(boolean block) throws IOException { protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block); IOUtil.configureBlocking(fd, block);
} }
......
...@@ -143,7 +143,11 @@ public class FileChannelImpl ...@@ -143,7 +143,11 @@ public class FileChannelImpl
} }
} }
private long read0(ByteBuffer[] dsts) throws IOException { public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
ensureOpen(); ensureOpen();
if (!readable) if (!readable)
throw new NonReadableChannelException(); throw new NonReadableChannelException();
...@@ -156,7 +160,7 @@ public class FileChannelImpl ...@@ -156,7 +160,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return 0; return 0;
do { do {
n = IOUtil.read(fd, dsts, nd); n = IOUtil.read(fd, dsts, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -167,15 +171,6 @@ public class FileChannelImpl ...@@ -167,15 +171,6 @@ public class FileChannelImpl
} }
} }
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return read0(Util.subsequence(dsts, offset, length));
}
public int write(ByteBuffer src) throws IOException { public int write(ByteBuffer src) throws IOException {
ensureOpen(); ensureOpen();
if (!writable) if (!writable)
...@@ -200,7 +195,11 @@ public class FileChannelImpl ...@@ -200,7 +195,11 @@ public class FileChannelImpl
} }
} }
private long write0(ByteBuffer[] srcs) throws IOException { public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
ensureOpen(); ensureOpen();
if (!writable) if (!writable)
throw new NonWritableChannelException(); throw new NonWritableChannelException();
...@@ -213,7 +212,7 @@ public class FileChannelImpl ...@@ -213,7 +212,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return 0; return 0;
do { do {
n = IOUtil.write(fd, srcs, nd); n = IOUtil.write(fd, srcs, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -224,16 +223,6 @@ public class FileChannelImpl ...@@ -224,16 +223,6 @@ public class FileChannelImpl
} }
} }
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return write0(Util.subsequence(srcs, offset, length));
}
// -- Other operations -- // -- Other operations --
public long position() throws IOException { public long position() throws IOException {
......
...@@ -38,34 +38,6 @@ class IOUtil { ...@@ -38,34 +38,6 @@ class IOUtil {
private IOUtil() { } // No instantiation private IOUtil() { } // No instantiation
/*
* Returns the index of first buffer in bufs with remaining,
* or -1 if there is nothing left
*/
private static int remaining(ByteBuffer[] bufs) {
int numBufs = bufs.length;
for (int i=0; i<numBufs; i++) {
if (bufs[i].hasRemaining()) {
return i;
}
}
return -1;
}
/*
* Returns a new ByteBuffer array with only unfinished buffers in it
*/
private static ByteBuffer[] skipBufs(ByteBuffer[] bufs,
int nextWithRemaining)
{
int newSize = bufs.length - nextWithRemaining;
ByteBuffer[] temp = new ByteBuffer[newSize];
for (int i=0; i<newSize; i++) {
temp[i] = bufs[i + nextWithRemaining];
}
return temp;
}
static int write(FileDescriptor fd, ByteBuffer src, long position, static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd, Object lock) NativeDispatcher nd, Object lock)
throws IOException throws IOException
...@@ -93,7 +65,7 @@ class IOUtil { ...@@ -93,7 +65,7 @@ class IOUtil {
} }
return n; return n;
} finally { } finally {
Util.releaseTemporaryDirectBuffer(bb); Util.offerFirstTemporaryDirectBuffer(bb);
} }
} }
...@@ -125,88 +97,81 @@ class IOUtil { ...@@ -125,88 +97,81 @@ class IOUtil {
static long write(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) static long write(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
throws IOException throws IOException
{ {
int nextWithRemaining = remaining(bufs); return write(fd, bufs, 0, bufs.length, nd);
// if all bufs are empty we should return immediately }
if (nextWithRemaining < 0)
return 0;
// If some bufs are empty we should skip them
if (nextWithRemaining > 0)
bufs = skipBufs(bufs, nextWithRemaining);
int numBufs = bufs.length; static long write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
NativeDispatcher nd)
throws IOException
{
IOVecWrapper vec = IOVecWrapper.get(length);
// Create shadow to ensure DirectByteBuffers are used boolean completed = false;
ByteBuffer[] shadow = new ByteBuffer[numBufs]; int iov_len = 0;
try { try {
for (int i=0; i<numBufs; i++) {
if (!(bufs[i] instanceof DirectBuffer)) { // Iterate over buffers to populate native iovec array.
int pos = bufs[i].position(); int count = offset + length;
int lim = bufs[i].limit(); for (int i=offset; i<count; i++) {
ByteBuffer buf = bufs[i];
int pos = buf.position();
int lim = buf.limit();
assert (pos <= lim); assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0); int rem = (pos <= lim ? lim - pos : 0);
if (rem > 0) {
vec.setBuffer(iov_len, buf, pos, rem);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); // allocate shadow buffer to ensure I/O is done with direct buffer
shadow[i] = bb; if (!(buf instanceof DirectBuffer)) {
// Leave slow buffer position untouched; it will be updated ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
// after we see how many bytes were really written out shadow.put(buf);
bb.put(bufs[i]); shadow.flip();
bufs[i].position(pos); vec.setShadow(iov_len, shadow);
bb.flip(); buf.position(pos); // temporarily restore position in user buffer
} else { buf = shadow;
shadow[i] = bufs[i]; pos = shadow.position();
}
} }
IOVecWrapper vec = null; vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
long bytesWritten = 0; vec.putLen(iov_len, rem);
try { iov_len++;
// Create a native iovec array
vec= new IOVecWrapper(numBufs);
// Fill in the iovec array with appropriate data
for (int i=0; i<numBufs; i++) {
ByteBuffer nextBuffer = shadow[i];
// put in the buffer addresses
long pos = nextBuffer.position();
long len = nextBuffer.limit() - pos;
vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos);
vec.putLen(i, len);
} }
// Invoke native call to fill the buffers
bytesWritten = nd.writev(fd, vec.address, numBufs);
} finally {
vec.free();
} }
long returnVal = bytesWritten; if (iov_len == 0)
return 0L;
long bytesWritten = nd.writev(fd, vec.address, iov_len);
// Notify the buffers how many bytes were taken // Notify the buffers how many bytes were taken
for (int i=0; i<numBufs; i++) { long left = bytesWritten;
ByteBuffer nextBuffer = bufs[i]; for (int j=0; j<iov_len; j++) {
int pos = nextBuffer.position(); if (left > 0) {
int lim = nextBuffer.limit(); ByteBuffer buf = vec.getBuffer(j);
assert (pos <= lim); int pos = vec.getPosition(j);
int len = (pos <= lim ? lim - pos : lim); int rem = vec.getRemaining(j);
if (bytesWritten >= len) { int n = (left > rem) ? rem : (int)left;
bytesWritten -= len; buf.position(pos + n);
int newPosition = pos + len; left -= n;
nextBuffer.position(newPosition); }
} else { // Buffers not completely filled // return shadow buffers to buffer pool
if (bytesWritten > 0) { ByteBuffer shadow = vec.getShadow(j);
assert(pos + bytesWritten < (long)Integer.MAX_VALUE); if (shadow != null)
int newPosition = (int)(pos + bytesWritten); Util.offerLastTemporaryDirectBuffer(shadow);
nextBuffer.position(newPosition); vec.clearRefs(j);
} }
break;
} completed = true;
} return bytesWritten;
return returnVal;
} finally { } finally {
// return any substituted buffers to cache // if an error occurred then clear refs to buffers and return any shadow
for (int i=0; i<numBufs; i++) { // buffers to cache
ByteBuffer bb = shadow[i]; if (!completed) {
if (bb != null && bb != bufs[i]) { for (int j=0; j<iov_len; j++) {
Util.releaseTemporaryDirectBuffer(bb); ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
} }
} }
} }
...@@ -231,7 +196,7 @@ class IOUtil { ...@@ -231,7 +196,7 @@ class IOUtil {
dst.put(bb); dst.put(bb);
return n; return n;
} finally { } finally {
Util.releaseTemporaryDirectBuffer(bb); Util.offerFirstTemporaryDirectBuffer(bb);
} }
} }
...@@ -262,92 +227,85 @@ class IOUtil { ...@@ -262,92 +227,85 @@ class IOUtil {
static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd) static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
throws IOException throws IOException
{ {
int nextWithRemaining = remaining(bufs); return read(fd, bufs, 0, bufs.length, nd);
// if all bufs are empty we should return immediately }
if (nextWithRemaining < 0)
return 0;
// If some bufs are empty we should skip them
if (nextWithRemaining > 0)
bufs = skipBufs(bufs, nextWithRemaining);
int numBufs = bufs.length; static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
NativeDispatcher nd)
throws IOException
{
IOVecWrapper vec = IOVecWrapper.get(length);
// Read into the shadow to ensure DirectByteBuffers are used boolean completed = false;
ByteBuffer[] shadow = new ByteBuffer[numBufs]; int iov_len = 0;
boolean usingSlowBuffers = false;
try { try {
for (int i=0; i<numBufs; i++) {
if (bufs[i].isReadOnly()) // Iterate over buffers to populate native iovec array.
int count = offset + length;
for (int i=offset; i<count; i++) {
ByteBuffer buf = bufs[i];
if (buf.isReadOnly())
throw new IllegalArgumentException("Read-only buffer"); throw new IllegalArgumentException("Read-only buffer");
if (!(bufs[i] instanceof DirectBuffer)) { int pos = buf.position();
shadow[i] = Util.getTemporaryDirectBuffer(bufs[i].remaining()); int lim = buf.limit();
usingSlowBuffers = true; assert (pos <= lim);
} else { int rem = (pos <= lim ? lim - pos : 0);
shadow[i] = bufs[i];
}
}
IOVecWrapper vec = null; if (rem > 0) {
long bytesRead = 0; vec.setBuffer(iov_len, buf, pos, rem);
try {
// Create a native iovec array
vec = new IOVecWrapper(numBufs);
// Fill in the iovec array with appropriate data // allocate shadow buffer to ensure I/O is done with direct buffer
for (int i=0; i<numBufs; i++) { if (!(buf instanceof DirectBuffer)) {
ByteBuffer nextBuffer = shadow[i]; ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
// put in the buffer addresses vec.setShadow(iov_len, shadow);
long pos = nextBuffer.position(); buf = shadow;
long len = nextBuffer.remaining(); pos = shadow.position();
vec.putBase(i, ((DirectBuffer)nextBuffer).address() + pos);
vec.putLen(i, len);
} }
// Invoke native call to fill the buffers vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
bytesRead = nd.readv(fd, vec.address, numBufs); vec.putLen(iov_len, rem);
} finally { iov_len++;
vec.free(); }
} }
long returnVal = bytesRead; if (iov_len == 0)
return 0L;
long bytesRead = nd.readv(fd, vec.address, iov_len);
// Notify the buffers how many bytes were read // Notify the buffers how many bytes were read
for (int i=0; i<numBufs; i++) { long left = bytesRead;
ByteBuffer nextBuffer = shadow[i]; for (int j=0; j<iov_len; j++) {
// Note: should this have been cached from above? ByteBuffer shadow = vec.getShadow(j);
int pos = nextBuffer.position(); if (left > 0) {
int len = nextBuffer.remaining(); ByteBuffer buf = vec.getBuffer(j);
if (bytesRead >= len) { int rem = vec.getRemaining(j);
bytesRead -= len; int n = (left > rem) ? rem : (int)left;
int newPosition = pos + len; if (shadow == null) {
nextBuffer.position(newPosition); int pos = vec.getPosition(j);
} else { // Buffers not completely filled buf.position(pos + n);
if (bytesRead > 0) { } else {
assert(pos + bytesRead < (long)Integer.MAX_VALUE); shadow.limit(shadow.position() + n);
int newPosition = (int)(pos + bytesRead); buf.put(shadow);
nextBuffer.position(newPosition);
} }
break; left -= n;
} }
if (shadow != null)
Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
} }
// Put results from shadow into the slow buffers completed = true;
if (usingSlowBuffers) { return bytesRead;
for (int i=0; i<numBufs; i++) {
if (!(bufs[i] instanceof DirectBuffer)) {
shadow[i].flip();
bufs[i].put(shadow[i]);
}
}
}
return returnVal;
} finally { } finally {
// return any substituted buffers to cache // if an error occurred then clear refs to buffers and return any shadow
if (usingSlowBuffers) { // buffers to cache
for (int i=0; i<numBufs; i++) { if (!completed) {
ByteBuffer bb = shadow[i]; for (int j=0; j<iov_len; j++) {
if (bb != null && bb != bufs[i]) { ByteBuffer shadow = vec.getShadow(j);
Util.releaseTemporaryDirectBuffer(bb); if (shadow != null)
} Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
} }
} }
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
package sun.nio.ch; package sun.nio.ch;
import java.nio.ByteBuffer;
import sun.misc.*; import sun.misc.*;
...@@ -43,23 +44,98 @@ import sun.misc.*; ...@@ -43,23 +44,98 @@ import sun.misc.*;
class IOVecWrapper { class IOVecWrapper {
// Miscellaneous constants // Miscellaneous constants
static int BASE_OFFSET = 0; private static final int BASE_OFFSET = 0;
static int LEN_OFFSET; private static final int LEN_OFFSET;
static int SIZE_IOVEC; private static final int SIZE_IOVEC;
// The iovec array // The iovec array
private AllocatedNativeObject vecArray; private final AllocatedNativeObject vecArray;
// Number of elements in iovec array
private final int size;
// Buffers and position/remaining corresponding to elements in iovec array
private final ByteBuffer[] buf;
private final int[] position;
private final int[] remaining;
// Shadow buffers for cases when original buffer is substituted
private final ByteBuffer[] shadow;
// Base address of this array // Base address of this array
long address; final long address;
// Address size in bytes // Address size in bytes
static int addressSize; static int addressSize;
IOVecWrapper(int newSize) { private static class Deallocator implements Runnable {
newSize = (newSize + 1) * SIZE_IOVEC; private final AllocatedNativeObject obj;
vecArray = new AllocatedNativeObject(newSize, false); Deallocator(AllocatedNativeObject obj) {
address = vecArray.address(); this.obj = obj;
}
public void run() {
obj.free();
}
}
// per thread IOVecWrapper
private static final ThreadLocal<IOVecWrapper> cached =
new ThreadLocal<IOVecWrapper>();
private IOVecWrapper(int size) {
this.size = size;
this.buf = new ByteBuffer[size];
this.position = new int[size];
this.remaining = new int[size];
this.shadow = new ByteBuffer[size];
this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
this.address = vecArray.address();
}
static IOVecWrapper get(int size) {
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
// not big enough; eagerly release memory
wrapper.vecArray.free();
wrapper = null;
}
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
cached.set(wrapper);
}
return wrapper;
}
void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
this.buf[i] = buf;
this.position[i] = pos;
this.remaining[i] = rem;
}
void setShadow(int i, ByteBuffer buf) {
shadow[i] = buf;
}
ByteBuffer getBuffer(int i) {
return buf[i];
}
int getPosition(int i) {
return position[i];
}
int getRemaining(int i) {
return remaining[i];
}
ByteBuffer getShadow(int i) {
return shadow[i];
}
void clearRefs(int i) {
buf[i] = null;
shadow[i] = null;
} }
void putBase(int i, long base) { void putBase(int i, long base) {
...@@ -78,10 +154,6 @@ class IOVecWrapper { ...@@ -78,10 +154,6 @@ class IOVecWrapper {
vecArray.putLong(offset, len); vecArray.putLong(offset, len);
} }
void free() {
vecArray.free();
}
static { static {
addressSize = Util.unsafe().addressSize(); addressSize = Util.unsafe().addressSize();
LEN_OFFSET = addressSize; LEN_OFFSET = addressSize;
......
...@@ -385,9 +385,11 @@ class SocketChannelImpl ...@@ -385,9 +385,11 @@ class SocketChannelImpl
} }
} }
private long read0(ByteBuffer[] bufs) throws IOException { public long read(ByteBuffer[] dsts, int offset, int length)
if (bufs == null) throws IOException
throw new NullPointerException(); {
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
synchronized (readLock) { synchronized (readLock) {
if (!ensureReadOpen()) if (!ensureReadOpen())
return -1; return -1;
...@@ -401,7 +403,7 @@ class SocketChannelImpl ...@@ -401,7 +403,7 @@ class SocketChannelImpl
} }
for (;;) { for (;;) {
n = IOUtil.read(fd, bufs, nd); n = IOUtil.read(fd, dsts, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) if ((n == IOStatus.INTERRUPTED) && isOpen())
continue; continue;
return IOStatus.normalize(n); return IOStatus.normalize(n);
...@@ -418,15 +420,6 @@ class SocketChannelImpl ...@@ -418,15 +420,6 @@ class SocketChannelImpl
} }
} }
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return read0(Util.subsequence(dsts, offset, length));
}
public int write(ByteBuffer buf) throws IOException { public int write(ByteBuffer buf) throws IOException {
if (buf == null) if (buf == null)
throw new NullPointerException(); throw new NullPointerException();
...@@ -458,9 +451,11 @@ class SocketChannelImpl ...@@ -458,9 +451,11 @@ class SocketChannelImpl
} }
} }
public long write0(ByteBuffer[] bufs) throws IOException { public long write(ByteBuffer[] srcs, int offset, int length)
if (bufs == null) throws IOException
throw new NullPointerException(); {
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
synchronized (writeLock) { synchronized (writeLock) {
ensureWriteOpen(); ensureWriteOpen();
long n = 0; long n = 0;
...@@ -472,7 +467,7 @@ class SocketChannelImpl ...@@ -472,7 +467,7 @@ class SocketChannelImpl
writerThread = NativeThread.current(); writerThread = NativeThread.current();
} }
for (;;) { for (;;) {
n = IOUtil.write(fd, bufs, nd); n = IOUtil.write(fd, srcs, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) if ((n == IOStatus.INTERRUPTED) && isOpen())
continue; continue;
return IOStatus.normalize(n); return IOStatus.normalize(n);
...@@ -489,15 +484,6 @@ class SocketChannelImpl ...@@ -489,15 +484,6 @@ class SocketChannelImpl
} }
} }
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
// ## Fix IOUtil.write so that we can avoid this array copy
return write0(Util.subsequence(srcs, offset, length));
}
// package-private // package-private
int sendOutOfBandData(byte b) throws IOException { int sendOutOfBandData(byte b) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
......
...@@ -41,66 +41,179 @@ import sun.security.action.GetPropertyAction; ...@@ -41,66 +41,179 @@ import sun.security.action.GetPropertyAction;
class Util { class Util {
// -- Caches -- // -- Caches --
// The number of temp buffers in our pool // The number of temp buffers in our pool
private static final int TEMP_BUF_POOL_SIZE = 3; private static final int TEMP_BUF_POOL_SIZE = 8;
// Per-thread cache of temporary direct buffers
private static ThreadLocal<BufferCache> bufferCache =
new ThreadLocal<BufferCache>()
{
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
};
/**
* A simple cache of direct buffers.
*/
private static class BufferCache {
// the array of buffers
private ByteBuffer[] buffers;
// the number of buffers in the cache
private int count;
// Per-thread soft cache of the last temporary direct buffer // the index of the first valid buffer (undefined if count == 0)
private static ThreadLocal<SoftReference<ByteBuffer>>[] bufferPool; private int start;
@SuppressWarnings("unchecked") private int next(int i) {
static ThreadLocal<SoftReference<ByteBuffer>>[] createThreadLocalBufferPool() { return (i + 1) % TEMP_BUF_POOL_SIZE;
return new ThreadLocal[TEMP_BUF_POOL_SIZE];
} }
static { BufferCache() {
bufferPool = createThreadLocalBufferPool(); buffers = new ByteBuffer[TEMP_BUF_POOL_SIZE];
for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
bufferPool[i] = new ThreadLocal<SoftReference<ByteBuffer>>();
} }
static ByteBuffer getTemporaryDirectBuffer(int size) { /**
ByteBuffer buf = null; * Removes and returns a buffer from the cache of at least the given
// Grab a buffer if available * size (or null if no suitable buffer is found).
for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) { */
SoftReference<ByteBuffer> ref = bufferPool[i].get(); ByteBuffer get(int size) {
if ((ref != null) && ((buf = ref.get()) != null) && if (count == 0)
(buf.capacity() >= size)) { return null; // cache is empty
ByteBuffer[] buffers = this.buffers;
// search for suitable buffer (often the first buffer will do)
ByteBuffer buf = buffers[start];
if (buf.capacity() < size) {
buf = null;
int i = start;
while ((i = next(i)) != start) {
ByteBuffer bb = buffers[i];
if (bb == null)
break;
if (bb.capacity() >= size) {
buf = bb;
break;
}
}
if (buf == null)
return null;
// move first element to here to avoid re-packing
buffers[i] = buffers[start];
}
// remove first element
buffers[start] = null;
start = next(start);
count--;
// prepare the buffer and return it
buf.rewind(); buf.rewind();
buf.limit(size); buf.limit(size);
bufferPool[i].set(null); return buf;
}
boolean offerFirst(ByteBuffer buf) {
if (count >= TEMP_BUF_POOL_SIZE) {
return false;
} else {
start = (start + TEMP_BUF_POOL_SIZE - 1) % TEMP_BUF_POOL_SIZE;
buffers[start] = buf;
count++;
return true;
}
}
boolean offerLast(ByteBuffer buf) {
if (count >= TEMP_BUF_POOL_SIZE) {
return false;
} else {
int next = (start + count) % TEMP_BUF_POOL_SIZE;
buffers[next] = buf;
count++;
return true;
}
}
boolean isEmpty() {
return count == 0;
}
ByteBuffer removeFirst() {
assert count > 0;
ByteBuffer buf = buffers[start];
buffers[start] = null;
start = next(start);
count--;
return buf; return buf;
} }
} }
// Make a new one /**
* Returns a temporary buffer of at least the given size
*/
static ByteBuffer getTemporaryDirectBuffer(int size) {
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size); return ByteBuffer.allocateDirect(size);
} }
}
/**
* Releases a temporary buffer by returning to the cache or freeing it.
*/
static void releaseTemporaryDirectBuffer(ByteBuffer buf) { static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
if (buf == null) offerFirstTemporaryDirectBuffer(buf);
return;
// Put it in an empty slot if such exists
for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
SoftReference<ByteBuffer> ref = bufferPool[i].get();
if ((ref == null) || (ref.get() == null)) {
bufferPool[i].set(new SoftReference<ByteBuffer>(buf));
return;
} }
/**
* Releases a temporary buffer by returning to the cache or freeing it. If
* returning to the cache then insert it at the start so that it is
* likely to be returned by a subsequent call to getTemporaryDirectBuffer.
*/
static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
assert buf != null;
BufferCache cache = bufferCache.get();
if (!cache.offerFirst(buf)) {
// cache is full
free(buf);
} }
// Otherwise replace a smaller one in the cache if such exists }
for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
SoftReference<ByteBuffer> ref = bufferPool[i].get(); /**
ByteBuffer inCacheBuf = ref.get(); * Releases a temporary buffer by returning to the cache or freeing it. If
if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) { * returning to the cache then insert it at the end. This makes it
bufferPool[i].set(new SoftReference<ByteBuffer>(buf)); * suitable for scatter/gather operations where the buffers are returned to
return; * cache in same order that they were obtained.
*/
static void offerLastTemporaryDirectBuffer(ByteBuffer buf) {
assert buf != null;
BufferCache cache = bufferCache.get();
if (!cache.offerLast(buf)) {
// cache is full
free(buf);
} }
} }
// release memory /**
* Frees the memory for the given direct buffer
*/
private static void free(ByteBuffer buf) {
((DirectBuffer)buf).cleaner().clean(); ((DirectBuffer)buf).cleaner().clean();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册