From f4eeb63d0c9b4aa54f29c5b5a2fce2c19fbba4f8 Mon Sep 17 00:00:00 2001 From: dl Date: Tue, 28 Jul 2009 17:17:55 -0700 Subject: [PATCH] 6805775: LinkedBlockingQueue Nodes should unlink themselves before becoming garbage 6815766: LinkedBlockingQueue's iterator can return null if drainTo(c) executes concurrently Summary: Faster, more correct. Use self-linking trick to avoid gc retention Reviewed-by: martin, dholmes --- .../util/concurrent/LinkedBlockingDeque.java | 390 +++++++++++------ .../util/concurrent/LinkedBlockingQueue.java | 403 ++++++++++-------- test/java/util/Collection/MOAT.java | 35 ++ .../BlockingQueue/OfferDrainToLoops.java | 130 ++++++ .../IteratorWeakConsistency.java | 93 ++++ 5 files changed, 728 insertions(+), 323 deletions(-) create mode 100644 test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java create mode 100644 test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java diff --git a/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java b/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java index 48fc8717a..7db94a6ef 100644 --- a/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java +++ b/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java @@ -34,8 +34,13 @@ */ package java.util.concurrent; -import java.util.*; -import java.util.concurrent.locks.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on @@ -73,6 +78,20 @@ public class LinkedBlockingDeque /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to jump to "first" (for next links) + * or "last" (for prev links). */ /* @@ -86,9 +105,27 @@ public class LinkedBlockingDeque /** Doubly-linked list node class */ static final class Node { + /** + * The item, or null if this node has been removed. + */ E item; + + /** + * One of: + * - the real predecessor Node + * - this Node, meaning the predecessor is tail + * - null, meaning there is no predecessor + */ Node prev; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head + * - null, meaning there is no successor + */ Node next; + Node(E x, Node p, Node n) { item = x; prev = p; @@ -96,23 +133,37 @@ public class LinkedBlockingDeque } } - /** Pointer to first node */ - private transient Node first; - /** Pointer to last node */ - private transient Node last; + /** + * Pointer to first node. + * Invariant: (first == null && last == null) || + * (first.prev == null && first.item != null) + */ + transient Node first; + + /** + * Pointer to last node. + * Invariant: (first == null && last == null) || + * (last.next == null && last.item != null) + */ + transient Node last; + /** Number of items in the deque */ private transient int count; + /** Maximum number of items in the deque */ private final int capacity; + /** Main lock guarding all access */ - private final ReentrantLock lock = new ReentrantLock(); + final ReentrantLock lock = new ReentrantLock(); + /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); + /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** - * Creates a LinkedBlockingDeque with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { @@ -120,10 +171,10 @@ public class LinkedBlockingDeque } /** - * Creates a LinkedBlockingDeque with the given (fixed) capacity. + * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque - * @throws IllegalArgumentException if capacity is less than 1 + * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); @@ -131,7 +182,7 @@ public class LinkedBlockingDeque } /** - * Creates a LinkedBlockingDeque with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of * the given collection, added in traversal order of the * collection's iterator. @@ -142,8 +193,18 @@ public class LinkedBlockingDeque */ public LinkedBlockingDeque(Collection c) { this(Integer.MAX_VALUE); - for (E e : c) - add(e); + final ReentrantLock lock = this.lock; + lock.lock(); // Never contended, but necessary for visibility + try { + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (!linkLast(e)) + throw new IllegalStateException("Deque full"); + } + } finally { + lock.unlock(); + } } @@ -153,9 +214,9 @@ public class LinkedBlockingDeque * Links e as first element, or returns false if full. */ private boolean linkFirst(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node f = first; Node x = new Node(e, null, f); first = x; @@ -163,6 +224,7 @@ public class LinkedBlockingDeque last = x; else f.prev = x; + ++count; notEmpty.signal(); return true; } @@ -171,9 +233,9 @@ public class LinkedBlockingDeque * Links e as last element, or returns false if full. */ private boolean linkLast(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node l = last; Node x = new Node(e, l, null); last = x; @@ -181,6 +243,7 @@ public class LinkedBlockingDeque first = x; else l.next = x; + ++count; notEmpty.signal(); return true; } @@ -189,10 +252,14 @@ public class LinkedBlockingDeque * Removes and returns first element, or null if empty. */ private E unlinkFirst() { + // assert lock.isHeldByCurrentThread(); Node f = first; if (f == null) return null; Node n = f.next; + E item = f.item; + f.item = null; + f.next = f; // help GC first = n; if (n == null) last = null; @@ -200,17 +267,21 @@ public class LinkedBlockingDeque n.prev = null; --count; notFull.signal(); - return f.item; + return item; } /** * Removes and returns last element, or null if empty. */ private E unlinkLast() { + // assert lock.isHeldByCurrentThread(); Node l = last; if (l == null) return null; Node p = l.prev; + E item = l.item; + l.item = null; + l.prev = l; // help GC last = p; if (p == null) first = null; @@ -218,31 +289,29 @@ public class LinkedBlockingDeque p.next = null; --count; notFull.signal(); - return l.item; + return item; } /** - * Unlink e + * Unlinks x. */ - private void unlink(Node x) { + void unlink(Node x) { + // assert lock.isHeldByCurrentThread(); Node p = x.prev; Node n = x.next; if (p == null) { - if (n == null) - first = last = null; - else { - n.prev = null; - first = n; - } + unlinkFirst(); } else if (n == null) { - p.next = null; - last = p; + unlinkLast(); } else { p.next = n; n.prev = p; + x.item = null; + // Don't mess with x's links. They may still be in use by + // an iterator. + --count; + notFull.signal(); } - --count; - notFull.signalAll(); } // BlockingDeque methods @@ -270,6 +339,7 @@ public class LinkedBlockingDeque */ public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(e); @@ -283,6 +353,7 @@ public class LinkedBlockingDeque */ public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(e); @@ -297,6 +368,7 @@ public class LinkedBlockingDeque */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(e)) @@ -312,6 +384,7 @@ public class LinkedBlockingDeque */ public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(e)) @@ -329,15 +402,15 @@ public class LinkedBlockingDeque throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkFirst(e)) - return true; + while (!linkFirst(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -351,15 +424,15 @@ public class LinkedBlockingDeque throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkLast(e)) - return true; + while (!linkLast(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -384,6 +457,7 @@ public class LinkedBlockingDeque } public E pollFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); @@ -393,6 +467,7 @@ public class LinkedBlockingDeque } public E pollLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); @@ -402,6 +477,7 @@ public class LinkedBlockingDeque } public E takeFirst() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -414,6 +490,7 @@ public class LinkedBlockingDeque } public E takeLast() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -428,16 +505,16 @@ public class LinkedBlockingDeque public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkFirst(); - if (x != null) - return x; + E x; + while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -446,16 +523,16 @@ public class LinkedBlockingDeque public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkLast(); - if (x != null) - return x; + E x; + while ( (x = unlinkLast()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -480,6 +557,7 @@ public class LinkedBlockingDeque } public E peekFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; @@ -489,6 +567,7 @@ public class LinkedBlockingDeque } public E peekLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; @@ -499,6 +578,7 @@ public class LinkedBlockingDeque public boolean removeFirstOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = first; p != null; p = p.next) { @@ -515,6 +595,7 @@ public class LinkedBlockingDeque public boolean removeLastOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = last; p != null; p = p.prev) { @@ -619,14 +700,15 @@ public class LinkedBlockingDeque * Returns the number of additional elements that this deque can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this deque - * less the current size of this deque. + * less the current {@code size} of this deque. * *

Note that you cannot always tell if an attempt to insert - * an element will succeed by inspecting remainingCapacity + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { + final ReentrantLock lock = this.lock; lock.lock(); try { return capacity - count; @@ -642,22 +724,7 @@ public class LinkedBlockingDeque * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - lock.lock(); - try { - for (Node p = first; p != null; p = p.next) - c.add(p.item); - int n = count; - count = 0; - first = last = null; - notFull.signalAll(); - return n; - } finally { - lock.unlock(); - } + return drainTo(c, Integer.MAX_VALUE); } /** @@ -671,19 +738,14 @@ public class LinkedBlockingDeque throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; lock.lock(); try { - int n = 0; - while (n < maxElements && first != null) { - c.add(first.item); - first.prev = null; - first = first.next; - --count; - ++n; + int n = Math.min(maxElements, count); + for (int i = 0; i < n; i++) { + c.add(first.item); // In this order, in case add() throws. + unlinkFirst(); } - if (first == null) - last = null; - notFull.signalAll(); return n; } finally { lock.unlock(); @@ -712,16 +774,16 @@ public class LinkedBlockingDeque /** * Removes the first occurrence of the specified element from this deque. * If the deque does not contain the element, it is unchanged. - * More formally, removes the first element e such that - * o.equals(e) (if such an element exists). - * Returns true if this deque contained the specified element + * More formally, removes the first element {@code e} such that + * {@code o.equals(e)} (if such an element exists). + * Returns {@code true} if this deque contained the specified element * (or equivalently, if this deque changed as a result of the call). * *

This method is equivalent to * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}. * * @param o element to be removed from this deque, if present - * @return true if this deque changed as a result of the call + * @return {@code true} if this deque changed as a result of the call */ public boolean remove(Object o) { return removeFirstOccurrence(o); @@ -733,6 +795,7 @@ public class LinkedBlockingDeque * @return the number of elements in this deque */ public int size() { + final ReentrantLock lock = this.lock; lock.lock(); try { return count; @@ -742,15 +805,16 @@ public class LinkedBlockingDeque } /** - * Returns true if this deque contains the specified element. - * More formally, returns true if and only if this deque contains - * at least one element e such that o.equals(e). + * Returns {@code true} if this deque contains the specified element. + * More formally, returns {@code true} if and only if this deque contains + * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this deque - * @return true if this deque contains the specified element + * @return {@code true} if this deque contains the specified element */ public boolean contains(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node p = first; p != null; p = p.next) @@ -762,24 +826,46 @@ public class LinkedBlockingDeque } } - /** - * Variant of removeFirstOccurrence needed by iterator.remove. - * Searches for the node, not its contents. + /* + * TODO: Add support for more efficient bulk operations. + * + * We don't want to acquire the lock for every iteration, but we + * also want other threads a chance to interact with the + * collection, especially when count is close to capacity. */ - boolean removeNode(Node e) { - lock.lock(); - try { - for (Node p = first; p != null; p = p.next) { - if (p == e) { - unlink(p); - return true; - } - } - return false; - } finally { - lock.unlock(); - } - } + +// /** +// * Adds all of the elements in the specified collection to this +// * queue. Attempts to addAll of a queue to itself result in +// * {@code IllegalArgumentException}. Further, the behavior of +// * this operation is undefined if the specified collection is +// * modified while the operation is in progress. +// * +// * @param c collection containing elements to be added to this queue +// * @return {@code true} if this queue changed as a result of the call +// * @throws ClassCastException {@inheritDoc} +// * @throws NullPointerException {@inheritDoc} +// * @throws IllegalArgumentException {@inheritDoc} +// * @throws IllegalStateException {@inheritDoc} +// * @see #add(Object) +// */ +// public boolean addAll(Collection c) { +// if (c == null) +// throw new NullPointerException(); +// if (c == this) +// throw new IllegalArgumentException(); +// final ReentrantLock lock = this.lock; +// lock.lock(); +// try { +// boolean modified = false; +// for (E e : c) +// if (linkLast(e)) +// modified = true; +// return modified; +// } finally { +// lock.unlock(); +// } +// } /** * Returns an array containing all of the elements in this deque, in @@ -794,7 +880,9 @@ public class LinkedBlockingDeque * * @return an array containing all of the elements in this deque */ + @SuppressWarnings("unchecked") public Object[] toArray() { + final ReentrantLock lock = this.lock; lock.lock(); try { Object[] a = new Object[count]; @@ -817,22 +905,22 @@ public class LinkedBlockingDeque *

If this deque fits in the specified array with room to spare * (i.e., the array has more elements than this deque), the element in * the array immediately following the end of the deque is set to - * null. + * {@code null}. * *

Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - *

Suppose x is a deque known to contain only strings. + *

Suppose {@code x} is a deque known to contain only strings. * The following code can be used to dump the deque into a newly - * allocated array of String: + * allocated array of {@code String}: * *

      *     String[] y = x.toArray(new String[0]);
* - * Note that toArray(new Object[0]) is identical in function to - * toArray(). + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the deque are to * be stored, if it is big enough; otherwise, a new array of the @@ -843,14 +931,14 @@ public class LinkedBlockingDeque * this deque * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; lock.lock(); try { if (a.length < count) - a = (T[])java.lang.reflect.Array.newInstance( - a.getClass().getComponentType(), - count - ); + a = (T[])java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), count); int k = 0; for (Node p = first; p != null; p = p.next) @@ -864,6 +952,7 @@ public class LinkedBlockingDeque } public String toString() { + final ReentrantLock lock = this.lock; lock.lock(); try { return super.toString(); @@ -877,8 +966,16 @@ public class LinkedBlockingDeque * The deque will be empty after this call returns. */ public void clear() { + final ReentrantLock lock = this.lock; lock.lock(); try { + for (Node f = first; f != null; ) { + f.item = null; + Node n = f.next; + f.prev = null; + f.next = null; + f = n; + } first = last = null; count = 0; notFull.signalAll(); @@ -890,7 +987,7 @@ public class LinkedBlockingDeque /** * Returns an iterator over the elements in this deque in proper sequence. * The elements will be returned in order from first (head) to last (tail). - * The returned Iterator is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -906,7 +1003,7 @@ public class LinkedBlockingDeque * Returns an iterator over the elements in this deque in reverse * sequential order. The elements will be returned in order from * last (tail) to first (head). - * The returned Iterator is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -921,7 +1018,7 @@ public class LinkedBlockingDeque */ private abstract class AbstractItr implements Iterator { /** - * The next node to return in next + * The next node to return in next() */ Node next; @@ -939,15 +1036,44 @@ public class LinkedBlockingDeque */ private Node lastRet; + abstract Node firstNode(); + abstract Node nextNode(Node n); + AbstractItr() { - advance(); // set to initial position + // set to initial position + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + next = firstNode(); + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } } /** - * Advances next, or if not yet initialized, sets to first node. - * Implemented to move forward vs backward in the two subclasses. + * Advances next. */ - abstract void advance(); + void advance() { + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + // assert next != null; + Node s = nextNode(next); + if (s == next) { + next = firstNode(); + } else { + // Skip over removed nodes. + // May be necessary if multiple interior Nodes are removed. + while (s != null && s.item == null) + s = nextNode(s); + next = s; + } + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } + } public boolean hasNext() { return next != null; @@ -967,52 +1093,39 @@ public class LinkedBlockingDeque if (n == null) throw new IllegalStateException(); lastRet = null; - // Note: removeNode rescans looking for this node to make - // sure it was not already removed. Otherwise, trying to - // re-remove could corrupt list. - removeNode(n); - } - } - - /** Forward iterator */ - private class Itr extends AbstractItr { - void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { - next = (next == null)? first : next.next; - nextItem = (next == null)? null : next.item; + if (n.item != null) + unlink(n); } finally { lock.unlock(); } } } - /** - * Descending iterator for LinkedBlockingDeque - */ + /** Forward iterator */ + private class Itr extends AbstractItr { + Node firstNode() { return first; } + Node nextNode(Node n) { return n.next; } + } + + /** Descending iterator */ private class DescendingItr extends AbstractItr { - void advance() { - final ReentrantLock lock = LinkedBlockingDeque.this.lock; - lock.lock(); - try { - next = (next == null)? last : next.prev; - nextItem = (next == null)? null : next.item; - } finally { - lock.unlock(); - } - } + Node firstNode() { return last; } + Node nextNode(Node n) { return n.prev; } } /** * Save the state of this deque to a stream (that is, serialize it). * * @serialData The capacity (int), followed by elements (each an - * Object) in the proper order, followed by a null + * {@code Object}) in the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + final ReentrantLock lock = this.lock; lock.lock(); try { // Write out capacity and any hidden stuff @@ -1040,6 +1153,7 @@ public class LinkedBlockingDeque last = null; // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break; diff --git a/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java b/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java index dc56a0346..9c1c6cc7b 100644 --- a/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java +++ b/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java @@ -34,9 +34,14 @@ */ package java.util.concurrent; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import java.util.*; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on @@ -86,15 +91,43 @@ public class LinkedBlockingQueue extends AbstractQueue * items have been entered since the signal. And symmetrically for * takes signalling puts. Operations such as remove(Object) and * iterators acquire both locks. + * + * Visibility between writers and readers is provided as follows: + * + * Whenever an element is enqueued, the putLock is acquired and + * count updated. A subsequent reader guarantees visibility to the + * enqueued Node by either acquiring the putLock (via fullyLock) + * or by acquiring the takeLock, and then reading n = count.get(); + * this gives visibility to the first n items. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to advance to head.next. */ /** * Linked list node class */ static class Node { - /** The item, volatile to ensure barrier separating write and read */ - volatile E item; + E item; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head.next + * - null, meaning there is no successor (this is the last node) + */ Node next; + Node(E x) { item = x; } } @@ -104,10 +137,16 @@ public class LinkedBlockingQueue extends AbstractQueue /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0); - /** Head of linked list */ + /** + * Head of linked list. + * Invariant: head.item == null + */ private transient Node head; - /** Tail of linked list */ + /** + * Tail of linked list. + * Invariant: last.next == null + */ private transient Node last; /** Lock held by take, poll, etc */ @@ -151,18 +190,26 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Creates a node and links it at end of queue. + * * @param x the item */ - private void insert(E x) { + private void enqueue(E x) { + // assert putLock.isHeldByCurrentThread(); + // assert last.next == null; last = last.next = new Node(x); } /** - * Removes a node from head of queue, + * Removes a node from head of queue. + * * @return the node */ - private E extract() { - Node first = head.next; + private E dequeue() { + // assert takeLock.isHeldByCurrentThread(); + // assert head.item == null; + Node h = head; + Node first = h.next; + h.next = h; // help GC head = first; E x = first.item; first.item = null; @@ -172,7 +219,7 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Lock to prevent both puts and takes. */ - private void fullyLock() { + void fullyLock() { putLock.lock(); takeLock.lock(); } @@ -180,14 +227,21 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Unlock to allow both puts and takes. */ - private void fullyUnlock() { + void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } +// /** +// * Tells whether both locks are held by current thread. +// */ +// boolean isFullyLocked() { +// return (putLock.isHeldByCurrentThread() && +// takeLock.isHeldByCurrentThread()); +// } /** - * Creates a LinkedBlockingQueue with a capacity of + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { @@ -195,10 +249,10 @@ public class LinkedBlockingQueue extends AbstractQueue } /** - * Creates a LinkedBlockingQueue with the given (fixed) capacity. + * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue - * @throws IllegalArgumentException if capacity is not greater + * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { @@ -208,7 +262,7 @@ public class LinkedBlockingQueue extends AbstractQueue } /** - * Creates a LinkedBlockingQueue with a capacity of + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. @@ -219,8 +273,22 @@ public class LinkedBlockingQueue extends AbstractQueue */ public LinkedBlockingQueue(Collection c) { this(Integer.MAX_VALUE); - for (E e : c) - add(e); + final ReentrantLock putLock = this.putLock; + putLock.lock(); // Never contended, but necessary for visibility + try { + int n = 0; + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (n == capacity) + throw new IllegalStateException("Queue full"); + enqueue(e); + ++n; + } + count.set(n); + } finally { + putLock.unlock(); + } } @@ -241,10 +309,10 @@ public class LinkedBlockingQueue extends AbstractQueue * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue - * less the current size of this queue. + * less the current {@code size} of this queue. * *

Note that you cannot always tell if an attempt to insert - * an element will succeed by inspecting remainingCapacity + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ @@ -261,8 +329,8 @@ public class LinkedBlockingQueue extends AbstractQueue */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); - // Note: convention in all put/take/etc is to preset - // local var holding count negative to indicate failure unless set. + // Note: convention in all put/take/etc is to preset local var + // holding count negative to indicate failure unless set. int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; @@ -273,18 +341,13 @@ public class LinkedBlockingQueue extends AbstractQueue * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are - * signalled if it ever changes from - * capacity. Similarly for all other uses of count in - * other wait guards. + * signalled if it ever changes from capacity. Similarly + * for all other uses of count in other wait guards. */ - try { - while (count.get() == capacity) - notFull.await(); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to a non-interrupted thread - throw ie; + while (count.get() == capacity) { + notFull.await(); } - insert(e); + enqueue(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); @@ -299,7 +362,7 @@ public class LinkedBlockingQueue extends AbstractQueue * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * - * @return true if successful, or false if + * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} @@ -314,23 +377,15 @@ public class LinkedBlockingQueue extends AbstractQueue final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { - for (;;) { - if (count.get() < capacity) { - insert(e); - c = count.getAndIncrement(); - if (c + 1 < capacity) - notFull.signal(); - break; - } + while (count.get() == capacity) { if (nanos <= 0) return false; - try { - nanos = notFull.awaitNanos(nanos); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notFull.awaitNanos(nanos); } + enqueue(e); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); } finally { putLock.unlock(); } @@ -342,7 +397,7 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, - * returning true upon success and false if this queue + * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to @@ -360,7 +415,7 @@ public class LinkedBlockingQueue extends AbstractQueue putLock.lock(); try { if (count.get() < capacity) { - insert(e); + enqueue(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); @@ -381,15 +436,10 @@ public class LinkedBlockingQueue extends AbstractQueue final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { - try { - while (count.get() == 0) - notEmpty.await(); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; + while (count.get() == 0) { + notEmpty.await(); } - - x = extract(); + x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); @@ -409,23 +459,15 @@ public class LinkedBlockingQueue extends AbstractQueue final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { - for (;;) { - if (count.get() > 0) { - x = extract(); - c = count.getAndDecrement(); - if (c > 1) - notEmpty.signal(); - break; - } + while (count.get() == 0) { if (nanos <= 0) return null; - try { - nanos = notEmpty.awaitNanos(nanos); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notEmpty.awaitNanos(nanos); } + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); } finally { takeLock.unlock(); } @@ -444,7 +486,7 @@ public class LinkedBlockingQueue extends AbstractQueue takeLock.lock(); try { if (count.get() > 0) { - x = extract(); + x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); @@ -457,7 +499,6 @@ public class LinkedBlockingQueue extends AbstractQueue return x; } - public E peek() { if (count.get() == 0) return null; @@ -474,44 +515,48 @@ public class LinkedBlockingQueue extends AbstractQueue } } + /** + * Unlinks interior Node p with predecessor trail. + */ + void unlink(Node p, Node trail) { + // assert isFullyLocked(); + // p.next is not changed, to allow iterators that are + // traversing p to maintain their weak-consistency guarantee. + p.item = null; + trail.next = p.next; + if (last == p) + last = trail; + if (count.getAndDecrement() == capacity) + notFull.signal(); + } + /** * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element e such - * that o.equals(e), if this queue contains one or more such + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such * elements. - * Returns true if this queue contained the specified element + * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present - * @return true if this queue changed as a result of the call + * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; - boolean removed = false; fullyLock(); try { - Node trail = head; - Node p = head.next; - while (p != null) { + for (Node trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { if (o.equals(p.item)) { - removed = true; - break; + unlink(p, trail); + return true; } - trail = p; - p = p.next; - } - if (removed) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - if (count.getAndDecrement() == capacity) - notFull.signalAll(); } + return false; } finally { fullyUnlock(); } - return removed; } /** @@ -551,22 +596,22 @@ public class LinkedBlockingQueue extends AbstractQueue *

If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to - * null. + * {@code null}. * *

Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - *

Suppose x is a queue known to contain only strings. + *

Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly - * allocated array of String: + * allocated array of {@code String}: * *

      *     String[] y = x.toArray(new String[0]);
* - * Note that toArray(new Object[0]) is identical in function to - * toArray(). + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the @@ -577,6 +622,7 @@ public class LinkedBlockingQueue extends AbstractQueue * this queue * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public T[] toArray(T[] a) { fullyLock(); try { @@ -586,7 +632,7 @@ public class LinkedBlockingQueue extends AbstractQueue (a.getClass().getComponentType(), size); int k = 0; - for (Node p = head.next; p != null; p = p.next) + for (Node p = head.next; p != null; p = p.next) a[k++] = (T)p.item; if (a.length > k) a[k] = null; @@ -612,11 +658,14 @@ public class LinkedBlockingQueue extends AbstractQueue public void clear() { fullyLock(); try { - head.next = null; - assert head.item == null; - last = head; + for (Node p, h = head; (p = h.next) != null; h = p) { + h.next = h; + p.item = null; + } + head = last; + // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) - notFull.signalAll(); + notFull.signal(); } finally { fullyUnlock(); } @@ -629,30 +678,7 @@ public class LinkedBlockingQueue extends AbstractQueue * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - Node first; - fullyLock(); - try { - first = head.next; - head.next = null; - assert head.item == null; - last = head; - if (count.getAndSet(0) == capacity) - notFull.signalAll(); - } finally { - fullyUnlock(); - } - // Transfer the elements outside of locks - int n = 0; - for (Node p = first; p != null; p = p.next) { - c.add(p.item); - p.item = null; - ++n; - } - return n; + return drainTo(c, Integer.MAX_VALUE); } /** @@ -666,33 +692,42 @@ public class LinkedBlockingQueue extends AbstractQueue throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); - fullyLock(); + boolean signalNotFull = false; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); try { - int n = 0; - Node p = head.next; - while (p != null && n < maxElements) { - c.add(p.item); - p.item = null; - p = p.next; - ++n; - } - if (n != 0) { - head.next = p; - assert head.item == null; - if (p == null) - last = head; - if (count.getAndAdd(-n) == capacity) - notFull.signalAll(); + int n = Math.min(maxElements, count.get()); + // count.get provides visibility to first n Nodes + Node h = head; + int i = 0; + try { + while (i < n) { + Node p = h.next; + c.add(p.item); + p.item = null; + h.next = h; + h = p; + ++i; + } + return n; + } finally { + // Restore invariants even if c.add() threw + if (i > 0) { + // assert h.item == null; + head = h; + signalNotFull = (count.getAndAdd(-i) == capacity); + } } - return n; } finally { - fullyUnlock(); + takeLock.unlock(); + if (signalNotFull) + signalNotFull(); } } /** * Returns an iterator over the elements in this queue in proper sequence. - * The returned Iterator is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -706,7 +741,7 @@ public class LinkedBlockingQueue extends AbstractQueue private class Itr implements Iterator { /* - * Basic weak-consistent iterator. At all times hold the next + * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ @@ -715,17 +750,13 @@ public class LinkedBlockingQueue extends AbstractQueue private E currentElement; Itr() { - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } @@ -733,54 +764,54 @@ public class LinkedBlockingQueue extends AbstractQueue return current != null; } + /** + * Unlike other traversal methods, iterators need to handle: + * - dequeued nodes (p.next == p) + * - interior removed nodes (p.item == null) + */ + private Node nextNode(Node p) { + Node s = p.next; + if (p == s) + return head.next; + // Skip over removed nodes. + // May be necessary if multiple interior Nodes are removed. + while (s != null && s.item == null) + s = s.next; + return s; + } + public E next() { - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; lastRet = current; - current = current.next; - if (current != null) - currentElement = current.item; + current = nextNode(current); + currentElement = (current == null) ? null : current.item; return x; } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { Node node = lastRet; lastRet = null; - Node trail = head; - Node p = head.next; - while (p != null && p != node) { - trail = p; - p = p.next; - } - if (p == node) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - int c = count.getAndDecrement(); - if (c == capacity) - notFull.signalAll(); + for (Node trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { + if (p == node) { + unlink(p, trail); + break; + } } } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } } @@ -789,7 +820,7 @@ public class LinkedBlockingQueue extends AbstractQueue * Save the state to a stream (that is, serialize it). * * @serialData The capacity is emitted (int), followed by all of - * its elements (each an Object) in the proper order, + * its elements (each an {@code Object}) in the proper order, * followed by a null * @param s the stream */ @@ -815,6 +846,7 @@ public class LinkedBlockingQueue extends AbstractQueue /** * Reconstitute this queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -827,6 +859,7 @@ public class LinkedBlockingQueue extends AbstractQueue // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break; diff --git a/test/java/util/Collection/MOAT.java b/test/java/util/Collection/MOAT.java index f9d30f562..d2b6b6869 100644 --- a/test/java/util/Collection/MOAT.java +++ b/test/java/util/Collection/MOAT.java @@ -426,6 +426,36 @@ public class MOAT { q.poll(); equal(q.size(), 4); checkFunctionalInvariants(q); + if ((q instanceof LinkedBlockingQueue) || + (q instanceof LinkedBlockingDeque) || + (q instanceof ConcurrentLinkedQueue)) { + testQueueIteratorRemove(q); + } + } + + private static void testQueueIteratorRemove(Queue q) { + System.err.printf("testQueueIteratorRemove %s%n", + q.getClass().getSimpleName()); + q.clear(); + for (int i = 0; i < 5; i++) + q.add(i); + Iterator it = q.iterator(); + check(it.hasNext()); + for (int i = 3; i >= 0; i--) + q.remove(i); + equal(it.next(), 0); + equal(it.next(), 4); + + q.clear(); + for (int i = 0; i < 5; i++) + q.add(i); + it = q.iterator(); + equal(it.next(), 0); + check(it.hasNext()); + for (int i = 1; i < 4; i++) + q.remove(i); + equal(it.next(), 1); + equal(it.next(), 4); } private static void testList(final List l) { @@ -451,6 +481,11 @@ public class MOAT { } private static void testCollection(Collection c) { + try { testCollection1(c); } + catch (Throwable t) { unexpected(t); } + } + + private static void testCollection1(Collection c) { System.out.println("\n==> " + c.getClass().getName()); diff --git a/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java b/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java new file mode 100644 index 000000000..eb85f7bb3 --- /dev/null +++ b/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java @@ -0,0 +1,130 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/* + * @test + * @bug 6805775 6815766 + * @summary Test concurrent offer vs. drainTo + */ + +import java.util.*; +import java.util.concurrent.*; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class OfferDrainToLoops { + void checkNotContainsNull(Iterable it) { + for (Object x : it) + check(x != null); + } + + abstract class CheckedThread extends Thread { + abstract protected void realRun(); + public void run() { + try { realRun(); } catch (Throwable t) { unexpected(t); } + } + { + setDaemon(true); + start(); + } + } + + void test(String[] args) throws Throwable { + test(new LinkedBlockingQueue()); + test(new LinkedBlockingQueue(2000)); + test(new LinkedBlockingDeque()); + test(new LinkedBlockingDeque(2000)); + test(new ArrayBlockingQueue(2000)); + } + + void test(final BlockingQueue q) throws Throwable { + System.out.println(q.getClass().getSimpleName()); + final long testDurationSeconds = 1L; + final long testDurationMillis = testDurationSeconds * 1000L; + final long quittingTimeNanos + = System.nanoTime() + testDurationSeconds * 1000L * 1000L * 1000L; + + Thread offerer = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if ((i % 1024) == 0 && + System.nanoTime() - quittingTimeNanos > 0) + break; + while (! q.offer(i)) + Thread.yield(); + }}}; + + Thread drainer = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if (System.nanoTime() - quittingTimeNanos > 0) + break; + List list = new ArrayList(); + int n = q.drainTo(list); + equal(list.size(), n); + for (int j = 0; j < n - 1; j++) + equal((Long) list.get(j) + 1L, list.get(j + 1)); + Thread.yield(); + }}}; + + Thread scanner = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if (System.nanoTime() - quittingTimeNanos > 0) + break; + checkNotContainsNull(q); + Thread.yield(); + }}}; + + offerer.join(10 * testDurationMillis); + drainer.join(10 * testDurationMillis); + check(! offerer.isAlive()); + check(! drainer.isAlive()); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new OfferDrainToLoops().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +} diff --git a/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java b/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java new file mode 100644 index 000000000..727f64956 --- /dev/null +++ b/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java @@ -0,0 +1,93 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +import java.util.*; +import java.util.concurrent.*; + +/* + * @test + * @bug 6805775 6815766 + * @summary Check weak consistency of concurrent queue iterators + */ + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class IteratorWeakConsistency { + + void test(String[] args) throws Throwable { + test(new LinkedBlockingQueue()); + test(new LinkedBlockingQueue(20)); + test(new LinkedBlockingDeque()); + test(new LinkedBlockingDeque(20)); + test(new ConcurrentLinkedQueue()); + // Other concurrent queues (e.g. ArrayBlockingQueue) do not + // currently have weakly consistent iterators. + // test(new ArrayBlockingQueue(20)); + } + + void test(Queue q) throws Throwable { + // TODO: make this more general + for (int i = 0; i < 10; i++) + q.add(i); + Iterator it = q.iterator(); + q.poll(); + q.poll(); + q.poll(); + q.remove(7); + List list = new ArrayList(); + while (it.hasNext()) + list.add(it.next()); + equal(list, Arrays.asList(0, 3, 4, 5, 6, 8, 9)); + check(! list.contains(null)); + System.out.printf("%s: %s%n", + q.getClass().getSimpleName(), + list); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + static Class thisClass = new Object(){}.getClass().getEnclosingClass(); + public static void main(String[] args) throws Throwable { + new IteratorWeakConsistency().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +} -- GitLab