提交 aa86f2c8 编写于 作者: P psandoz

8019481: Sync misc j.u.c classes from 166 to tl

Reviewed-by: martin
Contributed-by: NDoug Lea <dl@cs.oswego.edu>
上级 7bf1893e
...@@ -49,13 +49,13 @@ public class BrokenBarrierException extends Exception { ...@@ -49,13 +49,13 @@ public class BrokenBarrierException extends Exception {
private static final long serialVersionUID = 7117394618823254244L; private static final long serialVersionUID = 7117394618823254244L;
/** /**
* Constructs a <tt>BrokenBarrierException</tt> with no specified detail * Constructs a {@code BrokenBarrierException} with no specified detail
* message. * message.
*/ */
public BrokenBarrierException() {} public BrokenBarrierException() {}
/** /**
* Constructs a <tt>BrokenBarrierException</tt> with the specified * Constructs a {@code BrokenBarrierException} with the specified
* detail message. * detail message.
* *
* @param message the detail message * @param message the detail message
......
...@@ -92,15 +92,15 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer; ...@@ -92,15 +92,15 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
* private final CountDownLatch startSignal; * private final CountDownLatch startSignal;
* private final CountDownLatch doneSignal; * private final CountDownLatch doneSignal;
* Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
* this.startSignal = startSignal; * this.startSignal = startSignal;
* this.doneSignal = doneSignal; * this.doneSignal = doneSignal;
* } * }
* public void run() { * public void run() {
* try { * try {
* startSignal.await(); * startSignal.await();
* doWork(); * doWork();
* doneSignal.countDown(); * doneSignal.countDown();
* } catch (InterruptedException ex) {} // return; * } catch (InterruptedException ex) {} // return;
* } * }
* *
* void doWork() { ... } * void doWork() { ... }
...@@ -130,14 +130,14 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer; ...@@ -130,14 +130,14 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
* private final CountDownLatch doneSignal; * private final CountDownLatch doneSignal;
* private final int i; * private final int i;
* WorkerRunnable(CountDownLatch doneSignal, int i) { * WorkerRunnable(CountDownLatch doneSignal, int i) {
* this.doneSignal = doneSignal; * this.doneSignal = doneSignal;
* this.i = i; * this.i = i;
* } * }
* public void run() { * public void run() {
* try { * try {
* doWork(i); * doWork(i);
* doneSignal.countDown(); * doneSignal.countDown();
* } catch (InterruptedException ex) {} // return; * } catch (InterruptedException ex) {} // return;
* } * }
* *
* void doWork() { ... } * void doWork() { ... }
......
...@@ -45,14 +45,14 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -45,14 +45,14 @@ import java.util.concurrent.locks.ReentrantLock;
* <em>cyclic</em> because it can be re-used after the waiting threads * <em>cyclic</em> because it can be re-used after the waiting threads
* are released. * are released.
* *
* <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party * that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released. * arrives, but before any threads are released.
* This <em>barrier action</em> is useful * This <em>barrier action</em> is useful
* for updating shared-state before any of the parties continue. * for updating shared-state before any of the parties continue.
* *
* <p><b>Sample usage:</b> Here is an example of * <p><b>Sample usage:</b> Here is an example of using a barrier in a
* using a barrier in a parallel decomposition design: * parallel decomposition design:
* *
* <pre> {@code * <pre> {@code
* class Solver { * class Solver {
...@@ -81,16 +81,20 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -81,16 +81,20 @@ import java.util.concurrent.locks.ReentrantLock;
* public Solver(float[][] matrix) { * public Solver(float[][] matrix) {
* data = matrix; * data = matrix;
* N = matrix.length; * N = matrix.length;
* barrier = new CyclicBarrier(N, * Runnable barrierAction =
* new Runnable() { * new Runnable() { public void run() { mergeRows(...); }};
* public void run() { * barrier = new CyclicBarrier(N, barrierAction);
* mergeRows(...);
* }
* });
* for (int i = 0; i < N; ++i)
* new Thread(new Worker(i)).start();
* *
* waitUntilDone(); * List<Thread> threads = new ArrayList<Thread>(N);
* for (int i = 0; i < N; i++) {
* Thread thread = new Thread(new Worker(i));
* threads.add(thread);
* thread.start();
* }
*
* // wait until done
* for (Thread thread : threads)
* thread.join();
* } * }
* }}</pre> * }}</pre>
* *
...@@ -98,8 +102,8 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -98,8 +102,8 @@ import java.util.concurrent.locks.ReentrantLock;
* barrier until all rows have been processed. When all rows are processed * barrier until all rows have been processed. When all rows are processed
* the supplied {@link Runnable} barrier action is executed and merges the * the supplied {@link Runnable} barrier action is executed and merges the
* rows. If the merger * rows. If the merger
* determines that a solution has been found then <tt>done()</tt> will return * determines that a solution has been found then {@code done()} will return
* <tt>true</tt> and each worker will terminate. * {@code true} and each worker will terminate.
* *
* <p>If the barrier action does not rely on the parties being suspended when * <p>If the barrier action does not rely on the parties being suspended when
* it is executed, then any of the threads in the party could execute that * it is executed, then any of the threads in the party could execute that
...@@ -112,7 +116,7 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -112,7 +116,7 @@ import java.util.concurrent.locks.ReentrantLock;
* // log the completion of this iteration * // log the completion of this iteration
* }}</pre> * }}</pre>
* *
* <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
* for failed synchronization attempts: If a thread leaves a barrier * for failed synchronization attempts: If a thread leaves a barrier
* point prematurely because of interruption, failure, or timeout, all * point prematurely because of interruption, failure, or timeout, all
* other threads waiting at that barrier point will also leave * other threads waiting at that barrier point will also leave
...@@ -139,7 +143,7 @@ public class CyclicBarrier { ...@@ -139,7 +143,7 @@ public class CyclicBarrier {
* is reset. There can be many generations associated with threads * is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock * using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these * may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which <tt>count</tt> applies) * can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped. * and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break * There need not be an active generation if there has been a break
* but no subsequent reset. * but no subsequent reset.
...@@ -259,7 +263,7 @@ public class CyclicBarrier { ...@@ -259,7 +263,7 @@ public class CyclicBarrier {
} }
/** /**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the * Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which * given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped, * will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier. * performed by the last thread entering the barrier.
...@@ -278,7 +282,7 @@ public class CyclicBarrier { ...@@ -278,7 +282,7 @@ public class CyclicBarrier {
} }
/** /**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the * Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and * given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped. * does not perform a predefined action when the barrier is tripped.
* *
...@@ -301,7 +305,7 @@ public class CyclicBarrier { ...@@ -301,7 +305,7 @@ public class CyclicBarrier {
/** /**
* Waits until all {@linkplain #getParties parties} have invoked * Waits until all {@linkplain #getParties parties} have invoked
* <tt>await</tt> on this barrier. * {@code await} on this barrier.
* *
* <p>If the current thread is not the last to arrive then it is * <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until * disabled for thread scheduling purposes and lies dormant until
...@@ -326,7 +330,7 @@ public class CyclicBarrier { ...@@ -326,7 +330,7 @@ public class CyclicBarrier {
* *
* <p>If the barrier is {@link #reset} while any thread is waiting, * <p>If the barrier is {@link #reset} while any thread is waiting,
* or if the barrier {@linkplain #isBroken is broken} when * or if the barrier {@linkplain #isBroken is broken} when
* <tt>await</tt> is invoked, or while any thread is waiting, then * {@code await} is invoked, or while any thread is waiting, then
* {@link BrokenBarrierException} is thrown. * {@link BrokenBarrierException} is thrown.
* *
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
...@@ -343,7 +347,7 @@ public class CyclicBarrier { ...@@ -343,7 +347,7 @@ public class CyclicBarrier {
* the broken state. * the broken state.
* *
* @return the arrival index of the current thread, where index * @return the arrival index of the current thread, where index
* <tt>{@link #getParties()} - 1</tt> indicates the first * {@code getParties() - 1} indicates the first
* to arrive and zero indicates the last to arrive * to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted * @throws InterruptedException if the current thread was interrupted
* while waiting * while waiting
...@@ -351,7 +355,7 @@ public class CyclicBarrier { ...@@ -351,7 +355,7 @@ public class CyclicBarrier {
* interrupted or timed out while the current thread was * interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was * waiting, or the barrier was reset, or the barrier was
* broken when {@code await} was called, or the barrier * broken when {@code await} was called, or the barrier
* action (if present) failed due an exception. * action (if present) failed due to an exception
*/ */
public int await() throws InterruptedException, BrokenBarrierException { public int await() throws InterruptedException, BrokenBarrierException {
try { try {
...@@ -363,7 +367,7 @@ public class CyclicBarrier { ...@@ -363,7 +367,7 @@ public class CyclicBarrier {
/** /**
* Waits until all {@linkplain #getParties parties} have invoked * Waits until all {@linkplain #getParties parties} have invoked
* <tt>await</tt> on this barrier, or the specified waiting time elapses. * {@code await} on this barrier, or the specified waiting time elapses.
* *
* <p>If the current thread is not the last to arrive then it is * <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until * disabled for thread scheduling purposes and lies dormant until
...@@ -393,7 +397,7 @@ public class CyclicBarrier { ...@@ -393,7 +397,7 @@ public class CyclicBarrier {
* *
* <p>If the barrier is {@link #reset} while any thread is waiting, * <p>If the barrier is {@link #reset} while any thread is waiting,
* or if the barrier {@linkplain #isBroken is broken} when * or if the barrier {@linkplain #isBroken is broken} when
* <tt>await</tt> is invoked, or while any thread is waiting, then * {@code await} is invoked, or while any thread is waiting, then
* {@link BrokenBarrierException} is thrown. * {@link BrokenBarrierException} is thrown.
* *
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
...@@ -412,16 +416,17 @@ public class CyclicBarrier { ...@@ -412,16 +416,17 @@ public class CyclicBarrier {
* @param timeout the time to wait for the barrier * @param timeout the time to wait for the barrier
* @param unit the time unit of the timeout parameter * @param unit the time unit of the timeout parameter
* @return the arrival index of the current thread, where index * @return the arrival index of the current thread, where index
* <tt>{@link #getParties()} - 1</tt> indicates the first * {@code getParties() - 1} indicates the first
* to arrive and zero indicates the last to arrive * to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted * @throws InterruptedException if the current thread was interrupted
* while waiting * while waiting
* @throws TimeoutException if the specified timeout elapses * @throws TimeoutException if the specified timeout elapses.
* In this case the barrier will be broken.
* @throws BrokenBarrierException if <em>another</em> thread was * @throws BrokenBarrierException if <em>another</em> thread was
* interrupted or timed out while the current thread was * interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was broken * waiting, or the barrier was reset, or the barrier was broken
* when {@code await} was called, or the barrier action (if * when {@code await} was called, or the barrier action (if
* present) failed due an exception * present) failed due to an exception
*/ */
public int await(long timeout, TimeUnit unit) public int await(long timeout, TimeUnit unit)
throws InterruptedException, throws InterruptedException,
......
...@@ -46,7 +46,7 @@ import java.util.concurrent.locks.LockSupport; ...@@ -46,7 +46,7 @@ import java.util.concurrent.locks.LockSupport;
* {@link java.util.concurrent.CountDownLatch CountDownLatch} * {@link java.util.concurrent.CountDownLatch CountDownLatch}
* but supporting more flexible usage. * but supporting more flexible usage.
* *
* <p> <b>Registration.</b> Unlike the case for other barriers, the * <p><b>Registration.</b> Unlike the case for other barriers, the
* number of parties <em>registered</em> to synchronize on a phaser * number of parties <em>registered</em> to synchronize on a phaser
* may vary over time. Tasks may be registered at any time (using * may vary over time. Tasks may be registered at any time (using
* methods {@link #register}, {@link #bulkRegister}, or forms of * methods {@link #register}, {@link #bulkRegister}, or forms of
...@@ -59,7 +59,7 @@ import java.util.concurrent.locks.LockSupport; ...@@ -59,7 +59,7 @@ import java.util.concurrent.locks.LockSupport;
* (However, you can introduce such bookkeeping by subclassing this * (However, you can introduce such bookkeeping by subclassing this
* class.) * class.)
* *
* <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
* Phaser} may be repeatedly awaited. Method {@link * Phaser} may be repeatedly awaited. Method {@link
* #arriveAndAwaitAdvance} has effect analogous to {@link * #arriveAndAwaitAdvance} has effect analogous to {@link
* java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
...@@ -103,7 +103,7 @@ import java.util.concurrent.locks.LockSupport; ...@@ -103,7 +103,7 @@ import java.util.concurrent.locks.LockSupport;
* *
* </ul> * </ul>
* *
* <p> <b>Termination.</b> A phaser may enter a <em>termination</em> * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
* state, that may be checked using method {@link #isTerminated}. Upon * state, that may be checked using method {@link #isTerminated}. Upon
* termination, all synchronization methods immediately return without * termination, all synchronization methods immediately return without
* waiting for advance, as indicated by a negative return value. * waiting for advance, as indicated by a negative return value.
...@@ -118,7 +118,7 @@ import java.util.concurrent.locks.LockSupport; ...@@ -118,7 +118,7 @@ import java.util.concurrent.locks.LockSupport;
* also available to abruptly release waiting threads and allow them * also available to abruptly release waiting threads and allow them
* to terminate. * to terminate.
* *
* <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e., * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
* constructed in tree structures) to reduce contention. Phasers with * constructed in tree structures) to reduce contention. Phasers with
* large numbers of parties that would otherwise experience heavy * large numbers of parties that would otherwise experience heavy
* synchronization contention costs may instead be set up so that * synchronization contention costs may instead be set up so that
...@@ -300,18 +300,20 @@ public class Phaser { ...@@ -300,18 +300,20 @@ public class Phaser {
private static final int PHASE_SHIFT = 32; private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63; private static final long TERMINATION_BIT = 1L << 63;
// some special values // some special values
private static final int ONE_ARRIVAL = 1; private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1; private static final int EMPTY = 1;
// The following unpacking methods are usually manually inlined // The following unpacking methods are usually manually inlined
private static int unarrivedOf(long s) { private static int unarrivedOf(long s) {
int counts = (int)s; int counts = (int)s;
return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK; return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
} }
private static int partiesOf(long s) { private static int partiesOf(long s) {
...@@ -372,37 +374,44 @@ public class Phaser { ...@@ -372,37 +374,44 @@ public class Phaser {
* Manually tuned to speed up and minimize race windows for the * Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field. * common case of just decrementing unarrived field.
* *
* @param deregister false for arrive, true for arriveAndDeregister * @param adjust value to subtract from state;
* ONE_ARRIVAL for arrive,
* ONE_DEREGISTER for arriveAndDeregister
*/ */
private int doArrive(boolean deregister) { private int doArrive(int adjust) {
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
final Phaser root = this.root; final Phaser root = this.root;
for (;;) { for (;;) {
long s = (root == this) ? state : reconcileState(); long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT); int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0) if (phase < 0)
return phase; return phase;
else if (counts == EMPTY || unarrived < 0) { int counts = (int)s;
if (root == this || reconcileState() == s) int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
throw new IllegalStateException(badArrive(s)); if (unarrived <= 0)
} throw new IllegalStateException(badArrive(s));
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 0) { if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT; int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root != this) if (root == this) {
return parent.doArrive(nextUnarrived == 0); if (onAdvance(phase, nextUnarrived))
if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT;
n |= TERMINATION_BIT; else if (nextUnarrived == 0)
else if (nextUnarrived == 0) n |= EMPTY;
n |= EMPTY; else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else else
n |= nextUnarrived; phase = parent.doArrive(ONE_ARRIVAL);
n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
} }
return phase; return phase;
} }
...@@ -417,42 +426,49 @@ public class Phaser { ...@@ -417,42 +426,49 @@ public class Phaser {
*/ */
private int doRegister(int registrations) { private int doRegister(int registrations) {
// adjustment to state // adjustment to state
long adj = ((long)registrations << PARTIES_SHIFT) | registrations; long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent; final Phaser parent = this.parent;
int phase; int phase;
for (;;) { for (;;) {
long s = state; long s = (parent == null) ? state : reconcileState();
int counts = (int)s; int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT; int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK; int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties) if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s)); throw new IllegalStateException(badRegister(s));
else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0) phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break; break;
else if (counts != EMPTY) { // not 1st registration if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) { if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null); root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset, else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adj)) s, s + adjust))
break; break;
} }
} }
else if (parent == null) { // 1st root registration else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adj; long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break; break;
} }
else { else {
synchronized (this) { // 1st sub registration synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock if (state == s) { // recheck under lock
parent.doRegister(1); phase = parent.doRegister(1);
do { // force current phase if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT); phase = (int)(root.state >>> PHASE_SHIFT);
// assert phase < 0 || (int)state == EMPTY; // assert (int)s == EMPTY;
} while (!UNSAFE.compareAndSwapLong }
(this, stateOffset, state,
((long)phase << PHASE_SHIFT) | adj));
break; break;
} }
} }
...@@ -467,10 +483,6 @@ public class Phaser { ...@@ -467,10 +483,6 @@ public class Phaser {
* subphasers have not yet done so, in which case they must finish * subphasers have not yet done so, in which case they must finish
* their own advance by setting unarrived to parties (or if * their own advance by setting unarrived to parties (or if
* parties is zero, resetting to unregistered EMPTY state). * parties is zero, resetting to unregistered EMPTY state).
* However, this method may also be called when "floating"
* subphasers with possibly some unarrived parties are merely
* catching up to current phase, in which case counts are
* unaffected.
* *
* @return reconciled state * @return reconciled state
*/ */
...@@ -478,16 +490,16 @@ public class Phaser { ...@@ -478,16 +490,16 @@ public class Phaser {
final Phaser root = this.root; final Phaser root = this.root;
long s = state; long s = state;
if (root != this) { if (root != this) {
int phase, u, p; int phase, p;
// CAS root phase with current parties; possibly trip unarrived // CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state >>> PHASE_SHIFT)) != while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) && (int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong !UNSAFE.compareAndSwapLong
(this, stateOffset, s, (this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) | s = (((long)phase << PHASE_SHIFT) |
(s & PARTIES_MASK) | ((phase < 0) ? (s & COUNTS_MASK) :
((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY : (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
(u = (int)s & UNARRIVED_MASK) == 0 ? p : u)))) ((s & PARTIES_MASK) | p))))))
s = state; s = state;
} }
return s; return s;
...@@ -619,7 +631,7 @@ public class Phaser { ...@@ -619,7 +631,7 @@ public class Phaser {
* of unarrived parties would become negative * of unarrived parties would become negative
*/ */
public int arrive() { public int arrive() {
return doArrive(false); return doArrive(ONE_ARRIVAL);
} }
/** /**
...@@ -639,7 +651,7 @@ public class Phaser { ...@@ -639,7 +651,7 @@ public class Phaser {
* of registered or unarrived parties would become negative * of registered or unarrived parties would become negative
*/ */
public int arriveAndDeregister() { public int arriveAndDeregister() {
return doArrive(true); return doArrive(ONE_DEREGISTER);
} }
/** /**
...@@ -666,17 +678,15 @@ public class Phaser { ...@@ -666,17 +678,15 @@ public class Phaser {
for (;;) { for (;;) {
long s = (root == this) ? state : reconcileState(); long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT); int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0) if (phase < 0)
return phase; return phase;
else if (counts == EMPTY || unarrived < 0) { int counts = (int)s;
if (reconcileState() == s) int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
throw new IllegalStateException(badArrive(s)); if (unarrived <= 0)
} throw new IllegalStateException(badArrive(s));
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) { s -= ONE_ARRIVAL)) {
if (unarrived != 0) if (unarrived > 1)
return root.internalAwaitAdvance(phase, null); return root.internalAwaitAdvance(phase, null);
if (root != this) if (root != this)
return parent.arriveAndAwaitAdvance(); return parent.arriveAndAwaitAdvance();
...@@ -809,8 +819,8 @@ public class Phaser { ...@@ -809,8 +819,8 @@ public class Phaser {
if (UNSAFE.compareAndSwapLong(root, stateOffset, if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) { s, s | TERMINATION_BIT)) {
// signal all threads // signal all threads
releaseWaiters(0); releaseWaiters(0); // Waiters on evenQ
releaseWaiters(1); releaseWaiters(1); // Waiters on oddQ
return; return;
} }
} }
...@@ -1016,7 +1026,7 @@ public class Phaser { ...@@ -1016,7 +1026,7 @@ public class Phaser {
/** /**
* Possibly blocks and waits for phase to advance unless aborted. * Possibly blocks and waits for phase to advance unless aborted.
* Call only from root node. * Call only on root phaser.
* *
* @param phase current phase * @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout; * @param node if non-null, the wait node to track interrupt and timeout;
...@@ -1024,6 +1034,7 @@ public class Phaser { ...@@ -1024,6 +1034,7 @@ public class Phaser {
* @return current phase * @return current phase
*/ */
private int internalAwaitAdvance(int phase, QNode node) { private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change int lastUnarrived = 0; // to increase spins upon change
...@@ -1082,7 +1093,7 @@ public class Phaser { ...@@ -1082,7 +1093,7 @@ public class Phaser {
final boolean timed; final boolean timed;
boolean wasInterrupted; boolean wasInterrupted;
long nanos; long nanos;
long lastTime; final long deadline;
volatile Thread thread; // nulled to cancel wait volatile Thread thread; // nulled to cancel wait
QNode next; QNode next;
...@@ -1093,7 +1104,7 @@ public class Phaser { ...@@ -1093,7 +1104,7 @@ public class Phaser {
this.interruptible = interruptible; this.interruptible = interruptible;
this.nanos = nanos; this.nanos = nanos;
this.timed = timed; this.timed = timed;
this.lastTime = timed ? System.nanoTime() : 0L; this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread(); thread = Thread.currentThread();
} }
...@@ -1112,9 +1123,7 @@ public class Phaser { ...@@ -1112,9 +1123,7 @@ public class Phaser {
} }
if (timed) { if (timed) {
if (nanos > 0L) { if (nanos > 0L) {
long now = System.nanoTime(); nanos = deadline - System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
} }
if (nanos <= 0L) { if (nanos <= 0L) {
thread = null; thread = null;
...@@ -1129,7 +1138,7 @@ public class Phaser { ...@@ -1129,7 +1138,7 @@ public class Phaser {
return true; return true;
else if (!timed) else if (!timed)
LockSupport.park(this); LockSupport.park(this);
else if (nanos > 0) else if (nanos > 0L)
LockSupport.parkNanos(this, nanos); LockSupport.parkNanos(this, nanos);
return isReleasable(); return isReleasable();
} }
......
...@@ -36,10 +36,10 @@ ...@@ -36,10 +36,10 @@
package java.util.concurrent; package java.util.concurrent;
/** /**
* A <tt>TimeUnit</tt> represents time durations at a given unit of * A {@code TimeUnit} represents time durations at a given unit of
* granularity and provides utility methods to convert across units, * granularity and provides utility methods to convert across units,
* and to perform timing and delay operations in these units. A * and to perform timing and delay operations in these units. A
* <tt>TimeUnit</tt> does not maintain time information, but only * {@code TimeUnit} does not maintain time information, but only
* helps organize and use time representations that may be maintained * helps organize and use time representations that may be maintained
* separately across various contexts. A nanosecond is defined as one * separately across various contexts. A nanosecond is defined as one
* thousandth of a microsecond, a microsecond as one thousandth of a * thousandth of a microsecond, a microsecond as one thousandth of a
...@@ -47,7 +47,7 @@ package java.util.concurrent; ...@@ -47,7 +47,7 @@ package java.util.concurrent;
* as sixty seconds, an hour as sixty minutes, and a day as twenty four * as sixty seconds, an hour as sixty minutes, and a day as twenty four
* hours. * hours.
* *
* <p>A <tt>TimeUnit</tt> is mainly used to inform time-based methods * <p>A {@code TimeUnit} is mainly used to inform time-based methods
* how a given timing parameter should be interpreted. For example, * how a given timing parameter should be interpreted. For example,
* the following code will timeout in 50 milliseconds if the {@link * the following code will timeout in 50 milliseconds if the {@link
* java.util.concurrent.locks.Lock lock} is not available: * java.util.concurrent.locks.Lock lock} is not available:
...@@ -63,7 +63,7 @@ package java.util.concurrent; ...@@ -63,7 +63,7 @@ package java.util.concurrent;
* *
* Note however, that there is no guarantee that a particular timeout * Note however, that there is no guarantee that a particular timeout
* implementation will be able to notice the passage of time at the * implementation will be able to notice the passage of time at the
* same granularity as the given <tt>TimeUnit</tt>. * same granularity as the given {@code TimeUnit}.
* *
* @since 1.5 * @since 1.5
* @author Doug Lea * @author Doug Lea
...@@ -174,83 +174,82 @@ public enum TimeUnit { ...@@ -174,83 +174,82 @@ public enum TimeUnit {
// etc. are not declared abstract but otherwise act as abstract methods. // etc. are not declared abstract but otherwise act as abstract methods.
/** /**
* Convert the given time duration in the given unit to this * Converts the given time duration in the given unit to this unit.
* unit. Conversions from finer to coarser granularities * Conversions from finer to coarser granularities truncate, so
* truncate, so lose precision. For example converting * lose precision. For example, converting {@code 999} milliseconds
* <tt>999</tt> milliseconds to seconds results in * to seconds results in {@code 0}. Conversions from coarser to
* <tt>0</tt>. Conversions from coarser to finer granularities * finer granularities with arguments that would numerically
* with arguments that would numerically overflow saturate to * overflow saturate to {@code Long.MIN_VALUE} if negative or
* <tt>Long.MIN_VALUE</tt> if negative or <tt>Long.MAX_VALUE</tt> * {@code Long.MAX_VALUE} if positive.
* if positive.
* *
* <p>For example, to convert 10 minutes to milliseconds, use: * <p>For example, to convert 10 minutes to milliseconds, use:
* <tt>TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)</tt> * {@code TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)}
* *
* @param sourceDuration the time duration in the given <tt>sourceUnit</tt> * @param sourceDuration the time duration in the given {@code sourceUnit}
* @param sourceUnit the unit of the <tt>sourceDuration</tt> argument * @param sourceUnit the unit of the {@code sourceDuration} argument
* @return the converted duration in this unit, * @return the converted duration in this unit,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
*/ */
public long convert(long sourceDuration, TimeUnit sourceUnit) { public long convert(long sourceDuration, TimeUnit sourceUnit) {
throw new AbstractMethodError(); throw new AbstractMethodError();
} }
/** /**
* Equivalent to <tt>NANOSECONDS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) NANOSECONDS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
*/ */
public long toNanos(long duration) { public long toNanos(long duration) {
throw new AbstractMethodError(); throw new AbstractMethodError();
} }
/** /**
* Equivalent to <tt>MICROSECONDS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) MICROSECONDS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
*/ */
public long toMicros(long duration) { public long toMicros(long duration) {
throw new AbstractMethodError(); throw new AbstractMethodError();
} }
/** /**
* Equivalent to <tt>MILLISECONDS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) MILLISECONDS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
*/ */
public long toMillis(long duration) { public long toMillis(long duration) {
throw new AbstractMethodError(); throw new AbstractMethodError();
} }
/** /**
* Equivalent to <tt>SECONDS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) SECONDS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
*/ */
public long toSeconds(long duration) { public long toSeconds(long duration) {
throw new AbstractMethodError(); throw new AbstractMethodError();
} }
/** /**
* Equivalent to <tt>MINUTES.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) MINUTES.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
* @since 1.6 * @since 1.6
*/ */
public long toMinutes(long duration) { public long toMinutes(long duration) {
...@@ -258,12 +257,12 @@ public enum TimeUnit { ...@@ -258,12 +257,12 @@ public enum TimeUnit {
} }
/** /**
* Equivalent to <tt>HOURS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) HOURS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration, * @return the converted duration,
* or <tt>Long.MIN_VALUE</tt> if conversion would negatively * or {@code Long.MIN_VALUE} if conversion would negatively
* overflow, or <tt>Long.MAX_VALUE</tt> if it would positively overflow. * overflow, or {@code Long.MAX_VALUE} if it would positively overflow.
* @see #convert
* @since 1.6 * @since 1.6
*/ */
public long toHours(long duration) { public long toHours(long duration) {
...@@ -271,10 +270,10 @@ public enum TimeUnit { ...@@ -271,10 +270,10 @@ public enum TimeUnit {
} }
/** /**
* Equivalent to <tt>DAYS.convert(duration, this)</tt>. * Equivalent to
* {@link #convert(long, TimeUnit) DAYS.convert(duration, this)}.
* @param duration the duration * @param duration the duration
* @return the converted duration * @return the converted duration
* @see #convert
* @since 1.6 * @since 1.6
*/ */
public long toDays(long duration) { public long toDays(long duration) {
...@@ -294,9 +293,9 @@ public enum TimeUnit { ...@@ -294,9 +293,9 @@ public enum TimeUnit {
* Performs a timed {@link Object#wait(long, int) Object.wait} * Performs a timed {@link Object#wait(long, int) Object.wait}
* using this time unit. * using this time unit.
* This is a convenience method that converts timeout arguments * This is a convenience method that converts timeout arguments
* into the form required by the <tt>Object.wait</tt> method. * into the form required by the {@code Object.wait} method.
* *
* <p>For example, you could implement a blocking <tt>poll</tt> * <p>For example, you could implement a blocking {@code poll}
* method (see {@link BlockingQueue#poll BlockingQueue.poll}) * method (see {@link BlockingQueue#poll BlockingQueue.poll})
* using: * using:
* *
...@@ -327,7 +326,7 @@ public enum TimeUnit { ...@@ -327,7 +326,7 @@ public enum TimeUnit {
* Performs a timed {@link Thread#join(long, int) Thread.join} * Performs a timed {@link Thread#join(long, int) Thread.join}
* using this time unit. * using this time unit.
* This is a convenience method that converts time arguments into the * This is a convenience method that converts time arguments into the
* form required by the <tt>Thread.join</tt> method. * form required by the {@code Thread.join} method.
* *
* @param thread the thread to wait for * @param thread the thread to wait for
* @param timeout the maximum time to wait. If less than * @param timeout the maximum time to wait. If less than
...@@ -347,7 +346,7 @@ public enum TimeUnit { ...@@ -347,7 +346,7 @@ public enum TimeUnit {
* Performs a {@link Thread#sleep(long, int) Thread.sleep} using * Performs a {@link Thread#sleep(long, int) Thread.sleep} using
* this time unit. * this time unit.
* This is a convenience method that converts time arguments into the * This is a convenience method that converts time arguments into the
* form required by the <tt>Thread.sleep</tt> method. * form required by the {@code Thread.sleep} method.
* *
* @param timeout the minimum time to sleep. If less than * @param timeout the minimum time to sleep. If less than
* or equal to zero, do not sleep at all. * or equal to zero, do not sleep at all.
......
...@@ -40,7 +40,7 @@ package java.util.concurrent; ...@@ -40,7 +40,7 @@ package java.util.concurrent;
* operations for which a timeout is specified need a means to * operations for which a timeout is specified need a means to
* indicate that the timeout has occurred. For many such operations it * indicate that the timeout has occurred. For many such operations it
* is possible to return a value that indicates timeout; when that is * is possible to return a value that indicates timeout; when that is
* not possible or desirable then <tt>TimeoutException</tt> should be * not possible or desirable then {@code TimeoutException} should be
* declared and thrown. * declared and thrown.
* *
* @since 1.5 * @since 1.5
...@@ -50,13 +50,13 @@ public class TimeoutException extends Exception { ...@@ -50,13 +50,13 @@ public class TimeoutException extends Exception {
private static final long serialVersionUID = 1900926677490660714L; private static final long serialVersionUID = 1900926677490660714L;
/** /**
* Constructs a <tt>TimeoutException</tt> with no specified detail * Constructs a {@code TimeoutException} with no specified detail
* message. * message.
*/ */
public TimeoutException() {} public TimeoutException() {}
/** /**
* Constructs a <tt>TimeoutException</tt> with the specified detail * Constructs a {@code TimeoutException} with the specified detail
* message. * message.
* *
* @param message the detail message * @param message the detail message
......
...@@ -48,7 +48,7 @@ ...@@ -48,7 +48,7 @@
* *
* {@link java.util.concurrent.Executor} is a simple standardized * {@link java.util.concurrent.Executor} is a simple standardized
* interface for defining custom thread-like subsystems, including * interface for defining custom thread-like subsystems, including
* thread pools, asynchronous IO, and lightweight task frameworks. * thread pools, asynchronous I/O, and lightweight task frameworks.
* Depending on which concrete Executor class is being used, tasks may * Depending on which concrete Executor class is being used, tasks may
* execute in a newly created thread, an existing task-execution thread, * execute in a newly created thread, an existing task-execution thread,
* or the thread calling {@link java.util.concurrent.Executor#execute * or the thread calling {@link java.util.concurrent.Executor#execute
...@@ -102,8 +102,10 @@ ...@@ -102,8 +102,10 @@
* <h2>Queues</h2> * <h2>Queues</h2>
* *
* The {@link java.util.concurrent.ConcurrentLinkedQueue} class * The {@link java.util.concurrent.ConcurrentLinkedQueue} class
* supplies an efficient scalable thread-safe non-blocking FIFO * supplies an efficient scalable thread-safe non-blocking FIFO queue.
* queue. * The {@link java.util.concurrent.ConcurrentLinkedDeque} class is
* similar, but additionally supports the {@link java.util.Deque}
* interface.
* *
* <p>Five implementations in {@code java.util.concurrent} support * <p>Five implementations in {@code java.util.concurrent} support
* the extended {@link java.util.concurrent.BlockingQueue} * the extended {@link java.util.concurrent.BlockingQueue}
...@@ -117,7 +119,7 @@ ...@@ -117,7 +119,7 @@
* for producer-consumer, messaging, parallel tasking, and * for producer-consumer, messaging, parallel tasking, and
* related concurrent designs. * related concurrent designs.
* *
* <p> Extended interface {@link java.util.concurrent.TransferQueue}, * <p>Extended interface {@link java.util.concurrent.TransferQueue},
* and implementation {@link java.util.concurrent.LinkedTransferQueue} * and implementation {@link java.util.concurrent.LinkedTransferQueue}
* introduce a synchronous {@code transfer} method (along with related * introduce a synchronous {@code transfer} method (along with related
* features) in which a producer may optionally block awaiting its * features) in which a producer may optionally block awaiting its
...@@ -216,9 +218,9 @@ ...@@ -216,9 +218,9 @@
* it may (or may not) reflect any updates since the iterator was * it may (or may not) reflect any updates since the iterator was
* created. * created.
* *
* <h2><a name="MemoryVisibility">Memory Consistency Properties</a></h2> * <h2 id="MemoryVisibility">Memory Consistency Properties</h2>
* *
* <a href="http://docs.oracle.com/javase/specs/jls/se7/html/index.html"> * <a href="http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5">
* Chapter 17 of the Java Language Specification</a> defines the * Chapter 17 of the Java Language Specification</a> defines the
* <i>happens-before</i> relation on memory operations such as reads and * <i>happens-before</i> relation on memory operations such as reads and
* writes of shared variables. The results of a write by one thread are * writes of shared variables. The results of a write by one thread are
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册