提交 a693e4dc 编写于 作者: C chegar

8008378: FJP.commonPool support parallelism 0, add awaitQuiescence

Reviewed-by: chegar
Contributed-by: NDoug Lea &lt;dl@cs.oswego.edu&gt;, Chris Hegarty <chris.hegarty@oracle.com>
上级 f69cd76a
......@@ -35,6 +35,7 @@
package java.util.concurrent;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -104,38 +105,45 @@ import java.util.concurrent.TimeUnit;
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
* <td> <b>Arrange async execution</td>
* <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
* <td> <b>Await and obtain result</td>
* <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
* <td> <b>Arrange exec and obtain Future</td>
* <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* <p>The common pool is by default constructed with default
* parameters, but these may be controlled by setting three {@link
* System#getProperty system properties} with prefix {@code
* java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
* an integer greater than zero, {@code threadFactory} -- the class
* name of a {@link ForkJoinWorkerThreadFactory}, and {@code
* exceptionHandler} -- the class name of a {@link
* java.lang.Thread.UncaughtExceptionHandler
* Thread.UncaughtExceptionHandler}. Upon any error in establishing
* these settings, default parameters are used.
* parameters, but these may be controlled by setting three
* {@linkplain System#getProperty system properties}:
* <ul>
* <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
* - the parallelism level, a non-negative integer
* <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
* - the class name of a {@link ForkJoinWorkerThreadFactory}
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler}
* </ul>
* The system class loader is used to load these classes.
* Upon any error in establishing these settings, default parameters
* are used. It is possible to disable or limit the use of threads in
* the common pool by setting the parallelism property to zero, and/or
* using a factory that may return {@code null}.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
......@@ -225,18 +233,18 @@ public class ForkJoinPool extends AbstractExecutorService {
* for work-stealing (this would contaminate lifo/fifo
* processing). Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocal Submitter class contains a value initially used as
* a hash code for choosing existing queues, but may be randomly
* repositioned upon contention with other submitters. In
* essence, submitters act like workers except that they are
* restricted to executing local tasks that they submitted (or in
* the case of CountedCompleters, others with the same root task).
* However, because most shared/external queue operations are more
* expensive than internal, and because, at steady state, external
* submitters will compete for CPU with workers, ForkJoinTask.join
* and related methods disable them from repeatedly helping to
* process tasks if all workers are active. Insertion of tasks in
* shared mode requires a lock (mainly to protect in the case of
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters. In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or in the case of CountedCompleters,
* others with the same root task). However, because most
* shared/external queue operations are more expensive than
* internal, and because, at steady state, external submitters
* will compete for CPU with workers, ForkJoinTask.join and
* related methods disable them from repeatedly helping to process
* tasks if all workers are active. Insertion of tasks in shared
* mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
......@@ -469,7 +477,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Common Pool
* ===========
*
* The static commonPool always exists after static
* The static common Pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
......@@ -548,6 +556,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
* @return the new worker thread
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
......@@ -563,26 +572,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
/**
* Per-thread records for threads that submit to pools. Currently
* holds only pseudo-random seed / index that is used to choose
* submission queues in method externalPush. In the future, this may
* also incorporate a means to implement different task rejection
* and resubmission policies.
*
* Seeds for submitters and workers/workQueues work in basically
* the same way but are initialized and updated using slightly
* different mechanics. Both are initialized using the same
* approach as in class ThreadLocal, where successive values are
* unlikely to collide with previous values. Seeds are then
* randomly modified upon collisions using xorshifts, which
* requires a non-zero seed.
*/
static final class Submitter {
int seed;
Submitter(int s) { seed = s; }
}
/**
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
......@@ -737,7 +726,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throw RejectedExecutionException if array cannot be resized
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
......@@ -936,7 +925,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* or any other cancelled task. Returns (true) on any CAS
* or consistency check failure so caller can retry.
*
* @return false if no progress can be made, else true;
* @return false if no progress can be made, else true
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
boolean stat = true, removed = false, empty = true;
......@@ -981,7 +970,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Polls for and executes the given task or any other task in
* its CountedCompleter computation
* its CountedCompleter computation.
*/
final boolean pollAndExecCC(ForkJoinTask<?> root) {
ForkJoinTask<?>[] a; int b; Object o;
......@@ -1055,7 +1044,6 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final int ABASE;
private static final int ASHIFT;
static {
int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
......@@ -1063,13 +1051,13 @@ public class ForkJoinPool extends AbstractExecutorService {
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
s = U.arrayIndexScale(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
}
......@@ -1082,15 +1070,6 @@ public class ForkJoinPool extends AbstractExecutorService {
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
/**
* Per-thread submission bookkeeping. Shared across all pools
* to reduce ThreadLocal pollution and because random motion
* to avoid contention in one pool is likely to hold for others.
* Lazily initialized on first submission (but null-checked
* in other contexts to avoid unnecessary initialization).
*/
static final ThreadLocal<Submitter> submitters;
/**
* Permission required for callers of methods that may start or
* kill threads.
......@@ -1103,12 +1082,15 @@ public class ForkJoinPool extends AbstractExecutorService {
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
static final ForkJoinPool commonPool;
static final ForkJoinPool common;
/**
* Common pool parallelism. Must equal commonPool.parallelism.
* Common pool parallelism. To allow simpler use and management
* when common pool threads are disabled, we allow the underlying
* common.config field to be zero, but in that case still report
* parallelism as 1 to reflect resulting caller-runs mechanics.
*/
static final int commonPoolParallelism;
static final int commonParallelism;
/**
* Sequence number for creating workerNamePrefix.
......@@ -1116,8 +1098,8 @@ public class ForkJoinPool extends AbstractExecutorService {
private static int poolNumberSequence;
/**
* Return the next sequence number. We don't expect this to
* ever contend so use simple builtin sync.
* Returns the next sequence number. We don't expect this to
* ever contend, so use simple builtin sync.
*/
private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
......@@ -1161,7 +1143,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private static final int SEED_INCREMENT = 0x61c88647;
/**
/*
* Bits and masks for control variables
*
* Field ctl is a long packed with:
......@@ -1268,39 +1250,28 @@ public class ForkJoinPool extends AbstractExecutorService {
final int config; // mode and parallelism level
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
volatile Object pad18, pad19, pad1a, pad1b;
/*
/**
* Acquires the plock lock to protect worker array and related
* updates. This method is called only if an initial CAS on plock
* fails. This acts as a spinLock for normal cases, but falls back
* fails. This acts as a spinlock for normal cases, but falls back
* to builtin monitor to block when (rarely) needed. This would be
* a terrible idea for a highly contended lock, but works fine as
* a more conservative alternative to a pure spinlock.
*/
private int acquirePlock() {
int spins = PL_SPINS, r = 0, ps, nps;
int spins = PL_SPINS, ps, nps;
for (;;) {
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
return nps;
else if (r == 0) { // randomize spins if possible
Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
if ((t instanceof ForkJoinWorkerThread) &&
(w = ((ForkJoinWorkerThread)t).workQueue) != null)
r = w.seed;
else if ((z = submitters.get()) != null)
r = z.seed;
else
r = 1;
}
else if (spins >= 0) {
r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
if (r >= 0)
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
......@@ -1331,39 +1302,6 @@ public class ForkJoinPool extends AbstractExecutorService {
synchronized (this) { notifyAll(); }
}
/**
* Performs secondary initialization, called when plock is zero.
* Creates workQueue array and sets plock to a valid value. The
* lock body must be exception-free (so no try/finally) so we
* optimistically allocate new array outside the lock and throw
* away if (very rarely) not needed. (A similar tactic is used in
* fullExternalPush.) Because the plock seq value can eventually
* wrap around zero, this method harmlessly fails to reinitialize
* if workQueues exists, while still advancing plock.
*
* Additionally tries to create the first worker.
*/
private void initWorkers() {
WorkQueue[] ws, nws; int ps;
int p = config & SMASK; // find power of two table size
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
n = (n + 1) << 1;
if ((ws = workQueues) == null || ws.length == 0)
nws = new WorkQueue[n];
else
nws = null;
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
if (((ws = workQueues) == null || ws.length == 0) && nws != null)
workQueues = nws;
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
tryAddWorker();
}
/**
* Tries to create and start one worker if fewer than target
* parallelism level exist. Adjusts counts etc on failure.
......@@ -1406,7 +1344,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
wt.setDaemon(true);
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
......@@ -1450,7 +1388,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
* @param wt the worker thread or null if construction failed
* @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
......@@ -1489,7 +1427,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (e > 0) { // activate or create replacement
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length ||
(v = ws[i]) != null)
(v = ws[i]) == null)
break;
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
......@@ -1526,10 +1464,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
if ((z = submitters.get()) != null && plock > 0 &&
WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a;
if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 &&
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & z.seed & SQMASK]) != null &&
(q = ws[m & z & SQMASK]) != null &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
int b = q.base, s = q.top, n, an;
if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
......@@ -1549,34 +1487,48 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
* pool, so must perform secondary initialization (via
* initWorkers). It also detects first submission by an external
* thread by looking up its ThreadLocal, and creates a new shared
* queue if the one at index if empty or contended. The plock lock
* body must be exception-free (so no try/finally) so we
* optimistically allocate new queues outside the lock and throw
* them away if (very rarely) not needed.
* pool, so must perform secondary initialization. It also
* detects first submission by an external thread by looking up
* its ThreadLocal, and creates a new shared queue if the one at
* index if empty or contended. The plock lock body must be
* exception-free (so no try/finally) so we optimistically
* allocate new queues outside the lock and throw them away if
* (very rarely) not needed.
*
* Secondary initialization occurs when plock is zero, to create
* workQueue array and set plock to a valid value. This lock body
* must also be exception-free. Because the plock seq value can
* eventually wrap around zero, this method harmlessly fails to
* reinitialize if workQueues exists, while still advancing plock.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
int r = 0; // random index seed
for (Submitter z = submitters.get();;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k;
if (z == null) {
if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
r += SEED_INCREMENT) && r != 0)
submitters.set(z = new Submitter(r));
}
else if (r == 0) { // move to a different index
r = z.seed;
r ^= r << 13; // same xorshift as WorkQueues
r ^= r >>> 17;
z.seed = r ^ (r << 5);
int r;
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
else if ((ps = plock) < 0)
for (;;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k;
boolean move = false;
if ((ps = plock) < 0)
throw new RejectedExecutionException();
else if (ps == 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0)
initWorkers();
(m = ws.length - 1) < 0) { // initialize workQueues
int p = config & SMASK; // find power of two table size
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
new WorkQueue[n] : null);
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
if (((ws = workQueues) == null || ws.length == 0) && nws != null)
workQueues = nws;
int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
......@@ -1598,7 +1550,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return;
}
}
r = 0; // move on failure
move = true; // move on failure
}
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
q = new WorkQueue(this, null, SHARED_QUEUE, r);
......@@ -1612,7 +1564,9 @@ public class ForkJoinPool extends AbstractExecutorService {
releasePlock(nps);
}
else
r = 0; // try elsewhere while lock held
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
......@@ -1703,7 +1657,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* park awaiting signal, else lingering to help scan and signal.
*
* * If a non-empty queue discovered or left as a hint,
* help wake up other workers before return
* help wake up other workers before return.
*
* @param w the worker (via its WorkQueue)
* @return a task or null if none found
......@@ -1758,14 +1712,13 @@ public class ForkJoinPool extends AbstractExecutorService {
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
idleAwaitWork(w, nc, c);
}
else if (w.eventCount < 0 && !tryTerminate(false, false) &&
ctl == c) { // block
else if (w.eventCount < 0 && ctl == c) {
Thread wt = Thread.currentThread();
Thread.interrupted(); // clear status
U.putObject(wt, PARKBLOCKER, this);
w.parker = wt; // emulate LockSupport.park
if (w.eventCount < 0) // recheck
U.park(false, 0L);
U.park(false, 0L); // block
w.parker = null;
U.putObject(wt, PARKBLOCKER, null);
}
......@@ -1774,7 +1727,7 @@ public class ForkJoinPool extends AbstractExecutorService {
(ws = workQueues) != null && h < ws.length &&
(q = ws[h]) != null) { // signal others before retry
WorkQueue v; Thread p; int u, i, s;
for (int n = (config & SMASK) >>> 1;;) {
for (int n = (config & SMASK) - 1;;) {
int idleCount = (w.eventCount < 0) ? 0 : -1;
if (((s = idleCount - q.base + q.top) <= n &&
(n = s) <= 0) ||
......@@ -1814,7 +1767,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
if (w != null && w.eventCount < 0 &&
!tryTerminate(false, false) && (int)prevCtl != 0) {
!tryTerminate(false, false) && (int)prevCtl != 0 &&
ctl == currentCtl) {
int dc = -(short)(currentCtl >>> TC_SHIFT);
long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
......@@ -1832,6 +1786,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
w.hint = -1;
w.qlock = -1; // shrink
break;
}
......@@ -1973,7 +1928,6 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param task the task to join
* @param mode if shared, exit upon completing any task
* if all workers are active
*
*/
private int helpComplete(ForkJoinTask<?> task, int mode) {
WorkQueue[] ws; WorkQueue q; int m, n, s, u;
......@@ -2125,29 +2079,22 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Returns a (probably) non-empty steal queue, if one is found
* during a random, then cyclic scan, else null. This method must
* be retried by caller if, by the time it tries to use the queue,
* it is empty.
* during a scan, else null. This method must be retried by
* caller if, by the time it tries to use the queue, it is empty.
* @param r a (random) seed for scanning
*/
private WorkQueue findNonEmptyStealQueue(int r) {
for (WorkQueue[] ws;;) {
int ps = plock, m, n;
if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
return null;
for (int j = (m + 1) << 2; ;) {
WorkQueue q = ws[(((r + j) << 1) | 1) & m];
if (q != null && (n = q.base - q.top) < 0) {
if (n < -1)
signalWork(q);
for (;;) {
int ps = plock, m; WorkQueue[] ws; WorkQueue q;
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
for (int j = (m + 1) << 2; j >= 0; --j) {
if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
q.base - q.top < 0)
return q;
}
else if (--j < 0) {
}
if (plock == ps)
return null;
break;
}
}
}
}
......@@ -2159,37 +2106,34 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void helpQuiescePool(WorkQueue w) {
for (boolean active = true;;) {
ForkJoinTask<?> localTask; // exhaust local queue
while ((localTask = w.nextLocalTask()) != null)
localTask.doExec();
// Similar to loop in scan(), but ignoring submissions
WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
if (q != null) {
ForkJoinTask<?> t; int b;
long c; WorkQueue q; ForkJoinTask<?> t; int b;
while ((t = w.nextLocalTask()) != null) {
if (w.base - w.top < 0)
signalWork(w);
t.doExec();
}
if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
if (!active) { // re-establish active count
long c;
active = true;
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c + AC_UNIT));
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
if (q.base - q.top < 0)
signalWork(q);
w.runSubtask(t);
}
else {
long c;
if (active) { // decrement active count without queuing
active = false;
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c -= AC_UNIT));
}
else
c = ctl; // re-increment on exit
if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c + AC_UNIT));
break;
}
else if (active) { // decrement active count without queuing
long nc = (c = ctl) - AC_UNIT;
if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
return; // bypass decrement-then-increment
if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
}
else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
return;
}
}
......@@ -2205,10 +2149,13 @@ public class ForkJoinPool extends AbstractExecutorService {
return t;
if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
return null;
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
if (q.base - q.top < 0)
signalWork(q);
return t;
}
}
}
/**
* Returns a cheap heuristic guide for task partitioning when
......@@ -2235,7 +2182,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* producing extra tasks amortizes the uncertainty of progress and
* diffusion assumptions.
*
* So, users will want to use values larger, but not much larger
* So, users will want to use values larger (but not much larger)
* than 1 to both smooth over transient shortages and hedge
* against uneven progress; as traded off against the cost of
* extra task overhead. We leave the user to pick a threshold
......@@ -2288,8 +2235,19 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
if (this == commonPool) // cannot shut down
int ps;
if (this == common) // cannot shut down
return false;
if ((ps = plock) >= 0) { // enable by setting plock
if (!enable)
return false;
if ((ps & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
for (long c;;) {
if (((c = ctl) & STOP_BIT) != 0) { // already terminating
if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
......@@ -2299,34 +2257,27 @@ public class ForkJoinPool extends AbstractExecutorService {
}
return true;
}
if (plock >= 0) { // not yet enabled
int ps;
if (!enable)
return false;
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
releasePlock(SHUTDOWN);
}
if (!now) { // check if idle & no tasks
if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
hasQueuedSubmissions())
WorkQueue[] ws; WorkQueue w;
if ((int)(c >> AC_SHIFT) != -(config & SMASK))
return false;
// Check for unqueued inactive workers. One pass suffices.
WorkQueue[] ws = workQueues; WorkQueue w;
if (ws != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null && w.eventCount >= 0)
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
if (!w.isEmpty()) { // signal unprocessed tasks
signalWork(w);
return false;
}
if ((i & 1) != 0 && w.eventCount >= 0)
return false; // unqueued inactive worker
}
}
}
}
if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
for (int pass = 0; pass < 3; ++pass) {
WorkQueue[] ws = workQueues;
if (ws != null) {
WorkQueue w; Thread wt;
WorkQueue[] ws; WorkQueue w; Thread wt;
if ((ws = workQueues) != null) {
int n = ws.length;
for (int i = 0; i < n; ++i) {
if ((w = ws[i]) != null) {
......@@ -2337,7 +2288,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (!wt.isInterrupted()) {
try {
wt.interrupt();
} catch (SecurityException ignore) {
} catch (Throwable ignore) {
}
}
U.unpark(wt);
......@@ -2348,7 +2299,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// Wake up workers parked on event queue
int i, e; long cc; Thread p;
while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
(i = e & SMASK) < n &&
(i = e & SMASK) < n && i >= 0 &&
(w = ws[i]) != null) {
long nc = ((long)(w.nextWait & E_MASK) |
((cc + AC_UNIT) & AC_MASK) |
......@@ -2374,26 +2325,26 @@ public class ForkJoinPool extends AbstractExecutorService {
* least one task.
*/
static WorkQueue commonSubmitterQueue() {
ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
return ((z = submitters.get()) != null &&
(p = commonPool) != null &&
ForkJoinPool p; WorkQueue[] ws; int m, z;
return ((z = ThreadLocalRandom.getProbe()) != 0 &&
(p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
ws[m & z.seed & SQMASK] : null;
ws[m & z & SQMASK] : null;
}
/**
* Tries to pop the given task from submitter's queue in common pool.
*/
static boolean tryExternalUnpush(ForkJoinTask<?> t) {
ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
ForkJoinTask<?>[] a; int m, s;
ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
ForkJoinTask<?>[] a; int m, s, z;
if (t != null &&
(z = submitters.get()) != null &&
(p = commonPool) != null &&
(z = ThreadLocalRandom.getProbe()) != 0 &&
(p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
(q = ws[m & z.seed & SQMASK]) != null &&
(q = ws[m & z & SQMASK]) != null &&
(s = q.top) != q.base &&
(a = q.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
......@@ -2445,7 +2396,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if (task != null)
task.doExec();
if (root.status < 0 ||
(u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
(config != 0 &&
((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)))
break;
if (task == null) {
helpSignal(root, q.poolIndex);
......@@ -2463,14 +2415,14 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static void externalHelpJoin(ForkJoinTask<?> t) {
// Some hard-to-avoid overlap with tryExternalUnpush
ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
ForkJoinTask<?>[] a; int m, s, n;
ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w;
ForkJoinTask<?>[] a; int m, s, n, z;
if (t != null &&
(z = submitters.get()) != null &&
(p = commonPool) != null &&
(z = ThreadLocalRandom.getProbe()) != 0 &&
(p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0 &&
(q = ws[m & z.seed & SQMASK]) != null &&
(q = ws[m & z & SQMASK]) != null &&
(a = q.array) != null) {
int am = a.length - 1;
if ((s = q.top) != q.base) {
......@@ -2496,18 +2448,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
/**
* Restricted version of helpQuiescePool for external callers
*/
static void externalHelpQuiescePool() {
ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
if ((p = commonPool) != null &&
(q = p.findNonEmptyStealQueue(1)) != null &&
(b = q.base) - q.top < 0 &&
(t = q.pollAt(b)) != null)
t.doExec();
}
// Exported methods
// Constructors
......@@ -2524,7 +2464,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
......@@ -2572,50 +2512,63 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
if (factory == null)
throw new NullPointerException();
}
private static int checkParallelism(int parallelism) {
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
this.factory = factory;
this.ueh = handler;
this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
int pn = nextPoolId();
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(Integer.toString(pn));
sb.append("-worker-");
this.workerNamePrefix = sb.toString();
return parallelism;
}
private static ForkJoinWorkerThreadFactory checkFactory
(ForkJoinWorkerThreadFactory factory) {
if (factory == null)
throw new NullPointerException();
return factory;
}
/**
* Constructor for common pool, suitable only for static initialization.
* Basically the same as above, but uses smallest possible initial footprint.
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
ForkJoinPool(int parallelism, long ctl,
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler) {
this.config = parallelism;
this.ctl = ctl;
UncaughtExceptionHandler handler,
boolean asyncMode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
/**
* Returns the common pool instance. This pool is statically
* constructed; its run state is unaffected by attempts to
* {@link #shutdown} or {@link #shutdownNow}.
* constructed; its run state is unaffected by attempts to {@link
* #shutdown} or {@link #shutdownNow}. However this pool and any
* ongoing processing are automatically terminated upon program
* {@link System#exit}. Any program that relies on asynchronous
* task processing to complete before program termination should
* invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
* before exit.
*
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
// assert commonPool != null : "static init error";
return commonPool;
// assert common != null : "static init error";
return common;
}
// Execution methods
......@@ -2671,7 +2624,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
......@@ -2738,27 +2691,23 @@ public class ForkJoinPool extends AbstractExecutorService {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
// Workaround needed because method wasn't declared with
// wildcards in return type but should have been.
@SuppressWarnings({"unchecked", "rawtypes"})
List<Future<T>> futures = (List<Future<T>>) (List) fs;
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
futures.add(f);
externalPush(f);
fs.add(f);
}
for (ForkJoinTask<T> f : fs)
f.quietlyJoin();
for (int i = 0, size = futures.size(); i < size; i++)
((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
done = true;
return futures;
} finally {
if (!done)
for (ForkJoinTask<T> f : fs)
f.cancel(false);
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(false);
}
}
......@@ -2777,7 +2726,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @return the handler, or {@code null} if none
*/
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}
......@@ -2787,7 +2736,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
return config & SMASK;
int par = (config & SMASK);
return (par > 0) ? par : 1;
}
/**
......@@ -2797,7 +2747,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @since 1.8
*/
public static int getCommonPoolParallelism() {
return commonPoolParallelism;
return commonParallelism;
}
/**
......@@ -3055,7 +3005,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Possibly initiates an orderly shutdown in which previously
* submitted tasks are executed, but no new tasks will be
* accepted. Invocation has no effect on execution state if this
* is the {@link #commonPool}, and no additional effect if
* is the {@link #commonPool()}, and no additional effect if
* already shut down. Tasks that are in the process of being
* submitted concurrently during the course of this method may or
* may not be rejected.
......@@ -3073,7 +3023,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Possibly attempts to cancel and/or stop all tasks, and reject
* all subsequently submitted tasks. Invocation has no effect on
* execution state if this is the {@link #commonPool}, and no
* execution state if this is the {@link #commonPool()}, and no
* additional effect if already shut down. Otherwise, tasks that
* are in the process of being submitted or executed concurrently
* during the course of this method may or may not be
......@@ -3136,9 +3086,10 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Blocks until all tasks have completed execution after a
* shutdown request, or the timeout occurs, or the current thread
* is interrupted, whichever happens first. Note that the {@link
* #commonPool()} never terminates until program shutdown so
* this method will always time out.
* is interrupted, whichever happens first. Because the {@link
* #commonPool()} never terminates until program shutdown, when
* applied to the common pool, this method is equivalent to {@link
* #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
......@@ -3148,6 +3099,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (this == common) {
awaitQuiescence(timeout, unit);
return false;
}
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
......@@ -3166,6 +3123,62 @@ public class ForkJoinPool extends AbstractExecutorService {
return terminated;
}
/**
* If called by a ForkJoinTask operating in this pool, equivalent
* in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
* waits and/or attempts to assist performing tasks until this
* pool {@link #isQuiescent} or the indicated timeout elapses.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if quiescent; {@code false} if the
* timeout elapsed.
*/
public boolean awaitQuiescence(long timeout, TimeUnit unit) {
long nanos = unit.toNanos(timeout);
ForkJoinWorkerThread wt;
Thread thread = Thread.currentThread();
if ((thread instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)thread).pool == this) {
helpQuiescePool(wt.workQueue);
return true;
}
long startTime = System.nanoTime();
WorkQueue[] ws;
int r = 0, m;
boolean found = true;
while (!isQuiescent() && (ws = workQueues) != null &&
(m = ws.length - 1) >= 0) {
if (!found) {
if ((System.nanoTime() - startTime) > nanos)
return false;
Thread.yield(); // cannot block
}
found = false;
for (int j = (m + 1) << 2; j >= 0; --j) {
ForkJoinTask<?> t; WorkQueue q; int b;
if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
found = true;
if ((t = q.pollAt(b)) != null) {
if (q.base - q.top < 0)
signalWork(q);
t.doExec();
}
break;
}
}
}
return true;
}
/**
* Waits and/or attempts to assist performing tasks indefinitely
* until the {@link #commonPool()} {@link #isQuiescent}.
*/
static void quiesceCommonPool() {
common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
......@@ -3175,9 +3188,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* not necessary. Method {@code block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link ForkJoinPool#managedBlock}. The
* unusual methods in this API accommodate synchronizers that may,
* but don't usually, block for long periods. Similarly, they
* thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
* The unusual methods in this API accommodate synchronizers that
* may, but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
......@@ -3235,6 +3248,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Returns {@code true} if blocking is unnecessary.
* @return {@code true} if blocking is unnecessary
*/
boolean isReleasable();
}
......@@ -3319,7 +3333,7 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final long QLOCK;
static {
int s; // initialize field offsets for CAS etc
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
......@@ -3339,54 +3353,58 @@ public class ForkJoinPool extends AbstractExecutorService {
(wk.getDeclaredField("qlock"));
Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
s = U.arrayIndexScale(ak);
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
submitters = new ThreadLocal<Submitter>();
ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
/*
* Establish common pool parameters. For extra caution,
* computations to set up common pool state are here; the
* constructor just assigns these values to fields.
*/
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
int par = 0;
Thread.UncaughtExceptionHandler handler = null;
try { // TBD: limit or report ignored exceptions?
/**
* Creates and returns the common pool, respecting user settings
* specified via system properties.
*/
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory
= defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accesing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
if (pp != null)
par = Integer.parseInt(pp);
} catch (Exception ignore) {
}
if (par <= 0)
par = Runtime.getRuntime().availableProcessors();
if (par > MAX_CAP)
par = MAX_CAP;
commonPoolParallelism = par;
long np = (long)(-par); // precompute initial ctl value
long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
commonPool = new ForkJoinPool(par, ct, fac, handler);
if (parallelism < 0)
parallelism = Runtime.getRuntime().availableProcessors();
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, false,
"ForkJoinPool.commonPool-worker-");
}
}
......@@ -464,7 +464,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Records exception and possibly propagates
* Records exception and possibly propagates.
*
* @return status on exit
*/
......@@ -497,7 +497,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Removes exception node and clears status
* Removes exception node and clears status.
*/
private void clearExceptionalCompletion() {
int h = System.identityHashCode(this);
......@@ -635,7 +635,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
throw (Error)ex;
if (ex instanceof RuntimeException)
throw (RuntimeException)ex;
throw uncheckedThrowable(ex, RuntimeException.class);
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
}
......@@ -645,8 +645,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* unchecked exceptions
*/
@SuppressWarnings("unchecked") static <T extends Throwable>
T uncheckedThrowable(final Throwable t, final Class<T> c) {
return (T)t; // rely on vacuous cast
void uncheckedThrow(Throwable t) throws T {
if (t != null)
throw (T)t; // rely on vacuous cast
}
/**
......@@ -681,7 +682,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.commonPool.externalPush(this);
ForkJoinPool.common.externalPush(this);
return this;
}
......@@ -857,7 +858,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
* invoke {@link #completeExceptionally(Throwable)}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
......@@ -1007,8 +1008,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (Thread.interrupted())
throw new InterruptedException();
// Messy in part because we measure in nanosecs, but wait in millisecs
int s; long ns, ms;
if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
int s; long ms;
long ns = unit.toNanos(timeout);
if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
ForkJoinPool.WorkQueue w = null;
......@@ -1104,7 +1106,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
wt.pool.helpQuiescePool(wt.workQueue);
}
else
ForkJoinPool.externalHelpQuiescePool();
ForkJoinPool.quiesceCommonPool();
}
/**
......@@ -1390,6 +1392,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* Adaptor for Runnables in which failure forces worker exception
*/
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
void internalPropagateException(Throwable ex) {
rethrow(ex); // rethrow outside exec() catches.
}
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* Adaptor for Callables
*/
......
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
/*
* @test
* @bug 8008378
* @summary Basic checks for parallelism 0, and null returning factory
* @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 ThreadLessCommon
* @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.threadFactory=ThreadLessCommon$NullForkJoinWorkerThreadFactory ThreadLessCommon
* @author Chris Hegarty
*/
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
public class ThreadLessCommon {
static final int THRESHOLD = 1000;
static final boolean debug = true;
private static void realMain(String[] args) throws Throwable {
if (debug) {
String pp = System.getProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism");
System.out.println(
"java.util.concurrent.ForkJoinPool.common.parallelism:" + pp);
String tf = System.getProperty(
"java.util.concurrent.ForkJoinPool.common.threadFactory");
System.out.println(
"java.util.concurrent.ForkJoinPool.common.threadFactory:" + tf);
}
long from = 0, to = 50000;
RecursiveTask<Long> task = new SumTask(from, to, Thread.currentThread());
long sum = task.invoke();
System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum);
task.fork();
sum = task.join();
System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum);
sum = ForkJoinPool.commonPool().invoke(task.fork());
System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum);
}
static class SumTask extends RecursiveTask<Long> {
final Thread expectedThread;
final long from;
final long to;
SumTask(long from, long to, Thread thread) {
this.from = from; this.to = to; expectedThread = thread;
}
@Override
public Long compute() {
check(Thread.currentThread() == expectedThread,
"Expected " + expectedThread + ", got " + Thread.currentThread());
long range = to - from;
if (range < THRESHOLD) {
long acc = 0;
for (long i = from; i <= to; i++)
acc = acc + i;
return acc;
} else {
long half = from + range / 2;
SumTask t1 = new SumTask(from, half ,expectedThread);
SumTask t2 = new SumTask(half+1, to ,expectedThread);
if (half % 2 == 0) {
t1.fork();
return t2.compute() + t1.join();
} else {
invokeAll(t1, t2);
try { return t1.get() + t2.get(); }
catch (Exception x) { unexpected(x); return 0L;}
}
}
}
}
public static class NullForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory
{
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return null;
}
}
//--------------------- Infrastructure ---------------------------
static volatile int passed = 0, failed = 0;
static void pass() {passed++;}
static void fail() {failed++; /*Thread.dumpStack();*/}
static void fail(String msg) {System.out.println(msg); fail();}
static void unexpected(Throwable t) {failed++; t.printStackTrace();}
static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);}
static void equal(Object x, Object y) {
if (x == null ? y == null : x.equals(y)) pass();
else fail(x + " not equal to " + y);}
public static void main(String[] args) throws Throwable {
try {realMain(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new AssertionError("Some tests failed");}
}
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
/*
* @test
* @bug 8008378
* @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.exceptionHandler=ThrowingRunnable
* ThrowingRunnable
* @summary FJP.execute(Runnable), uncaught exception should cause worker thread
* to die.
* @author Chris Hegarty
*/
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class ThrowingRunnable implements Runnable, UncaughtExceptionHandler {
static final Phaser phaser = new Phaser(2);
private static void realMain(String[] args) throws Throwable {
ThrowingRunnable r = new ThrowingRunnable();
ForkJoinPool.commonPool().execute(r);
phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS);
pass();
}
@Override
public void run() {
throw new RuntimeException("This is an exception.");
}
@Override
public void uncaughtException(Thread t, Throwable e) {
pass();
phaser.arrive();
}
//--------------------- Infrastructure ---------------------------
static volatile int passed = 0, failed = 0;
static void pass() {passed++;}
static void fail() {failed++; /*Thread.dumpStack();*/}
static void fail(String msg) {System.out.println(msg); fail();}
static void unexpected(Throwable t) {failed++; t.printStackTrace();}
static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);}
static void equal(Object x, Object y) {
if (x == null ? y == null : x.equals(y)) pass();
else fail(x + " not equal to " + y);}
public static void main(String[] args) throws Throwable {
try {realMain(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new AssertionError("Some tests failed");}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册