提交 6970eeae 编写于 作者: D dl

8026155: Enhance ForkJoin pool

Reviewed-by: chegar, alanb, ahgross
上级 a8bddc45
...@@ -49,6 +49,9 @@ import java.util.concurrent.RejectedExecutionException; ...@@ -49,6 +49,9 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
import java.security.Permissions;
/** /**
* An {@link ExecutorService} for running {@link ForkJoinTask}s. * An {@link ExecutorService} for running {@link ForkJoinTask}s.
...@@ -140,6 +143,9 @@ import java.util.concurrent.TimeUnit; ...@@ -140,6 +143,9 @@ import java.util.concurrent.TimeUnit;
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
* - the class name of a {@link UncaughtExceptionHandler} * - the class name of a {@link UncaughtExceptionHandler}
* </ul> * </ul>
* If a {@link SecurityManager} is present and no factory is
* specified, then the default pool uses a factory supplying
* threads that have no {@link Permissions} enabled.
* The system class loader is used to load these classes. * The system class loader is used to load these classes.
* Upon any error in establishing these settings, default parameters * Upon any error in establishing these settings, default parameters
* are used. It is possible to disable or limit the use of threads in * are used. It is possible to disable or limit the use of threads in
...@@ -501,6 +507,16 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -501,6 +507,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* task status checks) in inapplicable cases amounts to an odd * task status checks) in inapplicable cases amounts to an odd
* form of limited spin-wait before blocking in ForkJoinTask.join. * form of limited spin-wait before blocking in ForkJoinTask.join.
* *
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
* present. These workers have no permissions set, do not belong
* to any user-defined ThreadGroup, and erase all ThreadLocals
* after executing any top-level task (see WorkQueue.runTask). The
* associated mechanics (mainly in ForkJoinWorkerThread) may be
* JVM-dependent and must access particular Thread class fields to
* achieve this effect.
*
* Style notes * Style notes
* =========== * ===========
* *
...@@ -882,6 +898,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -882,6 +898,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
final void runTask(ForkJoinTask<?> task) { final void runTask(ForkJoinTask<?> task) {
if ((currentSteal = task) != null) { if ((currentSteal = task) != null) {
ForkJoinWorkerThread thread;
task.doExec(); task.doExec();
ForkJoinTask<?>[] a = array; ForkJoinTask<?>[] a = array;
int md = mode; int md = mode;
...@@ -899,6 +916,8 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -899,6 +916,8 @@ public class ForkJoinPool extends AbstractExecutorService {
t.doExec(); t.doExec();
} }
} }
if ((thread = owner) != null) // no need to do in finally clause
thread.afterTopLevelExec();
} }
} }
...@@ -1155,7 +1174,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -1155,7 +1174,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Increment for seed generators. See class ThreadLocal for * Increment for seed generators. See class ThreadLocal for
* explanation. * explanation.
*/ */
private static final int SEED_INCREMENT = 0x61c88647; private static final int SEED_INCREMENT = 0x9e3779b9;
/* /*
* Bits and masks for control variables * Bits and masks for control variables
...@@ -2084,10 +2103,8 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -2084,10 +2103,8 @@ public class ForkJoinPool extends AbstractExecutorService {
((c & ~AC_MASK) | ((c & ~AC_MASK) |
((c & AC_MASK) + AC_UNIT)))); ((c & AC_MASK) + AC_UNIT))));
} }
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
(w.currentSteal = t).doExec(); w.runTask(t);
w.currentSteal = ps;
}
} }
else if (active) { // decrement active count without queuing else if (active) { // decrement active count without queuing
long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
...@@ -3282,8 +3299,7 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -3282,8 +3299,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
private static ForkJoinPool makeCommonPool() { private static ForkJoinPool makeCommonPool() {
int parallelism = -1; int parallelism = -1;
ForkJoinWorkerThreadFactory factory ForkJoinWorkerThreadFactory factory = null;
= defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null; UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty String pp = System.getProperty
...@@ -3302,7 +3318,12 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -3302,7 +3318,12 @@ public class ForkJoinPool extends AbstractExecutorService {
getSystemClassLoader().loadClass(hp).newInstance()); getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) { } catch (Exception ignore) {
} }
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1; parallelism = 1;
...@@ -3312,4 +3333,38 @@ public class ForkJoinPool extends AbstractExecutorService { ...@@ -3312,4 +3333,38 @@ public class ForkJoinPool extends AbstractExecutorService {
"ForkJoinPool.commonPool-worker-"); "ForkJoinPool.commonPool-worker-");
} }
/**
* Factory for innocuous worker threads
*/
static final class InnocuousForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
/**
* An ACC to restrict permissions for the factory itself.
* The constructed workers have no permissions set.
*/
private static final AccessControlContext innocuousAcc;
static {
Permissions innocuousPerms = new Permissions();
innocuousPerms.add(modifyThreadPermission);
innocuousPerms.add(new RuntimePermission(
"enableContextClassLoaderOverride"));
innocuousPerms.add(new RuntimePermission(
"modifyThreadGroup"));
innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
new ProtectionDomain(null, innocuousPerms)
});
}
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
java.security.AccessController.doPrivileged(
new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
public ForkJoinWorkerThread run() {
return new ForkJoinWorkerThread.
InnocuousForkJoinWorkerThread(pool);
}}, innocuousAcc);
}
}
} }
...@@ -35,6 +35,9 @@ ...@@ -35,6 +35,9 @@
package java.util.concurrent; package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
/** /**
* A thread managed by a {@link ForkJoinPool}, which executes * A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s. * {@link ForkJoinTask}s.
...@@ -61,6 +64,10 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -61,6 +64,10 @@ public class ForkJoinWorkerThread extends Thread {
* completes. This leads to a visibility race, that is tolerated * completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the * by requiring that the workQueue field is only accessed by the
* owning thread. * owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/ */
final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool pool; // the pool this thread works in
...@@ -79,6 +86,18 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -79,6 +86,18 @@ public class ForkJoinWorkerThread extends Thread {
this.workQueue = pool.registerWorker(this); this.workQueue = pool.registerWorker(this);
} }
/**
* Version for InnocuousForkJoinWorkerThread
*/
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
AccessControlContext acc) {
super(threadGroup, null, "aForkJoinWorkerThread");
U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
eraseThreadLocals(); // clear before registering
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/** /**
* Returns the pool hosting this thread. * Returns the pool hosting this thread.
* *
...@@ -131,6 +150,7 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -131,6 +150,7 @@ public class ForkJoinWorkerThread extends Thread {
* {@link ForkJoinTask}s. * {@link ForkJoinTask}s.
*/ */
public void run() { public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null; Throwable exception = null;
try { try {
onStart(); onStart();
...@@ -148,4 +168,110 @@ public class ForkJoinWorkerThread extends Thread { ...@@ -148,4 +168,110 @@ public class ForkJoinWorkerThread extends Thread {
} }
} }
} }
}
/**
* Erases ThreadLocals by nulling out Thread maps
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
U.putObject(this, INHERITABLETHREADLOCALS, null);
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread
*/
void afterTopLevelExec() {
}
// Set up to allow setting thread fields in constructor
private static final sun.misc.Unsafe U;
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
THREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(tk.getDeclaredField("inheritedAccessControlContext"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, and erases all ThreadLocals after
* running each top-level task.
*/
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
private static final ThreadGroup innocuousThreadGroup =
createThreadGroup();
/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
new AccessControlContext(
new ProtectionDomain[] {
new ProtectionDomain(null, null)
});
InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
}
@Override // to erase ThreadLocals
void afterTopLevelExec() {
eraseThreadLocals();
}
@Override // to always report system loader
public ClassLoader getContextClassLoader() {
return ClassLoader.getSystemClassLoader();
}
@Override // to silently fail
public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
@Override // paranoically
public void setContextClassLoader(ClassLoader cl) {
throw new SecurityException("setContextClassLoader");
}
/**
* Returns a new group with the system ThreadGroup (the
* topmost, parentless group) as parent. Uses Unsafe to
* traverse Thread group and ThreadGroup parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
Class<?> gk = ThreadGroup.class;
long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
u.getObject(Thread.currentThread(), tg);
while (group != null) {
ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
if (parent == null)
return new ThreadGroup(group,
"InnocuousForkJoinWorkerThreadGroup");
group = parent;
}
} catch (Exception e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard
throw new Error("Cannot create ThreadGroup");
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册