提交 7b9c6d55 编写于 作者: D dl

6986050: Small clarifications and fixes for ForkJoin

Summary: Clarify FJ.get on throw InterruptedException, propagate ThreadFactory, shutdown transition
Reviewed-by: chegar
上级 6a380610
...@@ -42,7 +42,6 @@ import java.util.Collections; ...@@ -42,7 +42,6 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
...@@ -823,15 +822,13 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -823,15 +822,13 @@ public class ForkJoinPool extends AbstractExecutorService {
(workerCounts & RUNNING_COUNT_MASK) <= 1); (workerCounts & RUNNING_COUNT_MASK) <= 1);
long startTime = untimed? 0 : System.nanoTime(); long startTime = untimed? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt Thread.interrupted(); // clear/ignore interrupt
if (eventCount != ec || w.runState != 0 || if (eventCount != ec || w.isTerminating())
runState >= TERMINATING) // recheck after clear break; // recheck after clear
break;
if (untimed) if (untimed)
LockSupport.park(w); LockSupport.park(w);
else { else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS); LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
if (eventCount != ec || w.runState != 0 || if (eventCount != ec || w.isTerminating())
runState >= TERMINATING)
break; break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
tryShutdownUnusedWorker(ec); tryShutdownUnusedWorker(ec);
...@@ -899,16 +896,23 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -899,16 +896,23 @@ public class ForkJoinPool extends AbstractExecutorService {
UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
wc + (ONE_RUNNING|ONE_TOTAL))) { wc + (ONE_RUNNING|ONE_TOTAL))) {
ForkJoinWorkerThread w = null; ForkJoinWorkerThread w = null;
Throwable fail = null;
try { try {
w = factory.newThread(this); w = factory.newThread(this);
} finally { // adjust on null or exceptional factory return } catch (Throwable ex) {
if (w == null) { fail = ex;
decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
tryTerminate(false); // handle failure during shutdown
}
} }
if (w == null) if (w == null) { // null or exceptional factory return
decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
tryTerminate(false); // handle failure during shutdown
// If originating from an external caller,
// propagate exception, else ignore
if (fail != null && runState < TERMINATING &&
!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
UNSAFE.throwException(fail);
break; break;
}
w.start(recordWorker(w), ueh); w.start(recordWorker(w), ueh);
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
int c; // advance event count int c; // advance event count
...@@ -997,8 +1001,12 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -997,8 +1001,12 @@ public class ForkJoinPool extends AbstractExecutorService {
boolean active = w.active; boolean active = w.active;
boolean inactivate = false; boolean inactivate = false;
int pc = parallelism; int pc = parallelism;
int rs; while (w.runState == 0) {
while (w.runState == 0 && (rs = runState) < TERMINATING) { int rs = runState;
if (rs >= TERMINATING) { // propagate shutdown
w.shutdown();
break;
}
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
inactivate = active = w.active = false; inactivate = active = w.active = false;
...@@ -1126,6 +1134,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1126,6 +1134,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return true; return true;
} }
/** /**
* Actions on transition to TERMINATING * Actions on transition to TERMINATING
* *
...@@ -1149,7 +1158,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1149,7 +1158,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (passes > 0 && !w.isTerminated()) { if (passes > 0 && !w.isTerminated()) {
w.cancelTasks(); w.cancelTasks();
LockSupport.unpark(w); LockSupport.unpark(w);
if (passes > 1) { if (passes > 1 && !w.isInterrupted()) {
try { try {
w.interrupt(); w.interrupt();
} catch (SecurityException ignore) { } catch (SecurityException ignore) {
...@@ -1725,6 +1734,13 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1725,6 +1734,13 @@ public class ForkJoinPool extends AbstractExecutorService {
return (runState & (TERMINATING|TERMINATED)) == TERMINATING; return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
} }
/**
* Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
*/
final boolean isAtLeastTerminating() {
return runState >= TERMINATING;
}
/** /**
* Returns {@code true} if this pool has been shut down. * Returns {@code true} if this pool has been shut down.
* *
......
...@@ -55,10 +55,10 @@ import java.util.WeakHashMap; ...@@ -55,10 +55,10 @@ import java.util.WeakHashMap;
* start other subtasks. As indicated by the name of this class, * start other subtasks. As indicated by the name of this class,
* many programs using {@code ForkJoinTask} employ only methods * many programs using {@code ForkJoinTask} employ only methods
* {@link #fork} and {@link #join}, or derivatives such as {@link * {@link #fork} and {@link #join}, or derivatives such as {@link
* #invokeAll}. However, this class also provides a number of other * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* methods that can come into play in advanced usages, as well as * provides a number of other methods that can come into play in
* extension mechanics that allow support of new forms of fork/join * advanced usages, as well as extension mechanics that allow
* processing. * support of new forms of fork/join processing.
* *
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}. * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of * The efficiency of {@code ForkJoinTask}s stems from a set of
...@@ -250,7 +250,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -250,7 +250,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s; // the odd construction reduces lock bias effects int s; // the odd construction reduces lock bias effects
while ((s = status) >= 0) { while ((s = status) >= 0) {
try { try {
synchronized(this) { synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait(); wait();
} }
...@@ -270,7 +270,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -270,7 +270,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s; int s;
if ((s = status) >= 0) { if ((s = status) >= 0) {
try { try {
synchronized(this) { synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL)) if (UNSAFE.compareAndSwapInt(this, statusOffset, s,SIGNAL))
wait(millis, 0); wait(millis, 0);
} }
...@@ -288,7 +288,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -288,7 +288,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private void externalAwaitDone() { private void externalAwaitDone() {
int s; int s;
while ((s = status) >= 0) { while ((s = status) >= 0) {
synchronized(this) { synchronized (this) {
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){ if (UNSAFE.compareAndSwapInt(this, statusOffset, s, SIGNAL)){
boolean interrupted = false; boolean interrupted = false;
while (status >= 0) { while (status >= 0) {
...@@ -669,11 +669,34 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -669,11 +669,34 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
setCompletion(NORMAL); setCompletion(NORMAL);
} }
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException { public final V get() throws InterruptedException, ExecutionException {
quietlyJoin(); int s;
if (Thread.interrupted()) if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
throw new InterruptedException(); quietlyJoin();
int s = status; s = status;
}
else {
while ((s = status) >= 0) {
synchronized (this) { // interruptible form of awaitDone
if (UNSAFE.compareAndSwapInt(this, statusOffset,
s, SIGNAL)) {
while (status >= 0)
wait();
}
}
}
}
if (s < NORMAL) { if (s < NORMAL) {
Throwable ex; Throwable ex;
if (s == CANCELLED) if (s == CANCELLED)
...@@ -684,6 +707,20 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -684,6 +707,20 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return getRawResult(); return getRawResult();
} }
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
public final V get(long timeout, TimeUnit unit) public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread(); Thread t = Thread.currentThread();
...@@ -725,7 +762,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { ...@@ -725,7 +762,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
long ms = nt / 1000000; long ms = nt / 1000000;
int ns = (int) (nt % 1000000); int ns = (int) (nt % 1000000);
try { try {
synchronized(this) { synchronized (this) {
if (status >= 0) if (status >= 0)
wait(ms, ns); wait(ms, ns);
} }
......
...@@ -778,11 +778,20 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -778,11 +778,20 @@ public class ForkJoinWorkerThread extends Thread {
// status check methods used mainly by ForkJoinPool // status check methods used mainly by ForkJoinPool
final boolean isRunning() { return runState == 0; } final boolean isRunning() { return runState == 0; }
final boolean isTerminating() { return (runState & TERMINATING) != 0; }
final boolean isTerminated() { return (runState & TERMINATED) != 0; } final boolean isTerminated() { return (runState & TERMINATED) != 0; }
final boolean isSuspended() { return (runState & SUSPENDED) != 0; } final boolean isSuspended() { return (runState & SUSPENDED) != 0; }
final boolean isTrimmed() { return (runState & TRIMMED) != 0; } final boolean isTrimmed() { return (runState & TRIMMED) != 0; }
final boolean isTerminating() {
if ((runState & TERMINATING) != 0)
return true;
if (pool.isAtLeastTerminating()) { // propagate pool state
shutdown();
return true;
}
return false;
}
/** /**
* Sets state to TERMINATING. Does NOT unpark or interrupt * Sets state to TERMINATING. Does NOT unpark or interrupt
* to wake up if currently blocked. Callers must do so if desired. * to wake up if currently blocked. Callers must do so if desired.
......
...@@ -138,7 +138,7 @@ package java.util.concurrent; ...@@ -138,7 +138,7 @@ package java.util.concurrent;
* if (right.tryUnfork()) // directly calculate if not stolen * if (right.tryUnfork()) // directly calculate if not stolen
* sum += right.atLeaf(right.lo, right.hi); * sum += right.atLeaf(right.lo, right.hi);
* else { * else {
* right.helpJoin(); * right.join();
* sum += right.result; * sum += right.result;
* } * }
* right = right.next; * right = right.next;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册