/* * Copyright 2007-2008 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or * have any questions. */ /* * @test ReconnectableJMXConnector * @bug 5108776 * @summary Check that the Event Service can be used to build a * ReconnectableJMXConnector. * @author Eamonn McManus */ import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Date; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.management.ListenerNotFoundException; import javax.management.MBeanServer; import javax.management.MBeanServerConnection; import javax.management.MBeanServerFactory; import javax.management.Notification; import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.ObjectName; import javax.management.event.EventClient; import javax.management.remote.JMXConnectionNotification; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; import javax.management.remote.JMXServiceURL; import javax.security.auth.Subject; /* * This test checks that it is possible to use the Event Service to create * a "reconnectable connector". * * In the JMX Remote API, we deliberately specified that a connector client * (JMXConnector) that encounters a network failure is then permanently broken. * The idea being that adding recovery logic to the basic connector client * would make it much more complicated and less reliable, and the logic would * in any case never correspond to what a given situation needs. Some of * the tough questions are: Should the connector try to mask the failure by * blocking operations until the failure is resolved? How long should the * connector try to reestablish the connection before giving up? Rather than * try to solve this problem in the connector, we suggested that people who * wanted to recover from network failures could implement the JMXConnector * interface themselves so that it forwards to a wrapped JMXConnector that can * be replaced in case of network failure. * * This works fine except that the connector client has state, * in the form of listeners added by the user through the * MBeanServerConnection.addNotificationListener method. It's possible * for the wrapper to keep track of these listeners as well as forwarding * them to the wrapped JMXConnector, so that it can reapply them to * a replacement JMXConnector after failure recover. But it's quite * tricky, particularly because of the two- and four-argument versions of * removeNotificationListener. * * The Event Service can take care of this for you through the EventClient * class. Listeners added through that class are implemented in a way that * doesn't require the connector client to maintain any state, so they should * continue to work transparently after replacing the wrapped JMXConnector. * This test is a proof of concept that shows it works. Quite a number of * details would need to be changed to build a reliable reconnectable * connector. * * The test simulates network failure by rewrapping the wrapped JMXConnector's * MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause * to stop working. We do this in two phases. The first phase suspends * any MBSC calls just at the point where they would return to the caller. * The goal here is to block an EventClientDelegateMBean.fetchNotifications * operation when it has received notifications but not yet delivered them * to the EventClient. This is the most delicate point where a breakage * can occur, because the EventClientDelegate must not drop those notifs * from its buffer until another fetchNotifs call arrives with a later * sequence number (which is an implicit ack of the previous set of * notifs). Once the fetchNotifs call is suspended, we "kill" the MBSC, * causing it to throw IOException from this and any other calls. That * triggers the reconnect logic, which will make a new MBSC and issue * the same fetchNotifs call to it. * * The test could be improved by synchronizing explicitly between the * breakable MBSC and the mainline, so we only proceed to kill the MBSC * when we are sure that the fetchNotifs call is blocked. As it is, * we have a small delay which both ensures that no notifs are delivered * while the connection is suspended, and if the machine is fast enough * allows the fetchNotifs call to reach the blocking point. */ public class ReconnectableConnectorTest { private static class ReconnectableJMXConnector implements JMXConnector { private final JMXServiceURL url; private AtomicReference wrappedJMXC = new AtomicReference(); private AtomicReference wrappedMBSC = new AtomicReference(); private final NotificationBroadcasterSupport broadcaster = new NotificationBroadcasterSupport(); private final Lock connectLock = new ReentrantLock(); ReconnectableJMXConnector(JMXServiceURL url) { this.url = url; } private class ReconnectIH implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { return method.invoke(wrappedMBSC.get(), args); } catch (InvocationTargetException e) { if (e.getCause() instanceof IOException) { connect(); try { return method.invoke(wrappedMBSC.get(),args); } catch (InvocationTargetException ee) { throw ee.getCause(); } } throw e.getCause(); } } } private class FailureListener implements NotificationListener { public void handleNotification(Notification n, Object h) { String type = n.getType(); if (type.equals(JMXConnectionNotification.FAILED)) { try { connect(); } catch (IOException e) { broadcaster.sendNotification(n); } } else if (type.equals(JMXConnectionNotification.NOTIFS_LOST)) broadcaster.sendNotification(n); } } public void connect() throws IOException { connectLock.lock(); try { connectWithLock(); } finally { connectLock.unlock(); } } private void connectWithLock() throws IOException { MBeanServerConnection mbsc = wrappedMBSC.get(); if (mbsc != null) { try { mbsc.getDefaultDomain(); return; // the connection works } catch (IOException e) { // OK: the connection doesn't work, so make a new one } } // This is where we would need to add the fancy logic that // allows the connection to keep failing for a while // before giving up. JMXConnector jmxc = JMXConnectorFactory.connect(url); jmxc.addConnectionNotificationListener( new FailureListener(), null, null); wrappedJMXC.set(jmxc); if (false) wrappedMBSC.set(jmxc.getMBeanServerConnection()); else { mbsc = jmxc.getMBeanServerConnection(); InvocationHandler ih = new BreakableIH(mbsc); mbsc = (MBeanServerConnection) Proxy.newProxyInstance( MBeanServerConnection.class.getClassLoader(), new Class[] {MBeanServerConnection.class}, ih); wrappedMBSC.set(mbsc); } } private BreakableIH breakableIH() { MBeanServerConnection mbsc = wrappedMBSC.get(); return (BreakableIH) Proxy.getInvocationHandler(mbsc); } void suspend() { BreakableIH ih = breakableIH(); ih.suspend(); } void kill() throws IOException { BreakableIH ih = breakableIH(); wrappedJMXC.get().close(); ih.kill(); } public void connect(Map env) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } private final AtomicReference mbscRef = new AtomicReference(); public MBeanServerConnection getMBeanServerConnection() throws IOException { connect(); // Synchro here is not strictly correct: two threads could make // an MBSC at the same time. OK for a test but beware for real // code. MBeanServerConnection mbsc = mbscRef.get(); if (mbsc != null) return mbsc; mbsc = (MBeanServerConnection) Proxy.newProxyInstance( MBeanServerConnection.class.getClassLoader(), new Class[] {MBeanServerConnection.class}, new ReconnectIH()); mbsc = EventClient.getEventClientConnection(mbsc); mbscRef.set(mbsc); return mbsc; } public MBeanServerConnection getMBeanServerConnection( Subject delegationSubject) throws IOException { throw new UnsupportedOperationException("Not supported yet."); } public void close() throws IOException { wrappedJMXC.get().close(); } public void addConnectionNotificationListener( NotificationListener l, NotificationFilter f, Object h) { broadcaster.addNotificationListener(l, f, h); } public void removeConnectionNotificationListener(NotificationListener l) throws ListenerNotFoundException { broadcaster.removeNotificationListener(l); } public void removeConnectionNotificationListener( NotificationListener l, NotificationFilter f, Object h) throws ListenerNotFoundException { broadcaster.removeNotificationListener(l, f, h); } public String getConnectionId() throws IOException { return wrappedJMXC.get().getConnectionId(); } } // InvocationHandler that allows us to perform a two-phase "break" of // an object. The first phase suspends the object, so that calls to // it are blocked just before they return. The second phase unblocks // suspended threads and causes them to throw IOException. private static class BreakableIH implements InvocationHandler { private final Object wrapped; private final Holder state = new Holder("running"); BreakableIH(Object wrapped) { this.wrapped = wrapped; } void suspend() { state.set("suspended"); } void kill() { state.set("killed"); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object result; try { result = method.invoke(wrapped, args); } catch (InvocationTargetException e) { throw e.getCause(); } String s = state.get(); if (s.equals("suspended")) state.waitUntilEqual("killed", 3, TimeUnit.SECONDS); else if (s.equals("killed")) throw new IOException("Broken"); return result; } } private static class Holder { private T held; private Lock lock = new ReentrantLock(); private Condition changed = lock.newCondition(); Holder(T value) { lock.lock(); this.held = value; lock.unlock(); } void waitUntilEqual(T value, long timeout, TimeUnit units) throws InterruptedException { long millis = units.toMillis(timeout); long stop = System.currentTimeMillis() + millis; Date stopDate = new Date(stop); lock.lock(); try { while (!value.equals(held)) { boolean ok = changed.awaitUntil(stopDate); if (!ok) throw new InterruptedException("Timed out"); } } finally { lock.unlock(); } } void set(T value) { lock.lock(); try { held = value; changed.signalAll(); } finally { lock.unlock(); } } T get() { lock.lock(); try { return held; } finally { lock.unlock(); } } } private static class StoreListener implements NotificationListener { final BlockingQueue queue = new ArrayBlockingQueue(100); public void handleNotification(Notification n, Object h) { queue.add(n); } Notification nextNotification(long time, TimeUnit units) throws InterruptedException { Notification n = queue.poll(time, units); if (n == null) throw new NoSuchElementException("Notification wait timed out"); return n; } int notifCount() { return queue.size(); } } public static interface SenderMBean {} public static class Sender extends NotificationBroadcasterSupport implements SenderMBean { private AtomicLong seqNo = new AtomicLong(0); void send() { Notification n = new Notification("type", this, seqNo.getAndIncrement()); sendNotification(n); } } public static void main(String[] args) throws Exception { MBeanServer mbs = MBeanServerFactory.newMBeanServer(); Sender sender = new Sender(); ObjectName name = new ObjectName("a:b=c"); mbs.registerMBean(sender, name); System.out.println("Creating connector server"); JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///"); JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer( url, null, mbs); cs.start(); StoreListener csListener = new StoreListener(); cs.addNotificationListener(csListener, null, null); System.out.println("Creating reconnectable client"); JMXServiceURL addr = cs.getAddress(); ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr); MBeanServerConnection mbsc = cc.getMBeanServerConnection(); System.out.println("Checking server has sent new-client notif"); Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS); assertEquals("CS notif type", JMXConnectionNotification.OPENED, csn.getType()); StoreListener listener = new StoreListener(); mbsc.addNotificationListener(name, listener, null, null); System.out.println("Sending 10 notifs and checking they are received"); for (int i = 0; i < 10; i++) sender.send(); checkNotifs(listener, 0, 10); System.out.println("Suspending the fetchNotifs operation"); cc.suspend(); System.out.println("Sending a notif while fetchNotifs is suspended"); sender.send(); System.out.println("Brief wait before checking no notif is received"); Thread.sleep(2); // dumpThreads(); assertEquals("notif queue while connector suspended", 0, listener.notifCount()); assertEquals("connector server notif queue while connector suspended", 0, csListener.notifCount()); System.out.println("Breaking the connection so fetchNotifs will fail over"); cc.kill(); System.out.println("Checking that client has reconnected"); csn = csListener.nextNotification(1, TimeUnit.SECONDS); assertEquals("First CS notif type after kill", JMXConnectionNotification.CLOSED, csn.getType()); csn = csListener.nextNotification(1, TimeUnit.SECONDS); assertEquals("Second CS notif type after kill", JMXConnectionNotification.OPENED, csn.getType()); System.out.println("Checking that suspended notif has been received"); checkNotifs(listener, 10, 11); } private static void checkNotifs( StoreListener sl, long start, long stop) throws Exception { for (long i = start; i < stop; i++) { Notification n = sl.nextNotification(1, TimeUnit.SECONDS); assertEquals("received sequence number", i, n.getSequenceNumber()); } } private static void assertEquals(String what, Object expect, Object actual) throws Exception { if (!expect.equals(actual)) { fail(what + " should be " + expect + " but is " + actual); } } private static void fail(String why) throws Exception { throw new Exception("TEST FAILED: " + why); } private static void dumpThreads() { System.out.println("Thread stack dump"); Map traces = Thread.getAllStackTraces(); for (Map.Entry entry : traces.entrySet()) { Thread t = entry.getKey(); System.out.println("===Thread " + t.getName() + "==="); for (StackTraceElement ste : entry.getValue()) System.out.println(" " + ste); } } }