diff --git a/src/share/classes/java/util/Collections.java b/src/share/classes/java/util/Collections.java
index 08f8adbe5b283ff450c28f13aea03ea0d0d0f5e7..8be24796f7def0849dd073716ba9778edb861428 100644
--- a/src/share/classes/java/util/Collections.java
+++ b/src/share/classes/java/util/Collections.java
@@ -1452,10 +1452,10 @@ public class Collections {
* when o is a Map.Entry, and calls o.setValue.
*/
public boolean containsAll(Collection> coll) {
- Iterator> it = coll.iterator();
- while (it.hasNext())
- if (!contains(it.next())) // Invokes safe contains() above
+ for (Object e : coll) {
+ if (!contains(e)) // Invokes safe contains() above
return false;
+ }
return true;
}
public boolean equals(Object o) {
diff --git a/src/share/classes/java/util/LinkedList.java b/src/share/classes/java/util/LinkedList.java
index fbf8f5f92ce91284d2c5deb513d425a6515d287f..e4d0ddc0787010d5899fecce05ce819fdfd1821b 100644
--- a/src/share/classes/java/util/LinkedList.java
+++ b/src/share/classes/java/util/LinkedList.java
@@ -26,9 +26,9 @@
package java.util;
/**
- * Linked list implementation of the {@link List} and {@link Deque} interfaces.
- * Implements all optional operations, and permits all elements (including
- * {@code null}).
+ * Doubly-linked list implementation of the {@code List} and {@code Deque}
+ * interfaces. Implements all optional list operations, and permits all
+ * elements (including {@code null}).
*
*
All of the operations perform as could be expected for a doubly-linked
* list. Operations that index into the list will traverse the list from
@@ -249,7 +249,7 @@ public class LinkedList
* @return the last element in this list
* @throws NoSuchElementException if this list is empty
*/
- public E getLast() {
+ public E getLast() {
final Node l = last;
if (l == null)
throw new NoSuchElementException();
diff --git a/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java b/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
index 0690c8216e9a030afba8f339ae1d0c14fa495a3c..4f4b9d96af60a4b463f1b09fc0194af8b8dd4aac 100644
--- a/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
+++ b/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
@@ -49,14 +49,14 @@ import java.util.*;
* This is a classic "bounded buffer", in which a
* fixed-sized array holds elements inserted by producers and
* extracted by consumers. Once created, the capacity cannot be
- * increased. Attempts to put an element into a full queue
- * will result in the operation blocking; attempts to take an
+ * changed. Attempts to {@code put} an element into a full queue
+ * will result in the operation blocking; attempts to {@code take} an
* element from an empty queue will similarly block.
*
- *
This class supports an optional fairness policy for ordering
+ *
This class supports an optional fairness policy for ordering
* waiting producer and consumer threads. By default, this ordering
* is not guaranteed. However, a queue constructed with fairness set
- * to true grants threads access in FIFO order. Fairness
+ * to {@code true} grants threads access in FIFO order. Fairness
* generally decreases throughput but reduces variability and avoids
* starvation.
*
@@ -83,14 +83,17 @@ public class ArrayBlockingQueue extends AbstractQueue
*/
private static final long serialVersionUID = -817911632652898426L;
- /** The queued items */
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- /** Number of items in the queue */
- private int count;
+ /** The queued items */
+ final Object[] items;
+
+ /** items index for next take, poll, peek or remove */
+ int takeIndex;
+
+ /** items index for next put, offer, or add */
+ int putIndex;
+
+ /** Number of elements in the queue */
+ int count;
/*
* Concurrency control uses the classic two-condition algorithm
@@ -98,7 +101,7 @@ public class ArrayBlockingQueue extends AbstractQueue
*/
/** Main lock guarding all access */
- private final ReentrantLock lock;
+ final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
@@ -110,7 +113,36 @@ public class ArrayBlockingQueue extends AbstractQueue
* Circularly increment i.
*/
final int inc(int i) {
- return (++i == items.length)? 0 : i;
+ return (++i == items.length) ? 0 : i;
+ }
+
+ /**
+ * Circularly decrement i.
+ */
+ final int dec(int i) {
+ return ((i == 0) ? items.length : i) - 1;
+ }
+
+ @SuppressWarnings("unchecked")
+ static E cast(Object item) {
+ return (E) item;
+ }
+
+ /**
+ * Returns item at index i.
+ */
+ final E itemAt(int i) {
+ return this.cast(items[i]);
+ }
+
+ /**
+ * Throws NullPointerException if argument is null.
+ *
+ * @param v the element
+ */
+ private static void checkNotNull(Object v) {
+ if (v == null)
+ throw new NullPointerException();
}
/**
@@ -129,8 +161,8 @@ public class ArrayBlockingQueue extends AbstractQueue
* Call only when holding lock.
*/
private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
+ final Object[] items = this.items;
+ E x = this.cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
@@ -139,11 +171,12 @@ public class ArrayBlockingQueue extends AbstractQueue
}
/**
- * Utility for remove and iterator.remove: Delete item at position i.
+ * Deletes item at position i.
+ * Utility for remove and iterator.remove.
* Call only when holding lock.
*/
void removeAt(int i) {
- final E[] items = this.items;
+ final Object[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
@@ -167,69 +200,82 @@ public class ArrayBlockingQueue extends AbstractQueue
}
/**
- * Creates an ArrayBlockingQueue with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
- * @throws IllegalArgumentException if capacity is less than 1
+ * @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
- * Creates an ArrayBlockingQueue with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
- * @param fair if true then queue accesses for threads blocked
+ * @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
- * if false the access order is unspecified.
- * @throws IllegalArgumentException if capacity is less than 1
+ * if {@code false} the access order is unspecified.
+ * @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
- this.items = (E[]) new Object[capacity];
+ this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
- * Creates an ArrayBlockingQueue with the given (fixed)
+ * Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
- * @param fair if true then queue accesses for threads blocked
+ * @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
- * if false the access order is unspecified.
+ * if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
- * @throws IllegalArgumentException if capacity is less than
- * c.size(), or less than 1.
+ * @throws IllegalArgumentException if {@code capacity} is less than
+ * {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection extends E> c) {
this(capacity, fair);
- if (capacity < c.size())
- throw new IllegalArgumentException();
- for (E e : c)
- add(e);
+ final ReentrantLock lock = this.lock;
+ lock.lock(); // Lock only for visibility, not mutual exclusion
+ try {
+ int i = 0;
+ try {
+ for (E e : c) {
+ checkNotNull(e);
+ items[i++] = e;
+ }
+ } catch (ArrayIndexOutOfBoundsException ex) {
+ throw new IllegalArgumentException();
+ }
+ count = i;
+ putIndex = (i == capacity) ? 0 : i;
+ } finally {
+ lock.unlock();
+ }
}
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning true upon success and throwing an
- * IllegalStateException if this queue is full.
+ * returning {@code true} upon success and throwing an
+ * {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
- * @return true (as specified by {@link Collection#add})
+ * @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
@@ -240,14 +286,14 @@ public class ArrayBlockingQueue extends AbstractQueue
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
- * returning true upon success and false if this queue
+ * returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
+ checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
@@ -270,18 +316,12 @@ public class ArrayBlockingQueue extends AbstractQueue
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- final E[] items = this.items;
+ checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- try {
- while (count == items.length)
- notFull.await();
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
+ while (count == items.length)
+ notFull.await();
insert(e);
} finally {
lock.unlock();
@@ -299,25 +339,18 @@ public class ArrayBlockingQueue extends AbstractQueue
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
- if (e == null) throw new NullPointerException();
+ checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (count != items.length) {
- insert(e);
- return true;
- }
+ while (count == items.length) {
if (nanos <= 0)
return false;
- try {
- nanos = notFull.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
+ nanos = notFull.awaitNanos(nanos);
}
+ insert(e);
+ return true;
} finally {
lock.unlock();
}
@@ -327,10 +360,7 @@ public class ArrayBlockingQueue extends AbstractQueue
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (count == 0)
- return null;
- E x = extract();
- return x;
+ return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
@@ -340,15 +370,9 @@ public class ArrayBlockingQueue extends AbstractQueue
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- try {
- while (count == 0)
- notEmpty.await();
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- E x = extract();
- return x;
+ while (count == 0)
+ notEmpty.await();
+ return extract();
} finally {
lock.unlock();
}
@@ -359,21 +383,12 @@ public class ArrayBlockingQueue extends AbstractQueue
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- for (;;) {
- if (count != 0) {
- E x = extract();
- return x;
- }
+ while (count == 0) {
if (nanos <= 0)
return null;
- try {
- nanos = notEmpty.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
+ nanos = notEmpty.awaitNanos(nanos);
}
+ return extract();
} finally {
lock.unlock();
}
@@ -383,7 +398,7 @@ public class ArrayBlockingQueue extends AbstractQueue
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return (count == 0) ? null : items[takeIndex];
+ return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
@@ -412,10 +427,10 @@ public class ArrayBlockingQueue extends AbstractQueue
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
- * less the current size of this queue.
+ * less the current {@code size} of this queue.
*
* Note that you cannot always tell if an attempt to insert
- * an element will succeed by inspecting remainingCapacity
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
@@ -431,59 +446,56 @@ public class ArrayBlockingQueue extends AbstractQueue
/**
* Removes a single instance of the specified element from this queue,
- * if it is present. More formally, removes an element e such
- * that o.equals(e), if this queue contains one or more such
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
* elements.
- * Returns true if this queue contained the specified element
+ * Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
+ * Removal of interior elements in circular array based queues
+ * is an intrinsically slow and disruptive operation, so should
+ * be undertaken only in exceptional circumstances, ideally
+ * only when the queue is known not to be accessible by other
+ * threads.
+ *
* @param o element to be removed from this queue, if present
- * @return true if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = 0;
- for (;;) {
- if (k++ >= count)
- return false;
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
- i = inc(i);
}
-
+ return false;
} finally {
lock.unlock();
}
}
/**
- * Returns true if this queue contains the specified element.
- * More formally, returns true if and only if this queue contains
- * at least one element e such that o.equals(e).
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
- * @return true if this queue contains the specified element
+ * @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = 0;
- while (k++ < count) {
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
if (o.equals(items[i]))
return true;
- i = inc(i);
- }
return false;
} finally {
lock.unlock();
@@ -504,17 +516,14 @@ public class ArrayBlockingQueue extends AbstractQueue
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
+ final int count = this.count;
Object[] a = new Object[count];
- int k = 0;
- int i = takeIndex;
- while (k < count) {
- a[k++] = items[i];
- i = inc(i);
- }
+ for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
+ a[k] = items[i];
return a;
} finally {
lock.unlock();
@@ -531,22 +540,22 @@ public class ArrayBlockingQueue extends AbstractQueue
* If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
- * null.
+ * {@code null}.
*
*
Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
- *
Suppose x is a queue known to contain only strings.
+ *
Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
- * allocated array of String:
+ * allocated array of {@code String}:
*
*
* String[] y = x.toArray(new String[0]);
*
- * Note that toArray(new Object[0]) is identical in function to
- * toArray().
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
@@ -557,24 +566,20 @@ public class ArrayBlockingQueue extends AbstractQueue
* this queue
* @throws NullPointerException if the specified array is null
*/
+ @SuppressWarnings("unchecked")
public T[] toArray(T[] a) {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- if (a.length < count)
+ final int count = this.count;
+ final int len = a.length;
+ if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
- a.getClass().getComponentType(),
- count
- );
-
- int k = 0;
- int i = takeIndex;
- while (k < count) {
- a[k++] = (T)items[i];
- i = inc(i);
- }
- if (a.length > count)
+ a.getClass().getComponentType(), count);
+ for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
+ a[k] = (T) items[i];
+ if (len > count)
a[count] = null;
return a;
} finally {
@@ -586,7 +591,19 @@ public class ArrayBlockingQueue extends AbstractQueue
final ReentrantLock lock = this.lock;
lock.lock();
try {
- return super.toString();
+ int k = count;
+ if (k == 0)
+ return "[]";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (int i = takeIndex; ; i = inc(i)) {
+ Object e = items[i];
+ sb.append(e == this ? "(this Collection)" : e);
+ if (--k == 0)
+ return sb.append(']').toString();
+ sb.append(',').append(' ');
+ }
} finally {
lock.unlock();
}
@@ -597,16 +614,12 @@ public class ArrayBlockingQueue extends AbstractQueue
* The queue will be empty after this call returns.
*/
public void clear() {
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
- int i = takeIndex;
- int k = count;
- while (k-- > 0) {
+ for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
items[i] = null;
- i = inc(i);
- }
count = 0;
putIndex = 0;
takeIndex = 0;
@@ -623,11 +636,10 @@ public class ArrayBlockingQueue extends AbstractQueue
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection super E> c) {
- if (c == null)
- throw new NullPointerException();
+ checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
@@ -635,7 +647,7 @@ public class ArrayBlockingQueue extends AbstractQueue
int n = 0;
int max = count;
while (n < max) {
- c.add(items[i]);
+ c.add(this.cast(items[i]));
items[i] = null;
i = inc(i);
++n;
@@ -659,22 +671,20 @@ public class ArrayBlockingQueue extends AbstractQueue
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection super E> c, int maxElements) {
- if (c == null)
- throw new NullPointerException();
+ checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
- final E[] items = this.items;
+ final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
- int sz = count;
- int max = (maxElements < count)? maxElements : count;
+ int max = (maxElements < count) ? maxElements : count;
while (n < max) {
- c.add(items[i]);
+ c.add(this.cast(items[i]));
items[i] = null;
i = inc(i);
++n;
@@ -690,11 +700,13 @@ public class ArrayBlockingQueue extends AbstractQueue
}
}
-
/**
* Returns an iterator over the elements in this queue in proper sequence.
- * The returned Iterator is a "weakly consistent" iterator that
- * will never throw {@link ConcurrentModificationException},
+ * The elements will be returned in order from first (head) to last (tail).
+ *
+ * The returned {@code Iterator} is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
@@ -702,83 +714,65 @@ public class ArrayBlockingQueue extends AbstractQueue
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator iterator() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return new Itr();
- } finally {
- lock.unlock();
- }
+ return new Itr();
}
/**
- * Iterator for ArrayBlockingQueue
+ * Iterator for ArrayBlockingQueue. To maintain weak consistency
+ * with respect to puts and takes, we (1) read ahead one slot, so
+ * as to not report hasNext true but then not have an element to
+ * return -- however we later recheck this slot to use the most
+ * current value; (2) ensure that each array slot is traversed at
+ * most once (by tracking "remaining" elements); (3) skip over
+ * null slots, which can occur if takes race ahead of iterators.
+ * However, for circular array-based queues, we cannot rely on any
+ * well established definition of what it means to be weakly
+ * consistent with respect to interior removes since these may
+ * require slot overwrites in the process of sliding elements to
+ * cover gaps. So we settle for resiliency, operating on
+ * established apparent nexts, which may miss some elements that
+ * have moved between calls to next.
*/
private class Itr implements Iterator {
- /**
- * Index of element to be returned by next,
- * or a negative number if no such.
- */
- private int nextIndex;
-
- /**
- * nextItem holds on to item fields because once we claim
- * that an element exists in hasNext(), we must return it in
- * the following next() call even if it was in the process of
- * being removed when hasNext() was called.
- */
- private E nextItem;
-
- /**
- * Index of element returned by most recent call to next.
- * Reset to -1 if this element is deleted by a call to remove.
- */
- private int lastRet;
+ private int remaining; // Number of elements yet to be returned
+ private int nextIndex; // Index of element to be returned by next
+ private E nextItem; // Element to be returned by next call to next
+ private E lastItem; // Element returned by last call to next
+ private int lastRet; // Index of last element returned, or -1 if none
Itr() {
- lastRet = -1;
- if (count == 0)
- nextIndex = -1;
- else {
- nextIndex = takeIndex;
- nextItem = items[takeIndex];
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ lastRet = -1;
+ if ((remaining = count) > 0)
+ nextItem = itemAt(nextIndex = takeIndex);
+ } finally {
+ lock.unlock();
}
}
public boolean hasNext() {
- /*
- * No sync. We can return true by mistake here
- * only if this iterator passed across threads,
- * which we don't support anyway.
- */
- return nextIndex >= 0;
- }
-
- /**
- * Checks whether nextIndex is valid; if so setting nextItem.
- * Stops iterator when either hits putIndex or sees null item.
- */
- private void checkNext() {
- if (nextIndex == putIndex) {
- nextIndex = -1;
- nextItem = null;
- } else {
- nextItem = items[nextIndex];
- if (nextItem == null)
- nextIndex = -1;
- }
+ return remaining > 0;
}
public E next() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
- if (nextIndex < 0)
+ if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
- E x = nextItem;
- nextIndex = inc(nextIndex);
- checkNext();
+ E x = itemAt(nextIndex); // check for fresher value
+ if (x == null) {
+ x = nextItem; // we are forced to report old value
+ lastItem = null; // but ensure remove fails
+ }
+ else
+ lastItem = x;
+ while (--remaining > 0 && // skip over nulls
+ (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
+ ;
return x;
} finally {
lock.unlock();
@@ -793,15 +787,19 @@ public class ArrayBlockingQueue extends AbstractQueue
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
-
- int ti = takeIndex;
- removeAt(i);
- // back up cursor (reset to front if was first element)
- nextIndex = (i == ti) ? takeIndex : i;
- checkNext();
+ E x = lastItem;
+ lastItem = null;
+ // only remove if item still at index
+ if (x != null && x == items[i]) {
+ boolean removingHead = (i == takeIndex);
+ removeAt(i);
+ if (!removingHead)
+ nextIndex = dec(nextIndex);
+ }
} finally {
lock.unlock();
}
}
}
+
}
diff --git a/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java b/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
index 72133fedad7691db6cd64c944b17251fe3156bb0..2158084f9f9c1053b3068e6cbfd7cac01b92b2cf 100644
--- a/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
+++ b/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
@@ -869,6 +869,8 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the front of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException}.
*
* @throws NullPointerException if the specified element is null
*/
@@ -878,6 +880,8 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the end of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException}.
*
* This method is equivalent to {@link #add}.
*
@@ -889,8 +893,9 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the front of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
- * @return {@code true} always
+ * @return {@code true} (as specified by {@link Deque#offerFirst})
* @throws NullPointerException if the specified element is null
*/
public boolean offerFirst(E e) {
@@ -900,10 +905,11 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the end of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
* This method is equivalent to {@link #add}.
*
- * @return {@code true} always
+ * @return {@code true} (as specified by {@link Deque#offerLast})
* @throws NullPointerException if the specified element is null
*/
public boolean offerLast(E e) {
@@ -983,6 +989,7 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the tail of this deque.
+ * As the deque is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
@@ -993,6 +1000,8 @@ public class ConcurrentLinkedDeque
/**
* Inserts the specified element at the tail of this deque.
+ * As the deque is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
diff --git a/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java b/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
index 6ff1b8a5119b9b25c837f4856424647ed95c9a83..b7beda274daec21bfe78a2e8042e144569477f57 100644
--- a/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
+++ b/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
@@ -269,6 +269,8 @@ public class ConcurrentLinkedQueue extends AbstractQueue
/**
* Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never throw
+ * {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
@@ -298,6 +300,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue
/**
* Inserts the specified element at the tail of this queue.
+ * As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
diff --git a/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java b/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
index 4e0457f2877edac41980b4481b393e6b30e6b648..64d28cf60408f36f59e1f1bdc8c3ef6fcb7d025c 100644
--- a/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
+++ b/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
@@ -374,17 +374,11 @@ public class ConcurrentSkipListMap extends AbstractMap
null, null, 1);
}
- /** Updater for casHead */
- private static final
- AtomicReferenceFieldUpdater
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (ConcurrentSkipListMap.class, HeadIndex.class, "head");
-
/**
* compareAndSet head node
*/
private boolean casHead(HeadIndex cmp, HeadIndex val) {
- return headUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
/* ---------------- Nodes -------------- */
@@ -423,28 +417,18 @@ public class ConcurrentSkipListMap extends AbstractMap
this.next = next;
}
- /** Updater for casNext */
- static final AtomicReferenceFieldUpdater
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Node.class, Node.class, "next");
-
- /** Updater for casValue */
- static final AtomicReferenceFieldUpdater
- valueUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Node.class, Object.class, "value");
-
/**
* compareAndSet value field
*/
boolean casValue(Object cmp, Object val) {
- return valueUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
/**
* compareAndSet next field
*/
boolean casNext(Node cmp, Node val) {
- return nextUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
@@ -522,6 +506,14 @@ public class ConcurrentSkipListMap extends AbstractMap
return null;
return new AbstractMap.SimpleImmutableEntry(key, v);
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long valueOffset =
+ objectFieldOffset(UNSAFE, "value", Node.class);
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", Node.class);
+
}
/* ---------------- Indexing -------------- */
@@ -547,16 +539,11 @@ public class ConcurrentSkipListMap extends AbstractMap
this.right = right;
}
- /** Updater for casRight */
- static final AtomicReferenceFieldUpdater
- rightUpdater = AtomicReferenceFieldUpdater.newUpdater
- (Index.class, Index.class, "right");
-
/**
* compareAndSet right field
*/
final boolean casRight(Index cmp, Index val) {
- return rightUpdater.compareAndSet(this, cmp, val);
+ return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
/**
@@ -591,6 +578,12 @@ public class ConcurrentSkipListMap extends AbstractMap
final boolean unlink(Index succ) {
return !indexesDeletedNode() && casRight(succ, succ.right);
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long rightOffset =
+ objectFieldOffset(UNSAFE, "right", Index.class);
+
}
/* ---------------- Head nodes -------------- */
@@ -640,7 +633,8 @@ public class ConcurrentSkipListMap extends AbstractMap
* cast key as Comparable, which may cause ClassCastException,
* which is propagated back to caller.
*/
- private Comparable super K> comparable(Object key) throws ClassCastException {
+ private Comparable super K> comparable(Object key)
+ throws ClassCastException {
if (key == null)
throw new NullPointerException();
if (comparator != null)
@@ -799,68 +793,12 @@ public class ConcurrentSkipListMap extends AbstractMap
}
/**
- * Specialized variant of findNode to perform Map.get. Does a weak
- * traversal, not bothering to fix any deleted index nodes,
- * returning early if it happens to see key in index, and passing
- * over any deleted base nodes, falling back to getUsingFindNode
- * only if it would otherwise return value from an ongoing
- * deletion. Also uses "bound" to eliminate need for some
- * comparisons (see Pugh Cookbook). Also folds uses of null checks
- * and node-skipping because markers have null keys.
+ * Gets value for key using findNode.
* @param okey the key
* @return the value, or null if absent
*/
private V doGet(Object okey) {
Comparable super K> key = comparable(okey);
- Node bound = null;
- Index q = head;
- Index r = q.right;
- Node n;
- K k;
- int c;
- for (;;) {
- Index d;
- // Traverse rights
- if (r != null && (n = r.node) != bound && (k = n.key) != null) {
- if ((c = key.compareTo(k)) > 0) {
- q = r;
- r = r.right;
- continue;
- } else if (c == 0) {
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else
- bound = n;
- }
-
- // Traverse down
- if ((d = q.down) != null) {
- q = d;
- r = d.right;
- } else
- break;
- }
-
- // Traverse nexts
- for (n = q.node.next; n != null; n = n.next) {
- if ((k = n.key) != null) {
- if ((c = key.compareTo(k)) == 0) {
- Object v = n.value;
- return (v != null)? (V)v : getUsingFindNode(key);
- } else if (c < 0)
- break;
- }
- }
- return null;
- }
-
- /**
- * Performs map.get via findNode. Used as a backup if doGet
- * encounters an in-progress deletion.
- * @param key the key
- * @return the value, or null if absent
- */
- private V getUsingFindNode(Comparable super K> key) {
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
@@ -943,7 +881,7 @@ public class ConcurrentSkipListMap extends AbstractMap
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
- if ((x & 0x8001) != 0) // test highest and lowest bits
+ if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
@@ -1256,7 +1194,7 @@ public class ConcurrentSkipListMap extends AbstractMap
Node n = b.next;
for (;;) {
if (n == null)
- return (b.isBaseHeader())? null : b;
+ return b.isBaseHeader() ? null : b;
Node f = n.next; // inconsistent read
if (n != b.next)
break;
@@ -1374,7 +1312,7 @@ public class ConcurrentSkipListMap extends AbstractMap
Node n = b.next;
for (;;) {
if (n == null)
- return ((rel & LT) == 0 || b.isBaseHeader())? null : b;
+ return ((rel & LT) == 0 || b.isBaseHeader()) ? null : b;
Node f = n.next;
if (n != b.next) // inconsistent read
break;
@@ -1390,7 +1328,7 @@ public class ConcurrentSkipListMap extends AbstractMap
(c < 0 && (rel & LT) == 0))
return n;
if ( c <= 0 && (rel & LT) != 0)
- return (b.isBaseHeader())? null : b;
+ return b.isBaseHeader() ? null : b;
b = n;
n = f;
}
@@ -1744,7 +1682,7 @@ public class ConcurrentSkipListMap extends AbstractMap
if (n.getValidValue() != null)
++count;
}
- return (count >= Integer.MAX_VALUE)? Integer.MAX_VALUE : (int)count;
+ return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
/**
@@ -2099,7 +2037,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*/
public K lowerKey(K key) {
Node n = findNear(key, LT);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2123,7 +2061,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*/
public K floorKey(K key) {
Node n = findNear(key, LT|EQ);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2145,7 +2083,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*/
public K ceilingKey(K key) {
Node n = findNear(key, GT|EQ);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2169,7 +2107,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*/
public K higherKey(K key) {
Node n = findNear(key, GT);
- return (n == null)? null : n.key;
+ return (n == null) ? null : n.key;
}
/**
@@ -2342,7 +2280,8 @@ public class ConcurrentSkipListMap extends AbstractMap
return list;
}
- static final class KeySet extends AbstractSet implements NavigableSet {
+ static final class KeySet
+ extends AbstractSet implements NavigableSet {
private final ConcurrentNavigableMap m;
KeySet(ConcurrentNavigableMap map) { m = map; }
public int size() { return m.size(); }
@@ -2359,11 +2298,11 @@ public class ConcurrentSkipListMap extends AbstractMap
public E last() { return m.lastKey(); }
public E pollFirst() {
Map.Entry e = m.pollFirstEntry();
- return e == null? null : e.getKey();
+ return (e == null) ? null : e.getKey();
}
public E pollLast() {
Map.Entry e = m.pollLastEntry();
- return e == null? null : e.getKey();
+ return (e == null) ? null : e.getKey();
}
public Iterator iterator() {
if (m instanceof ConcurrentSkipListMap)
@@ -2710,9 +2649,9 @@ public class ConcurrentSkipListMap extends AbstractMap
rel &= ~m.LT;
}
if (tooLow(key))
- return ((rel & m.LT) != 0)? null : lowestEntry();
+ return ((rel & m.LT) != 0) ? null : lowestEntry();
if (tooHigh(key))
- return ((rel & m.LT) != 0)? highestEntry() : null;
+ return ((rel & m.LT) != 0) ? highestEntry() : null;
for (;;) {
Node n = m.findNear(key, rel);
if (n == null || !inBounds(n.key))
@@ -2783,7 +2722,7 @@ public class ConcurrentSkipListMap extends AbstractMap
public V remove(Object key) {
K k = (K)key;
- return (!inBounds(k))? null : m.remove(k);
+ return (!inBounds(k)) ? null : m.remove(k);
}
public int size() {
@@ -2794,7 +2733,7 @@ public class ConcurrentSkipListMap extends AbstractMap
if (n.getValidValue() != null)
++count;
}
- return count >= Integer.MAX_VALUE? Integer.MAX_VALUE : (int)count;
+ return count >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)count;
}
public boolean isEmpty() {
@@ -2972,27 +2911,27 @@ public class ConcurrentSkipListMap extends AbstractMap
}
public K firstKey() {
- return isDescending? highestKey() : lowestKey();
+ return isDescending ? highestKey() : lowestKey();
}
public K lastKey() {
- return isDescending? lowestKey() : highestKey();
+ return isDescending ? lowestKey() : highestKey();
}
public Map.Entry firstEntry() {
- return isDescending? highestEntry() : lowestEntry();
+ return isDescending ? highestEntry() : lowestEntry();
}
public Map.Entry lastEntry() {
- return isDescending? lowestEntry() : highestEntry();
+ return isDescending ? lowestEntry() : highestEntry();
}
public Map.Entry pollFirstEntry() {
- return isDescending? removeHighest() : removeLowest();
+ return isDescending ? removeHighest() : removeLowest();
}
public Map.Entry pollLastEntry() {
- return isDescending? removeLowest() : removeHighest();
+ return isDescending ? removeLowest() : removeHighest();
}
/* ---------------- Submap Views -------------- */
@@ -3141,4 +3080,22 @@ public class ConcurrentSkipListMap extends AbstractMap
}
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", ConcurrentSkipListMap.class);
+
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}
diff --git a/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java b/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java
index 465426ff8231b551123e3a74e0e08e5ada11443f..8c5193538fae26b24131730f74c3f7b6d20ad503 100644
--- a/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java
+++ b/src/share/classes/java/util/concurrent/CopyOnWriteArrayList.java
@@ -832,7 +832,7 @@ public class CopyOnWriteArrayList
}
/**
- * Save the state of the list to a stream (i.e., serialize it).
+ * Saves the state of the list to a stream (that is, serializes it).
*
* @serialData The length of the array backing the list is emitted
* (int), followed by all of its elements (each an Object)
@@ -842,27 +842,25 @@ public class CopyOnWriteArrayList
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException{
- // Write out element count, and any hidden stuff
s.defaultWriteObject();
Object[] elements = getArray();
- int len = elements.length;
// Write out array length
- s.writeInt(len);
+ s.writeInt(elements.length);
// Write out all elements in the proper order.
- for (int i = 0; i < len; i++)
- s.writeObject(elements[i]);
+ for (Object element : elements)
+ s.writeObject(element);
}
/**
- * Reconstitute the list from a stream (i.e., deserialize it).
+ * Reconstitutes the list from a stream (that is, deserializes it).
+ *
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
- // Read in size, and any hidden stuff
s.defaultReadObject();
// bind to new lock
diff --git a/src/share/classes/java/util/concurrent/ForkJoinPool.java b/src/share/classes/java/util/concurrent/ForkJoinPool.java
index 263552a93ea544b7220a289f72f3355c6b7c1ddc..22938fe4ba90cb846ccac1fa5d81ee7e2137652f 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -525,8 +525,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private volatile long eventWaiters;
- private static final int EVENT_COUNT_SHIFT = 32;
- private static final long WAITER_ID_MASK = (1L << 16) - 1L;
+ private static final int EVENT_COUNT_SHIFT = 32;
+ private static final int WAITER_ID_MASK = (1 << 16) - 1;
/**
* A counter for events that may wake up worker threads:
@@ -615,7 +615,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// are usually manually inlined by callers
/**
- * Increments running count part of workerCounts
+ * Increments running count part of workerCounts.
*/
final void incrementRunningCount() {
int c;
@@ -625,7 +625,17 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Tries to decrement running count unless already zero
+ * Tries to increment running count part of workerCounts.
+ */
+ final boolean tryIncrementRunningCount() {
+ int c;
+ return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ c = workerCounts,
+ c + ONE_RUNNING);
+ }
+
+ /**
+ * Tries to decrement running count unless already zero.
*/
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
@@ -698,10 +708,11 @@ public class ForkJoinPool extends AbstractExecutorService {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
- ws = Arrays.copyOf(ws, n << 1);
+ ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
- workers = ws; // volatile array write ensures slot visibility
+ int c = eventCount; // advance event count to ensure visibility
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
} finally {
lock.unlock();
}
@@ -734,7 +745,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
- decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
@@ -746,24 +757,23 @@ public class ForkJoinPool extends AbstractExecutorService {
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
- * upon releasing two workers, letting others take over.
+ * upon releasing four workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
- boolean releasedOne = false;
+ int releases = 4;
ForkJoinWorkerThread w; int id;
- while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
(int)(h >>> EVENT_COUNT_SHIFT) != ec &&
id < n && (w = ws[id]) != null) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
h, w.nextWaiter)) {
LockSupport.unpark(w);
- if (releasedOne) // exit on second release
+ if (--releases == 0)
break;
- releasedOne = true;
}
if (eventCount != ec)
break;
@@ -793,7 +803,7 @@ public class ForkJoinPool extends AbstractExecutorService {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
- (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+ (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
(int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
@@ -820,9 +830,9 @@ public class ForkJoinPool extends AbstractExecutorService {
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
- long startTime = untimed? 0 : System.nanoTime();
+ long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || w.isTerminating())
+ if (w.isTerminating() || eventCount != ec)
break; // recheck after clear
if (untimed)
LockSupport.park(w);
@@ -860,7 +870,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((sw = spareWaiters) != 0 &&
(id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
- (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+ (runState >= TERMINATING ||
+ (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
spareWaiters == sw &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
sw, w.nextSpare)) {
@@ -914,12 +925,8 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
}
w.start(recordWorker(w), ueh);
- if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
- int c; // advance event count
- UNSAFE.compareAndSwapInt(this, eventCountOffset,
- c = eventCount, c+1);
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
break; // add at most one unless total below target
- }
}
}
if (eventWaiters != 0L)
@@ -955,7 +962,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else if ((h = eventWaiters) != 0L) {
long nh;
- int id = ((int)(h & WAITER_ID_MASK)) - 1;
+ int id = (((int)h) & WAITER_ID_MASK) - 1;
if (id >= 0 && id < n && (w = ws[id]) != null &&
(nh = w.nextWaiter) != 0L && // keep at least one worker
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
@@ -1003,24 +1010,31 @@ public class ForkJoinPool extends AbstractExecutorService {
int pc = parallelism;
while (w.runState == 0) {
int rs = runState;
- if (rs >= TERMINATING) { // propagate shutdown
+ if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
- UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+ UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
inactivate = active = w.active = false;
- int wc = workerCounts;
+ if (rs == SHUTDOWN) { // all inactive and shut down
+ tryTerminate(false);
+ continue;
+ }
+ }
+ int wc = workerCounts; // try to suspend as spare
if ((wc & RUNNING_COUNT_MASK) > pc) {
if (!(inactivate |= active) && // must inactivate to suspend
- workerCounts == wc && // try to suspend as spare
+ workerCounts == wc &&
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - ONE_RUNNING))
w.suspendAsSpare();
}
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
helpMaintainParallelism(); // not enough workers
- else if (!ran) {
+ else if (ran)
+ break;
+ else {
long h = eventWaiters;
int ec = eventCount;
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
@@ -1032,8 +1046,6 @@ public class ForkJoinPool extends AbstractExecutorService {
else if (!(inactivate |= active))
eventSync(w, wec); // must inactivate before sync
}
- else
- break;
}
}
@@ -1043,35 +1055,67 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param joinMe the task to join
* @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
*/
- final void awaitJoin(ForkJoinTask> joinMe, ForkJoinWorkerThread worker) {
+ final void awaitJoin(ForkJoinTask> joinMe, ForkJoinWorkerThread worker,
+ boolean timed, long nanos) {
+ long startTime = timed ? System.nanoTime() : 0L;
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
+ boolean running = true; // false when count decremented
while (joinMe.status >= 0) {
- int wc;
- worker.helpJoinTask(joinMe);
+ if (runState >= TERMINATING) {
+ joinMe.cancelIgnoringExceptions();
+ break;
+ }
+ running = worker.helpJoinTask(joinMe, running);
if (joinMe.status < 0)
break;
- else if (retries > 0)
+ if (retries > 0) {
--retries;
- else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
- int stat, c; long h;
- while ((stat = joinMe.status) >= 0 &&
- (h = eventWaiters) != 0L && // help release others
- (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ continue;
+ }
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) != 0) {
+ if (running) {
+ if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING))
+ continue;
+ running = false;
+ }
+ long h = eventWaiters;
+ if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
releaseEventWaiters();
- if (stat >= 0 &&
- ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
- (stat =
- joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
- helpMaintainParallelism(); // timeout or no running workers
- do {} while (!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- if (stat < 0)
- break; // else restart
+ if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+ long ms; int ns;
+ if (!timed) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else { // at most JOIN_TIMEOUT_MILLIS per wait
+ long nt = nanos - (System.nanoTime() - startTime);
+ if (nt <= 0L)
+ break;
+ ms = nt / 1000000;
+ if (ms > JOIN_TIMEOUT_MILLIS) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else
+ ns = (int) (nt % 1000000);
+ }
+ joinMe.internalAwaitDone(ms, ns);
+ }
+ if (joinMe.status < 0)
+ break;
}
+ helpMaintainParallelism();
+ }
+ if (!running) {
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
}
}
@@ -1082,9 +1126,10 @@ public class ForkJoinPool extends AbstractExecutorService {
throws InterruptedException {
while (!blocker.isReleasable()) {
int wc = workerCounts;
- if ((wc & RUNNING_COUNT_MASK) != 0 &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
+ if ((wc & RUNNING_COUNT_MASK) == 0)
+ helpMaintainParallelism();
+ else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
try {
while (!blocker.isReleasable()) {
long h = eventWaiters;
@@ -1129,12 +1174,11 @@ public class ForkJoinPool extends AbstractExecutorService {
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
- termination.arrive();
+ termination.forceTermination();
}
return true;
}
-
/**
* Actions on transition to TERMINATING
*
@@ -1325,17 +1369,13 @@ public class ForkJoinPool extends AbstractExecutorService {
// Execution methods
/**
- * Common code for execute, invoke and submit
+ * Submits task and creates, starts, or resumes some workers if necessary
*/
private void doSubmit(ForkJoinTask task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
submissionQueue.offer(task);
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
- helpMaintainParallelism(); // create, start, or resume some workers
+ helpMaintainParallelism();
}
/**
@@ -1348,8 +1388,33 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public T invoke(ForkJoinTask task) {
- doSubmit(task);
- return task.join();
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
+ doSubmit(task);
+ return task.join();
+ }
+ }
+
+ /**
+ * Unless terminating, forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private void forkOrSubmit(ForkJoinTask task) {
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork();
+ else
+ doSubmit(task);
}
/**
@@ -1361,7 +1426,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(ForkJoinTask> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
}
// AbstractExecutorService methods
@@ -1372,12 +1439,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
job = (ForkJoinTask>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
}
/**
@@ -1390,7 +1459,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask submit(ForkJoinTask task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
return task;
}
@@ -1400,8 +1471,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask submit(Callable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask job = ForkJoinTask.adapt(task);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1411,8 +1484,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask job = ForkJoinTask.adapt(task, result);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1422,12 +1497,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask> job;
if (task instanceof ForkJoinTask>) // avoid re-wrap
job = (ForkJoinTask>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1725,8 +1802,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
- * ignored or suppressed interruption, causing this executor not
- * to properly terminate.
+ * ignored or suppressed interruption, or are waiting for IO,
+ * causing this executor not to properly terminate. (See the
+ * advisory notes for class {@link ForkJoinTask} stating that
+ * tasks should not normally entail blocking operations. But if
+ * they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
@@ -1764,10 +1844,11 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
try {
- return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
+ termination.awaitAdvanceInterruptibly(0, timeout, unit);
} catch (TimeoutException ex) {
return false;
}
+ return true;
}
/**
diff --git a/src/share/classes/java/util/concurrent/ForkJoinTask.java b/src/share/classes/java/util/concurrent/ForkJoinTask.java
index cd18360f83da6f3e4a5e2ba725975c4bc663514a..b02323ffd6dc27ec7e11e4f075f1a6ecfb7bb92f 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -42,6 +42,16 @@ import java.util.List;
import java.util.RandomAccess;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@@ -129,6 +139,16 @@ import java.util.WeakHashMap;
* result in exceptions or errors, possibly including
* {@code ClassCastException}.
*
+ * Method {@link #join} and its variants are appropriate for use
+ * only when completion dependencies are acyclic; that is, the
+ * parallel computation can be described as a directed acyclic graph
+ * (DAG). Otherwise, executions may encounter a form of deadlock as
+ * tasks cyclically wait for each other. However, this framework
+ * supports other methods and techniques (for example the use of
+ * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
+ * may be of use in constructing custom subclasses for problems that
+ * are not statically structured as DAGs.
+ *
*
Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
* underlying lightweight task scheduling framework. Developers
@@ -143,9 +163,10 @@ import java.util.WeakHashMap;
* computation. Large tasks should be split into smaller subtasks,
* usually via recursive decomposition. As a very rough rule of thumb,
* a task should perform more than 100 and less than 10000 basic
- * computational steps. If tasks are too big, then parallelism cannot
- * improve throughput. If too small, then memory and internal task
- * maintenance overhead may overwhelm processing.
+ * computational steps, and should avoid indefinite looping. If tasks
+ * are too big, then parallelism cannot improve throughput. If too
+ * small, then memory and internal task maintenance overhead may
+ * overwhelm processing.
*
*
This class provides {@code adapt} methods for {@link Runnable}
* and {@link Callable}, that may be of use when mixing execution of
@@ -241,66 +262,84 @@ public abstract class ForkJoinTask implements Future, Serializable {
setCompletion(EXCEPTIONAL);
}
- /**
- * Blocks a worker thread until completion. Called only by
- * pool. Currently unused -- pool-based waits use timeout
- * version below.
- */
- final void internalAwaitDone() {
- int s; // the odd construction reduces lock bias effects
- while ((s = status) >= 0) {
- try {
- synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait();
- }
- } catch (InterruptedException ie) {
- cancelIfTerminating();
- }
- }
- }
-
/**
* Blocks a worker thread until completed or timed out. Called
* only by pool.
- *
- * @return status on exit
*/
- final int internalAwaitDone(long millis) {
- int s;
- if ((s = status) >= 0) {
- try {
+ final void internalAwaitDone(long millis, int nanos) {
+ int s = status;
+ if ((s == 0 &&
+ UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL)) ||
+ s > 0) {
+ try { // the odd construction reduces lock bias effects
synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
- wait(millis, 0);
+ if (status > 0)
+ wait(millis, nanos);
+ else
+ notifyAll();
}
} catch (InterruptedException ie) {
cancelIfTerminating();
}
- s = status;
}
- return s;
}
/**
* Blocks a non-worker-thread until completion.
*/
private void externalAwaitDone() {
- int s;
- while ((s = status) >= 0) {
+ if (status >= 0) {
+ boolean interrupted = false;
synchronized (this) {
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
- boolean interrupted = false;
- while (status >= 0) {
+ for (;;) {
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
- break;
+ }
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Blocks a non-worker-thread until completion or interruption or timeout.
+ */
+ private void externalInterruptibleAwaitDone(boolean timed, long nanos)
+ throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (status >= 0) {
+ long startTime = timed ? System.nanoTime() : 0L;
+ synchronized (this) {
+ for (;;) {
+ long nt;
+ int s = status;
+ if (s == 0)
+ UNSAFE.compareAndSwapInt(this, statusOffset,
+ 0, SIGNAL);
+ else if (s < 0) {
+ notifyAll();
+ break;
+ }
+ else if (!timed)
+ wait();
+ else if ((nt = nanos - (System.nanoTime()-startTime)) > 0L)
+ wait(nt / 1000000, (int)(nt % 1000000));
+ else
+ break;
}
}
}
@@ -335,7 +374,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* #isDone} returning {@code true}.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -349,10 +388,13 @@ public abstract class ForkJoinTask implements Future, Serializable {
}
/**
- * Returns the result of the computation when it {@link #isDone is done}.
- * This method differs from {@link #get()} in that
+ * Returns the result of the computation when it {@link #isDone is
+ * done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
- * {@code Error}, not {@code ExecutionException}.
+ * {@code Error}, not {@code ExecutionException}, and that
+ * interrupts of the calling thread do not cause the
+ * method to abruptly return by throwing {@code
+ * InterruptedException}.
*
* @return the computed result
*/
@@ -394,7 +436,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -422,7 +464,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* normally or exceptionally, or left unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -477,7 +519,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* unprocessed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -529,25 +571,28 @@ public abstract class ForkJoinTask implements Future, Serializable {
/**
* Attempts to cancel execution of this task. This attempt will
- * fail if the task has already completed, has already been
- * cancelled, or could not be cancelled for some other reason. If
- * successful, and this task has not started when cancel is
- * called, execution of this task is suppressed, {@link
- * #isCancelled} will report true, and {@link #join} will result
- * in a {@code CancellationException} being thrown.
+ * fail if the task has already completed or could not be
+ * cancelled for some other reason. If successful, and this task
+ * has not started when {@code cancel} is called, execution of
+ * this task is suppressed. After this method returns
+ * successfully, unless there is an intervening call to {@link
+ * #reinitialize}, subsequent calls to {@link #isCancelled},
+ * {@link #isDone}, and {@code cancel} will return {@code true}
+ * and calls to {@link #join} and related methods will result in
+ * {@code CancellationException}.
*
* This method may be overridden in subclasses, but if so, must
- * still ensure that these minimal properties hold. In particular,
- * the {@code cancel} method itself must not throw exceptions.
+ * still ensure that these properties hold. In particular, the
+ * {@code cancel} method itself must not throw exceptions.
*
*
This method is designed to be invoked by other
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
- * @param mayInterruptIfRunning this value is ignored in the
- * default implementation because tasks are not
- * cancelled via interruption
+ * @param mayInterruptIfRunning this value has no effect in the
+ * default implementation because interrupts are not used to
+ * control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
@@ -681,23 +726,13 @@ public abstract class ForkJoinTask implements Future, Serializable {
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s;
- if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
+ Thread t = Thread.currentThread();
+ if (t instanceof ForkJoinWorkerThread)
quietlyJoin();
- s = status;
- }
- else {
- while ((s = status) >= 0) {
- synchronized (this) { // interruptible form of awaitDone
- if (UNSAFE.compareAndSwapInt(this, statusOffset,
- s, SIGNAL)) {
- while (status >= 0)
- wait();
- }
- }
- }
- }
- if (s < NORMAL) {
+ else
+ externalInterruptibleAwaitDone(false, 0L);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
@@ -723,72 +758,18 @@ public abstract class ForkJoinTask implements Future, Serializable {
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ long nanos = unit.toNanos(timeout);
Thread t = Thread.currentThread();
- ForkJoinPool pool;
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
- if (status >= 0 && w.unpushTask(this))
- quietlyExec();
- pool = w.pool;
- }
+ if (t instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).joinTask(this, true, nanos);
else
- pool = null;
- /*
- * Timed wait loop intermixes cases for FJ (pool != null) and
- * non FJ threads. For FJ, decrement pool count but don't try
- * for replacement; increment count on completion. For non-FJ,
- * deal with interrupts. This is messy, but a little less so
- * than is splitting the FJ and nonFJ cases.
- */
- boolean interrupted = false;
- boolean dec = false; // true if pool count decremented
- long nanos = unit.toNanos(timeout);
- for (;;) {
- if (pool == null && Thread.interrupted()) {
- interrupted = true;
- break;
- }
- int s = status;
- if (s < 0)
- break;
- if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)) {
- long startTime = System.nanoTime();
- long nt; // wait time
- while (status >= 0 &&
- (nt = nanos - (System.nanoTime() - startTime)) > 0) {
- if (pool != null && !dec)
- dec = pool.tryDecrementRunningCount();
- else {
- long ms = nt / 1000000;
- int ns = (int) (nt % 1000000);
- try {
- synchronized (this) {
- if (status >= 0)
- wait(ms, ns);
- }
- } catch (InterruptedException ie) {
- if (pool != null)
- cancelIfTerminating();
- else {
- interrupted = true;
- break;
- }
- }
- }
- }
- break;
- }
- }
- if (pool != null && dec)
- pool.incrementRunningCount();
- if (interrupted)
- throw new InterruptedException();
- int es = status;
- if (es != NORMAL) {
+ externalInterruptibleAwaitDone(true, nanos);
+ int s = status;
+ if (s != NORMAL) {
Throwable ex;
- if (es == CANCELLED)
+ if (s == CANCELLED)
throw new CancellationException();
- if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
+ if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
throw new ExecutionException(ex);
throw new TimeoutException();
}
@@ -819,7 +800,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
return;
}
}
- w.joinTask(this);
+ w.joinTask(this, false, 0L);
}
}
else
@@ -855,7 +836,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* processed.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -874,6 +855,12 @@ public abstract class ForkJoinTask implements Future, Serializable {
* under any other usage conditions are not guaranteed.
* This method may be useful when executing
* pre-constructed trees of subtasks in loops.
+ *
+ * Upon completion of this method, {@code isDone()} reports
+ * {@code false}, and {@code getException()} reports {@code
+ * null}. However, the value returned by {@code getRawResult} is
+ * unaffected. To clear this value, you can invoke {@code
+ * setRawResult(null)}.
*/
public void reinitialize() {
if (status == EXCEPTIONAL)
@@ -895,11 +882,12 @@ public abstract class ForkJoinTask implements Future, Serializable {
}
/**
- * Returns {@code true} if the current thread is executing as a
- * ForkJoinPool computation.
+ * Returns {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
*
- * @return {@code true} if the current thread is executing as a
- * ForkJoinPool computation, or false otherwise
+ * @return {@code true} if the current thread is a {@link
+ * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
+ * or {@code false} otherwise
*/
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
@@ -914,7 +902,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* were not, stolen.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -933,7 +921,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* fork other tasks.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -956,7 +944,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* exceeded.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1014,7 +1002,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1033,7 +1021,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* be useful otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
@@ -1056,7 +1044,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
* otherwise.
*
* This method may be invoked only from within {@code
- * ForkJoinTask} computations (as may be determined using method
+ * ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
diff --git a/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index 4dcfbbd571c55cfdead8edf85afa1a5f0151a01d..7d79c5190e2fe15c137957d039a36d33b3e73d69 100644
--- a/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -38,16 +38,18 @@ package java.util.concurrent;
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.RejectedExecutionException;
/**
- * A thread managed by a {@link ForkJoinPool}. This class is
- * subclassable solely for the sake of adding functionality -- there
- * are no overridable methods dealing with scheduling or execution.
- * However, you can override initialization and termination methods
- * surrounding the main task processing loop. If you do create such a
- * subclass, you will also need to supply a custom {@link
- * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
- * ForkJoinPool}.
+ * A thread managed by a {@link ForkJoinPool}, which executes
+ * {@link ForkJoinTask}s.
+ * This class is subclassable solely for the sake of adding
+ * functionality -- there are no overridable methods dealing with
+ * scheduling or execution. However, you can override initialization
+ * and termination methods surrounding the main task processing loop.
+ * If you do create such a subclass, you will also need to supply a
+ * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
+ * in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
@@ -376,7 +378,7 @@ public class ForkJoinWorkerThread extends Thread {
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
- * invoke @code{super.onStart()} at the beginning of the method.
+ * invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
@@ -384,7 +386,7 @@ public class ForkJoinWorkerThread extends Thread {
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
- seed = rs == 0? 1 : rs; // seed must be nonzero
+ seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
@@ -426,7 +428,7 @@ public class ForkJoinWorkerThread extends Thread {
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
- * ForkJoinTasks.
+ * {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
@@ -628,6 +630,19 @@ public class ForkJoinWorkerThread extends Thread {
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * Note: here and in related methods, as a
+ * performance (not correctness) issue, we'd like
+ * to encourage compiler not to arbitrarily
+ * postpone setting sp after successful CAS.
+ * Currently there is no intrinsic for arranging
+ * this, but using Unsafe putOrderedInt may be a
+ * preferable strategy on some compilers even
+ * though its main effect is a pre-, not post-
+ * fence. To simplify possible changes, the option
+ * is left in comments next to the associated
+ * assignments.
+ */
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
return t;
@@ -777,10 +792,10 @@ public class ForkJoinWorkerThread extends Thread {
// Run State management
// status check methods used mainly by ForkJoinPool
- final boolean isRunning() { return runState == 0; }
- final boolean isTerminated() { return (runState & TERMINATED) != 0; }
- final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
- final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
+ final boolean isRunning() { return runState == 0; }
+ final boolean isTerminated() { return (runState & TERMINATED) != 0; }
+ final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
+ final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isTerminating() {
if ((runState & TERMINATING) != 0)
@@ -884,8 +899,7 @@ public class ForkJoinWorkerThread extends Thread {
*/
final void cancelTasks() {
ForkJoinTask> cj = currentJoin; // try to cancel ongoing tasks
- if (cj != null) {
- currentJoin = null;
+ if (cj != null && cj.status >= 0) {
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
@@ -893,10 +907,8 @@ public class ForkJoinWorkerThread extends Thread {
}
}
ForkJoinTask> cs = currentSteal;
- if (cs != null) {
- currentSteal = null;
+ if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions();
- }
while (base != sp) {
ForkJoinTask> t = deqTask();
if (t != null)
@@ -959,57 +971,23 @@ public class ForkJoinWorkerThread extends Thread {
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
+ * @param timed true if use timed wait
+ * @param nanos wait time if timed
*/
- final void joinTask(ForkJoinTask> joinMe) {
+ final void joinTask(ForkJoinTask> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
- if (sp != base)
- localHelpJoinTask(joinMe);
- if (joinMe.status >= 0)
- pool.awaitJoin(joinMe, this);
+ pool.awaitJoin(joinMe, this, timed, nanos);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
- * Run tasks in local queue until given task is done.
- *
- * @param joinMe the task to join
- */
- private void localHelpJoinTask(ForkJoinTask> joinMe) {
- int s;
- ForkJoinTask>[] q;
- while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
- int i = (q.length - 1) & --s;
- long u = (i << qShift) + qBase; // raw offset
- ForkJoinTask> t = q[i];
- if (t == null) // lost to a stealer
- break;
- if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
- /*
- * This recheck (and similarly in helpJoinTask)
- * handles cases where joinMe is independently
- * cancelled or forced even though there is other work
- * available. Back out of the pop by putting t back
- * into slot before we commit by writing sp.
- */
- if (joinMe.status < 0) {
- UNSAFE.putObjectVolatile(q, u, t);
- break;
- }
- sp = s;
- // UNSAFE.putOrderedInt(this, spOffset, s);
- t.quietlyExec();
- }
- }
- }
-
- /**
- * Unless terminating, tries to locate and help perform tasks for
- * a stealer of the given task, or in turn one of its stealers.
- * Traces currentSteal->currentJoin links looking for a thread
- * working on a descendant of the given task and with a non-empty
- * queue to steal back and execute tasks from.
+ * Tries to locate and help perform tasks for a stealer of the
+ * given task, or in turn one of its stealers. Traces
+ * currentSteal->currentJoin links looking for a thread working on
+ * a descendant of the given task and with a non-empty queue to
+ * steal back and execute tasks from.
*
* The implementation is very branchy to cope with potential
* inconsistencies or loops encountering chains that are stale,
@@ -1019,77 +997,127 @@ public class ForkJoinWorkerThread extends Thread {
* don't work out.
*
* @param joinMe the task to join
+ * @param running if false, then must update pool count upon
+ * running a task
+ * @return value of running on exit
*/
- final void helpJoinTask(ForkJoinTask> joinMe) {
- ForkJoinWorkerThread[] ws;
- int n;
- if (joinMe.status < 0) // already done
- return;
- if ((runState & TERMINATING) != 0) { // cancel if shutting down
- joinMe.cancelIgnoringExceptions();
- return;
+ final boolean helpJoinTask(ForkJoinTask> joinMe, boolean running) {
+ /*
+ * Initial checks to (1) abort if terminating; (2) clean out
+ * old cancelled tasks from local queue; (3) if joinMe is next
+ * task, run it; (4) omit scan if local queue nonempty (since
+ * it may contain non-descendents of joinMe).
+ */
+ ForkJoinPool p = pool;
+ for (;;) {
+ ForkJoinTask>[] q;
+ int s;
+ if (joinMe.status < 0)
+ return running;
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ return running;
+ }
+ else if ((s = sp) == base || (q = queue) == null)
+ break; // queue empty
+ else {
+ int i = (q.length - 1) & --s;
+ long u = (i << qShift) + qBase; // raw offset
+ ForkJoinTask> t = q[i];
+ if (t == null)
+ break; // lost to a stealer
+ else if (t != joinMe && t.status >= 0)
+ return running; // cannot safely help
+ else if ((running ||
+ (running = p.tryIncrementRunningCount())) &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
+ // UNSAFE.putOrderedInt(this, spOffset, s);
+ t.quietlyExec();
+ }
+ }
}
- if ((ws = pool.workers) == null || (n = ws.length) <= 1)
- return; // need at least 2 workers
-
- ForkJoinTask> task = joinMe; // base of chain
- ForkJoinWorkerThread thread = this; // thread with stolen task
- for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
- // Try to find v, the stealer of task, by first using hint
- ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
- if (v == null || v.currentSteal != task) {
- for (int j = 0; ; ++j) { // search array
- if (j < n) {
- ForkJoinTask> vs;
- if ((v = ws[j]) != null &&
- (vs = v.currentSteal) != null) {
- if (joinMe.status < 0 || task.status < 0)
- return; // stale or done
- if (vs == task) {
- thread.stealHint = j;
- break; // save hint for next time
+
+ int n; // worker array size
+ ForkJoinWorkerThread[] ws = p.workers;
+ if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
+ ForkJoinTask> task = joinMe; // base of chain
+ ForkJoinWorkerThread thread = this; // thread with stolen task
+
+ outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+ // Try to find v, the stealer of task, by first using hint
+ ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
+ if (v == null || v.currentSteal != task) {
+ for (int j = 0; ; ++j) { // search array
+ if (j < n) {
+ ForkJoinTask> vs;
+ if ((v = ws[j]) != null &&
+ (vs = v.currentSteal) != null) {
+ if (joinMe.status < 0)
+ break outer;
+ if (vs == task) {
+ if (task.status < 0)
+ break outer; // stale
+ thread.stealHint = j;
+ break; // save hint for next time
+ }
}
}
+ else
+ break outer; // no stealer
}
- else
- return; // no stealer
}
- }
- for (;;) { // Try to help v, using specialized form of deqTask
- if (joinMe.status < 0)
- return;
- int b = v.base;
- ForkJoinTask>[] q = v.queue;
- if (b == v.sp || q == null)
- break;
- int i = (q.length - 1) & b;
- long u = (i << qShift) + qBase;
- ForkJoinTask> t = q[i];
- int pid = poolIndex;
- ForkJoinTask> ps = currentSteal;
- if (task.status < 0)
- return; // stale or done
- if (t != null && v.base == b++ &&
- UNSAFE.compareAndSwapObject(q, u, t, null)) {
- if (joinMe.status < 0) {
- UNSAFE.putObjectVolatile(q, u, t);
- return; // back out on cancel
+
+ // Try to help v, using specialized form of deqTask
+ for (;;) {
+ if (joinMe.status < 0)
+ break outer;
+ int b = v.base;
+ ForkJoinTask>[] q = v.queue;
+ if (b == v.sp || q == null)
+ break; // empty
+ int i = (q.length - 1) & b;
+ long u = (i << qShift) + qBase;
+ ForkJoinTask> t = q[i];
+ if (task.status < 0)
+ break outer; // stale
+ if (t != null &&
+ (running ||
+ (running = p.tryIncrementRunningCount())) &&
+ v.base == b++ &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ if (t != joinMe && joinMe.status < 0) {
+ UNSAFE.putObjectVolatile(q, u, t);
+ break outer; // joinMe cancelled; back out
+ }
+ v.base = b;
+ if (t.status >= 0) {
+ ForkJoinTask> ps = currentSteal;
+ int pid = poolIndex;
+ v.stealHint = pid;
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, t);
+ t.quietlyExec();
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, ps);
+ }
+ }
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ break outer;
}
- v.base = b;
- v.stealHint = pid;
- UNSAFE.putOrderedObject(this, currentStealOffset, t);
- t.quietlyExec();
- UNSAFE.putOrderedObject(this, currentStealOffset, ps);
}
+
+ // Try to descend to find v's stealer
+ ForkJoinTask> next = v.currentJoin;
+ if (task.status < 0 || next == null || next == task ||
+ joinMe.status < 0)
+ break; // done, stale, dead-end, or cyclic
+ task = next;
+ thread = v;
}
- // Try to descend to find v's stealer
- ForkJoinTask> next = v.currentJoin;
- if (task.status < 0 || next == null || next == task ||
- joinMe.status < 0)
- return;
- task = next;
- thread = v;
}
+ return running;
}
/**
diff --git a/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java b/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java
index 8051ccaa8483f71f0f873515a27f1880b051ab6f..f5d9da1cb178f52cfe7a505f3c69cdd9a10cf4d8 100644
--- a/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java
+++ b/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java
@@ -1029,6 +1029,8 @@ public class LinkedBlockingDeque
* elements as they existed upon construction of the iterator, and
* may (but is not guaranteed to) reflect any modifications
* subsequent to construction.
+ *
+ * @return an iterator over the elements in this deque in reverse order
*/
public Iterator descendingIterator() {
return new DescendingItr();
diff --git a/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java b/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java
index 9746b7dedec8500f0f884c6c966882465fc23c88..5f62e79ce56bdf90da60e21d7b3076f1a5123973 100644
--- a/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java
+++ b/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java
@@ -189,14 +189,14 @@ public class LinkedBlockingQueue extends AbstractQueue
}
/**
- * Creates a node and links it at end of queue.
+ * Links node at end of queue.
*
- * @param x the item
+ * @param node the node
*/
- private void enqueue(E x) {
+ private void enqueue(Node node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
- last = last.next = new Node(x);
+ last = last.next = node;
}
/**
@@ -282,7 +282,7 @@ public class LinkedBlockingQueue extends AbstractQueue
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
- enqueue(e);
+ enqueue(new Node(e));
++n;
}
count.set(n);
@@ -332,6 +332,7 @@ public class LinkedBlockingQueue extends AbstractQueue
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
+ Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
@@ -347,7 +348,7 @@ public class LinkedBlockingQueue extends AbstractQueue
while (count.get() == capacity) {
notFull.await();
}
- enqueue(e);
+ enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -382,7 +383,7 @@ public class LinkedBlockingQueue extends AbstractQueue
return false;
nanos = notFull.awaitNanos(nanos);
}
- enqueue(e);
+ enqueue(new Node(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
@@ -411,11 +412,12 @@ public class LinkedBlockingQueue