/* * Copyright 2007 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Sun designates this * particular file as subject to the "Classpath" exception as provided * by Sun in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or * have any questions. 
*/ package javax.management.event; import com.sun.jmx.event.DaemonThreadFactory; import com.sun.jmx.event.LeaseRenewer; import com.sun.jmx.event.ReceiverBuffer; import com.sun.jmx.event.RepeatedSingletonJob; import com.sun.jmx.namespace.JMXNamespaceUtils; import com.sun.jmx.mbeanserver.PerThreadGroupPool; import com.sun.jmx.remote.util.ClassLogger; import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.management.InstanceNotFoundException; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; import javax.management.MBeanServerConnection; import javax.management.Notification; import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import javax.management.remote.NotificationResult; import javax.management.remote.TargetedNotification; /** *
<p>This class is used to manage its notification listeners on the client
* side in the same way as on the MBean server side. This class needs to work
* with an {@link EventClientDelegateMBean} on the server side.</p>
*
* <p>A user can specify an {@link EventRelay} object to specify how to receive
* notifications forwarded by the {@link EventClientDelegateMBean}. By default,
* the class {@link FetchingEventRelay} is used.</p>
*
* <p>A user can specify an {@link java.util.concurrent.Executor Executor}
* to distribute notifications to local listeners. If no executor is
* specified, the thread in the {@link EventRelay} which calls {@link
* EventReceiver#receive EventReceiver.receive} will be reused to distribute
* the notifications (in other words, to call the {@link
* NotificationListener#handleNotification handleNotification} method of the
* appropriate listeners). It is useful to make a separate thread do this
* distribution in some cases. For example, if network communication is slow,
* the forwarding thread can concentrate on communication while, locally,
* the distributing thread distributes the received notifications. Another
* usage is to share a thread pool between many clients, for scalability.
* Note, though, that if the {@code Executor} can create more than one thread
* then it is possible that listeners will see notifications in a different
* order from the order in which they were sent.</p>
*
* <p>An object of this class sends notifications to listeners added with
* {@link #addEventClientListener}. The {@linkplain Notification#getType()
* type} of each such notification is one of {@link #FAILED}, {@link #NONFATAL},
* or {@link #NOTIFS_LOST}.</p>
* * @since JMX 2.0 */ public class EventClient implements EventConsumer, NotificationManager { /** *A notification string type used by an {@code EventClient} object * to inform a listener added by {@link #addEventClientListener} that * it failed to get notifications from a remote server, and that it is * possible that no more notifications will be delivered.
* * @see #addEventClientListener * @see EventReceiver#failed */ public static final String FAILED = "jmx.event.service.failed"; /** *Reports that an unexpected exception has been received by the {@link * EventRelay} object but that it is non-fatal. For example, a notification * received is not serializable or its class is not found.
* * @see #addEventClientListener * @see EventReceiver#nonFatal */ public static final String NONFATAL = "jmx.event.service.nonfatal"; /** *A notification string type used by an {@code EventClient} object to * inform a listener added by {@code #addEventClientListener} that it * has detected that notifications have been lost. The {@link * Notification#getUserData() userData} of the notification is a Long which * is an upper bound on the number of lost notifications that have just * been detected.
* * @see #addEventClientListener */ public static final String NOTIFS_LOST = "jmx.event.service.notifs.lost"; /** * The default lease time, {@value}, in milliseconds. * * @see EventClientDelegateMBean#lease */ public static final long DEFAULT_LEASE_TIMEOUT = 300000; /** *Constructs a default {@code EventClient} object.
* *This object creates a {@link FetchingEventRelay} object to * receive notifications forwarded by the {@link EventClientDelegateMBean}. * The {@link EventClientDelegateMBean} that it works with is the * one registered with the {@linkplain EventClientDelegate#OBJECT_NAME * default ObjectName}. The thread from the {@link FetchingEventRelay} * object that fetches the notifications is also used to distribute them. * * @param conn An {@link MBeanServerConnection} object used to communicate * with an {@link EventClientDelegateMBean} MBean. * * @throws IllegalArgumentException If {@code conn} is null. * @throws IOException If an I/O error occurs when communicating with the * {@code EventClientDelegateMBean}. */ public EventClient(MBeanServerConnection conn) throws IOException { this(EventClientDelegate.getProxy(conn)); } /** * Constructs an {@code EventClient} object with a specified * {@link EventClientDelegateMBean}. * *
This object creates a {@link FetchingEventRelay} object to receive
* notifications forwarded by the {@link EventClientDelegateMBean}. The
* thread from the {@link FetchingEventRelay} object that fetches the
* notifications is also used to distribute them.
*
* @param delegate An {@link EventClientDelegateMBean} object to work with.
*
* @throws IllegalArgumentException If {@code delegate} is null.
* @throws IOException If an I/O error occurs when communicating with the
* {@link EventClientDelegateMBean}.
*/
public EventClient(EventClientDelegateMBean delegate)
throws IOException {
// Passing null for the relay, the distributing executor and the lease
// scheduler lets the five-argument constructor pick its defaults; the
// lease time requested is DEFAULT_LEASE_TIMEOUT.
this(delegate, null, null, null, DEFAULT_LEASE_TIMEOUT);
}
/**
* Constructs an {@code EventClient} object with the specified
* {@link EventClientDelegateMBean}, {@link EventRelay}
* object, and distributing thread.
*
* @param delegate An {@link EventClientDelegateMBean} object to work with.
* Usually, this will be a proxy constructed using
* {@link EventClientDelegate#getProxy}.
* @param eventRelay An object used to receive notifications
* forwarded by the {@link EventClientDelegateMBean}. If {@code null}, a
* {@link FetchingEventRelay} object will be used.
* @param distributingExecutor Used to distribute notifications to local
* listeners. If {@code null}, the thread that calls {@link
* EventReceiver#receive EventReceiver.receive} from the {@link EventRelay}
* object is used.
* @param leaseScheduler An object that will be used to schedule the
* periodic {@linkplain EventClientDelegateMBean#lease lease updates}.
* If {@code null}, a default scheduler will be used.
* @param requestedLeaseTime The lease time used to keep this client alive
* in the {@link EventClientDelegateMBean}. A value of zero is equivalent
* to the {@linkplain #DEFAULT_LEASE_TIMEOUT default value}.
*
* @throws IllegalArgumentException If {@code delegate} is null or
* {@code requestedLeaseTime} is negative.
* @throws IOException If an I/O error occurs when communicating with the
* {@link EventClientDelegateMBean}.
*/
public EventClient(EventClientDelegateMBean delegate,
        EventRelay eventRelay,
        Executor distributingExecutor,
        ScheduledExecutorService leaseScheduler,
        long requestedLeaseTime)
        throws IOException {
    // Validate the arguments: the delegate first, then the lease time.
    if (delegate == null) {
        throw new IllegalArgumentException("Null EventClientDelegateMBean");
    }
    if (requestedLeaseTime < 0) {
        throw new IllegalArgumentException(
                "Negative lease time: " + requestedLeaseTime);
    }
    // Zero is shorthand for the default lease time.
    final long effectiveLeaseTime = (requestedLeaseTime == 0)
            ? DEFAULT_LEASE_TIMEOUT
            : requestedLeaseTime;

    eventClientDelegate = delegate;

    // Use the caller's relay when one is given, otherwise create a
    // FetchingEventRelay bound to the delegate.
    EventRelay relay = eventRelay;
    if (relay == null) {
        try {
            relay = new FetchingEventRelay(delegate);
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
            // Not expected in practice; reported as an IOException whose
            // message is e.toString() and whose cause is e.
            throw new IOException(e);
        }
    }
    this.eventRelay = relay;

    // With no executor supplied, fall back to callerExecutor.
    this.distributingExecutor = (distributingExecutor != null)
            ? distributingExecutor
            : callerExecutor;
    this.dispatchingJob = new DispatchingJob();

    clientId = relay.getClientId();
    this.requestedLeaseTime = effectiveLeaseTime;

    // With no scheduler supplied, use the default lease scheduler.
    final ScheduledExecutorService scheduler = (leaseScheduler != null)
            ? leaseScheduler
            : defaultLeaseScheduler();
    leaseRenewer = new LeaseRenewer(scheduler, renewLease);

    if (logger.traceOn()) {
        logger.trace("init", "New EventClient: " + clientId);
    }
}
private static ScheduledExecutorService defaultLeaseScheduler() {
// The default lease scheduler uses a ScheduledThreadPoolExecutor
// with a maximum of 20 threads. This means that if you have many
// EventClient instances and some of them get blocked (because of an
// unresponsive network, for example), then even the instances that
// are connected to responsive servers may have their leases expire.
// XXX check if the above is true and possibly fix.
PerThreadGroupPool.Create Closes this EventClient, removes all listeners and stops receiving
* notifications. This method calls {@link
* EventClientDelegateMBean#removeClient(String)} and {@link
* EventRelay#stop}. Both operations occur even if one of them
* throws an {@code IOException}.
*
* @throws IOException if an I/O error occurs when communicating with
* {@link EventClientDelegateMBean}, or if {@link EventRelay#stop}
* throws an {@code IOException}.
*/
public void close() throws IOException {
    if (logger.traceOn()) {
        logger.trace("close", clientId);
    }

    // Under the listenerInfoMap lock: mark this client closed and drop
    // every registered listener. A repeated call finds the flag already
    // set and returns immediately.
    synchronized (listenerInfoMap) {
        if (closed) {
            return;
        }
        closed = true;
        listenerInfoMap.clear();
    }

    if (leaseRenewer != null) {
        leaseRenewer.close();
    }

    // Stop the relay, then remove the client from the delegate. An
    // IOException from the first step does not prevent the second; if
    // both steps fail, the exception from the second step is the one
    // thrown to the caller.
    IOException failure = null;

    try {
        eventRelay.stop();
    } catch (IOException stopFailure) {
        failure = stopFailure;
        logger.debug("close", "EventRelay.stop", stopFailure);
    }

    try {
        eventClientDelegate.removeClient(clientId);
    } catch (Exception removeFailure) {
        // Anything that is not already an IOException is wrapped in one.
        failure = (removeFailure instanceof IOException)
                ? (IOException) removeFailure
                : new IOException(removeFailure);
        logger.debug("close",
                "Got exception when removing " + clientId, removeFailure);
    }

    if (failure != null) {
        throw failure;
    }
}
/**
* Determine if this {@code EventClient} is closed. Return the {@link EventRelay} associated with this
* {@code EventClient}. Return the lease time that this {@code EventClient} requests
* on every lease renewal. Adds a set of listeners to the remote MBeanServer. This method can
* be used to copy the listeners from one {@code EventClient} to another. A listener is represented by a {@link ListenerInfo} object. The listener
* is added by calling {@link #subscribe(ObjectName,
* NotificationListener, NotificationFilter, Object)} if the method
* {@link ListenerInfo#isSubscription() isSubscription}
* returns {@code true}; otherwise it is added by calling
* {@link #addNotificationListener(ObjectName, NotificationListener,
* NotificationFilter, Object)}. The method returns the listeners which were added successfully. The
* elements in the returned collection are a subset of the elements in
* {@code infoList}. If all listeners were added successfully, the two
* collections are the same. If no listener was added successfully, the
* returned collection is empty. Get the types of notification that an {@code EventClient} can send
* to listeners added with {@link #addEventClientListener
* addEventClientListener}. The {@code EventClient} object creates a default
* {@link FetchingEventRelay} object to
* receive notifications forwarded by the {@link EventClientDelegateMBean}.
* The {@link EventClientDelegateMBean} it works with is the
* default one registered with the ObjectName
* {@link EventClientDelegate#OBJECT_NAME
* OBJECT_NAME}.
* The thread from the {@link FetchingEventRelay} object that fetches the
* notifications is also used to distribute them.
*
* @param conn An {@link MBeanServerConnection} object used to communicate
* with an {@link EventClientDelegateMBean}.
* @throws IllegalArgumentException If the value of {@code conn} is null,
* or the default {@link EventClientDelegateMBean} is not registered.
* @throws IOException If an I/O error occurs.
*/
public static MBeanServerConnection getEventClientConnection(
final MBeanServerConnection conn)
throws IOException {
// Delegates to the two-argument overload; a null relay asks it to use
// the default FetchingEventRelay.
return getEventClientConnection(conn, null);
}
/**
* Constructs an MBeanServerConnection that uses an {@code EventClient}
* object with a user-specific {@link EventRelay}
* object.
*
* The {@link EventClientDelegateMBean} which it works with is the
* default one registered with the ObjectName
* {@link EventClientDelegate#OBJECT_NAME
* OBJECT_NAME}.
* The thread that calls {@link EventReceiver#receive
* EventReceiver.receive} from the {@link EventRelay} object is used
* to distribute notifications to their listeners.
*
* @param conn An {@link MBeanServerConnection} object used to communicate
* with an {@link EventClientDelegateMBean}.
* @param eventRelay A user-specific object used to receive notifications
* forwarded by the {@link EventClientDelegateMBean}. If null, the default
* {@link FetchingEventRelay} object is used.
* @throws IllegalArgumentException If the value of {@code conn} is null,
* or the default {@link EventClientDelegateMBean} is not registered.
* @throws IOException If an I/O error occurs.
*/
public static MBeanServerConnection getEventClientConnection(
final MBeanServerConnection conn,
final EventRelay eventRelay)
throws IOException {
if (newEventConn == null) {
throw new IllegalArgumentException(
"Class not found: EventClientConnection");
}
checkInit(conn,null);
final Callable Get the client id of this {@code EventClient} in the
* {@link EventClientDelegateMBean}.
*
* @return the client id.
*
* @see EventClientDelegateMBean#addClient(String, Object[], String[])
* EventClientDelegateMBean.addClient
*/
public String getClientId() {
// The id is obtained from the EventRelay's getClientId() when this
// EventClient is constructed.
return clientId;
}
/**
* Returns a JMX Connector that will use an {@link EventClient}
* to subscribe for notifications. If the server doesn't have
* an {@link EventClientDelegateMBean}, then the connector will
* use the legacy notification mechanism instead.
*
* @param wrapped The underlying JMX Connector wrapped by the returned
* connector.
*
* @return A JMX Connector that will use an {@link EventClient}, if
* available.
*
* @see EventClient#getEventClientConnection(MBeanServerConnection)
*/
public static JMXConnector withEventClient(final JMXConnector wrapped) {
// Pure delegation: the wrapping is performed by
// JMXNamespaceUtils.withEventClient.
return JMXNamespaceUtils.withEventClient(wrapped);
}
private static final PerThreadGroupPool