提交 b315e54c 编写于 作者: A alanb

8012019: (fc) Thread.interrupt triggers hang in FileChannelImpl.pread (win)

Reviewed-by: chegar
上级 0b6e29e7
...@@ -538,7 +538,7 @@ class DatagramChannelImpl ...@@ -538,7 +538,7 @@ class DatagramChannelImpl
return 0; return 0;
readerThread = NativeThread.current(); readerThread = NativeThread.current();
do { do {
n = IOUtil.read(fd, buf, -1, nd, readLock); n = IOUtil.read(fd, buf, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -594,7 +594,7 @@ class DatagramChannelImpl ...@@ -594,7 +594,7 @@ class DatagramChannelImpl
return 0; return 0;
writerThread = NativeThread.current(); writerThread = NativeThread.current();
do { do {
n = IOUtil.write(fd, buf, -1, nd, writeLock); n = IOUtil.write(fd, buf, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
......
...@@ -140,7 +140,7 @@ public class FileChannelImpl ...@@ -140,7 +140,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return 0; return 0;
do { do {
n = IOUtil.read(fd, dst, -1, nd, positionLock); n = IOUtil.read(fd, dst, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -192,7 +192,7 @@ public class FileChannelImpl ...@@ -192,7 +192,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return 0; return 0;
do { do {
n = IOUtil.write(fd, src, -1, nd, positionLock); n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -671,6 +671,17 @@ public class FileChannelImpl ...@@ -671,6 +671,17 @@ public class FileChannelImpl
if (!readable) if (!readable)
throw new NonReadableChannelException(); throw new NonReadableChannelException();
ensureOpen(); ensureOpen();
if (nd.needsPositionLock()) {
synchronized (positionLock) {
return readInternal(dst, position);
}
} else {
return readInternal(dst, position);
}
}
private int readInternal(ByteBuffer dst, long position) throws IOException {
assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
int n = 0; int n = 0;
int ti = -1; int ti = -1;
try { try {
...@@ -679,7 +690,7 @@ public class FileChannelImpl ...@@ -679,7 +690,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return -1; return -1;
do { do {
n = IOUtil.read(fd, dst, position, nd, positionLock); n = IOUtil.read(fd, dst, position, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
...@@ -697,6 +708,17 @@ public class FileChannelImpl ...@@ -697,6 +708,17 @@ public class FileChannelImpl
if (!writable) if (!writable)
throw new NonWritableChannelException(); throw new NonWritableChannelException();
ensureOpen(); ensureOpen();
if (nd.needsPositionLock()) {
synchronized (positionLock) {
return writeInternal(src, position);
}
} else {
return writeInternal(src, position);
}
}
private int writeInternal(ByteBuffer src, long position) throws IOException {
assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
int n = 0; int n = 0;
int ti = -1; int ti = -1;
try { try {
...@@ -705,7 +727,7 @@ public class FileChannelImpl ...@@ -705,7 +727,7 @@ public class FileChannelImpl
if (!isOpen()) if (!isOpen())
return -1; return -1;
do { do {
n = IOUtil.write(fd, src, position, nd, positionLock); n = IOUtil.write(fd, src, position, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
......
...@@ -44,11 +44,11 @@ public class IOUtil { ...@@ -44,11 +44,11 @@ public class IOUtil {
private IOUtil() { } // No instantiation private IOUtil() { } // No instantiation
static int write(FileDescriptor fd, ByteBuffer src, long position, static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd, Object lock) NativeDispatcher nd)
throws IOException throws IOException
{ {
if (src instanceof DirectBuffer) if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd, lock); return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer // Substitute a native buffer
int pos = src.position(); int pos = src.position();
...@@ -62,7 +62,7 @@ public class IOUtil { ...@@ -62,7 +62,7 @@ public class IOUtil {
// Do not update src until we see how many bytes were written // Do not update src until we see how many bytes were written
src.position(pos); src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, nd, lock); int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) { if (n > 0) {
// now update src // now update src
src.position(pos + n); src.position(pos + n);
...@@ -74,8 +74,7 @@ public class IOUtil { ...@@ -74,8 +74,7 @@ public class IOUtil {
} }
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd, long position, NativeDispatcher nd)
Object lock)
throws IOException throws IOException
{ {
int pos = bb.position(); int pos = bb.position();
...@@ -89,7 +88,7 @@ public class IOUtil { ...@@ -89,7 +88,7 @@ public class IOUtil {
if (position != -1) { if (position != -1) {
written = nd.pwrite(fd, written = nd.pwrite(fd,
((DirectBuffer)bb).address() + pos, ((DirectBuffer)bb).address() + pos,
rem, position, lock); rem, position);
} else { } else {
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem); written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
} }
...@@ -184,18 +183,18 @@ public class IOUtil { ...@@ -184,18 +183,18 @@ public class IOUtil {
} }
static int read(FileDescriptor fd, ByteBuffer dst, long position, static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd, Object lock) NativeDispatcher nd)
throws IOException throws IOException
{ {
if (dst.isReadOnly()) if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer"); throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer) if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd, lock); return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute a native buffer // Substitute a native buffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining()); ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try { try {
int n = readIntoNativeBuffer(fd, bb, position, nd, lock); int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip(); bb.flip();
if (n > 0) if (n > 0)
dst.put(bb); dst.put(bb);
...@@ -206,8 +205,7 @@ public class IOUtil { ...@@ -206,8 +205,7 @@ public class IOUtil {
} }
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd, long position, NativeDispatcher nd)
Object lock)
throws IOException throws IOException
{ {
int pos = bb.position(); int pos = bb.position();
...@@ -220,7 +218,7 @@ public class IOUtil { ...@@ -220,7 +218,7 @@ public class IOUtil {
int n = 0; int n = 0;
if (position != -1) { if (position != -1) {
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos, n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
rem, position, lock); rem, position);
} else { } else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
} }
......
...@@ -38,8 +38,16 @@ abstract class NativeDispatcher ...@@ -38,8 +38,16 @@ abstract class NativeDispatcher
abstract int read(FileDescriptor fd, long address, int len) abstract int read(FileDescriptor fd, long address, int len)
throws IOException; throws IOException;
int pread(FileDescriptor fd, long address, int len, /**
long position, Object lock) throws IOException * Returns {@code true} if pread/pwrite needs to be synchronized with
* position sensitive methods.
*/
boolean needsPositionLock() {
return false;
}
int pread(FileDescriptor fd, long address, int len, long position)
throws IOException
{ {
throw new IOException("Operation Unsupported"); throw new IOException("Operation Unsupported");
} }
...@@ -50,8 +58,8 @@ abstract class NativeDispatcher ...@@ -50,8 +58,8 @@ abstract class NativeDispatcher
abstract int write(FileDescriptor fd, long address, int len) abstract int write(FileDescriptor fd, long address, int len)
throws IOException; throws IOException;
int pwrite(FileDescriptor fd, long address, int len, int pwrite(FileDescriptor fd, long address, int len, long position)
long position, Object lock) throws IOException throws IOException
{ {
throw new IOException("Operation Unsupported"); throw new IOException("Operation Unsupported");
} }
......
...@@ -318,7 +318,7 @@ public class SimpleAsynchronousFileChannelImpl ...@@ -318,7 +318,7 @@ public class SimpleAsynchronousFileChannelImpl
try { try {
begin(); begin();
do { do {
n = IOUtil.read(fdObj, dst, position, nd, null); n = IOUtil.read(fdObj, dst, position, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen()) if (n < 0 && !isOpen())
throw new AsynchronousCloseException(); throw new AsynchronousCloseException();
...@@ -372,7 +372,7 @@ public class SimpleAsynchronousFileChannelImpl ...@@ -372,7 +372,7 @@ public class SimpleAsynchronousFileChannelImpl
try { try {
begin(); begin();
do { do {
n = IOUtil.write(fdObj, src, position, nd, null); n = IOUtil.write(fdObj, src, position, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen()) if (n < 0 && !isOpen())
throw new AsynchronousCloseException(); throw new AsynchronousCloseException();
......
...@@ -356,7 +356,7 @@ class SocketChannelImpl ...@@ -356,7 +356,7 @@ class SocketChannelImpl
// except that the shutdown operation plays the role of // except that the shutdown operation plays the role of
// nd.preClose(). // nd.preClose().
for (;;) { for (;;) {
n = IOUtil.read(fd, buf, -1, nd, readLock); n = IOUtil.read(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) { if ((n == IOStatus.INTERRUPTED) && isOpen()) {
// The system call was interrupted but the channel // The system call was interrupted but the channel
// is still open, so retry // is still open, so retry
...@@ -447,7 +447,7 @@ class SocketChannelImpl ...@@ -447,7 +447,7 @@ class SocketChannelImpl
writerThread = NativeThread.current(); writerThread = NativeThread.current();
} }
for (;;) { for (;;) {
n = IOUtil.write(fd, buf, -1, nd, writeLock); n = IOUtil.write(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) if ((n == IOStatus.INTERRUPTED) && isOpen())
continue; continue;
return IOStatus.normalize(n); return IOStatus.normalize(n);
......
...@@ -46,8 +46,9 @@ class FileDispatcherImpl extends FileDispatcher ...@@ -46,8 +46,9 @@ class FileDispatcherImpl extends FileDispatcher
return read0(fd, address, len); return read0(fd, address, len);
} }
int pread(FileDescriptor fd, long address, int len, int pread(FileDescriptor fd, long address, int len, long position)
long position, Object lock) throws IOException { throws IOException
{
return pread0(fd, address, len, position); return pread0(fd, address, len, position);
} }
...@@ -59,8 +60,8 @@ class FileDispatcherImpl extends FileDispatcher ...@@ -59,8 +60,8 @@ class FileDispatcherImpl extends FileDispatcher
return write0(fd, address, len); return write0(fd, address, len);
} }
int pwrite(FileDescriptor fd, long address, int len, int pwrite(FileDescriptor fd, long address, int len, long position)
long position, Object lock) throws IOException throws IOException
{ {
return pwrite0(fd, address, len, position); return pwrite0(fd, address, len, position);
} }
......
...@@ -165,7 +165,7 @@ class SinkChannelImpl ...@@ -165,7 +165,7 @@ class SinkChannelImpl
return 0; return 0;
thread = NativeThread.current(); thread = NativeThread.current();
do { do {
n = IOUtil.write(fd, src, -1, nd, lock); n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
......
...@@ -165,7 +165,7 @@ class SourceChannelImpl ...@@ -165,7 +165,7 @@ class SourceChannelImpl
return 0; return 0;
thread = NativeThread.current(); thread = NativeThread.current();
do { do {
n = IOUtil.read(fd, dst, -1, nd, lock); n = IOUtil.read(fd, dst, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen()); } while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n); return IOStatus.normalize(n);
} finally { } finally {
......
...@@ -384,7 +384,7 @@ class UnixAsynchronousSocketChannelImpl ...@@ -384,7 +384,7 @@ class UnixAsynchronousSocketChannelImpl
if (scattering) { if (scattering) {
n = (int)IOUtil.read(fd, readBuffers, nd); n = (int)IOUtil.read(fd, readBuffers, nd);
} else { } else {
n = IOUtil.read(fd, readBuffer, -1, nd, null); n = IOUtil.read(fd, readBuffer, -1, nd);
} }
if (n == IOStatus.UNAVAILABLE) { if (n == IOStatus.UNAVAILABLE) {
// spurious wakeup, is this possible? // spurious wakeup, is this possible?
...@@ -505,7 +505,7 @@ class UnixAsynchronousSocketChannelImpl ...@@ -505,7 +505,7 @@ class UnixAsynchronousSocketChannelImpl
if (isScatteringRead) { if (isScatteringRead) {
n = (int)IOUtil.read(fd, dsts, nd); n = (int)IOUtil.read(fd, dsts, nd);
} else { } else {
n = IOUtil.read(fd, dst, -1, nd, null); n = IOUtil.read(fd, dst, -1, nd);
} }
} }
...@@ -579,7 +579,7 @@ class UnixAsynchronousSocketChannelImpl ...@@ -579,7 +579,7 @@ class UnixAsynchronousSocketChannelImpl
if (gathering) { if (gathering) {
n = (int)IOUtil.write(fd, writeBuffers, nd); n = (int)IOUtil.write(fd, writeBuffers, nd);
} else { } else {
n = IOUtil.write(fd, writeBuffer, -1, nd, null); n = IOUtil.write(fd, writeBuffer, -1, nd);
} }
if (n == IOStatus.UNAVAILABLE) { if (n == IOStatus.UNAVAILABLE) {
// spurious wakeup, is this possible? // spurious wakeup, is this possible?
...@@ -688,7 +688,7 @@ class UnixAsynchronousSocketChannelImpl ...@@ -688,7 +688,7 @@ class UnixAsynchronousSocketChannelImpl
if (isGatheringWrite) { if (isGatheringWrite) {
n = (int)IOUtil.write(fd, srcs, nd); n = (int)IOUtil.write(fd, srcs, nd);
} else { } else {
n = IOUtil.write(fd, src, -1, nd, null); n = IOUtil.write(fd, src, -1, nd);
} }
} }
......
...@@ -49,19 +49,22 @@ class FileDispatcherImpl extends FileDispatcher ...@@ -49,19 +49,22 @@ class FileDispatcherImpl extends FileDispatcher
this(false); this(false);
} }
@Override
boolean needsPositionLock() {
return true;
}
int read(FileDescriptor fd, long address, int len) int read(FileDescriptor fd, long address, int len)
throws IOException throws IOException
{ {
return read0(fd, address, len); return read0(fd, address, len);
} }
int pread(FileDescriptor fd, long address, int len, int pread(FileDescriptor fd, long address, int len, long position)
long position, Object lock) throws IOException throws IOException
{ {
synchronized(lock) {
return pread0(fd, address, len, position); return pread0(fd, address, len, position);
} }
}
long readv(FileDescriptor fd, long address, int len) throws IOException { long readv(FileDescriptor fd, long address, int len) throws IOException {
return readv0(fd, address, len); return readv0(fd, address, len);
...@@ -71,13 +74,11 @@ class FileDispatcherImpl extends FileDispatcher ...@@ -71,13 +74,11 @@ class FileDispatcherImpl extends FileDispatcher
return write0(fd, address, len, append); return write0(fd, address, len, append);
} }
int pwrite(FileDescriptor fd, long address, int len, int pwrite(FileDescriptor fd, long address, int len, long position)
long position, Object lock) throws IOException throws IOException
{ {
synchronized(lock) {
return pwrite0(fd, address, len, position); return pwrite0(fd, address, len, position);
} }
}
long writev(FileDescriptor fd, long address, int len) throws IOException { long writev(FileDescriptor fd, long address, int len) throws IOException {
return writev0(fd, address, len, append); return writev0(fd, address, len, append);
......
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/* @test
* @bug 8012019
* @summary Tests interruption of threads doing position-based read methods in
* an attempt to provoke a deadlock between position sensitive and position
* insensitive methods
*/
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.file.*;
import static java.nio.file.StandardOpenOption.*;
public class InterruptDeadlock {
/**
* A thread that continuously reads from a FileChannel with
* read(ByteBuffer,long). The thread terminates when interrupted and/or
* the FileChannel is closed.
*/
static class Reader extends Thread {
final FileChannel fc;
volatile Exception exception;
Reader(FileChannel fc) {
this.fc = fc;
}
@Override
public void run() {
ByteBuffer bb = ByteBuffer.allocate(1024);
try {
long pos = 0L;
for (;;) {
bb.clear();
int n = fc.read(bb, pos);
if (n > 0)
pos += n;
// fc.size is important here as it is position sensitive
if (pos > fc.size())
pos = 0L;
}
} catch (ClosedChannelException x) {
System.out.println(x.getClass() + " (expected)");
} catch (Exception unexpected) {
this.exception = unexpected;
}
}
Exception exception() {
return exception;
}
static Reader startReader(FileChannel fc) {
Reader r = new Reader(fc);
r.start();
return r;
}
}
// the number of reader threads to start
private static final int READER_COUNT = 4;
public static void main(String[] args) throws Exception {
Path file = Paths.get("data.txt");
try (FileChannel fc = FileChannel.open(file, CREATE, TRUNCATE_EXISTING, WRITE)) {
fc.position(1024L * 1024L);
fc.write(ByteBuffer.wrap(new byte[1]));
}
Reader[] readers = new Reader[READER_COUNT];
for (int i=1; i<=20; i++) {
System.out.format("Iteration: %s%n", i);
try (FileChannel fc = FileChannel.open(file)) {
boolean failed = false;
// start reader threads
for (int j=0; j<READER_COUNT; j++) {
readers[j] = Reader.startReader(fc);
}
// give readers a bit of time to get started (not strictly required)
Thread.sleep(100);
// interrupt and wait for the readers to terminate
for (Reader r: readers) {
r.interrupt();
}
for (Reader r: readers) {
try {
r.join(10000);
Exception e = r.exception();
if (e != null) {
System.err.println("Reader thread failed with: " + e);
failed = true;
}
} catch (InterruptedException x) {
System.err.println("Reader thread did not terminte");
failed = true;
}
}
// the channel should not be open at this point
if (fc.isOpen()) {
System.err.println("FileChannel was not closed");
failed = true;
}
if (failed)
throw new RuntimeException("Test failed - see log for details");
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册