From 171a993f59894ffd8bee5178c69456af571fc005 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 7 Sep 2016 12:38:34 -0700 Subject: [PATCH] Fixed concurrent tests intermittently failing --- .../impl/ManagedCursorConcurrencyTest.java | 114 ++++++++---------- .../test/MockedBookKeeperTestCase.java | 7 +- 2 files changed, 56 insertions(+), 65 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java index d9f1b9ab53b..35b2538916c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java @@ -15,15 +15,15 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -47,7 +47,6 @@ import com.google.common.collect.Lists; public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { - Executor executor = Executors.newCachedThreadPool(); private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class); private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() { @@ -208,7 +207,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { assertEquals(closeFuture.get(), CLOSED); } - @Test + @Test(timeOut = 30000) public void testAckAndClose() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger_test_ack_and_close", new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); @@ -226,51 +225,44 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { final CountDownLatch counter = new CountDownLatch(2); final AtomicBoolean gotException = new AtomicBoolean(false); - Thread deleter = new Thread() { - public void run() { - try { - barrier.await(); + // Deleter thread + cachedExecutor.execute(() -> { + try { + barrier.await(); - for (Position position : addedEntries) { - cursor.asyncDelete(position, deleteCallback, position); - } - } catch (Exception e) { - e.printStackTrace(); - gotException.set(true); - } finally { - counter.countDown(); + for (Position position : addedEntries) { + cursor.asyncDelete(position, deleteCallback, position); } + } catch (Exception e) { + e.printStackTrace(); + gotException.set(true); + } finally { + counter.countDown(); } - }; + }); - Thread reader = new Thread() { - public void run() { - try { - barrier.await(); + // Reader thread + cachedExecutor.execute(() -> { + try { + barrier.await(); - for (int i = 0; i < 1000; i++) { - cursor.readEntries(1).forEach(e -> e.release()); - } - } catch (Exception e) { - e.printStackTrace(); - gotException.set(true); - } finally { - counter.countDown(); + for (int i = 0; i < 1000; i++) { + cursor.readEntries(1).forEach(e -> e.release()); } + } catch (Exception e) { + e.printStackTrace(); + gotException.set(true); + } finally { + counter.countDown(); } - }; - System.out.println("starting deleter and reader.." + System.currentTimeMillis()); - deleter.start(); - reader.start(); + }); counter.await(); assertEquals(gotException.get(), false); - System.out.println("Finished.." + System.currentTimeMillis()); - } - @Test + @Test(timeOut = 30000) public void testConcurrentIndividualDeletes() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100)); @@ -291,24 +283,22 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { for (int thread = 0; thread < Threads; thread++) { final int myThread = thread; - executor.execute(new Runnable() { - public void run() { - try { - barrier.await(); - - for (int i = 0; i < N; i++) { - int threadId = i % Threads; - if (threadId == myThread) { - cursor.delete(addedEntries.get(i)); - } - } + cachedExecutor.execute(() -> { + try { + barrier.await(); - } catch (Exception e) { - e.printStackTrace(); - gotException.set(true); - } finally { - counter.countDown(); + for (int i = 0; i < N; i++) { + int threadId = i % Threads; + if (threadId == myThread) { + cursor.delete(addedEntries.get(i)); + } } + + } catch (Exception e) { + e.printStackTrace(); + gotException.set(true); + } finally { + counter.countDown(); } }); } @@ -319,7 +309,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { assertEquals(cursor.getMarkDeletedPosition(), addedEntries.get(addedEntries.size() - 1)); } - @Test + @Test(timeOut = 30000) public void testConcurrentReadOfSameEntry() throws Exception { ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry", new ManagedLedgerConfig()); final int numCursors = 5; @@ -346,7 +336,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { for (int i = 0; i < numCursors; i++) { final int cursorIndex = i; final ManagedCursor cursor = cursors.get(cursorIndex); - executor.execute(() -> { + cachedExecutor.execute(() -> { try { barrier.await(); for (int j = 0; j < N; j++) { @@ -369,7 +359,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { assertNull(result.get()); } - @Test + @Test(timeOut = 30000) public void testConcurrentIndividualDeletesWithGetNthEntry() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100).setThrottleMarkDelete(0.5)); @@ -390,12 +380,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { final AtomicInteger iteration = new AtomicInteger(0); for (int i = 0; i < deleteEntries; i++) { - executor.execute(() -> { + executor.submit(safeRun(() -> { try { cursor.asyncDelete(addedEntries.get(iteration.getAndIncrement()), new DeleteCallback() { @Override public void deleteComplete(Object ctx) { - log.info("Successfully deleted cursor"); + // Ok } @Override @@ -410,8 +400,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { } finally { counter.countDown(); } - - }); + })); } counter.await(); @@ -426,6 +415,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { public void readEntryComplete(Entry entry, Object ctx) { successReadEntries.getAndIncrement(); entry.release(); + readCounter.countDown(); } @Override @@ -437,14 +427,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { } catch (Exception e) { e.printStackTrace(); gotException.set(true); - } finally { - readCounter.countDown(); } } readCounter.await(); - assertEquals(gotException.get(), false); - assertEquals(readEntries, successReadEntries.get()); + assertFalse(gotException.get()); + assertEquals(successReadEntries.get(), readEntries); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 8063a9b9d0e..1e1091416b4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -16,6 +16,8 @@ package org.apache.bookkeeper.test; import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.bookkeeper.client.MockBookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -47,6 +49,7 @@ public abstract class MockedBookKeeperTestCase { protected ClientConfiguration baseClientConf = new ClientConfiguration(); protected OrderedSafeExecutor executor; + protected ExecutorService cachedExecutor; public MockedBookKeeperTestCase() { // By default start a 3 bookies cluster @@ -69,19 +72,19 @@ public abstract class MockedBookKeeperTestCase { } executor = new OrderedSafeExecutor(2, "test"); + cachedExecutor = Executors.newCachedThreadPool(); factory = new ManagedLedgerFactoryImpl(bkc, zkc); } @AfterMethod public void tearDown(Method method) throws Exception { LOG.info("@@@@@@@@@ stopping " + method); - System.gc(); factory.shutdown(); factory = null; stopBookKeeper(); stopZooKeeper(); executor.shutdown(); - System.gc(); + cachedExecutor.shutdown(); LOG.info("--------- stopped {}", method); } -- GitLab