提交 b229f16d 编写于 作者: A alanb

8200583: (se) Selector clean-up, part 4

Reviewed-by: bpb, chegar
上级 3b45965c
...@@ -63,20 +63,14 @@ class EPollSelectorImpl extends SelectorImpl { ...@@ -63,20 +63,14 @@ class EPollSelectorImpl extends SelectorImpl {
// maps file descriptor to selection key, synchronize on selector // maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
// pending new registrations/updates, queued by implRegister and putEventOpos // pending new registrations/updates, queued by setEventOps
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing // interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
private boolean interruptTriggered; private boolean interruptTriggered;
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
EPollSelectorImpl(SelectorProvider sp) throws IOException { EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp); super(sp);
...@@ -140,30 +134,21 @@ class EPollSelectorImpl extends SelectorImpl { ...@@ -140,30 +134,21 @@ class EPollSelectorImpl extends SelectorImpl {
} }
/** /**
* Process new registrations and changes to the interest ops. * Process changes to the interest ops.
*/ */
private void processUpdateQueue() { private void processUpdateQueue() {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
synchronized (updateLock) { synchronized (updateLock) {
SelectionKeyImpl ski; SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) { if (ski.isValid()) {
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski); // add to fdToKey if needed
assert previous == null; SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert ski.registeredEvents() == 0; assert (previous == null) || (previous == ski);
}
}
// changes to interest ops int newEvents = ski.translateInterestOps();
assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) {
int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
int registeredEvents = ski.registeredEvents(); int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) { if (newEvents != registeredEvents) {
if (newEvents == 0) { if (newEvents == 0) {
...@@ -206,11 +191,11 @@ class EPollSelectorImpl extends SelectorImpl { ...@@ -206,11 +191,11 @@ class EPollSelectorImpl extends SelectorImpl {
if (ski != null) { if (ski != null) {
int rOps = EPoll.getEvents(event); int rOps = EPoll.getEvents(event);
if (selectedKeys.contains(ski)) { if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { if (ski.translateAndSetReadyOps(rOps)) {
numKeysUpdated++; numKeysUpdated++;
} }
} else { } else {
ski.channel.translateAndSetReadyOps(rOps, ski); ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski); selectedKeys.add(ski);
numKeysUpdated++; numKeysUpdated++;
...@@ -243,20 +228,12 @@ class EPollSelectorImpl extends SelectorImpl { ...@@ -243,20 +228,12 @@ class EPollSelectorImpl extends SelectorImpl {
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
} }
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
@Override @Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid(); assert !ski.isValid();
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
if (fdToKey.remove(fd) != null) { if (fdToKey.remove(fd) != null) {
if (ski.registeredEvents() != 0) { if (ski.registeredEvents() != 0) {
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
...@@ -268,10 +245,9 @@ class EPollSelectorImpl extends SelectorImpl { ...@@ -268,10 +245,9 @@ class EPollSelectorImpl extends SelectorImpl {
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
......
...@@ -62,11 +62,9 @@ class KQueueSelectorImpl extends SelectorImpl { ...@@ -62,11 +62,9 @@ class KQueueSelectorImpl extends SelectorImpl {
// maps file descriptor to selection key, synchronize on selector // maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
// pending new registrations/updates, queued by implRegister and putEventOps // pending new registrations/updates, queued by setEventOps
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing // interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
...@@ -138,30 +136,21 @@ class KQueueSelectorImpl extends SelectorImpl { ...@@ -138,30 +136,21 @@ class KQueueSelectorImpl extends SelectorImpl {
} }
/** /**
* Process new registrations and changes to the interest ops. * Process changes to the interest ops.
*/ */
private void processUpdateQueue() { private void processUpdateQueue() {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
synchronized (updateLock) { synchronized (updateLock) {
SelectionKeyImpl ski; SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) { if (ski.isValid()) {
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski); // add to fdToKey if needed
assert previous == null; SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert ski.registeredEvents() == 0; assert (previous == null) || (previous == ski);
}
}
// changes to interest ops int newEvents = ski.translateInterestOps();
assert updateKeys.size() == updateKeys.size();
while ((ski = updateKeys.pollFirst()) != null) {
int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
int registeredEvents = ski.registeredEvents(); int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) { if (newEvents != registeredEvents) {
...@@ -229,16 +218,16 @@ class KQueueSelectorImpl extends SelectorImpl { ...@@ -229,16 +218,16 @@ class KQueueSelectorImpl extends SelectorImpl {
if (selectedKeys.contains(ski)) { if (selectedKeys.contains(ski)) {
// file descriptor may be polled more than once per poll // file descriptor may be polled more than once per poll
if (ski.lastPolled != pollCount) { if (ski.lastPolled != pollCount) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { if (ski.translateAndSetReadyOps(rOps)) {
numKeysUpdated++; numKeysUpdated++;
ski.lastPolled = pollCount; ski.lastPolled = pollCount;
} }
} else { } else {
// ready ops have already been set on this update // ready ops have already been set on this update
ski.channel.translateAndUpdateReadyOps(rOps, ski); ski.translateAndUpdateReadyOps(rOps);
} }
} else { } else {
ski.channel.translateAndSetReadyOps(rOps, ski); ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski); selectedKeys.add(ski);
numKeysUpdated++; numKeysUpdated++;
...@@ -272,20 +261,12 @@ class KQueueSelectorImpl extends SelectorImpl { ...@@ -272,20 +261,12 @@ class KQueueSelectorImpl extends SelectorImpl {
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
} }
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
@Override @Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid(); assert !ski.isValid();
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
int registeredEvents = ski.registeredEvents(); int registeredEvents = ski.registeredEvents();
if (fdToKey.remove(fd) != null) { if (fdToKey.remove(fd) != null) {
if (registeredEvents != 0) { if (registeredEvents != 0) {
...@@ -301,10 +282,9 @@ class KQueueSelectorImpl extends SelectorImpl { ...@@ -301,10 +282,9 @@ class KQueueSelectorImpl extends SelectorImpl {
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
......
...@@ -49,7 +49,7 @@ public interface SelChImpl extends Channel { ...@@ -49,7 +49,7 @@ public interface SelChImpl extends Channel {
* contains at least one bit that the previous value did not * contains at least one bit that the previous value did not
* contain * contain
*/ */
boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk); boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski);
/** /**
* Sets the specified ops if present in interestOps. The specified * Sets the specified ops if present in interestOps. The specified
...@@ -59,7 +59,7 @@ public interface SelChImpl extends Channel { ...@@ -59,7 +59,7 @@ public interface SelChImpl extends Channel {
* contains at least one bit that the previous value did not * contains at least one bit that the previous value did not
* contain * contain
*/ */
boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk); boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski);
/** /**
* Translates an interest operation set into a native event set * Translates an interest operation set into a native event set
......
...@@ -39,7 +39,7 @@ import java.nio.channels.spi.AbstractSelectionKey; ...@@ -39,7 +39,7 @@ import java.nio.channels.spi.AbstractSelectionKey;
public final class SelectionKeyImpl public final class SelectionKeyImpl
extends AbstractSelectionKey extends AbstractSelectionKey
{ {
final SelChImpl channel; // package-private private final SelChImpl channel;
private final SelectorImpl selector; private final SelectorImpl selector;
private volatile int interestOps; private volatile int interestOps;
...@@ -61,6 +61,10 @@ public final class SelectionKeyImpl ...@@ -61,6 +61,10 @@ public final class SelectionKeyImpl
throw new CancelledKeyException(); throw new CancelledKeyException();
} }
int getFDVal() {
return channel.getFDVal();
}
@Override @Override
public SelectableChannel channel() { public SelectableChannel channel() {
return (SelectableChannel)channel; return (SelectableChannel)channel;
...@@ -103,8 +107,8 @@ public final class SelectionKeyImpl ...@@ -103,8 +107,8 @@ public final class SelectionKeyImpl
public SelectionKey nioInterestOps(int ops) { public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0) if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException(); throw new IllegalArgumentException();
selector.putEventOps(this, channel.translateInterestOps(ops));
interestOps = ops; interestOps = ops;
selector.setEventOps(this);
return this; return this;
} }
...@@ -112,6 +116,18 @@ public final class SelectionKeyImpl ...@@ -112,6 +116,18 @@ public final class SelectionKeyImpl
return interestOps; return interestOps;
} }
int translateInterestOps() {
return channel.translateInterestOps(interestOps);
}
boolean translateAndSetReadyOps(int ops) {
return channel.translateAndSetReadyOps(ops, this);
}
boolean translateAndUpdateReadyOps(int ops) {
return channel.translateAndUpdateReadyOps(ops, this);
}
void registeredEvents(int events) { void registeredEvents(int events) {
// assert Thread.holdsLock(selector); // assert Thread.holdsLock(selector);
this.registeredEvents = events; this.registeredEvents = events;
......
...@@ -64,17 +64,20 @@ public abstract class SelectorImpl ...@@ -64,17 +64,20 @@ public abstract class SelectorImpl
publicSelectedKeys = Util.ungrowableSet(selectedKeys); publicSelectedKeys = Util.ungrowableSet(selectedKeys);
} }
@Override private void ensureOpen() {
public final Set<SelectionKey> keys() {
if (!isOpen()) if (!isOpen())
throw new ClosedSelectorException(); throw new ClosedSelectorException();
}
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys; return publicKeys;
} }
@Override @Override
public final Set<SelectionKey> selectedKeys() { public final Set<SelectionKey> selectedKeys() {
if (!isOpen()) ensureOpen();
throw new ClosedSelectorException();
return publicSelectedKeys; return publicSelectedKeys;
} }
...@@ -112,8 +115,7 @@ public abstract class SelectorImpl ...@@ -112,8 +115,7 @@ public abstract class SelectorImpl
private int lockAndDoSelect(long timeout) throws IOException { private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) { synchronized (this) {
if (!isOpen()) ensureOpen();
throw new ClosedSelectorException();
synchronized (publicKeys) { synchronized (publicKeys) {
synchronized (publicSelectedKeys) { synchronized (publicSelectedKeys) {
return doSelect(timeout); return doSelect(timeout);
...@@ -176,7 +178,8 @@ public abstract class SelectorImpl ...@@ -176,7 +178,8 @@ public abstract class SelectorImpl
throw new IllegalSelectorException(); throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment); k.attach(attachment);
// register before adding to key set
// register with selector (if needed) before adding to key set
implRegister(k); implRegister(k);
synchronized (publicKeys) { synchronized (publicKeys) {
keys.add(k); keys.add(k);
...@@ -185,7 +188,15 @@ public abstract class SelectorImpl ...@@ -185,7 +188,15 @@ public abstract class SelectorImpl
return k; return k;
} }
protected abstract void implRegister(SelectionKeyImpl ski); /**
* Register the key in the selector.
*
* The default implementation checks if the selector is open. It should
* be overridden by selector implementations as needed.
*/
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
}
protected abstract void implDereg(SelectionKeyImpl ski) throws IOException; protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
...@@ -222,5 +233,5 @@ public abstract class SelectorImpl ...@@ -222,5 +233,5 @@ public abstract class SelectorImpl
/** /**
* Change the event set in the selector * Change the event set in the selector
*/ */
protected abstract void putEventOps(SelectionKeyImpl ski, int events); protected abstract void setEventOps(SelectionKeyImpl ski);
} }
...@@ -55,17 +55,14 @@ class DevPollSelectorImpl ...@@ -55,17 +55,14 @@ class DevPollSelectorImpl
// maps file descriptor to selection key, synchronize on selector // maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
// pending new registrations/updates, queued by implRegister and putEventOps // pending new registrations/updates, queued by setEventOps
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing // interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
private boolean interruptTriggered; private boolean interruptTriggered;
DevPollSelectorImpl(SelectorProvider sp) throws IOException { DevPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp); super(sp);
this.pollWrapper = new DevPollArrayWrapper(); this.pollWrapper = new DevPollArrayWrapper();
...@@ -88,18 +85,34 @@ class DevPollSelectorImpl ...@@ -88,18 +85,34 @@ class DevPollSelectorImpl
} }
@Override @Override
protected int doSelect(long timeout) protected int doSelect(long timeout) throws IOException {
throws IOException
{
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
boolean blocking = (timeout != 0);
long to = timeout;
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries; int numEntries;
processUpdateQueue(); processUpdateQueue();
processDeregisterQueue(); processDeregisterQueue();
try { try {
begin(blocking); begin(blocking);
numEntries = pollWrapper.poll(timeout);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = pollWrapper.poll(to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally { } finally {
end(blocking); end(blocking);
} }
...@@ -108,7 +121,7 @@ class DevPollSelectorImpl ...@@ -108,7 +121,7 @@ class DevPollSelectorImpl
} }
/** /**
* Process new registrations and changes to the interest ops. * Process changes to the interest ops.
*/ */
private void processUpdateQueue() throws IOException { private void processUpdateQueue() throws IOException {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
...@@ -116,25 +129,18 @@ class DevPollSelectorImpl ...@@ -116,25 +129,18 @@ class DevPollSelectorImpl
synchronized (updateLock) { synchronized (updateLock) {
SelectionKeyImpl ski; SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
int fd = ski.channel.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
assert ski.registeredEvents() == 0;
}
}
// Translate the queued updates to changes to the set of monitored // Translate the queued updates to changes to the set of monitored
// file descriptors. The changes are written to the /dev/poll driver // file descriptors. The changes are written to the /dev/poll driver
// in bulk. // in bulk.
assert updateKeys.size() == updateEvents.size();
int index = 0; int index = 0;
while ((ski = updateKeys.pollFirst()) != null) { while ((ski = updateKeys.pollFirst()) != null) {
int newEvents = updateEvents.pollFirst(); if (ski.isValid()) {
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) { // add to fdToKey if needed
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert (previous == null) || (previous == ski);
int newEvents = ski.translateInterestOps();
int registeredEvents = ski.registeredEvents(); int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) { if (newEvents != registeredEvents) {
if (registeredEvents != 0) if (registeredEvents != 0)
...@@ -178,11 +184,11 @@ class DevPollSelectorImpl ...@@ -178,11 +184,11 @@ class DevPollSelectorImpl
if (ski != null) { if (ski != null) {
int rOps = pollWrapper.getReventOps(i); int rOps = pollWrapper.getReventOps(i);
if (selectedKeys.contains(ski)) { if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { if (ski.translateAndSetReadyOps(rOps)) {
numKeysUpdated++; numKeysUpdated++;
} }
} else { } else {
ski.channel.translateAndSetReadyOps(rOps, ski); ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski); selectedKeys.add(ski);
numKeysUpdated++; numKeysUpdated++;
...@@ -214,20 +220,13 @@ class DevPollSelectorImpl ...@@ -214,20 +220,13 @@ class DevPollSelectorImpl
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
} }
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
@Override @Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid(); assert !ski.isValid();
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
if (fdToKey.remove(fd) != null) { if (fdToKey.remove(fd) != null) {
if (ski.registeredEvents() != 0) { if (ski.registeredEvents() != 0) {
pollWrapper.register(fd, POLLREMOVE); pollWrapper.register(fd, POLLREMOVE);
...@@ -239,10 +238,9 @@ class DevPollSelectorImpl ...@@ -239,10 +238,9 @@ class DevPollSelectorImpl
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
......
...@@ -72,12 +72,10 @@ class EventPortSelectorImpl ...@@ -72,12 +72,10 @@ class EventPortSelectorImpl
// the last update operation, incremented by processUpdateQueue // the last update operation, incremented by processUpdateQueue
private int lastUpdate; private int lastUpdate;
// pending new registrations/updates, queued by implRegister, putEventOps, // pending new registrations/updates, queued by setEventOps and
// and updateSelectedKeys // updateSelectedKeys
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing // interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
...@@ -146,23 +144,14 @@ class EventPortSelectorImpl ...@@ -146,23 +144,14 @@ class EventPortSelectorImpl
synchronized (updateLock) { synchronized (updateLock) {
SelectionKeyImpl ski; SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) { if (ski.isValid()) {
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski); // add to fdToKey if needed
assert previous == null; SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert ski.registeredEvents() == 0; assert (previous == null) || (previous == ski);
}
}
// changes to interest ops int newEvents = ski.translateInterestOps();
assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) {
int newEvents = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
if (newEvents != ski.registeredEvents()) { if (newEvents != ski.registeredEvents()) {
if (newEvents == 0) { if (newEvents == 0) {
port_dissociate(pfd, PORT_SOURCE_FD, fd); port_dissociate(pfd, PORT_SOURCE_FD, fd);
...@@ -199,22 +188,20 @@ class EventPortSelectorImpl ...@@ -199,22 +188,20 @@ class EventPortSelectorImpl
if (ski != null) { if (ski != null) {
int rOps = getEventOps(i); int rOps = getEventOps(i);
if (selectedKeys.contains(ski)) { if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { if (ski.translateAndSetReadyOps(rOps)) {
numKeysUpdated++; numKeysUpdated++;
} }
} else { } else {
ski.channel.translateAndSetReadyOps(rOps, ski); ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski); selectedKeys.add(ski);
numKeysUpdated++; numKeysUpdated++;
} }
} }
// re-queue key to head so that it is re-associated at // re-queue key so it re-associated at next select
// next select (and before other changes)
updateEvents.addFirst(ski.registeredEvents());
updateKeys.addFirst(ski);
ski.registeredEvents(0); ski.registeredEvents(0);
updateKeys.addLast(ski);
} }
} else if (source == PORT_SOURCE_USER) { } else if (source == PORT_SOURCE_USER) {
interrupted = true; interrupted = true;
...@@ -244,20 +231,12 @@ class EventPortSelectorImpl ...@@ -244,20 +231,12 @@ class EventPortSelectorImpl
pollArray.free(); pollArray.free();
} }
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
@Override @Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid(); assert !ski.isValid();
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
if (fdToKey.remove(fd) != null) { if (fdToKey.remove(fd) != null) {
if (ski.registeredEvents() != 0) { if (ski.registeredEvents() != 0) {
port_dissociate(pfd, PORT_SOURCE_FD, fd); port_dissociate(pfd, PORT_SOURCE_FD, fd);
...@@ -269,10 +248,9 @@ class EventPortSelectorImpl ...@@ -269,10 +248,9 @@ class EventPortSelectorImpl
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
......
...@@ -23,84 +23,20 @@ ...@@ -23,84 +23,20 @@
* questions. * questions.
*/ */
#include <sys/devpoll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <poll.h>
#include "jni.h" #include "jni.h"
#include "jni_util.h" #include "jni_util.h"
#include "jvm.h" #include "jvm.h"
#include "jlong.h" #include "jlong.h"
#include "sun_nio_ch_DevPollArrayWrapper.h" #include "nio.h"
#include <poll.h> #include "nio_util.h"
#include <unistd.h>
#include <sys/time.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef uint32_t caddr32_t;
/* /dev/poll ioctl */
#define DPIOC (0xD0 << 8)
#define DP_POLL (DPIOC | 1) /* poll on fds in cached in /dev/poll */
#define DP_ISPOLLED (DPIOC | 2) /* is this fd cached in /dev/poll */
#define DEVPOLLSIZE 1000 /* /dev/poll table size increment */
#define POLLREMOVE 0x0800 /* Removes fd from monitored set */
/*
* /dev/poll DP_POLL ioctl format
*/
typedef struct dvpoll {
pollfd_t *dp_fds; /* pollfd array */
nfds_t dp_nfds; /* num of pollfd's in dp_fds[] */
int dp_timeout; /* time out in millisec */
} dvpoll_t;
typedef struct dvpoll32 {
caddr32_t dp_fds; /* pollfd array */
uint32_t dp_nfds; /* num of pollfd's in dp_fds[] */
int32_t dp_timeout; /* time out in millisec */
} dvpoll32_t;
#ifdef __cplusplus
}
#endif
#define RESTARTABLE(_cmd, _result) do { \
do { \
_result = _cmd; \
} while((_result == -1) && (errno == EINTR)); \
} while(0)
static int #include "sun_nio_ch_DevPollArrayWrapper.h"
idevpoll(jint wfd, int dpctl, struct dvpoll a)
{
jlong start, now;
int remaining = a.dp_timeout;
struct timeval t;
int diff;
gettimeofday(&t, NULL);
start = t.tv_sec * 1000 + t.tv_usec / 1000;
for (;;) {
/* poll(7d) ioctl does not return remaining count */
int res = ioctl(wfd, dpctl, &a);
if (res < 0 && errno == EINTR) {
if (remaining >= 0) {
gettimeofday(&t, NULL);
now = t.tv_sec * 1000 + t.tv_usec / 1000;
diff = now - start;
remaining -= diff;
if (diff < 0 || remaining <= 0) {
return 0;
}
start = now;
a.dp_timeout = remaining;
}
} else {
return res;
}
}
}
JNIEXPORT jint JNICALL JNIEXPORT jint JNICALL
Java_sun_nio_ch_DevPollArrayWrapper_init(JNIEnv *env, jobject this) Java_sun_nio_ch_DevPollArrayWrapper_init(JNIEnv *env, jobject this)
...@@ -153,26 +89,24 @@ Java_sun_nio_ch_DevPollArrayWrapper_registerMultiple(JNIEnv *env, jobject this, ...@@ -153,26 +89,24 @@ Java_sun_nio_ch_DevPollArrayWrapper_registerMultiple(JNIEnv *env, jobject this,
JNIEXPORT jint JNICALL JNIEXPORT jint JNICALL
Java_sun_nio_ch_DevPollArrayWrapper_poll0(JNIEnv *env, jobject this, Java_sun_nio_ch_DevPollArrayWrapper_poll0(JNIEnv *env, jobject this,
jlong address, jint numfds, jlong address, jint numfds,
jlong timeout, jint wfd) jlong timeout, jint wfd)
{ {
struct dvpoll a; struct dvpoll a;
void *pfd = (void *) jlong_to_ptr(address); void *pfd = (void *) jlong_to_ptr(address);
int result = 0; int result;
a.dp_fds = pfd; a.dp_fds = pfd;
a.dp_nfds = numfds; a.dp_nfds = numfds;
a.dp_timeout = (int)timeout; a.dp_timeout = (int)timeout;
result = ioctl(wfd, DP_POLL, &a);
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE (ioctl(wfd, DP_POLL, &a), result);
} else { /* Bounded wait; bounded restarts */
result = idevpoll(wfd, DP_POLL, a);
}
if (result < 0) { if (result < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Error reading driver"); if (errno == EINTR) {
return -1; return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "Error reading driver");
return IOS_THROWN;
}
} }
return result; return result;
} }
...@@ -60,7 +60,6 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -60,7 +60,6 @@ class PollSelectorImpl extends SelectorImpl {
// pending updates, queued by putEventOps // pending updates, queued by putEventOps
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
// interrupt triggering and clearing // interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
...@@ -136,10 +135,9 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -136,10 +135,9 @@ class PollSelectorImpl extends SelectorImpl {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
synchronized (updateLock) { synchronized (updateLock) {
assert updateKeys.size() == updateEvents.size();
SelectionKeyImpl ski; SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) { while ((ski = updateKeys.pollFirst()) != null) {
int newEvents = updateEvents.pollFirst(); int newEvents = ski.translateInterestOps();
if (ski.isValid()) { if (ski.isValid()) {
int index = ski.getIndex(); int index = ski.getIndex();
assert index >= 0 && index < pollArraySize; assert index >= 0 && index < pollArraySize;
...@@ -173,14 +171,14 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -173,14 +171,14 @@ class PollSelectorImpl extends SelectorImpl {
int rOps = getReventOps(i); int rOps = getReventOps(i);
if (rOps != 0) { if (rOps != 0) {
SelectionKeyImpl ski = pollKeys.get(i); SelectionKeyImpl ski = pollKeys.get(i);
assert ski.channel.getFDVal() == getDescriptor(i); assert ski.getFDVal() == getDescriptor(i);
if (ski.isValid()) { if (ski.isValid()) {
if (selectedKeys.contains(ski)) { if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { if (ski.translateAndSetReadyOps(rOps)) {
numKeysUpdated++; numKeysUpdated++;
} }
} else { } else {
ski.channel.translateAndSetReadyOps(rOps, ski); ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski); selectedKeys.add(ski);
numKeysUpdated++; numKeysUpdated++;
...@@ -233,10 +231,9 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -233,10 +231,9 @@ class PollSelectorImpl extends SelectorImpl {
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
...@@ -285,7 +282,7 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -285,7 +282,7 @@ class PollSelectorImpl extends SelectorImpl {
int index = pollArraySize; int index = pollArraySize;
assert index > 0; assert index > 0;
putDescriptor(index, ski.channel.getFDVal()); putDescriptor(index, ski.getFDVal());
putEventOps(index, ops); putEventOps(index, ops);
putReventOps(index, 0); putReventOps(index, 0);
ski.setIndex(index); ski.setIndex(index);
...@@ -301,7 +298,7 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -301,7 +298,7 @@ class PollSelectorImpl extends SelectorImpl {
private void update(SelectionKeyImpl ski, int ops) { private void update(SelectionKeyImpl ski, int ops) {
int index = ski.getIndex(); int index = ski.getIndex();
assert index > 0 && index < pollArraySize; assert index > 0 && index < pollArraySize;
assert getDescriptor(index) == ski.channel.getFDVal(); assert getDescriptor(index) == ski.getFDVal();
putEventOps(index, ops); putEventOps(index, ops);
} }
...@@ -311,7 +308,7 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -311,7 +308,7 @@ class PollSelectorImpl extends SelectorImpl {
private void remove(SelectionKeyImpl ski) { private void remove(SelectionKeyImpl ski) {
int index = ski.getIndex(); int index = ski.getIndex();
assert index > 0 && index < pollArraySize; assert index > 0 && index < pollArraySize;
assert getDescriptor(index) == ski.channel.getFDVal(); assert getDescriptor(index) == ski.getFDVal();
// replace pollfd at index with the last pollfd in array // replace pollfd at index with the last pollfd in array
int lastIndex = pollArraySize - 1; int lastIndex = pollArraySize - 1;
...@@ -321,7 +318,7 @@ class PollSelectorImpl extends SelectorImpl { ...@@ -321,7 +318,7 @@ class PollSelectorImpl extends SelectorImpl {
int lastFd = getDescriptor(lastIndex); int lastFd = getDescriptor(lastIndex);
int lastOps = getEventOps(lastIndex); int lastOps = getEventOps(lastIndex);
int lastRevents = getReventOps(lastIndex); int lastRevents = getReventOps(lastIndex);
assert lastKey.channel.getFDVal() == lastFd; assert lastKey.getFDVal() == lastFd;
putDescriptor(index, lastFd); putDescriptor(index, lastFd);
putEventOps(index, lastOps); putEventOps(index, lastOps);
putReventOps(index, lastRevents); putReventOps(index, lastRevents);
......
...@@ -64,7 +64,7 @@ class PollArrayWrapper { ...@@ -64,7 +64,7 @@ class PollArrayWrapper {
// Prepare another pollfd struct for use. // Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) { void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.channel.getFDVal()); putDescriptor(index, ski.getFDVal());
putEventOps(index, 0); putEventOps(index, 0);
} }
......
...@@ -83,12 +83,12 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -83,12 +83,12 @@ class WindowsSelectorImpl extends SelectorImpl {
return get(Integer.valueOf(desc)); return get(Integer.valueOf(desc));
} }
private MapEntry put(SelectionKeyImpl ski) { private MapEntry put(SelectionKeyImpl ski) {
return put(Integer.valueOf(ski.channel.getFDVal()), new MapEntry(ski)); return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
} }
private MapEntry remove(SelectionKeyImpl ski) { private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = Integer.valueOf(ski.channel.getFDVal()); Integer fd = Integer.valueOf(ski.getFDVal());
MapEntry x = get(fd); MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel)) if ((x != null) && (x.ski.channel() == ski.channel()))
return remove(fd); return remove(fd);
return null; return null;
} }
...@@ -114,11 +114,10 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -114,11 +114,10 @@ class WindowsSelectorImpl extends SelectorImpl {
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
private volatile boolean interruptTriggered; private volatile boolean interruptTriggered;
// pending new registrations/updates, queued by implRegister and putEventOps // pending new registrations/updates, queued by implRegister and setEventOps
private final Object updateLock = new Object(); private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
WindowsSelectorImpl(SelectorProvider sp) throws IOException { WindowsSelectorImpl(SelectorProvider sp) throws IOException {
...@@ -204,10 +203,9 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -204,10 +203,9 @@ class WindowsSelectorImpl extends SelectorImpl {
} }
// changes to interest ops // changes to interest ops
assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) { while ((ski = updateKeys.pollFirst()) != null) {
int events = updateEvents.pollFirst(); int events = ski.translateInterestOps();
int fd = ski.channel.getFDVal(); int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) { if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex(); int index = ski.getIndex();
assert index >= 0 && index < totalChannels; assert index >= 0 && index < totalChannels;
...@@ -408,13 +406,13 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -408,13 +406,13 @@ class WindowsSelectorImpl extends SelectorImpl {
if (selectedKeys.contains(sk)) { // Key in selected set if (selectedKeys.contains(sk)) { // Key in selected set
if (me.clearedCount != updateCount) { if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) && if (sk.translateAndSetReadyOps(rOps) &&
(me.updateCount != updateCount)) { (me.updateCount != updateCount)) {
me.updateCount = updateCount; me.updateCount = updateCount;
numKeysUpdated++; numKeysUpdated++;
} }
} else { // The readyOps have been set; now add } else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && if (sk.translateAndUpdateReadyOps(rOps) &&
(me.updateCount != updateCount)) { (me.updateCount != updateCount)) {
me.updateCount = updateCount; me.updateCount = updateCount;
numKeysUpdated++; numKeysUpdated++;
...@@ -423,14 +421,14 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -423,14 +421,14 @@ class WindowsSelectorImpl extends SelectorImpl {
me.clearedCount = updateCount; me.clearedCount = updateCount;
} else { // Key is not in selected set yet } else { // Key is not in selected set yet
if (me.clearedCount != updateCount) { if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk); sk.translateAndSetReadyOps(rOps);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk); selectedKeys.add(sk);
me.updateCount = updateCount; me.updateCount = updateCount;
numKeysUpdated++; numKeysUpdated++;
} }
} else { // The readyOps have been set; now add } else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk); sk.translateAndUpdateReadyOps(rOps);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk); selectedKeys.add(sk);
me.updateCount = updateCount; me.updateCount = updateCount;
...@@ -613,10 +611,9 @@ class WindowsSelectorImpl extends SelectorImpl { ...@@ -613,10 +611,9 @@ class WindowsSelectorImpl extends SelectorImpl {
} }
@Override @Override
public void putEventOps(SelectionKeyImpl ski, int events) { public void setEventOps(SelectionKeyImpl ski) {
ensureOpen(); ensureOpen();
synchronized (updateLock) { synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski); updateKeys.addLast(ski);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册