/* * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. 
*/

package sun.nio.ch;

import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.net.*;
import java.util.concurrent.*;
import java.io.IOException;
import java.io.FileDescriptor;
import java.security.AccessController;
import sun.net.NetHooks;
import sun.security.action.GetPropertyAction;

/**
 * Unix implementation of AsynchronousSocketChannel
 *
 * <p> The socket is placed in non-blocking mode and registered with a
 * {@link Port}. An operation that cannot complete immediately records its
 * context (buffers, handler/future, attachment) in the "pending" fields below
 * while holding {@code updateLock}, and asks the port to poll the file
 * descriptor. When the port reports an event, {@link #onEvent} maps it to the
 * pending operation(s) and completes them.
 *
 * <p> NOTE(review): the generic type parameters in this class were
 * reconstructed from how the values are used (e.g. {@code (V)result},
 * {@code A attachment}). Confirm the {@code @Override} signatures against
 * AsynchronousSocketChannelImpl and Port.PollableChannel.
 */

class UnixAsynchronousSocketChannelImpl
    extends AsynchronousSocketChannelImpl implements Port.PollableChannel
{
    private final static NativeDispatcher nd = new SocketDispatcher();

    // identifies the kind of operation a PendingFuture belongs to (see onCancel)
    private static enum OpType { CONNECT, READ, WRITE };

    // true if implRead must never attempt the read on the initiating thread.
    // Set from the system property "sun.nio.ch.disableSynchronousRead"; the
    // property being present with an empty value counts as true.
    private static final boolean disableSynchronousRead;
    static {
        String propValue = AccessController.doPrivileged(
            new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false"));
        disableSynchronousRead = (propValue.length() == 0) ?
            true : Boolean.valueOf(propValue);
    }

    private final Port port;
    private final int fdVal;

    // used to ensure that the context for I/O operations that complete
    // asynchronously is visible to the pooled threads handling I/O events.
    private final Object updateLock = new Object();

    // pending connect (updateLock)
    private boolean connectPending;
    private CompletionHandler<Void,Object> connectHandler;
    private Object connectAttachment;
    private PendingFuture<Void,Object> connectFuture;

    // pending remote address (stateLock)
    private SocketAddress pendingRemote;

    // pending read (updateLock)
    private boolean readPending;
    private boolean isScatteringRead;
    private ByteBuffer readBuffer;
    private ByteBuffer[] readBuffers;
    private CompletionHandler<Number,Object> readHandler;
    private Object readAttachment;
    private PendingFuture<Number,Object> readFuture;
    private Future<?> readTimer;

    // pending write (updateLock)
    private boolean writePending;
    private boolean isGatheringWrite;
    private ByteBuffer writeBuffer;
    private ByteBuffer[] writeBuffers;
    private CompletionHandler<Number,Object> writeHandler;
    private Object writeAttachment;
    private PendingFuture<Number,Object> writeFuture;
    private Future<?> writeTimer;

    /**
     * Creates a channel bound to the given port, switches its file descriptor
     * to non-blocking mode and registers it with the port.
     *
     * @throws IOException if the descriptor cannot be made non-blocking; the
     *         descriptor is closed before the exception is rethrown
     */
    UnixAsynchronousSocketChannelImpl(Port port)
        throws IOException
    {
        super(port);

        // set non-blocking
        try {
            IOUtil.configureBlocking(fd, false);
        } catch (IOException x) {
            nd.close(fd);
            throw x;
        }

        this.port = port;
        this.fdVal = IOUtil.fdVal(fd);

        // add mapping from file descriptor to this channel
        port.register(fdVal, this);
    }

    // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl
    //
    // Unlike the constructor above, the descriptor is NOT closed here when
    // configureBlocking or register fails.
    // NOTE(review): presumably the caller owns the descriptor in that case -
    // confirm against UnixAsynchronousServerSocketChannelImpl.
    UnixAsynchronousSocketChannelImpl(Port port,
                                      FileDescriptor fd,
                                      InetSocketAddress remote)
        throws IOException
    {
        super(port, fd, remote);

        this.fdVal = IOUtil.fdVal(fd);
        IOUtil.configureBlocking(fd, false);

        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // ShutdownChannelGroupException thrown if we attempt to register a
            // new channel after the group is shutdown
            throw new IOException(x);
        }

        this.port = port;
    }

    /**
     * Returns the port this channel is registered with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return port;
    }

    // register events for outstanding I/O operations, caller already owns updateLock
    private void updateEvents() {
        assert Thread.holdsLock(updateLock);
        int events = 0;
        if (readPending)
            events |= Port.POLLIN;
        // a pending connect is signalled the same way as a pending write
        if (connectPending || writePending)
            events |= Port.POLLOUT;
        // nothing outstanding => nothing to poll for
        if (events != 0)
            port.startPoll(fdVal, events);
    }

    // register events for outstanding I/O operations
    private void lockAndUpdateEvents() {
        synchronized (updateLock) {
            updateEvents();
        }
    }

    // invoke to finish read and/or write operations
    //
    // mayInvokeDirect: whether a completion handler may be run on the calling
    //                  thread rather than being dispatched indirectly
    // readable/writable: which readiness conditions were reported
    private void finish(boolean mayInvokeDirect,
                        boolean readable,
                        boolean writable)
    {
        boolean finishRead = false;
        boolean finishWrite = false;
        boolean finishConnect = false;

        // map event to pending result; the pending flags are cleared under
        // the lock so that each operation is claimed by exactly one caller
        synchronized (updateLock) {
            if (readable && this.readPending) {
                this.readPending = false;
                finishRead = true;
            }
            if (writable) {
                // a writable event completes either a write or a connect,
                // never both
                if (this.writePending) {
                    this.writePending = false;
                    finishWrite = true;
                } else if (this.connectPending) {
                    this.connectPending = false;
                    finishConnect = true;
                }
            }
        }

        // complete the I/O operation. Special case for when channel is
        // ready for both reading and writing. In that case, submit task to
        // complete write if write operation has a completion handler.
        if (finishRead) {
            if (finishWrite)
                finishWrite(false);
            finishRead(mayInvokeDirect);
            return;
        }
        if (finishWrite) {
            finishWrite(mayInvokeDirect);
        }
        if (finishConnect) {
            finishConnect(mayInvokeDirect);
        }
    }

    /**
     * Invoked by event handler thread when file descriptor is polled
     *
     * @param events          bit set of Port.POLL* flags reported for the descriptor
     * @param mayInvokeDirect whether completion handlers may run on this thread
     */
    @Override
    public void onEvent(int events, boolean mayInvokeDirect) {
        boolean readable = (events & Port.POLLIN) > 0;
        boolean writable = (events & Port.POLLOUT) > 0;
        // an error or hang-up is treated as both readable and writable so
        // that every pending operation gets a chance to complete or fail
        if ((events & (Port.POLLERR | Port.POLLHUP)) > 0) {
            readable = true;
            writable = true;
        }
        finish(mayInvokeDirect, readable, writable);
    }

    /**
     * Unregisters the channel from the port, closes the file descriptor and
     * then drives every pending operation to completion so that it fails.
     */
    @Override
    void implClose() throws IOException {
        // remove the mapping
        port.unregister(fdVal);

        // close file descriptor
        nd.close(fd);

        // All outstanding I/O operations are required to fail
        finish(false, true, true);
    }

    /**
     * Invoked when a pending future created by this channel is cancelled;
     * dispatches on the operation type stored as the future's context.
     */
    @Override
    public void onCancel(PendingFuture<?,?> task) {
        if (task.getContext() == OpType.CONNECT)
            killConnect();
        if (task.getContext() == OpType.READ)
            killReading();
        if (task.getContext() == OpType.WRITE)
            killWriting();
    }

    // -- connect --

    // records that the connection is established: updates the state and the
    // local/remote addresses while holding stateLock
    private void setConnected() throws IOException {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            localAddress = Net.localAddress(fd);
            remoteAddress = pendingRemote;
        }
    }

    // completes a pending connect after the descriptor was reported writable
    private void finishConnect(boolean mayInvokeDirect) {
        Throwable e = null;
        try {
            begin();
            checkConnect(fdVal);
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }
        if (e != null) {
            // close channel if connection cannot be established
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }

        // invoke handler and set result
        CompletionHandler<Void,Object> handler = connectHandler;
        Object att = connectAttachment;
        PendingFuture<Void,Object> future = connectFuture;
        if (handler == null) {
            future.setResult(null, e);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, null, e);
            } else {
                Invoker.invokeIndirectly(this, handler, att, null, e);
            }
        }
    }

    /**
     * Initiates a connect operation.
     *
     * @param remote     the address to connect to
     * @param attachment object passed to the completion handler; may be null
     * @param handler    completion handler, or null to get a Future instead
     * @return a Future when {@code handler} is null, otherwise null
     * @throws AlreadyConnectedException  if the channel is already connected
     * @throws ConnectionPendingException if a connect is already in progress
     */
    // NOTE(review): signature reconstructed - confirm against the superclass.
    @Override
    @SuppressWarnings("unchecked")
    <A> Future<Void> implConnect(SocketAddress remote,
                                 A attachment,
                                 CompletionHandler<Void,? super A> handler)
    {
        if (!isOpen()) {
            Throwable e = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(e);
            } else {
                Invoker.invoke(this, handler, attachment, null, e);
                return null;
            }
        }

        InetSocketAddress isa = Net.checkAddress(remote);

        // permission check
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        // check and set state
        boolean notifyBeforeTcpConnect;
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
            state = ST_PENDING;
            pendingRemote = remote;
            notifyBeforeTcpConnect = (localAddress == null);
        }

        Throwable e = null;
        try {
            begin();
            // notify hook if unbound
            if (notifyBeforeTcpConnect)
                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
            if (n == IOStatus.UNAVAILABLE) {
                // connection could not be established immediately
                PendingFuture<Void,A> result = null;
                synchronized (updateLock) {
                    if (handler == null) {
                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                        this.connectFuture = (PendingFuture<Void,Object>)result;
                    } else {
                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
                        this.connectAttachment = attachment;
                    }
                    this.connectPending = true;
                    updateEvents();
                }
                // result is null when a handler was supplied
                return result;
            }
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }

        // close channel if connect fails
        if (e != null) {
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }
        if (handler == null) {
            return CompletedFuture.withResult(null, e);
        } else {
            Invoker.invoke(this, handler, attachment, null, e);
            return null;
        }
    }

    // -- read --

    // completes a pending read after the descriptor was reported readable
    private void finishRead(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean scattering = isScatteringRead;
        CompletionHandler<Number,Object> handler = readHandler;
        Object att = readAttachment;
        PendingFuture<Number,Object> future = readFuture;
        Future<?> timeout = readTimer;

        try {
            begin();

            if (scattering) {
                n = (int)IOUtil.read(fd, readBuffers, nd);
            } else {
                n = IOUtil.read(fd, readBuffer, -1, nd, null);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                // mark the read as pending again; the finally block below
                // re-registers the poll
                synchronized (updateLock) {
                    readPending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.readBuffer = null;
            this.readBuffers = null;
            this.readAttachment = null;

            // allow another read to be initiated
            enableReading();

        } catch (Throwable x) {
            enableReading();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timeout != null)
            timeout.cancel(false);

        // create result: Long for a scattering read, Integer otherwise
        Number result = (exc != null) ? null : (scattering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    // task scheduled by implRead when a read timeout is specified; fails the
    // pending read with InterruptedByTimeoutException if it is still pending
    private Runnable readTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                // the read already completed (or was claimed by finish)
                if (!readPending)
                    return;
                readPending = false;
                handler = readHandler;
                att = readAttachment;
                future = readFuture;
            }

            // kill further reading before releasing waiters
            enableReading(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler == null) {
                future.setFailure(exc);
            } else {
                AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
                Invoker.invokeIndirectly(ch, handler, att, null, exc);
            }
        }
    };

    /**
     * Initiates a read or scattering read operation
     *
     * @param isScatteringRead true to read into {@code dsts}, false for {@code dst}
     * @param dst        destination buffer for a plain read
     * @param dsts       destination buffers for a scattering read
     * @param timeout    timeout, only scheduled when greater than zero
     * @param unit       unit of {@code timeout}
     * @param attachment object passed to the completion handler; may be null
     * @param handler    completion handler, or null to get a Future instead
     * @return a Future when {@code handler} is null, otherwise null
     */
    // NOTE(review): signature reconstructed - confirm against the superclass.
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        // UNAVAILABLE doubles as "no read attempted" so that both cases fall
        // into the pending path below
        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, nd);
                } else {
                    n = IOUtil.read(fd, dst, -1, nd, null);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                pending = true;
                // result is null when a handler was supplied
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the read finished (or failed) here, so allow the next one
            if (!pending)
                enableReading();
            end();
        }

        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- write --

    // completes a pending write after the descriptor was reported writable
    private void finishWrite(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after writing is re-enabled.
        boolean gathering = this.isGatheringWrite;
        CompletionHandler<Number,Object> handler = this.writeHandler;
        Object att = this.writeAttachment;
        PendingFuture<Number,Object> future = this.writeFuture;
        Future<?> timer = this.writeTimer;

        try {
            begin();

            if (gathering) {
                n = (int)IOUtil.write(fd, writeBuffers, nd);
            } else {
                n = IOUtil.write(fd, writeBuffer, -1, nd, null);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                // mark the write as pending again; the finally block below
                // re-registers the poll
                synchronized (updateLock) {
                    writePending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.writeBuffer = null;
            this.writeBuffers = null;
            this.writeAttachment = null;

            // allow another write to be initiated
            enableWriting();

        } catch (Throwable x) {
            enableWriting();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent read
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timer != null)
            timer.cancel(false);

        // create result: Long for a gathering write, Integer otherwise
        Number result = (exc != null) ? null : (gathering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    // task scheduled by implWrite when a write timeout is specified; fails the
    // pending write with InterruptedByTimeoutException if it is still pending
    private Runnable writeTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                // the write already completed (or was claimed by finish)
                if (!writePending)
                    return;
                writePending = false;
                handler = writeHandler;
                att = writeAttachment;
                future = writeFuture;
            }

            // kill further writing before releasing waiters
            enableWriting(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler != null) {
                Invoker.invokeIndirectly
                    (UnixAsynchronousSocketChannelImpl.this, handler, att, null, exc);
            } else {
                future.setFailure(exc);
            }
        }
    };

    /**
     * Initiates a write or gathering write operation
     *
     * @param isGatheringWrite true to write from {@code srcs}, false for {@code src}
     * @param src        source buffer for a plain write
     * @param srcs       source buffers for a gathering write
     * @param timeout    timeout, only scheduled when greater than zero
     * @param unit       unit of {@code timeout}
     * @param attachment object passed to the completion handler; may be null
     * @param handler    completion handler, or null to get a Future instead
     * @return a Future when {@code handler} is null, otherwise null
     */
    // NOTE(review): signature reconstructed - confirm against the superclass.
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                             ByteBuffer src,
                                             ByteBuffer[] srcs,
                                             long timeout,
                                             TimeUnit unit,
                                             A attachment,
                                             CompletionHandler<V,? super A> handler)
    {
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
        // okay to attempt write with user thread pool
        boolean attemptWrite = (handler == null) || invokeDirect ||
            !port.isFixedThreadPool();

        // UNAVAILABLE doubles as "no write attempted" so that both cases fall
        // into the pending path below
        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptWrite) {
                if (isGatheringWrite) {
                    n = (int)IOUtil.write(fd, srcs, nd);
                } else {
                    n = IOUtil.write(fd, src, -1, nd, null);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isGatheringWrite = isGatheringWrite;
                    this.writeBuffer = src;
                    this.writeBuffers = srcs;
                    if (handler == null) {
                        this.writeHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.WRITE);
                        this.writeFuture = (PendingFuture<Number,Object>)result;
                        this.writeAttachment = null;
                    } else {
                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
                        this.writeAttachment = attachment;
                        this.writeFuture = null;
                    }
                    if (timeout > 0L) {
                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                    }
                    this.writePending = true;
                    updateEvents();
                }
                pending = true;
                // result is null when a handler was supplied
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the write finished (or failed) here, so allow the next one
            if (!pending)
                enableWriting();
            end();
        }

        Number result = (exc != null) ? null : (isGatheringWrite) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // write completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- Native methods --

    // throws IOException if the pending connect on the descriptor failed
    // NOTE(review): exact native semantics are not visible here - confirm
    // against the JNI implementation.
    private static native void checkConnect(int fdVal) throws IOException;

    static {
        Util.load();
    }
}