提交 8709cbcd 编写于 作者: A alanb

6693490: (se) select throws "File exists" IOException under load (lnx)

Reviewed-by: sherman
上级 37500321
......@@ -25,6 +25,7 @@
package sun.nio.ch;
import java.nio.channels.Channel;
import java.io.FileDescriptor;
import java.io.IOException;
......@@ -35,7 +36,7 @@ import java.io.IOException;
* @since 1.4
*/
interface SelChImpl {
interface SelChImpl extends Channel {
FileDescriptor getFD();
......
......@@ -78,8 +78,8 @@ class EPollArrayWrapper {
// Base address of the native pollArray
private final long pollArrayAddress;
// Set of "idle" file descriptors
private final HashSet<Integer> idleSet;
// Set of "idle" channels
private final HashSet<SelChImpl> idleSet;
EPollArrayWrapper() {
// creates the epoll file descriptor
......@@ -96,19 +96,22 @@ class EPollArrayWrapper {
}
// create idle set
idleSet = new HashSet<Integer>();
idleSet = new HashSet<SelChImpl>();
}
// Used to update file description registrations
private static class Updator {
SelChImpl channel;
int opcode;
int fd;
int events;
Updator(int opcode, int fd, int events) {
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.fd = fd;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
private LinkedList<Updator> updateList = new LinkedList<Updator>();
......@@ -163,60 +166,54 @@ class EPollArrayWrapper {
}
/**
* Update the events for a given file descriptor.
* Update the events for a given channel.
*/
void setInterest(int fd, int mask) {
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the interest events are 0 then add to idle set, and delete
// from epoll if registered (or pending)
if (mask == 0) {
if (idleSet.add(fd)) {
updateList.add(new Updator(EPOLL_CTL_DEL, fd, 0));
}
return;
}
// if file descriptor is idle then add to epoll
if (!idleSet.isEmpty() && idleSet.remove(fd)) {
updateList.add(new Updator(EPOLL_CTL_ADD, fd, mask));
return;
}
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {
Updator last = updateList.getLast();
if (last.fd == fd && last.opcode == EPOLL_CTL_ADD) {
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
}
}
// update existing registration
updateList.add(new Updator(EPOLL_CTL_MOD, fd, mask));
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
}
}
/**
* Add a new file descriptor to epoll
* Add a channel's file descriptor to epoll
*/
void add(int fd) {
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(EPOLL_CTL_ADD, fd, 0));
updateList.add(new Updator(channel, EPOLL_CTL_ADD));
}
}
/**
* Remove a file descriptor from epoll
* Remove a channel's file descriptor from epoll
*/
void release(int fd) {
void release(SelChImpl channel) {
synchronized (updateList) {
// if file descriptor is idle then remove from idle set, otherwise
// delete from epoll
if (!idleSet.remove(fd)) {
updateList.add(new Updator(EPOLL_CTL_DEL, fd, 0));
// flush any pending updates
int i = 0;
while (i < updateList.size()) {
if (updateList.get(i).channel == channel) {
updateList.remove(i);
} else {
i++;
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}
......@@ -248,7 +245,26 @@ class EPollArrayWrapper {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
epollCtl(epfd, u.opcode, u.fd, u.events);
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {
boolean added = idleSet.add(u.channel);
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
......
......@@ -139,7 +139,6 @@ class EPollSelectorImpl
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
pollWrapper.release(fd0);
pollWrapper.closeEPollFD();
// it is possible
selectedKeys = null;
......@@ -162,17 +161,18 @@ class EPollSelectorImpl
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
int fd = IOUtil.fdVal(ski.channel.getFD());
fdToKey.put(Integer.valueOf(fd), ski);
pollWrapper.add(fd);
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
pollWrapper.add(ch);
keys.add(ski);
}
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
int fd = ski.channel.getFDVal();
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(Integer.valueOf(fd));
pollWrapper.release(fd);
pollWrapper.release(ch);
ski.setIndex(-1);
keys.remove(ski);
selectedKeys.remove(ski);
......@@ -185,8 +185,7 @@ class EPollSelectorImpl
void putEventOps(SelectionKeyImpl sk, int ops) {
if (closed)
throw new ClosedSelectorException();
int fd = IOUtil.fdVal(sk.channel.getFD());
pollWrapper.setInterest(fd, ops);
pollWrapper.setInterest(sk.channel, ops);
}
public Selector wakeup() {
......
/*
* Copyright 2009 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/* @test
* @bug 6693490
* @summary Pre-close file descriptor may inadvertently get registered with
* epoll during close
*/
import java.net.*;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
public class RegAfterPreClose {
static volatile boolean done;
/**
* A task that continuously connects to a given address and immediately
* closes the connection.
*/
static class Connector implements Runnable {
private final SocketAddress sa;
Connector(int port) throws IOException {
InetAddress lh = InetAddress.getLocalHost();
this.sa = new InetSocketAddress(lh, port);
}
public void run() {
while (!done) {
try {
SocketChannel.open(sa).close();
} catch (IOException x) {
// back-off as probably resource related
try {
Thread.sleep(10);
} catch (InterruptedException ignore) { }
}
}
}
}
/**
* A task that closes a channel.
*/
static class Closer implements Runnable {
private final Channel channel;
Closer(Channel sc) {
this.channel = sc;
}
public void run() {
try {
channel.close();
} catch (IOException ignore) { }
}
}
public static void main(String[] args) throws Exception {
// create listener
InetSocketAddress isa = new InetSocketAddress(0);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(isa);
// register with Selector
final Selector sel = Selector.open();
ssc.configureBlocking(false);
SelectionKey key = ssc.register(sel, SelectionKey.OP_ACCEPT);
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
};
// schedule test to run for 1 minute
Executors.newScheduledThreadPool(1, factory).schedule(new Runnable() {
public void run() {
done = true;
sel.wakeup();
}}, 1, TimeUnit.MINUTES);
// create Executor that handles tasks that closes channels
// "asynchronously" - this creates the conditions to provoke the bug.
Executor executor = Executors.newFixedThreadPool(2, factory);
// submit task that connects to listener
executor.execute(new Connector(ssc.socket().getLocalPort()));
// loop accepting connections until done (or an IOException is thrown)
while (!done) {
sel.select();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
if (sc != null) {
sc.configureBlocking(false);
sc.register(sel, SelectionKey.OP_READ);
executor.execute(new Closer(sc));
}
}
sel.selectedKeys().clear();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册