提交 e6ee3891 编写于 作者: E emcmanus

6733589: Intermittent failure of test/javax/management/eventService/SharingThreadTest.java

Reviewed-by: sjiang
上级 68172430
...@@ -32,8 +32,6 @@ ...@@ -32,8 +32,6 @@
*/ */
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -60,23 +58,22 @@ import javax.management.remote.JMXServiceURL; ...@@ -60,23 +58,22 @@ import javax.management.remote.JMXServiceURL;
public class SharingThreadTest { public class SharingThreadTest {
private static MBeanServer mbeanServer = MBeanServerFactory.createMBeanServer(); private static MBeanServer mbeanServer = MBeanServerFactory.createMBeanServer();
private static List<Notification> notifList = new ArrayList<Notification>();
private static ObjectName emitter; private static ObjectName emitter;
private static NotificationEmitter emitterImpl; private static NotificationEmitter emitterImpl;
private static JMXServiceURL url; private static JMXServiceURL url;
private static JMXConnectorServer server; private static JMXConnectorServer server;
private static JMXConnector conn;
private static int toSend = 10; private static int toSend = 10;
private static long sequenceNumber = 0;
private static final long bigWaiting = 6000; private static final long bigWaiting = 6000;
private static int counter = 0; private static int counter = 0;
private static int jobs = 10; private static int jobs = 10;
private static int endedJobs = 0; private static int endedJobs = 0;
private static volatile String failure;
private static Executor sharedExecutor = new ThreadPoolExecutor(0, 1, 1000, private static Executor sharedExecutor = new ThreadPoolExecutor(0, 1, 1000,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue(jobs)); TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(jobs));
//Executors.newFixedThreadPool(1); //Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
...@@ -93,7 +90,7 @@ public class SharingThreadTest { ...@@ -93,7 +90,7 @@ public class SharingThreadTest {
EventClientDelegateMBean.OBJECT_NAME); EventClientDelegateMBean.OBJECT_NAME);
sharedExecutor = new ThreadPoolExecutor(1, 1, 1000, sharedExecutor = new ThreadPoolExecutor(1, 1, 1000,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue(jobs)); TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(jobs));
} }
emitter = new ObjectName("Default:name=NotificationEmitter"); emitter = new ObjectName("Default:name=NotificationEmitter");
...@@ -133,35 +130,16 @@ public class SharingThreadTest { ...@@ -133,35 +130,16 @@ public class SharingThreadTest {
noise.setDaemon(true); noise.setDaemon(true);
noise.start(); noise.start();
Thread[] threads = new Thread[jobs];
try { try {
for (String type: types) { for (String type: types) {
System.out.println("\n\n>>> Testing "+type+" on "+url+" ..."); System.out.println("\n\n>>> Testing "+type+" on "+url+" ...");
newConn(); JMXConnector conn = newConn();
for (int i=0; i<jobs; i++) { try {
threads[i] = new Thread(new Job(type)); testType(type, conn);
threads[i].setDaemon(true); } finally {
threads[i].start(); conn.close();
} System.out.println(">>> Testing "+type+" on "+url+" ... done");
// to wait
long toWait = bigWaiting*jobs;
long stopTime = System.currentTimeMillis() + toWait;
synchronized(SharingThreadTest.class) {
while (endedJobs < jobs && toWait > 0) {
SharingThreadTest.class.wait(toWait);
toWait = stopTime - System.currentTimeMillis();
}
}
if (endedJobs != jobs) {
throw new RuntimeException("Need to set bigger waiting timeout?");
} }
endedJobs = 0;
conn.close();
System.out.println(">>> Testing "+type+" on "+url+" ... done");
} }
} finally { } finally {
server.stop(); server.stop();
...@@ -169,13 +147,40 @@ public class SharingThreadTest { ...@@ -169,13 +147,40 @@ public class SharingThreadTest {
} }
} }
private static void testType(String type, JMXConnector conn) throws Exception {
Thread[] threads = new Thread[jobs];
for (int i=0; i<jobs; i++) {
threads[i] = new Thread(new Job(type, conn));
threads[i].setDaemon(true);
threads[i].start();
}
// to wait
long toWait = bigWaiting*jobs;
long stopTime = System.currentTimeMillis() + toWait;
synchronized(SharingThreadTest.class) {
while (endedJobs < jobs && toWait > 0 && failure == null) {
SharingThreadTest.class.wait(toWait);
toWait = stopTime - System.currentTimeMillis();
}
}
if (endedJobs != jobs && failure == null) {
throw new RuntimeException("Need to set bigger waiting timeout?");
}
endedJobs = 0;
}
public static class Job implements Runnable { public static class Job implements Runnable {
public Job(String type) { public Job(String type, JMXConnector conn) {
this.type = type; this.type = type;
this.conn = conn;
} }
public void run() { public void run() {
try { try {
test(type); test(type, conn);
synchronized(SharingThreadTest.class) { synchronized(SharingThreadTest.class) {
endedJobs++; endedJobs++;
...@@ -184,6 +189,7 @@ public class SharingThreadTest { ...@@ -184,6 +189,7 @@ public class SharingThreadTest {
} }
} }
} catch (RuntimeException re) { } catch (RuntimeException re) {
re.printStackTrace(System.out);
throw re; throw re;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -191,16 +197,17 @@ public class SharingThreadTest { ...@@ -191,16 +197,17 @@ public class SharingThreadTest {
} }
private final String type; private final String type;
private final JMXConnector conn;
} }
private static void test(String type) throws Exception { private static void test(String type, JMXConnector conn) throws Exception {
String id = getId(); String id = getId();
Listener listener = new Listener(id); Listener listener = new Listener(id);
Filter filter = new Filter(id); Filter filter = new Filter(id);
//newConn(); //newConn();
EventClient ec = newEventClient(type); EventClient ec = newEventClient(type, conn);
System.out.println(">>> ("+id+") To receive notifications "+toSend); System.out.println(">>> ("+id+") To receive notifications "+toSend);
ec.addNotificationListener(emitter, ec.addNotificationListener(emitter,
...@@ -213,8 +220,7 @@ public class SharingThreadTest { ...@@ -213,8 +220,7 @@ public class SharingThreadTest {
+toSend+", but got: "+listener.received); +toSend+", but got: "+listener.received);
} }
// not close the EventClient to keep using thread ec.close();
//ec.close();
} }
//-------------------------- //--------------------------
...@@ -232,16 +238,16 @@ public class SharingThreadTest { ...@@ -232,16 +238,16 @@ public class SharingThreadTest {
System.exit(1); System.exit(1);
} }
System.out.println("("+id+") received "+notif.getSequenceNumber()); System.out.println("("+id+") received "+notif.getSequenceNumber());
synchronized (notifList) { synchronized (this) {
received++; received++;
if (sequenceNB < 0) { if (sequenceNB < 0) {
sequenceNB = notif.getSequenceNumber(); sequenceNB = notif.getSequenceNumber();
} else if(++sequenceNB != notif.getSequenceNumber()) { } else if(++sequenceNB != notif.getSequenceNumber()) {
throw new RuntimeException("Wrong sequence number, expecte: " fail("(" + id + ") Wrong sequence number, expected: "
+sequenceNB+", but got: "+notif.getSequenceNumber()); +sequenceNB+", but got: "+notif.getSequenceNumber());
} }
if (received >= toSend) { if (received >= toSend || failure != null) {
this.notify(); this.notify();
} }
} }
...@@ -251,20 +257,13 @@ public class SharingThreadTest { ...@@ -251,20 +257,13 @@ public class SharingThreadTest {
long toWait = timeout; long toWait = timeout;
long stopTime = System.currentTimeMillis() + timeout; long stopTime = System.currentTimeMillis() + timeout;
synchronized(this) { synchronized(this) {
while (received < nb && toWait > 0) { while (received < nb && toWait > 0 && failure == null) {
this.wait(toWait); this.wait(toWait);
toWait = stopTime - System.currentTimeMillis(); toWait = stopTime - System.currentTimeMillis();
} }
} }
} }
public void clear() {
synchronized(this) {
received = 0;
sequenceNB = -1;
}
}
private String id; private String id;
private int received = 0; private int received = 0;
...@@ -282,11 +281,6 @@ public class SharingThreadTest { ...@@ -282,11 +281,6 @@ public class SharingThreadTest {
private String id; private String id;
} }
private static NotificationListener dummyListener = new NotificationListener() {
public void handleNotification(Notification notif, Object handback) {
}
};
public static class NotificationEmitter extends NotificationBroadcasterSupport public static class NotificationEmitter extends NotificationBroadcasterSupport
implements NotificationEmitterMBean { implements NotificationEmitterMBean {
...@@ -309,6 +303,7 @@ public class SharingThreadTest { ...@@ -309,6 +303,7 @@ public class SharingThreadTest {
if (userData != null) { if (userData != null) {
System.out.println(">>> ("+userData+") sending "+nb); System.out.println(">>> ("+userData+") sending "+nb);
} }
long sequenceNumber = 0;
for (int i = 0; i<nb; i++) { for (int i = 0; i<nb; i++) {
Notification notif = new Notification(myType, emitter, Notification notif = new Notification(myType, emitter,
sequenceNumber++); sequenceNumber++);
...@@ -333,11 +328,12 @@ public class SharingThreadTest { ...@@ -333,11 +328,12 @@ public class SharingThreadTest {
public void sendNotif(int nb, String userData); public void sendNotif(int nb, String userData);
} }
private static void newConn() throws IOException { private static JMXConnector newConn() throws IOException {
conn = JMXConnectorFactory.connect(url); return JMXConnectorFactory.connect(url);
} }
private static EventClient newEventClient(String type) throws Exception { private static EventClient newEventClient(String type, JMXConnector conn)
throws Exception {
EventClientDelegateMBean proxy = EventClientDelegateMBean proxy =
EventClientDelegate.getProxy(conn.getMBeanServerConnection()); EventClientDelegate.getProxy(conn.getMBeanServerConnection());
if (type.equals("PushEventRelay")) { if (type.equals("PushEventRelay")) {
...@@ -361,4 +357,9 @@ public class SharingThreadTest { ...@@ -361,4 +357,9 @@ public class SharingThreadTest {
return String.valueOf(counter++); return String.valueOf(counter++);
} }
} }
private static void fail(String msg) {
System.out.println("FAIL: " + msg);
failure = msg;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册