提交 54c45f34 编写于 作者: R robm

7184932: Remove the temporary Selector usage in the NIO socket adapters

Reviewed-by: alanb
上级 3199964b
......@@ -108,6 +108,7 @@ SUNWprivate_1.1 {
Java_sun_nio_ch_Net_setInterface6;
Java_sun_nio_ch_Net_getInterface6;
Java_sun_nio_ch_Net_shutdown;
Java_sun_nio_ch_Net_poll;
Java_sun_nio_ch_PollArrayWrapper_interrupt;
Java_sun_nio_ch_PollArrayWrapper_poll0;
Java_sun_nio_ch_ServerSocketChannelImpl_accept0;
......
......@@ -116,6 +116,7 @@ SUNWprivate_1.1 {
Java_sun_nio_ch_Net_setInterface6;
Java_sun_nio_ch_Net_getInterface6;
Java_sun_nio_ch_Net_shutdown;
Java_sun_nio_ch_Net_poll;
Java_sun_nio_ch_PollArrayWrapper_interrupt;
Java_sun_nio_ch_PollArrayWrapper_poll0;
Java_sun_nio_ch_ServerSocketChannelImpl_accept0;
......
......@@ -104,6 +104,7 @@ SUNWprivate_1.1 {
Java_sun_nio_ch_Net_setInterface6;
Java_sun_nio_ch_Net_getInterface6;
Java_sun_nio_ch_Net_shutdown;
Java_sun_nio_ch_Net_poll;
Java_sun_nio_ch_PollArrayWrapper_interrupt;
Java_sun_nio_ch_PollArrayWrapper_poll0;
Java_sun_nio_ch_ServerSocketChannelImpl_accept0;
......
......@@ -1059,6 +1059,28 @@ class DatagramChannelImpl
return translateReadyOps(ops, 0, sk);
}
// package-private
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
synchronized (readLock) {
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
readerThread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
} finally {
readerThread = 0;
end(n > 0);
}
return n;
}
}
/**
* Translates an interest operation set into a native poll event set
*/
......
......@@ -176,40 +176,31 @@ public class DatagramSocketAdaptor
return dc.receive(bb);
}
// Implement timeout with a selector
SelectionKey sk = null;
Selector sel = null;
dc.configureBlocking(false);
try {
int n;
SocketAddress sender;
if ((sender = dc.receive(bb)) != null)
return sender;
sel = Util.getTemporarySelector(dc);
sk = dc.register(sel, SelectionKey.OP_READ);
long to = timeout;
for (;;) {
if (!dc.isOpen())
throw new ClosedChannelException();
long st = System.currentTimeMillis();
int ns = sel.select(to);
if (ns > 0 && sk.isReadable()) {
int result = dc.poll(PollArrayWrapper.POLLIN, to);
if (result > 0 &&
((result & PollArrayWrapper.POLLIN) != 0)) {
if ((sender = dc.receive(bb)) != null)
return sender;
}
sel.selectedKeys().remove(sk);
to -= System.currentTimeMillis() - st;
if (to <= 0)
throw new SocketTimeoutException();
}
} finally {
if (sk != null)
sk.cancel();
if (dc.isOpen())
dc.configureBlocking(true);
if (sel != null)
Util.releaseTemporarySelector(sel);
}
}
......
......@@ -409,6 +409,9 @@ public class Net {
int level, int opt, int arg)
throws IOException;
static native int poll(FileDescriptor fd, int events, long timeout)
throws IOException;
// -- Multicast support --
......
......@@ -102,37 +102,26 @@ public class ServerSocketAdaptor // package-private
return sc.socket();
}
// Implement timeout with a selector
SelectionKey sk = null;
Selector sel = null;
ssc.configureBlocking(false);
try {
SocketChannel sc;
if ((sc = ssc.accept()) != null)
return sc.socket();
sel = Util.getTemporarySelector(ssc);
sk = ssc.register(sel, SelectionKey.OP_ACCEPT);
long to = timeout;
for (;;) {
if (!ssc.isOpen())
throw new ClosedChannelException();
long st = System.currentTimeMillis();
int ns = sel.select(to);
if (ns > 0 &&
sk.isAcceptable() && ((sc = ssc.accept()) != null))
int result = ssc.poll(PollArrayWrapper.POLLIN, to);
if (result > 0 && ((sc = ssc.accept()) != null))
return sc.socket();
sel.selectedKeys().remove(sk);
to -= System.currentTimeMillis() - st;
if (to <= 0)
throw new SocketTimeoutException();
}
} finally {
if (sk != null)
sk.cancel();
if (ssc.isOpen())
ssc.configureBlocking(true);
if (sel != null)
Util.releaseTemporarySelector(sel);
}
} catch (Exception x) {
......
......@@ -324,6 +324,28 @@ class ServerSocketChannelImpl
return translateReadyOps(ops, 0, sk);
}
// package-private
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
synchronized (lock) {
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
thread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
} finally {
thread = 0;
end(n > 0);
}
return n;
}
}
/**
* Translates an interest operation set into a native poll event set
*/
......
......@@ -97,25 +97,19 @@ public class SocketAdaptor
return;
}
// Implement timeout with a selector
SelectionKey sk = null;
Selector sel = null;
sc.configureBlocking(false);
try {
if (sc.connect(remote))
return;
sel = Util.getTemporarySelector(sc);
sk = sc.register(sel, SelectionKey.OP_CONNECT);
long to = timeout;
for (;;) {
if (!sc.isOpen())
throw new ClosedChannelException();
long st = System.currentTimeMillis();
int ns = sel.select(to);
if (ns > 0 &&
sk.isConnectable() && sc.finishConnect())
int result = sc.poll(PollArrayWrapper.POLLCONN, to);
if (result > 0 && sc.finishConnect())
break;
sel.selectedKeys().remove(sk);
to -= System.currentTimeMillis() - st;
if (to <= 0) {
try {
......@@ -125,12 +119,8 @@ public class SocketAdaptor
}
}
} finally {
if (sk != null)
sk.cancel();
if (sc.isOpen())
sc.configureBlocking(true);
if (sel != null)
Util.releaseTemporarySelector(sel);
}
} catch (Exception x) {
......@@ -199,39 +189,29 @@ public class SocketAdaptor
throw new IllegalBlockingModeException();
if (timeout == 0)
return sc.read(bb);
// Implement timeout with a selector
SelectionKey sk = null;
Selector sel = null;
sc.configureBlocking(false);
try {
int n;
if ((n = sc.read(bb)) != 0)
return n;
sel = Util.getTemporarySelector(sc);
sk = sc.register(sel, SelectionKey.OP_READ);
long to = timeout;
for (;;) {
if (!sc.isOpen())
throw new ClosedChannelException();
long st = System.currentTimeMillis();
int ns = sel.select(to);
if (ns > 0 && sk.isReadable()) {
int result = sc.poll(PollArrayWrapper.POLLIN, to);
if (result > 0) {
if ((n = sc.read(bb)) != 0)
return n;
}
sel.selectedKeys().remove(sk);
to -= System.currentTimeMillis() - st;
if (to <= 0)
throw new SocketTimeoutException();
}
} finally {
if (sk != null)
sk.cancel();
if (sc.isOpen())
sc.configureBlocking(true);
if (sel != null)
Util.releaseTemporarySelector(sel);
}
}
......
......@@ -914,6 +914,28 @@ class SocketChannelImpl
return translateReadyOps(ops, 0, sk);
}
// package-private
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
synchronized (readLock) {
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
readerThread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
} finally {
readerCleanup();
end(n > 0);
}
return n;
}
}
/**
* Translates an interest operation set into a native poll event set
*/
......
......@@ -218,66 +218,6 @@ public class Util {
((DirectBuffer)buf).cleaner().clean();
}
private static class SelectorWrapper {
private Selector sel;
private SelectorWrapper (Selector sel) {
this.sel = sel;
Cleaner.create(this, new Closer(sel));
}
private static class Closer implements Runnable {
private Selector sel;
private Closer (Selector sel) {
this.sel = sel;
}
public void run () {
try {
sel.close();
} catch (Throwable th) {
throw new Error(th);
}
}
}
public Selector get() { return sel;}
}
// Per-thread cached selector
private static ThreadLocal<SoftReference<SelectorWrapper>> localSelector
= new ThreadLocal<SoftReference<SelectorWrapper>>();
// Hold a reference to the selWrapper object to prevent it from
// being cleaned when the temporary selector wrapped is on lease.
private static ThreadLocal<SelectorWrapper> localSelectorWrapper
= new ThreadLocal<SelectorWrapper>();
// When finished, invoker must ensure that selector is empty
// by cancelling any related keys and explicitly releasing
// the selector by invoking releaseTemporarySelector()
static Selector getTemporarySelector(SelectableChannel sc)
throws IOException
{
SoftReference<SelectorWrapper> ref = localSelector.get();
SelectorWrapper selWrapper = null;
Selector sel = null;
if (ref == null
|| ((selWrapper = ref.get()) == null)
|| ((sel = selWrapper.get()) == null)
|| (sel.provider() != sc.provider())) {
sel = sc.provider().openSelector();
selWrapper = new SelectorWrapper(sel);
localSelector.set(new SoftReference<SelectorWrapper>(selWrapper));
}
localSelectorWrapper.set(selWrapper);
return sel;
}
static void releaseTemporarySelector(Selector sel)
throws IOException
{
// Selector should be empty
sel.selectNow(); // Flush cancelled keys
assert sel.keys().isEmpty() : "Temporary selector not empty";
localSelectorWrapper.set(null);
}
// -- Random stuff --
......
......@@ -38,6 +38,7 @@
#include "net_util_md.h"
#include "nio_util.h"
#include "nio.h"
#include "sun_nio_ch_PollArrayWrapper.h"
#ifdef _ALLBSD_SOURCE
......@@ -627,6 +628,26 @@ Java_sun_nio_ch_Net_shutdown(JNIEnv *env, jclass cl, jobject fdo, jint jhow)
handleSocketError(env, errno);
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_poll(JNIEnv* env, jclass this, jobject fdo, jint events, jlong timeout)
{
struct pollfd pfd;
int rv;
pfd.fd = fdval(env, fdo);
pfd.events = events;
rv = poll(&pfd, 1, timeout);
if (rv >= 0) {
return pfd.revents;
} else if (errno == EINTR) {
return IOS_INTERRUPTED;
} else if (rv < 0) {
handleSocketError(env, errno);
return IOS_THROWN;
}
}
/* Declared in nio_util.h */
jint
......
......@@ -35,6 +35,7 @@
#include "net_util.h"
#include "sun_nio_ch_Net.h"
#include "sun_nio_ch_PollArrayWrapper.h"
/**
* Definitions to allow for building with older SDK include files.
......@@ -524,3 +525,49 @@ Java_sun_nio_ch_Net_shutdown(JNIEnv *env, jclass cl, jobject fdo, jint jhow) {
NET_ThrowNew(env, WSAGetLastError(), "shutdown");
}
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_poll(JNIEnv* env, jclass this, jobject fdo, jint events, jlong timeout)
{
int rv;
int revents = 0;
struct timeval t;
int lastError = 0;
fd_set rd, wr, ex;
jint fd = fdval(env, fdo);
t.tv_sec = timeout / 1000;
t.tv_usec = (timeout % 1000) * 1000;
FD_ZERO(&rd);
FD_ZERO(&wr);
FD_ZERO(&ex);
if (events & sun_nio_ch_PollArrayWrapper_POLLIN) {
FD_SET(fd, &rd);
}
if (events & sun_nio_ch_PollArrayWrapper_POLLOUT ||
events & sun_nio_ch_PollArrayWrapper_POLLCONN) {
FD_SET(fd, &wr);
}
FD_SET(fd, &ex);
rv = select(fd+1, &rd, &wr, &ex, &t);
/* save last winsock error */
if (rv == SOCKET_ERROR) {
handleSocketError(env, lastError);
return IOS_THROWN;
} else if (rv >= 0) {
rv = 0;
if (FD_ISSET(fd, &rd)) {
rv |= sun_nio_ch_PollArrayWrapper_POLLIN;
}
if (FD_ISSET(fd, &wr)) {
rv |= sun_nio_ch_PollArrayWrapper_POLLOUT;
}
if (FD_ISSET(fd, &ex)) {
rv |= sun_nio_ch_PollArrayWrapper_POLLERR;
}
}
return rv;
}
/*
* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/* @test
* @bug 7184932
* @summary Test asynchronous close and interrupt of timed socket adapter methods
*/
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.net.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Random;
public class AdaptorCloseAndInterrupt {
private static final ScheduledExecutorService pool =
Executors.newScheduledThreadPool(1);
final ServerSocketChannel listener;
final DatagramChannel peer;
final int port;
public AdaptorCloseAndInterrupt() {
listener = null;
peer = null;
port = -1;
}
public AdaptorCloseAndInterrupt(ServerSocketChannel listener) {
this.listener = listener;
this.port = listener.socket().getLocalPort();
this.peer = null;
}
public AdaptorCloseAndInterrupt(DatagramChannel listener) {
this.peer = listener;
this.port = peer.socket().getLocalPort();
this.listener = null;
}
public static void main(String args[]) throws Exception {
try {
try (ServerSocketChannel listener = ServerSocketChannel.open()) {
listener.socket().bind(null);
new AdaptorCloseAndInterrupt(listener).scReadAsyncClose();
new AdaptorCloseAndInterrupt(listener).scReadAsyncInterrupt();
}
try (DatagramChannel peer = DatagramChannel.open()) {
peer.socket().bind(null);
new AdaptorCloseAndInterrupt(peer).dcReceiveAsyncClose();
new AdaptorCloseAndInterrupt(peer).dcReceiveAsyncInterrupt();
}
new AdaptorCloseAndInterrupt().ssAcceptAsyncClose();
new AdaptorCloseAndInterrupt().ssAcceptAsyncInterrupt();
} finally {
pool.shutdown();
}
System.out.println("Test Passed");
}
void scReadAsyncClose() throws IOException {
try {
SocketChannel sc = SocketChannel.open(new InetSocketAddress(
"127.0.0.1", port));
sc.socket().setSoTimeout(30*1000);
doAsyncClose(sc);
try {
sc.socket().getInputStream().read(new byte[100]);
throw new RuntimeException("read should not have completed");
} catch (ClosedChannelException expected) {}
if (!sc.socket().isClosed())
throw new RuntimeException("socket is not closed");
} finally {
// accept connection and close it.
listener.accept().close();
}
}
void scReadAsyncInterrupt() throws IOException {
try {
final SocketChannel sc = SocketChannel.open(new InetSocketAddress(
"127.0.0.1", port));
sc.socket().setSoTimeout(30*1000);
doAsyncInterrupt();
try {
sc.socket().getInputStream().read(new byte[100]);
throw new RuntimeException("read should not have completed");
} catch (ClosedByInterruptException expected) {
Thread.currentThread().interrupted();
}
if (!sc.socket().isClosed())
throw new RuntimeException("socket is not closed");
} finally {
// accept connection and close it.
listener.accept().close();
}
}
void dcReceiveAsyncClose() throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.connect(new InetSocketAddress(
"127.0.0.1", port));
dc.socket().setSoTimeout(30*1000);
doAsyncClose(dc);
try {
dc.socket().receive(new DatagramPacket(new byte[100], 100));
throw new RuntimeException("receive should not have completed");
} catch (ClosedChannelException expected) {}
if (!dc.socket().isClosed())
throw new RuntimeException("socket is not closed");
}
void dcReceiveAsyncInterrupt() throws IOException {
DatagramChannel dc = DatagramChannel.open();
dc.connect(new InetSocketAddress(
"127.0.0.1", port));
dc.socket().setSoTimeout(30*1000);
doAsyncInterrupt();
try {
dc.socket().receive(new DatagramPacket(new byte[100], 100));
throw new RuntimeException("receive should not have completed");
} catch (ClosedByInterruptException expected) {
Thread.currentThread().interrupted();
}
if (!dc.socket().isClosed())
throw new RuntimeException("socket is not closed");
}
void ssAcceptAsyncClose() throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(null);
ssc.socket().setSoTimeout(30*1000);
doAsyncClose(ssc);
try {
ssc.socket().accept();
throw new RuntimeException("accept should not have completed");
} catch (ClosedChannelException expected) {}
if (!ssc.socket().isClosed())
throw new RuntimeException("socket is not closed");
}
void ssAcceptAsyncInterrupt() throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(null);
ssc.socket().setSoTimeout(30*1000);
doAsyncInterrupt();
try {
ssc.socket().accept();
throw new RuntimeException("accept should not have completed");
} catch (ClosedByInterruptException expected) {
Thread.currentThread().interrupted();
}
if (!ssc.socket().isClosed())
throw new RuntimeException("socket is not closed");
}
void doAsyncClose(final AbstractSelectableChannel sc) {
AdaptorCloseAndInterrupt.pool.schedule(new Callable<Void>() {
public Void call() throws Exception {
sc.close();
return null;
}
}, new Random().nextInt(1000), TimeUnit.MILLISECONDS);
}
void doAsyncInterrupt() {
final Thread current = Thread.currentThread();
AdaptorCloseAndInterrupt.pool.schedule(new Callable<Void>() {
public Void call() throws Exception {
current.interrupt();
return null;
}
}, new Random().nextInt(1000), TimeUnit.MILLISECONDS);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册