提交 198bb3a6 编写于 作者: D dfuchs

Merge

......@@ -22,7 +22,6 @@
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
package com.sun.jmx.remote.internal;
import java.io.IOException;
......@@ -34,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
......@@ -54,6 +54,9 @@ import com.sun.jmx.remote.util.EnvHelp;
public abstract class ClientNotifForwarder {
private final AccessControlContext acc;
public ClientNotifForwarder(Map env) {
this(null, env);
}
......@@ -87,6 +90,8 @@ public abstract class ClientNotifForwarder {
this.command = command;
if (thread == null) {
thread = new Thread() {
@Override
public void run() {
while (true) {
Runnable r;
......@@ -130,6 +135,7 @@ public abstract class ClientNotifForwarder {
this.defaultClassLoader = defaultClassLoader;
this.executor = ex;
this.acc = AccessController.getContext();
}
/**
......@@ -390,28 +396,85 @@ public abstract class ClientNotifForwarder {
setState(TERMINATED);
}
// -------------------------------------------------
// private classes
// -------------------------------------------------
// -------------------------------------------------
// private classes
// -------------------------------------------------
//
private class NotifFetcher implements Runnable {
private volatile boolean alreadyLogged = false;
private void logOnce(String msg, SecurityException x) {
if (alreadyLogged) return;
// Log only once.
logger.config("setContextClassLoader",msg);
if (x != null) logger.fine("setContextClassLoader", x);
alreadyLogged = true;
}
// Set new context class loader, returns previous one.
private final ClassLoader setContextClassLoader(final ClassLoader loader) {
final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
// if ctxt is null, log a config message and throw a
// SecurityException.
if (ctxt == null) {
logOnce("AccessControlContext must not be null.",null);
throw new SecurityException("AccessControlContext must not be null");
}
return AccessController.doPrivileged(
new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
try {
// get context class loader - may throw
// SecurityException - though unlikely.
final ClassLoader previous =
Thread.currentThread().getContextClassLoader();
// if nothing needs to be done, break here...
if (loader == previous) return previous;
// reset context class loader - may throw
// SecurityException
Thread.currentThread().setContextClassLoader(loader);
return previous;
} catch (SecurityException x) {
logOnce("Permission to set ContextClassLoader missing. " +
"Notifications will not be dispatched. " +
"Please check your Java policy configuration: " +
x, x);
throw x;
}
}
}, ctxt);
}
public void run() {
final ClassLoader previous;
if (defaultClassLoader != null) {
previous = setContextClassLoader(defaultClassLoader);
} else {
previous = null;
}
try {
doRun();
} finally {
if (defaultClassLoader != null) {
setContextClassLoader(previous);
}
}
}
private void doRun() {
synchronized (ClientNotifForwarder.this) {
currentFetchThread = Thread.currentThread();
if (state == STARTING)
if (state == STARTING) {
setState(STARTED);
}
}
if (defaultClassLoader != null) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().
setContextClassLoader(defaultClassLoader);
return null;
}
});
}
NotificationResult nr = null;
if (!shouldStop() && (nr = fetchNotifs()) != null) {
......@@ -444,8 +507,9 @@ public abstract class ClientNotifForwarder {
// check if an mbean unregistration notif
if (!listenerID.equals(mbeanRemovedNotifID)) {
final ClientListenerInfo li = infoList.get(listenerID);
if (li != null)
listeners.put(listenerID,li);
if (li != null) {
listeners.put(listenerID, li);
}
continue;
}
final Notification notif = tn.getNotification();
......@@ -786,9 +850,7 @@ public abstract class ClientNotifForwarder {
private long clientSequenceNumber = -1;
private final int maxNotifications;
private final long timeout;
private Integer mbeanRemovedNotifID = null;
private Thread currentFetchThread;
// state
......
......@@ -596,7 +596,7 @@ public class CounterMonitor extends Monitor implements CounterMonitorMBean {
* types sent by the counter monitor.
*/
public MBeanNotificationInfo[] getNotificationInfo() {
return notifsInfo;
return notifsInfo.clone();
}
/*
......
......@@ -478,7 +478,7 @@ public class GaugeMonitor extends Monitor implements GaugeMonitorMBean {
* types sent by the gauge monitor.
*/
public MBeanNotificationInfo[] getNotificationInfo() {
return notifsInfo;
return notifsInfo.clone();
}
/*
......
......@@ -32,9 +32,11 @@ import java.io.IOException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -164,7 +166,10 @@ public abstract class Monitor
/**
* AccessControlContext of the Monitor.start() caller.
*/
private AccessControlContext acc;
private static final AccessControlContext noPermissionsACC =
new AccessControlContext(
new ProtectionDomain[] {new ProtectionDomain(null, null)});
private volatile AccessControlContext acc = noPermissionsACC;
/**
* Scheduler Service.
......@@ -174,14 +179,20 @@ public abstract class Monitor
new DaemonThreadFactory("Scheduler"));
/**
* Maximum Pool Size
* Map containing the thread pool executor per thread group.
*/
private static final int maximumPoolSize;
private static final Map<ThreadPoolExecutor, Void> executors =
new WeakHashMap<ThreadPoolExecutor, Void>();
/**
* Lock for executors map.
*/
private static final Object executorsLock = new Object();
/**
* Executor Service.
* Maximum Pool Size
*/
private static final ExecutorService executor;
private static final int maximumPoolSize;
static {
final String maximumPoolSizeSysProp = "jmx.x.monitor.maximum.pool.size";
final String maximumPoolSizeStr = AccessController.doPrivileged(
......@@ -211,21 +222,8 @@ public abstract class Monitor
maximumPoolSize = maximumPoolSizeTmp;
}
}
executor = new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory("Executor"));
((ThreadPoolExecutor)executor).allowCoreThreadTimeOut(true);
}
/**
* Monitor task to be executed by the Executor Service.
*/
private final MonitorTask monitorTask = new MonitorTask();
/**
* Future associated to the current monitor task.
*/
......@@ -234,7 +232,7 @@ public abstract class Monitor
/**
* Scheduler task to be executed by the Scheduler Service.
*/
private final SchedulerTask schedulerTask = new SchedulerTask(monitorTask);
private final SchedulerTask schedulerTask = new SchedulerTask();
/**
* ScheduledFuture associated to the current scheduler task.
......@@ -720,6 +718,7 @@ public abstract class Monitor
// Start the scheduler.
//
cleanupFutures();
schedulerTask.setMonitorTask(new MonitorTask());
schedulerFuture = scheduler.schedule(schedulerTask,
getGranularityPeriod(),
TimeUnit.MILLISECONDS);
......@@ -749,7 +748,7 @@ public abstract class Monitor
// Reset the AccessControlContext.
//
acc = null;
acc = noPermissionsACC;
// Reset the complex type attribute information
// such that it is recalculated again.
......@@ -1468,7 +1467,7 @@ public abstract class Monitor
*/
private class SchedulerTask implements Runnable {
private Runnable task = null;
private MonitorTask task;
/*
* ------------------------------------------
......@@ -1476,7 +1475,16 @@ public abstract class Monitor
* ------------------------------------------
*/
public SchedulerTask(Runnable task) {
public SchedulerTask() {
}
/*
* ------------------------------------------
* GETTERS/SETTERS
* ------------------------------------------
*/
public void setMonitorTask(MonitorTask task) {
this.task = task;
}
......@@ -1488,7 +1496,7 @@ public abstract class Monitor
public void run() {
synchronized (Monitor.this) {
Monitor.this.monitorFuture = executor.submit(task);
Monitor.this.monitorFuture = task.submit();
}
}
}
......@@ -1501,6 +1509,8 @@ public abstract class Monitor
*/
private class MonitorTask implements Runnable {
private ThreadPoolExecutor executor;
/*
* ------------------------------------------
* CONSTRUCTORS
......@@ -1508,6 +1518,38 @@ public abstract class Monitor
*/
public MonitorTask() {
// Find out if there's already an existing executor for the calling
// thread and reuse it. Otherwise, create a new one and store it in
// the executors map. If there is a SecurityManager, the group of
// System.getSecurityManager() is used, else the group of the thread
// instantiating this MonitorTask, i.e. the group of the thread that
// calls "Monitor.start()".
SecurityManager s = System.getSecurityManager();
ThreadGroup group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
synchronized (executorsLock) {
for (ThreadPoolExecutor e : executors.keySet()) {
DaemonThreadFactory tf =
(DaemonThreadFactory) e.getThreadFactory();
ThreadGroup tg = tf.getThreadGroup();
if (tg == group) {
executor = e;
break;
}
}
if (executor == null) {
executor = new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory("ThreadGroup<" +
group.getName() + "> Executor", group));
executor.allowCoreThreadTimeOut(true);
executors.put(executor, null);
}
}
}
/*
......@@ -1516,12 +1558,18 @@ public abstract class Monitor
* ------------------------------------------
*/
public Future<?> submit() {
return executor.submit(this);
}
public void run() {
final ScheduledFuture<?> sf;
final AccessControlContext ac;
synchronized (Monitor.this) {
sf = Monitor.this.schedulerFuture;
ac = Monitor.this.acc;
}
AccessController.doPrivileged(new PrivilegedAction<Void>() {
PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
public Void run() {
if (Monitor.this.isActive()) {
final int an[] = alreadyNotifieds;
......@@ -1534,7 +1582,11 @@ public abstract class Monitor
}
return null;
}
}, Monitor.this.acc);
};
if (ac == null) {
throw new SecurityException("AccessControlContext cannot be null");
}
AccessController.doPrivileged(action, ac);
synchronized (Monitor.this) {
if (Monitor.this.isActive() &&
Monitor.this.schedulerFuture == sf) {
......@@ -1574,6 +1626,15 @@ public abstract class Monitor
namePrefix = "JMX Monitor " + poolName + " Pool [Thread-";
}
public DaemonThreadFactory(String poolName, ThreadGroup threadGroup) {
group = threadGroup;
namePrefix = "JMX Monitor " + poolName + " Pool [Thread-";
}
public ThreadGroup getThreadGroup() {
return group;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group,
r,
......
......@@ -184,6 +184,7 @@ public class StringMonitor extends Monitor implements StringMonitorMBean {
* @return The derived gauge of the specified object.
*
*/
@Override
public synchronized String getDerivedGauge(ObjectName object) {
return (String) super.getDerivedGauge(object);
}
......@@ -199,6 +200,7 @@ public class StringMonitor extends Monitor implements StringMonitorMBean {
* @return The derived gauge timestamp of the specified object.
*
*/
@Override
public synchronized long getDerivedGaugeTimeStamp(ObjectName object) {
return super.getDerivedGaugeTimeStamp(object);
}
......@@ -341,8 +343,9 @@ public class StringMonitor extends Monitor implements StringMonitorMBean {
* the Java class of the notification and the notification types sent by
* the string monitor.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo() {
return notifsInfo;
return notifsInfo.clone();
}
/*
......
......@@ -52,6 +52,9 @@ class Request {
os = rawout;
do {
startLine = readLine();
if (startLine == null) {
return;
}
/* skip blank lines */
} while (startLine.equals (""));
}
......
......@@ -441,6 +441,7 @@ class ServerImpl implements TimeSource {
rawin = sslStreams.getInputStream();
rawout = sslStreams.getOutputStream();
engine = sslStreams.getSSLEngine();
connection.sslStreams = sslStreams;
} else {
rawin = new BufferedInputStream(
new Request.ReadStream (
......@@ -450,6 +451,8 @@ class ServerImpl implements TimeSource {
ServerImpl.this, chan
);
}
connection.raw = rawin;
connection.rawout = rawout;
}
Request req = new Request (rawin, rawout);
requestLine = req.requestLine();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册