提交 00a90a93 编写于 作者: E emcmanus

6747411: EventClient causes thread leaks

Summary: Reworked thread management in EventClient and related classes.
Reviewed-by: sjiang, dfuchs
上级 0c38c873
......@@ -27,7 +27,6 @@ package com.sun.jmx.event;
import com.sun.jmx.remote.util.ClassLogger;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
......@@ -115,6 +114,7 @@ public class LeaseManager {
scheduled = null;
}
callback.run();
executor.shutdown();
}
}
......@@ -131,6 +131,13 @@ public class LeaseManager {
logger.trace("stop", "canceling lease");
scheduled.cancel(false);
scheduled = null;
try {
executor.shutdown();
} catch (SecurityException e) {
// OK: caller doesn't have RuntimePermission("modifyThread")
// which is unlikely in reality but triggers a test failure otherwise
logger.trace("stop", "exception from executor.shutdown", e);
}
}
private final Runnable callback;
......@@ -138,7 +145,7 @@ public class LeaseManager {
private final ScheduledExecutorService executor
= Executors.newScheduledThreadPool(1,
new DaemonThreadFactory("LeaseManager"));
new DaemonThreadFactory("JMX LeaseManager %d"));
private static final ClassLogger logger =
new ClassLogger("javax.management.event", "LeaseManager");
......
......@@ -95,7 +95,9 @@ public abstract class RepeatedSingletonJob implements Runnable {
executor.execute(this);
} catch (RejectedExecutionException e) {
logger.warning(
"setEventReceiver", "Executor threw exception", e);
"execute",
"Executor threw exception (" + this.getClass().getName() + ")",
e);
throw new RejectedExecutionException(
"Executor.execute threw exception -" +
"should not be possible", e);
......
......@@ -32,13 +32,15 @@ import com.sun.jmx.remote.util.ClassLogger;
import com.sun.jmx.remote.util.EnvHelp;
public abstract class ClientCommunicatorAdmin {
private static volatile long threadNo = 1;
public ClientCommunicatorAdmin(long period) {
this.period = period;
if (period > 0) {
checker = new Checker();
Thread t = new Thread(checker);
Thread t = new Thread(checker, "JMX client heartbeat " + ++threadNo);
t.setDaemon(true);
t.start();
} else
......
......@@ -264,11 +264,12 @@ public class EventClient implements EventConsumer, NotificationManager {
new PerThreadGroupPool.Create<ScheduledThreadPoolExecutor>() {
public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) {
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
"EventClient lease renewer %d");
"JMX EventClient lease renewer %d");
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(
20, daemonThreadFactory);
exec.setKeepAliveTime(3, TimeUnit.SECONDS);
exec.setKeepAliveTime(1, TimeUnit.SECONDS);
exec.allowCoreThreadTimeOut(true);
exec.setRemoveOnCancelPolicy(true);
return exec;
}
};
......
......@@ -31,10 +31,8 @@ import com.sun.jmx.remote.util.ClassLogger;
import java.io.IOException;
import java.io.NotSerializableException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanException;
......@@ -215,50 +213,47 @@ public class FetchingEventRelay implements EventRelay {
this.maxNotifs = maxNotifs;
if (executor == null) {
executor = Executors.newSingleThreadScheduledExecutor(
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1,
daemonThreadFactory);
}
stpe.setKeepAliveTime(1, TimeUnit.SECONDS);
stpe.allowCoreThreadTimeOut(true);
executor = stpe;
this.defaultExecutor = stpe;
} else
this.defaultExecutor = null;
this.executor = executor;
if (executor instanceof ScheduledExecutorService)
leaseScheduler = (ScheduledExecutorService) executor;
else {
leaseScheduler = Executors.newSingleThreadScheduledExecutor(
daemonThreadFactory);
}
startSequenceNumber = 0;
fetchingJob = new MyJob();
}
public void setEventReceiver(EventReceiver eventReceiver) {
public synchronized void setEventReceiver(EventReceiver eventReceiver) {
if (logger.traceOn()) {
logger.trace("setEventReceiver", ""+eventReceiver);
}
EventReceiver old = this.eventReceiver;
synchronized(fetchingJob) {
this.eventReceiver = eventReceiver;
if (old == null && eventReceiver != null)
fetchingJob.resume();
}
this.eventReceiver = eventReceiver;
if (old == null && eventReceiver != null)
fetchingJob.resume();
}
public String getClientId() {
return clientId;
}
public void stop() {
public synchronized void stop() {
if (logger.traceOn()) {
logger.trace("stop", "");
}
synchronized(fetchingJob) {
if (stopped) {
return;
}
stopped = true;
clientId = null;
if (stopped) {
return;
}
stopped = true;
clientId = null;
if (defaultExecutor != null)
defaultExecutor.shutdown();
}
private class MyJob extends RepeatedSingletonJob {
......@@ -372,10 +367,9 @@ public class FetchingEventRelay implements EventRelay {
private final EventClientDelegateMBean delegate;
private String clientId;
private boolean stopped = false;
private volatile ScheduledFuture<?> leaseRenewalFuture;
private final Executor executor;
private final ScheduledExecutorService leaseScheduler;
private final ExecutorService defaultExecutor;
private final MyJob fetchingJob;
private final long timeout;
......@@ -385,5 +379,5 @@ public class FetchingEventRelay implements EventRelay {
new ClassLogger("javax.management.event",
"FetchingEventRelay");
private static final ThreadFactory daemonThreadFactory =
new DaemonThreadFactory("FetchingEventRelay-executor");
new DaemonThreadFactory("JMX FetchingEventRelay executor %d");
}
......@@ -185,7 +185,7 @@ public class RMIPushEventForwarder implements EventForwarder {
private static final ExecutorService executor =
Executors.newCachedThreadPool(
new DaemonThreadFactory("RMIEventForwarder Executor"));
new DaemonThreadFactory("JMX RMIEventForwarder Executor"));
private final SendingJob sendingJob = new SendingJob();
private final BlockingQueue<TargetedNotification> buffer;
......
......@@ -420,7 +420,7 @@ public class RMIConnector implements JMXConnector, Serializable, JMXAddressable
new PerThreadGroupPool.Create<ThreadPoolExecutor>() {
public ThreadPoolExecutor createThreadPool(ThreadGroup group) {
ThreadFactory daemonThreadFactory = new DaemonThreadFactory(
"RMIConnector listener dispatch %d");
"JMX RMIConnector listener dispatch %d");
ThreadPoolExecutor exec = new ThreadPoolExecutor(
1, 10, 1, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(),
......
/*
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/*
* @test
* @bug 6747411
* @summary Check that EventClient instances don't leak threads.
* @author Eamonn McManus
*/
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerDelegate;
import javax.management.MBeanServerNotification;
import javax.management.Notification;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.event.EventClient;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
public class EventClientThreadTest {
private static final int MAX_TIME_SECONDS = 20;
private static final BlockingQueue<Notification> queue =
new ArrayBlockingQueue(100);
private static final NotificationListener queueListener =
new NotificationListener() {
public void handleNotification(Notification notification,
Object handback) {
queue.add(notification);
}
};
private static final NotificationFilter dummyFilter =
new NotificationFilter() {
public boolean isNotificationEnabled(Notification notification) {
return true;
}
};
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
long deadline = start + MAX_TIME_SECONDS * 1000;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://");
JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
url, null, mbs);
cs.start();
JMXServiceURL addr = cs.getAddress();
JMXConnector cc = JMXConnectorFactory.connect(addr);
MBeanServerConnection mbsc = cc.getMBeanServerConnection();
ThreadMXBean threads = ManagementFactory.getThreadMXBean();
System.out.println("Opening and closing some EventClients...");
// If we create a connection, then create and destroy EventClients
// over it, then close it, there should be no "JMX *" threads left.
for (int i = 0; i < 5; i++)
test(mbsc);
cc.close();
showTime("opening and closing initial EventClients", start);
Set<String> jmxThreads = threadsMatching("JMX .*");
while (!jmxThreads.isEmpty() && System.currentTimeMillis() < deadline) {
Set<String> jmxThreadsNow = threadsMatching("JMX .*");
Set<String> gone = new TreeSet<String>(jmxThreads);
gone.removeAll(jmxThreadsNow);
for (String s : gone)
showTime("expiry of \"" + s + "\"", start);
jmxThreads = jmxThreadsNow;
Thread.sleep(10);
}
if (System.currentTimeMillis() >= deadline) {
showThreads(threads);
throw new Exception("Timed out waiting for JMX threads to expire");
}
showTime("waiting for JMX threads to expire", start);
System.out.println("TEST PASSED");
}
static void showThreads(ThreadMXBean threads) throws Exception {
long[] ids = threads.getAllThreadIds();
for (long id : ids) {
ThreadInfo ti = threads.getThreadInfo(id);
String name = (ti == null) ? "(defunct)" : ti.getThreadName();
System.out.printf("%4d %s\n", id, name);
}
}
static void showTime(String what, long start) {
long elapsed = System.currentTimeMillis() - start;
System.out.printf("Time after %s: %.3f s\n", what, elapsed / 1000.0);
}
static Set<String> threadsMatching(String pattern) {
Set<String> matching = new TreeSet<String>();
ThreadMXBean threads = ManagementFactory.getThreadMXBean();
long[] ids = threads.getAllThreadIds();
for (long id : ids) {
ThreadInfo ti = threads.getThreadInfo(id);
String name = (ti == null) ? "(defunct)" : ti.getThreadName();
if (name.matches(pattern))
matching.add(name);
}
return matching;
}
static void test(MBeanServerConnection mbsc) throws Exception {
final ObjectName delegateName = MBeanServerDelegate.DELEGATE_NAME;
final ObjectName testName = new ObjectName("test:type=Test");
EventClient ec = new EventClient(mbsc);
ec.addNotificationListener(delegateName, queueListener, null, null);
mbsc.createMBean(MBeanServerDelegate.class.getName(), testName);
mbsc.unregisterMBean(testName);
final String[] expectedTypes = {
MBeanServerNotification.REGISTRATION_NOTIFICATION,
MBeanServerNotification.UNREGISTRATION_NOTIFICATION,
};
for (String s : expectedTypes) {
Notification n = queue.poll(3, TimeUnit.SECONDS);
if (n == null)
throw new Exception("Timed out waiting for notif: " + s);
if (!(n instanceof MBeanServerNotification))
throw new Exception("Got notif of wrong class: " + n.getClass());
if (!n.getType().equals(s)) {
throw new Exception("Got notif of wrong type: " + n.getType() +
" (expecting " + s + ")");
}
}
ec.removeNotificationListener(delegateName, queueListener);
ec.addNotificationListener(delegateName, queueListener, dummyFilter, "foo");
ec.removeNotificationListener(delegateName, queueListener, dummyFilter, "foo");
ec.close();
}
}
\ No newline at end of file
/*/*
/*
* Copyright 2007 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册