/* * Copyright 2007 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Sun designates this * particular file as subject to the "Classpath" exception as provided * by Sun in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or * have any questions. */ package javax.management.event; import com.sun.jmx.event.DaemonThreadFactory; import com.sun.jmx.event.RepeatedSingletonJob; import com.sun.jmx.remote.util.ClassLogger; import java.io.IOException; import java.io.NotSerializableException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.management.MBeanException; import javax.management.remote.NotificationResult; /** * This class is an implementation of the {@link EventRelay} interface. It calls * {@link EventClientDelegateMBean#fetchNotifications * fetchNotifications(String, long, int, long)} to get * notifications and then forwards them to an {@link EventReceiver} object. * * @since JMX 2.0 */ public class FetchingEventRelay implements EventRelay { /** * The default buffer size: {@value #DEFAULT_BUFFER_SIZE}. */ public final static int DEFAULT_BUFFER_SIZE = 1000; /** * The default waiting timeout: {@value #DEFAULT_WAITING_TIMEOUT} * in millseconds when fetching notifications from * an {@code EventClientDelegateMBean}. */ public final static long DEFAULT_WAITING_TIMEOUT = 60000; /** * The default maximum notifications to fetch every time: * {@value #DEFAULT_MAX_NOTIFICATIONS}. */ public final static int DEFAULT_MAX_NOTIFICATIONS = DEFAULT_BUFFER_SIZE; /** * Constructs a default {@code FetchingEventRelay} object by using the default * configuration: {@code DEFAULT_BUFFER_SIZE}, {@code DEFAULT_WAITING_TIMEOUT} * {@code DEFAULT_MAX_NOTIFICATIONS}. A single thread is created * to do fetching. * * @param delegate The {@code EventClientDelegateMBean} to work with. * @throws IOException If failed to work with the {@code delegate}. * @throws MBeanException if unable to add a client to the remote * {@code EventClientDelegateMBean} (see {@link * EventClientDelegateMBean#addClient(String, Object[], String[]) * EventClientDelegateMBean.addClient}). * @throws IllegalArgumentException If {@code delegate} is {@code null}. */ public FetchingEventRelay(EventClientDelegateMBean delegate) throws IOException, MBeanException { this(delegate, null); } /** * Constructs a {@code FetchingEventRelay} object by using the default * configuration: {@code DEFAULT_BUFFER_SIZE}, {@code DEFAULT_WAITING_TIMEOUT} * {@code DEFAULT_MAX_NOTIFICATIONS}, with a user-specific executor to do * the fetching. * * @param delegate The {@code EventClientDelegateMBean} to work with. * @param executor Used to do the fetching. A new thread is created if * {@code null}. * @throws IOException If failed to work with the {@code delegate}. * @throws MBeanException if unable to add a client to the remote * {@code EventClientDelegateMBean} (see {@link * EventClientDelegateMBean#addClient(String, Object[], String[]) * EventClientDelegateMBean.addClient}). * @throws IllegalArgumentException If {@code delegate} is {@code null}. */ public FetchingEventRelay(EventClientDelegateMBean delegate, Executor executor) throws IOException, MBeanException { this(delegate, DEFAULT_BUFFER_SIZE, DEFAULT_WAITING_TIMEOUT, DEFAULT_MAX_NOTIFICATIONS, executor); } /** * Constructs a {@code FetchingEventRelay} object with user-specific * configuration and executor to fetch notifications via the * {@link EventClientDelegateMBean}. * * @param delegate The {@code EventClientDelegateMBean} to work with. * @param bufferSize The buffer size for saving notifications in * {@link EventClientDelegateMBean} before they are fetched. * @param timeout The waiting time in millseconds when fetching * notifications from an {@code EventClientDelegateMBean}. * @param maxNotifs The maximum notifications to fetch every time. * @param executor Used to do the fetching. A new thread is created if * {@code null}. * @throws IOException if failed to communicate with the {@code delegate}. * @throws MBeanException if unable to add a client to the remote * {@code EventClientDelegateMBean} (see {@link * EventClientDelegateMBean#addClient(String, Object[], String[]) * EventClientDelegateMBean.addClient}). * @throws IllegalArgumentException If {@code delegate} is {@code null}. */ public FetchingEventRelay(EventClientDelegateMBean delegate, int bufferSize, long timeout, int maxNotifs, Executor executor) throws IOException, MBeanException { this(delegate, bufferSize, timeout, maxNotifs, executor, FetchingEventForwarder.class.getName(), new Object[] {bufferSize}, new String[] {int.class.getName()}); } /** * Constructs a {@code FetchingEventRelay} object with user-specific * configuration and executor to fetch notifications via the * {@link EventClientDelegateMBean}. * * @param delegate The {@code EventClientDelegateMBean} to work with. * @param bufferSize The buffer size for saving notifications in * {@link EventClientDelegateMBean} before they are fetched. * @param timeout The waiting time in millseconds when fetching * notifications from an {@code EventClientDelegateMBean}. * @param maxNotifs The maximum notifications to fetch every time. * @param executor Used to do the fetching. * @param forwarderName the class name of a user specific EventForwarder * to create in server to forward notifications to this object. The class * should be a subclass of the class {@link FetchingEventForwarder}. * @param params the parameters passed to create {@code forwarderName} * @param sig the signature of the {@code params} * @throws IOException if failed to communicate with the {@code delegate}. * @throws MBeanException if unable to add a client to the remote * {@code EventClientDelegateMBean} (see {@link * EventClientDelegateMBean#addClient(String, Object[], String[]) * EventClientDelegateMBean.addClient}). * @throws IllegalArgumentException if {@code bufferSize} or * {@code maxNotifs} is less than {@code 1} * @throws NullPointerException if {@code delegate} is {@code null}. */ public FetchingEventRelay(EventClientDelegateMBean delegate, int bufferSize, long timeout, int maxNotifs, Executor executor, String forwarderName, Object[] params, String[] sig) throws IOException, MBeanException { if (logger.traceOn()) { logger.trace("FetchingEventRelay", "delegateMBean "+ bufferSize+" "+ timeout+" "+ maxNotifs+" "+ executor+" "+ forwarderName+" "); } if(delegate == null) { throw new NullPointerException("Null EventClientDelegateMBean!"); } if (bufferSize<=1) { throw new IllegalArgumentException( "The bufferSize cannot be less than 1, no meaning."); } if (maxNotifs<=1) { throw new IllegalArgumentException( "The maxNotifs cannot be less than 1, no meaning."); } clientId = delegate.addClient( forwarderName, params, sig); this.delegate = delegate; this.timeout = timeout; this.maxNotifs = maxNotifs; if (executor == null) { executor = Executors.newSingleThreadScheduledExecutor( daemonThreadFactory); } this.executor = executor; if (executor instanceof ScheduledExecutorService) leaseScheduler = (ScheduledExecutorService) executor; else { leaseScheduler = Executors.newSingleThreadScheduledExecutor( daemonThreadFactory); } startSequenceNumber = 0; fetchingJob = new MyJob(); } public void setEventReceiver(EventReceiver eventReceiver) { if (logger.traceOn()) { logger.trace("setEventReceiver", ""+eventReceiver); } EventReceiver old = this.eventReceiver; synchronized(fetchingJob) { this.eventReceiver = eventReceiver; if (old == null && eventReceiver != null) fetchingJob.resume(); } } public String getClientId() { return clientId; } public void stop() { if (logger.traceOn()) { logger.trace("stop", ""); } synchronized(fetchingJob) { if (stopped) { return; } stopped = true; clientId = null; } } private class MyJob extends RepeatedSingletonJob { public MyJob() { super(executor); } public boolean isSuspended() { boolean b; synchronized(FetchingEventRelay.this) { b = stopped || (eventReceiver == null) || (clientId == null); } if (logger.traceOn()) { logger.trace("-MyJob-isSuspended", ""+b); } return b; } public void task() { logger.trace("MyJob-task", ""); long fetchTimeout = timeout; NotificationResult nr = null; Throwable failedExcep = null; try { nr = delegate.fetchNotifications( clientId, startSequenceNumber, maxNotifs, fetchTimeout); } catch (Exception e) { if (isSerialOrClassNotFound(e)) { try { nr = fetchOne(); } catch (Exception ee) { failedExcep = e; } } else { failedExcep = e; } } if (failedExcep != null && !isSuspended()) { logger.fine("MyJob-task", "Failed to fetch notification, stopping...", failedExcep); try { eventReceiver.failed(failedExcep); } catch (Exception e) { logger.trace( "MyJob-task", "exception from eventReceiver.failed", e); } stop(); } else if (nr != null) { try { eventReceiver.receive(nr); } catch (RuntimeException e) { logger.trace( "MyJob-task", "exception delivering notifs to EventClient", e); } finally { startSequenceNumber = nr.getNextSequenceNumber(); } } } } private NotificationResult fetchOne() throws Exception { logger.trace("fetchOne", ""); while (true) { try { // 1 notif to skip possible missing class return delegate.fetchNotifications( clientId, startSequenceNumber, 1, timeout); } catch (Exception e) { if (isSerialOrClassNotFound(e)) { // skip and continue if (logger.traceOn()) { logger.trace("fetchOne", "Ignore", e); } eventReceiver.nonFatal(e); startSequenceNumber++; } else { throw e; } } } } static boolean isSerialOrClassNotFound(Exception e) { Throwable cause = e.getCause(); while (cause != null && !(cause instanceof ClassNotFoundException) && !(cause instanceof NotSerializableException)) { cause = cause.getCause(); } return (cause instanceof ClassNotFoundException || cause instanceof NotSerializableException); } private long startSequenceNumber = 0; private EventReceiver eventReceiver = null; private final EventClientDelegateMBean delegate; private String clientId; private boolean stopped = false; private volatile ScheduledFuture leaseRenewalFuture; private final Executor executor; private final ScheduledExecutorService leaseScheduler; private final MyJob fetchingJob; private final long timeout; private final int maxNotifs; private static final ClassLogger logger = new ClassLogger("javax.management.event", "FetchingEventRelay"); private static final ThreadFactory daemonThreadFactory = new DaemonThreadFactory("FetchingEventRelay-executor"); }