提交 a0fc2f27 编写于 作者: D dl

7005424: Resync java.util.concurrent classes with Dougs CVS - Jan 2011

Reviewed-by: dholmes, chegar, mduigou
上级 475dfb56
...@@ -1452,10 +1452,10 @@ public class Collections { ...@@ -1452,10 +1452,10 @@ public class Collections {
* when o is a Map.Entry, and calls o.setValue. * when o is a Map.Entry, and calls o.setValue.
*/ */
public boolean containsAll(Collection<?> coll) { public boolean containsAll(Collection<?> coll) {
Iterator<?> it = coll.iterator(); for (Object e : coll) {
while (it.hasNext()) if (!contains(e)) // Invokes safe contains() above
if (!contains(it.next())) // Invokes safe contains() above
return false; return false;
}
return true; return true;
} }
public boolean equals(Object o) { public boolean equals(Object o) {
......
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
package java.util; package java.util;
/** /**
* Linked list implementation of the {@link List} and {@link Deque} interfaces. * Doubly-linked list implementation of the {@code List} and {@code Deque}
* Implements all optional operations, and permits all elements (including * interfaces. Implements all optional list operations, and permits all
* {@code null}). * elements (including {@code null}).
* *
* <p>All of the operations perform as could be expected for a doubly-linked * <p>All of the operations perform as could be expected for a doubly-linked
* list. Operations that index into the list will traverse the list from * list. Operations that index into the list will traverse the list from
......
...@@ -869,6 +869,8 @@ public class ConcurrentLinkedDeque<E> ...@@ -869,6 +869,8 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the front of this deque. * Inserts the specified element at the front of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException}.
* *
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
...@@ -878,6 +880,8 @@ public class ConcurrentLinkedDeque<E> ...@@ -878,6 +880,8 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the end of this deque. * Inserts the specified element at the end of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException}.
* *
* <p>This method is equivalent to {@link #add}. * <p>This method is equivalent to {@link #add}.
* *
...@@ -889,8 +893,9 @@ public class ConcurrentLinkedDeque<E> ...@@ -889,8 +893,9 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the front of this deque. * Inserts the specified element at the front of this deque.
* As the deque is unbounded, this method will never return {@code false}.
* *
* @return {@code true} always * @return {@code true} (as specified by {@link Deque#offerFirst})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
public boolean offerFirst(E e) { public boolean offerFirst(E e) {
...@@ -900,10 +905,11 @@ public class ConcurrentLinkedDeque<E> ...@@ -900,10 +905,11 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the end of this deque. * Inserts the specified element at the end of this deque.
* As the deque is unbounded, this method will never return {@code false}.
* *
* <p>This method is equivalent to {@link #add}. * <p>This method is equivalent to {@link #add}.
* *
* @return {@code true} always * @return {@code true} (as specified by {@link Deque#offerLast})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
public boolean offerLast(E e) { public boolean offerLast(E e) {
...@@ -983,6 +989,7 @@ public class ConcurrentLinkedDeque<E> ...@@ -983,6 +989,7 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the tail of this deque. * Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never return {@code false}.
* *
* @return {@code true} (as specified by {@link Queue#offer}) * @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
...@@ -993,6 +1000,8 @@ public class ConcurrentLinkedDeque<E> ...@@ -993,6 +1000,8 @@ public class ConcurrentLinkedDeque<E>
/** /**
* Inserts the specified element at the tail of this deque. * Inserts the specified element at the tail of this deque.
* As the deque is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
* *
* @return {@code true} (as specified by {@link Collection#add}) * @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
......
...@@ -269,6 +269,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> ...@@ -269,6 +269,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
/** /**
* Inserts the specified element at the tail of this queue. * Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
* *
* @return {@code true} (as specified by {@link Collection#add}) * @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
...@@ -298,6 +300,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> ...@@ -298,6 +300,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
/** /**
* Inserts the specified element at the tail of this queue. * Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
* *
* @return {@code true} (as specified by {@link Queue#offer}) * @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
......
...@@ -374,17 +374,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -374,17 +374,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
null, null, 1); null, null, 1);
} }
/** Updater for casHead */
private static final
AtomicReferenceFieldUpdater<ConcurrentSkipListMap, HeadIndex>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(ConcurrentSkipListMap.class, HeadIndex.class, "head");
/** /**
* compareAndSet head node * compareAndSet head node
*/ */
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) { private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
return headUpdater.compareAndSet(this, cmp, val); return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
} }
/* ---------------- Nodes -------------- */ /* ---------------- Nodes -------------- */
...@@ -423,28 +417,18 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -423,28 +417,18 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
this.next = next; this.next = next;
} }
/** Updater for casNext */
static final AtomicReferenceFieldUpdater<Node, Node>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
/** Updater for casValue */
static final AtomicReferenceFieldUpdater<Node, Object>
valueUpdater = AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "value");
/** /**
* compareAndSet value field * compareAndSet value field
*/ */
boolean casValue(Object cmp, Object val) { boolean casValue(Object cmp, Object val) {
return valueUpdater.compareAndSet(this, cmp, val); return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
} }
/** /**
* compareAndSet next field * compareAndSet next field
*/ */
boolean casNext(Node<K,V> cmp, Node<K,V> val) { boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return nextUpdater.compareAndSet(this, cmp, val); return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
} }
/** /**
...@@ -522,6 +506,14 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -522,6 +506,14 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
return null; return null;
return new AbstractMap.SimpleImmutableEntry<K,V>(key, v); return new AbstractMap.SimpleImmutableEntry<K,V>(key, v);
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long valueOffset =
objectFieldOffset(UNSAFE, "value", Node.class);
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", Node.class);
} }
/* ---------------- Indexing -------------- */ /* ---------------- Indexing -------------- */
...@@ -547,16 +539,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -547,16 +539,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
this.right = right; this.right = right;
} }
/** Updater for casRight */
static final AtomicReferenceFieldUpdater<Index, Index>
rightUpdater = AtomicReferenceFieldUpdater.newUpdater
(Index.class, Index.class, "right");
/** /**
* compareAndSet right field * compareAndSet right field
*/ */
final boolean casRight(Index<K,V> cmp, Index<K,V> val) { final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return rightUpdater.compareAndSet(this, cmp, val); return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
} }
/** /**
...@@ -591,6 +578,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -591,6 +578,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
final boolean unlink(Index<K,V> succ) { final boolean unlink(Index<K,V> succ) {
return !indexesDeletedNode() && casRight(succ, succ.right); return !indexesDeletedNode() && casRight(succ, succ.right);
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long rightOffset =
objectFieldOffset(UNSAFE, "right", Index.class);
} }
/* ---------------- Head nodes -------------- */ /* ---------------- Head nodes -------------- */
...@@ -640,7 +633,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -640,7 +633,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
* cast key as Comparable, which may cause ClassCastException, * cast key as Comparable, which may cause ClassCastException,
* which is propagated back to caller. * which is propagated back to caller.
*/ */
private Comparable<? super K> comparable(Object key) throws ClassCastException { private Comparable<? super K> comparable(Object key)
throws ClassCastException {
if (key == null) if (key == null)
throw new NullPointerException(); throw new NullPointerException();
if (comparator != null) if (comparator != null)
...@@ -799,68 +793,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -799,68 +793,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
} }
/** /**
* Specialized variant of findNode to perform Map.get. Does a weak * Gets value for key using findNode.
* traversal, not bothering to fix any deleted index nodes,
* returning early if it happens to see key in index, and passing
* over any deleted base nodes, falling back to getUsingFindNode
* only if it would otherwise return value from an ongoing
* deletion. Also uses "bound" to eliminate need for some
* comparisons (see Pugh Cookbook). Also folds uses of null checks
* and node-skipping because markers have null keys.
* @param okey the key * @param okey the key
* @return the value, or null if absent * @return the value, or null if absent
*/ */
private V doGet(Object okey) { private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey); Comparable<? super K> key = comparable(okey);
Node<K,V> bound = null;
Index<K,V> q = head;
Index<K,V> r = q.right;
Node<K,V> n;
K k;
int c;
for (;;) {
Index<K,V> d;
// Traverse rights
if (r != null && (n = r.node) != bound && (k = n.key) != null) {
if ((c = key.compareTo(k)) > 0) {
q = r;
r = r.right;
continue;
} else if (c == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else
bound = n;
}
// Traverse down
if ((d = q.down) != null) {
q = d;
r = d.right;
} else
break;
}
// Traverse nexts
for (n = q.node.next; n != null; n = n.next) {
if ((k = n.key) != null) {
if ((c = key.compareTo(k)) == 0) {
Object v = n.value;
return (v != null)? (V)v : getUsingFindNode(key);
} else if (c < 0)
break;
}
}
return null;
}
/**
* Performs map.get via findNode. Used as a backup if doGet
* encounters an in-progress deletion.
* @param key the key
* @return the value, or null if absent
*/
private V getUsingFindNode(Comparable<? super K> key) {
/* /*
* Loop needed here and elsewhere in case value field goes * Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we * null just as it is about to be returned, in which case we
...@@ -943,7 +881,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -943,7 +881,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
x ^= x << 13; x ^= x << 13;
x ^= x >>> 17; x ^= x >>> 17;
randomSeed = x ^= x << 5; randomSeed = x ^= x << 5;
if ((x & 0x8001) != 0) // test highest and lowest bits if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0; return 0;
int level = 1; int level = 1;
while (((x >>>= 1) & 1) != 0) ++level; while (((x >>>= 1) & 1) != 0) ++level;
...@@ -1256,7 +1194,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -1256,7 +1194,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
Node<K,V> n = b.next; Node<K,V> n = b.next;
for (;;) { for (;;) {
if (n == null) if (n == null)
return (b.isBaseHeader())? null : b; return b.isBaseHeader() ? null : b;
Node<K,V> f = n.next; // inconsistent read Node<K,V> f = n.next; // inconsistent read
if (n != b.next) if (n != b.next)
break; break;
...@@ -1374,7 +1312,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -1374,7 +1312,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
Node<K,V> n = b.next; Node<K,V> n = b.next;
for (;;) { for (;;) {
if (n == null) if (n == null)
return ((rel & LT) == 0 || b.isBaseHeader())? null : b; return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
Node<K,V> f = n.next; Node<K,V> f = n.next;
if (n != b.next) // inconsistent read if (n != b.next) // inconsistent read
break; break;
...@@ -1390,7 +1328,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -1390,7 +1328,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
(c < 0 && (rel & LT) == 0)) (c < 0 && (rel & LT) == 0))
return n; return n;
if ( c <= 0 && (rel & LT) != 0) if ( c <= 0 && (rel & LT) != 0)
return (b.isBaseHeader())? null : b; return b.isBaseHeader() ? null : b;
b = n; b = n;
n = f; n = f;
} }
...@@ -1744,7 +1682,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -1744,7 +1682,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
if (n.getValidValue() != null) if (n.getValidValue() != null)
++count; ++count;
} }
return (count >= Integer.MAX_VALUE)? Integer.MAX_VALUE : (int)count; return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
} }
/** /**
...@@ -2099,7 +2037,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2099,7 +2037,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/ */
public K lowerKey(K key) { public K lowerKey(K key) {
Node<K,V> n = findNear(key, LT); Node<K,V> n = findNear(key, LT);
return (n == null)? null : n.key; return (n == null) ? null : n.key;
} }
/** /**
...@@ -2123,7 +2061,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2123,7 +2061,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/ */
public K floorKey(K key) { public K floorKey(K key) {
Node<K,V> n = findNear(key, LT|EQ); Node<K,V> n = findNear(key, LT|EQ);
return (n == null)? null : n.key; return (n == null) ? null : n.key;
} }
/** /**
...@@ -2145,7 +2083,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2145,7 +2083,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/ */
public K ceilingKey(K key) { public K ceilingKey(K key) {
Node<K,V> n = findNear(key, GT|EQ); Node<K,V> n = findNear(key, GT|EQ);
return (n == null)? null : n.key; return (n == null) ? null : n.key;
} }
/** /**
...@@ -2169,7 +2107,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2169,7 +2107,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
*/ */
public K higherKey(K key) { public K higherKey(K key) {
Node<K,V> n = findNear(key, GT); Node<K,V> n = findNear(key, GT);
return (n == null)? null : n.key; return (n == null) ? null : n.key;
} }
/** /**
...@@ -2342,7 +2280,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2342,7 +2280,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
return list; return list;
} }
static final class KeySet<E> extends AbstractSet<E> implements NavigableSet<E> { static final class KeySet<E>
extends AbstractSet<E> implements NavigableSet<E> {
private final ConcurrentNavigableMap<E,Object> m; private final ConcurrentNavigableMap<E,Object> m;
KeySet(ConcurrentNavigableMap<E,Object> map) { m = map; } KeySet(ConcurrentNavigableMap<E,Object> map) { m = map; }
public int size() { return m.size(); } public int size() { return m.size(); }
...@@ -2359,11 +2298,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2359,11 +2298,11 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
public E last() { return m.lastKey(); } public E last() { return m.lastKey(); }
public E pollFirst() { public E pollFirst() {
Map.Entry<E,Object> e = m.pollFirstEntry(); Map.Entry<E,Object> e = m.pollFirstEntry();
return e == null? null : e.getKey(); return (e == null) ? null : e.getKey();
} }
public E pollLast() { public E pollLast() {
Map.Entry<E,Object> e = m.pollLastEntry(); Map.Entry<E,Object> e = m.pollLastEntry();
return e == null? null : e.getKey(); return (e == null) ? null : e.getKey();
} }
public Iterator<E> iterator() { public Iterator<E> iterator() {
if (m instanceof ConcurrentSkipListMap) if (m instanceof ConcurrentSkipListMap)
...@@ -2710,9 +2649,9 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2710,9 +2649,9 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
rel &= ~m.LT; rel &= ~m.LT;
} }
if (tooLow(key)) if (tooLow(key))
return ((rel & m.LT) != 0)? null : lowestEntry(); return ((rel & m.LT) != 0) ? null : lowestEntry();
if (tooHigh(key)) if (tooHigh(key))
return ((rel & m.LT) != 0)? highestEntry() : null; return ((rel & m.LT) != 0) ? highestEntry() : null;
for (;;) { for (;;) {
Node<K,V> n = m.findNear(key, rel); Node<K,V> n = m.findNear(key, rel);
if (n == null || !inBounds(n.key)) if (n == null || !inBounds(n.key))
...@@ -2783,7 +2722,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2783,7 +2722,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
public V remove(Object key) { public V remove(Object key) {
K k = (K)key; K k = (K)key;
return (!inBounds(k))? null : m.remove(k); return (!inBounds(k)) ? null : m.remove(k);
} }
public int size() { public int size() {
...@@ -2794,7 +2733,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2794,7 +2733,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
if (n.getValidValue() != null) if (n.getValidValue() != null)
++count; ++count;
} }
return count >= Integer.MAX_VALUE? Integer.MAX_VALUE : (int)count; return count >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)count;
} }
public boolean isEmpty() { public boolean isEmpty() {
...@@ -2972,27 +2911,27 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -2972,27 +2911,27 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
} }
public K firstKey() { public K firstKey() {
return isDescending? highestKey() : lowestKey(); return isDescending ? highestKey() : lowestKey();
} }
public K lastKey() { public K lastKey() {
return isDescending? lowestKey() : highestKey(); return isDescending ? lowestKey() : highestKey();
} }
public Map.Entry<K,V> firstEntry() { public Map.Entry<K,V> firstEntry() {
return isDescending? highestEntry() : lowestEntry(); return isDescending ? highestEntry() : lowestEntry();
} }
public Map.Entry<K,V> lastEntry() { public Map.Entry<K,V> lastEntry() {
return isDescending? lowestEntry() : highestEntry(); return isDescending ? lowestEntry() : highestEntry();
} }
public Map.Entry<K,V> pollFirstEntry() { public Map.Entry<K,V> pollFirstEntry() {
return isDescending? removeHighest() : removeLowest(); return isDescending ? removeHighest() : removeLowest();
} }
public Map.Entry<K,V> pollLastEntry() { public Map.Entry<K,V> pollLastEntry() {
return isDescending? removeLowest() : removeHighest(); return isDescending ? removeLowest() : removeHighest();
} }
/* ---------------- Submap Views -------------- */ /* ---------------- Submap Views -------------- */
...@@ -3141,4 +3080,22 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> ...@@ -3141,4 +3080,22 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
} }
} }
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", ConcurrentSkipListMap.class);
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
} }
...@@ -832,7 +832,7 @@ public class CopyOnWriteArrayList<E> ...@@ -832,7 +832,7 @@ public class CopyOnWriteArrayList<E>
} }
/** /**
* Save the state of the list to a stream (i.e., serialize it). * Saves the state of the list to a stream (that is, serializes it).
* *
* @serialData The length of the array backing the list is emitted * @serialData The length of the array backing the list is emitted
* (int), followed by all of its elements (each an Object) * (int), followed by all of its elements (each an Object)
...@@ -842,27 +842,25 @@ public class CopyOnWriteArrayList<E> ...@@ -842,27 +842,25 @@ public class CopyOnWriteArrayList<E>
private void writeObject(java.io.ObjectOutputStream s) private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{ throws java.io.IOException{
// Write out element count, and any hidden stuff
s.defaultWriteObject(); s.defaultWriteObject();
Object[] elements = getArray(); Object[] elements = getArray();
int len = elements.length;
// Write out array length // Write out array length
s.writeInt(len); s.writeInt(elements.length);
// Write out all elements in the proper order. // Write out all elements in the proper order.
for (int i = 0; i < len; i++) for (Object element : elements)
s.writeObject(elements[i]); s.writeObject(element);
} }
/** /**
* Reconstitute the list from a stream (i.e., deserialize it). * Reconstitutes the list from a stream (that is, deserializes it).
*
* @param s the stream * @param s the stream
*/ */
private void readObject(java.io.ObjectInputStream s) private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException { throws java.io.IOException, ClassNotFoundException {
// Read in size, and any hidden stuff
s.defaultReadObject(); s.defaultReadObject();
// bind to new lock // bind to new lock
......
...@@ -526,7 +526,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -526,7 +526,7 @@ public class ForkJoinPool extends AbstractExecutorService {
private volatile long eventWaiters; private volatile long eventWaiters;
private static final int EVENT_COUNT_SHIFT = 32; private static final int EVENT_COUNT_SHIFT = 32;
private static final long WAITER_ID_MASK = (1L << 16) - 1L; private static final int WAITER_ID_MASK = (1 << 16) - 1;
/** /**
* A counter for events that may wake up worker threads: * A counter for events that may wake up worker threads:
...@@ -615,7 +615,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -615,7 +615,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// are usually manually inlined by callers // are usually manually inlined by callers
/** /**
* Increments running count part of workerCounts * Increments running count part of workerCounts.
*/ */
final void incrementRunningCount() { final void incrementRunningCount() {
int c; int c;
...@@ -625,7 +625,17 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -625,7 +625,17 @@ public class ForkJoinPool extends AbstractExecutorService {
} }
/** /**
* Tries to decrement running count unless already zero * Tries to increment running count part of workerCounts.
*/
final boolean tryIncrementRunningCount() {
int c;
return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
c = workerCounts,
c + ONE_RUNNING);
}
/**
* Tries to decrement running count unless already zero.
*/ */
final boolean tryDecrementRunningCount() { final boolean tryDecrementRunningCount() {
int wc = workerCounts; int wc = workerCounts;
...@@ -698,10 +708,11 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -698,10 +708,11 @@ public class ForkJoinPool extends AbstractExecutorService {
for (k = 0; k < n && ws[k] != null; ++k) for (k = 0; k < n && ws[k] != null; ++k)
; ;
if (k == n) if (k == n)
ws = Arrays.copyOf(ws, n << 1); ws = workers = Arrays.copyOf(ws, n << 1);
} }
ws[k] = w; ws[k] = w;
workers = ws; // volatile array write ensures slot visibility int c = eventCount; // advance event count to ensure visibility
UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
...@@ -734,7 +745,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -734,7 +745,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
final void workerTerminated(ForkJoinWorkerThread w) { final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w); forgetWorker(w);
decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL); decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w); tryAccumulateStealCount(w);
tryTerminate(false); tryTerminate(false);
...@@ -746,24 +757,23 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -746,24 +757,23 @@ public class ForkJoinPool extends AbstractExecutorService {
* Releases workers blocked on a count not equal to current count. * Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to * Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or * avoid wasted array checks. Gives up upon a change in count or
* upon releasing two workers, letting others take over. * upon releasing four workers, letting others take over.
*/ */
private void releaseEventWaiters() { private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers; ForkJoinWorkerThread[] ws = workers;
int n = ws.length; int n = ws.length;
long h = eventWaiters; long h = eventWaiters;
int ec = eventCount; int ec = eventCount;
boolean releasedOne = false; int releases = 4;
ForkJoinWorkerThread w; int id; ForkJoinWorkerThread w; int id;
while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec && (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) { id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) { h, w.nextWaiter)) {
LockSupport.unpark(w); LockSupport.unpark(w);
if (releasedOne) // exit on second release if (--releases == 0)
break; break;
releasedOne = true;
} }
if (eventCount != ec) if (eventCount != ec)
break; break;
...@@ -793,7 +803,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -793,7 +803,7 @@ public class ForkJoinPool extends AbstractExecutorService {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h; long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) && while ((runState < SHUTDOWN || !tryTerminate(false)) &&
(((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) && (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) { eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
...@@ -820,9 +830,9 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -820,9 +830,9 @@ public class ForkJoinPool extends AbstractExecutorService {
if (tryAccumulateStealCount(w)) { // transfer while idle if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L || boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1); (workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime(); long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt Thread.interrupted(); // clear/ignore interrupt
if (eventCount != ec || w.isTerminating()) if (w.isTerminating() || eventCount != ec)
break; // recheck after clear break; // recheck after clear
if (untimed) if (untimed)
LockSupport.park(w); LockSupport.park(w);
...@@ -860,7 +870,8 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -860,7 +870,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((sw = spareWaiters) != 0 && if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 && (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null && id < n && (w = ws[id]) != null &&
(workerCounts & RUNNING_COUNT_MASK) < parallelism && (runState >= TERMINATING ||
(workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw && spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) { sw, w.nextSpare)) {
...@@ -914,14 +925,10 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -914,14 +925,10 @@ public class ForkJoinPool extends AbstractExecutorService {
break; break;
} }
w.start(recordWorker(w), ueh); w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
int c; // advance event count
UNSAFE.compareAndSwapInt(this, eventCountOffset,
c = eventCount, c+1);
break; // add at most one unless total below target break; // add at most one unless total below target
} }
} }
}
if (eventWaiters != 0L) if (eventWaiters != 0L)
releaseEventWaiters(); releaseEventWaiters();
} }
...@@ -955,7 +962,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -955,7 +962,7 @@ public class ForkJoinPool extends AbstractExecutorService {
} }
else if ((h = eventWaiters) != 0L) { else if ((h = eventWaiters) != 0L) {
long nh; long nh;
int id = ((int)(h & WAITER_ID_MASK)) - 1; int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null && if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker (nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
...@@ -1008,19 +1015,26 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1008,19 +1015,26 @@ public class ForkJoinPool extends AbstractExecutorService {
break; break;
} }
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false; inactivate = active = w.active = false;
int wc = workerCounts; if (rs == SHUTDOWN) { // all inactive and shut down
tryTerminate(false);
continue;
}
}
int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) { if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend if (!(inactivate |= active) && // must inactivate to suspend
workerCounts == wc && // try to suspend as spare workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset, UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) wc, wc - ONE_RUNNING))
w.suspendAsSpare(); w.suspendAsSpare();
} }
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers helpMaintainParallelism(); // not enough workers
else if (!ran) { else if (ran)
break;
else {
long h = eventWaiters; long h = eventWaiters;
int ec = eventCount; int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
...@@ -1032,8 +1046,6 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1032,8 +1046,6 @@ public class ForkJoinPool extends AbstractExecutorService {
else if (!(inactivate |= active)) else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync eventSync(w, wec); // must inactivate before sync
} }
else
break;
} }
} }
...@@ -1043,35 +1055,67 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1043,35 +1055,67 @@ public class ForkJoinPool extends AbstractExecutorService {
* *
* @param joinMe the task to join * @param joinMe the task to join
* @param worker the current worker thread * @param worker the current worker thread
* @param timed true if wait should time out
* @param nanos timeout value if timed
*/ */
final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) { final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
boolean timed, long nanos) {
long startTime = timed ? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
boolean running = true; // false when count decremented
while (joinMe.status >= 0) { while (joinMe.status >= 0) {
int wc; if (runState >= TERMINATING) {
worker.helpJoinTask(joinMe); joinMe.cancelIgnoringExceptions();
break;
}
running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0) if (joinMe.status < 0)
break; break;
else if (retries > 0) if (retries > 0) {
--retries; --retries;
else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && continue;
UNSAFE.compareAndSwapInt(this, workerCountsOffset, }
wc, wc - ONE_RUNNING)) { int wc = workerCounts;
int stat, c; long h; if ((wc & RUNNING_COUNT_MASK) != 0) {
while ((stat = joinMe.status) >= 0 && if (running) {
(h = eventWaiters) != 0L && // help release others if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount) wc, wc - ONE_RUNNING))
continue;
running = false;
}
long h = eventWaiters;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters(); releaseEventWaiters();
if (stat >= 0 && if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
((workerCounts & RUNNING_COUNT_MASK) == 0 || long ms; int ns;
(stat = if (!timed) {
joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0)) ms = JOIN_TIMEOUT_MILLIS;
helpMaintainParallelism(); // timeout or no running workers ns = 0;
}
else { // at most JOIN_TIMEOUT_MILLIS per wait
long nt = nanos - (System.nanoTime() - startTime);
if (nt <= 0L)
break;
ms = nt / 1000000;
if (ms > JOIN_TIMEOUT_MILLIS) {
ms = JOIN_TIMEOUT_MILLIS;
ns = 0;
}
else
ns = (int) (nt % 1000000);
}
joinMe.internalAwaitDone(ms, ns);
}
if (joinMe.status < 0)
break;
}
helpMaintainParallelism();
}
if (!running) {
int c;
do {} while (!UNSAFE.compareAndSwapInt do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset, (this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING)); c = workerCounts, c + ONE_RUNNING));
if (stat < 0)
break; // else restart
}
} }
} }
...@@ -1082,8 +1126,9 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1082,8 +1126,9 @@ public class ForkJoinPool extends AbstractExecutorService {
throws InterruptedException { throws InterruptedException {
while (!blocker.isReleasable()) { while (!blocker.isReleasable()) {
int wc = workerCounts; int wc = workerCounts;
if ((wc & RUNNING_COUNT_MASK) != 0 && if ((wc & RUNNING_COUNT_MASK) == 0)
UNSAFE.compareAndSwapInt(this, workerCountsOffset, helpMaintainParallelism();
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING)) { wc, wc - ONE_RUNNING)) {
try { try {
while (!blocker.isReleasable()) { while (!blocker.isReleasable()) {
...@@ -1129,12 +1174,11 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1129,12 +1174,11 @@ public class ForkJoinPool extends AbstractExecutorService {
// Finish now if all threads terminated; else in some subsequent call // Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED); advanceRunLevel(TERMINATED);
termination.arrive(); termination.forceTermination();
} }
return true; return true;
} }
/** /**
* Actions on transition to TERMINATING * Actions on transition to TERMINATING
* *
...@@ -1325,17 +1369,13 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1325,17 +1369,13 @@ public class ForkJoinPool extends AbstractExecutorService {
// Execution methods // Execution methods
/** /**
* Common code for execute, invoke and submit * Submits task and creates, starts, or resumes some workers if necessary
*/ */
private <T> void doSubmit(ForkJoinTask<T> task) { private <T> void doSubmit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
submissionQueue.offer(task); submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
helpMaintainParallelism(); // create, start, or resume some workers helpMaintainParallelism();
} }
/** /**
...@@ -1348,9 +1388,34 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1348,9 +1388,34 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public <T> T invoke(ForkJoinTask<T> task) { public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
return task.invoke(); // bypass submit if in same pool
else {
doSubmit(task); doSubmit(task);
return task.join(); return task.join();
} }
}
/**
* Unless terminating, forks task if within an ongoing FJ
* computation in the current pool, else submits as external task.
*/
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
if (runState >= SHUTDOWN)
throw new RejectedExecutionException();
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
task.fork();
else
doSubmit(task);
}
/** /**
* Arranges for (asynchronous) execution of the given task. * Arranges for (asynchronous) execution of the given task.
...@@ -1361,7 +1426,9 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1361,7 +1426,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public void execute(ForkJoinTask<?> task) { public void execute(ForkJoinTask<?> task) {
doSubmit(task); if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
} }
// AbstractExecutorService methods // AbstractExecutorService methods
...@@ -1372,12 +1439,14 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1372,12 +1439,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public void execute(Runnable task) { public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job; ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task; job = (ForkJoinTask<?>) task;
else else
job = ForkJoinTask.adapt(task, null); job = ForkJoinTask.adapt(task, null);
doSubmit(job); forkOrSubmit(job);
} }
/** /**
...@@ -1390,7 +1459,9 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1390,7 +1459,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
doSubmit(task); if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task; return task;
} }
...@@ -1400,8 +1471,10 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1400,8 +1471,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public <T> ForkJoinTask<T> submit(Callable<T> task) { public <T> ForkJoinTask<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task); ForkJoinTask<T> job = ForkJoinTask.adapt(task);
doSubmit(job); forkOrSubmit(job);
return job; return job;
} }
...@@ -1411,8 +1484,10 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1411,8 +1484,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public <T> ForkJoinTask<T> submit(Runnable task, T result) { public <T> ForkJoinTask<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result); ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
doSubmit(job); forkOrSubmit(job);
return job; return job;
} }
...@@ -1422,12 +1497,14 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1422,12 +1497,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution * scheduled for execution
*/ */
public ForkJoinTask<?> submit(Runnable task) { public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job; ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task; job = (ForkJoinTask<?>) task;
else else
job = ForkJoinTask.adapt(task, null); job = ForkJoinTask.adapt(task, null);
doSubmit(job); forkOrSubmit(job);
return job; return job;
} }
...@@ -1725,8 +1802,11 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1725,8 +1802,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* commenced but not yet completed. This method may be useful for * commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient * debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have * period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not * ignored or suppressed interruption, or are waiting for IO,
* to properly terminate. * causing this executor not to properly terminate. (See the
* advisory notes for class {@link ForkJoinTask} stating that
* tasks should not normally entail blocking operations. But if
* they do, they must abort them on interrupt.)
* *
* @return {@code true} if terminating but not yet terminated * @return {@code true} if terminating but not yet terminated
*/ */
...@@ -1764,10 +1844,11 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1764,10 +1844,11 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean awaitTermination(long timeout, TimeUnit unit) public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {
try { try {
return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; termination.awaitAdvanceInterruptibly(0, timeout, unit);
} catch (TimeoutException ex) { } catch (TimeoutException ex) {
return false; return false;
} }
return true;
} }
/** /**
......
...@@ -38,16 +38,18 @@ package java.util.concurrent; ...@@ -38,16 +38,18 @@ package java.util.concurrent;
import java.util.Random; import java.util.Random;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.RejectedExecutionException;
/** /**
* A thread managed by a {@link ForkJoinPool}. This class is * A thread managed by a {@link ForkJoinPool}, which executes
* subclassable solely for the sake of adding functionality -- there * {@link ForkJoinTask}s.
* are no overridable methods dealing with scheduling or execution. * This class is subclassable solely for the sake of adding
* However, you can override initialization and termination methods * functionality -- there are no overridable methods dealing with
* surrounding the main task processing loop. If you do create such a * scheduling or execution. However, you can override initialization
* subclass, you will also need to supply a custom {@link * and termination methods surrounding the main task processing loop.
* ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code * If you do create such a subclass, you will also need to supply a
* ForkJoinPool}. * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
* in a {@code ForkJoinPool}.
* *
* @since 1.7 * @since 1.7
* @author Doug Lea * @author Doug Lea
...@@ -376,7 +378,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -376,7 +378,7 @@ public class ForkJoinWorkerThread extends Thread {
/** /**
* Initializes internal state after construction but before * Initializes internal state after construction but before
* processing any tasks. If you override this method, you must * processing any tasks. If you override this method, you must
* invoke @code{super.onStart()} at the beginning of the method. * invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal * Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other * default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts * threads work correctly even before this thread starts
...@@ -384,7 +386,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -384,7 +386,7 @@ public class ForkJoinWorkerThread extends Thread {
*/ */
protected void onStart() { protected void onStart() {
int rs = seedGenerator.nextInt(); int rs = seedGenerator.nextInt();
seed = rs == 0? 1 : rs; // seed must be nonzero seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread // Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber()); String pid = Integer.toString(pool.getPoolNumber());
...@@ -426,7 +428,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -426,7 +428,7 @@ public class ForkJoinWorkerThread extends Thread {
/** /**
* This method is required to be public, but should never be * This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute * called explicitly. It performs the main run loop to execute
* ForkJoinTasks. * {@link ForkJoinTask}s.
*/ */
public void run() { public void run() {
Throwable exception = null; Throwable exception = null;
...@@ -628,6 +630,19 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -628,6 +630,19 @@ public class ForkJoinWorkerThread extends Thread {
if (t == null) // lost to stealer if (t == null) // lost to stealer
break; break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) { if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
/*
* Note: here and in related methods, as a
* performance (not correctness) issue, we'd like
* to encourage compiler not to arbitrarily
* postpone setting sp after successful CAS.
* Currently there is no intrinsic for arranging
* this, but using Unsafe putOrderedInt may be a
* preferable strategy on some compilers even
* though its main effect is a pre-, not post-
* fence. To simplify possible changes, the option
* is left in comments next to the associated
* assignments.
*/
sp = s; // putOrderedInt may encourage more timely write sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s); // UNSAFE.putOrderedInt(this, spOffset, s);
return t; return t;
...@@ -884,8 +899,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -884,8 +899,7 @@ public class ForkJoinWorkerThread extends Thread {
*/ */
final void cancelTasks() { final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
if (cj != null) { if (cj != null && cj.status >= 0) {
currentJoin = null;
cj.cancelIgnoringExceptions(); cj.cancelIgnoringExceptions();
try { try {
this.interrupt(); // awaken wait this.interrupt(); // awaken wait
...@@ -893,10 +907,8 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -893,10 +907,8 @@ public class ForkJoinWorkerThread extends Thread {
} }
} }
ForkJoinTask<?> cs = currentSteal; ForkJoinTask<?> cs = currentSteal;
if (cs != null) { if (cs != null && cs.status >= 0)
currentSteal = null;
cs.cancelIgnoringExceptions(); cs.cancelIgnoringExceptions();
}
while (base != sp) { while (base != sp) {
ForkJoinTask<?> t = deqTask(); ForkJoinTask<?> t = deqTask();
if (t != null) if (t != null)
...@@ -959,57 +971,23 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -959,57 +971,23 @@ public class ForkJoinWorkerThread extends Thread {
* Possibly runs some tasks and/or blocks, until task is done. * Possibly runs some tasks and/or blocks, until task is done.
* *
* @param joinMe the task to join * @param joinMe the task to join
* @param timed true if use timed wait
* @param nanos wait time if timed
*/ */
final void joinTask(ForkJoinTask<?> joinMe) { final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store // currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin; ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
if (sp != base) pool.awaitJoin(joinMe, this, timed, nanos);
localHelpJoinTask(joinMe);
if (joinMe.status >= 0)
pool.awaitJoin(joinMe, this);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
} }
/** /**
* Run tasks in local queue until given task is done. * Tries to locate and help perform tasks for a stealer of the
* * given task, or in turn one of its stealers. Traces
* @param joinMe the task to join * currentSteal->currentJoin links looking for a thread working on
*/ * a descendant of the given task and with a non-empty queue to
private void localHelpJoinTask(ForkJoinTask<?> joinMe) { * steal back and execute tasks from.
int s;
ForkJoinTask<?>[] q;
while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to a stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
/*
* This recheck (and similarly in helpJoinTask)
* handles cases where joinMe is independently
* cancelled or forced even though there is other work
* available. Back out of the pop by putting t back
* into slot before we commit by writing sp.
*/
if (joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
break;
}
sp = s;
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
/**
* Unless terminating, tries to locate and help perform tasks for
* a stealer of the given task, or in turn one of its stealers.
* Traces currentSteal->currentJoin links looking for a thread
* working on a descendant of the given task and with a non-empty
* queue to steal back and execute tasks from.
* *
* The implementation is very branchy to cope with potential * The implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale, * inconsistencies or loops encountering chains that are stale,
...@@ -1019,22 +997,54 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -1019,22 +997,54 @@ public class ForkJoinWorkerThread extends Thread {
* don't work out. * don't work out.
* *
* @param joinMe the task to join * @param joinMe the task to join
* @param running if false, then must update pool count upon
* running a task
* @return value of running on exit
*/ */
final void helpJoinTask(ForkJoinTask<?> joinMe) { final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
ForkJoinWorkerThread[] ws; /*
int n; * Initial checks to (1) abort if terminating; (2) clean out
if (joinMe.status < 0) // already done * old cancelled tasks from local queue; (3) if joinMe is next
return; * task, run it; (4) omit scan if local queue nonempty (since
if ((runState & TERMINATING) != 0) { // cancel if shutting down * it may contain non-descendents of joinMe).
*/
ForkJoinPool p = pool;
for (;;) {
ForkJoinTask<?>[] q;
int s;
if (joinMe.status < 0)
return running;
else if ((runState & TERMINATING) != 0) {
joinMe.cancelIgnoringExceptions(); joinMe.cancelIgnoringExceptions();
return; return running;
}
else if ((s = sp) == base || (q = queue) == null)
break; // queue empty
else {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null)
break; // lost to a stealer
else if (t != joinMe && t.status >= 0)
return running; // cannot safely help
else if ((running ||
(running = p.tryIncrementRunningCount())) &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
} }
if ((ws = pool.workers) == null || (n = ws.length) <= 1)
return; // need at least 2 workers
int n; // worker array size
ForkJoinWorkerThread[] ws = p.workers;
if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
ForkJoinTask<?> task = joinMe; // base of chain ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task ForkJoinWorkerThread thread = this; // thread with stolen task
for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint // Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) { if (v == null || v.currentSteal != task) {
...@@ -1043,54 +1053,72 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -1043,54 +1053,72 @@ public class ForkJoinWorkerThread extends Thread {
ForkJoinTask<?> vs; ForkJoinTask<?> vs;
if ((v = ws[j]) != null && if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) { (vs = v.currentSteal) != null) {
if (joinMe.status < 0 || task.status < 0) if (joinMe.status < 0)
return; // stale or done break outer;
if (vs == task) { if (vs == task) {
if (task.status < 0)
break outer; // stale
thread.stealHint = j; thread.stealHint = j;
break; // save hint for next time break; // save hint for next time
} }
} }
} }
else else
return; // no stealer break outer; // no stealer
} }
} }
for (;;) { // Try to help v, using specialized form of deqTask
// Try to help v, using specialized form of deqTask
for (;;) {
if (joinMe.status < 0) if (joinMe.status < 0)
return; break outer;
int b = v.base; int b = v.base;
ForkJoinTask<?>[] q = v.queue; ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null) if (b == v.sp || q == null)
break; break; // empty
int i = (q.length - 1) & b; int i = (q.length - 1) & b;
long u = (i << qShift) + qBase; long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i]; ForkJoinTask<?> t = q[i];
int pid = poolIndex;
ForkJoinTask<?> ps = currentSteal;
if (task.status < 0) if (task.status < 0)
return; // stale or done break outer; // stale
if (t != null && v.base == b++ && if (t != null &&
(running ||
(running = p.tryIncrementRunningCount())) &&
v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) { UNSAFE.compareAndSwapObject(q, u, t, null)) {
if (joinMe.status < 0) { if (t != joinMe && joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t); UNSAFE.putObjectVolatile(q, u, t);
return; // back out on cancel break outer; // joinMe cancelled; back out
} }
v.base = b; v.base = b;
if (t.status >= 0) {
ForkJoinTask<?> ps = currentSteal;
int pid = poolIndex;
v.stealHint = pid; v.stealHint = pid;
UNSAFE.putOrderedObject(this, currentStealOffset, t); UNSAFE.putOrderedObject(this,
currentStealOffset, t);
t.quietlyExec(); t.quietlyExec();
UNSAFE.putOrderedObject(this, currentStealOffset, ps); UNSAFE.putOrderedObject(this,
currentStealOffset, ps);
}
}
else if ((runState & TERMINATING) != 0) {
joinMe.cancelIgnoringExceptions();
break outer;
} }
} }
// Try to descend to find v's stealer // Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin; ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task || if (task.status < 0 || next == null || next == task ||
joinMe.status < 0) joinMe.status < 0)
return; break; // done, stale, dead-end, or cyclic
task = next; task = next;
thread = v; thread = v;
} }
} }
return running;
}
/** /**
* Implements ForkJoinTask.getSurplusQueuedTaskCount(). * Implements ForkJoinTask.getSurplusQueuedTaskCount().
......
...@@ -1029,6 +1029,8 @@ public class LinkedBlockingDeque<E> ...@@ -1029,6 +1029,8 @@ public class LinkedBlockingDeque<E>
* elements as they existed upon construction of the iterator, and * elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications * may (but is not guaranteed to) reflect any modifications
* subsequent to construction. * subsequent to construction.
*
* @return an iterator over the elements in this deque in reverse order
*/ */
public Iterator<E> descendingIterator() { public Iterator<E> descendingIterator() {
return new DescendingItr(); return new DescendingItr();
......
...@@ -189,14 +189,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -189,14 +189,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
} }
/** /**
* Creates a node and links it at end of queue. * Links node at end of queue.
* *
* @param x the item * @param node the node
*/ */
private void enqueue(E x) { private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread(); // assert putLock.isHeldByCurrentThread();
// assert last.next == null; // assert last.next == null;
last = last.next = new Node<E>(x); last = last.next = node;
} }
/** /**
...@@ -282,7 +282,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -282,7 +282,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
throw new NullPointerException(); throw new NullPointerException();
if (n == capacity) if (n == capacity)
throw new IllegalStateException("Queue full"); throw new IllegalStateException("Queue full");
enqueue(e); enqueue(new Node<E>(e));
++n; ++n;
} }
count.set(n); count.set(n);
...@@ -332,6 +332,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -332,6 +332,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
// Note: convention in all put/take/etc is to preset local var // Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set. // holding count negative to indicate failure unless set.
int c = -1; int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock; final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count; final AtomicInteger count = this.count;
putLock.lockInterruptibly(); putLock.lockInterruptibly();
...@@ -347,7 +348,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -347,7 +348,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
while (count.get() == capacity) { while (count.get() == capacity) {
notFull.await(); notFull.await();
} }
enqueue(e); enqueue(node);
c = count.getAndIncrement(); c = count.getAndIncrement();
if (c + 1 < capacity) if (c + 1 < capacity)
notFull.signal(); notFull.signal();
...@@ -382,7 +383,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -382,7 +383,7 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
return false; return false;
nanos = notFull.awaitNanos(nanos); nanos = notFull.awaitNanos(nanos);
} }
enqueue(e); enqueue(new Node<E>(e));
c = count.getAndIncrement(); c = count.getAndIncrement();
if (c + 1 < capacity) if (c + 1 < capacity)
notFull.signal(); notFull.signal();
...@@ -411,11 +412,12 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -411,11 +412,12 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
if (count.get() == capacity) if (count.get() == capacity)
return false; return false;
int c = -1; int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock; final ReentrantLock putLock = this.putLock;
putLock.lock(); putLock.lock();
try { try {
if (count.get() < capacity) { if (count.get() < capacity) {
enqueue(e); enqueue(node);
c = count.getAndIncrement(); c = count.getAndIncrement();
if (c + 1 < capacity) if (c + 1 < capacity)
notFull.signal(); notFull.signal();
...@@ -559,6 +561,27 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -559,6 +561,27 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
} }
} }
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
/** /**
* Returns an array containing all of the elements in this queue, in * Returns an array containing all of the elements in this queue, in
* proper sequence. * proper sequence.
...@@ -645,7 +668,20 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -645,7 +668,20 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
public String toString() { public String toString() {
fullyLock(); fullyLock();
try { try {
return super.toString(); Node<E> p = head.next;
if (p == null)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally { } finally {
fullyUnlock(); fullyUnlock();
} }
...@@ -727,12 +763,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E> ...@@ -727,12 +763,14 @@ public class LinkedBlockingQueue<E> extends AbstractQueue<E>
/** /**
* Returns an iterator over the elements in this queue in proper sequence. * Returns an iterator over the elements in this queue in proper sequence.
* The returned {@code Iterator} is a "weakly consistent" iterator that * The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException * will never throw {@link java.util.ConcurrentModificationException
* ConcurrentModificationException}, * ConcurrentModificationException}, and guarantees to traverse
* and guarantees to traverse elements as they existed upon * elements as they existed upon construction of the iterator, and
* construction of the iterator, and may (but is not guaranteed to) * may (but is not guaranteed to) reflect any modifications
* reflect any modifications subsequent to construction. * subsequent to construction.
* *
* @return an iterator over the elements in this queue in proper sequence * @return an iterator over the elements in this queue in proper sequence
*/ */
......
...@@ -37,10 +37,10 @@ package java.util.concurrent; ...@@ -37,10 +37,10 @@ package java.util.concurrent;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.Collection; import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
...@@ -588,7 +588,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -588,7 +588,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
throw new NullPointerException(); throw new NullPointerException();
Node s = null; // the node to append, if needed Node s = null; // the node to append, if needed
retry: for (;;) { // restart on append race retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData; boolean isData = p.isData;
...@@ -599,7 +600,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -599,7 +600,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
if (p.casItem(item, e)) { // match if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) { for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null? q : n)) { if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext(); h.forgetNext();
break; break;
} // advance and retry } // advance and retry
...@@ -809,22 +810,61 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -809,22 +810,61 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Moves to next node after prev, or first node if prev null. * Moves to next node after prev, or first node if prev null.
*/ */
private void advance(Node prev) { private void advance(Node prev) {
lastPred = lastRet; /*
lastRet = prev; * To track and avoid buildup of deleted nodes in the face
for (Node p = (prev == null) ? head : succ(prev); * of calls to both Queue.remove and Itr.remove, we must
p != null; p = succ(p)) { * include variants of unsplice and sweep upon each
Object item = p.item; * advance: Upon Itr.remove, we may need to catch up links
if (p.isData) { * from lastPred, and upon other removes, we might need to
if (item != null && item != p) { * skip ahead from stale nodes and unsplice deleted ones
nextItem = LinkedTransferQueue.this.<E>cast(item); * found while advancing.
nextNode = p; */
Node r, b; // reset lastPred upon possible deletion of lastRet
if ((r = lastRet) != null && !r.isMatched())
lastPred = r; // next lastPred is old lastRet
else if ((b = lastPred) == null || b.isMatched())
lastPred = null; // at start of list
else {
Node s, n; // help with removal of lastPred.next
while ((s = b.next) != null &&
s != b && s.isMatched() &&
(n = s.next) != null && n != s)
b.casNext(s, n);
}
this.lastRet = prev;
for (Node p = prev, s, n;;) {
s = (p == null) ? head : p.next;
if (s == null)
break;
else if (s == p) {
p = null;
continue;
}
Object item = s.item;
if (s.isData) {
if (item != null && item != s) {
nextItem = LinkedTransferQueue.<E>cast(item);
nextNode = s;
return; return;
} }
} }
else if (item == null) else if (item == null)
break; break;
// assert s.isMatched();
if (p == null)
p = s;
else if ((n = s.next) == null)
break;
else if (s == n)
p = null;
else
p.casNext(s, n);
} }
nextNode = null; nextNode = null;
nextItem = null;
} }
Itr() { Itr() {
...@@ -844,10 +884,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -844,10 +884,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
public final void remove() { public final void remove() {
Node p = lastRet; final Node lastRet = this.lastRet;
if (p == null) throw new IllegalStateException(); if (lastRet == null)
if (p.tryMatchData()) throw new IllegalStateException();
unsplice(lastPred, p); this.lastRet = null;
if (lastRet.tryMatchData())
unsplice(lastPred, lastRet);
} }
} }
...@@ -997,8 +1039,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -997,8 +1039,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Inserts the specified element at the tail of this queue. * Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}. * As the queue is unbounded, this method will never return {@code false}.
* *
* @return {@code true} (as specified by * @return {@code true} (as specified by {@link Queue#offer})
* {@link BlockingQueue#offer(Object) BlockingQueue.offer})
* @throws NullPointerException if the specified element is null * @throws NullPointerException if the specified element is null
*/ */
public boolean offer(E e) { public boolean offer(E e) {
...@@ -1130,15 +1171,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -1130,15 +1171,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
/** /**
* Returns an iterator over the elements in this queue in proper * Returns an iterator over the elements in this queue in proper sequence.
* sequence, from head to tail. * The elements will be returned in order from first (head) to last (tail).
* *
* <p>The returned iterator is a "weakly consistent" iterator that * <p>The returned iterator is a "weakly consistent" iterator that
* will never throw * will never throw {@link java.util.ConcurrentModificationException
* {@link ConcurrentModificationException ConcurrentModificationException}, * ConcurrentModificationException}, and guarantees to traverse
* and guarantees to traverse elements as they existed upon * elements as they existed upon construction of the iterator, and
* construction of the iterator, and may (but is not guaranteed * may (but is not guaranteed to) reflect any modifications
* to) reflect any modifications subsequent to construction. * subsequent to construction.
* *
* @return an iterator over the elements in this queue in proper sequence * @return an iterator over the elements in this queue in proper sequence
*/ */
...@@ -1202,6 +1243,28 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> ...@@ -1202,6 +1243,28 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
return findAndRemove(o); return findAndRemove(o);
} }
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
if (p.isData) {
if (item != null && item != p && o.equals(item))
return true;
}
else if (item == null)
break;
}
return false;
}
/** /**
* Always returns {@code Integer.MAX_VALUE} because a * Always returns {@code Integer.MAX_VALUE} because a
* {@code LinkedTransferQueue} is not capacity constrained. * {@code LinkedTransferQueue} is not capacity constrained.
......
...@@ -360,8 +360,12 @@ public class ScheduledThreadPoolExecutor ...@@ -360,8 +360,12 @@ public class ScheduledThreadPoolExecutor
getExecuteExistingDelayedTasksAfterShutdownPolicy(); getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic = boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy(); getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear(); q.clear();
}
else { else {
// Traverse snapshot to avoid iterator exceptions // Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) { for (Object e : q.toArray()) {
......
...@@ -163,7 +163,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -163,7 +163,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
/** /**
* Shared internal API for dual stacks and queues. * Shared internal API for dual stacks and queues.
*/ */
static abstract class Transferer { abstract static class Transferer {
/** /**
* Performs a put or take. * Performs a put or take.
* *
...@@ -190,7 +190,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -190,7 +190,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* seems not to vary with number of CPUs (beyond 2) so is just * seems not to vary with number of CPUs (beyond 2) so is just
* a constant. * a constant.
*/ */
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32; static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/** /**
* The number of times to spin before blocking in untimed waits. * The number of times to spin before blocking in untimed waits.
...@@ -241,19 +241,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -241,19 +241,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
this.item = item; this.item = item;
} }
static final AtomicReferenceFieldUpdater<SNode, SNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "next");
boolean casNext(SNode cmp, SNode val) { boolean casNext(SNode cmp, SNode val) {
return (cmp == next && return cmp == next &&
nextUpdater.compareAndSet(this, cmp, val)); UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
} }
static final AtomicReferenceFieldUpdater<SNode, SNode>
matchUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "match");
/** /**
* Tries to match node s to this node, if so, waking up thread. * Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters. * Fulfillers call tryMatch to identify their waiters.
...@@ -264,7 +256,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -264,7 +256,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/ */
boolean tryMatch(SNode s) { boolean tryMatch(SNode s) {
if (match == null && if (match == null &&
matchUpdater.compareAndSet(this, null, s)) { UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter; Thread w = waiter;
if (w != null) { // waiters need at most one unpark if (w != null) { // waiters need at most one unpark
waiter = null; waiter = null;
...@@ -279,23 +271,28 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -279,23 +271,28 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* Tries to cancel a wait by matching node to itself. * Tries to cancel a wait by matching node to itself.
*/ */
void tryCancel() { void tryCancel() {
matchUpdater.compareAndSet(this, null, this); UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
} }
boolean isCancelled() { boolean isCancelled() {
return match == this; return match == this;
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", SNode.class);
private static final long matchOffset =
objectFieldOffset(UNSAFE, "match", SNode.class);
} }
/** The head (top) of the stack */ /** The head (top) of the stack */
volatile SNode head; volatile SNode head;
static final AtomicReferenceFieldUpdater<TransferStack, SNode>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferStack.class, SNode.class, "head");
boolean casHead(SNode h, SNode nh) { boolean casHead(SNode h, SNode nh) {
return h == head && headUpdater.compareAndSet(this, h, nh); return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
} }
/** /**
...@@ -338,7 +335,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -338,7 +335,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/ */
SNode s = null; // constructed/reused as needed SNode s = null; // constructed/reused as needed
int mode = (e == null)? REQUEST : DATA; int mode = (e == null) ? REQUEST : DATA;
for (;;) { for (;;) {
SNode h = head; SNode h = head;
...@@ -356,7 +353,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -356,7 +353,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
} }
if ((h = head) != null && h.next == s) if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller casHead(h, s.next); // help s's fulfiller
return mode == REQUEST? m.item : s.item; return (mode == REQUEST) ? m.item : s.item;
} }
} else if (!isFulfilling(h.mode)) { // try to fulfill } else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled if (h.isCancelled()) // already cancelled
...@@ -372,7 +369,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -372,7 +369,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
SNode mn = m.next; SNode mn = m.next;
if (m.tryMatch(s)) { if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m casHead(s, mn); // pop both s and m
return (mode == REQUEST)? m.item : s.item; return (mode == REQUEST) ? m.item : s.item;
} else // lost match } else // lost match
s.casNext(m, mn); // help unlink s.casNext(m, mn); // help unlink
} }
...@@ -423,11 +420,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -423,11 +420,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* and don't wait at all, so are trapped in transfer * and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill. * method rather than calling awaitFulfill.
*/ */
long lastTime = (timed)? System.nanoTime() : 0; long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread(); Thread w = Thread.currentThread();
SNode h = head; SNode h = head;
int spins = (shouldSpin(s)? int spins = (shouldSpin(s) ?
(timed? maxTimedSpins : maxUntimedSpins) : 0); (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) { for (;;) {
if (w.isInterrupted()) if (w.isInterrupted())
s.tryCancel(); s.tryCancel();
...@@ -444,7 +441,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -444,7 +441,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
} }
} }
if (spins > 0) if (spins > 0)
spins = shouldSpin(s)? (spins-1) : 0; spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null) else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter s.waiter = w; // establish waiter so can park next iter
else if (!timed) else if (!timed)
...@@ -499,6 +496,12 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -499,6 +496,12 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
p = n; p = n;
} }
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", TransferStack.class);
} }
/** Dual Queue */ /** Dual Queue */
...@@ -524,29 +527,21 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -524,29 +527,21 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
this.isData = isData; this.isData = isData;
} }
static final AtomicReferenceFieldUpdater<QNode, QNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, QNode.class, "next");
boolean casNext(QNode cmp, QNode val) { boolean casNext(QNode cmp, QNode val) {
return (next == cmp && return next == cmp &&
nextUpdater.compareAndSet(this, cmp, val)); UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
} }
static final AtomicReferenceFieldUpdater<QNode, Object>
itemUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, Object.class, "item");
boolean casItem(Object cmp, Object val) { boolean casItem(Object cmp, Object val) {
return (item == cmp && return item == cmp &&
itemUpdater.compareAndSet(this, cmp, val)); UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
} }
/** /**
* Tries to cancel by CAS'ing ref to this as item. * Tries to cancel by CAS'ing ref to this as item.
*/ */
void tryCancel(Object cmp) { void tryCancel(Object cmp) {
itemUpdater.compareAndSet(this, cmp, this); UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
} }
boolean isCancelled() { boolean isCancelled() {
...@@ -561,6 +556,13 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -561,6 +556,13 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
boolean isOffList() { boolean isOffList() {
return next == this; return next == this;
} }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long nextOffset =
objectFieldOffset(UNSAFE, "next", QNode.class);
private static final long itemOffset =
objectFieldOffset(UNSAFE, "item", QNode.class);
} }
/** Head of queue */ /** Head of queue */
...@@ -580,41 +582,30 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -580,41 +582,30 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
tail = h; tail = h;
} }
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "head");
/** /**
* Tries to cas nh as new head; if successful, unlink * Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention. * old head's next node to avoid garbage retention.
*/ */
void advanceHead(QNode h, QNode nh) { void advanceHead(QNode h, QNode nh) {
if (h == head && headUpdater.compareAndSet(this, h, nh)) if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next h.next = h; // forget old next
} }
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
tailUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "tail");
/** /**
* Tries to cas nt as new tail. * Tries to cas nt as new tail.
*/ */
void advanceTail(QNode t, QNode nt) { void advanceTail(QNode t, QNode nt) {
if (tail == t) if (tail == t)
tailUpdater.compareAndSet(this, t, nt); UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
} }
static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferQueue.class, QNode.class, "cleanMe");
/** /**
* Tries to CAS cleanMe slot. * Tries to CAS cleanMe slot.
*/ */
boolean casCleanMe(QNode cmp, QNode val) { boolean casCleanMe(QNode cmp, QNode val) {
return (cleanMe == cmp && return cleanMe == cmp &&
cleanMeUpdater.compareAndSet(this, cmp, val)); UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
} }
/** /**
...@@ -683,7 +674,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -683,7 +674,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
s.item = s; s.item = s;
s.waiter = null; s.waiter = null;
} }
return (x != null)? x : e; return (x != null) ? x : e;
} else { // complementary-mode } else { // complementary-mode
QNode m = h.next; // node to fulfill QNode m = h.next; // node to fulfill
...@@ -700,7 +691,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -700,7 +691,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
advanceHead(h, m); // successfully fulfilled advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter); LockSupport.unpark(m.waiter);
return (x != null)? x : e; return (x != null) ? x : e;
} }
} }
} }
...@@ -716,10 +707,10 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -716,10 +707,10 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
*/ */
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */ /* Same idea as TransferStack.awaitFulfill */
long lastTime = (timed)? System.nanoTime() : 0; long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread(); Thread w = Thread.currentThread();
int spins = ((head.next == s) ? int spins = ((head.next == s) ?
(timed? maxTimedSpins : maxUntimedSpins) : 0); (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) { for (;;) {
if (w.isInterrupted()) if (w.isInterrupted())
s.tryCancel(e); s.tryCancel(e);
...@@ -799,6 +790,16 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -799,6 +790,16 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
return; // Postpone cleaning s return; // Postpone cleaning s
} }
} }
// unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
objectFieldOffset(UNSAFE, "head", TransferQueue.class);
private static final long tailOffset =
objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
private static final long cleanMeOffset =
objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
} }
/** /**
...@@ -824,7 +825,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -824,7 +825,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
* access; otherwise the order is unspecified. * access; otherwise the order is unspecified.
*/ */
public SynchronousQueue(boolean fair) { public SynchronousQueue(boolean fair) {
transferer = (fair)? new TransferQueue() : new TransferStack(); transferer = fair ? new TransferQueue() : new TransferStack();
} }
/** /**
...@@ -1141,4 +1142,17 @@ public class SynchronousQueue<E> extends AbstractQueue<E> ...@@ -1141,4 +1142,17 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
transferer = new TransferStack(); transferer = new TransferStack();
} }
// Unsafe mechanics
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}
} }
...@@ -1841,6 +1841,43 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -1841,6 +1841,43 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
} }
} }
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */ /* Extension hooks */
/** /**
...@@ -1961,7 +1998,9 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -1961,7 +1998,9 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* @throws RejectedExecutionException always. * @throws RejectedExecutionException always.
*/ */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(); throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
} }
} }
......
...@@ -48,28 +48,37 @@ import java.util.*; ...@@ -48,28 +48,37 @@ import java.util.*;
public class AtomicIntegerArray implements java.io.Serializable { public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L; private static final long serialVersionUID = 2862133569453604235L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class); private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int scale = unsafe.arrayIndexScale(int[].class); private static final int shift;
private final int[] array; private final int[] array;
private long rawIndex(int i) { static {
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length) if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i); throw new IndexOutOfBoundsException("index " + i);
return base + (long) i * scale;
return byteOffset(i);
}
private static long byteOffset(int i) {
return ((long) i << shift) + base;
} }
/** /**
* Creates a new AtomicIntegerArray of given length. * Creates a new AtomicIntegerArray of the given length, with all
* elements initially zero.
* *
* @param length the length of the array * @param length the length of the array
*/ */
public AtomicIntegerArray(int length) { public AtomicIntegerArray(int length) {
array = new int[length]; array = new int[length];
// must perform at least one volatile write to conform to JMM
if (length > 0)
unsafe.putIntVolatile(array, rawIndex(0), 0);
} }
/** /**
...@@ -80,17 +89,8 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -80,17 +89,8 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @throws NullPointerException if array is null * @throws NullPointerException if array is null
*/ */
public AtomicIntegerArray(int[] array) { public AtomicIntegerArray(int[] array) {
if (array == null) // Visibility guaranteed by final field guarantees
throw new NullPointerException(); this.array = array.clone();
int length = array.length;
this.array = new int[length];
if (length > 0) {
int last = length-1;
for (int i = 0; i < last; ++i)
this.array[i] = array[i];
// Do the last write as volatile
unsafe.putIntVolatile(this.array, rawIndex(last), array[last]);
}
} }
/** /**
...@@ -109,7 +109,11 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -109,7 +109,11 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the current value * @return the current value
*/ */
public final int get(int i) { public final int get(int i) {
return unsafe.getIntVolatile(array, rawIndex(i)); return getRaw(checkedByteOffset(i));
}
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
} }
/** /**
...@@ -119,7 +123,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -119,7 +123,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @param newValue the new value * @param newValue the new value
*/ */
public final void set(int i, int newValue) { public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, rawIndex(i), newValue); unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
} }
/** /**
...@@ -130,7 +134,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -130,7 +134,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @since 1.6 * @since 1.6
*/ */
public final void lazySet(int i, int newValue) { public final void lazySet(int i, int newValue) {
unsafe.putOrderedInt(array, rawIndex(i), newValue); unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
} }
/** /**
...@@ -142,9 +146,10 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -142,9 +146,10 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the previous value * @return the previous value
*/ */
public final int getAndSet(int i, int newValue) { public final int getAndSet(int i, int newValue) {
long offset = checkedByteOffset(i);
while (true) { while (true) {
int current = get(i); int current = getRaw(offset);
if (compareAndSet(i, current, newValue)) if (compareAndSetRaw(offset, current, newValue))
return current; return current;
} }
} }
...@@ -160,8 +165,11 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -160,8 +165,11 @@ public class AtomicIntegerArray implements java.io.Serializable {
* the actual value was not equal to the expected value. * the actual value was not equal to the expected value.
*/ */
public final boolean compareAndSet(int i, int expect, int update) { public final boolean compareAndSet(int i, int expect, int update) {
return unsafe.compareAndSwapInt(array, rawIndex(i), return compareAndSetRaw(checkedByteOffset(i), expect, update);
expect, update); }
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
} }
/** /**
...@@ -188,12 +196,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -188,12 +196,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the previous value * @return the previous value
*/ */
public final int getAndIncrement(int i) { public final int getAndIncrement(int i) {
while (true) { return getAndAdd(i, 1);
int current = get(i);
int next = current + 1;
if (compareAndSet(i, current, next))
return current;
}
} }
/** /**
...@@ -203,12 +206,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -203,12 +206,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the previous value * @return the previous value
*/ */
public final int getAndDecrement(int i) { public final int getAndDecrement(int i) {
while (true) { return getAndAdd(i, -1);
int current = get(i);
int next = current - 1;
if (compareAndSet(i, current, next))
return current;
}
} }
/** /**
...@@ -219,10 +217,10 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -219,10 +217,10 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the previous value * @return the previous value
*/ */
public final int getAndAdd(int i, int delta) { public final int getAndAdd(int i, int delta) {
long offset = checkedByteOffset(i);
while (true) { while (true) {
int current = get(i); int current = getRaw(offset);
int next = current + delta; if (compareAndSetRaw(offset, current, current + delta))
if (compareAndSet(i, current, next))
return current; return current;
} }
} }
...@@ -234,12 +232,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -234,12 +232,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the updated value * @return the updated value
*/ */
public final int incrementAndGet(int i) { public final int incrementAndGet(int i) {
while (true) { return addAndGet(i, 1);
int current = get(i);
int next = current + 1;
if (compareAndSet(i, current, next))
return next;
}
} }
/** /**
...@@ -249,12 +242,7 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -249,12 +242,7 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the updated value * @return the updated value
*/ */
public final int decrementAndGet(int i) { public final int decrementAndGet(int i) {
while (true) { return addAndGet(i, -1);
int current = get(i);
int next = current - 1;
if (compareAndSet(i, current, next))
return next;
}
} }
/** /**
...@@ -265,22 +253,32 @@ public class AtomicIntegerArray implements java.io.Serializable { ...@@ -265,22 +253,32 @@ public class AtomicIntegerArray implements java.io.Serializable {
* @return the updated value * @return the updated value
*/ */
public final int addAndGet(int i, int delta) { public final int addAndGet(int i, int delta) {
long offset = checkedByteOffset(i);
while (true) { while (true) {
int current = get(i); int current = getRaw(offset);
int next = current + delta; int next = current + delta;
if (compareAndSet(i, current, next)) if (compareAndSetRaw(offset, current, next))
return next; return next;
} }
} }
/** /**
* Returns the String representation of the current values of array. * Returns the String representation of the current values of array.
* @return the String representation of the current values of array. * @return the String representation of the current values of array
*/ */
public String toString() { public String toString() {
if (array.length > 0) // force volatile read int iMax = array.length - 1;
get(0); if (iMax == -1)
return Arrays.toString(array); return "[]";
StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; ; i++) {
b.append(getRaw(byteOffset(i)));
if (i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
} }
} }
...@@ -38,8 +38,8 @@ package java.util.concurrent.atomic; ...@@ -38,8 +38,8 @@ package java.util.concurrent.atomic;
/** /**
* An {@code AtomicMarkableReference} maintains an object reference * An {@code AtomicMarkableReference} maintains an object reference
* along with a mark bit, that can be updated atomically. * along with a mark bit, that can be updated atomically.
* <p> *
* <p> Implementation note. This implementation maintains markable * <p>Implementation note: This implementation maintains markable
* references by creating internal objects representing "boxed" * references by creating internal objects representing "boxed"
* [reference, boolean] pairs. * [reference, boolean] pairs.
* *
...@@ -49,15 +49,19 @@ package java.util.concurrent.atomic; ...@@ -49,15 +49,19 @@ package java.util.concurrent.atomic;
*/ */
public class AtomicMarkableReference<V> { public class AtomicMarkableReference<V> {
private static class ReferenceBooleanPair<T> { private static class Pair<T> {
private final T reference; final T reference;
private final boolean bit; final boolean mark;
ReferenceBooleanPair(T r, boolean i) { private Pair(T reference, boolean mark) {
reference = r; bit = i; this.reference = reference;
this.mark = mark;
}
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
} }
} }
private final AtomicReference<ReferenceBooleanPair<V>> atomicRef; private volatile Pair<V> pair;
/** /**
* Creates a new {@code AtomicMarkableReference} with the given * Creates a new {@code AtomicMarkableReference} with the given
...@@ -67,7 +71,7 @@ public class AtomicMarkableReference<V> { ...@@ -67,7 +71,7 @@ public class AtomicMarkableReference<V> {
* @param initialMark the initial mark * @param initialMark the initial mark
*/ */
public AtomicMarkableReference(V initialRef, boolean initialMark) { public AtomicMarkableReference(V initialRef, boolean initialMark) {
atomicRef = new AtomicReference<ReferenceBooleanPair<V>> (new ReferenceBooleanPair<V>(initialRef, initialMark)); pair = Pair.of(initialRef, initialMark);
} }
/** /**
...@@ -76,7 +80,7 @@ public class AtomicMarkableReference<V> { ...@@ -76,7 +80,7 @@ public class AtomicMarkableReference<V> {
* @return the current value of the reference * @return the current value of the reference
*/ */
public V getReference() { public V getReference() {
return atomicRef.get().reference; return pair.reference;
} }
/** /**
...@@ -85,7 +89,7 @@ public class AtomicMarkableReference<V> { ...@@ -85,7 +89,7 @@ public class AtomicMarkableReference<V> {
* @return the current value of the mark * @return the current value of the mark
*/ */
public boolean isMarked() { public boolean isMarked() {
return atomicRef.get().bit; return pair.mark;
} }
/** /**
...@@ -97,9 +101,9 @@ public class AtomicMarkableReference<V> { ...@@ -97,9 +101,9 @@ public class AtomicMarkableReference<V> {
* @return the current value of the reference * @return the current value of the reference
*/ */
public V get(boolean[] markHolder) { public V get(boolean[] markHolder) {
ReferenceBooleanPair<V> p = atomicRef.get(); Pair<V> pair = this.pair;
markHolder[0] = p.bit; markHolder[0] = pair.mark;
return p.reference; return pair.reference;
} }
/** /**
...@@ -122,13 +126,8 @@ public class AtomicMarkableReference<V> { ...@@ -122,13 +126,8 @@ public class AtomicMarkableReference<V> {
V newReference, V newReference,
boolean expectedMark, boolean expectedMark,
boolean newMark) { boolean newMark) {
ReferenceBooleanPair<V> current = atomicRef.get(); return compareAndSet(expectedReference, newReference,
return expectedReference == current.reference && expectedMark, newMark);
expectedMark == current.bit &&
((newReference == current.reference && newMark == current.bit) ||
atomicRef.weakCompareAndSet(current,
new ReferenceBooleanPair<V>(newReference,
newMark)));
} }
/** /**
...@@ -147,13 +146,13 @@ public class AtomicMarkableReference<V> { ...@@ -147,13 +146,13 @@ public class AtomicMarkableReference<V> {
V newReference, V newReference,
boolean expectedMark, boolean expectedMark,
boolean newMark) { boolean newMark) {
ReferenceBooleanPair<V> current = atomicRef.get(); Pair<V> current = pair;
return expectedReference == current.reference && return
expectedMark == current.bit && expectedReference == current.reference &&
((newReference == current.reference && newMark == current.bit) || expectedMark == current.mark &&
atomicRef.compareAndSet(current, ((newReference == current.reference &&
new ReferenceBooleanPair<V>(newReference, newMark == current.mark) ||
newMark))); casPair(current, Pair.of(newReference, newMark)));
} }
/** /**
...@@ -163,9 +162,9 @@ public class AtomicMarkableReference<V> { ...@@ -163,9 +162,9 @@ public class AtomicMarkableReference<V> {
* @param newMark the new value for the mark * @param newMark the new value for the mark
*/ */
public void set(V newReference, boolean newMark) { public void set(V newReference, boolean newMark) {
ReferenceBooleanPair<V> current = atomicRef.get(); Pair<V> current = pair;
if (newReference != current.reference || newMark != current.bit) if (newReference != current.reference || newMark != current.mark)
atomicRef.set(new ReferenceBooleanPair<V>(newReference, newMark)); this.pair = Pair.of(newReference, newMark);
} }
/** /**
...@@ -182,11 +181,32 @@ public class AtomicMarkableReference<V> { ...@@ -182,11 +181,32 @@ public class AtomicMarkableReference<V> {
* @return true if successful * @return true if successful
*/ */
public boolean attemptMark(V expectedReference, boolean newMark) { public boolean attemptMark(V expectedReference, boolean newMark) {
ReferenceBooleanPair<V> current = atomicRef.get(); Pair<V> current = pair;
return expectedReference == current.reference && return
(newMark == current.bit || expectedReference == current.reference &&
atomicRef.compareAndSet (newMark == current.mark ||
(current, new ReferenceBooleanPair<V>(expectedReference, casPair(current, Pair.of(expectedReference, newMark)));
newMark))); }
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// Convert Exception to corresponding Error
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
} }
} }
...@@ -734,8 +734,8 @@ java/util/concurrent/locks/Lock/TimedAcquireLeak.java generic-all ...@@ -734,8 +734,8 @@ java/util/concurrent/locks/Lock/TimedAcquireLeak.java generic-all
# Fails on solaris-sparc -server (Set not equal to copy. 1) # Fails on solaris-sparc -server (Set not equal to copy. 1)
java/util/EnumSet/EnumSetBash.java solaris-sparc java/util/EnumSet/EnumSetBash.java solaris-sparc
# Need to be marked othervm, or changed to be samevm safe # Fails on solaris-sparc, see 7011857
java/util/WeakHashMap/GCDuringIteration.java generic-all java/util/concurrent/Phaser/FickleRegister.java solaris-sparc
############################################################################ ############################################################################
...@@ -53,7 +53,9 @@ public class IteratorWeakConsistency { ...@@ -53,7 +53,9 @@ public class IteratorWeakConsistency {
test(new LinkedTransferQueue()); test(new LinkedTransferQueue());
// Other concurrent queues (e.g. ArrayBlockingQueue) do not // Other concurrent queues (e.g. ArrayBlockingQueue) do not
// currently have weakly consistent iterators. // currently have weakly consistent iterators.
// test(new ArrayBlockingQueue(20)); // As of 2010-09, ArrayBlockingQueue passes this test, but
// does not fully implement weak consistency.
test(new ArrayBlockingQueue(20));
} }
void test(Queue q) { void test(Queue q) {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
/* /*
* @test * @test
* @bug 6399443 * @bug 6399443
* @run main/othervm AutoShutdown
* @summary Check for auto-shutdown and gc of singleThreadExecutors * @summary Check for auto-shutdown and gc of singleThreadExecutors
* @author Martin Buchholz * @author Martin Buchholz
*/ */
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册