From c79c53a27f65610323c9cb93342dc8372a12344d Mon Sep 17 00:00:00 2001 From: dl Date: Mon, 20 Sep 2010 18:05:09 -0700 Subject: [PATCH] 6981113: Add ConcurrentLinkedDeque Summary: Extend techniques developed for ConcurrentLinkedQueue and LinkedTransferQueue to implement a non-blocking concurrent Deque with interior removes. Reviewed-by: martin, dholmes, chegar --- make/java/java/FILES_java.gmk | 1 + .../concurrent/ConcurrentLinkedDeque.java | 1445 +++++++++++++++++ .../concurrent/ConcurrentLinkedQueue.java | 362 +++-- test/java/util/Collection/BiggernYours.java | 5 + test/java/util/Collection/IteratorAtEnd.java | 1 + test/java/util/Collection/MOAT.java | 6 +- .../util/Collections/RacingCollections.java | 2 + test/java/util/Deque/ChorusLine.java | 1 + .../ConcurrentQueueLoops.java | 3 +- .../ConcurrentQueues/GCRetention.java | 2 + .../IteratorWeakConsistency.java | 1 + .../ConcurrentQueues/OfferRemoveLoops.java | 1 + .../ConcurrentQueues/RemovePollRace.java | 2 + 13 files changed, 1690 insertions(+), 142 deletions(-) create mode 100644 src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java diff --git a/make/java/java/FILES_java.gmk b/make/java/java/FILES_java.gmk index 416eeb343..f3e349ede 100644 --- a/make/java/java/FILES_java.gmk +++ b/make/java/java/FILES_java.gmk @@ -272,6 +272,7 @@ JAVA_JAVA_java = \ java/util/concurrent/CancellationException.java \ java/util/concurrent/CompletionService.java \ java/util/concurrent/ConcurrentHashMap.java \ + java/util/concurrent/ConcurrentLinkedDeque.java \ java/util/concurrent/ConcurrentLinkedQueue.java \ java/util/concurrent/ConcurrentMap.java \ java/util/concurrent/ConcurrentNavigableMap.java \ diff --git a/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java b/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java new file mode 100644 index 000000000..4837661a4 --- /dev/null +++ b/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java @@ -0,0 +1,1445 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea and Martin Buchholz with assistance from members of + * JCP JSR-166 Expert Group and released to the public domain, as explained + * at http://creativecommons.org/licenses/publicdomain + */ + +package java.util.concurrent; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; + +/** + * An unbounded concurrent {@linkplain Deque deque} based on linked nodes. + * Concurrent insertion, removal, and access operations execute safely + * across multiple threads. + * A {@code ConcurrentLinkedDeque} is an appropriate choice when + * many threads will share access to a common collection. + * Like most other concurrent collection implementations, this class + * does not permit the use of {@code null} elements. + * + *

Iterators are weakly consistent, returning elements + * reflecting the state of the deque at some point at or since the + * creation of the iterator. They do not throw {@link + * java.util.ConcurrentModificationException + * ConcurrentModificationException}, and may proceed concurrently with + * other operations. + * + *

Beware that, unlike in most collections, the {@code size} + * method is NOT a constant-time operation. Because of the + * asynchronous nature of these deques, determining the current number + * of elements requires a traversal of the elements. + * + *

This class and its iterator implement all of the optional + * methods of the {@link Deque} and {@link Iterator} interfaces. + * + *

Memory consistency effects: As with other concurrent collections, + * actions in a thread prior to placing an object into a + * {@code ConcurrentLinkedDeque} + * happen-before + * actions subsequent to the access or removal of that element from + * the {@code ConcurrentLinkedDeque} in another thread. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.7 + * @author Doug Lea + * @author Martin Buchholz + * @param the type of elements held in this collection + */ + +public class ConcurrentLinkedDeque + extends AbstractCollection + implements Deque, java.io.Serializable { + + /* + * This is an implementation of a concurrent lock-free deque + * supporting interior removes but not interior insertions, as + * required to support the entire Deque interface. + * + * We extend the techniques developed for ConcurrentLinkedQueue and + * LinkedTransferQueue (see the internal docs for those classes). + * Understanding the ConcurrentLinkedQueue implementation is a + * prerequisite for understanding the implementation of this class. + * + * The data structure is a symmetrical doubly-linked "GC-robust" + * linked list of nodes. We minimize the number of volatile writes + * using two techniques: advancing multiple hops with a single CAS + * and mixing volatile and non-volatile writes of the same memory + * locations. + * + * A node contains the expected E ("item") and links to predecessor + * ("prev") and successor ("next") nodes: + * + * class Node { volatile Node prev, next; volatile E item; } + * + * A node p is considered "live" if it contains a non-null item + * (p.item != null). When an item is CASed to null, the item is + * atomically logically deleted from the collection. + * + * At any time, there is precisely one "first" node with a null + * prev reference that terminates any chain of prev references + * starting at a live node. Similarly there is precisely one + * "last" node terminating any chain of next references starting at + * a live node. The "first" and "last" nodes may or may not be live. + * The "first" and "last" nodes are always mutually reachable. + * + * A new element is added atomically by CASing the null prev or + * next reference in the first or last node to a fresh node + * containing the element. The element's node atomically becomes + * "live" at that point. + * + * A node is considered "active" if it is a live node, or the + * first or last node. Active nodes cannot be unlinked. + * + * A "self-link" is a next or prev reference that is the same node: + * p.prev == p or p.next == p + * Self-links are used in the node unlinking process. Active nodes + * never have self-links. + * + * A node p is active if and only if: + * + * p.item != null || + * (p.prev == null && p.next != p) || + * (p.next == null && p.prev != p) + * + * The deque object has two node references, "head" and "tail". + * The head and tail are only approximations to the first and last + * nodes of the deque. The first node can always be found by + * following prev pointers from head; likewise for tail. However, + * it is permissible for head and tail to be referring to deleted + * nodes that have been unlinked and so may not be reachable from + * any live node. + * + * There are 3 stages of node deletion; + * "logical deletion", "unlinking", and "gc-unlinking". + * + * 1. "logical deletion" by CASing item to null atomically removes + * the element from the collection, and makes the containing node + * eligible for unlinking. + * + * 2. "unlinking" makes a deleted node unreachable from active + * nodes, and thus eventually reclaimable by GC. Unlinked nodes + * may remain reachable indefinitely from an iterator. + * + * Physical node unlinking is merely an optimization (albeit a + * critical one), and so can be performed at our convenience. At + * any time, the set of live nodes maintained by prev and next + * links are identical, that is, the live nodes found via next + * links from the first node is equal to the elements found via + * prev links from the last node. However, this is not true for + * nodes that have already been logically deleted - such nodes may + * be reachable in one direction only. + * + * 3. "gc-unlinking" takes unlinking further by making active + * nodes unreachable from deleted nodes, making it easier for the + * GC to reclaim future deleted nodes. This step makes the data + * structure "gc-robust", as first described in detail by Boehm + * (http://portal.acm.org/citation.cfm?doid=503272.503282). + * + * GC-unlinked nodes may remain reachable indefinitely from an + * iterator, but unlike unlinked nodes, are never reachable from + * head or tail. + * + * Making the data structure GC-robust will eliminate the risk of + * unbounded memory retention with conservative GCs and is likely + * to improve performance with generational GCs. + * + * When a node is dequeued at either end, e.g. via poll(), we would + * like to break any references from the node to active nodes. We + * develop further the use of self-links that was very effective in + * other concurrent collection classes. The idea is to replace + * prev and next pointers with special values that are interpreted + * to mean off-the-list-at-one-end. These are approximations, but + * good enough to preserve the properties we want in our + * traversals, e.g. we guarantee that a traversal will never visit + * the same element twice, but we don't guarantee whether a + * traversal that runs out of elements will be able to see more + * elements later after enqueues at that end. Doing gc-unlinking + * safely is particularly tricky, since any node can be in use + * indefinitely (for example by an iterator). We must ensure that + * the nodes pointed at by head/tail never get gc-unlinked, since + * head/tail are needed to get "back on track" by other nodes that + * are gc-unlinked. gc-unlinking accounts for much of the + * implementation complexity. + * + * Since neither unlinking nor gc-unlinking are necessary for + * correctness, there are many implementation choices regarding + * frequency (eagerness) of these operations. Since volatile + * reads are likely to be much cheaper than CASes, saving CASes by + * unlinking multiple adjacent nodes at a time may be a win. + * gc-unlinking can be performed rarely and still be effective, + * since it is most important that long chains of deleted nodes + * are occasionally broken. + * + * The actual representation we use is that p.next == p means to + * goto the first node (which in turn is reached by following prev + * pointers from head), and p.next == null && p.prev == p means + * that the iteration is at an end and that p is a (final static) + * dummy node, NEXT_TERMINATOR, and not the last active node. + * Finishing the iteration when encountering such a TERMINATOR is + * good enough for read-only traversals, so such traversals can use + * p.next == null as the termination condition. When we need to + * find the last (active) node, for enqueueing a new node, we need + * to check whether we have reached a TERMINATOR node; if so, + * restart traversal from tail. + * + * The implementation is completely directionally symmetrical, + * except that most public methods that iterate through the list + * follow next pointers ("forward" direction). + * + * We believe (without full proof) that all single-element deque + * operations (e.g., addFirst, peekLast, pollLast) are linearizable + * (see Herlihy and Shavit's book). However, some combinations of + * operations are known not to be linearizable. In particular, + * when an addFirst(A) is racing with pollFirst() removing B, it is + * possible for an observer iterating over the elements to observe + * A B C and subsequently observe A C, even though no interior + * removes are ever performed. Nevertheless, iterators behave + * reasonably, providing the "weakly consistent" guarantees. + * + * Empirically, microbenchmarks suggest that this class adds about + * 40% overhead relative to ConcurrentLinkedQueue, which feels as + * good as we can hope for. + */ + + private static final long serialVersionUID = 876323262645176354L; + + /** + * A node from which the first node on list (that is, the unique node p + * with p.prev == null && p.next != p) can be reached in O(1) time. + * Invariants: + * - the first node is always O(1) reachable from head via prev links + * - all live nodes are reachable from the first node via succ() + * - head != null + * - (tmp = head).next != tmp || tmp != head + * - head is never gc-unlinked (but may be unlinked) + * Non-invariants: + * - head.item may or may not be null + * - head may not be reachable from the first or last node, or from tail + */ + private transient volatile Node head; + + /** + * A node from which the last node on list (that is, the unique node p + * with p.next == null && p.prev != p) can be reached in O(1) time. + * Invariants: + * - the last node is always O(1) reachable from tail via next links + * - all live nodes are reachable from the last node via pred() + * - tail != null + * - tail is never gc-unlinked (but may be unlinked) + * Non-invariants: + * - tail.item may or may not be null + * - tail may not be reachable from the first or last node, or from head + */ + private transient volatile Node tail; + + private final static Node PREV_TERMINATOR, NEXT_TERMINATOR; + + static { + PREV_TERMINATOR = new Node(null); + PREV_TERMINATOR.next = PREV_TERMINATOR; + NEXT_TERMINATOR = new Node(null); + NEXT_TERMINATOR.prev = NEXT_TERMINATOR; + } + + @SuppressWarnings("unchecked") + Node prevTerminator() { + return (Node) PREV_TERMINATOR; + } + + @SuppressWarnings("unchecked") + Node nextTerminator() { + return (Node) NEXT_TERMINATOR; + } + + static final class Node { + volatile Node prev; + volatile E item; + volatile Node next; + + /** + * Constructs a new node. Uses relaxed write because item can + * only be seen after publication via casNext or casPrev. + */ + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + void lazySetPrev(Node val) { + UNSAFE.putOrderedObject(this, prevOffset, val); + } + + boolean casPrev(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = + sun.misc.Unsafe.getUnsafe(); + private static final long prevOffset = + objectFieldOffset(UNSAFE, "prev", Node.class); + private static final long itemOffset = + objectFieldOffset(UNSAFE, "item", Node.class); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + } + + /** + * Links e as first element. + */ + private void linkFirst(E e) { + checkNotNull(e); + final Node newNode = new Node(e); + + restartFromHead: + for (;;) + for (Node h = head, p = h, q;;) { + if ((q = p.prev) != null && + (q = (p = q).prev) != null) + // Check for head updates every other hop. + // If p == q, we are sure to follow head instead. + p = (h != (h = head)) ? h : q; + else if (p.next == p) // PREV_TERMINATOR + continue restartFromHead; + else { + // p is first node + newNode.lazySetNext(p); // CAS piggyback + if (p.casPrev(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this deque, + // and for newNode to become "live". + if (p != h) // hop two nodes at a time + casHead(h, newNode); // Failure is OK. + return; + } + // Lost CAS race to another thread; re-read prev + } + } + } + + /** + * Links e as last element. + */ + private void linkLast(E e) { + checkNotNull(e); + final Node newNode = new Node(e); + + restartFromTail: + for (;;) + for (Node t = tail, p = t, q;;) { + if ((q = p.next) != null && + (q = (p = q).next) != null) + // Check for tail updates every other hop. + // If p == q, we are sure to follow tail instead. + p = (t != (t = tail)) ? t : q; + else if (p.prev == p) // NEXT_TERMINATOR + continue restartFromTail; + else { + // p is last node + newNode.lazySetPrev(p); // CAS piggyback + if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this deque, + // and for newNode to become "live". + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return; + } + // Lost CAS race to another thread; re-read next + } + } + } + + private final static int HOPS = 2; + + /** + * Unlinks non-null node x. + */ + void unlink(Node x) { + // assert x != null; + // assert x.item == null; + // assert x != PREV_TERMINATOR; + // assert x != NEXT_TERMINATOR; + + final Node prev = x.prev; + final Node next = x.next; + if (prev == null) { + unlinkFirst(x, next); + } else if (next == null) { + unlinkLast(x, prev); + } else { + // Unlink interior node. + // + // This is the common case, since a series of polls at the + // same end will be "interior" removes, except perhaps for + // the first one, since end nodes cannot be unlinked. + // + // At any time, all active nodes are mutually reachable by + // following a sequence of either next or prev pointers. + // + // Our strategy is to find the unique active predecessor + // and successor of x. Try to fix up their links so that + // they point to each other, leaving x unreachable from + // active nodes. If successful, and if x has no live + // predecessor/successor, we additionally try to gc-unlink, + // leaving active nodes unreachable from x, by rechecking + // that the status of predecessor and successor are + // unchanged and ensuring that x is not reachable from + // tail/head, before setting x's prev/next links to their + // logical approximate replacements, self/TERMINATOR. + Node activePred, activeSucc; + boolean isFirst, isLast; + int hops = 1; + + // Find active predecessor + for (Node p = prev; ; ++hops) { + if (p.item != null) { + activePred = p; + isFirst = false; + break; + } + Node q = p.prev; + if (q == null) { + if (p.next == p) + return; + activePred = p; + isFirst = true; + break; + } + else if (p == q) + return; + else + p = q; + } + + // Find active successor + for (Node p = next; ; ++hops) { + if (p.item != null) { + activeSucc = p; + isLast = false; + break; + } + Node q = p.next; + if (q == null) { + if (p.prev == p) + return; + activeSucc = p; + isLast = true; + break; + } + else if (p == q) + return; + else + p = q; + } + + // TODO: better HOP heuristics + if (hops < HOPS + // always squeeze out interior deleted nodes + && (isFirst | isLast)) + return; + + // Squeeze out deleted nodes between activePred and + // activeSucc, including x. + skipDeletedSuccessors(activePred); + skipDeletedPredecessors(activeSucc); + + // Try to gc-unlink, if possible + if ((isFirst | isLast) && + + // Recheck expected state of predecessor and successor + (activePred.next == activeSucc) && + (activeSucc.prev == activePred) && + (isFirst ? activePred.prev == null : activePred.item != null) && + (isLast ? activeSucc.next == null : activeSucc.item != null)) { + + updateHead(); // Ensure x is not reachable from head + updateTail(); // Ensure x is not reachable from tail + + // Finally, actually gc-unlink + x.lazySetPrev(isFirst ? prevTerminator() : x); + x.lazySetNext(isLast ? nextTerminator() : x); + } + } + } + + /** + * Unlinks non-null first node. + */ + private void unlinkFirst(Node first, Node next) { + // assert first != null; + // assert next != null; + // assert first.item == null; + for (Node o = null, p = next, q;;) { + if (p.item != null || (q = p.next) == null) { + if (o != null && p.prev != p && first.casNext(next, p)) { + skipDeletedPredecessors(p); + if (first.prev == null && + (p.next == null || p.item != null) && + p.prev == first) { + + updateHead(); // Ensure o is not reachable from head + updateTail(); // Ensure o is not reachable from tail + + // Finally, actually gc-unlink + o.lazySetNext(o); + o.lazySetPrev(prevTerminator()); + } + } + return; + } + else if (p == q) + return; + else { + o = p; + p = q; + } + } + } + + /** + * Unlinks non-null last node. + */ + private void unlinkLast(Node last, Node prev) { + // assert last != null; + // assert prev != null; + // assert last.item == null; + for (Node o = null, p = prev, q;;) { + if (p.item != null || (q = p.prev) == null) { + if (o != null && p.next != p && last.casPrev(prev, p)) { + skipDeletedSuccessors(p); + if (last.next == null && + (p.prev == null || p.item != null) && + p.next == last) { + + updateHead(); // Ensure o is not reachable from head + updateTail(); // Ensure o is not reachable from tail + + // Finally, actually gc-unlink + o.lazySetPrev(o); + o.lazySetNext(nextTerminator()); + } + } + return; + } + else if (p == q) + return; + else { + o = p; + p = q; + } + } + } + + /** + * Guarantees that any node which was unlinked before a call to + * this method will be unreachable from head after it returns. + * Does not guarantee to eliminate slack, only that head will + * point to a node that was active while this method was running. + */ + private final void updateHead() { + // Either head already points to an active node, or we keep + // trying to cas it to the first node until it does. + Node h, p, q; + restartFromHead: + while ((h = head).item == null && (p = h.prev) != null) { + for (;;) { + if ((q = p.prev) == null || + (q = (p = q).prev) == null) { + // It is possible that p is PREV_TERMINATOR, + // but if so, the CAS is guaranteed to fail. + if (casHead(h, p)) + return; + else + continue restartFromHead; + } + else if (h != head) + continue restartFromHead; + else + p = q; + } + } + } + + /** + * Guarantees that any node which was unlinked before a call to + * this method will be unreachable from tail after it returns. + * Does not guarantee to eliminate slack, only that tail will + * point to a node that was active while this method was running. + */ + private final void updateTail() { + // Either tail already points to an active node, or we keep + // trying to cas it to the last node until it does. + Node t, p, q; + restartFromTail: + while ((t = tail).item == null && (p = t.next) != null) { + for (;;) { + if ((q = p.next) == null || + (q = (p = q).next) == null) { + // It is possible that p is NEXT_TERMINATOR, + // but if so, the CAS is guaranteed to fail. + if (casTail(t, p)) + return; + else + continue restartFromTail; + } + else if (t != tail) + continue restartFromTail; + else + p = q; + } + } + } + + private void skipDeletedPredecessors(Node x) { + whileActive: + do { + Node prev = x.prev; + // assert prev != null; + // assert x != NEXT_TERMINATOR; + // assert x != PREV_TERMINATOR; + Node p = prev; + findActive: + for (;;) { + if (p.item != null) + break findActive; + Node q = p.prev; + if (q == null) { + if (p.next == p) + continue whileActive; + break findActive; + } + else if (p == q) + continue whileActive; + else + p = q; + } + + // found active CAS target + if (prev == p || x.casPrev(prev, p)) + return; + + } while (x.item != null || x.next == null); + } + + private void skipDeletedSuccessors(Node x) { + whileActive: + do { + Node next = x.next; + // assert next != null; + // assert x != NEXT_TERMINATOR; + // assert x != PREV_TERMINATOR; + Node p = next; + findActive: + for (;;) { + if (p.item != null) + break findActive; + Node q = p.next; + if (q == null) { + if (p.prev == p) + continue whileActive; + break findActive; + } + else if (p == q) + continue whileActive; + else + p = q; + } + + // found active CAS target + if (next == p || x.casNext(next, p)) + return; + + } while (x.item != null || x.prev == null); + } + + /** + * Returns the successor of p, or the first node if p.next has been + * linked to self, which will only be true if traversing with a + * stale pointer that is now off the list. + */ + final Node succ(Node p) { + // TODO: should we skip deleted nodes here? + Node q = p.next; + return (p == q) ? first() : q; + } + + /** + * Returns the predecessor of p, or the last node if p.prev has been + * linked to self, which will only be true if traversing with a + * stale pointer that is now off the list. + */ + final Node pred(Node p) { + Node q = p.prev; + return (p == q) ? last() : q; + } + + /** + * Returns the first node, the unique node p for which: + * p.prev == null && p.next != p + * The returned node may or may not be logically deleted. + * Guarantees that head is set to the returned node. + */ + Node first() { + restartFromHead: + for (;;) + for (Node h = head, p = h, q;;) { + if ((q = p.prev) != null && + (q = (p = q).prev) != null) + // Check for head updates every other hop. + // If p == q, we are sure to follow head instead. + p = (h != (h = head)) ? h : q; + else if (p == h + // It is possible that p is PREV_TERMINATOR, + // but if so, the CAS is guaranteed to fail. + || casHead(h, p)) + return p; + else + continue restartFromHead; + } + } + + /** + * Returns the last node, the unique node p for which: + * p.next == null && p.prev != p + * The returned node may or may not be logically deleted. + * Guarantees that tail is set to the returned node. + */ + Node last() { + restartFromTail: + for (;;) + for (Node t = tail, p = t, q;;) { + if ((q = p.next) != null && + (q = (p = q).next) != null) + // Check for tail updates every other hop. + // If p == q, we are sure to follow tail instead. + p = (t != (t = tail)) ? t : q; + else if (p == t + // It is possible that p is NEXT_TERMINATOR, + // but if so, the CAS is guaranteed to fail. + || casTail(t, p)) + return p; + else + continue restartFromTail; + } + } + + // Minor convenience utilities + + /** + * Throws NullPointerException if argument is null. + * + * @param v the element + */ + private static void checkNotNull(Object v) { + if (v == null) + throw new NullPointerException(); + } + + /** + * Returns element unless it is null, in which case throws + * NoSuchElementException. + * + * @param v the element + * @return the element + */ + private E screenNullResult(E v) { + if (v == null) + throw new NoSuchElementException(); + return v; + } + + /** + * Creates an array list and fills it with elements of this list. + * Used by toArray. + * + * @return the arrayList + */ + private ArrayList toArrayList() { + ArrayList list = new ArrayList(); + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null) + list.add(item); + } + return list; + } + + /** + * Constructs an empty deque. + */ + public ConcurrentLinkedDeque() { + head = tail = new Node(null); + } + + /** + * Constructs a deque initially containing the elements of + * the given collection, added in traversal order of the + * collection's iterator. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + public ConcurrentLinkedDeque(Collection c) { + // Copy c into a private chain of Nodes + Node h = null, t = null; + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + newNode.lazySetPrev(t); + t = newNode; + } + } + initHeadTail(h, t); + } + + /** + * Initializes head and tail, ensuring invariants hold. + */ + private void initHeadTail(Node h, Node t) { + if (h == t) { + if (h == null) + h = t = new Node(null); + else { + // Avoid edge case of a single Node with non-null item. + Node newNode = new Node(null); + t.lazySetNext(newNode); + newNode.lazySetPrev(t); + t = newNode; + } + } + head = h; + tail = t; + } + + /** + * Inserts the specified element at the front of this deque. + * + * @throws NullPointerException {@inheritDoc} + */ + public void addFirst(E e) { + linkFirst(e); + } + + /** + * Inserts the specified element at the end of this deque. + * + *

This method is equivalent to {@link #add}. + * + * @throws NullPointerException {@inheritDoc} + */ + public void addLast(E e) { + linkLast(e); + } + + /** + * Inserts the specified element at the front of this deque. + * + * @return {@code true} always + * @throws NullPointerException {@inheritDoc} + */ + public boolean offerFirst(E e) { + linkFirst(e); + return true; + } + + /** + * Inserts the specified element at the end of this deque. + * + *

This method is equivalent to {@link #add}. + * + * @return {@code true} always + * @throws NullPointerException {@inheritDoc} + */ + public boolean offerLast(E e) { + linkLast(e); + return true; + } + + public E peekFirst() { + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null) + return item; + } + return null; + } + + public E peekLast() { + for (Node p = last(); p != null; p = pred(p)) { + E item = p.item; + if (item != null) + return item; + } + return null; + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E getFirst() { + return screenNullResult(peekFirst()); + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E getLast() { + return screenNullResult(peekLast()); + } + + public E pollFirst() { + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null && p.casItem(item, null)) { + unlink(p); + return item; + } + } + return null; + } + + public E pollLast() { + for (Node p = last(); p != null; p = pred(p)) { + E item = p.item; + if (item != null && p.casItem(item, null)) { + unlink(p); + return item; + } + } + return null; + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E removeFirst() { + return screenNullResult(pollFirst()); + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E removeLast() { + return screenNullResult(pollLast()); + } + + // *** Queue and stack methods *** + + /** + * Inserts the specified element at the tail of this deque. + * + * @return {@code true} (as specified by {@link Queue#offer}) + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + return offerLast(e); + } + + /** + * Inserts the specified element at the tail of this deque. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ + public boolean add(E e) { + return offerLast(e); + } + + public E poll() { return pollFirst(); } + public E remove() { return removeFirst(); } + public E peek() { return peekFirst(); } + public E element() { return getFirst(); } + public void push(E e) { addFirst(e); } + public E pop() { return removeFirst(); } + + /** + * Removes the first element {@code e} such that + * {@code o.equals(e)}, if such an element exists in this deque. + * If the deque does not contain the element, it is unchanged. + * + * @param o element to be removed from this deque, if present + * @return {@code true} if the deque contained the specified element + * @throws NullPointerException if the specified element is {@code null} + */ + public boolean removeFirstOccurrence(Object o) { + checkNotNull(o); + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null && o.equals(item) && p.casItem(item, null)) { + unlink(p); + return true; + } + } + return false; + } + + /** + * Removes the last element {@code e} such that + * {@code o.equals(e)}, if such an element exists in this deque. + * If the deque does not contain the element, it is unchanged. + * + * @param o element to be removed from this deque, if present + * @return {@code true} if the deque contained the specified element + * @throws NullPointerException if the specified element is {@code null} + */ + public boolean removeLastOccurrence(Object o) { + checkNotNull(o); + for (Node p = last(); p != null; p = pred(p)) { + E item = p.item; + if (item != null && o.equals(item) && p.casItem(item, null)) { + unlink(p); + return true; + } + } + return false; + } + + /** + * Returns {@code true} if this deque contains at least one + * element {@code e} such that {@code o.equals(e)}. + * + * @param o element whose presence in this deque is to be tested + * @return {@code true} if this deque contains the specified element + */ + public boolean contains(Object o) { + if (o == null) return false; + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null && o.equals(item)) + return true; + } + return false; + } + + /** + * Returns {@code true} if this collection contains no elements. + * + * @return {@code true} if this collection contains no elements + */ + public boolean isEmpty() { + return peekFirst() == null; + } + + /** + * Returns the number of elements in this deque. If this deque + * contains more than {@code Integer.MAX_VALUE} elements, it + * returns {@code Integer.MAX_VALUE}. + * + *

Beware that, unlike in most collections, this method is + * NOT a constant-time operation. Because of the + * asynchronous nature of these deques, determining the current + * number of elements requires traversing them all to count them. + * Additionally, it is possible for the size to change during + * execution of this method, in which case the returned result + * will be inaccurate. Thus, this method is typically not very + * useful in concurrent applications. + * + * @return the number of elements in this deque + */ + public int size() { + int count = 0; + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + // Collection.size() spec says to max out + if (++count == Integer.MAX_VALUE) + break; + return count; + } + + /** + * Removes the first element {@code e} such that + * {@code o.equals(e)}, if such an element exists in this deque. + * If the deque does not contain the element, it is unchanged. + * + * @param o element to be removed from this deque, if present + * @return {@code true} if the deque contained the specified element + * @throws NullPointerException if the specified element is {@code null} + */ + public boolean remove(Object o) { + return removeFirstOccurrence(o); + } + + /** + * Appends all of the elements in the specified collection to the end of + * this deque, in the order that they are returned by the specified + * collection's iterator. Attempts to {@code addAll} of a deque to + * itself result in {@code IllegalArgumentException}. + * + * @param c the elements to be inserted into this deque + * @return {@code true} if this deque changed as a result of the call + * @throws NullPointerException if the specified collection or any + * of its elements are null + * @throws IllegalArgumentException if the collection is this deque + */ + public boolean addAll(Collection c) { + if (c == this) + // As historically specified in AbstractQueue#addAll + throw new IllegalArgumentException(); + + // Copy c into a private chain of Nodes + Node beginningOfTheEnd = null, last = null; + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (beginningOfTheEnd == null) + beginningOfTheEnd = last = newNode; + else { + last.lazySetNext(newNode); + newNode.lazySetPrev(last); + last = newNode; + } + } + if (beginningOfTheEnd == null) + return false; + + // Atomically append the chain at the tail of this collection + restartFromTail: + for (;;) + for (Node t = tail, p = t, q;;) { + if ((q = p.next) != null && + (q = (p = q).next) != null) + // Check for tail updates every other hop. + // If p == q, we are sure to follow tail instead. + p = (t != (t = tail)) ? t : q; + else if (p.prev == p) // NEXT_TERMINATOR + continue restartFromTail; + else { + // p is last node + beginningOfTheEnd.lazySetPrev(p); // CAS piggyback + if (p.casNext(null, beginningOfTheEnd)) { + // Successful CAS is the linearization point + // for all elements to be added to this queue. + if (!casTail(t, last)) { + // Try a little harder to update tail, + // since we may be adding many elements. + t = tail; + if (last.next == null) + casTail(t, last); + } + return true; + } + // Lost CAS race to another thread; re-read next + } + } + } + + /** + * Removes all of the elements from this deque. + */ + public void clear() { + while (pollFirst() != null) + ; + } + + /** + * Returns an array containing all of the elements in this deque, in + * proper sequence (from first to last element). + * + *

The returned array will be "safe" in that no references to it are + * maintained by this deque. (In other words, this method must allocate + * a new array). The caller is thus free to modify the returned array. + * + *

This method acts as bridge between array-based and collection-based + * APIs. + * + * @return an array containing all of the elements in this deque + */ + public Object[] toArray() { + return toArrayList().toArray(); + } + + /** + * Returns an array containing all of the elements in this deque, + * in proper sequence (from first to last element); the runtime + * type of the returned array is that of the specified array. If + * the deque fits in the specified array, it is returned therein. + * Otherwise, a new array is allocated with the runtime type of + * the specified array and the size of this deque. + * + *

If this deque fits in the specified array with room to spare + * (i.e., the array has more elements than this deque), the element in + * the array immediately following the end of the deque is set to + * {@code null}. + * + *

Like the {@link #toArray()} method, this method acts as + * bridge between array-based and collection-based APIs. Further, + * this method allows precise control over the runtime type of the + * output array, and may, under certain circumstances, be used to + * save allocation costs. + * + *

Suppose {@code x} is a deque known to contain only strings. + * The following code can be used to dump the deque into a newly + * allocated array of {@code String}: + * + *

+     *     String[] y = x.toArray(new String[0]);
+ * + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. + * + * @param a the array into which the elements of the deque are to + * be stored, if it is big enough; otherwise, a new array of the + * same runtime type is allocated for this purpose + * @return an array containing all of the elements in this deque + * @throws ArrayStoreException if the runtime type of the specified array + * is not a supertype of the runtime type of every element in + * this deque + * @throws NullPointerException if the specified array is null + */ + public T[] toArray(T[] a) { + return toArrayList().toArray(a); + } + + /** + * Returns an iterator over the elements in this deque in proper sequence. + * The elements will be returned in order from first (head) to last (tail). + * + *

The returned {@code Iterator} is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException + * ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this deque in proper sequence + */ + public Iterator iterator() { + return new Itr(); + } + + /** + * Returns an iterator over the elements in this deque in reverse + * sequential order. The elements will be returned in order from + * last (tail) to first (head). + * + *

The returned {@code Iterator} is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException + * ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this deque in reverse order + */ + public Iterator descendingIterator() { + return new DescendingItr(); + } + + private abstract class AbstractItr implements Iterator { + /** + * Next node to return item for. + */ + private Node nextNode; + + /** + * nextItem holds on to item fields because once we claim + * that an element exists in hasNext(), we must return it in + * the following next() call even if it was in the process of + * being removed when hasNext() was called. + */ + private E nextItem; + + /** + * Node returned by most recent call to next. Needed by remove. + * Reset to null if this element is deleted by a call to remove. + */ + private Node lastRet; + + abstract Node startNode(); + abstract Node nextNode(Node p); + + AbstractItr() { + advance(); + } + + /** + * Sets nextNode and nextItem to next valid node, or to null + * if no such. + */ + private void advance() { + lastRet = nextNode; + + Node p = (nextNode == null) ? startNode() : nextNode(nextNode); + for (;; p = nextNode(p)) { + if (p == null) { + // p might be active end or TERMINATOR node; both are OK + nextNode = null; + nextItem = null; + break; + } + E item = p.item; + if (item != null) { + nextNode = p; + nextItem = item; + break; + } + } + } + + public boolean hasNext() { + return nextItem != null; + } + + public E next() { + E item = nextItem; + if (item == null) throw new NoSuchElementException(); + advance(); + return item; + } + + public void remove() { + Node l = lastRet; + if (l == null) throw new IllegalStateException(); + l.item = null; + unlink(l); + lastRet = null; + } + } + + /** Forward iterator */ + private class Itr extends AbstractItr { + Node startNode() { return first(); } + Node nextNode(Node p) { return succ(p); } + } + + /** Descending iterator */ + private class DescendingItr extends AbstractItr { + Node startNode() { return last(); } + Node nextNode(Node p) { return pred(p); } + } + + /** + * Saves the state to a stream (that is, serializes it). + * + * @serialData All of the elements (each an {@code E}) in + * the proper order, followed by a null + * @param s the stream + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + + // Write out any hidden stuff + s.defaultWriteObject(); + + // Write out all elements in the proper order. + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null) + s.writeObject(item); + } + + // Use trailing null as sentinel + s.writeObject(null); + } + + /** + * Reconstitutes the instance from a stream (that is, deserializes it). + * @param s the stream + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + + // Read in elements until trailing null sentinel found + Node h = null, t = null; + Object item; + while ((item = s.readObject()) != null) { + @SuppressWarnings("unchecked") + Node newNode = new Node((E) item); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + newNode.lazySetPrev(t); + t = newNode; + } + } + initHeadTail(h, t); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = + sun.misc.Unsafe.getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", ConcurrentLinkedDeque.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedDeque.class); + + private boolean casHead(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); + } + + private boolean casTail(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); + } + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } +} diff --git a/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java b/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java index 081d18eee..0d6381cd9 100644 --- a/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java +++ b/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java @@ -28,9 +28,9 @@ * However, the following notice accompanied the original version of this * file: * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * Written by Doug Lea and Martin Buchholz with assistance from members of + * JCP JSR-166 Expert Group and released to the public domain, as explained + * at http://creativecommons.org/licenses/publicdomain */ package java.util.concurrent; @@ -53,7 +53,8 @@ import java.util.Queue; * operations obtain elements at the head of the queue. * A {@code ConcurrentLinkedQueue} is an appropriate choice when * many threads will share access to a common collection. - * This queue does not permit {@code null} elements. + * Like most other concurrent collection implementations, this class + * does not permit the use of {@code null} elements. * *

This implementation employs an efficient "wait-free" * algorithm based on one described in by Maged M. Michael and Michael L. Scott. * + *

Iterators are weakly consistent, returning elements + * reflecting the state of the queue at some point at or since the + * creation of the iterator. They do not throw {@link + * ConcurrentModificationException}, and may proceed concurrently with + * other operations. Elements contained in the queue since the creation + * of the iterator will be returned exactly once. + * *

Beware that, unlike in most collections, the {@code size} method * is NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements. * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. + *

This class and its iterator implement all of the optional + * methods of the {@link Queue} and {@link Iterator} interfaces. * *

Memory consistency effects: As with other concurrent * collections, actions in a thread prior to placing an object into a @@ -132,9 +139,10 @@ public class ConcurrentLinkedQueue extends AbstractQueue * * Both head and tail are permitted to lag. In fact, failing to * update them every time one could is a significant optimization - * (fewer CASes). This is controlled by local "hops" variables - * that only trigger helping-CASes after experiencing multiple - * lags. + * (fewer CASes). As with LinkedTransferQueue (see the internal + * documentation for that class), we use a slack threshold of two; + * that is, we update head/tail when the current pointer appears + * to be two or more steps away from the first/last node. * * Since head and tail are updated concurrently and independently, * it is possible for tail to lag behind head (why not)? @@ -148,8 +156,8 @@ public class ConcurrentLinkedQueue extends AbstractQueue * this is merely an optimization. * * When constructing a Node (before enqueuing it) we avoid paying - * for a volatile write to item by using lazySet instead of a - * normal write. This allows the cost of enqueue to be + * for a volatile write to item by using Unsafe.putObject instead + * of a normal write. This allows the cost of enqueue to be * "one-and-a-half" CASes. * * Both head and tail may or may not point to a Node with a @@ -161,38 +169,25 @@ public class ConcurrentLinkedQueue extends AbstractQueue */ private static class Node { - private volatile E item; - private volatile Node next; + volatile E item; + volatile Node next; + /** + * Constructs a new node. Uses relaxed write because item can + * only be seen after publication via casNext. + */ Node(E item) { - // Piggyback on imminent casNext() - lazySetItem(item); - } - - E getItem() { - return item; + UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } - void setItem(E val) { - item = val; - } - - void lazySetItem(E val) { - UNSAFE.putOrderedObject(this, itemOffset, val); - } - void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } - Node getNext() { - return next; - } - boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } @@ -219,7 +214,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ - private transient volatile Node head = new Node(null); + private transient volatile Node head; /** * A node from which the last node on list (that is, the unique @@ -233,25 +228,41 @@ public class ConcurrentLinkedQueue extends AbstractQueue * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ - private transient volatile Node tail = head; + private transient volatile Node tail; /** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ - public ConcurrentLinkedQueue() {} + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } /** * Creates a {@code ConcurrentLinkedQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. + * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public ConcurrentLinkedQueue(Collection c) { - for (E e : c) - add(e); + Node h = null, t = null; + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; } // Have to override just to update the javadoc @@ -266,13 +277,6 @@ public class ConcurrentLinkedQueue extends AbstractQueue return offer(e); } - /** - * We don't bother to update head or tail pointers if fewer than - * HOPS links from "true" location. We assume that volatile - * writes are significantly more expensive than volatile reads. - */ - private static final int HOPS = 1; - /** * Try to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. @@ -288,7 +292,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue * stale pointer that is now off the list. */ final Node succ(Node p) { - Node next = p.getNext(); + Node next = p.next; return (p == next) ? head : next; } @@ -299,68 +303,75 @@ public class ConcurrentLinkedQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { - if (e == null) throw new NullPointerException(); - Node n = new Node(e); - retry: - for (;;) { - Node t = tail; - Node p = t; - for (int hops = 0; ; hops++) { - Node next = succ(p); - if (next != null) { - if (hops > HOPS && t != tail) - continue retry; - p = next; - } else if (p.casNext(null, n)) { - if (hops >= HOPS) - casTail(t, n); // Failure is OK. + checkNotNull(e); + final Node newNode = new Node(e); + + for (Node t = tail, p = t;;) { + Node q = p.next; + if (q == null) { + // p is last node + if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. return true; - } else { - p = succ(p); } + // Lost CAS race to another thread; re-read next } + else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + else + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; } } public E poll() { - Node h = head; - Node p = h; - for (int hops = 0; ; hops++) { - E item = p.getItem(); - - if (item != null && p.casItem(item, null)) { - if (hops >= HOPS) { - Node q = p.getNext(); - updateHead(h, (q != null) ? q : p); + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; } - return item; - } - Node next = succ(p); - if (next == null) { - updateHead(h, p); - break; + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + else if (p == q) + continue restartFromHead; + else + p = q; } - p = next; } - return null; } public E peek() { - Node h = head; - Node p = h; - E item; + restartFromHead: for (;;) { - item = p.getItem(); - if (item != null) - break; - Node next = succ(p); - if (next == null) { - break; + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null || (q = p.next) == null) { + updateHead(h, p); + return item; + } + else if (p == q) + continue restartFromHead; + else + p = q; } - p = next; } - updateHead(h, p); - return item; } /** @@ -372,24 +383,20 @@ public class ConcurrentLinkedQueue extends AbstractQueue * of losing a race to a concurrent poll(). */ Node first() { - Node h = head; - Node p = h; - Node result; + restartFromHead: for (;;) { - E item = p.getItem(); - if (item != null) { - result = p; - break; - } - Node next = succ(p); - if (next == null) { - result = null; - break; + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); + if (hasItem || (q = p.next) == null) { + updateHead(h, p); + return hasItem ? p : null; + } + else if (p == q) + continue restartFromHead; + else + p = q; } - p = next; } - updateHead(h, p); - return result; } /** @@ -410,18 +417,20 @@ public class ConcurrentLinkedQueue extends AbstractQueue * NOT a constant-time operation. Because of the * asynchronous nature of these queues, determining the current * number of elements requires an O(n) traversal. + * Additionally, if elements are added or removed during execution + * of this method, the returned result may be inaccurate. Thus, + * this method is typically not very useful in concurrent + * applications. * * @return the number of elements in this queue */ public int size() { int count = 0; - for (Node p = first(); p != null; p = succ(p)) { - if (p.getItem() != null) { - // Collections.size() spec says to max out + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; - } - } return count; } @@ -436,9 +445,8 @@ public class ConcurrentLinkedQueue extends AbstractQueue public boolean contains(Object o) { if (o == null) return false; for (Node p = first(); p != null; p = succ(p)) { - E item = p.getItem(); - if (item != null && - o.equals(item)) + E item = p.item; + if (item != null && o.equals(item)) return true; } return false; @@ -459,7 +467,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue if (o == null) return false; Node pred = null; for (Node p = first(); p != null; p = succ(p)) { - E item = p.getItem(); + E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null)) { @@ -473,6 +481,69 @@ public class ConcurrentLinkedQueue extends AbstractQueue return false; } + /** + * Appends all of the elements in the specified collection to the end of + * this queue, in the order that they are returned by the specified + * collection's iterator. Attempts to {@code addAll} of a queue to + * itself result in {@code IllegalArgumentException}. + * + * @param c the elements to be inserted into this queue + * @return {@code true} if this queue changed as a result of the call + * @throws NullPointerException if the specified collection or any + * of its elements are null + * @throws IllegalArgumentException if the collection is this queue + */ + public boolean addAll(Collection c) { + if (c == this) + // As historically specified in AbstractQueue#addAll + throw new IllegalArgumentException(); + + // Copy c into a private chain of Nodes + Node beginningOfTheEnd = null, last = null; + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (beginningOfTheEnd == null) + beginningOfTheEnd = last = newNode; + else { + last.lazySetNext(newNode); + last = newNode; + } + } + if (beginningOfTheEnd == null) + return false; + + // Atomically append the chain at the tail of this collection + for (Node t = tail, p = t;;) { + Node q = p.next; + if (q == null) { + // p is last node + if (p.casNext(null, beginningOfTheEnd)) { + // Successful CAS is the linearization point + // for all elements to be added to this queue. + if (!casTail(t, last)) { + // Try a little harder to update tail, + // since we may be adding many elements. + t = tail; + if (last.next == null) + casTail(t, last); + } + return true; + } + // Lost CAS race to another thread; re-read next + } + else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + else + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; + } + } + /** * Returns an array containing all of the elements in this queue, in * proper sequence. @@ -490,7 +561,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue // Use ArrayList to deal with resizing. ArrayList al = new ArrayList(); for (Node p = first(); p != null; p = succ(p)) { - E item = p.getItem(); + E item = p.item; if (item != null) al.add(item); } @@ -539,7 +610,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue int k = 0; Node p; for (p = first(); p != null && k < a.length; p = succ(p)) { - E item = p.getItem(); + E item = p.item; if (item != null) a[k++] = (T)item; } @@ -552,7 +623,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue // If won't fit, use ArrayList version ArrayList al = new ArrayList(); for (Node q = first(); q != null; q = succ(q)) { - E item = q.getItem(); + E item = q.item; if (item != null) al.add(item); } @@ -561,7 +632,9 @@ public class ConcurrentLinkedQueue extends AbstractQueue /** * Returns an iterator over the elements in this queue in proper sequence. - * The returned iterator is a "weakly consistent" iterator that + * The elements will be returned in order from first (head) to last (tail). + * + *

The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link java.util.ConcurrentModificationException * ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon @@ -620,7 +693,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue nextItem = null; return x; } - E item = p.getItem(); + E item = p.item; if (item != null) { nextNode = p; nextItem = item; @@ -648,13 +721,13 @@ public class ConcurrentLinkedQueue extends AbstractQueue Node l = lastRet; if (l == null) throw new IllegalStateException(); // rely on a future traversal to relink. - l.setItem(null); + l.item = null; lastRet = null; } } /** - * Save the state to a stream (that is, serialize it). + * Saves the state to a stream (that is, serializes it). * * @serialData All of the elements (each an {@code E}) in * the proper order, followed by a null @@ -668,7 +741,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue // Write out all elements in the proper order. for (Node p = first(); p != null; p = succ(p)) { - Object item = p.getItem(); + Object item = p.item; if (item != null) s.writeObject(item); } @@ -678,25 +751,40 @@ public class ConcurrentLinkedQueue extends AbstractQueue } /** - * Reconstitute the Queue instance from a stream (that is, - * deserialize it). + * Reconstitutes the instance from a stream (that is, deserializes it). * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { - // Read in capacity, and any hidden stuff s.defaultReadObject(); - head = new Node(null); - tail = head; - // Read in all elements and place in queue - for (;;) { + + // Read in elements until trailing null sentinel found + Node h = null, t = null; + Object item; + while ((item = s.readObject()) != null) { @SuppressWarnings("unchecked") - E item = (E)s.readObject(); - if (item == null) - break; - else - offer(item); + Node newNode = new Node((E) item); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + t = newNode; + } } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; + } + + /** + * Throws NullPointerException if argument is null. + * + * @param v the element + */ + private static void checkNotNull(Object v) { + if (v == null) + throw new NullPointerException(); } // Unsafe mechanics @@ -715,10 +803,6 @@ public class ConcurrentLinkedQueue extends AbstractQueue return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } - private void lazySetHead(Node val) { - UNSAFE.putOrderedObject(this, headOffset, val); - } - static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class klazz) { try { diff --git a/test/java/util/Collection/BiggernYours.java b/test/java/util/Collection/BiggernYours.java index 8a0e2d126..d1e1e6188 100644 --- a/test/java/util/Collection/BiggernYours.java +++ b/test/java/util/Collection/BiggernYours.java @@ -173,6 +173,11 @@ public class BiggernYours { new ConcurrentHashMap() { public int size() {return randomize(super.size());}}); + testCollections( + new ConcurrentLinkedDeque(), + new ConcurrentLinkedDeque() { + public int size() {return randomize(super.size());}}); + testCollections( new ConcurrentLinkedQueue(), new ConcurrentLinkedQueue() { diff --git a/test/java/util/Collection/IteratorAtEnd.java b/test/java/util/Collection/IteratorAtEnd.java index 4f9cd0a67..9d10f33de 100644 --- a/test/java/util/Collection/IteratorAtEnd.java +++ b/test/java/util/Collection/IteratorAtEnd.java @@ -48,6 +48,7 @@ public class IteratorAtEnd { testCollection(new PriorityQueue()); testCollection(new LinkedBlockingQueue()); testCollection(new ArrayBlockingQueue(100)); + testCollection(new ConcurrentLinkedDeque()); testCollection(new ConcurrentLinkedQueue()); testCollection(new LinkedTransferQueue()); diff --git a/test/java/util/Collection/MOAT.java b/test/java/util/Collection/MOAT.java index 133447444..4de8ca6ee 100644 --- a/test/java/util/Collection/MOAT.java +++ b/test/java/util/Collection/MOAT.java @@ -75,6 +75,7 @@ public class MOAT { testCollection(new ArrayBlockingQueue(20)); testCollection(new LinkedBlockingQueue(20)); testCollection(new LinkedBlockingDeque(20)); + testCollection(new ConcurrentLinkedDeque()); testCollection(new ConcurrentLinkedQueue()); testCollection(new LinkedTransferQueue()); testCollection(new ConcurrentSkipListSet()); @@ -431,8 +432,9 @@ public class MOAT { q.poll(); equal(q.size(), 4); checkFunctionalInvariants(q); - if ((q instanceof LinkedBlockingQueue) || - (q instanceof LinkedBlockingDeque) || + if ((q instanceof LinkedBlockingQueue) || + (q instanceof LinkedBlockingDeque) || + (q instanceof ConcurrentLinkedDeque) || (q instanceof ConcurrentLinkedQueue)) { testQueueIteratorRemove(q); } diff --git a/test/java/util/Collections/RacingCollections.java b/test/java/util/Collections/RacingCollections.java index ae3abbed5..1bc513ea6 100644 --- a/test/java/util/Collections/RacingCollections.java +++ b/test/java/util/Collections/RacingCollections.java @@ -235,6 +235,7 @@ public class RacingCollections { new ArrayList>(newConcurrentDeques()); list.add(new LinkedBlockingQueue(10)); list.add(new LinkedTransferQueue()); + list.add(new ConcurrentLinkedQueue()); return list; } @@ -248,6 +249,7 @@ public class RacingCollections { private static List> newConcurrentDeques() { List> list = new ArrayList>(); list.add(new LinkedBlockingDeque(10)); + list.add(new ConcurrentLinkedDeque()); return list; } diff --git a/test/java/util/Deque/ChorusLine.java b/test/java/util/Deque/ChorusLine.java index d8711f6ce..5dbb5231a 100644 --- a/test/java/util/Deque/ChorusLine.java +++ b/test/java/util/Deque/ChorusLine.java @@ -129,6 +129,7 @@ public class ChorusLine { deqs.add(new ArrayDeque()); deqs.add(new LinkedList()); deqs.add(new LinkedBlockingDeque()); + deqs.add(new ConcurrentLinkedDeque()); equal(deqs); diff --git a/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java b/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java index ee9cb3eb4..b99362853 100644 --- a/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java +++ b/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java @@ -55,6 +55,7 @@ public class ConcurrentQueueLoops { Collection> concurrentQueues() { List> queues = new ArrayList>(); + queues.add(new ConcurrentLinkedDeque()); queues.add(new ConcurrentLinkedQueue()); queues.add(new ArrayBlockingQueue(items, false)); //queues.add(new ArrayBlockingQueue(count, true)); @@ -105,7 +106,7 @@ public class ConcurrentQueueLoops { final Queue queue; final CyclicBarrier barrier; int items; - Stage (Queue q, CyclicBarrier b, int items) { + Stage(Queue q, CyclicBarrier b, int items) { queue = q; barrier = b; this.items = items; diff --git a/test/java/util/concurrent/ConcurrentQueues/GCRetention.java b/test/java/util/concurrent/ConcurrentQueues/GCRetention.java index 3376f315d..1786ed350 100644 --- a/test/java/util/concurrent/ConcurrentQueues/GCRetention.java +++ b/test/java/util/concurrent/ConcurrentQueues/GCRetention.java @@ -40,6 +40,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; @@ -62,6 +63,7 @@ public class GCRetention { Collection> queues() { List> queues = new ArrayList>(); + queues.add(new ConcurrentLinkedDeque()); queues.add(new ConcurrentLinkedQueue()); queues.add(new ArrayBlockingQueue(count, false)); queues.add(new ArrayBlockingQueue(count, true)); diff --git a/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java b/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java index af34b0cfd..36411ad36 100644 --- a/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java +++ b/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java @@ -48,6 +48,7 @@ public class IteratorWeakConsistency { test(new LinkedBlockingQueue(20)); test(new LinkedBlockingDeque()); test(new LinkedBlockingDeque(20)); + test(new ConcurrentLinkedDeque()); test(new ConcurrentLinkedQueue()); test(new LinkedTransferQueue()); // Other concurrent queues (e.g. ArrayBlockingQueue) do not diff --git a/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java b/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java index a3a5fbdca..798a148e5 100644 --- a/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java +++ b/test/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java @@ -55,6 +55,7 @@ public class OfferRemoveLoops { testQueue(new LinkedBlockingDeque()); testQueue(new ArrayBlockingQueue(10)); testQueue(new PriorityBlockingQueue(10)); + testQueue(new ConcurrentLinkedDeque()); testQueue(new ConcurrentLinkedQueue()); testQueue(new LinkedTransferQueue()); } diff --git a/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java b/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java index a646eee99..1bccb53cd 100644 --- a/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java +++ b/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java @@ -41,6 +41,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; @@ -62,6 +63,7 @@ public class RemovePollRace { Collection> concurrentQueues() { List> queues = new ArrayList>(); + queues.add(new ConcurrentLinkedDeque()); queues.add(new ConcurrentLinkedQueue()); queues.add(new ArrayBlockingQueue(count, false)); queues.add(new ArrayBlockingQueue(count, true)); -- GitLab