提交 3b868f40 编写于 作者: D dfuchs

6610896: JMX Monitor handles thread groups incorrectly

Reviewed-by: emcmanus
上级 498d2fd6
...@@ -33,8 +33,9 @@ import java.security.AccessControlContext; ...@@ -33,8 +33,9 @@ import java.security.AccessControlContext;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -174,14 +175,20 @@ public abstract class Monitor ...@@ -174,14 +175,20 @@ public abstract class Monitor
new DaemonThreadFactory("Scheduler")); new DaemonThreadFactory("Scheduler"));
/** /**
* Maximum Pool Size * Map containing the thread pool executor per thread group.
*/ */
private static final int maximumPoolSize; private static final Map<ThreadPoolExecutor, Void> executors =
new WeakHashMap<ThreadPoolExecutor, Void>();
/**
* Lock for executors map.
*/
private static final Object executorsLock = new Object();
/** /**
* Executor Service. * Maximum Pool Size
*/ */
private static final ExecutorService executor; private static final int maximumPoolSize;
static { static {
final String maximumPoolSizeSysProp = "jmx.x.monitor.maximum.pool.size"; final String maximumPoolSizeSysProp = "jmx.x.monitor.maximum.pool.size";
final String maximumPoolSizeStr = AccessController.doPrivileged( final String maximumPoolSizeStr = AccessController.doPrivileged(
...@@ -211,21 +218,8 @@ public abstract class Monitor ...@@ -211,21 +218,8 @@ public abstract class Monitor
maximumPoolSize = maximumPoolSizeTmp; maximumPoolSize = maximumPoolSizeTmp;
} }
} }
executor = new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory("Executor"));
((ThreadPoolExecutor)executor).allowCoreThreadTimeOut(true);
} }
/**
* Monitor task to be executed by the Executor Service.
*/
private final MonitorTask monitorTask = new MonitorTask();
/** /**
* Future associated to the current monitor task. * Future associated to the current monitor task.
*/ */
...@@ -234,7 +228,7 @@ public abstract class Monitor ...@@ -234,7 +228,7 @@ public abstract class Monitor
/** /**
* Scheduler task to be executed by the Scheduler Service. * Scheduler task to be executed by the Scheduler Service.
*/ */
private final SchedulerTask schedulerTask = new SchedulerTask(monitorTask); private final SchedulerTask schedulerTask = new SchedulerTask();
/** /**
* ScheduledFuture associated to the current scheduler task. * ScheduledFuture associated to the current scheduler task.
...@@ -720,6 +714,7 @@ public abstract class Monitor ...@@ -720,6 +714,7 @@ public abstract class Monitor
// Start the scheduler. // Start the scheduler.
// //
cleanupFutures(); cleanupFutures();
schedulerTask.setMonitorTask(new MonitorTask());
schedulerFuture = scheduler.schedule(schedulerTask, schedulerFuture = scheduler.schedule(schedulerTask,
getGranularityPeriod(), getGranularityPeriod(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
...@@ -1468,7 +1463,7 @@ public abstract class Monitor ...@@ -1468,7 +1463,7 @@ public abstract class Monitor
*/ */
private class SchedulerTask implements Runnable { private class SchedulerTask implements Runnable {
private Runnable task = null; private MonitorTask task;
/* /*
* ------------------------------------------ * ------------------------------------------
...@@ -1476,7 +1471,16 @@ public abstract class Monitor ...@@ -1476,7 +1471,16 @@ public abstract class Monitor
* ------------------------------------------ * ------------------------------------------
*/ */
public SchedulerTask(Runnable task) { public SchedulerTask() {
}
/*
* ------------------------------------------
* GETTERS/SETTERS
* ------------------------------------------
*/
public void setMonitorTask(MonitorTask task) {
this.task = task; this.task = task;
} }
...@@ -1488,7 +1492,7 @@ public abstract class Monitor ...@@ -1488,7 +1492,7 @@ public abstract class Monitor
public void run() { public void run() {
synchronized (Monitor.this) { synchronized (Monitor.this) {
Monitor.this.monitorFuture = executor.submit(task); Monitor.this.monitorFuture = task.submit();
} }
} }
} }
...@@ -1501,6 +1505,8 @@ public abstract class Monitor ...@@ -1501,6 +1505,8 @@ public abstract class Monitor
*/ */
private class MonitorTask implements Runnable { private class MonitorTask implements Runnable {
private ThreadPoolExecutor executor;
/* /*
* ------------------------------------------ * ------------------------------------------
* CONSTRUCTORS * CONSTRUCTORS
...@@ -1508,6 +1514,38 @@ public abstract class Monitor ...@@ -1508,6 +1514,38 @@ public abstract class Monitor
*/ */
public MonitorTask() { public MonitorTask() {
// Find out if there's already an existing executor for the calling
// thread and reuse it. Otherwise, create a new one and store it in
// the executors map. If there is a SecurityManager, the group of
// System.getSecurityManager() is used, else the group of the thread
// instantiating this MonitorTask, i.e. the group of the thread that
// calls "Monitor.start()".
SecurityManager s = System.getSecurityManager();
ThreadGroup group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
synchronized (executorsLock) {
for (ThreadPoolExecutor e : executors.keySet()) {
DaemonThreadFactory tf =
(DaemonThreadFactory) e.getThreadFactory();
ThreadGroup tg = tf.getThreadGroup();
if (tg == group) {
executor = e;
break;
}
}
if (executor == null) {
executor = new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory("ThreadGroup<" +
group.getName() + "> Executor", group));
executor.allowCoreThreadTimeOut(true);
executors.put(executor, null);
}
}
} }
/* /*
...@@ -1516,6 +1554,10 @@ public abstract class Monitor ...@@ -1516,6 +1554,10 @@ public abstract class Monitor
* ------------------------------------------ * ------------------------------------------
*/ */
public Future<?> submit() {
return executor.submit(this);
}
public void run() { public void run() {
final ScheduledFuture<?> sf; final ScheduledFuture<?> sf;
synchronized (Monitor.this) { synchronized (Monitor.this) {
...@@ -1574,6 +1616,15 @@ public abstract class Monitor ...@@ -1574,6 +1616,15 @@ public abstract class Monitor
namePrefix = "JMX Monitor " + poolName + " Pool [Thread-"; namePrefix = "JMX Monitor " + poolName + " Pool [Thread-";
} }
public DaemonThreadFactory(String poolName, ThreadGroup threadGroup) {
group = threadGroup;
namePrefix = "JMX Monitor " + poolName + " Pool [Thread-";
}
public ThreadGroup getThreadGroup() {
return group;
}
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread t = new Thread(group, Thread t = new Thread(group,
r, r,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册