提交 a7b5a2ab 编写于 作者: A alanb

6842687: New I/O: Update Asynchronous I/O API to jsr203/nio2-b101

Reviewed-by: sherman
上级 3d77c423
......@@ -160,7 +160,6 @@ FILES_src = \
\
sun/nio/ByteBuffered.java \
\
sun/nio/ch/AbstractFuture.java \
sun/nio/ch/AbstractPollArrayWrapper.java \
sun/nio/ch/AllocatedNativeObject.java \
sun/nio/ch/AsynchronousChannelGroupImpl.java \
......
......@@ -56,18 +56,18 @@ public interface AsynchronousByteChannel
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
* <p> This method initiates an operation to read a sequence of bytes from
* this channel into the given buffer. The method returns a {@link Future}
* representing the pending result of the operation. The result of the
* operation, obtained by invoking the {@code Future} 's {@link
* Future#get() get} method, is the number of bytes read or {@code -1} if
* all bytes have been read and the channel has reached end-of-stream.
*
* <p> This method initiates a read operation to read up to <i>r</i> bytes
* from the channel, where <i>r</i> is the number of bytes remaining in the
* buffer, that is, {@code dst.remaining()} at the time that the read is
* attempted. Where <i>r</i> is 0, the read operation completes immediately
* with a result of {@code 0} without initiating an I/O operation.
* <p> This method initiates an asynchronous read operation to read a
* sequence of bytes from this channel into the given buffer. The {@code
* handler} parameter is a completion handler that is invoked when the read
* operation completes (or fails). The result passed to the completion
* handler is the number of bytes read or {@code -1} if no bytes could be
* read because the channel has reached end-of-stream.
*
* <p> The read operation may read up to <i>r</i> bytes from the channel,
* where <i>r</i> is the number of bytes remaining in the buffer, that is,
* {@code dst.remaining()} at the time that the read is attempted. Where
* <i>r</i> is 0, the read operation completes immediately with a result of
* {@code 0} without initiating an I/O operation.
*
* <p> Suppose that a byte sequence of length <i>n</i> is read, where
* <tt>0</tt>&nbsp;<tt>&lt;</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.
......@@ -79,44 +79,46 @@ public interface AsynchronousByteChannel
* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>; its limit will not have changed.
*
* <p> Buffers are not safe for use by multiple concurrent threads so care
* should be taken to not to access the buffer until the operaton has completed.
* should be taken to not access the buffer until the operation has
* completed.
*
* <p> This method may be invoked at any time. Some channel types may not
* allow more than one read to be outstanding at any given time. If a thread
* initiates a read operation before a previous read operation has
* completed then a {@link ReadPendingException} will be thrown.
*
* <p> The <tt>handler</tt> parameter is used to specify a {@link
* CompletionHandler}. When the read operation completes the handler's
* {@link CompletionHandler#completed completed} method is executed.
*
*
* @param dst
* The buffer into which bytes are to be transferred
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The completion handler object; can be {@code null}
*
* @return A Future representing the result of the operation
* The completion handler
*
* @throws IllegalArgumentException
* If the buffer is read-only
* @throws ReadPendingException
* If the channel does not allow more than one read to be outstanding
* and a previous read has not completed
* @throws ShutdownChannelGroupException
* If the channel is associated with a {@link AsynchronousChannelGroup
* group} that has terminated
*/
<A> Future<Integer> read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler);
<A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler);
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
* <p> An invocation of this method of the form <tt>c.read(dst)</tt>
* behaves in exactly the same manner as the invocation
* <blockquote><pre>
* c.read(dst, null, null);</pre></blockquote>
* <p> This method initiates an asynchronous read operation to read a
* sequence of bytes from this channel into the given buffer. The method
* behaves in exactly the same manner as the {@link
* #read(ByteBuffer,Object,CompletionHandler)
* read(ByteBuffer,Object,CompletionHandler)} method except that instead
* of specifying a completion handler, this method returns a {@code Future}
* representing the pending result. The {@code Future}'s {@link Future#get()
* get} method returns the number of bytes read or {@code -1} if no bytes
* could be read because the channel has reached end-of-stream.
*
* @param dst
* The buffer into which bytes are to be transferred
......@@ -134,17 +136,17 @@ public interface AsynchronousByteChannel
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
* <p> This method initiates an operation to write a sequence of bytes to
* this channel from the given buffer. This method returns a {@link
* Future} representing the pending result of the operation. The result
* of the operation, obtained by invoking the <tt>Future</tt>'s {@link
* Future#get() get} method, is the number of bytes written, possibly zero.
* <p> This method initiates an asynchronous write operation to write a
* sequence of bytes to this channel from the given buffer. The {@code
* handler} parameter is a completion handler that is invoked when the write
* operation completes (or fails). The result passed to the completion
* handler is the number of bytes written.
*
* <p> This method initiates a write operation to write up to <i>r</i> bytes
* to the channel, where <i>r</i> is the number of bytes remaining in the
* buffer, that is, {@code src.remaining()} at the moment the write is
* attempted. Where <i>r</i> is 0, the write operation completes immediately
* with a result of {@code 0} without initiating an I/O operation.
* <p> The write operation may write up to <i>r</i> bytes to the channel,
* where <i>r</i> is the number of bytes remaining in the buffer, that is,
* {@code src.remaining()} at the time that the write is attempted. Where
* <i>r</i> is 0, the write operation completes immediately with a result of
* {@code 0} without initiating an I/O operation.
*
* <p> Suppose that a byte sequence of length <i>n</i> is written, where
* <tt>0</tt>&nbsp;<tt>&lt;</tt>&nbsp;<i>n</i>&nbsp;<tt>&lt;=</tt>&nbsp;<i>r</i>.
......@@ -156,41 +158,43 @@ public interface AsynchronousByteChannel
* <i>p</i>&nbsp;<tt>+</tt>&nbsp;<i>n</i>; its limit will not have changed.
*
* <p> Buffers are not safe for use by multiple concurrent threads so care
* should be taken to not to access the buffer until the operaton has completed.
* should be taken to not access the buffer until the operation has
* completed.
*
* <p> This method may be invoked at any time. Some channel types may not
* allow more than one write to be outstanding at any given time. If a thread
* initiates a write operation before a previous write operation has
* completed then a {@link WritePendingException} will be thrown.
*
* <p> The <tt>handler</tt> parameter is used to specify a {@link
* CompletionHandler}. When the write operation completes the handler's
* {@link CompletionHandler#completed completed} method is executed.
*
* @param src
* The buffer from which bytes are to be retrieved
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The completion handler object; can be {@code null}
*
* @return A Future representing the result of the operation
* The completion handler object
*
* @throws WritePendingException
* If the channel does not allow more than one write to be outstanding
* and a previous write has not completed
* @throws ShutdownChannelGroupException
* If the channel is associated with a {@link AsynchronousChannelGroup
* group} that has terminated
*/
<A> Future<Integer> write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler);
<A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler);
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
* <p> An invocation of this method of the form <tt>c.write(src)</tt>
* behaves in exactly the same manner as the invocation
* <blockquote><pre>
* c.write(src, null, null);</pre></blockquote>
* <p> This method initiates an asynchronous write operation to write a
* sequence of bytes to this channel from the given buffer. The method
* behaves in exactly the same manner as the {@link
* #write(ByteBuffer,Object,CompletionHandler)
* write(ByteBuffer,Object,CompletionHandler)} method except that instead
* of specifying a completion handler, this method returns a {@code Future}
* representing the pending result. The {@code Future}'s {@link Future#get()
* get} method returns the number of bytes written.
*
* @param src
* The buffer from which bytes are to be retrieved
......
......@@ -34,7 +34,8 @@ import java.util.concurrent.Future; // javadoc
*
* <ol>
* <li><pre>{@link Future}&lt;V&gt; <em>operation</em>(<em>...</em>)</pre></li>
* <li><pre>Future&lt;V&gt; <em>operation</em>(<em>...</em> A attachment, {@link CompletionHandler}&lt;V,? super A&gt handler)</pre></li>
* <li><pre>void <em>operation</em>(<em>...</em> A attachment, {@link
* CompletionHandler}&lt;V,? super A&gt; handler)</pre></li>
* </ol>
*
* where <i>operation</i> is the name of the I/O operation (read or write for
......@@ -48,7 +49,7 @@ import java.util.concurrent.Future; // javadoc
* interface may be used to check if the operation has completed, wait for its
* completion, and to retrieve the result. In the second form, a {@link
* CompletionHandler} is invoked to consume the result of the I/O operation when
* it completes, fails, or is cancelled.
* it completes or fails.
*
* <p> A channel that implements this interface is <em>asynchronously
* closeable</em>: If an I/O operation is outstanding on the channel and the
......@@ -63,33 +64,33 @@ import java.util.concurrent.Future; // javadoc
* <h4>Cancellation</h4>
*
* <p> The {@code Future} interface defines the {@link Future#cancel cancel}
* method to cancel execution of a task.
*
* <p> Where the {@code cancel} method is invoked with the {@code
* method to cancel execution. This causes all threads waiting on the result of
* the I/O operation to throw {@link java.util.concurrent.CancellationException}.
* Whether the underlying I/O operation can be cancelled is highly implementation
* specific and therefore not specified. Where cancellation leaves the channel,
* or the entity to which it is connected, in an inconsistent state, then the
* channel is put into an implementation specific <em>error state</em> that
* prevents further attempts to initiate I/O operations that are <i>similar</i>
* to the operation that was cancelled. For example, if a read operation is
* cancelled but the implementation cannot guarantee that bytes have not been
* read from the channel then it puts the channel into an error state; further
* attempts to initiate a {@code read} operation cause an unspecified runtime
* exception to be thrown. Similarly, if a write operation is cancelled but the
* implementation cannot guarantee that bytes have not been written to the
* channel then subsequent attempts to initiate a {@code write} will fail with
* an unspecified runtime exception.
*
* <p> Where the {@link Future#cancel cancel} method is invoked with the {@code
* mayInterruptIfRunning} parameter set to {@code true} then the I/O operation
* may be interrupted by closing the channel. This will cause any other I/O
* operations outstanding on the channel to complete with the exception {@link
* AsynchronousCloseException}.
*
* <p> If a {@code CompletionHandler} is specified when initiating an I/O
* operation, and the {@code cancel} method is invoked to cancel the I/O
* operation before it completes, then the {@code CompletionHandler}'s {@link
* CompletionHandler#cancelled cancelled} method is invoked.
*
* <p> If an implementation of this interface supports a means to cancel I/O
* operations, and where cancellation may leave the channel, or the entity to
* which it is connected, in an inconsistent state, then the channel is put into
* an implementation specific <em>error state</em> that prevents further
* attempts to initiate I/O operations on the channel. For example, if a read
* operation is cancelled but the implementation cannot guarantee that bytes
* have not been read from the channel then it puts the channel into error state
* state; further attempts to initiate a {@code read} operation causes an
* unspecified runtime exception to be thrown.
* may be interrupted by closing the channel. In that case all threads waiting
* on the result of the I/O operation throw {@code CancellationException} and
* any other I/O operations outstanding on the channel complete with the
* exception {@link AsynchronousCloseException}.
*
* <p> Where the {@code cancel} method is invoked to cancel read or write
* operations then it recommended that all buffers used in the I/O operations be
* discarded or care taken to ensure that the buffers are not accessed while the
* channel remains open.
* operations then it is recommended that all buffers used in the I/O operations
* be discarded or care taken to ensure that the buffers are not accessed while
* the channel remains open.
*
* @since 1.7
*/
......@@ -102,7 +103,7 @@ public interface AsynchronousChannel
*
* <p> Any outstanding asynchronous operations upon this channel will
* complete with the exception {@link AsynchronousCloseException}. After a
* channel is closed then further attempts to initiate asynchronous I/O
* channel is closed, further attempts to initiate asynchronous I/O
* operations complete immediately with cause {@link ClosedChannelException}.
*
* <p> This method otherwise behaves exactly as specified by the {@link
......
......@@ -85,9 +85,6 @@ import java.io.IOException;
* public void failed(Throwable exc, Void att) {
* ...
* }
* public void cancelled(Void att) {
* ...
* }
* });
* </pre>
*
......@@ -240,11 +237,11 @@ public abstract class AsynchronousServerSocketChannel
/**
* Accepts a connection.
*
* <p> This method initiates accepting a connection made to this channel's
* socket, returning a {@link Future} representing the pending result
* of the operation. The {@code Future}'s {@link Future#get() get}
* method will return the {@link AsynchronousSocketChannel} for the new
* connection on successful completion.
* <p> This method initiates an asynchronous operation to accept a
* connection made to this channel's socket. The {@code handler} parameter is
* a completion handler that is invoked when a connection is accepted (or
* the operation fails). The result passed to the completion handler is
* the {@link AsynchronousSocketChannel} to the new connection.
*
* <p> When a new connection is accepted then the resulting {@code
* AsynchronousSocketChannel} will be bound to the same {@link
......@@ -269,35 +266,35 @@ public abstract class AsynchronousServerSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return an <tt>Future</tt> object representing the pending result
* The handler for consuming the result
*
* @throws AcceptPendingException
* If an accept operation is already in progress on this channel
* @throws NotYetBoundException
* If this channel's socket has not yet been bound
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
public abstract <A> Future<AsynchronousSocketChannel>
accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler);
/**
* Accepts a connection.
*
* <p> This method is equivalent to invoking {@link
* #accept(Object,CompletionHandler)} with the {@code attachment}
* and {@code handler} parameters set to {@code null}.
* <p> This method initiates an asynchronous operation to accept a
* connection made to this channel's socket. The method behaves in exactly
* the same manner as the {@link #accept(Object, CompletionHandler)} method
* except that instead of specifying a completion handler, this method
* returns a {@code Future} representing the pending result. The {@code
* Future}'s {@link Future#get() get} method returns the {@link
* AsynchronousSocketChannel} to the new connection on successful completion.
*
* @return an <tt>Future</tt> object representing the pending result
* @return a {@code Future} object representing the pending result
*
* @throws AcceptPendingException
* If an accept operation is already in progress on this channel
* @throws NotYetBoundException
* If this channel's socket has not yet been bound
*/
public final Future<AsynchronousSocketChannel> accept() {
return accept(null, null);
}
public abstract Future<AsynchronousSocketChannel> accept();
}
......@@ -274,14 +274,11 @@ public abstract class AsynchronousSocketChannel
/**
* Connects this channel.
*
* <p> This method initiates an operation to connect this channel, returning
* a {@code Future} representing the pending result of the operation. If
* the connection is successfully established then the {@code Future}'s
* {@link Future#get() get} method will return {@code null}. If the
* connection cannot be established then the channel is closed. In that case,
* invoking the {@code get} method throws {@link
* java.util.concurrent.ExecutionException} with an {@code IOException} as
* the cause.
* <p> This method initiates an operation to connect this channel. The
* {@code handler} parameter is a completion handler that is invoked when
* the connection is successfully established or connection cannot be
* established. If the connection cannot be established then the channel is
* closed.
*
* <p> This method performs exactly the same security checks as the {@link
* java.net.Socket} class. That is, if a security manager has been
......@@ -294,9 +291,7 @@ public abstract class AsynchronousSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return A {@code Future} object representing the pending result
* The handler for consuming the result
*
* @throws UnresolvedAddressException
* If the given remote address is not fully resolved
......@@ -307,23 +302,26 @@ public abstract class AsynchronousSocketChannel
* @throws ConnectionPendingException
* If a connection operation is already in progress on this channel
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
* @throws SecurityException
* If a security manager has been installed
* and it does not permit access to the given remote endpoint
*
* @see #getRemoteAddress
*/
public abstract <A> Future<Void> connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler);
public abstract <A> void connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler);
/**
* Connects this channel.
*
* <p> This method is equivalent to invoking {@link
* #connect(SocketAddress,Object,CompletionHandler)} with the {@code attachment}
* and handler parameters set to {@code null}.
* <p> This method initiates an operation to connect this channel. This
* method behaves in exactly the same manner as the {@link
* #connect(SocketAddress, Object, CompletionHandler)} method except that
* instead of specifying a completion handler, this method returns a {@code
* Future} representing the pending result. The {@code Future}'s {@link
* Future#get() get} method returns {@code null} on successful completion.
*
* @param remote
* The remote address to which this channel is to be connected
......@@ -342,18 +340,17 @@ public abstract class AsynchronousSocketChannel
* If a security manager has been installed
* and it does not permit access to the given remote endpoint
*/
public final Future<Void> connect(SocketAddress remote) {
return connect(remote, null, null);
}
public abstract Future<Void> connect(SocketAddress remote);
/**
* Reads a sequence of bytes from this channel into the given buffer.
*
* <p> This method initiates the reading of a sequence of bytes from this
* channel into the given buffer, returning a {@code Future} representing
* the pending result of the operation. The {@code Future}'s {@link
* Future#get() get} method returns the number of bytes read or {@code -1}
* if all bytes have been read and channel has reached end-of-stream.
* <p> This method initiates an asynchronous read operation to read a
* sequence of bytes from this channel into the given buffer. The {@code
* handler} parameter is a completion handler that is invoked when the read
* operation completes (or fails). The result passed to the completion
* handler is the number of bytes read or {@code -1} if no bytes could be
* read because the channel has reached end-of-stream.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then the operation completes with the exception {@link
......@@ -376,9 +373,7 @@ public abstract class AsynchronousSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return A {@code Future} object representing the pending result
* The handler for consuming the result
*
* @throws IllegalArgumentException
* If the {@code timeout} parameter is negative or the buffer is
......@@ -388,13 +383,13 @@ public abstract class AsynchronousSocketChannel
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
public abstract <A> Future<Integer> read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public abstract <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
/**
* @throws IllegalArgumentException {@inheritDoc}
......@@ -402,14 +397,14 @@ public abstract class AsynchronousSocketChannel
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
@Override
public final <A> Future<Integer> read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
return read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
......@@ -419,16 +414,18 @@ public abstract class AsynchronousSocketChannel
* If this channel is not yet connected
*/
@Override
public final Future<Integer> read(ByteBuffer dst) {
return read(dst, 0L, TimeUnit.MILLISECONDS, null, null);
}
public abstract Future<Integer> read(ByteBuffer dst);
/**
* Reads a sequence of bytes from this channel into a subsequence of the
* given buffers. This operation, sometimes called a <em>scattering read</em>,
* is often useful when implementing network protocols that group data into
* segments consisting of one or more fixed-length headers followed by a
* variable-length body.
* variable-length body. The {@code handler} parameter is a completion
* handler that is invoked when the read operation completes (or fails). The
* result passed to the completion handler is the number of bytes read or
* {@code -1} if no bytes could be read because the channel has reached
* end-of-stream.
*
* <p> This method initiates a read of up to <i>r</i> bytes from this channel,
* where <i>r</i> is the total number of bytes remaining in the specified
......@@ -456,11 +453,6 @@ public abstract class AsynchronousSocketChannel
* I/O operation is performed with the maximum number of buffers allowed by
* the operating system.
*
* <p> The return value from this method is a {@code Future} representing
* the pending result of the operation. The {@code Future}'s {@link
* Future#get() get} method returns the number of bytes read or {@code -1L}
* if all bytes have been read and the channel has reached end-of-stream.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
* InterruptedByTimeoutException}. Where a timeout occurs, and the
......@@ -485,9 +477,7 @@ public abstract class AsynchronousSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return A {@code Future} object representing the pending result
* The handler for consuming the result
*
* @throws IndexOutOfBoundsException
* If the pre-conditions for the {@code offset} and {@code length}
......@@ -500,23 +490,24 @@ public abstract class AsynchronousSocketChannel
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
public abstract <A> Future<Long> read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
public abstract <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
/**
* Writes a sequence of bytes to this channel from the given buffer.
*
* <p> This method initiates the writing of a sequence of bytes to this channel
* from the given buffer, returning a {@code Future} representing the
* pending result of the operation. The {@code Future}'s {@link Future#get()
* get} method will return the number of bytes written.
* <p> This method initiates an asynchronous write operation to write a
* sequence of bytes to this channel from the given buffer. The {@code
* handler} parameter is a completion handler that is invoked when the write
* operation completes (or fails). The result passed to the completion
* handler is the number of bytes written.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
......@@ -539,9 +530,7 @@ public abstract class AsynchronousSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return A {@code Future} object representing the pending result
* The handler for consuming the result
*
* @throws IllegalArgumentException
* If the {@code timeout} parameter is negative
......@@ -550,28 +539,28 @@ public abstract class AsynchronousSocketChannel
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
public abstract <A> Future<Integer> write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public abstract <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
/**
* @throws WritePendingException {@inheritDoc}
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
@Override
public final <A> Future<Integer> write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
return write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}
/**
......@@ -580,16 +569,16 @@ public abstract class AsynchronousSocketChannel
* If this channel is not yet connected
*/
@Override
public final Future<Integer> write(ByteBuffer src) {
return write(src, 0L, TimeUnit.MILLISECONDS, null, null);
}
public abstract Future<Integer> write(ByteBuffer src);
/**
* Writes a sequence of bytes to this channel from a subsequence of the given
* buffers. This operation, sometimes called a <em>gathering write</em>, is
* often useful when implementing network protocols that group data into
* segments consisting of one or more fixed-length headers followed by a
* variable-length body.
* variable-length body. The {@code handler} parameter is a completion
* handler that is invoked when the write operation completes (or fails).
* The result passed to the completion handler is the number of bytes written.
*
* <p> This method initiates a write of up to <i>r</i> bytes to this channel,
* where <i>r</i> is the total number of bytes remaining in the specified
......@@ -616,10 +605,6 @@ public abstract class AsynchronousSocketChannel
* remaining), exceeds this limit, then the I/O operation is performed with
* the maximum number of buffers allowed by the operating system.
*
* <p> The return value from this method is a {@code Future} representing
* the pending result of the operation. The {@code Future}'s {@link
* Future#get() get} method will return the number of bytes written.
*
* <p> If a timeout is specified and the timeout elapses before the operation
* completes then it completes with the exception {@link
* InterruptedByTimeoutException}. Where a timeout occurs, and the
......@@ -644,9 +629,7 @@ public abstract class AsynchronousSocketChannel
* @param attachment
* The object to attach to the I/O operation; can be {@code null}
* @param handler
* The handler for consuming the result; can be {@code null}
*
* @return A {@code Future} object representing the pending result
* The handler for consuming the result
*
* @throws IndexOutOfBoundsException
* If the pre-conditions for the {@code offset} and {@code length}
......@@ -658,13 +641,13 @@ public abstract class AsynchronousSocketChannel
* @throws NotYetConnectedException
* If this channel is not yet connected
* @throws ShutdownChannelGroupException
* If a handler is specified, and the channel group is shutdown
* If the channel group has terminated
*/
public abstract <A> Future<Long> write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
public abstract <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
}
......@@ -32,11 +32,9 @@ package java.nio.channels;
* handler to be specified to consume the result of an asynchronous operation.
* The {@link #completed completed} method is invoked when the I/O operation
* completes successfully. The {@link #failed failed} method is invoked if the
* I/O operations fails. The {@link #cancelled cancelled} method is invoked when
* the I/O operation is cancelled by invoking the {@link
* java.util.concurrent.Future#cancel cancel} method. The implementations of
* these methods should complete in a timely manner so as to avoid keeping the
* invoking thread from dispatching to other completion handlers.
* I/O operations fails. The implementations of these methods should complete
* in a timely manner so as to avoid keeping the invoking thread from dispatching
* to other completion handlers.
*
* @param <V> The result type of the I/O operation
* @param <A> The type of the object attached to the I/O operation
......@@ -65,13 +63,4 @@ public interface CompletionHandler<V,A> {
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
/**
* Invoked when an operation is cancelled by invoking the {@link
* java.util.concurrent.Future#cancel cancel} method.
*
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void cancelled(A attachment);
}
......@@ -190,5 +190,5 @@ gen WritePendingException "
gen ShutdownChannelGroupException "
* Unchecked exception thrown when an attempt is made to construct a channel in
* a group that is shutdown or the completion handler for an I/O operation
* cannot be invoked because the channel group is shutdown." \
* cannot be invoked because the channel group has terminated." \
-3903801676350154157L
/*
* Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Sun designates this
* particular file as subject to the "Classpath" exception as provided
* by Sun in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
package sun.nio.ch;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.Future;
/**
* Base implementation of Future used for asynchronous I/O
*/
abstract class AbstractFuture<V,A>
implements Future<V>
{
private final AsynchronousChannel channel;
private final A attachment;
protected AbstractFuture(AsynchronousChannel channel, A attachment) {
this.channel = channel;
this.attachment = attachment;
}
final AsynchronousChannel channel() {
return channel;
}
final A attachment() {
return attachment;
}
/**
* Returns the result of the operation if it has completed successfully.
*/
abstract V value();
/**
* Returns the exception if the operation has failed.
*/
abstract Throwable exception();
}
......@@ -32,8 +32,8 @@ import java.io.IOException;
import java.io.FileDescriptor;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.security.PrivilegedAction;
import java.security.AccessController;
import java.security.AccessControlContext;
......@@ -65,11 +65,8 @@ abstract class AsynchronousChannelGroupImpl
private final Queue<Runnable> taskQueue;
// group shutdown
// shutdownLock is RW lock so as to allow for concurrent queuing of tasks
// when using a fixed thread pool.
private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private final AtomicBoolean shutdown = new AtomicBoolean();
private final Object shutdownNowLock = new Object();
private volatile boolean shutdown;
private volatile boolean terminateInitiated;
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
......@@ -214,7 +211,7 @@ abstract class AsynchronousChannelGroupImpl
@Override
public final boolean isShutdown() {
return shutdown;
return shutdown.get();
}
@Override
......@@ -260,17 +257,10 @@ abstract class AsynchronousChannelGroupImpl
@Override
public final void shutdown() {
shutdownLock.writeLock().lock();
try {
if (shutdown) {
// already shutdown
return;
}
shutdown = true;
} finally {
shutdownLock.writeLock().unlock();
if (shutdown.getAndSet(true)) {
// already shutdown
return;
}
// if there are channels in the group then shutdown will continue
// when the last channel is closed
if (!isEmpty()) {
......@@ -289,12 +279,7 @@ abstract class AsynchronousChannelGroupImpl
@Override
public final void shutdownNow() throws IOException {
shutdownLock.writeLock().lock();
try {
shutdown = true;
} finally {
shutdownLock.writeLock().unlock();
}
shutdown.set(true);
synchronized (shutdownNowLock) {
if (!terminateInitiated) {
terminateInitiated = true;
......@@ -305,6 +290,18 @@ abstract class AsynchronousChannelGroupImpl
}
}
/**
* For use by AsynchronousFileChannel to release resources without shutting
* down the thread pool.
*/
final void detachFromThreadPool() {
if (shutdown.getAndSet(true))
throw new AssertionError("Already shutdown");
if (!isEmpty())
throw new AssertionError("Group not empty");
shutdownHandlerTasks();
}
@Override
public final boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
......
......@@ -25,8 +25,10 @@
package sun.nio.ch;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.*;
import java.io.FileDescriptor;
import java.io.IOException;
......@@ -101,6 +103,33 @@ abstract class AsynchronousFileChannelImpl
// -- file locking --
abstract <A> Future<FileLock> implLock(long position,
long size,
boolean shared,
A attachment,
CompletionHandler<FileLock,? super A> handler);
@Override
public final Future<FileLock> lock(long position,
long size,
boolean shared)
{
return implLock(position, size, shared, null, null);
}
@Override
public final <A> void lock(long position,
long size,
boolean shared,
A attachment,
CompletionHandler<FileLock,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implLock(position, size, shared, attachment, handler);
}
private volatile FileLockTable fileLockTable;
final void ensureFileLockTableInitialized() throws IOException {
......@@ -175,4 +204,50 @@ abstract class AsynchronousFileChannelImpl
end();
}
}
// -- reading and writing --
abstract <A> Future<Integer> implRead(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler);
@Override
public final Future<Integer> read(ByteBuffer dst, long position) {
return implRead(dst, position, null, null);
}
@Override
public final <A> void read(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implRead(dst, position, attachment, handler);
}
abstract <A> Future<Integer> implWrite(ByteBuffer src,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler);
@Override
public final Future<Integer> write(ByteBuffer src, long position) {
return implWrite(src, position, null, null);
}
@Override
public final <A> void write(ByteBuffer src,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implWrite(src, position, attachment, handler);
}
}
......@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import sun.net.NetHooks;
......@@ -108,6 +109,29 @@ abstract class AsynchronousServerSocketChannelImpl
implClose();
}
/**
* Invoked by accept to accept connection
*/
abstract Future<AsynchronousSocketChannel>
implAccept(Object attachment,
CompletionHandler<AsynchronousSocketChannel,Object> handler);
@Override
public final Future<AsynchronousSocketChannel> accept() {
return implAccept(null, null);
}
@Override
@SuppressWarnings("unchecked")
public final <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel,Object>)handler);
}
final boolean isAcceptKilled() {
return acceptKilled;
}
......
......@@ -183,29 +183,54 @@ abstract class AsynchronousSocketChannelImpl
killWriting();
}
/**
* Invoked by connect to initiate the connect operation.
*/
abstract <A> Future<Void> implConnect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler);
@Override
public final Future<Void> connect(SocketAddress remote) {
return implConnect(remote, null, null);
}
@Override
public final <A> void connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implConnect(remote, attachment, handler);
}
/**
* Invoked by read to initiate the I/O operation.
*/
abstract <V extends Number,A> Future<V> readImpl(ByteBuffer[] dsts,
boolean isScatteringRead,
abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
ByteBuffer dst,
ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
private <V extends Number,A> Future<V> read(ByteBuffer[] dsts,
boolean isScatteringRead,
private <V extends Number,A> Future<V> read(boolean isScatteringRead,
ByteBuffer dst,
ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
A att,
CompletionHandler<V,? super A> handler)
{
if (!isOpen()) {
CompletedFuture<V,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable e = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(e);
Invoker.invoke(this, handler, att, null, e);
return null;
}
if (remoteAddress == null)
......@@ -213,13 +238,13 @@ abstract class AsynchronousSocketChannelImpl
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
boolean hasSpaceToRead = isScatteringRead || dsts[0].hasRemaining();
boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
boolean shutdown = false;
// check and update state
synchronized (readLock) {
if (readKilled)
throw new RuntimeException("Reading not allowed due to timeout or cancellation");
throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
if (reading)
throw new ReadPendingException();
if (readShutdown) {
......@@ -234,44 +259,53 @@ abstract class AsynchronousSocketChannelImpl
// immediately complete with -1 if shutdown for read
// immediately complete with 0 if no space remaining
if (shutdown || !hasSpaceToRead) {
CompletedFuture<V,A> result;
Number result;
if (isScatteringRead) {
Long value = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
} else {
int value = (shutdown) ? -1 : 0;
result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, value, attachment);
result = (shutdown) ? -1 : 0;
}
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withResult((V)result);
Invoker.invoke(this, handler, att, (V)result, null);
return null;
}
return readImpl(dsts, isScatteringRead, timeout, unit, attachment, handler);
return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
}
@Override
public final <A> Future<Integer> read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
public final Future<Integer> read(ByteBuffer dst) {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
public final <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
ByteBuffer[] bufs = new ByteBuffer[1];
bufs[0] = dst;
return read(bufs, false, timeout, unit, attachment, handler);
read(false, dst, null, timeout, unit, attachment, handler);
}
@Override
public final <A> Future<Long> read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
public final <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
......@@ -279,39 +313,41 @@ abstract class AsynchronousSocketChannelImpl
if (bufs[i].isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
}
return read(bufs, true, timeout, unit, attachment, handler);
read(true, null, bufs, timeout, unit, attachment, handler);
}
/**
* Invoked by write to initiate the I/O operation.
*/
abstract <V extends Number,A> Future<V> writeImpl(ByteBuffer[] srcs,
boolean isGatheringWrite,
abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
ByteBuffer src,
ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
private <V extends Number,A> Future<V> write(ByteBuffer[] srcs,
boolean isGatheringWrite,
private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
ByteBuffer src,
ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
A att,
CompletionHandler<V,? super A> handler)
{
boolean hasDataToWrite = isGatheringWrite || srcs[0].hasRemaining();
boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
boolean closed = false;
if (isOpen()) {
if (remoteAddress == null)
throw new NotYetConnectedException();
if (timeout < 0L)
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
// check and update state
synchronized (writeLock) {
if (writeKilled)
throw new RuntimeException("Writing not allowed due to timeout or cancellation");
throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
if (writing)
throw new WritePendingException();
if (writeShutdown) {
......@@ -327,52 +363,57 @@ abstract class AsynchronousSocketChannelImpl
// channel is closed or shutdown for write
if (closed) {
CompletedFuture<V,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable e = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(e);
Invoker.invoke(this, handler, att, null, e);
return null;
}
// nothing to write so complete immediately
if (!hasDataToWrite) {
CompletedFuture<V,A> result;
if (isGatheringWrite) {
result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0L, attachment);
} else {
result = (CompletedFuture<V,A>)CompletedFuture.withResult(this, 0, attachment);
}
Invoker.invoke(handler, result);
return result;
Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
if (handler == null)
return CompletedFuture.withResult((V)result);
Invoker.invoke(this, handler, att, (V)result, null);
return null;
}
return writeImpl(srcs, isGatheringWrite, timeout, unit, attachment, handler);
return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
}
@Override
public final <A> Future<Integer> write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
public final Future<Integer> write(ByteBuffer src) {
return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
public final <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
ByteBuffer[] bufs = new ByteBuffer[1];
bufs[0] = src;
return write(bufs, false, timeout, unit, attachment, handler);
if (handler == null)
throw new NullPointerException("'handler' is null");
write(false, src, null, timeout, unit, attachment, handler);
}
@Override
public final <A> Future<Long> write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
public final <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
srcs = Util.subsequence(srcs, offset, length);
return write(srcs, true, timeout, unit, attachment, handler);
write(true, null, srcs, timeout, unit, attachment, handler);
}
@Override
......@@ -461,7 +502,6 @@ abstract class AsynchronousSocketChannelImpl
}
@Override
@SuppressWarnings("unchecked")
public final SocketAddress getRemoteAddress() throws IOException {
if (!isOpen())
throw new ClosedChannelException();
......
......@@ -25,7 +25,7 @@
package sun.nio.ch;
import java.nio.channels.AsynchronousChannel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.io.IOException;
......@@ -35,39 +35,35 @@ import java.io.IOException;
* completed.
*/
final class CompletedFuture<V,A>
extends AbstractFuture<V,A>
{
final class CompletedFuture<V> implements Future<V> {
private final V result;
private final Throwable exc;
private CompletedFuture(AsynchronousChannel channel,
V result,
Throwable exc,
A attachment)
{
super(channel, attachment);
private CompletedFuture(V result, Throwable exc) {
this.result = result;
this.exc = exc;
}
@SuppressWarnings("unchecked")
static <V,A> CompletedFuture<V,A> withResult(AsynchronousChannel channel,
V result,
A attachment)
{
return new CompletedFuture<V,A>(channel, result, null, attachment);
static <V> CompletedFuture<V> withResult(V result) {
return new CompletedFuture<V>(result, null);
}
@SuppressWarnings("unchecked")
static <V,A> CompletedFuture<V,A> withFailure(AsynchronousChannel channel,
Throwable exc,
A attachment)
{
static <V> CompletedFuture<V> withFailure(Throwable exc) {
// exception must be IOException or SecurityException
if (!(exc instanceof IOException) && !(exc instanceof SecurityException))
exc = new IOException(exc);
return new CompletedFuture(channel, null, exc, attachment);
return new CompletedFuture(null, exc);
}
@SuppressWarnings("unchecked")
static <V> CompletedFuture<V> withResult(V result, Throwable exc) {
if (exc == null) {
return withResult(result);
} else {
return withFailure(exc);
}
}
@Override
......@@ -100,14 +96,4 @@ final class CompletedFuture<V,A>
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
Throwable exception() {
return exc;
}
@Override
V value() {
return result;
}
}
......@@ -117,33 +117,32 @@ class Invoker {
* Invoke handler without checking the thread identity or number of handlers
* on the thread stack.
*/
@SuppressWarnings("unchecked")
static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
AbstractFuture<V,A> result)
A attachment,
V value,
Throwable exc)
{
if (handler != null && !result.isCancelled()) {
Throwable exc = result.exception();
if (exc == null) {
handler.completed(result.value(), result.attachment());
} else {
handler.failed(exc, result.attachment());
}
// clear interrupt
Thread.interrupted();
if (exc == null) {
handler.completed(value, attachment);
} else {
handler.failed(exc, attachment);
}
}
// clear interrupt
Thread.interrupted();
}
/**
* Invoke handler after incrementing the invoke count.
* Invoke handler assuming thread identity already checked
*/
static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
CompletionHandler<V,? super A> handler,
AbstractFuture<V,A> result)
A attachment,
V result,
Throwable exc)
{
myGroupAndInvokeCount.incrementInvokeCount();
invokeUnchecked(handler, result);
Invoker.invokeUnchecked(handler, attachment, result, exc);
}
/**
......@@ -151,64 +150,64 @@ class Invoker {
* thread pool then the handler is invoked directly, otherwise it is
* invoked indirectly.
*/
static <V,A> void invoke(CompletionHandler<V,? super A> handler,
AbstractFuture<V,A> result)
static <V,A> void invoke(AsynchronousChannel channel,
CompletionHandler<V,? super A> handler,
A attachment,
V result,
Throwable exc)
{
if (handler != null) {
boolean invokeDirect = false;
boolean identityOkay = false;
GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null) {
AsynchronousChannel channel = result.channel();
if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
identityOkay = true;
if (identityOkay &&
(thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
{
// group match
invokeDirect = true;
}
boolean invokeDirect = false;
boolean identityOkay = false;
GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null) {
if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
identityOkay = true;
if (identityOkay &&
(thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
{
// group match
invokeDirect = true;
}
if (invokeDirect) {
thisGroupAndInvokeCount.incrementInvokeCount();
invokeUnchecked(handler, result);
} else {
try {
invokeIndirectly(handler, result);
} catch (RejectedExecutionException ree) {
// channel group shutdown; fallback to invoking directly
// if the current thread has the right identity.
if (identityOkay) {
invokeUnchecked(handler, result);
} else {
throw new ShutdownChannelGroupException();
}
}
if (invokeDirect) {
invokeDirect(thisGroupAndInvokeCount, handler, attachment, result, exc);
} else {
try {
invokeIndirectly(channel, handler, attachment, result, exc);
} catch (RejectedExecutionException ree) {
// channel group shutdown; fallback to invoking directly
// if the current thread has the right identity.
if (identityOkay) {
invokeDirect(thisGroupAndInvokeCount,
handler, attachment, result, exc);
} else {
throw new ShutdownChannelGroupException();
}
}
}
}
/**
* Invokes the handler "indirectly" in the channel group's thread pool.
* Invokes the handler indirectly via the channel group's thread pool.
*/
static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
final AbstractFuture<V,A> result)
static <V,A> void invokeIndirectly(AsynchronousChannel channel,
final CompletionHandler<V,? super A> handler,
final A attachment,
final V result,
final Throwable exc)
{
if (handler != null) {
AsynchronousChannel channel = result.channel();
try {
((Groupable)channel).group().executeOnPooledThread(new Runnable() {
public void run() {
GroupAndInvokeCount thisGroupAndInvokeCount =
myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null)
thisGroupAndInvokeCount.setInvokeCount(1);
invokeUnchecked(handler, result);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
try {
((Groupable)channel).group().executeOnPooledThread(new Runnable() {
public void run() {
GroupAndInvokeCount thisGroupAndInvokeCount =
myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null)
thisGroupAndInvokeCount.setInvokeCount(1);
invokeUnchecked(handler, attachment, result, exc);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
}
......@@ -216,19 +215,19 @@ class Invoker {
* Invokes the handler "indirectly" in the given Executor
*/
static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
final AbstractFuture<V,A> result,
final A attachment,
final V value,
final Throwable exc,
Executor executor)
{
if (handler != null) {
try {
executor.execute(new Runnable() {
public void run() {
invokeUnchecked(handler, result);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
try {
executor.execute(new Runnable() {
public void run() {
invokeUnchecked(handler, attachment, value, exc);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
}
......@@ -258,4 +257,52 @@ class Invoker {
throw new ShutdownChannelGroupException();
}
}
/**
* Invoke handler with completed result. This method does not check the
* thread identity or the number of handlers on the thread stack.
*/
static <V,A> void invokeUnchecked(PendingFuture<V,A> future) {
assert future.isDone();
CompletionHandler<V,? super A> handler = future.handler();
if (handler != null) {
invokeUnchecked(handler,
future.attachment(),
future.value(),
future.exception());
}
}
/**
* Invoke handler with completed result. If the current thread is in the
* channel group's thread pool then the handler is invoked directly,
* otherwise it is invoked indirectly.
*/
static <V,A> void invoke(PendingFuture<V,A> future) {
assert future.isDone();
CompletionHandler<V,? super A> handler = future.handler();
if (handler != null) {
invoke(future.channel(),
handler,
future.attachment(),
future.value(),
future.exception());
}
}
/**
* Invoke handler with completed result. The handler is invoked indirectly,
* via the channel group's thread pool.
*/
static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
assert future.isDone();
CompletionHandler<V,? super A> handler = future.handler();
if (handler != null) {
invokeIndirectly(future.channel(),
handler,
future.attachment(),
future.value(),
future.exception());
}
}
}
......@@ -34,13 +34,13 @@ import java.io.IOException;
* attachment of an additional arbitrary context object and a timer task.
*/
final class PendingFuture<V,A>
extends AbstractFuture<V,A>
{
final class PendingFuture<V,A> implements Future<V> {
private static final CancellationException CANCELLED =
new CancellationException();
private final AsynchronousChannel channel;
private final CompletionHandler<V,? super A> handler;
private final A attachment;
// true if result (or exception) is available
private volatile boolean haveResult;
......@@ -56,14 +56,14 @@ final class PendingFuture<V,A>
// optional context object
private volatile Object context;
PendingFuture(AsynchronousChannel channel,
CompletionHandler<V,? super A> handler,
A attachment,
Object context)
{
super(channel, attachment);
this.channel = channel;
this.handler = handler;
this.attachment = attachment;
this.context = context;
}
......@@ -71,14 +71,31 @@ final class PendingFuture<V,A>
CompletionHandler<V,? super A> handler,
A attachment)
{
super(channel, attachment);
this.channel = channel;
this.handler = handler;
this.attachment = attachment;
}
PendingFuture(AsynchronousChannel channel) {
this(channel, null, null);
}
PendingFuture(AsynchronousChannel channel, Object context) {
this(channel, null, null, context);
}
AsynchronousChannel channel() {
return channel;
}
CompletionHandler<V,? super A> handler() {
return handler;
}
A attachment() {
return attachment;
}
void setContext(Object context) {
this.context = context;
}
......@@ -113,36 +130,45 @@ final class PendingFuture<V,A>
/**
* Sets the result, or a no-op if the result or exception is already set.
*/
boolean setResult(V res) {
void setResult(V res) {
synchronized (this) {
if (haveResult)
return false;
return;
result = res;
haveResult = true;
if (timeoutTask != null)
timeoutTask.cancel(false);
if (latch != null)
latch.countDown();
return true;
}
}
/**
* Sets the result, or a no-op if the result or exception is already set.
*/
boolean setFailure(Throwable x) {
void setFailure(Throwable x) {
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
synchronized (this) {
if (haveResult)
return false;
return;
exc = x;
haveResult = true;
if (timeoutTask != null)
timeoutTask.cancel(false);
if (latch != null)
latch.countDown();
return true;
}
}
/**
* Sets the result
*/
void setResult(V res, Throwable x) {
if (x == null) {
setResult(res);
} else {
setFailure(x);
}
}
......@@ -178,12 +204,10 @@ final class PendingFuture<V,A>
return result;
}
@Override
Throwable exception() {
return (exc != CANCELLED) ? exc : null;
}
@Override
V value() {
return result;
}
......@@ -204,33 +228,6 @@ final class PendingFuture<V,A>
if (haveResult)
return false; // already completed
// A shutdown of the channel group will close all channels and
// shutdown the executor. To ensure that the completion handler
// is executed we queue the task while holding the lock.
if (handler != null) {
prepareForWait();
Runnable cancelTask = new Runnable() {
public void run() {
while (!haveResult) {
try {
latch.await();
} catch (InterruptedException ignore) { }
}
handler.cancelled(attachment());
}
};
AsynchronousChannel ch = channel();
if (ch instanceof Groupable) {
((Groupable)ch).group().executeOnPooledThread(cancelTask);
} else {
if (ch instanceof AsynchronousFileChannelImpl) {
((AsynchronousFileChannelImpl)ch).executor().execute(cancelTask);
} else {
throw new AssertionError("Should not get here");
}
}
}
// notify channel
if (channel() instanceof Cancellable)
((Cancellable)channel()).onCancel(this);
......@@ -249,7 +246,7 @@ final class PendingFuture<V,A>
} catch (IOException ignore) { }
}
// release waiters (this also releases the invoker)
// release waiters
if (latch != null)
latch.countDown();
return true;
......
......@@ -317,51 +317,71 @@ class SimpleAsynchronousDatagramChannelImpl
return new WrappedMembershipKey(this, key);
}
@Override
public <A> Future<Integer> send(ByteBuffer src,
SocketAddress target,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
private <A> Future<Integer> implSend(ByteBuffer src,
SocketAddress target,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
if (unit == null)
throw new NullPointerException();
CompletedFuture<Integer,A> result;
int n = 0;
Throwable exc = null;
try {
int n = dc.send(src, target);
result = CompletedFuture.withResult(this, n, attachment);
n = dc.send(src, target);
} catch (IOException ioe) {
result = CompletedFuture.withFailure(this, ioe, attachment);
exc = ioe;
}
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withResult(n, exc);
Invoker.invoke(this, handler, attachment, n, exc);
return null;
}
@Override
public Future<Integer> send(ByteBuffer src, SocketAddress target) {
return implSend(src, target, null, null);
}
@Override
public <A> Future<Integer> write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
public <A> void send(ByteBuffer src,
SocketAddress target,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (timeout < 0L)
throw new IllegalArgumentException("Negative timeout");
if (unit == null)
throw new NullPointerException();
if (handler == null)
throw new NullPointerException("'handler' is null");
implSend(src, target, attachment, handler);
}
CompletedFuture<Integer,A> result;
private <A> Future<Integer> implWrite(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
int n = 0;
Throwable exc = null;
try {
int n = dc.write(src);
result = CompletedFuture.withResult(this, n, attachment);
n = dc.write(src);
} catch (IOException ioe) {
result = CompletedFuture.withFailure(this, ioe, attachment);
exc = ioe;
}
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withResult(n, exc);
Invoker.invoke(this, handler, attachment, n, exc);
return null;
}
@Override
public Future<Integer> write(ByteBuffer src) {
return implWrite(src, null, null);
}
@Override
public <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implWrite(src, attachment, handler);
}
/**
......@@ -390,12 +410,11 @@ class SimpleAsynchronousDatagramChannelImpl
}
}
@Override
public <A> Future<SocketAddress> receive(final ByteBuffer dst,
final long timeout,
final TimeUnit unit,
A attachment,
final CompletionHandler<SocketAddress,? super A> handler)
private <A> Future<SocketAddress> implReceive(final ByteBuffer dst,
final long timeout,
final TimeUnit unit,
A attachment,
final CompletionHandler<SocketAddress,? super A> handler)
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
......@@ -406,10 +425,11 @@ class SimpleAsynchronousDatagramChannelImpl
// complete immediately if channel closed
if (!isOpen()) {
CompletedFuture<SocketAddress,A> result = CompletedFuture.withFailure(this,
new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
final AccessControlContext acc = (System.getSecurityManager() == null) ?
......@@ -471,7 +491,7 @@ class SimpleAsynchronousDatagramChannelImpl
x = new AsynchronousCloseException();
result.setFailure(x);
}
Invoker.invokeUnchecked(handler, result);
Invoker.invokeUnchecked(result);
}
};
try {
......@@ -483,11 +503,27 @@ class SimpleAsynchronousDatagramChannelImpl
}
@Override
public <A> Future<Integer> read(final ByteBuffer dst,
final long timeout,
final TimeUnit unit,
A attachment,
final CompletionHandler<Integer,? super A> handler)
public Future<SocketAddress> receive(ByteBuffer dst) {
return implReceive(dst, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
public <A> void receive(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<SocketAddress,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implReceive(dst, timeout, unit, attachment, handler);
}
private <A> Future<Integer> implRead(final ByteBuffer dst,
final long timeout,
final TimeUnit unit,
A attachment,
final CompletionHandler<Integer,? super A> handler)
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
......@@ -495,18 +531,20 @@ class SimpleAsynchronousDatagramChannelImpl
throw new IllegalArgumentException("Negative timeout");
if (unit == null)
throw new NullPointerException();
// another thread may disconnect before read is initiated
if (!dc.isConnected())
throw new NotYetConnectedException();
// complete immediately if channel closed
if (!isOpen()) {
CompletedFuture<Integer,A> result = CompletedFuture.withFailure(this,
new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
// another thread may disconnect before read is initiated
if (!dc.isConnected())
throw new NotYetConnectedException();
final PendingFuture<Integer,A> result =
new PendingFuture<Integer,A>(this, handler, attachment);
Runnable task = new Runnable() {
......@@ -563,7 +601,7 @@ class SimpleAsynchronousDatagramChannelImpl
x = new AsynchronousCloseException();
result.setFailure(x);
}
Invoker.invokeUnchecked(handler, result);
Invoker.invokeUnchecked(result);
}
};
try {
......@@ -574,6 +612,23 @@ class SimpleAsynchronousDatagramChannelImpl
return result;
}
@Override
public Future<Integer> read(ByteBuffer dst) {
return implRead(dst, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
public <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
implRead(dst, timeout, unit, attachment, handler);
}
@Override
public AsynchronousDatagramChannel bind(SocketAddress local)
throws IOException
......
......@@ -50,9 +50,6 @@ public class SimpleAsynchronousFileChannelImpl
// Used to make native read and write calls
private static final FileDispatcher nd = new FileDispatcherImpl();
// indicates if the associated thread pool is the default thread pool
private final boolean isDefaultExecutor;
// Thread-safe set of IDs of native threads, for signalling
private final NativeThreadSet threads = new NativeThreadSet(2);
......@@ -60,11 +57,9 @@ public class SimpleAsynchronousFileChannelImpl
SimpleAsynchronousFileChannelImpl(FileDescriptor fdObj,
boolean reading,
boolean writing,
ExecutorService executor,
boolean isDefaultexecutor)
ExecutorService executor)
{
super(fdObj, reading, writing, executor);
this.isDefaultExecutor = isDefaultexecutor;
}
public static AsynchronousFileChannel open(FileDescriptor fdo,
......@@ -73,17 +68,9 @@ public class SimpleAsynchronousFileChannelImpl
ThreadPool pool)
{
// Executor is either default or based on pool parameters
ExecutorService executor;
boolean isDefaultexecutor;
if (pool == null) {
executor = DefaultExecutorHolder.defaultExecutor;
isDefaultexecutor = true;
} else {
executor = pool.executor();
isDefaultexecutor = false;
}
return new SimpleAsynchronousFileChannelImpl(fdo,
reading, writing, executor, isDefaultexecutor);
ExecutorService executor = (pool == null) ?
DefaultExecutorHolder.defaultExecutor : pool.executor();
return new SimpleAsynchronousFileChannelImpl(fdo, reading, writing, executor);
}
@Override
......@@ -114,16 +101,6 @@ public class SimpleAsynchronousFileChannelImpl
// close file
nd.close(fdObj);
// shutdown executor if specific to this channel
if (!isDefaultExecutor) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
executor.shutdown();
return null;
}
});
}
}
@Override
......@@ -194,11 +171,11 @@ public class SimpleAsynchronousFileChannelImpl
}
@Override
public <A> Future<FileLock> lock(final long position,
final long size,
final boolean shared,
A attachment,
final CompletionHandler<FileLock,? super A> handler)
<A> Future<FileLock> implLock(final long position,
final long size,
final boolean shared,
final A attachment,
final CompletionHandler<FileLock,? super A> handler)
{
if (shared && !reading)
throw new NonReadableChannelException();
......@@ -208,16 +185,19 @@ public class SimpleAsynchronousFileChannelImpl
// add to lock table
final FileLockImpl fli = addToFileLockTable(position, size, shared);
if (fli == null) {
CompletedFuture<FileLock,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invokeIndirectly(handler, result, executor);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invokeIndirectly(handler, attachment, null, exc, executor);
return null;
}
final PendingFuture<FileLock,A> result =
new PendingFuture<FileLock,A>(this, handler, attachment);
final PendingFuture<FileLock,A> result = (handler == null) ?
new PendingFuture<FileLock,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
Throwable exc = null;
int ti = threads.add();
try {
int n;
......@@ -226,31 +206,36 @@ public class SimpleAsynchronousFileChannelImpl
do {
n = nd.lock(fdObj, true, position, size, shared);
} while ((n == FileDispatcher.INTERRUPTED) && isOpen());
if (n == FileDispatcher.LOCKED && isOpen()) {
result.setResult(fli);
} else {
if (n != FileDispatcher.LOCKED || !isOpen()) {
throw new AsynchronousCloseException();
}
} catch (IOException x) {
removeFromFileLockTable(fli);
if (!isOpen())
x = new AsynchronousCloseException();
result.setFailure(x);
exc = x;
} finally {
end();
}
} finally {
threads.remove(ti);
}
Invoker.invokeUnchecked(handler, result);
if (handler == null) {
result.setResult(fli, exc);
} else {
Invoker.invokeUnchecked(handler, attachment, fli, exc);
}
}
};
boolean executed = false;
try {
executor.execute(task);
} catch (RejectedExecutionException ree) {
// rollback
removeFromFileLockTable(fli);
throw new ShutdownChannelGroupException();
executed = true;
} finally {
if (!executed) {
// rollback
removeFromFileLockTable(fli);
}
}
return result;
}
......@@ -301,10 +286,10 @@ public class SimpleAsynchronousFileChannelImpl
}
@Override
public <A> Future<Integer> read(final ByteBuffer dst,
final long position,
A attachment,
final CompletionHandler<Integer,? super A> handler)
<A> Future<Integer> implRead(final ByteBuffer dst,
final long position,
final A attachment,
final CompletionHandler<Integer,? super A> handler)
{
if (position < 0)
throw new IllegalArgumentException("Negative position");
......@@ -315,55 +300,52 @@ public class SimpleAsynchronousFileChannelImpl
// complete immediately if channel closed or no space remaining
if (!isOpen() || (dst.remaining() == 0)) {
CompletedFuture<Integer,A> result;
if (isOpen()) {
result = CompletedFuture.withResult(this, 0, attachment);
} else {
result = CompletedFuture.withFailure(this,
new ClosedChannelException(), attachment);
}
Invoker.invokeIndirectly(handler, result, executor);
return result;
Throwable exc = (isOpen()) ? null : new ClosedChannelException();
if (handler == null)
return CompletedFuture.withResult(0, exc);
Invoker.invokeIndirectly(handler, attachment, 0, exc, executor);
return null;
}
final PendingFuture<Integer,A> result =
new PendingFuture<Integer,A>(this, handler, attachment);
final PendingFuture<Integer,A> result = (handler == null) ?
new PendingFuture<Integer,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
int n = 0;
Throwable exc = null;
int ti = threads.add();
try {
begin();
int n;
do {
n = IOUtil.read(fdObj, dst, position, nd, null);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen())
throw new AsynchronousCloseException();
result.setResult(n);
} catch (IOException x) {
if (!isOpen())
x = new AsynchronousCloseException();
result.setFailure(x);
exc = x;
} finally {
end();
threads.remove(ti);
}
Invoker.invokeUnchecked(handler, result);
if (handler == null) {
result.setResult(n, exc);
} else {
Invoker.invokeUnchecked(handler, attachment, n, exc);
}
}
};
try {
executor.execute(task);
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
executor.execute(task);
return result;
}
@Override
public <A> Future<Integer> write(final ByteBuffer src,
final long position,
A attachment,
final CompletionHandler<Integer,? super A> handler)
<A> Future<Integer> implWrite(final ByteBuffer src,
final long position,
final A attachment,
final CompletionHandler<Integer,? super A> handler)
{
if (position < 0)
throw new IllegalArgumentException("Negative position");
......@@ -372,47 +354,44 @@ public class SimpleAsynchronousFileChannelImpl
// complete immediately if channel is closed or no bytes remaining
if (!isOpen() || (src.remaining() == 0)) {
CompletedFuture<Integer,A> result;
if (isOpen()) {
result = CompletedFuture.withResult(this, 0, attachment);
} else {
result = CompletedFuture.withFailure(this,
new ClosedChannelException(), attachment);
}
Invoker.invokeIndirectly(handler, result, executor);
return result;
Throwable exc = (isOpen()) ? null : new ClosedChannelException();
if (handler == null)
return CompletedFuture.withResult(0, exc);
Invoker.invokeIndirectly(handler, attachment, 0, exc, executor);
return null;
}
final PendingFuture<Integer,A> result =
new PendingFuture<Integer,A>(this, handler, attachment);
final PendingFuture<Integer,A> result = (handler == null) ?
new PendingFuture<Integer,A>(this) : null;
Runnable task = new Runnable() {
public void run() {
int n = 0;
Throwable exc = null;
int ti = threads.add();
try {
begin();
int n;
do {
n = IOUtil.write(fdObj, src, position, nd, null);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n < 0 && !isOpen())
throw new AsynchronousCloseException();
result.setResult(n);
} catch (IOException x) {
if (!isOpen())
x = new AsynchronousCloseException();
result.setFailure(x);
exc = x;
} finally {
end();
threads.remove(ti);
}
Invoker.invokeUnchecked(handler, result);
if (handler == null) {
result.setResult(n, exc);
} else {
Invoker.invokeUnchecked(handler, attachment, n, exc);
}
}
};
try {
executor.execute(task);
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
executor.execute(task);
return result;
}
}
......@@ -248,12 +248,13 @@ final class EPollPort
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
Event ev;
try {
for (;;) {
// reset invoke count
if (myGroupAndInvokeCount != null)
if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
try {
......@@ -289,7 +290,7 @@ final class EPollPort
// process event
try {
ev.channel().onEvent(ev.events());
ev.channel().onEvent(ev.events(), isPooledThread);
} catch (Error x) {
replaceMe = true; throw x;
} catch (RuntimeException x) {
......
......@@ -49,7 +49,7 @@ abstract class Port extends AsynchronousChannelGroupImpl {
* Implemented by clients registered with this port.
*/
interface PollableChannel extends Closeable {
void onEvent(int events);
void onEvent(int events, boolean mayInvokeDirect);
}
// maps fd to "pollable" channel
......@@ -121,7 +121,7 @@ abstract class Port extends AsynchronousChannelGroupImpl {
final Object attachForeignChannel(final Channel channel, FileDescriptor fd) {
int fdVal = IOUtil.fdVal(fd);
register(fdVal, new PollableChannel() {
public void onEvent(int events) { }
public void onEvent(int events, boolean mayInvokeDirect) { }
public void close() throws IOException {
channel.close();
}
......
......@@ -151,12 +151,13 @@ class SolarisEventPort
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
try {
for (;;) {
// reset invoke count
if (myGroupAndInvokeCount != null)
if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
// wait for I/O completion event
......@@ -205,7 +206,7 @@ class SolarisEventPort
if (ch != null) {
replaceMe = true;
// no need to translate events
ch.onEvent(events);
ch.onEvent(events, isPooledThread);
}
}
} finally {
......
......@@ -59,10 +59,13 @@ class UnixAsynchronousServerSocketChannelImpl
private final Object updateLock = new Object();
// pending accept
private PendingFuture<AsynchronousSocketChannel,Object> pendingAccept;
private boolean acceptPending;
private CompletionHandler<AsynchronousSocketChannel,Object> acceptHandler;
private Object acceptAttachment;
private PendingFuture<AsynchronousSocketChannel,Object> acceptFuture;
// context for permission check when security manager set
private AccessControlContext acc;
private AccessControlContext acceptAcc;
UnixAsynchronousServerSocketChannelImpl(Port port)
......@@ -83,15 +86,6 @@ class UnixAsynchronousServerSocketChannelImpl
port.register(fdVal, this);
}
// returns and clears the result of a pending accept
private PendingFuture<AsynchronousSocketChannel,Object> grabPendingAccept() {
synchronized (updateLock) {
PendingFuture<AsynchronousSocketChannel,Object> result = pendingAccept;
pendingAccept = null;
return result;
}
}
@Override
void implClose() throws IOException {
// remove the mapping
......@@ -101,17 +95,27 @@ class UnixAsynchronousServerSocketChannelImpl
nd.close(fd);
// if there is a pending accept then complete it
final PendingFuture<AsynchronousSocketChannel,Object> result =
grabPendingAccept();
if (result != null) {
// discard the stack trace as otherwise it may appear that implClose
// has thrown the exception.
AsynchronousCloseException x = new AsynchronousCloseException();
x.setStackTrace(new StackTraceElement[0]);
result.setFailure(x);
CompletionHandler<AsynchronousSocketChannel,Object> handler;
Object att;
PendingFuture<AsynchronousSocketChannel,Object> future;
synchronized (updateLock) {
if (!acceptPending)
return; // no pending accept
acceptPending = false;
handler = acceptHandler;
att = acceptAttachment;
future = acceptFuture;
}
// discard the stack trace as otherwise it may appear that implClose
// has thrown the exception.
AsynchronousCloseException x = new AsynchronousCloseException();
x.setStackTrace(new StackTraceElement[0]);
if (handler == null) {
future.setFailure(x);
} else {
// invoke by submitting task rather than directly
Invoker.invokeIndirectly(result.handler(), result);
Invoker.invokeIndirectly(this, handler, att, null, x);
}
}
......@@ -124,15 +128,17 @@ class UnixAsynchronousServerSocketChannelImpl
* Invoked by event handling thread when listener socket is polled
*/
@Override
public void onEvent(int events) {
PendingFuture<AsynchronousSocketChannel,Object> result = grabPendingAccept();
if (result == null)
return; // may have been grabbed by asynchronous close
public void onEvent(int events, boolean mayInvokeDirect) {
synchronized (updateLock) {
if (!acceptPending)
return; // may have been grabbed by asynchronous close
acceptPending = false;
}
// attempt to accept connection
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean accepted = false;
Throwable exc = null;
try {
begin();
int n = accept0(this.fd, newfd, isaa);
......@@ -140,49 +146,52 @@ class UnixAsynchronousServerSocketChannelImpl
// spurious wakeup, is this possible?
if (n == IOStatus.UNAVAILABLE) {
synchronized (updateLock) {
this.pendingAccept = result;
acceptPending = true;
}
port.startPoll(fdVal, Port.POLLIN);
return;
}
// connection accepted
accepted = true;
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
enableAccept();
result.setFailure(x);
exc = x;
} finally {
end();
}
// Connection accepted so finish it when not holding locks.
AsynchronousSocketChannel child = null;
if (accepted) {
if (exc == null) {
try {
child = finishAccept(newfd, isaa[0], acc);
enableAccept();
result.setResult(child);
child = finishAccept(newfd, isaa[0], acceptAcc);
} catch (Throwable x) {
enableAccept();
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
result.setFailure(x);
exc = x;
}
}
// if an async cancel has already cancelled the operation then
// close the new channel so as to free resources
if (child != null && result.isCancelled()) {
try {
child.close();
} catch (IOException ignore) { }
}
// copy field befores accept is re-renabled
CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler;
Object att = acceptAttachment;
PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture;
// invoke the handler
Invoker.invoke(result.handler(), result);
// re-enable accepting and invoke handler
enableAccept();
if (handler == null) {
future.setResult(child, exc);
// if an async cancel has already cancelled the operation then
// close the new channel so as to free resources
if (child != null && future.isCancelled()) {
try {
child.close();
} catch (IOException ignore) { }
}
} else {
Invoker.invoke(this, handler, att, child, exc);
}
}
/**
......@@ -234,16 +243,18 @@ class UnixAsynchronousServerSocketChannelImpl
}
@Override
@SuppressWarnings("unchecked")
public <A> Future<AsynchronousSocketChannel> accept(A attachment,
final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
Future<AsynchronousSocketChannel> implAccept(Object att,
CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
// complete immediately if channel is closed
if (!isOpen()) {
CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invokeIndirectly(handler, result);
return result;
Throwable e = new ClosedChannelException();
if (handler == null) {
return CompletedFuture.withFailure(e);
} else {
Invoker.invoke(this, handler, att, null, e);
return null;
}
}
if (localAddress == null)
throw new NotYetBoundException();
......@@ -258,25 +269,31 @@ class UnixAsynchronousServerSocketChannelImpl
throw new AcceptPendingException();
// attempt accept
AbstractFuture<AsynchronousSocketChannel,A> result = null;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
Throwable exc = null;
try {
begin();
int n = accept0(this.fd, newfd, isaa);
if (n == IOStatus.UNAVAILABLE) {
// no connection to accept
result = new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
// need calling context when there is security manager as
// permission check may be done in a different thread without
// any application call frames on the stack
synchronized (this) {
this.acc = (System.getSecurityManager() == null) ?
PendingFuture<AsynchronousSocketChannel,Object> result = null;
synchronized (updateLock) {
if (handler == null) {
this.acceptHandler = null;
result = new PendingFuture<AsynchronousSocketChannel,Object>(this);
this.acceptFuture = result;
} else {
this.acceptHandler = handler;
this.acceptAttachment = att;
}
this.acceptAcc = (System.getSecurityManager() == null) ?
null : AccessController.getContext();
this.pendingAccept =
(PendingFuture<AsynchronousSocketChannel,Object>)result;
this.acceptPending = true;
}
// register for connections
......@@ -287,25 +304,30 @@ class UnixAsynchronousServerSocketChannelImpl
// accept failed
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
result = CompletedFuture.withFailure(this, x, attachment);
exc = x;
} finally {
end();
}
// connection accepted immediately
if (result == null) {
AsynchronousSocketChannel child = null;
if (exc == null) {
// connection accepted immediately
try {
AsynchronousSocketChannel ch = finishAccept(newfd, isaa[0], null);
result = CompletedFuture.withResult(this, ch, attachment);
child = finishAccept(newfd, isaa[0], null);
} catch (Throwable x) {
result = CompletedFuture.withFailure(this, x, attachment);
exc = x;
}
}
// re-enable accepting and invoke handler
// re-enable accepting before invoking handler
enableAccept();
Invoker.invokeIndirectly(handler, result);
return result;
if (handler == null) {
return CompletedFuture.withResult(child, exc);
} else {
Invoker.invokeIndirectly(this, handler, att, child, exc);
return null;
}
}
// -- Native methods --
......
......@@ -34,6 +34,8 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.security.AccessController;
import sun.security.action.GetPropertyAction;
import sun.misc.Unsafe;
/**
......@@ -44,6 +46,7 @@ import sun.misc.Unsafe;
class Iocp extends AsynchronousChannelGroupImpl {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long INVALID_HANDLE_VALUE = -1L;
private static final boolean supportsThreadAgnosticIo;
// maps completion key to channel
private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
......@@ -87,6 +90,13 @@ class Iocp extends AsynchronousChannelGroupImpl {
<V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
}
/**
* Indicates if this operating system supports thread agnostic I/O.
*/
static boolean supportsThreadAgnosticIo() {
return supportsThreadAgnosticIo;
}
// release all resources
void implClose() {
synchronized (this) {
......@@ -216,8 +226,9 @@ class Iocp extends AsynchronousChannelGroupImpl {
} while ((key == 0) || keyToChannel.containsKey(key));
// associate with I/O completion port
if (handle != 0L)
if (handle != 0L) {
createIoCompletionPort(handle, port, key, 0);
}
// setup mapping
keyToChannel.put(key, ch);
......@@ -282,7 +293,7 @@ class Iocp extends AsynchronousChannelGroupImpl {
/**
* Invoked if the I/O operation completes successfully.
*/
public void completed(int bytesTransferred);
public void completed(int bytesTransferred, boolean canInvokeDirect);
/**
* Invoked if the I/O operation fails.
......@@ -305,6 +316,7 @@ class Iocp extends AsynchronousChannelGroupImpl {
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
boolean canInvokeDirect = (myGroupAndInvokeCount != null);
CompletionStatus ioResult = new CompletionStatus();
boolean replaceMe = false;
......@@ -382,7 +394,7 @@ class Iocp extends AsynchronousChannelGroupImpl {
ResultHandler rh = (ResultHandler)result.getContext();
replaceMe = true; // (if error/exception then replace thread)
if (error == 0) {
rh.completed(ioResult.bytesTransferred());
rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
} else {
rh.failed(error, translateErrorToIOException(error));
}
......@@ -433,5 +445,11 @@ class Iocp extends AsynchronousChannelGroupImpl {
static {
Util.load();
initIDs();
// thread agnostic I/O on Vista/2008 or newer
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String vers[] = osversion.split("\\.");
supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6;
}
}
......@@ -146,10 +146,12 @@ public class WindowsAsynchronousFileChannelImpl
// waits until all I/O operations have completed
ioCache.close();
// disassociate from port and shutdown thread pool if not default
// disassociate from port
iocp.disassociate(completionKey);
// for the non-default group close the port
if (!isDefaultIocp)
iocp.shutdown();
iocp.detachFromThreadPool();
}
@Override
......@@ -258,14 +260,18 @@ public class WindowsAsynchronousFileChannelImpl
}
// invoke completion handler
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
@Override
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
// release waiters and invoke completion handler
result.setResult(fli);
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
......@@ -279,16 +285,16 @@ public class WindowsAsynchronousFileChannelImpl
} else {
result.setFailure(new AsynchronousCloseException());
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
@Override
public <A> Future<FileLock> lock(long position,
long size,
boolean shared,
A attachment,
CompletionHandler<FileLock,? super A> handler)
<A> Future<FileLock> implLock(final long position,
final long size,
final boolean shared,
A attachment,
final CompletionHandler<FileLock,? super A> handler)
{
if (shared && !reading)
throw new NonReadableChannelException();
......@@ -298,10 +304,11 @@ public class WindowsAsynchronousFileChannelImpl
// add to lock table
FileLockImpl fli = addToFileLockTable(position, size, shared);
if (fli == null) {
CompletedFuture<FileLock,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
// create Future and task that will be invoked to acquire lock
......@@ -310,13 +317,20 @@ public class WindowsAsynchronousFileChannelImpl
LockTask lockTask = new LockTask<A>(position, fli, result);
result.setContext(lockTask);
// initiate I/O (can only be done from thread in thread pool)
try {
Invoker.invokeOnThreadInThreadPool(this, lockTask);
} catch (ShutdownChannelGroupException e) {
// rollback
removeFromFileLockTable(fli);
throw e;
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
lockTask.run();
} else {
boolean executed = false;
try {
Invoker.invokeOnThreadInThreadPool(this, lockTask);
executed = true;
} finally {
if (!executed) {
// rollback
removeFromFileLockTable(fli);
}
}
}
return result;
}
......@@ -461,14 +475,14 @@ public class WindowsAsynchronousFileChannelImpl
releaseBufferIfSubstituted();
// invoke completion handler
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
* Executed when the I/O has completed
*/
@Override
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
updatePosition(bytesTransferred);
// return direct buffer to cache if substituted
......@@ -476,14 +490,18 @@ public class WindowsAsynchronousFileChannelImpl
// release waiters and invoke completion handler
result.setResult(bytesTransferred);
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
public void failed(int error, IOException x) {
// if EOF detected asynchronously then it is reported as error
if (error == ERROR_HANDLE_EOF) {
completed(-1);
completed(-1, false);
} else {
// return direct buffer to cache if substituted
releaseBufferIfSubstituted();
......@@ -494,16 +512,16 @@ public class WindowsAsynchronousFileChannelImpl
} else {
result.setFailure(new AsynchronousCloseException());
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
}
@Override
public <A> Future<Integer> read(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
<A> Future<Integer> implRead(ByteBuffer dst,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (!reading)
throw new NonReadableChannelException();
......@@ -514,10 +532,11 @@ public class WindowsAsynchronousFileChannelImpl
// check if channel is closed
if (!isOpen()) {
CompletedFuture<Integer,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
int pos = dst.position();
......@@ -527,10 +546,10 @@ public class WindowsAsynchronousFileChannelImpl
// no space remaining
if (rem == 0) {
CompletedFuture<Integer,A> result =
CompletedFuture.withResult(this, 0, attachment);
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withResult(0);
Invoker.invoke(this, handler, attachment, 0, null);
return null;
}
// create Future and task that initiates read
......@@ -539,8 +558,12 @@ public class WindowsAsynchronousFileChannelImpl
ReadTask readTask = new ReadTask<A>(dst, pos, rem, position, result);
result.setContext(readTask);
// initiate I/O (can only be done from thread in thread pool)
Invoker.invokeOnThreadInThreadPool(this, readTask);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
readTask.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, readTask);
}
return result;
}
......@@ -639,14 +662,14 @@ public class WindowsAsynchronousFileChannelImpl
}
// invoke completion handler
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
* Executed when the I/O has completed
*/
@Override
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
updatePosition(bytesTransferred);
// return direct buffer to cache if substituted
......@@ -654,7 +677,11 @@ public class WindowsAsynchronousFileChannelImpl
// release waiters and invoke completion handler
result.setResult(bytesTransferred);
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
......@@ -668,15 +695,14 @@ public class WindowsAsynchronousFileChannelImpl
} else {
result.setFailure(new AsynchronousCloseException());
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
@Override
public <A> Future<Integer> write(ByteBuffer src,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
<A> Future<Integer> implWrite(ByteBuffer src,
long position,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (!writing)
throw new NonWritableChannelException();
......@@ -685,10 +711,11 @@ public class WindowsAsynchronousFileChannelImpl
// check if channel is closed
if (!isOpen()) {
CompletedFuture<Integer,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
int pos = src.position();
......@@ -698,10 +725,10 @@ public class WindowsAsynchronousFileChannelImpl
// nothing to write
if (rem == 0) {
CompletedFuture<Integer,A> result =
CompletedFuture.withResult(this, 0, attachment);
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withResult(0);
Invoker.invoke(this, handler, attachment, 0, null);
return null;
}
// create Future and task to initiate write
......@@ -710,8 +737,12 @@ public class WindowsAsynchronousFileChannelImpl
WriteTask writeTask = new WriteTask<A>(src, pos, rem, position, result);
result.setContext(writeTask);
// initiate I/O (can only be done from thread in thread pool)
Invoker.invokeOnThreadInThreadPool(this, writeTask);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
writeTask.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, writeTask);
}
return result;
}
......
......@@ -113,14 +113,14 @@ class WindowsAsynchronousServerSocketChannelImpl
/**
* Task to initiate accept operation and to handle result.
*/
private class AcceptTask<A> implements Runnable, Iocp.ResultHandler {
private class AcceptTask implements Runnable, Iocp.ResultHandler {
private final WindowsAsynchronousSocketChannelImpl channel;
private final AccessControlContext acc;
private final PendingFuture<AsynchronousSocketChannel,A> result;
private final PendingFuture<AsynchronousSocketChannel,Object> result;
AcceptTask(WindowsAsynchronousSocketChannelImpl channel,
AccessControlContext acc,
PendingFuture<AsynchronousSocketChannel,A> result)
PendingFuture<AsynchronousSocketChannel,Object> result)
{
this.channel = channel;
this.acc = acc;
......@@ -222,14 +222,14 @@ class WindowsAsynchronousServerSocketChannelImpl
}
// invoke completion handler
Invoker.invokeIndirectly(result.handler(), result);
Invoker.invokeIndirectly(result);
}
/**
* Executed when the I/O has completed
*/
@Override
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
try {
// connection accept after group has shutdown
if (iocp.isShutdown()) {
......@@ -269,7 +269,7 @@ class WindowsAsynchronousServerSocketChannelImpl
}
// invoke handler (but not directly)
Invoker.invokeIndirectly(result.handler(), result);
Invoker.invokeIndirectly(result);
}
@Override
......@@ -283,19 +283,20 @@ class WindowsAsynchronousServerSocketChannelImpl
} else {
result.setFailure(new AsynchronousCloseException());
}
Invoker.invokeIndirectly(result.handler(), result);
Invoker.invokeIndirectly(result);
}
}
@Override
public <A> Future<AsynchronousSocketChannel> accept(A attachment,
final CompletionHandler<AsynchronousSocketChannel,? super A> handler)
Future<AsynchronousSocketChannel> implAccept(Object attachment,
final CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
if (!isOpen()) {
CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invokeIndirectly(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invokeIndirectly(this, handler, attachment, null, exc);
return null;
}
if (isAcceptKilled())
throw new RuntimeException("Accept not allowed due to cancellation");
......@@ -319,10 +320,10 @@ class WindowsAsynchronousServerSocketChannelImpl
end();
}
if (ioe != null) {
CompletedFuture<AsynchronousSocketChannel,A> result =
CompletedFuture.withFailure(this, ioe, attachment);
Invoker.invokeIndirectly(handler, result);
return result;
if (handler == null)
return CompletedFuture.withFailure(ioe);
Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
return null;
}
// need calling context when there is security manager as
......@@ -331,20 +332,21 @@ class WindowsAsynchronousServerSocketChannelImpl
AccessControlContext acc = (System.getSecurityManager() == null) ?
null : AccessController.getContext();
PendingFuture<AsynchronousSocketChannel,A> result =
new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment);
AcceptTask task = new AcceptTask<A>(ch, acc, result);
PendingFuture<AsynchronousSocketChannel,Object> result =
new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
AcceptTask task = new AcceptTask(ch, acc, result);
result.setContext(task);
// check and set flag to prevent concurrent accepting
if (!accepting.compareAndSet(false, true))
throw new AcceptPendingException();
// initiate accept. As I/O operations are tied to the initiating thread
// then it will only be invoked direcly if this thread is in the thread
// pool. If this thread is not in the thread pool when a task is
// submitted to initiate the accept.
Invoker.invokeOnThreadInThreadPool(this, task);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
task.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, task);
}
return result;
}
......
......@@ -250,14 +250,14 @@ class WindowsAsynchronousSocketChannelImpl
closeChannel();
result.setFailure(toIOException(exc));
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
* Invoked by handler thread when connection established.
*/
@Override
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
Throwable exc = null;
try {
begin();
......@@ -276,7 +276,11 @@ class WindowsAsynchronousSocketChannelImpl
result.setFailure(toIOException(exc));
}
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
/**
......@@ -290,20 +294,21 @@ class WindowsAsynchronousSocketChannelImpl
} else {
result.setFailure(new AsynchronousCloseException());
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
@Override
public <A> Future<Void> connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler)
<A> Future<Void> implConnect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler)
{
if (!isOpen()) {
CompletedFuture<Void,A> result = CompletedFuture
.withFailure(this, new ClosedChannelException(), attachment);
Invoker.invoke(handler, result);
return result;
Throwable exc = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(exc);
Invoker.invoke(this, handler, attachment, null, exc);
return null;
}
InetSocketAddress isa = Net.checkAddress(remote);
......@@ -337,10 +342,10 @@ class WindowsAsynchronousSocketChannelImpl
try {
close();
} catch (IOException ignore) { }
CompletedFuture<Void,A> result = CompletedFuture
.withFailure(this, bindException, attachment);
Invoker.invoke(handler, result);
return result;
if (handler == null)
return CompletedFuture.withFailure(bindException);
Invoker.invoke(this, handler, attachment, null, bindException);
return null;
}
// setup task
......@@ -349,8 +354,12 @@ class WindowsAsynchronousSocketChannelImpl
ConnectTask task = new ConnectTask<A>(isa, result);
result.setContext(task);
// initiate I/O (can only be done from thread in thread pool)
Invoker.invokeOnThreadInThreadPool(this, task);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
task.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, task);
}
return result;
}
......@@ -514,7 +523,7 @@ class WindowsAsynchronousSocketChannelImpl
}
// invoke completion handler
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
......@@ -522,7 +531,7 @@ class WindowsAsynchronousSocketChannelImpl
*/
@Override
@SuppressWarnings("unchecked")
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
if (bytesTransferred == 0) {
bytesTransferred = -1; // EOF
} else {
......@@ -543,7 +552,11 @@ class WindowsAsynchronousSocketChannelImpl
result.setResult((V)Integer.valueOf(bytesTransferred));
}
}
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
......@@ -561,7 +574,7 @@ class WindowsAsynchronousSocketChannelImpl
enableReading();
result.setFailure(x);
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
......@@ -579,13 +592,14 @@ class WindowsAsynchronousSocketChannelImpl
}
// invoke handler without any locks
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
@Override
<V extends Number,A> Future<V> readImpl(ByteBuffer[] bufs,
boolean scatteringRead,
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
ByteBuffer dst,
ByteBuffer[] dsts,
long timeout,
TimeUnit unit,
A attachment,
......@@ -594,7 +608,14 @@ class WindowsAsynchronousSocketChannelImpl
// setup task
PendingFuture<V,A> result =
new PendingFuture<V,A>(this, handler, attachment);
final ReadTask readTask = new ReadTask<V,A>(bufs, scatteringRead, result);
ByteBuffer[] bufs;
if (isScatteringRead) {
bufs = dsts;
} else {
bufs = new ByteBuffer[1];
bufs[0] = dst;
}
final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
result.setContext(readTask);
// schedule timeout
......@@ -607,8 +628,12 @@ class WindowsAsynchronousSocketChannelImpl
result.setTimeoutTask(timeoutTask);
}
// initiate I/O (can only be done from thread in thread pool)
Invoker.invokeOnThreadInThreadPool(this, readTask);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
readTask.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, readTask);
}
return result;
}
......@@ -710,7 +735,7 @@ class WindowsAsynchronousSocketChannelImpl
}
@Override
@SuppressWarnings("unchecked")
//@SuppressWarnings("unchecked")
public void run() {
long overlapped = 0L;
boolean prepared = false;
......@@ -759,7 +784,7 @@ class WindowsAsynchronousSocketChannelImpl
}
// invoke completion handler
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
......@@ -767,7 +792,7 @@ class WindowsAsynchronousSocketChannelImpl
*/
@Override
@SuppressWarnings("unchecked")
public void completed(int bytesTransferred) {
public void completed(int bytesTransferred, boolean canInvokeDirect) {
updateBuffers(bytesTransferred);
// return direct buffer to cache if substituted
......@@ -784,7 +809,11 @@ class WindowsAsynchronousSocketChannelImpl
result.setResult((V)Integer.valueOf(bytesTransferred));
}
}
Invoker.invoke(result.handler(), result);
if (canInvokeDirect) {
Invoker.invokeUnchecked(result);
} else {
Invoker.invoke(result);
}
}
@Override
......@@ -802,7 +831,7 @@ class WindowsAsynchronousSocketChannelImpl
enableWriting();
result.setFailure(x);
}
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
/**
......@@ -820,13 +849,14 @@ class WindowsAsynchronousSocketChannelImpl
}
// invoke handler without any locks
Invoker.invoke(result.handler(), result);
Invoker.invoke(result);
}
}
@Override
<V extends Number,A> Future<V> writeImpl(ByteBuffer[] bufs,
boolean gatheringWrite,
<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
ByteBuffer src,
ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
......@@ -835,6 +865,13 @@ class WindowsAsynchronousSocketChannelImpl
// setup task
PendingFuture<V,A> result =
new PendingFuture<V,A>(this, handler, attachment);
ByteBuffer[] bufs;
if (gatheringWrite) {
bufs = srcs;
} else {
bufs = new ByteBuffer[1];
bufs[0] = src;
}
final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
result.setContext(writeTask);
......@@ -849,7 +886,12 @@ class WindowsAsynchronousSocketChannelImpl
}
// initiate I/O (can only be done from thread in thread pool)
Invoker.invokeOnThreadInThreadPool(this, writeTask);
// initiate I/O
if (Iocp.supportsThreadAgnosticIo()) {
writeTask.run();
} else {
Invoker.invokeOnThreadInThreadPool(this, writeTask);
}
return result;
}
......
......@@ -58,6 +58,16 @@ Java_sun_nio_ch_Iocp_initIDs(JNIEnv* env, jclass this)
completionStatus_overlapped = (*env)->GetFieldID(env, clazz, "overlapped", "J");
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Iocp_osMajorVersion(JNIEnv* env, jclass this)
{
OSVERSIONINFOEX ver;
ver.dwOSVersionInfoSize = sizeof(ver);
GetVersionEx((OSVERSIONINFO *) &ver);
return (ver.dwPlatformId == VER_PLATFORM_WIN32_NT) ?
(jint)(ver.dwMajorVersion) : (jint)0;
}
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_Iocp_createIoCompletionPort(JNIEnv* env, jclass this,
jlong handle, jlong existingPort, jint completionKey, jint concurrency)
......
......@@ -22,7 +22,7 @@
*/
/* @test
* @bug 4607272
* @bug 4607272 6842687
* @summary Unit test for AsynchronousChannelGroup
* @build Restart
* @run main/othervm -XX:-UseVMInterruptibleIO Restart
......@@ -111,8 +111,6 @@ public class Restart {
}
public void failed(Throwable exc, Void att) {
}
public void cancelled(Void att) {
}
});
// establish loopback connection which should cause completion
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册