From 00a90a93a1f0069a752c4730c9b03ea462748366 Mon Sep 17 00:00:00 2001 From: emcmanus Date: Fri, 12 Sep 2008 15:17:52 +0200 Subject: [PATCH] 6747411: EventClient causes thread leaks Summary: Reworked thread management in EventClient and related classes. Reviewed-by: sjiang, dfuchs --- .../com/sun/jmx/event/LeaseManager.java | 11 +- .../sun/jmx/event/RepeatedSingletonJob.java | 4 +- .../internal/ClientCommunicatorAdmin.java | 4 +- .../javax/management/event/EventClient.java | 5 +- .../management/event/FetchingEventRelay.java | 52 +++--- .../event/RMIPushEventForwarder.java | 2 +- .../management/remote/rmi/RMIConnector.java | 2 +- .../eventService/EventClientThreadTest.java | 176 ++++++++++++++++++ .../eventService/SharingThreadTest.java | 2 +- 9 files changed, 220 insertions(+), 38 deletions(-) create mode 100644 test/javax/management/eventService/EventClientThreadTest.java diff --git a/src/share/classes/com/sun/jmx/event/LeaseManager.java b/src/share/classes/com/sun/jmx/event/LeaseManager.java index cb1b88bf5..33409a06c 100644 --- a/src/share/classes/com/sun/jmx/event/LeaseManager.java +++ b/src/share/classes/com/sun/jmx/event/LeaseManager.java @@ -27,7 +27,6 @@ package com.sun.jmx.event; import com.sun.jmx.remote.util.ClassLogger; import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -115,6 +114,7 @@ public class LeaseManager { scheduled = null; } callback.run(); + executor.shutdown(); } } @@ -131,6 +131,13 @@ public class LeaseManager { logger.trace("stop", "canceling lease"); scheduled.cancel(false); scheduled = null; + try { + executor.shutdown(); + } catch (SecurityException e) { + // OK: caller doesn't have RuntimePermission("modifyThread") + // which is unlikely in reality but triggers a test failure otherwise + logger.trace("stop", "exception from executor.shutdown", e); + } } private final Runnable callback; @@ -138,7 +145,7 @@ public class LeaseManager { private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, - new DaemonThreadFactory("LeaseManager")); + new DaemonThreadFactory("JMX LeaseManager %d")); private static final ClassLogger logger = new ClassLogger("javax.management.event", "LeaseManager"); diff --git a/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java b/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java index 7de1b40e9..2fe4a3a15 100644 --- a/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java +++ b/src/share/classes/com/sun/jmx/event/RepeatedSingletonJob.java @@ -95,7 +95,9 @@ public abstract class RepeatedSingletonJob implements Runnable { executor.execute(this); } catch (RejectedExecutionException e) { logger.warning( - "setEventReceiver", "Executor threw exception", e); + "execute", + "Executor threw exception (" + this.getClass().getName() + ")", + e); throw new RejectedExecutionException( "Executor.execute threw exception -" + "should not be possible", e); diff --git a/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java b/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java index a6635aad8..f90cbc4c5 100644 --- a/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java +++ b/src/share/classes/com/sun/jmx/remote/internal/ClientCommunicatorAdmin.java @@ -32,13 +32,15 @@ import com.sun.jmx.remote.util.ClassLogger; import com.sun.jmx.remote.util.EnvHelp; public abstract class ClientCommunicatorAdmin { + private static volatile long threadNo = 1; + public ClientCommunicatorAdmin(long period) { this.period = period; if (period > 0) { checker = new Checker(); - Thread t = new Thread(checker); + Thread t = new Thread(checker, "JMX client heartbeat " + ++threadNo); t.setDaemon(true); t.start(); } else diff --git a/src/share/classes/javax/management/event/EventClient.java b/src/share/classes/javax/management/event/EventClient.java index 4b8101353..10a4df500 100644 --- a/src/share/classes/javax/management/event/EventClient.java +++ b/src/share/classes/javax/management/event/EventClient.java @@ -264,11 +264,12 @@ public class EventClient implements EventConsumer, NotificationManager { new PerThreadGroupPool.Create() { public ScheduledThreadPoolExecutor createThreadPool(ThreadGroup group) { ThreadFactory daemonThreadFactory = new DaemonThreadFactory( - "EventClient lease renewer %d"); + "JMX EventClient lease renewer %d"); ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor( 20, daemonThreadFactory); - exec.setKeepAliveTime(3, TimeUnit.SECONDS); + exec.setKeepAliveTime(1, TimeUnit.SECONDS); exec.allowCoreThreadTimeOut(true); + exec.setRemoveOnCancelPolicy(true); return exec; } }; diff --git a/src/share/classes/javax/management/event/FetchingEventRelay.java b/src/share/classes/javax/management/event/FetchingEventRelay.java index 2b65f9b12..9aa68df0f 100644 --- a/src/share/classes/javax/management/event/FetchingEventRelay.java +++ b/src/share/classes/javax/management/event/FetchingEventRelay.java @@ -31,10 +31,8 @@ import com.sun.jmx.remote.util.ClassLogger; import java.io.IOException; import java.io.NotSerializableException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.management.MBeanException; @@ -215,50 +213,47 @@ public class FetchingEventRelay implements EventRelay { this.maxNotifs = maxNotifs; if (executor == null) { - executor = Executors.newSingleThreadScheduledExecutor( + ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); - } + stpe.setKeepAliveTime(1, TimeUnit.SECONDS); + stpe.allowCoreThreadTimeOut(true); + executor = stpe; + this.defaultExecutor = stpe; + } else + this.defaultExecutor = null; this.executor = executor; - if (executor instanceof ScheduledExecutorService) - leaseScheduler = (ScheduledExecutorService) executor; - else { - leaseScheduler = Executors.newSingleThreadScheduledExecutor( - daemonThreadFactory); - } startSequenceNumber = 0; fetchingJob = new MyJob(); } - public void setEventReceiver(EventReceiver eventReceiver) { + public synchronized void setEventReceiver(EventReceiver eventReceiver) { if (logger.traceOn()) { logger.trace("setEventReceiver", ""+eventReceiver); } EventReceiver old = this.eventReceiver; - synchronized(fetchingJob) { - this.eventReceiver = eventReceiver; - if (old == null && eventReceiver != null) - fetchingJob.resume(); - } + this.eventReceiver = eventReceiver; + if (old == null && eventReceiver != null) + fetchingJob.resume(); } public String getClientId() { return clientId; } - public void stop() { + public synchronized void stop() { if (logger.traceOn()) { logger.trace("stop", ""); } - synchronized(fetchingJob) { - if (stopped) { - return; - } - - stopped = true; - clientId = null; + if (stopped) { + return; } + + stopped = true; + clientId = null; + if (defaultExecutor != null) + defaultExecutor.shutdown(); } private class MyJob extends RepeatedSingletonJob { @@ -372,10 +367,9 @@ public class FetchingEventRelay implements EventRelay { private final EventClientDelegateMBean delegate; private String clientId; private boolean stopped = false; - private volatile ScheduledFuture leaseRenewalFuture; private final Executor executor; - private final ScheduledExecutorService leaseScheduler; + private final ExecutorService defaultExecutor; private final MyJob fetchingJob; private final long timeout; @@ -385,5 +379,5 @@ public class FetchingEventRelay implements EventRelay { new ClassLogger("javax.management.event", "FetchingEventRelay"); private static final ThreadFactory daemonThreadFactory = - new DaemonThreadFactory("FetchingEventRelay-executor"); + new DaemonThreadFactory("JMX FetchingEventRelay executor %d"); } diff --git a/src/share/classes/javax/management/event/RMIPushEventForwarder.java b/src/share/classes/javax/management/event/RMIPushEventForwarder.java index 2018f98ad..751300d54 100644 --- a/src/share/classes/javax/management/event/RMIPushEventForwarder.java +++ b/src/share/classes/javax/management/event/RMIPushEventForwarder.java @@ -185,7 +185,7 @@ public class RMIPushEventForwarder implements EventForwarder { private static final ExecutorService executor = Executors.newCachedThreadPool( - new DaemonThreadFactory("RMIEventForwarder Executor")); + new DaemonThreadFactory("JMX RMIEventForwarder Executor")); private final SendingJob sendingJob = new SendingJob(); private final BlockingQueue buffer; diff --git a/src/share/classes/javax/management/remote/rmi/RMIConnector.java b/src/share/classes/javax/management/remote/rmi/RMIConnector.java index bdcbb1568..a620235ac 100644 --- a/src/share/classes/javax/management/remote/rmi/RMIConnector.java +++ b/src/share/classes/javax/management/remote/rmi/RMIConnector.java @@ -420,7 +420,7 @@ public class RMIConnector implements JMXConnector, Serializable, JMXAddressable new PerThreadGroupPool.Create() { public ThreadPoolExecutor createThreadPool(ThreadGroup group) { ThreadFactory daemonThreadFactory = new DaemonThreadFactory( - "RMIConnector listener dispatch %d"); + "JMX RMIConnector listener dispatch %d"); ThreadPoolExecutor exec = new ThreadPoolExecutor( 1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingDeque(), diff --git a/test/javax/management/eventService/EventClientThreadTest.java b/test/javax/management/eventService/EventClientThreadTest.java new file mode 100644 index 000000000..910bc9cc2 --- /dev/null +++ b/test/javax/management/eventService/EventClientThreadTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2008 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * @test + * @bug 6747411 + * @summary Check that EventClient instances don't leak threads. + * @author Eamonn McManus + */ + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.Set; +import java.util.TreeSet; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerDelegate; +import javax.management.MBeanServerNotification; +import javax.management.Notification; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.management.event.EventClient; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; + +public class EventClientThreadTest { + private static final int MAX_TIME_SECONDS = 20; + + private static final BlockingQueue queue = + new ArrayBlockingQueue(100); + + private static final NotificationListener queueListener = + new NotificationListener() { + public void handleNotification(Notification notification, + Object handback) { + queue.add(notification); + } + }; + + private static final NotificationFilter dummyFilter = + new NotificationFilter() { + public boolean isNotificationEnabled(Notification notification) { + return true; + } + }; + + public static void main(String[] args) throws Exception { + long start = System.currentTimeMillis(); + long deadline = start + MAX_TIME_SECONDS * 1000; + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://"); + JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer( + url, null, mbs); + cs.start(); + JMXServiceURL addr = cs.getAddress(); + JMXConnector cc = JMXConnectorFactory.connect(addr); + MBeanServerConnection mbsc = cc.getMBeanServerConnection(); + + ThreadMXBean threads = ManagementFactory.getThreadMXBean(); + + System.out.println("Opening and closing some EventClients..."); + // If we create a connection, then create and destroy EventClients + // over it, then close it, there should be no "JMX *" threads left. + for (int i = 0; i < 5; i++) + test(mbsc); + + cc.close(); + + showTime("opening and closing initial EventClients", start); + + Set jmxThreads = threadsMatching("JMX .*"); + while (!jmxThreads.isEmpty() && System.currentTimeMillis() < deadline) { + Set jmxThreadsNow = threadsMatching("JMX .*"); + Set gone = new TreeSet(jmxThreads); + gone.removeAll(jmxThreadsNow); + for (String s : gone) + showTime("expiry of \"" + s + "\"", start); + jmxThreads = jmxThreadsNow; + Thread.sleep(10); + } + if (System.currentTimeMillis() >= deadline) { + showThreads(threads); + throw new Exception("Timed out waiting for JMX threads to expire"); + } + + showTime("waiting for JMX threads to expire", start); + + System.out.println("TEST PASSED"); + } + + static void showThreads(ThreadMXBean threads) throws Exception { + long[] ids = threads.getAllThreadIds(); + for (long id : ids) { + ThreadInfo ti = threads.getThreadInfo(id); + String name = (ti == null) ? "(defunct)" : ti.getThreadName(); + System.out.printf("%4d %s\n", id, name); + } + } + + static void showTime(String what, long start) { + long elapsed = System.currentTimeMillis() - start; + System.out.printf("Time after %s: %.3f s\n", what, elapsed / 1000.0); + } + + static Set threadsMatching(String pattern) { + Set matching = new TreeSet(); + ThreadMXBean threads = ManagementFactory.getThreadMXBean(); + long[] ids = threads.getAllThreadIds(); + for (long id : ids) { + ThreadInfo ti = threads.getThreadInfo(id); + String name = (ti == null) ? "(defunct)" : ti.getThreadName(); + if (name.matches(pattern)) + matching.add(name); + } + return matching; + } + + static void test(MBeanServerConnection mbsc) throws Exception { + final ObjectName delegateName = MBeanServerDelegate.DELEGATE_NAME; + final ObjectName testName = new ObjectName("test:type=Test"); + EventClient ec = new EventClient(mbsc); + ec.addNotificationListener(delegateName, queueListener, null, null); + mbsc.createMBean(MBeanServerDelegate.class.getName(), testName); + mbsc.unregisterMBean(testName); + final String[] expectedTypes = { + MBeanServerNotification.REGISTRATION_NOTIFICATION, + MBeanServerNotification.UNREGISTRATION_NOTIFICATION, + }; + for (String s : expectedTypes) { + Notification n = queue.poll(3, TimeUnit.SECONDS); + if (n == null) + throw new Exception("Timed out waiting for notif: " + s); + if (!(n instanceof MBeanServerNotification)) + throw new Exception("Got notif of wrong class: " + n.getClass()); + if (!n.getType().equals(s)) { + throw new Exception("Got notif of wrong type: " + n.getType() + + " (expecting " + s + ")"); + } + } + ec.removeNotificationListener(delegateName, queueListener); + + ec.addNotificationListener(delegateName, queueListener, dummyFilter, "foo"); + ec.removeNotificationListener(delegateName, queueListener, dummyFilter, "foo"); + + ec.close(); + } +} \ No newline at end of file diff --git a/test/javax/management/eventService/SharingThreadTest.java b/test/javax/management/eventService/SharingThreadTest.java index a3d7fd37a..7339d0806 100644 --- a/test/javax/management/eventService/SharingThreadTest.java +++ b/test/javax/management/eventService/SharingThreadTest.java @@ -1,4 +1,4 @@ -/*/* +/* * Copyright 2007 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * -- GitLab