From aa86f2c8ed18781f88c74e24c03a3d3db4a00722 Mon Sep 17 00:00:00 2001 From: psandoz Date: Wed, 3 Jul 2013 11:58:10 +0200 Subject: [PATCH] 8019481: Sync misc j.u.c classes from 166 to tl Reviewed-by: martin Contributed-by: Doug Lea --- .../concurrent/BrokenBarrierException.java | 4 +- .../java/util/concurrent/CountDownLatch.java | 26 +- .../java/util/concurrent/CyclicBarrier.java | 59 +- .../java/util/concurrent/Exchanger.java | 835 +++++++++--------- .../classes/java/util/concurrent/Phaser.java | 143 +-- .../java/util/concurrent/TimeUnit.java | 93 +- .../util/concurrent/TimeoutException.java | 6 +- .../java/util/concurrent/package-info.java | 14 +- 8 files changed, 584 insertions(+), 596 deletions(-) diff --git a/src/share/classes/java/util/concurrent/BrokenBarrierException.java b/src/share/classes/java/util/concurrent/BrokenBarrierException.java index 2c8f7e339..11f126e01 100644 --- a/src/share/classes/java/util/concurrent/BrokenBarrierException.java +++ b/src/share/classes/java/util/concurrent/BrokenBarrierException.java @@ -49,13 +49,13 @@ public class BrokenBarrierException extends Exception { private static final long serialVersionUID = 7117394618823254244L; /** - * Constructs a BrokenBarrierException with no specified detail + * Constructs a {@code BrokenBarrierException} with no specified detail * message. */ public BrokenBarrierException() {} /** - * Constructs a BrokenBarrierException with the specified + * Constructs a {@code BrokenBarrierException} with the specified * detail message. * * @param message the detail message diff --git a/src/share/classes/java/util/concurrent/CountDownLatch.java b/src/share/classes/java/util/concurrent/CountDownLatch.java index 055eb1137..a8b50ca39 100644 --- a/src/share/classes/java/util/concurrent/CountDownLatch.java +++ b/src/share/classes/java/util/concurrent/CountDownLatch.java @@ -92,15 +92,15 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer; * private final CountDownLatch startSignal; * private final CountDownLatch doneSignal; * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { - * this.startSignal = startSignal; - * this.doneSignal = doneSignal; + * this.startSignal = startSignal; + * this.doneSignal = doneSignal; * } * public void run() { - * try { - * startSignal.await(); - * doWork(); - * doneSignal.countDown(); - * } catch (InterruptedException ex) {} // return; + * try { + * startSignal.await(); + * doWork(); + * doneSignal.countDown(); + * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } @@ -130,14 +130,14 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer; * private final CountDownLatch doneSignal; * private final int i; * WorkerRunnable(CountDownLatch doneSignal, int i) { - * this.doneSignal = doneSignal; - * this.i = i; + * this.doneSignal = doneSignal; + * this.i = i; * } * public void run() { - * try { - * doWork(i); - * doneSignal.countDown(); - * } catch (InterruptedException ex) {} // return; + * try { + * doWork(i); + * doneSignal.countDown(); + * } catch (InterruptedException ex) {} // return; * } * * void doWork() { ... } diff --git a/src/share/classes/java/util/concurrent/CyclicBarrier.java b/src/share/classes/java/util/concurrent/CyclicBarrier.java index eb25879db..d1186d8eb 100644 --- a/src/share/classes/java/util/concurrent/CyclicBarrier.java +++ b/src/share/classes/java/util/concurrent/CyclicBarrier.java @@ -45,14 +45,14 @@ import java.util.concurrent.locks.ReentrantLock; * cyclic because it can be re-used after the waiting threads * are released. 
 *
- * <p>A CyclicBarrier supports an optional {@link Runnable} command
+ * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
  * that is run once per barrier point, after the last thread in the party
  * arrives, but before any threads are released.
  * This barrier action is useful
  * for updating shared-state before any of the parties continue.
  *
- * <p>Sample usage: Here is an example of
- *  using a barrier in a parallel decomposition design:
+ * <p>Sample usage: Here is an example of using a barrier in a
+ * parallel decomposition design:
  *
  *  <pre> {@code
  * class Solver {
@@ -81,16 +81,20 @@ import java.util.concurrent.locks.ReentrantLock;
  *   public Solver(float[][] matrix) {
  *     data = matrix;
  *     N = matrix.length;
- *     barrier = new CyclicBarrier(N,
- *                                 new Runnable() {
- *                                   public void run() {
- *                                     mergeRows(...);
- *                                   }
- *                                 });
- *     for (int i = 0; i < N; ++i)
- *       new Thread(new Worker(i)).start();
+ *     Runnable barrierAction =
+ *       new Runnable() { public void run() { mergeRows(...); }};
+ *     barrier = new CyclicBarrier(N, barrierAction);
  *
- *     waitUntilDone();
+ *     List<Thread> threads = new ArrayList<Thread>(N);
+ *     for (int i = 0; i < N; i++) {
+ *       Thread thread = new Thread(new Worker(i));
+ *       threads.add(thread);
+ *       thread.start();
+ *     }
+ *
+ *     // wait until done
+ *     for (Thread thread : threads)
+ *       thread.join();
  *   }
  * }}</pre>
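
To try the revised sample outside the javadoc, the following self-contained variant compiles and runs as-is (an illustrative sketch, not part of this changeset: SolverDemo, the party count, and the printouts are invented, and the row computation is reduced to a comment):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CyclicBarrier;

    class SolverDemo {
        public static void main(String[] args) throws InterruptedException {
            final int N = 4;
            Runnable barrierAction =
                new Runnable() { public void run() { System.out.println("merge rows"); }};
            final CyclicBarrier barrier = new CyclicBarrier(N, barrierAction);

            List<Thread> threads = new ArrayList<Thread>(N);
            for (int i = 0; i < N; i++) {
                final int row = i;
                Thread thread = new Thread(new Runnable() {
                    public void run() {
                        try {
                            // ... process row `row` here ...
                            System.out.println("row " + row + " processed");
                            barrier.await();   // last arrival runs barrierAction
                        } catch (Exception e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                threads.add(thread);
                thread.start();
            }

            // wait until done -- join replaces the old waitUntilDone()
            for (Thread thread : threads)
                thread.join();
        }
    }
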
* @@ -98,8 +102,8 @@ import java.util.concurrent.locks.ReentrantLock; * barrier until all rows have been processed. When all rows are processed * the supplied {@link Runnable} barrier action is executed and merges the * rows. If the merger - * determines that a solution has been found then done() will return - * true and each worker will terminate. + * determines that a solution has been found then {@code done()} will return + * {@code true} and each worker will terminate. * *

If the barrier action does not rely on the parties being suspended when * it is executed, then any of the threads in the party could execute that @@ -112,7 +116,7 @@ import java.util.concurrent.locks.ReentrantLock; * // log the completion of this iteration * }} * - *
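
A standalone rendering of the arrival-index idiom just shown (a minimal sketch; the party count of 3 is arbitrary):

    import java.util.concurrent.CyclicBarrier;

    class ArrivalIndexDemo {
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(3);
            for (int i = 0; i < 3; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            // await() returns getParties() - 1 for the first
                            // arrival and 0 for the last, so exactly one
                            // thread takes the logging branch
                            if (barrier.await() == 0)
                                System.out.println("iteration complete");
                        } catch (Exception e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }).start();
            }
        }
    }
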

The CyclicBarrier uses an all-or-none breakage model + *

The {@code CyclicBarrier} uses an all-or-none breakage model * for failed synchronization attempts: If a thread leaves a barrier * point prematurely because of interruption, failure, or timeout, all * other threads waiting at that barrier point will also leave @@ -139,7 +143,7 @@ public class CyclicBarrier { * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these - * can be active at a time (the one to which count applies) + * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. @@ -259,7 +263,7 @@ public class CyclicBarrier { } /** - * Creates a new CyclicBarrier that will trip when the + * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. @@ -278,7 +282,7 @@ public class CyclicBarrier { } /** - * Creates a new CyclicBarrier that will trip when the + * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * @@ -301,7 +305,7 @@ public class CyclicBarrier { /** * Waits until all {@linkplain #getParties parties} have invoked - * await on this barrier. + * {@code await} on this barrier. * *

If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until @@ -326,7 +330,7 @@ public class CyclicBarrier { * *

If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when - * await is invoked, or while any thread is waiting, then + * {@code await} is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * *

If any thread is {@linkplain Thread#interrupt interrupted} while waiting, @@ -343,7 +347,7 @@ public class CyclicBarrier { * the broken state. * * @return the arrival index of the current thread, where index - * {@link #getParties()} - 1 indicates the first + * {@code getParties() - 1} indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting @@ -351,7 +355,7 @@ public class CyclicBarrier { * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was * broken when {@code await} was called, or the barrier - * action (if present) failed due an exception. + * action (if present) failed due to an exception */ public int await() throws InterruptedException, BrokenBarrierException { try { @@ -363,7 +367,7 @@ public class CyclicBarrier { /** * Waits until all {@linkplain #getParties parties} have invoked - * await on this barrier, or the specified waiting time elapses. + * {@code await} on this barrier, or the specified waiting time elapses. * *

If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until @@ -393,7 +397,7 @@ public class CyclicBarrier { * *

If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when - * await is invoked, or while any thread is waiting, then + * {@code await} is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * *

If any thread is {@linkplain Thread#interrupt interrupted} while @@ -412,16 +416,17 @@ public class CyclicBarrier { * @param timeout the time to wait for the barrier * @param unit the time unit of the timeout parameter * @return the arrival index of the current thread, where index - * {@link #getParties()} - 1 indicates the first + * {@code getParties() - 1} indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting - * @throws TimeoutException if the specified timeout elapses + * @throws TimeoutException if the specified timeout elapses. + * In this case the barrier will be broken. * @throws BrokenBarrierException if another thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was broken * when {@code await} was called, or the barrier action (if - * present) failed due an exception + * present) failed due to an exception */ public int await(long timeout, TimeUnit unit) throws InterruptedException, diff --git a/src/share/classes/java/util/concurrent/Exchanger.java b/src/share/classes/java/util/concurrent/Exchanger.java index 5accdb1ce..980b0e187 100644 --- a/src/share/classes/java/util/concurrent/Exchanger.java +++ b/src/share/classes/java/util/concurrent/Exchanger.java @@ -35,7 +35,8 @@ */ package java.util.concurrent; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** @@ -52,7 +53,7 @@ import java.util.concurrent.locks.LockSupport; * to swap buffers between threads so that the thread filling the * buffer gets a freshly emptied one when it needs it, handing off the * filled one to the thread emptying the buffer. - *

- * <pre>{@code
+ *  <pre> {@code
  * class FillAndEmpty {
  *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
  *   DataBuffer initialEmptyBuffer = ... a made-up type
@@ -88,8 +89,7 @@ import java.util.concurrent.locks.LockSupport;
  *     new Thread(new FillingLoop()).start();
  *     new Thread(new EmptyingLoop()).start();
  *   }
- * }
- * }
+ * }}</pre>
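
Because DataBuffer is a made-up type, here is a compilable analogue of the FillAndEmpty pattern using List<Integer> buffers (an illustrative sketch, not part of this changeset; the buffer size of 10 and item count of 100 are arbitrary):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;

    class FillAndEmptyDemo {
        static final Exchanger<List<Integer>> exchanger =
            new Exchanger<List<Integer>>();

        public static void main(String[] args) {
            new Thread(new Runnable() {       // filling loop
                public void run() {
                    List<Integer> buf = new ArrayList<Integer>();
                    try {
                        for (int i = 0; i < 100; i++) {
                            buf.add(i);
                            if (buf.size() == 10)              // buffer full:
                                buf = exchanger.exchange(buf); // swap for empty
                        }
                    } catch (InterruptedException e) { }
                }
            }).start();
            new Thread(new Runnable() {       // emptying loop
                public void run() {
                    List<Integer> buf = new ArrayList<Integer>();
                    try {
                        for (int n = 0; n < 10; n++) {
                            buf = exchanger.exchange(buf);  // swap empty for full
                            System.out.println("draining " + buf.size() + " items");
                            buf.clear();
                        }
                    } catch (InterruptedException e) { }
                }
            }).start();
        }
    }
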
  *
  *
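
The Overview comment in the hunk below reduces an exchange to CASes on a single slot. As a reading aid, here is a toy rendering of that offer/release protocol (a simplified sketch with invented names; it busy-waits, assumes non-null items, and stands in for the patch's Unsafe-based, parking, elimination-arena code):

    import java.util.concurrent.atomic.AtomicReference;

    // Toy single-slot exchanger: one CAS installs an offer, a second CAS
    // releases it. The real class uses a NULL_ITEM sentinel so null items
    // work; this sketch simply assumes non-null items.
    class ToySlotExchanger<V> {
        static final class Node<V> {
            final V item;                       // offered item
            volatile V match;                   // filled in by releasing thread
            Node(V item) { this.item = item; }
        }
        private final AtomicReference<Node<V>> slot =
            new AtomicReference<Node<V>>();

        public V exchange(V item) {
            for (;;) {
                Node<V> q = slot.get();
                if (q == null) {                          // offer
                    Node<V> n = new Node<V>(item);
                    if (slot.compareAndSet(null, n)) {
                        V m;
                        while ((m = n.match) == null)     // wait for release
                            Thread.yield();
                        return m;
                    }
                } else if (slot.compareAndSet(q, null)) { // release
                    q.match = item;
                    return q.item;
                }
                // else: lost a race on the CAS; retry
            }
        }
    }
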

Memory consistency effects: For each pair of threads that * successfully exchange objects via an {@code Exchanger}, actions @@ -103,486 +103,425 @@ import java.util.concurrent.locks.LockSupport; * @param The type of objects that may be exchanged */ public class Exchanger { + /* - * Algorithm Description: + * Overview: The core algorithm is, for an exchange "slot", + * and a participant (caller) with an item: + * + * for (;;) { + * if (slot is empty) { // offer + * place item in a Node; + * if (can CAS slot from empty to node) { + * wait for release; + * return matching item in node; + * } + * } + * else if (can CAS slot from node to empty) { // release + * get the item in node; + * set matching item in node; + * release waiting thread; + * } + * // else retry on CAS failure + * } + * + * This is among the simplest forms of a "dual data structure" -- + * see Scott and Scherer's DISC 04 paper and + * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html * - * The basic idea is to maintain a "slot", which is a reference to - * a Node containing both an Item to offer and a "hole" waiting to - * get filled in. If an incoming "occupying" thread sees that the - * slot is null, it CAS'es (compareAndSets) a Node there and waits - * for another to invoke exchange. That second "fulfilling" thread - * sees that the slot is non-null, and so CASes it back to null, - * also exchanging items by CASing the hole, plus waking up the - * occupying thread if it is blocked. In each case CAS'es may - * fail because a slot at first appears non-null but is null upon - * CAS, or vice-versa. So threads may need to retry these - * actions. + * This works great in principle. But in practice, like many + * algorithms centered on atomic updates to a single location, it + * scales horribly when there are more than a few participants + * using the same Exchanger. So the implementation instead uses a + * form of elimination arena, that spreads out this contention by + * arranging that some threads typically use different slots, + * while still ensuring that eventually, any two parties will be + * able to exchange items. That is, we cannot completely partition + * across threads, but instead give threads arena indices that + * will on average grow under contention and shrink under lack of + * contention. We approach this by defining the Nodes that we need + * anyway as ThreadLocals, and include in them per-thread index + * and related bookkeeping state. (We can safely reuse per-thread + * nodes rather than creating them fresh each time because slots + * alternate between pointing to a node vs null, so cannot + * encounter ABA problems. However, we do need some care in + * resetting them between uses.) * - * This simple approach works great when there are only a few - * threads using an Exchanger, but performance rapidly - * deteriorates due to CAS contention on the single slot when - * there are lots of threads using an exchanger. So instead we use - * an "arena"; basically a kind of hash table with a dynamically - * varying number of slots, any one of which can be used by - * threads performing an exchange. Incoming threads pick slots - * based on a hash of their Thread ids. If an incoming thread - * fails to CAS in its chosen slot, it picks an alternative slot - * instead. And similarly from there. If a thread successfully - * CASes into a slot but no other thread arrives, it tries - * another, heading toward the zero slot, which always exists even - * if the table shrinks. 
The particular mechanics controlling this - * are as follows: + * Implementing an effective arena requires allocating a bunch of + * space, so we only do so upon detecting contention (except on + * uniprocessors, where they wouldn't help, so aren't used). + * Otherwise, exchanges use the single-slot slotExchange method. + * On contention, not only must the slots be in different + * locations, but the locations must not encounter memory + * contention due to being on the same cache line (or more + * generally, the same coherence unit). Because, as of this + * writing, there is no way to determine cacheline size, we define + * a value that is enough for common platforms. Additionally, + * extra care elsewhere is taken to avoid other false/unintended + * sharing and to enhance locality, including adding padding (via + * sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger + * field, and reworking some park/unpark mechanics compared to + * LockSupport versions. * - * Waiting: Slot zero is special in that it is the only slot that - * exists when there is no contention. A thread occupying slot - * zero will block if no thread fulfills it after a short spin. - * In other cases, occupying threads eventually give up and try - * another slot. Waiting threads spin for a while (a period that - * should be a little less than a typical context-switch time) - * before either blocking (if slot zero) or giving up (if other - * slots) and restarting. There is no reason for threads to block - * unless there are unlikely to be any other threads present. - * Occupants are mainly avoiding memory contention so sit there - * quietly polling for a shorter period than it would take to - * block and then unblock them. Non-slot-zero waits that elapse - * because of lack of other threads waste around one extra - * context-switch time per try, which is still on average much - * faster than alternative approaches. + * The arena starts out with only one used slot. We expand the + * effective arena size by tracking collisions; i.e., failed CASes + * while trying to exchange. By nature of the above algorithm, the + * only kinds of collision that reliably indicate contention are + * when two attempted releases collide -- one of two attempted + * offers can legitimately fail to CAS without indicating + * contention by more than one other thread. (Note: it is possible + * but not worthwhile to more precisely detect contention by + * reading slot values after CAS failures.) When a thread has + * collided at each slot within the current arena bound, it tries + * to expand the arena size by one. We track collisions within + * bounds by using a version (sequence) number on the "bound" + * field, and conservatively reset collision counts when a + * participant notices that bound has been updated (in either + * direction). * - * Sizing: Usually, using only a few slots suffices to reduce - * contention. Especially with small numbers of threads, using - * too many slots can lead to just as poor performance as using - * too few of them, and there's not much room for error. The - * variable "max" maintains the number of slots actually in - * use. It is increased when a thread sees too many CAS - * failures. (This is analogous to resizing a regular hash table - * based on a target load factor, except here, growth steps are - * just one-by-one rather than proportional.) Growth requires - * contention failures in each of three tried slots. 
Requiring - * multiple failures for expansion copes with the fact that some - * failed CASes are not due to contention but instead to simple - * races between two threads or thread pre-emptions occurring - * between reading and CASing. Also, very transient peak - * contention can be much higher than the average sustainable - * levels. An attempt to decrease the max limit is usually made - * when a non-slot-zero wait elapses without being fulfilled. - * Threads experiencing elapsed waits move closer to zero, so - * eventually find existing (or future) threads even if the table - * has been shrunk due to inactivity. The chosen mechanics and - * thresholds for growing and shrinking are intrinsically - * entangled with indexing and hashing inside the exchange code, - * and can't be nicely abstracted out. + * The effective arena size is reduced (when there is more than + * one slot) by giving up on waiting after a while and trying to + * decrement the arena size on expiration. The value of "a while" + * is an empirical matter. We implement by piggybacking on the + * use of spin->yield->block that is essential for reasonable + * waiting performance anyway -- in a busy exchanger, offers are + * usually almost immediately released, in which case context + * switching on multiprocessors is extremely slow/wasteful. Arena + * waits just omit the blocking part, and instead cancel. The spin + * count is empirically chosen to be a value that avoids blocking + * 99% of the time under maximum sustained exchange rates on a + * range of test machines. Spins and yields entail some limited + * randomness (using a cheap xorshift) to avoid regular patterns + * that can induce unproductive grow/shrink cycles. (Using a + * pseudorandom also helps regularize spin cycle duration by + * making branches unpredictable.) Also, during an offer, a + * waiter can "know" that it will be released when its slot has + * changed, but cannot yet proceed until match is set. In the + * mean time it cannot cancel the offer, so instead spins/yields. + * Note: It is possible to avoid this secondary check by changing + * the linearization point to be a CAS of the match field (as done + * in one case in the Scott & Scherer DISC paper), which also + * increases asynchrony a bit, at the expense of poorer collision + * detection and inability to always reuse per-thread nodes. So + * the current scheme is typically a better tradeoff. * - * Hashing: Each thread picks its initial slot to use in accord - * with a simple hashcode. The sequence is the same on each - * encounter by any given thread, but effectively random across - * threads. Using arenas encounters the classic cost vs quality - * tradeoffs of all hash tables. Here, we use a one-step FNV-1a - * hash code based on the current thread's Thread.getId(), along - * with a cheap approximation to a mod operation to select an - * index. The downside of optimizing index selection in this way - * is that the code is hardwired to use a maximum table size of - * 32. But this value more than suffices for known platforms and - * applications. + * On collisions, indices traverse the arena cyclically in reverse + * order, restarting at the maximum index (which will tend to be + * sparsest) when bounds change. (On expirations, indices instead + * are halved until reaching 0.) It is possible (and has been + * tried) to use randomized, prime-value-stepped, or double-hash + * style traversal instead of simple cyclic traversal to reduce + * bunching. 
But empirically, whatever benefits these may have + * don't overcome their added overhead: We are managing operations + * that occur very quickly unless there is sustained contention, + * so simpler/faster control policies work better than more + * accurate but slower ones. * - * Probing: On sensed contention of a selected slot, we probe - * sequentially through the table, analogously to linear probing - * after collision in a hash table. (We move circularly, in - * reverse order, to mesh best with table growth and shrinkage - * rules.) Except that to minimize the effects of false-alarms - * and cache thrashing, we try the first selected slot twice - * before moving. + * Because we use expiration for arena size control, we cannot + * throw TimeoutExceptions in the timed version of the public + * exchange method until the arena size has shrunken to zero (or + * the arena isn't enabled). This may delay response to timeout + * but is still within spec. * - * Padding: Even with contention management, slots are heavily - * contended, so use cache-padding to avoid poor memory - * performance. Because of this, slots are lazily constructed - * only when used, to avoid wasting this space unnecessarily. - * While isolation of locations is not much of an issue at first - * in an application, as time goes on and garbage-collectors - * perform compaction, slots are very likely to be moved adjacent - * to each other, which can cause much thrashing of cache lines on - * MPs unless padding is employed. + * Essentially all of the implementation is in methods + * slotExchange and arenaExchange. These have similar overall + * structure, but differ in too many details to combine. The + * slotExchange method uses the single Exchanger field "slot" + * rather than arena array elements. However, it still needs + * minimal collision detection to trigger arena construction. + * (The messiest part is making sure interrupt status and + * InterruptedExceptions come out right during transitions when + * both methods may be called. This is done by using null return + * as a sentinel to recheck interrupt status.) * - * This is an improvement of the algorithm described in the paper - * "A Scalable Elimination-based Exchange Channel" by William - * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05 - * workshop. Available at: http://hdl.handle.net/1802/2104 + * As is too common in this sort of code, methods are monolithic + * because most of the logic relies on reads of fields that are + * maintained as local variables so can't be nicely factored -- + * mainly, here, bulky spin->yield->block/cancel code), and + * heavily dependent on intrinsics (Unsafe) to use inlined + * embedded CAS and related memory access operations (that tend + * not to be as readily inlined by dynamic compilers when they are + * hidden behind other methods that would more nicely name and + * encapsulate the intended effects). This includes the use of + * putOrderedX to clear fields of the per-thread Nodes between + * uses. Note that field Node.item is not declared as volatile + * even though it is read by releasing threads, because they only + * do so after CAS operations that must precede access, and all + * uses by the owning thread are otherwise acceptably ordered by + * other operations. (Because the actual points of atomicity are + * slot CASes, it would also be legal for the write to Node.match + * in a release to be weaker than a full volatile write. 
However, + * this is not done because it could allow further postponement of + * the write, delaying progress.) */ - /** The number of CPUs, for sizing and spin control */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - /** - * The capacity of the arena. Set to a value that provides more - * than enough space to handle contention. On small machines - * most slots won't be used, but it is still not wasted because - * the extra space provides some machine-level address padding - * to minimize interference with heavily CAS'ed Slot locations. - * And on very large machines, performance eventually becomes - * bounded by memory bandwidth, not numbers of threads/CPUs. - * This constant cannot be changed without also modifying - * indexing and hashing algorithms. + * The byte distance (as a shift value) between any two used slots + * in the arena. 1 << ASHIFT should be at least cacheline size. */ - private static final int CAPACITY = 32; + private static final int ASHIFT = 7; /** - * The value of "max" that will hold all threads without - * contention. When this value is less than CAPACITY, some - * otherwise wasted expansion can be avoided. + * The maximum supported arena index. The maximum allocatable + * arena size is MMASK + 1. Must be a power of two minus one, less + * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices + * for the expected scaling limits of the main algorithms. */ - private static final int FULL = - Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1); + private static final int MMASK = 0xff; /** - * The number of times to spin (doing nothing except polling a - * memory location) before blocking or giving up while waiting to - * be fulfilled. Should be zero on uniprocessors. On - * multiprocessors, this value should be large enough so that two - * threads exchanging items as fast as possible block only when - * one of them is stalled (due to GC or preemption), but not much - * longer, to avoid wasting CPU resources. Seen differently, this - * value is a little over half the number of cycles of an average - * context switch time on most systems. The value here is - * approximately the average of those across a range of tested - * systems. + * Unit for sequence/version bits of bound field. Each successful + * change to the bound also adds SEQ. */ - private static final int SPINS = (NCPU == 1) ? 0 : 2000; + private static final int SEQ = MMASK + 1; + + /** The number of CPUs, for sizing and spin control */ + private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** - * The number of times to spin before blocking in timed waits. - * Timed waits spin more slowly because checking the time takes - * time. The best value relies mainly on the relative rate of - * System.nanoTime vs memory accesses. The value is empirically - * derived to work well across a variety of systems. + * The maximum slot index of the arena: The number of slots that + * can in principle hold all threads without contention, or at + * most the maximum indexable value. */ - private static final int TIMED_SPINS = SPINS / 20; + static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; /** - * Sentinel item representing cancellation of a wait due to - * interruption, timeout, or elapsed spin-waits. This value is - * placed in holes on cancellation, and used as a return value - * from waiting methods to indicate failure to set or get hole. + * The bound for spins while waiting for a match. 
The actual + * number of iterations will on average be about twice this value + * due to randomization. Note: Spinning is disabled when NCPU==1. */ - private static final Object CANCEL = new Object(); + private static final int SPINS = 1 << 10; /** * Value representing null arguments/returns from public - * methods. This disambiguates from internal requirement that - * holes start out as null to mean they are not yet set. + * methods. Needed because the API originally didn't disallow null + * arguments, which it should have. */ private static final Object NULL_ITEM = new Object(); /** - * Nodes hold partially exchanged data. This class - * opportunistically subclasses AtomicReference to represent the - * hole. So get() returns hole, and compareAndSet CAS'es value - * into hole. This class cannot be parameterized as "V" because - * of the use of non-V CANCEL sentinels. + * Sentinel value returned by internal exchange methods upon + * timeout, to avoid need for separate timed versions of these + * methods. */ - @SuppressWarnings("serial") - private static final class Node extends AtomicReference { - /** The element offered by the Thread creating this node. */ - public final Object item; - - /** The Thread waiting to be signalled; null until waiting. */ - public volatile Thread waiter; - - /** - * Creates node with given item and empty hole. - * @param item the item - */ - public Node(Object item) { - this.item = item; - } - } + private static final Object TIMED_OUT = new Object(); /** - * A Slot is an AtomicReference with heuristic padding to lessen - * cache effects of this heavily CAS'ed location. While the - * padding adds noticeable space, all slots are created only on - * demand, and there will be more than one of them only when it - * would improve throughput more than enough to outweigh using - * extra space. + * Nodes hold partially exchanged data, plus other per-thread + * bookkeeping. Padded via @sun.misc.Contended to reduce memory + * contention. */ - @SuppressWarnings("serial") - private static final class Slot extends AtomicReference { - // Improve likelihood of isolation on <= 64 byte cache lines - long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; + @sun.misc.Contended static final class Node { + int index; // Arena index + int bound; // Last recorded value of Exchanger.bound + int collides; // Number of CAS failures at current bound + int hash; // Pseudo-random for spins + Object item; // This thread's current item + volatile Object match; // Item provided by releasing thread + volatile Thread parked; // Set to this thread when parked, else null } - /** - * Slot array. Elements are lazily initialized when needed. - * Declared volatile to enable double-checked lazy construction. - */ - private volatile Slot[] arena = new Slot[CAPACITY]; - - /** - * The maximum slot index being used. The value sometimes - * increases when a thread experiences too many CAS contentions, - * and sometimes decreases when a spin-wait elapses. Changes - * are performed only via compareAndSet, to avoid stale values - * when a thread happens to stall right before setting. - */ - private final AtomicInteger max = new AtomicInteger(); - - /** - * Main exchange function, handling the different policy variants. - * Uses Object, not "V" as argument and return value to simplify - * handling of sentinel values. Callers from public methods decode - * and cast accordingly. 
- * - * @param item the (non-null) item to exchange - * @param timed true if the wait is timed - * @param nanos if timed, the maximum wait time - * @return the other thread's item, or CANCEL if interrupted or timed out - */ - private Object doExchange(Object item, boolean timed, long nanos) { - Node me = new Node(item); // Create in case occupying - int index = hashIndex(); // Index of current slot - int fails = 0; // Number of CAS failures - - for (;;) { - Object y; // Contents of current slot - Slot slot = arena[index]; - if (slot == null) // Lazily initialize slots - createSlot(index); // Continue loop to reread - else if ((y = slot.get()) != null && // Try to fulfill - slot.compareAndSet(y, null)) { - Node you = (Node)y; // Transfer item - if (you.compareAndSet(null, item)) { - LockSupport.unpark(you.waiter); - return you.item; - } // Else cancelled; continue - } - else if (y == null && // Try to occupy - slot.compareAndSet(null, me)) { - if (index == 0) // Blocking wait for slot 0 - return timed ? - awaitNanos(me, slot, nanos) : - await(me, slot); - Object v = spinWait(me, slot); // Spin wait for non-0 - if (v != CANCEL) - return v; - me = new Node(item); // Throw away cancelled node - int m = max.get(); - if (m > (index >>>= 1)) // Decrease index - max.compareAndSet(m, m - 1); // Maybe shrink table - } - else if (++fails > 1) { // Allow 2 fails on 1st slot - int m = max.get(); - if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) - index = m + 1; // Grow on 3rd failed slot - else if (--index < 0) - index = m; // Circularly traverse - } - } - } - - /** - * Returns a hash index for the current thread. Uses a one-step - * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/) - * based on the current thread's Thread.getId(). These hash codes - * have more uniform distribution properties with respect to small - * moduli (here 1-31) than do other simple hashing functions. - * - *

To return an index between 0 and max, we use a cheap - * approximation to a mod operation, that also corrects for bias - * due to non-power-of-2 remaindering (see {@link - * java.util.Random#nextInt}). Bits of the hashcode are masked - * with "nbits", the ceiling power of two of table size (looked up - * in a table packed into three ints). If too large, this is - * retried after rotating the hash by nbits bits, while forcing new - * top bit to 0, which guarantees eventual termination (although - * with a non-random-bias). This requires an average of less than - * 2 tries for all table sizes, and has a maximum 2% difference - * from perfectly uniform slot probabilities when applied to all - * possible hash codes for sizes less than 32. - * - * @return a per-thread-random index, 0 <= index < max - */ - private final int hashIndex() { - long id = Thread.currentThread().getId(); - int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; - - int m = max.get(); - int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1)) - ((0x000001f8 >>> m) & 2) | // The constants hold - ((0xffff00f2 >>> m) & 1)); // a lookup table - int index; - while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on - hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m - return index; + /** The corresponding thread local class */ + static final class Participant extends ThreadLocal { + public Node initialValue() { return new Node(); } } /** - * Creates a new slot at given index. Called only when the slot - * appears to be null. Relies on double-check using builtin - * locks, since they rarely contend. This in turn relies on the - * arena array being declared volatile. - * - * @param index the index to add slot at + * Per-thread state */ - private void createSlot(int index) { - // Create slot outside of lock to narrow sync region - Slot newSlot = new Slot(); - Slot[] a = arena; - synchronized (a) { - if (a[index] == null) - a[index] = newSlot; - } - } + private final Participant participant; /** - * Tries to cancel a wait for the given node waiting in the given - * slot, if so, helping clear the node from its slot to avoid - * garbage retention. - * - * @param node the waiting node - * @param the slot it is waiting in - * @return true if successfully cancelled + * Elimination array; null until enabled (within slotExchange). + * Element accesses use emulation of volatile gets and CAS. */ - private static boolean tryCancel(Node node, Slot slot) { - if (!node.compareAndSet(null, CANCEL)) - return false; - if (slot.get() == node) // pre-check to minimize contention - slot.compareAndSet(node, null); - return true; - } - - // Three forms of waiting. Each just different enough not to merge - // code with others. + private volatile Node[] arena; /** - * Spin-waits for hole for a non-0 slot. Fails if spin elapses - * before hole filled. Does not check interrupt, relying on check - * in public exchange method to abort if interrupted on entry. - * - * @param node the waiting node - * @return on success, the hole; on failure, CANCEL + * Slot used until contention detected. */ - private static Object spinWait(Node node, Slot slot) { - int spins = SPINS; - for (;;) { - Object v = node.get(); - if (v != null) - return v; - else if (spins > 0) - --spins; - else - tryCancel(node, slot); - } - } + private volatile Node slot; /** - * Waits for (by spinning and/or blocking) and gets the hole - * filled in by another thread. Fails if interrupted before - * hole filled. 
- * - * When a node/thread is about to block, it sets its waiter field - * and then rechecks state at least one more time before actually - * parking, thus covering race vs fulfiller noticing that waiter - * is non-null so should be woken. - * - * Thread interruption status is checked only surrounding calls to - * park. The caller is assumed to have checked interrupt status - * on entry. - * - * @param node the waiting node - * @return on success, the hole; on failure, CANCEL + * The index of the largest valid arena position, OR'ed with SEQ + * number in high bits, incremented on each update. The initial + * update from 0 to SEQ is used to ensure that the arena array is + * constructed only once. */ - private static Object await(Node node, Slot slot) { - Thread w = Thread.currentThread(); - int spins = SPINS; - for (;;) { - Object v = node.get(); - if (v != null) - return v; - else if (spins > 0) // Spin-wait phase - --spins; - else if (node.waiter == null) // Set up to block next - node.waiter = w; - else if (w.isInterrupted()) // Abort on interrupt - tryCancel(node, slot); - else // Block - LockSupport.park(node); - } - } + private volatile int bound; /** - * Waits for (at index 0) and gets the hole filled in by another - * thread. Fails if timed out or interrupted before hole filled. - * Same basic logic as untimed version, but a bit messier. + * Exchange function when arenas enabled. See above for explanation. * - * @param node the waiting node - * @param nanos the wait time - * @return on success, the hole; on failure, CANCEL + * @param item the (non-null) item to exchange + * @param timed true if the wait is timed + * @param ns if timed, the maximum wait time, else 0L + * @return the other thread's item; or null if interrupted; or + * TIMED_OUT if timed and timed out */ - private Object awaitNanos(Node node, Slot slot, long nanos) { - int spins = TIMED_SPINS; - long lastTime = 0; - Thread w = null; - for (;;) { - Object v = node.get(); - if (v != null) + private final Object arenaExchange(Object item, boolean timed, long ns) { + Node[] a = arena; + Node p = participant.get(); + for (int i = p.index;;) { // access slot at i + int b, m, c; long j; // j is raw array offset + Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); + if (q != null && U.compareAndSwapObject(a, j, q, null)) { + Object v = q.item; // release + q.match = item; + Thread w = q.parked; + if (w != null) + U.unpark(w); return v; - long now = System.nanoTime(); - if (w == null) - w = Thread.currentThread(); - else - nanos -= now - lastTime; - lastTime = now; - if (nanos > 0) { - if (spins > 0) - --spins; - else if (node.waiter == null) - node.waiter = w; - else if (w.isInterrupted()) - tryCancel(node, slot); + } + else if (i <= (m = (b = bound) & MMASK) && q == null) { + p.item = item; // offer + if (U.compareAndSwapObject(a, j, null, p)) { + long end = (timed && m == 0) ? 
System.nanoTime() + ns : 0L; + Thread t = Thread.currentThread(); // wait + for (int h = p.hash, spins = SPINS;;) { + Object v = p.match; + if (v != null) { + U.putOrderedObject(p, MATCH, null); + p.item = null; // clear for next use + p.hash = h; + return v; + } + else if (spins > 0) { + h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift + if (h == 0) // initialize hash + h = SPINS | (int)t.getId(); + else if (h < 0 && // approx 50% true + (--spins & ((SPINS >>> 1) - 1)) == 0) + Thread.yield(); // two yields per wait + } + else if (U.getObjectVolatile(a, j) != p) + spins = SPINS; // releaser hasn't set match yet + else if (!t.isInterrupted() && m == 0 && + (!timed || + (ns = end - System.nanoTime()) > 0L)) { + U.putObject(t, BLOCKER, this); // emulate LockSupport + p.parked = t; // minimize window + if (U.getObjectVolatile(a, j) == p) + U.park(false, ns); + p.parked = null; + U.putObject(t, BLOCKER, null); + } + else if (U.getObjectVolatile(a, j) == p && + U.compareAndSwapObject(a, j, p, null)) { + if (m != 0) // try to shrink + U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); + p.item = null; + p.hash = h; + i = p.index >>>= 1; // descend + if (Thread.interrupted()) + return null; + if (timed && m == 0 && ns <= 0L) + return TIMED_OUT; + break; // expired; restart + } + } + } else - LockSupport.parkNanos(node, nanos); + p.item = null; // clear offer + } + else { + if (p.bound != b) { // stale; reset + p.bound = b; + p.collides = 0; + i = (i != m || m == 0) ? m : m - 1; + } + else if ((c = p.collides) < m || m == FULL || + !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { + p.collides = c + 1; + i = (i == 0) ? m : i - 1; // cyclically traverse + } + else + i = m + 1; // grow + p.index = i; } - else if (tryCancel(node, slot) && !w.isInterrupted()) - return scanOnTimeout(node); } } /** - * Sweeps through arena checking for any waiting threads. Called - * only upon return from timeout while waiting in slot 0. When a - * thread gives up on a timed wait, it is possible that a - * previously-entered thread is still waiting in some other - * slot. So we scan to check for any. This is almost always - * overkill, but decreases the likelihood of timeouts when there - * are other threads present to far less than that in lock-based - * exchangers in which earlier-arriving threads may still be - * waiting on entry locks. + * Exchange function used until arenas enabled. See above for explanation. 
* - * @param node the waiting node - * @return another thread's item, or CANCEL + * @param item the item to exchange + * @param timed true if the wait is timed + * @param ns if timed, the maximum wait time, else 0L + * @return the other thread's item; or null if either the arena + * was enabled or the thread was interrupted before completion; or + * TIMED_OUT if timed and timed out */ - private Object scanOnTimeout(Node node) { - Object y; - for (int j = arena.length - 1; j >= 0; --j) { - Slot slot = arena[j]; - if (slot != null) { - while ((y = slot.get()) != null) { - if (slot.compareAndSet(y, null)) { - Node you = (Node)y; - if (you.compareAndSet(null, node.item)) { - LockSupport.unpark(you.waiter); - return you.item; - } - } + private final Object slotExchange(Object item, boolean timed, long ns) { + Node p = participant.get(); + Thread t = Thread.currentThread(); + if (t.isInterrupted()) // preserve interrupt status so caller can recheck + return null; + + for (Node q;;) { + if ((q = slot) != null) { + if (U.compareAndSwapObject(this, SLOT, q, null)) { + Object v = q.item; + q.match = item; + Thread w = q.parked; + if (w != null) + U.unpark(w); + return v; } + // create arena on contention, but continue until slot null + if (NCPU > 1 && bound == 0 && + U.compareAndSwapInt(this, BOUND, 0, SEQ)) + arena = new Node[(FULL + 2) << ASHIFT]; + } + else if (arena != null) + return null; // caller must reroute to arenaExchange + else { + p.item = item; + if (U.compareAndSwapObject(this, SLOT, null, p)) + break; + p.item = null; } } - return CANCEL; + + // await release + int h = p.hash; + long end = timed ? System.nanoTime() + ns : 0L; + int spins = (NCPU > 1) ? SPINS : 1; + Object v; + while ((v = p.match) == null) { + if (spins > 0) { + h ^= h << 1; h ^= h >>> 3; h ^= h << 10; + if (h == 0) + h = SPINS | (int)t.getId(); + else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) + Thread.yield(); + } + else if (slot != p) + spins = SPINS; + else if (!t.isInterrupted() && arena == null && + (!timed || (ns = end - System.nanoTime()) > 0L)) { + U.putObject(t, BLOCKER, this); + p.parked = t; + if (slot == p) + U.park(false, ns); + p.parked = null; + U.putObject(t, BLOCKER, null); + } + else if (U.compareAndSwapObject(this, SLOT, p, null)) { + v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; + break; + } + } + U.putOrderedObject(p, MATCH, null); + p.item = null; + p.hash = h; + return v; } /** * Creates a new Exchanger. */ public Exchanger() { + participant = new Participant(); } /** @@ -620,15 +559,14 @@ public class Exchanger { */ @SuppressWarnings("unchecked") public V exchange(V x) throws InterruptedException { - if (!Thread.interrupted()) { - Object o = doExchange((x == null) ? NULL_ITEM : x, false, 0); - if (o == NULL_ITEM) - return null; - if (o != CANCEL) - return (V)o; - Thread.interrupted(); // Clear interrupt status on IE throw - } - throw new InterruptedException(); + Object v; + Object item = (x == null) ? NULL_ITEM : x; // translate null args + if ((arena != null || + (v = slotExchange(item, false, 0L)) == null) && + ((Thread.interrupted() || // disambiguates null return + (v = arenaExchange(item, false, 0L)) == null))) + throw new InterruptedException(); + return (v == NULL_ITEM) ? 
null : (V)v; } /** @@ -666,7 +604,7 @@ public class Exchanger { * * @param x the object to exchange * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument + * @param unit the time unit of the {@code timeout} argument * @return the object provided by the other thread * @throws InterruptedException if the current thread was * interrupted while waiting @@ -676,16 +614,51 @@ public class Exchanger { @SuppressWarnings("unchecked") public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - if (!Thread.interrupted()) { - Object o = doExchange((x == null) ? NULL_ITEM : x, - true, unit.toNanos(timeout)); - if (o == NULL_ITEM) - return null; - if (o != CANCEL) - return (V)o; - if (!Thread.interrupted()) - throw new TimeoutException(); + Object v; + Object item = (x == null) ? NULL_ITEM : x; + long ns = unit.toNanos(timeout); + if ((arena != null || + (v = slotExchange(item, true, ns)) == null) && + ((Thread.interrupted() || + (v = arenaExchange(item, true, ns)) == null))) + throw new InterruptedException(); + if (v == TIMED_OUT) + throw new TimeoutException(); + return (v == NULL_ITEM) ? null : (V)v; + } + + // Unsafe mechanics + private static final sun.misc.Unsafe U; + private static final long BOUND; + private static final long SLOT; + private static final long MATCH; + private static final long BLOCKER; + private static final int ABASE; + static { + int s; + try { + U = sun.misc.Unsafe.getUnsafe(); + Class ek = Exchanger.class; + Class nk = Node.class; + Class ak = Node[].class; + Class tk = Thread.class; + BOUND = U.objectFieldOffset + (ek.getDeclaredField("bound")); + SLOT = U.objectFieldOffset + (ek.getDeclaredField("slot")); + MATCH = U.objectFieldOffset + (nk.getDeclaredField("match")); + BLOCKER = U.objectFieldOffset + (tk.getDeclaredField("parkBlocker")); + s = U.arrayIndexScale(ak); + // ABASE absorbs padding in front of element 0 + ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); + + } catch (Exception e) { + throw new Error(e); } - throw new InterruptedException(); + if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) + throw new Error("Unsupported array scale"); } + } diff --git a/src/share/classes/java/util/concurrent/Phaser.java b/src/share/classes/java/util/concurrent/Phaser.java index c8afecc3c..394f62bcc 100644 --- a/src/share/classes/java/util/concurrent/Phaser.java +++ b/src/share/classes/java/util/concurrent/Phaser.java @@ -46,7 +46,7 @@ import java.util.concurrent.locks.LockSupport; * {@link java.util.concurrent.CountDownLatch CountDownLatch} * but supporting more flexible usage. * - *

Registration. Unlike the case for other barriers, the + *

Registration. Unlike the case for other barriers, the * number of parties registered to synchronize on a phaser * may vary over time. Tasks may be registered at any time (using * methods {@link #register}, {@link #bulkRegister}, or forms of @@ -59,7 +59,7 @@ import java.util.concurrent.locks.LockSupport; * (However, you can introduce such bookkeeping by subclassing this * class.) * - *
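
A minimal sketch of the dynamic registration described above (illustrative only; the worker count is arbitrary and the work body is elided):

    import java.util.concurrent.Phaser;

    class RegistrationDemo {
        public static void main(String[] args) {
            final Phaser phaser = new Phaser(1); // register self (main thread)
            for (int i = 0; i < 3; i++) {
                phaser.register();               // register each worker as created
                new Thread(new Runnable() {
                    public void run() {
                        // ... do one phase of work ...
                        phaser.arriveAndDeregister(); // leave when finished
                    }
                }).start();
            }
            int phase = phaser.arriveAndAwaitAdvance(); // wait for the workers
            System.out.println("completed phase " + phase);
        }
    }
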

Synchronization. Like a {@code CyclicBarrier}, a {@code + *

Synchronization. Like a {@code CyclicBarrier}, a {@code * Phaser} may be repeatedly awaited. Method {@link * #arriveAndAwaitAdvance} has effect analogous to {@link * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each @@ -103,7 +103,7 @@ import java.util.concurrent.locks.LockSupport; * * * - *
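
To make the CyclicBarrier.await analogy concrete, a small sketch (invented names, not from the patch):

    import java.util.concurrent.Phaser;

    class StartTogetherDemo {
        public static void main(String[] args) {
            final Phaser phaser = new Phaser(3);
            for (int i = 0; i < 3; i++) {
                final int id = i;
                new Thread(new Runnable() {
                    public void run() {
                        phaser.arriveAndAwaitAdvance(); // like CyclicBarrier.await()
                        System.out.println("task " + id + " running in phase "
                                           + phaser.getPhase());
                    }
                }).start();
            }
        }
    }
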

Termination. A phaser may enter a termination + *

Termination. A phaser may enter a termination * state, that may be checked using method {@link #isTerminated}. Upon * termination, all synchronization methods immediately return without * waiting for advance, as indicated by a negative return value. @@ -118,7 +118,7 @@ import java.util.concurrent.locks.LockSupport; * also available to abruptly release waiting threads and allow them * to terminate. * - *
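
The termination hook can be exercised by overriding onAdvance, as in this hedged sketch (BoundedPhasesDemo and the three-phase bound are invented):

    import java.util.concurrent.Phaser;

    class BoundedPhasesDemo {
        public static void main(String[] args) throws InterruptedException {
            // Returning true from onAdvance moves the phaser to the
            // termination state; here, after phases 0, 1, and 2.
            final Phaser phaser = new Phaser(2) {
                protected boolean onAdvance(int phase, int registeredParties) {
                    return phase >= 2 || registeredParties == 0;
                }
            };
            Runnable task = new Runnable() {
                public void run() {
                    while (!phaser.isTerminated())
                        phaser.arriveAndAwaitAdvance();
                }
            };
            Thread t = new Thread(task);
            t.start();
            task.run();   // run the second party on the main thread
            t.join();
            System.out.println("terminated: " + phaser.isTerminated());
        }
    }
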

Tiering. Phasers may be tiered (i.e., + *

Tiering. Phasers may be tiered (i.e., * constructed in tree structures) to reduce contention. Phasers with * large numbers of parties that would otherwise experience heavy * synchronization contention costs may instead be set up so that @@ -300,18 +300,20 @@ public class Phaser { private static final int PHASE_SHIFT = 32; private static final int UNARRIVED_MASK = 0xffff; // to mask ints private static final long PARTIES_MASK = 0xffff0000L; // to mask longs + private static final long COUNTS_MASK = 0xffffffffL; private static final long TERMINATION_BIT = 1L << 63; // some special values private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; + private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; private static final int EMPTY = 1; // The following unpacking methods are usually manually inlined private static int unarrivedOf(long s) { int counts = (int)s; - return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK; + return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); } private static int partiesOf(long s) { @@ -372,37 +374,44 @@ public class Phaser { * Manually tuned to speed up and minimize race windows for the * common case of just decrementing unarrived field. * - * @param deregister false for arrive, true for arriveAndDeregister + * @param adjust value to subtract from state; + * ONE_ARRIVAL for arrive, + * ONE_DEREGISTER for arriveAndDeregister */ - private int doArrive(boolean deregister) { - int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL; + private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); - int counts = (int)s; - int unarrived = (counts & UNARRIVED_MASK) - 1; if (phase < 0) return phase; - else if (counts == EMPTY || unarrived < 0) { - if (root == this || reconcileState() == s) - throw new IllegalStateException(badArrive(s)); - } - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { - if (unarrived == 0) { + int counts = (int)s; + int unarrived = (counts == EMPTY) ? 
0 : (counts & UNARRIVED_MASK); + if (unarrived <= 0) + throw new IllegalStateException(badArrive(s)); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { + if (unarrived == 1) { long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; - if (root != this) - return parent.doArrive(nextUnarrived == 0); - if (onAdvance(phase, nextUnarrived)) - n |= TERMINATION_BIT; - else if (nextUnarrived == 0) - n |= EMPTY; + if (root == this) { + if (onAdvance(phase, nextUnarrived)) + n |= TERMINATION_BIT; + else if (nextUnarrived == 0) + n |= EMPTY; + else + n |= nextUnarrived; + int nextPhase = (phase + 1) & MAX_PHASE; + n |= (long)nextPhase << PHASE_SHIFT; + UNSAFE.compareAndSwapLong(this, stateOffset, s, n); + releaseWaiters(phase); + } + else if (nextUnarrived == 0) { // propagate deregistration + phase = parent.doArrive(ONE_DEREGISTER); + UNSAFE.compareAndSwapLong(this, stateOffset, + s, s | EMPTY); + } else - n |= nextUnarrived; - n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT; - UNSAFE.compareAndSwapLong(this, stateOffset, s, n); - releaseWaiters(phase); + phase = parent.doArrive(ONE_ARRIVAL); } return phase; } @@ -417,42 +426,49 @@ public class Phaser { */ private int doRegister(int registrations) { // adjustment to state - long adj = ((long)registrations << PARTIES_SHIFT) | registrations; + long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { - long s = state; + long s = (parent == null) ? state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); - else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0) + phase = (int)(s >>> PHASE_SHIFT); + if (phase < 0) break; - else if (counts != EMPTY) { // not 1st registration + if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, - s, s + adj)) + s, s + adjust)) break; } } else if (parent == null) { // 1st root registration - long next = ((long)phase << PHASE_SHIFT) | adj; + long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock - parent.doRegister(1); - do { // force current phase + phase = parent.doRegister(1); + if (phase < 0) + break; + // finish registration whenever parent registration + // succeeded, even when racing with termination, + // since these are part of the same "transaction". + while (!UNSAFE.compareAndSwapLong + (this, stateOffset, s, + ((long)phase << PHASE_SHIFT) | adjust)) { + s = state; phase = (int)(root.state >>> PHASE_SHIFT); - // assert phase < 0 || (int)state == EMPTY; - } while (!UNSAFE.compareAndSwapLong - (this, stateOffset, state, - ((long)phase << PHASE_SHIFT) | adj)); + // assert (int)s == EMPTY; + } break; } } @@ -467,10 +483,6 @@ public class Phaser { * subphasers have not yet done so, in which case they must finish * their own advance by setting unarrived to parties (or if * parties is zero, resetting to unregistered EMPTY state). 
- * However, this method may also be called when "floating" - * subphasers with possibly some unarrived parties are merely - * catching up to current phase, in which case counts are - * unaffected. * * @return reconciled state */ @@ -478,16 +490,16 @@ public class Phaser { final Phaser root = this.root; long s = state; if (root != this) { - int phase, u, p; - // CAS root phase with current parties; possibly trip unarrived + int phase, p; + // CAS to root phase with current parties, tripping unarrived while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && !UNSAFE.compareAndSwapLong (this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | - (s & PARTIES_MASK) | - ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY : - (u = (int)s & UNARRIVED_MASK) == 0 ? p : u)))) + ((phase < 0) ? (s & COUNTS_MASK) : + (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : + ((s & PARTIES_MASK) | p)))))) s = state; } return s; @@ -619,7 +631,7 @@ public class Phaser { * of unarrived parties would become negative */ public int arrive() { - return doArrive(false); + return doArrive(ONE_ARRIVAL); } /** @@ -639,7 +651,7 @@ public class Phaser { * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { - return doArrive(true); + return doArrive(ONE_DEREGISTER); } /** @@ -666,17 +678,15 @@ public class Phaser { for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); - int counts = (int)s; - int unarrived = (counts & UNARRIVED_MASK) - 1; if (phase < 0) return phase; - else if (counts == EMPTY || unarrived < 0) { - if (reconcileState() == s) - throw new IllegalStateException(badArrive(s)); - } - else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, - s -= ONE_ARRIVAL)) { - if (unarrived != 0) + int counts = (int)s; + int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); + if (unarrived <= 0) + throw new IllegalStateException(badArrive(s)); + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, + s -= ONE_ARRIVAL)) { + if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); @@ -809,8 +819,8 @@ public class Phaser { if (UNSAFE.compareAndSwapLong(root, stateOffset, s, s | TERMINATION_BIT)) { // signal all threads - releaseWaiters(0); - releaseWaiters(1); + releaseWaiters(0); // Waiters on evenQ + releaseWaiters(1); // Waiters on oddQ return; } } @@ -1016,7 +1026,7 @@ public class Phaser { /** * Possibly blocks and waits for phase to advance unless aborted. - * Call only from root node. + * Call only on root phaser. * * @param phase current phase * @param node if non-null, the wait node to track interrupt and timeout; @@ -1024,6 +1034,7 @@ public class Phaser { * @return current phase */ private int internalAwaitAdvance(int phase, QNode node) { + // assert root == this; releaseWaiters(phase-1); // ensure old queue clean boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change @@ -1082,7 +1093,7 @@ public class Phaser { final boolean timed; boolean wasInterrupted; long nanos; - long lastTime; + final long deadline; volatile Thread thread; // nulled to cancel wait QNode next; @@ -1093,7 +1104,7 @@ public class Phaser { this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; - this.lastTime = timed ? System.nanoTime() : 0L; + this.deadline = timed ? 
@@ -1112,9 +1123,7 @@
             }
             if (timed) {
                 if (nanos > 0L) {
-                    long now = System.nanoTime();
-                    nanos -= now - lastTime;
-                    lastTime = now;
+                    nanos = deadline - System.nanoTime();
                 }
                 if (nanos <= 0L) {
                     thread = null;
@@ -1129,7 +1138,7 @@
                 return true;
             else if (!timed)
                 LockSupport.park(this);
-            else if (nanos > 0)
+            else if (nanos > 0L)
                 LockSupport.parkNanos(this, nanos);
             return isReleasable();
         }
diff --git a/src/share/classes/java/util/concurrent/TimeUnit.java b/src/share/classes/java/util/concurrent/TimeUnit.java
index ab3aa854f..bd56b4835 100644
--- a/src/share/classes/java/util/concurrent/TimeUnit.java
+++ b/src/share/classes/java/util/concurrent/TimeUnit.java
@@ -36,10 +36,10 @@ package java.util.concurrent;
 
 /**
- * A TimeUnit represents time durations at a given unit of
+ * A {@code TimeUnit} represents time durations at a given unit of
  * granularity and provides utility methods to convert across units,
  * and to perform timing and delay operations in these units. A
- * TimeUnit does not maintain time information, but only
+ * {@code TimeUnit} does not maintain time information, but only
  * helps organize and use time representations that may be maintained
  * separately across various contexts. A nanosecond is defined as one
  * thousandth of a microsecond, a microsecond as one thousandth of a
  * millisecond, a millisecond as one thousandth of a second, a minute
@@ -47,7 +47,7 @@ package java.util.concurrent;
  * as sixty seconds, an hour as sixty minutes, and a day as twenty four
  * hours.
  *
- * <p>A TimeUnit is mainly used to inform time-based methods
+ * <p>A {@code TimeUnit} is mainly used to inform time-based methods
  * how a given timing parameter should be interpreted. For example,
  * the following code will timeout in 50 milliseconds if the {@link
  * java.util.concurrent.locks.Lock lock} is not available:
@@ -63,7 +63,7 @@ package java.util.concurrent;
  *
  * Note however, that there is no guarantee that a particular timeout
  * implementation will be able to notice the passage of time at the
- * same granularity as the given TimeUnit.
+ * same granularity as the given {@code TimeUnit}.
  *
  * @since 1.5
  * @author Doug Lea
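As the javadoc above says, a TimeUnit only tells a timed method how to
interpret its numeric argument; it carries no clock of its own. A small
illustrative sketch (not from the patch) of that usage:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        // The pair (50L, MILLISECONDS) means "50 milliseconds", but the
        // lock implementation need not observe time at that granularity.
        if (lock.tryLock(50L, TimeUnit.MILLISECONDS)) {
            try {
                System.out.println("got the lock");
            } finally {
                lock.unlock();
            }
        }
    }
}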

@@ -174,83 +174,82 @@ public enum TimeUnit {
     // etc. are not declared abstract but otherwise act as abstract methods.
 
     /**
-     * Convert the given time duration in the given unit to this
-     * unit. Conversions from finer to coarser granularities
-     * truncate, so lose precision. For example converting
-     * 999 milliseconds to seconds results in
-     * 0. Conversions from coarser to finer granularities
-     * with arguments that would numerically overflow saturate to
-     * Long.MIN_VALUE if negative or Long.MAX_VALUE
-     * if positive.
+     * Converts the given time duration in the given unit to this unit.
+     * Conversions from finer to coarser granularities truncate, so
+     * lose precision. For example, converting {@code 999} milliseconds
+     * to seconds results in {@code 0}. Conversions from coarser to
+     * finer granularities with arguments that would numerically
+     * overflow saturate to {@code Long.MIN_VALUE} if negative or
+     * {@code Long.MAX_VALUE} if positive.
      *
      * <p>For example, to convert 10 minutes to milliseconds, use:
-     * TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)
+     * {@code TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)}
      *
-     * @param sourceDuration the time duration in the given sourceUnit
-     * @param sourceUnit the unit of the sourceDuration argument
+     * @param sourceDuration the time duration in the given {@code sourceUnit}
+     * @param sourceUnit the unit of the {@code sourceDuration} argument
      * @return the converted duration in this unit,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      */
     public long convert(long sourceDuration, TimeUnit sourceUnit) {
         throw new AbstractMethodError();
     }
 
     /**
-     * Equivalent to NANOSECONDS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) NANOSECONDS.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      */
     public long toNanos(long duration) {
         throw new AbstractMethodError();
     }
 
     /**
-     * Equivalent to MICROSECONDS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) MICROSECONDS.convert(duration, this)}.
      * @param duration the duration
     * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      */
     public long toMicros(long duration) {
         throw new AbstractMethodError();
     }
 
     /**
-     * Equivalent to MILLISECONDS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) MILLISECONDS.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      */
     public long toMillis(long duration) {
         throw new AbstractMethodError();
     }
 
     /**
-     * Equivalent to SECONDS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) SECONDS.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
     */
     public long toSeconds(long duration) {
         throw new AbstractMethodError();
     }
 
     /**
-     * Equivalent to MINUTES.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) MINUTES.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      * @since 1.6
      */
     public long toMinutes(long duration) {
@@ -258,12 +257,12 @@
     }
 
     /**
-     * Equivalent to HOURS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) HOURS.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration,
-     * or Long.MIN_VALUE if conversion would negatively
-     * overflow, or Long.MAX_VALUE if it would positively overflow.
-     * @see #convert
+     * or {@code Long.MIN_VALUE} if conversion would negatively
+     * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
      * @since 1.6
      */
     public long toHours(long duration) {
@@ -271,10 +270,10 @@
     }
 
     /**
-     * Equivalent to DAYS.convert(duration, this).
+     * Equivalent to
+     * {@link #convert(long, TimeUnit) DAYS.convert(duration, this)}.
      * @param duration the duration
      * @return the converted duration
-     * @see #convert
      * @since 1.6
      */
     public long toDays(long duration) {
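The truncation and saturation rules documented above can be checked
directly (illustrative only, not part of the patch):

import java.util.concurrent.TimeUnit;

class ConvertDemo {
    public static void main(String[] args) {
        // finer -> coarser truncates: 999 ms is 0 whole seconds
        System.out.println(
            TimeUnit.SECONDS.convert(999L, TimeUnit.MILLISECONDS)); // 0
        // coarser -> finer saturates rather than overflowing
        System.out.println(
            TimeUnit.NANOSECONDS.convert(Long.MAX_VALUE, TimeUnit.DAYS)
            == Long.MAX_VALUE); // true
        // the toXxx methods are shorthand for convert
        System.out.println(TimeUnit.MINUTES.toMillis(10L)); // 600000
    }
}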

@@ -294,9 +293,9 @@ public enum TimeUnit {
      * Performs a timed {@link Object#wait(long, int) Object.wait}
      * using this time unit.
      * This is a convenience method that converts timeout arguments
-     * into the form required by the Object.wait method.
+     * into the form required by the {@code Object.wait} method.
      *
-     * <p>For example, you could implement a blocking poll
+     * <p>For example, you could implement a blocking {@code poll}
      * method (see {@link BlockingQueue#poll BlockingQueue.poll})
      * using:
      *
@@ -327,7 +326,7 @@
      * Performs a timed {@link Thread#join(long, int) Thread.join}
      * using this time unit.
      * This is a convenience method that converts time arguments into the
-     * form required by the Thread.join method.
+     * form required by the {@code Thread.join} method.
      *
      * @param thread the thread to wait for
      * @param timeout the maximum time to wait. If less than
@@ -347,7 +346,7 @@
      * Performs a {@link Thread#sleep(long, int) Thread.sleep} using
      * this time unit.
      * This is a convenience method that converts time arguments into the
-     * form required by the Thread.sleep method.
+     * form required by the {@code Thread.sleep} method.
      *
      * @param timeout the minimum time to sleep. If less than
      *         or equal to zero, do not sleep at all.
diff --git a/src/share/classes/java/util/concurrent/TimeoutException.java b/src/share/classes/java/util/concurrent/TimeoutException.java
index ed08990c7..b54c52b21 100644
--- a/src/share/classes/java/util/concurrent/TimeoutException.java
+++ b/src/share/classes/java/util/concurrent/TimeoutException.java
@@ -40,7 +40,7 @@
  * operations for which a timeout is specified need a means to
  * indicate that the timeout has occurred. For many such operations it
  * is possible to return a value that indicates timeout; when that is
- * not possible or desirable then TimeoutException should be
+ * not possible or desirable then {@code TimeoutException} should be
  * declared and thrown.
  *
  * @since 1.5
@@ -50,13 +50,13 @@ public class TimeoutException extends Exception {
     private static final long serialVersionUID = 1900926677490660714L;
 
     /**
-     * Constructs a TimeoutException with no specified detail
+     * Constructs a {@code TimeoutException} with no specified detail
      * message.
      */
     public TimeoutException() {}
 
     /**
-     * Constructs a TimeoutException with the specified detail
+     * Constructs a {@code TimeoutException} with the specified detail
      * message.
      *
      * @param message the detail message
diff --git a/src/share/classes/java/util/concurrent/package-info.java b/src/share/classes/java/util/concurrent/package-info.java
index 4c56e0390..b236c290b 100644
--- a/src/share/classes/java/util/concurrent/package-info.java
+++ b/src/share/classes/java/util/concurrent/package-info.java
@@ -48,7 +48,7 @@
  *
  * {@link java.util.concurrent.Executor} is a simple standardized
  * interface for defining custom thread-like subsystems, including
- * thread pools, asynchronous IO, and lightweight task frameworks.
+ * thread pools, asynchronous I/O, and lightweight task frameworks.
  * Depending on which concrete Executor class is being used, tasks may
  * execute in a newly created thread, an existing task-execution thread,
  * or the thread calling {@link java.util.concurrent.Executor#execute
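The Executor interface described above decouples task submission from
execution policy; a minimal illustrative sketch (not part of the patch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorDemo {
    public static void main(String[] args) {
        // a pool-backed Executor; tasks run on one of two worker threads
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() ->
            System.out.println("ran in " + Thread.currentThread().getName()));
        pool.shutdown(); // accept no new tasks; let submitted tasks finish
    }
}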

@@ -102,8 +102,10 @@
  * <h2>Queues</h2>
  *
  * The {@link java.util.concurrent.ConcurrentLinkedQueue} class
- * supplies an efficient scalable thread-safe non-blocking FIFO
- * queue.
+ * supplies an efficient scalable thread-safe non-blocking FIFO queue.
+ * The {@link java.util.concurrent.ConcurrentLinkedDeque} class is
+ * similar, but additionally supports the {@link java.util.Deque}
+ * interface.
  *
  * <p>Five implementations in {@code java.util.concurrent} support
  * the extended {@link java.util.concurrent.BlockingQueue}
@@ -117,7 +119,7 @@
  * for producer-consumer, messaging, parallel tasking, and
  * related concurrent designs.
  *
- * <p> Extended interface {@link java.util.concurrent.TransferQueue},
+ * <p>Extended interface {@link java.util.concurrent.TransferQueue},
  * and implementation {@link java.util.concurrent.LinkedTransferQueue}
  * introduce a synchronous {@code transfer} method (along with related
  * features) in which a producer may optionally block awaiting its
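An illustrative use of the synchronous transfer method mentioned above
(not part of the patch):

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<String> q = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took: " + q.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        q.transfer("hello"); // blocks until the consumer receives the element
        consumer.join();
    }
}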

@@ -216,9 +218,9 @@
  * it may (or may not) reflect any updates since the iterator was
  * created.
  *
- * <h2>Memory Consistency Properties</h2>
+ * <h2 id="MemoryVisibility">Memory Consistency Properties</h2>
  *
  * Chapter 17 of the Java Language Specification defines the
  * <i>happens-before</i> relation on memory operations such as reads and
  * writes of shared variables. The results of a write by one thread are
-- 
GitLab