提交 4d10f182 编写于 作者: A alanb

8009751: (se) Selector spin when select, close and interestOps(0) invoked at same time (lnx)

Reviewed-by: zhouyx, chegar, robm
上级 9625f7d1
......@@ -26,9 +26,9 @@
package sun.nio.ch;
import java.io.IOException;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
/**
* Manipulates a native array of epoll_event structs on Linux:
......@@ -52,89 +52,91 @@ import java.util.Iterator;
* this implementation we set data.fd to be the file descriptor that we
* register. That way, we have the file descriptor available when we
* process the events.
*
* All file descriptors registered with epoll have the POLLHUP and POLLERR
* events enabled even when registered with an event set of 0. To ensure
* that epoll_wait doesn't poll an idle file descriptor when the underlying
* connection is closed or reset then its registration is deleted from
* epoll (it will be re-added again if the event set is changed)
*/
class EPollArrayWrapper {
// EPOLL_EVENTS
static final int EPOLLIN = 0x001;
private static final int EPOLLIN = 0x001;
// opcodes
static final int EPOLL_CTL_ADD = 1;
static final int EPOLL_CTL_DEL = 2;
static final int EPOLL_CTL_MOD = 3;
private static final int EPOLL_CTL_ADD = 1;
private static final int EPOLL_CTL_DEL = 2;
private static final int EPOLL_CTL_MOD = 3;
// Miscellaneous constants
static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
static final int EVENT_OFFSET = 0;
static final int DATA_OFFSET = offsetofData();
static final int FD_OFFSET = DATA_OFFSET;
static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 8192);
private static final int SIZE_EPOLLEVENT = sizeofEPollEvent();
private static final int EVENT_OFFSET = 0;
private static final int DATA_OFFSET = offsetofData();
private static final int FD_OFFSET = DATA_OFFSET;
private static final int OPEN_MAX = IOUtil.fdLimit();
private static final int NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192);
// Base address of the native pollArray
private final long pollArrayAddress;
// Special value to indicate that an update should be ignored
private static final byte KILLED = (byte)-1;
// Set of "idle" channels
private final HashSet<SelChImpl> idleSet;
// Initial size of arrays for fd registration changes
private static final int INITIAL_PENDING_UPDATE_SIZE = 64;
EPollArrayWrapper() {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
for (int i=0; i<NUM_EPOLLEVENTS; i++) {
putEventOps(i, 0);
putData(i, 0L);
}
// create idle set
idleSet = new HashSet<SelChImpl>();
}
// maximum size of updatesLow
private static final int MAX_UPDATE_ARRAY_SIZE = Math.min(OPEN_MAX, 64*1024);
// Used to update file description registrations
private static class Updator {
SelChImpl channel;
int opcode;
int events;
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
private LinkedList<Updator> updateList = new LinkedList<Updator>();
// The fd of the epoll driver
private final int epfd;
// The epoll_event array for results from epoll_wait
private AllocatedNativeObject pollArray;
private final AllocatedNativeObject pollArray;
// The fd of the epoll driver
final int epfd;
// Base address of the epoll_event array
private final long pollArrayAddress;
// The fd of the interrupt line going out
int outgoingInterruptFD;
private int outgoingInterruptFD;
// The fd of the interrupt line coming in
int incomingInterruptFD;
private int incomingInterruptFD;
// The index of the interrupt FD
int interruptedIndex;
private int interruptedIndex;
// Number of updated pollfd entries
int updated;
// object to synchronize fd registration changes
private final Object updateLock = new Object();
// number of file descriptors with registration changes pending
private int updateCount;
// file descriptors with registration changes pending
private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;
// Used by release and updateRegistrations to track whether a file
// descriptor is registered with epoll.
private final BitSet registered = new BitSet();
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
......@@ -146,11 +148,6 @@ class EPollArrayWrapper {
pollArray.putInt(offset, event);
}
void putData(int i, long value) {
int offset = SIZE_EPOLLEVENT * i + DATA_OFFSET;
pollArray.putLong(offset, value);
}
void putDescriptor(int i, int fd) {
int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
pollArray.putInt(offset, fd);
......@@ -167,51 +164,83 @@ class EPollArrayWrapper {
}
/**
* Update the events for a given channel.
* Sets the pending update events for the given file descriptor. This
* method has no effect if the update events is already set to KILLED,
* unless {@code force} is {@code true}.
*/
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {
Updator last = updateList.getLast();
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if ((eventsHigh.get(key) != KILLED) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
// update existing registration
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
/**
* Add a channel's file descriptor to epoll
* Update the events for a given file descriptor
*/
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(channel, EPOLL_CTL_ADD));
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// record the file descriptor and events
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;
// events are stored as bytes for efficiency reasons
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
setUpdateEvents(fd, b, false);
}
}
/**
* Remove a channel's file descriptor from epoll
* Add a file descriptor
*/
void release(SelChImpl channel) {
synchronized (updateList) {
// flush any pending updates
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
void add(int fd) {
// force the initial update events to 0 as it may be KILLED by a
// previous registration.
synchronized (updateLock) {
assert !registered.get(fd);
setUpdateEvents(fd, (byte)0, true);
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
/**
* Remove a file descriptor
*/
void remove(int fd) {
synchronized (updateLock) {
// kill pending and future update for this file descriptor
setUpdateEvents(fd, KILLED, false);
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
// remove from epoll
if (registered.get(fd)) {
epollCtl(epfd, EPOLL_CTL_DEL, fd, 0);
registered.clear(fd);
}
}
}
......@@ -239,36 +268,38 @@ class EPollArrayWrapper {
/**
* Update the pending registrations.
*/
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {
boolean added = idleSet.add(u.channel);
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
// interrupt support
boolean interrupted = false;
private boolean interrupted = false;
public void interrupt() {
interrupt(outgoingInterruptFD);
......
......@@ -53,26 +53,24 @@ class EPollSelectorImpl
private volatile boolean closed = false;
// Lock for interrupt triggering and clearing
private Object interruptLock = new Object();
private final Object interruptLock = new Object();
private boolean interruptTriggered = false;
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
EPollSelectorImpl(SelectorProvider sp) {
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
fdToKey = new HashMap<>();
}
protected int doSelect(long timeout)
throws IOException
{
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
......@@ -161,8 +159,9 @@ class EPollSelectorImpl
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
pollWrapper.add(ch);
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
......@@ -171,7 +170,7 @@ class EPollSelectorImpl
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(Integer.valueOf(fd));
pollWrapper.release(ch);
pollWrapper.remove(fd);
ski.setIndex(-1);
keys.remove(ski);
selectedKeys.remove(ski);
......@@ -181,10 +180,11 @@ class EPollSelectorImpl
((SelChImpl)selch).kill();
}
public void putEventOps(SelectionKeyImpl sk, int ops) {
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
pollWrapper.setInterest(sk.channel, ops);
SelChImpl ch = ski.channel;
pollWrapper.setInterest(ch.getFDVal(), ops);
}
public Selector wakeup() {
......@@ -200,5 +200,4 @@ class EPollSelectorImpl
static {
Util.load();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册