提交 dae3a856 编写于 作者: D dl

6868712: Improve concurrent queue tests

Summary: Fix all known flaky tests, plus minor maintenance
Reviewed-by: martin, chegar
上级 dd70c1b8
...@@ -178,6 +178,11 @@ public class BiggernYours { ...@@ -178,6 +178,11 @@ public class BiggernYours {
new ConcurrentLinkedQueue() { new ConcurrentLinkedQueue() {
public int size() {return randomize(super.size());}}); public int size() {return randomize(super.size());}});
// testCollections(
// new LinkedTransferQueue(),
// new LinkedTransferQueue() {
// public int size() {return randomize(super.size());}});
testCollections( testCollections(
new LinkedBlockingQueue(), new LinkedBlockingQueue(),
new LinkedBlockingQueue() { new LinkedBlockingQueue() {
......
...@@ -49,6 +49,7 @@ public class IteratorAtEnd { ...@@ -49,6 +49,7 @@ public class IteratorAtEnd {
testCollection(new LinkedBlockingQueue()); testCollection(new LinkedBlockingQueue());
testCollection(new ArrayBlockingQueue(100)); testCollection(new ArrayBlockingQueue(100));
testCollection(new ConcurrentLinkedQueue()); testCollection(new ConcurrentLinkedQueue());
// testCollection(new LinkedTransferQueue());
testMap(new HashMap()); testMap(new HashMap());
testMap(new Hashtable()); testMap(new Hashtable());
......
...@@ -76,6 +76,7 @@ public class MOAT { ...@@ -76,6 +76,7 @@ public class MOAT {
testCollection(new LinkedBlockingQueue<Integer>(20)); testCollection(new LinkedBlockingQueue<Integer>(20));
testCollection(new LinkedBlockingDeque<Integer>(20)); testCollection(new LinkedBlockingDeque<Integer>(20));
testCollection(new ConcurrentLinkedQueue<Integer>()); testCollection(new ConcurrentLinkedQueue<Integer>());
// testCollection(new LinkedTransferQueue<Integer>());
testCollection(new ConcurrentSkipListSet<Integer>()); testCollection(new ConcurrentSkipListSet<Integer>());
testCollection(Arrays.asList(new Integer(42))); testCollection(Arrays.asList(new Integer(42)));
testCollection(Arrays.asList(1,2,3)); testCollection(Arrays.asList(1,2,3));
...@@ -161,6 +162,7 @@ public class MOAT { ...@@ -161,6 +162,7 @@ public class MOAT {
equal(c.toString(),"[]"); equal(c.toString(),"[]");
equal(c.toArray().length, 0); equal(c.toArray().length, 0);
equal(c.toArray(new Object[0]).length, 0); equal(c.toArray(new Object[0]).length, 0);
check(c.toArray(new Object[]{42})[0] == null);
Object[] a = new Object[1]; a[0] = Boolean.TRUE; Object[] a = new Object[1]; a[0] = Boolean.TRUE;
equal(c.toArray(a), a); equal(c.toArray(a), a);
......
...@@ -234,6 +234,7 @@ public class RacingCollections { ...@@ -234,6 +234,7 @@ public class RacingCollections {
List<Queue<Integer>> list = List<Queue<Integer>> list =
new ArrayList<Queue<Integer>>(newConcurrentDeques()); new ArrayList<Queue<Integer>>(newConcurrentDeques());
list.add(new LinkedBlockingQueue<Integer>(10)); list.add(new LinkedBlockingQueue<Integer>(10));
// list.add(new LinkedTransferQueue<Integer>());
return list; return list;
} }
......
...@@ -69,6 +69,7 @@ public class RemoveContains { ...@@ -69,6 +69,7 @@ public class RemoveContains {
test(new ArrayBlockingQueue<String>(10)); test(new ArrayBlockingQueue<String>(10));
test(new LinkedBlockingQueue<String>(10)); test(new LinkedBlockingQueue<String>(10));
test(new LinkedBlockingDeque<String>(10)); test(new LinkedBlockingDeque<String>(10));
// test(new LinkedTransferQueue<String>());
test(new ArrayDeque<String>(10)); test(new ArrayDeque<String>(10));
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
......
...@@ -75,10 +75,12 @@ public class CancelledProducerConsumerLoops { ...@@ -75,10 +75,12 @@ public class CancelledProducerConsumerLoops {
} }
static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception { static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
if (print)
System.out.printf("%-18s", q.getClass().getSimpleName());
LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer); CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
Future[] prods = new Future[npairs]; Future<?>[] prods = new Future<?>[npairs];
Future[] cons = new Future[npairs]; Future<?>[] cons = new Future<?>[npairs];
for (int i = 0; i < npairs; ++i) { for (int i = 0; i < npairs; ++i) {
prods[i] = pool.submit(new Producer(q, barrier, iters)); prods[i] = pool.submit(new Producer(q, barrier, iters));
...@@ -119,21 +121,13 @@ public class CancelledProducerConsumerLoops { ...@@ -119,21 +121,13 @@ public class CancelledProducerConsumerLoops {
static void oneTest(int pairs, int iters) throws Exception { static void oneTest(int pairs, int iters) throws Exception {
if (print)
System.out.print("ArrayBlockingQueue ");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
if (print)
System.out.print("LinkedBlockingQueue ");
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters); oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
if (print) // oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
System.out.print("SynchronousQueue ");
oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8); oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
/* PriorityBlockingQueue is unbounded /* PriorityBlockingQueue is unbounded
if (print)
System.out.print("PriorityBlockingQueue ");
oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4); oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4);
*/ */
} }
......
...@@ -66,7 +66,8 @@ public class Interrupt { ...@@ -66,7 +66,8 @@ public class Interrupt {
static void testQueue(final BlockingQueue<Object> q) { static void testQueue(final BlockingQueue<Object> q) {
try { try {
final BlockingDeque<Object> deq = final BlockingDeque<Object> deq =
q instanceof BlockingDeque ? (BlockingDeque<Object>) q : null; (q instanceof BlockingDeque<?>) ?
(BlockingDeque<Object>) q : null;
q.clear(); q.clear();
List<Fun> fs = new ArrayList<Fun>(); List<Fun> fs = new ArrayList<Fun>();
fs.add(new Fun() { void f() throws Throwable fs.add(new Fun() { void f() throws Throwable
...@@ -107,7 +108,10 @@ public class Interrupt { ...@@ -107,7 +108,10 @@ public class Interrupt {
{ deq.offerLast(1, 7, SECONDS); }}); { deq.offerLast(1, 7, SECONDS); }});
} }
checkInterrupted(fs); checkInterrupted(fs);
} catch (Throwable t) { unexpected(t); } } catch (Throwable t) {
System.out.printf("Failed: %s%n", q.getClass().getSimpleName());
unexpected(t);
}
} }
private static void realMain(final String[] args) throws Throwable { private static void realMain(final String[] args) throws Throwable {
......
...@@ -32,44 +32,18 @@ import java.util.*; ...@@ -32,44 +32,18 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
public class LastElement { public class LastElement {
static volatile int passed = 0, failed = 0; void test(String[] args) throws Throwable {
static void fail(String msg) {
failed++;
new Exception(msg).printStackTrace();
}
static void pass() {
passed++;
}
static void unexpected(Throwable t) {
failed++;
t.printStackTrace();
}
static void check(boolean condition, String msg) {
if (condition)
passed++;
else
fail(msg);
}
static void check(boolean condition) {
check(condition, "Assertion failure");
}
public static void main(String[] args) throws Throwable {
testQueue(new LinkedBlockingQueue<Integer>()); testQueue(new LinkedBlockingQueue<Integer>());
// Uncomment when LinkedBlockingDeque is integrated testQueue(new LinkedBlockingDeque<Integer>());
//testQueue(new LinkedBlockingDeque<Integer>()); testQueue(new ArrayBlockingQueue<Integer>(10, true));
testQueue(new ArrayBlockingQueue<Integer>(10)); testQueue(new ArrayBlockingQueue<Integer>(10, false));
// testQueue(new LinkedTransferQueue<Integer>());
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new Exception("Some tests failed"); if (failed > 0) throw new Exception("Some tests failed");
} }
private static void testQueue(BlockingQueue<Integer> q) throws Throwable { void testQueue(BlockingQueue<Integer> q) throws Throwable {
Integer one = 1; Integer one = 1;
Integer two = 2; Integer two = 2;
Integer three = 3; Integer three = 3;
...@@ -102,4 +76,21 @@ public class LastElement { ...@@ -102,4 +76,21 @@ public class LastElement {
catch (Throwable t) {unexpected(t);} catch (Throwable t) {unexpected(t);}
check(q.isEmpty() && q.size() == 0); check(q.isEmpty() && q.size() == 0);
} }
//--------------------- Infrastructure ---------------------------
volatile int passed = 0, failed = 0;
void pass() {passed++;}
void fail() {failed++; Thread.dumpStack();}
void fail(String msg) {System.err.println(msg); fail();}
void unexpected(Throwable t) {failed++; t.printStackTrace();}
void check(boolean cond) {if (cond) pass(); else fail();}
void equal(Object x, Object y) {
if (x == null ? y == null : x.equals(y)) pass();
else fail(x + " not equal to " + y);}
public static void main(String[] args) throws Throwable {
new LastElement().instanceMain(args);}
public void instanceMain(String[] args) throws Throwable {
try {test(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new AssertionError("Some tests failed");}
} }
...@@ -77,6 +77,7 @@ public class MultipleProducersSingleConsumerLoops { ...@@ -77,6 +77,7 @@ public class MultipleProducersSingleConsumerLoops {
print = true; print = true;
for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) { for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
System.out.println("----------------------------------------");
System.out.println("Producers:" + i); System.out.println("Producers:" + i);
oneTest(i, iters); oneTest(i, iters);
Thread.sleep(100); Thread.sleep(100);
...@@ -87,29 +88,20 @@ public class MultipleProducersSingleConsumerLoops { ...@@ -87,29 +88,20 @@ public class MultipleProducersSingleConsumerLoops {
} }
static void oneTest(int producers, int iters) throws Exception { static void oneTest(int producers, int iters) throws Exception {
if (print)
System.out.print("ArrayBlockingQueue ");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
if (print)
System.out.print("LinkedBlockingQueue ");
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters); oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
// oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
// Don't run PBQ since can legitimately run out of memory // Don't run PBQ since can legitimately run out of memory
// if (print) // if (print)
// System.out.print("PriorityBlockingQueue "); // System.out.print("PriorityBlockingQueue ");
// oneRun(new PriorityBlockingQueue<Integer>(), producers, iters); // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
if (print)
System.out.print("SynchronousQueue ");
oneRun(new SynchronousQueue<Integer>(), producers, iters); oneRun(new SynchronousQueue<Integer>(), producers, iters);
if (print) if (print)
System.out.print("SynchronousQueue(fair) "); System.out.println("fair implementations:");
oneRun(new SynchronousQueue<Integer>(true), producers, iters); oneRun(new SynchronousQueue<Integer>(true), producers, iters);
if (print)
System.out.print("ArrayBlockingQueue(fair)");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
} }
...@@ -174,6 +166,8 @@ public class MultipleProducersSingleConsumerLoops { ...@@ -174,6 +166,8 @@ public class MultipleProducersSingleConsumerLoops {
} }
static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception { static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
if (print)
System.out.printf("%-18s", q.getClass().getSimpleName());
LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer); CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
for (int i = 0; i < nproducers; ++i) { for (int i = 0; i < nproducers; ++i) {
......
...@@ -34,81 +34,135 @@ ...@@ -34,81 +34,135 @@
/* /*
* @test * @test
* @bug 6805775 6815766 * @bug 6805775 6815766
* @run main OfferDrainToLoops 300
* @summary Test concurrent offer vs. drainTo * @summary Test concurrent offer vs. drainTo
*/ */
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
public class OfferDrainToLoops { public class OfferDrainToLoops {
final long testDurationMillisDefault = 10L * 1000L;
final long testDurationMillis;
OfferDrainToLoops(String[] args) {
testDurationMillis = (args.length > 0) ?
Long.valueOf(args[0]) : testDurationMillisDefault;
}
void checkNotContainsNull(Iterable it) { void checkNotContainsNull(Iterable it) {
for (Object x : it) for (Object x : it)
check(x != null); check(x != null);
} }
abstract class CheckedThread extends Thread {
abstract protected void realRun();
public void run() {
try { realRun(); } catch (Throwable t) { unexpected(t); }
}
{
setDaemon(true);
start();
}
}
void test(String[] args) throws Throwable { void test(String[] args) throws Throwable {
test(new LinkedBlockingQueue()); test(new LinkedBlockingQueue());
test(new LinkedBlockingQueue(2000)); test(new LinkedBlockingQueue(2000));
test(new LinkedBlockingDeque()); test(new LinkedBlockingDeque());
test(new LinkedBlockingDeque(2000)); test(new LinkedBlockingDeque(2000));
test(new ArrayBlockingQueue(2000)); test(new ArrayBlockingQueue(2000));
// test(new LinkedTransferQueue());
}
Random getRandom() {
return new Random();
// return ThreadLocalRandom.current();
} }
void test(final BlockingQueue q) throws Throwable { void test(final BlockingQueue q) throws Throwable {
System.out.println(q.getClass().getSimpleName()); System.out.println(q.getClass().getSimpleName());
final long testDurationSeconds = 1L; final long testDurationNanos = testDurationMillis * 1000L * 1000L;
final long testDurationMillis = testDurationSeconds * 1000L; final long quittingTimeNanos = System.nanoTime() + testDurationNanos;
final long quittingTimeNanos final long timeoutMillis = 10L * 1000L;
= System.nanoTime() + testDurationSeconds * 1000L * 1000L * 1000L;
Thread offerer = new CheckedThread() { /** Poor man's bounded buffer. */
final AtomicLong approximateCount = new AtomicLong(0L);
abstract class CheckedThread extends Thread {
CheckedThread(String name) {
super(name);
setDaemon(true);
start();
}
/** Polls for quitting time. */
protected boolean quittingTime() {
return System.nanoTime() - quittingTimeNanos > 0;
}
/** Polls occasionally for quitting time. */
protected boolean quittingTime(long i) {
return (i % 1024) == 0 && quittingTime();
}
abstract protected void realRun();
public void run() {
try { realRun(); } catch (Throwable t) { unexpected(t); }
}
}
Thread offerer = new CheckedThread("offerer") {
protected void realRun() { protected void realRun() {
for (long i = 0; ; i++) { long c = 0;
if ((i % 1024) == 0 && for (long i = 0; ! quittingTime(i); i++) {
System.nanoTime() - quittingTimeNanos > 0) if (q.offer(c)) {
break; if ((++c % 1024) == 0) {
while (! q.offer(i)) approximateCount.getAndAdd(1024);
while (approximateCount.get() > 10000)
Thread.yield();
}
} else {
Thread.yield(); Thread.yield();
}}}; }}}};
Thread drainer = new CheckedThread() { Thread drainer = new CheckedThread("drainer") {
protected void realRun() { protected void realRun() {
for (long i = 0; ; i++) { final Random rnd = getRandom();
if (System.nanoTime() - quittingTimeNanos > 0) while (! quittingTime()) {
break;
List list = new ArrayList(); List list = new ArrayList();
int n = q.drainTo(list); int n = rnd.nextBoolean() ?
q.drainTo(list) :
q.drainTo(list, 100);
approximateCount.getAndAdd(-n);
equal(list.size(), n); equal(list.size(), n);
for (int j = 0; j < n - 1; j++) for (int j = 0; j < n - 1; j++)
equal((Long) list.get(j) + 1L, list.get(j + 1)); equal((Long) list.get(j) + 1L, list.get(j + 1));
Thread.yield(); Thread.yield();
}}}; }
q.clear();
approximateCount.set(0); // Releases waiting offerer thread
}};
Thread scanner = new CheckedThread() { Thread scanner = new CheckedThread("scanner") {
protected void realRun() { protected void realRun() {
for (long i = 0; ; i++) { final Random rnd = getRandom();
if (System.nanoTime() - quittingTimeNanos > 0) while (! quittingTime()) {
switch (rnd.nextInt(3)) {
case 0: checkNotContainsNull(q); break;
case 1: q.size(); break;
case 2:
Long[] a = (Long[]) q.toArray(new Long[0]);
int n = a.length;
for (int j = 0; j < n - 1; j++) {
check(a[j] < a[j+1]);
check(a[j] != null);
}
break; break;
checkNotContainsNull(q); }
Thread.yield(); Thread.yield();
}}}; }}};
offerer.join(10 * testDurationMillis); for (Thread thread : new Thread[] { offerer, drainer, scanner }) {
drainer.join(10 * testDurationMillis); thread.join(timeoutMillis + testDurationMillis);
check(! offerer.isAlive()); if (thread.isAlive()) {
check(! drainer.isAlive()); System.err.printf("Hung thread: %s%n", thread.getName());
failed++;
for (StackTraceElement e : thread.getStackTrace())
System.err.println(e);
// Kludge alert
thread.stop();
thread.join(timeoutMillis);
}
}
} }
//--------------------- Infrastructure --------------------------- //--------------------- Infrastructure ---------------------------
...@@ -122,7 +176,7 @@ public class OfferDrainToLoops { ...@@ -122,7 +176,7 @@ public class OfferDrainToLoops {
if (x == null ? y == null : x.equals(y)) pass(); if (x == null ? y == null : x.equals(y)) pass();
else fail(x + " not equal to " + y);} else fail(x + " not equal to " + y);}
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
new OfferDrainToLoops().instanceMain(args);} new OfferDrainToLoops(args).instanceMain(args);}
public void instanceMain(String[] args) throws Throwable { public void instanceMain(String[] args) throws Throwable {
try {test(args);} catch (Throwable t) {unexpected(t);} try {test(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
......
...@@ -46,6 +46,7 @@ public class PollMemoryLeak { ...@@ -46,6 +46,7 @@ public class PollMemoryLeak {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
final BlockingQueue[] qs = { final BlockingQueue[] qs = {
new LinkedBlockingQueue(10), new LinkedBlockingQueue(10),
// new LinkedTransferQueue(),
new ArrayBlockingQueue(10), new ArrayBlockingQueue(10),
new SynchronousQueue(), new SynchronousQueue(),
new SynchronousQueue(true), new SynchronousQueue(true),
......
...@@ -77,7 +77,8 @@ public class ProducerConsumerLoops { ...@@ -77,7 +77,8 @@ public class ProducerConsumerLoops {
print = true; print = true;
for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
System.out.println("Pairs:" + i); System.out.println("----------------------------------------");
System.out.println("Pairs: " + i);
oneTest(i, iters); oneTest(i, iters);
Thread.sleep(100); Thread.sleep(100);
} }
...@@ -87,28 +88,17 @@ public class ProducerConsumerLoops { ...@@ -87,28 +88,17 @@ public class ProducerConsumerLoops {
} }
static void oneTest(int pairs, int iters) throws Exception { static void oneTest(int pairs, int iters) throws Exception {
if (print)
System.out.print("ArrayBlockingQueue ");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
if (print)
System.out.print("LinkedBlockingQueue ");
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters); oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
if (print) // oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
System.out.print("PriorityBlockingQueue ");
oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters); oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
if (print)
System.out.print("SynchronousQueue ");
oneRun(new SynchronousQueue<Integer>(), pairs, iters); oneRun(new SynchronousQueue<Integer>(), pairs, iters);
if (print) if (print)
System.out.print("SynchronousQueue(fair) "); System.out.println("fair implementations:");
oneRun(new SynchronousQueue<Integer>(true), pairs, iters);
if (print) oneRun(new SynchronousQueue<Integer>(true), pairs, iters);
System.out.print("ArrayBlockingQueue(fair)");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters);
} }
...@@ -174,6 +164,8 @@ public class ProducerConsumerLoops { ...@@ -174,6 +164,8 @@ public class ProducerConsumerLoops {
} }
static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception { static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
if (print)
System.out.printf("%-18s", q.getClass().getSimpleName());
LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer); CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
for (int i = 0; i < npairs; ++i) { for (int i = 0; i < npairs; ++i) {
......
...@@ -63,7 +63,8 @@ public class SingleProducerMultipleConsumerLoops { ...@@ -63,7 +63,8 @@ public class SingleProducerMultipleConsumerLoops {
print = true; print = true;
for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) { for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
System.out.println("Consumers:" + i); System.out.println("----------------------------------------");
System.out.println("Consumers: " + i);
oneTest(i, iters); oneTest(i, iters);
Thread.sleep(100); Thread.sleep(100);
} }
...@@ -73,28 +74,15 @@ public class SingleProducerMultipleConsumerLoops { ...@@ -73,28 +74,15 @@ public class SingleProducerMultipleConsumerLoops {
} }
static void oneTest(int consumers, int iters) throws Exception { static void oneTest(int consumers, int iters) throws Exception {
if (print)
System.out.print("ArrayBlockingQueue ");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
if (print)
System.out.print("LinkedBlockingQueue ");
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters); oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
if (print) // oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
System.out.print("PriorityBlockingQueue ");
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters); oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
if (print)
System.out.print("SynchronousQueue ");
oneRun(new SynchronousQueue<Integer>(), consumers, iters); oneRun(new SynchronousQueue<Integer>(), consumers, iters);
if (print) if (print)
System.out.print("SynchronousQueue(fair) "); System.out.println("fair implementations:");
oneRun(new SynchronousQueue<Integer>(true), consumers, iters); oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
if (print)
System.out.print("ArrayBlockingQueue(fair)");
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters); oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
} }
...@@ -163,6 +151,8 @@ public class SingleProducerMultipleConsumerLoops { ...@@ -163,6 +151,8 @@ public class SingleProducerMultipleConsumerLoops {
} }
static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception { static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
if (print)
System.out.printf("%-18s", q.getClass().getSimpleName());
LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer); CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
pool.execute(new Producer(q, barrier, iters * nconsumers)); pool.execute(new Producer(q, barrier, iters * nconsumers));
......
...@@ -60,20 +60,10 @@ public class ConcurrentQueueLoops { ...@@ -60,20 +60,10 @@ public class ConcurrentQueueLoops {
//queues.add(new ArrayBlockingQueue<Integer>(count, true)); //queues.add(new ArrayBlockingQueue<Integer>(count, true));
queues.add(new LinkedBlockingQueue<Integer>()); queues.add(new LinkedBlockingQueue<Integer>());
queues.add(new LinkedBlockingDeque<Integer>()); queues.add(new LinkedBlockingDeque<Integer>());
// queues.add(new LinkedTransferQueue<Integer>());
try {
queues.add((Queue<Integer>)
Class.forName("java.util.concurrent.LinkedTransferQueue")
.newInstance());
} catch (IllegalAccessException e) {
} catch (InstantiationException e) {
} catch (ClassNotFoundException e) {
// OK; not yet added to JDK
}
// Following additional implementations are available from: // Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new LinkedTransferQueue<Integer>());
// queues.add(new SynchronizedLinkedListQueue<Integer>()); // queues.add(new SynchronizedLinkedListQueue<Integer>());
// Avoid "first fast, second slow" benchmark effect. // Avoid "first fast, second slow" benchmark effect.
......
...@@ -41,8 +41,9 @@ ...@@ -41,8 +41,9 @@
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
// import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.PriorityQueue; import java.util.PriorityQueue;
...@@ -69,20 +70,10 @@ public class GCRetention { ...@@ -69,20 +70,10 @@ public class GCRetention {
queues.add(new PriorityBlockingQueue<Boolean>()); queues.add(new PriorityBlockingQueue<Boolean>());
queues.add(new PriorityQueue<Boolean>()); queues.add(new PriorityQueue<Boolean>());
queues.add(new LinkedList<Boolean>()); queues.add(new LinkedList<Boolean>());
// queues.add(new LinkedTransferQueue<Boolean>());
try {
queues.add((Queue<Boolean>)
Class.forName("java.util.concurrent.LinkedTransferQueue")
.newInstance());
} catch (IllegalAccessException e) {
} catch (InstantiationException e) {
} catch (ClassNotFoundException e) {
// OK; not yet added to JDK
}
// Following additional implementations are available from: // Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new LinkedTransferQueue<Boolean>());
// queues.add(new SynchronizedLinkedListQueue<Boolean>()); // queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect. // Avoid "first fast, second slow" benchmark effect.
......
...@@ -49,6 +49,7 @@ public class IteratorWeakConsistency { ...@@ -49,6 +49,7 @@ public class IteratorWeakConsistency {
test(new LinkedBlockingDeque()); test(new LinkedBlockingDeque());
test(new LinkedBlockingDeque(20)); test(new LinkedBlockingDeque(20));
test(new ConcurrentLinkedQueue()); test(new ConcurrentLinkedQueue());
// test(new LinkedTransferQueue());
// Other concurrent queues (e.g. ArrayBlockingQueue) do not // Other concurrent queues (e.g. ArrayBlockingQueue) do not
// currently have weakly consistent iterators. // currently have weakly consistent iterators.
// test(new ArrayBlockingQueue(20)); // test(new ArrayBlockingQueue(20));
......
...@@ -23,63 +23,136 @@ ...@@ -23,63 +23,136 @@
/* /*
* @test * @test
* @bug 6316155 6595669 * @bug 6316155 6595669 6871697 6868712
* @summary Test concurrent offer vs. remove * @summary Test concurrent offer vs. remove
* @run main OfferRemoveLoops 300
* @author Martin Buchholz * @author Martin Buchholz
*/ */
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
public class OfferRemoveLoops { public class OfferRemoveLoops {
final long testDurationMillisDefault = 10L * 1000L;
final long testDurationMillis;
OfferRemoveLoops(String[] args) {
testDurationMillis = (args.length > 0) ?
Long.valueOf(args[0]) : testDurationMillisDefault;
}
void checkNotContainsNull(Iterable it) {
for (Object x : it)
check(x != null);
}
void test(String[] args) throws Throwable { void test(String[] args) throws Throwable {
testQueue(new LinkedBlockingQueue<String>(10)); testQueue(new LinkedBlockingQueue(10));
testQueue(new LinkedBlockingQueue<String>()); testQueue(new LinkedBlockingQueue());
testQueue(new LinkedBlockingDeque<String>(10)); testQueue(new LinkedBlockingDeque(10));
testQueue(new LinkedBlockingDeque<String>()); testQueue(new LinkedBlockingDeque());
testQueue(new ArrayBlockingQueue<String>(10)); testQueue(new ArrayBlockingQueue(10));
testQueue(new PriorityBlockingQueue<String>(10)); testQueue(new PriorityBlockingQueue(10));
testQueue(new ConcurrentLinkedQueue<String>()); testQueue(new ConcurrentLinkedQueue());
// testQueue(new LinkedTransferQueue());
} }
abstract class CheckedThread extends Thread { Random getRandom() {
abstract protected void realRun(); return new Random();
public void run() { // return ThreadLocalRandom.current();
try { realRun(); } catch (Throwable t) { unexpected(t); }
}
} }
void testQueue(final Queue<String> q) throws Throwable { void testQueue(final Queue q) throws Throwable {
System.out.println(q.getClass().getSimpleName()); System.out.println(q.getClass().getSimpleName());
final int count = 1000 * 1000; final long testDurationNanos = testDurationMillis * 1000L * 1000L;
final long testDurationSeconds = 1L; final long quittingTimeNanos = System.nanoTime() + testDurationNanos;
final long testDurationMillis = testDurationSeconds * 1000L; final long timeoutMillis = 10L * 1000L;
final long quittingTimeNanos final int maxChunkSize = 1042;
= System.nanoTime() + testDurationSeconds * 1000L * 1000L * 1000L; final int maxQueueSize = 10 * maxChunkSize;
Thread t1 = new CheckedThread() {
/** Poor man's bounded buffer. */
final AtomicLong approximateCount = new AtomicLong(0L);
abstract class CheckedThread extends Thread {
CheckedThread(String name) {
super(name);
setDaemon(true);
start();
}
/** Polls for quitting time. */
protected boolean quittingTime() {
return System.nanoTime() - quittingTimeNanos > 0;
}
/** Polls occasionally for quitting time. */
protected boolean quittingTime(long i) {
return (i % 1024) == 0 && quittingTime();
}
abstract protected void realRun();
public void run() {
try { realRun(); } catch (Throwable t) { unexpected(t); }
}
}
Thread offerer = new CheckedThread("offerer") {
protected void realRun() { protected void realRun() {
for (int i = 0; i < count; i++) { final long chunkSize = getRandom().nextInt(maxChunkSize) + 2;
if ((i % 1024) == 0 && long c = 0;
System.nanoTime() - quittingTimeNanos > 0) for (long i = 0; ! quittingTime(i); i++) {
return; if (q.offer(Long.valueOf(c))) {
while (! q.remove(String.valueOf(i))) if ((++c % chunkSize) == 0) {
approximateCount.getAndAdd(chunkSize);
while (approximateCount.get() > maxQueueSize)
Thread.yield();
}
} else {
Thread.yield(); Thread.yield();
}}}; }}}};
Thread t2 = new CheckedThread() {
Thread remover = new CheckedThread("remover") {
protected void realRun() { protected void realRun() {
for (int i = 0; i < count; i++) { final long chunkSize = getRandom().nextInt(maxChunkSize) + 2;
if ((i % 1024) == 0 && long c = 0;
System.nanoTime() - quittingTimeNanos > 0) for (long i = 0; ! quittingTime(i); i++) {
return; if (q.remove(Long.valueOf(c))) {
while (! q.offer(String.valueOf(i))) if ((++c % chunkSize) == 0) {
approximateCount.getAndAdd(-chunkSize);
}
} else {
Thread.yield(); Thread.yield();
}}}; }
t1.setDaemon(true); t2.setDaemon(true); }
t1.start(); t2.start(); q.clear();
t1.join(10 * testDurationMillis); approximateCount.set(0); // Releases waiting offerer thread
t2.join(10 * testDurationMillis); }};
check(! t1.isAlive());
check(! t2.isAlive()); Thread scanner = new CheckedThread("scanner") {
protected void realRun() {
final Random rnd = getRandom();
while (! quittingTime()) {
switch (rnd.nextInt(3)) {
case 0: checkNotContainsNull(q); break;
case 1: q.size(); break;
case 2: checkNotContainsNull
(Arrays.asList(q.toArray(new Long[0])));
break;
}
Thread.yield();
}}};
for (Thread thread : new Thread[] { offerer, remover, scanner }) {
thread.join(timeoutMillis + testDurationMillis);
if (thread.isAlive()) {
System.err.printf("Hung thread: %s%n", thread.getName());
failed++;
for (StackTraceElement e : thread.getStackTrace())
System.err.println(e);
// Kludge alert
thread.stop();
thread.join(timeoutMillis);
}
}
} }
//--------------------- Infrastructure --------------------------- //--------------------- Infrastructure ---------------------------
...@@ -93,7 +166,7 @@ public class OfferRemoveLoops { ...@@ -93,7 +166,7 @@ public class OfferRemoveLoops {
if (x == null ? y == null : x.equals(y)) pass(); if (x == null ? y == null : x.equals(y)) pass();
else fail(x + " not equal to " + y);} else fail(x + " not equal to " + y);}
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
new OfferRemoveLoops().instanceMain(args);} new OfferRemoveLoops(args).instanceMain(args);}
public void instanceMain(String[] args) throws Throwable { public void instanceMain(String[] args) throws Throwable {
try {test(args);} catch (Throwable t) {unexpected(t);} try {test(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
......
...@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; ...@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
// import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
...@@ -66,20 +67,10 @@ public class RemovePollRace { ...@@ -66,20 +67,10 @@ public class RemovePollRace {
queues.add(new ArrayBlockingQueue<Boolean>(count, true)); queues.add(new ArrayBlockingQueue<Boolean>(count, true));
queues.add(new LinkedBlockingQueue<Boolean>()); queues.add(new LinkedBlockingQueue<Boolean>());
queues.add(new LinkedBlockingDeque<Boolean>()); queues.add(new LinkedBlockingDeque<Boolean>());
// queues.add(new LinkedTransferQueue<Boolean>());
try {
queues.add((Queue<Boolean>)
Class.forName("java.util.concurrent.LinkedTransferQueue")
.newInstance());
} catch (IllegalAccessException e) {
} catch (InstantiationException e) {
} catch (ClassNotFoundException e) {
// OK; not yet added to JDK
}
// Following additional implementations are available from: // Following additional implementations are available from:
// http://gee.cs.oswego.edu/dl/concurrency-interest/index.html // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
// queues.add(new LinkedTransferQueue<Boolean>());
// queues.add(new SynchronizedLinkedListQueue<Boolean>()); // queues.add(new SynchronizedLinkedListQueue<Boolean>());
// Avoid "first fast, second slow" benchmark effect. // Avoid "first fast, second slow" benchmark effect.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册