提交 c08f212f 编写于 作者: D dl

8056248: Improve ForkJoin thread throttling

Reviewed-by: psandoz, martin, chegar
上级 0affdd8c
...@@ -297,15 +297,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -297,15 +297,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} }
/** /**
* Tries to set SIGNAL status unless already completed. Used by * If not done, sets SIGNAL status and performs Object.wait(timeout).
* ForkJoinPool. Other variants are directly incorporated into * This task may or may not be done on exit. Ignores interrupts.
* externalAwaitDone etc.
* *
* @return true if successful * @param timeout using Object.wait conventions.
*/ */
final boolean trySetSignal() { final void internalWait(long timeout) {
int s = status; int s;
return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); if ((s = status) >= 0 && // force completer to issue notify
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
notifyAll();
}
}
} }
/** /**
...@@ -313,35 +320,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -313,35 +320,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return status upon completion * @return status upon completion
*/ */
private int externalAwaitDone() { private int externalAwaitDone() {
int s; int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool cp = ForkJoinPool.common; ForkJoinPool.common.externalHelpComplete(
if ((s = status) >= 0) { (CountedCompleter<?>)this, 0) :
if (cp != null) { ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (this instanceof CountedCompleter) if (s >= 0 && (s = status) >= 0) {
s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); boolean interrupted = false;
else if (cp.tryExternalUnpush(this)) do {
s = doExec(); if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
} synchronized (this) {
if (s >= 0 && (s = status) >= 0) { if (status >= 0) {
boolean interrupted = false; try {
do { wait(0L);
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { } catch (InterruptedException ie) {
synchronized (this) { interrupted = true;
if (status >= 0) {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
} }
else
notifyAll();
} }
else
notifyAll();
} }
} while ((s = status) >= 0); }
if (interrupted) } while ((s = status) >= 0);
Thread.currentThread().interrupt(); if (interrupted)
} Thread.currentThread().interrupt();
} }
return s; return s;
} }
...@@ -351,22 +352,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -351,22 +352,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
private int externalInterruptibleAwaitDone() throws InterruptedException { private int externalInterruptibleAwaitDone() throws InterruptedException {
int s; int s;
ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted()) if (Thread.interrupted())
throw new InterruptedException(); throw new InterruptedException();
if ((s = status) >= 0 && cp != null) { if ((s = status) >= 0 &&
if (this instanceof CountedCompleter) (s = ((this instanceof CountedCompleter) ?
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); ForkJoinPool.common.externalHelpComplete(
else if (cp.tryExternalUnpush(this)) (CountedCompleter<?>)this, 0) :
doExec(); ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
} 0)) >= 0) {
while ((s = status) >= 0) { while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) { synchronized (this) {
if (status >= 0) if (status >= 0)
wait(); wait(0L);
else else
notifyAll(); notifyAll();
}
} }
} }
} }
...@@ -386,7 +387,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -386,7 +387,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue). (w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s : tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this) : wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone(); externalAwaitDone();
} }
...@@ -399,7 +400,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -399,7 +400,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s; Thread t; ForkJoinWorkerThread wt; int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s : return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) : (wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone(); externalAwaitDone();
} }
...@@ -577,7 +579,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -577,7 +579,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Throwable ex; Throwable ex;
if (e == null || (ex = e.ex) == null) if (e == null || (ex = e.ex) == null)
return null; return null;
if (false && e.thrower != Thread.currentThread().getId()) { if (e.thrower != Thread.currentThread().getId()) {
Class<? extends Throwable> ec = ex.getClass(); Class<? extends Throwable> ec = ex.getClass();
try { try {
Constructor<?> noArgCtor = null; Constructor<?> noArgCtor = null;
...@@ -587,13 +589,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -587,13 +589,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Class<?>[] ps = c.getParameterTypes(); Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0) if (ps.length == 0)
noArgCtor = c; noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class) else if (ps.length == 1 && ps[0] == Throwable.class) {
return (Throwable)(c.newInstance(ex)); Throwable wx = (Throwable)c.newInstance(ex);
return (wx == null) ? ex : wx;
}
} }
if (noArgCtor != null) { if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance()); Throwable wx = (Throwable)(noArgCtor.newInstance());
wx.initCause(ex); if (wx != null) {
return wx; wx.initCause(ex);
return wx;
}
} }
} catch (Exception ignore) { } catch (Exception ignore) {
} }
...@@ -1017,67 +1023,40 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -1017,67 +1023,40 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
public final V get(long timeout, TimeUnit unit) public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
int s;
long nanos = unit.toNanos(timeout);
if (Thread.interrupted()) if (Thread.interrupted())
throw new InterruptedException(); throw new InterruptedException();
// Messy in part because we measure in nanosecs, but wait in millisecs if ((s = status) >= 0 && nanos > 0L) {
int s; long ms; long d = System.nanoTime() + nanos;
long ns = unit.toNanos(timeout); long deadline = (d == 0L) ? 1L : d; // avoid 0
ForkJoinPool cp;
if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
ForkJoinPool.WorkQueue w = null;
Thread t = Thread.currentThread(); Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) { if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
p = wt.pool; s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
w = wt.workQueue;
p.helpJoinOnce(w, this); // no retries on failure
}
else if ((cp = ForkJoinPool.common) != null) {
if (this instanceof CountedCompleter)
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
else if (cp.tryExternalUnpush(this))
doExec();
} }
boolean canBlock = false; else if ((s = ((this instanceof CountedCompleter) ?
boolean interrupted = false; ForkJoinPool.common.externalHelpComplete(
try { (CountedCompleter<?>)this, 0) :
while ((s = status) >= 0) { ForkJoinPool.common.tryExternalUnpush(this) ?
if (w != null && w.qlock < 0) doExec() : 0)) >= 0) {
cancelIgnoringExceptions(this); long ns, ms; // measure in nanosecs, but wait in millisecs
else if (!canBlock) { while ((s = status) >= 0 &&
if (p == null || p.tryCompensate(p.ctl)) (ns = deadline - System.nanoTime()) > 0L) {
canBlock = true; if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
} U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
else { synchronized (this) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && if (status >= 0)
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { wait(ms); // OK to throw InterruptedException
synchronized (this) { else
if (status >= 0) { notifyAll();
try {
wait(ms);
} catch (InterruptedException ie) {
if (p == null)
interrupted = true;
}
}
else
notifyAll();
}
} }
if ((s = status) < 0 || interrupted ||
(ns = deadline - System.nanoTime()) <= 0L)
break;
} }
} }
} finally {
if (p != null && canBlock)
p.incrementActiveCount();
} }
if (interrupted)
throw new InterruptedException();
} }
if (s >= 0)
s = status;
if ((s &= DONE_MASK) != NORMAL) { if ((s &= DONE_MASK) != NORMAL) {
Throwable ex; Throwable ex;
if (s == CANCELLED) if (s == CANCELLED)
......
...@@ -66,7 +66,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -66,7 +66,7 @@ public class ForkJoinWorkerThread extends Thread {
* owning thread. * owning thread.
* *
* Support for (non-public) subclass InnocuousForkJoinWorkerThread * Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapulation (via Unsafe) * requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields. * both here and in the subclass to access and set Thread fields.
*/ */
...@@ -118,7 +118,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -118,7 +118,7 @@ public class ForkJoinWorkerThread extends Thread {
* @return the index number * @return the index number
*/ */
public int getPoolIndex() { public int getPoolIndex() {
return workQueue.poolIndex >>> 1; // ignore odd/even tag bit return workQueue.getPoolIndex();
} }
/** /**
...@@ -171,7 +171,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -171,7 +171,7 @@ public class ForkJoinWorkerThread extends Thread {
} }
/** /**
* Erases ThreadLocals by nulling out Thread maps * Erases ThreadLocals by nulling out Thread maps.
*/ */
final void eraseThreadLocals() { final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null); U.putObject(this, THREADLOCALS, null);
...@@ -246,8 +246,8 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -246,8 +246,8 @@ public class ForkJoinWorkerThread extends Thread {
/** /**
* Returns a new group with the system ThreadGroup (the * Returns a new group with the system ThreadGroup (the
* topmost, parentless group) as parent. Uses Unsafe to * topmost, parent-less group) as parent. Uses Unsafe to
* traverse Thread group and ThreadGroup parent fields. * traverse Thread.group and ThreadGroup.parent fields.
*/ */
private static ThreadGroup createThreadGroup() { private static ThreadGroup createThreadGroup() {
try { try {
...@@ -274,4 +274,3 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -274,4 +274,3 @@ public class ForkJoinWorkerThread extends Thread {
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册