提交 d876f750 编写于 作者: M Matteo Merli 提交者: GitHub

Merge pull request #2 from merlimat/master

Configure Travis CI build
language: java
jdk:
- oraclejdk8
cache:
directories:
- $HOME/.m2
......@@ -15,15 +15,15 @@
*/
package org.apache.bookkeeper.mledger.impl;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -47,7 +47,6 @@ import com.google.common.collect.Lists;
public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
Executor executor = Executors.newCachedThreadPool();
private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class);
private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {
......@@ -208,7 +207,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertEquals(closeFuture.get(), CLOSED);
}
@Test
@Test(timeOut = 30000)
public void testAckAndClose() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_test_ack_and_close",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
......@@ -226,51 +225,44 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);
Thread deleter = new Thread() {
public void run() {
try {
barrier.await();
// Deleter thread
cachedExecutor.execute(() -> {
try {
barrier.await();
for (Position position : addedEntries) {
cursor.asyncDelete(position, deleteCallback, position);
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (Position position : addedEntries) {
cursor.asyncDelete(position, deleteCallback, position);
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
});
Thread reader = new Thread() {
public void run() {
try {
barrier.await();
// Reader thread
cachedExecutor.execute(() -> {
try {
barrier.await();
for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
System.out.println("starting deleter and reader.." + System.currentTimeMillis());
deleter.start();
reader.start();
});
counter.await();
assertEquals(gotException.get(), false);
System.out.println("Finished.." + System.currentTimeMillis());
}
@Test
@Test(timeOut = 30000)
public void testConcurrentIndividualDeletes() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100));
......@@ -291,24 +283,22 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
for (int thread = 0; thread < Threads; thread++) {
final int myThread = thread;
executor.execute(new Runnable() {
public void run() {
try {
barrier.await();
for (int i = 0; i < N; i++) {
int threadId = i % Threads;
if (threadId == myThread) {
cursor.delete(addedEntries.get(i));
}
}
cachedExecutor.execute(() -> {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (int i = 0; i < N; i++) {
int threadId = i % Threads;
if (threadId == myThread) {
cursor.delete(addedEntries.get(i));
}
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
});
}
......@@ -319,7 +309,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertEquals(cursor.getMarkDeletedPosition(), addedEntries.get(addedEntries.size() - 1));
}
@Test
@Test(timeOut = 30000)
public void testConcurrentReadOfSameEntry() throws Exception {
ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry", new ManagedLedgerConfig());
final int numCursors = 5;
......@@ -346,7 +336,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
for (int i = 0; i < numCursors; i++) {
final int cursorIndex = i;
final ManagedCursor cursor = cursors.get(cursorIndex);
executor.execute(() -> {
cachedExecutor.execute(() -> {
try {
barrier.await();
for (int j = 0; j < N; j++) {
......@@ -369,7 +359,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertNull(result.get());
}
@Test
@Test(timeOut = 30000)
public void testConcurrentIndividualDeletesWithGetNthEntry() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(100).setThrottleMarkDelete(0.5));
......@@ -390,12 +380,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
final AtomicInteger iteration = new AtomicInteger(0);
for (int i = 0; i < deleteEntries; i++) {
executor.execute(() -> {
executor.submit(safeRun(() -> {
try {
cursor.asyncDelete(addedEntries.get(iteration.getAndIncrement()), new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
log.info("Successfully deleted cursor");
// Ok
}
@Override
......@@ -410,8 +400,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
} finally {
counter.countDown();
}
});
}));
}
counter.await();
......@@ -426,6 +415,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
public void readEntryComplete(Entry entry, Object ctx) {
successReadEntries.getAndIncrement();
entry.release();
readCounter.countDown();
}
@Override
......@@ -437,14 +427,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
readCounter.countDown();
}
}
readCounter.await();
assertEquals(gotException.get(), false);
assertEquals(readEntries, successReadEntries.get());
assertFalse(gotException.get());
assertEquals(successReadEntries.get(), readEntries);
}
}
......@@ -16,6 +16,8 @@
package org.apache.bookkeeper.test;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.MockBookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
......@@ -47,6 +49,7 @@ public abstract class MockedBookKeeperTestCase {
protected ClientConfiguration baseClientConf = new ClientConfiguration();
protected OrderedSafeExecutor executor;
protected ExecutorService cachedExecutor;
public MockedBookKeeperTestCase() {
// By default start a 3 bookies cluster
......@@ -69,19 +72,19 @@ public abstract class MockedBookKeeperTestCase {
}
executor = new OrderedSafeExecutor(2, "test");
cachedExecutor = Executors.newCachedThreadPool();
factory = new ManagedLedgerFactoryImpl(bkc, zkc);
}
@AfterMethod
public void tearDown(Method method) throws Exception {
LOG.info("@@@@@@@@@ stopping " + method);
System.gc();
factory.shutdown();
factory = null;
stopBookKeeper();
stopZooKeeper();
executor.shutdown();
System.gc();
cachedExecutor.shutdown();
LOG.info("--------- stopped {}", method);
}
......
......@@ -15,16 +15,17 @@
*/
package com.yahoo.pulsar.zookeeper;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException.Code;
......@@ -33,7 +34,6 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
......@@ -103,35 +103,34 @@ public class ZookeeperCacheTest {
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
ZooKeeperCacheListener<Set<String>> counter = new ZooKeeperCacheListener<Set<String>>() {
private int callbackCount = 0;
@Override
public void onUpdate(String path, Set<String> data, Stat stat) {
++callbackCount;
}
public String toString() {
return String.valueOf(callbackCount);
}
AtomicInteger notificationCount = new AtomicInteger(0);
ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> {
notificationCount.incrementAndGet();
};
// Register counter twice and unregister once, so callback should be counted correctly
cache.registerListener(counter);
cache.registerListener(counter);
cache.unregisterListener(counter);
assertEquals(counter.toString(), "0");
assertEquals(notificationCount.get(), 0);
assertEquals(cache.get(), Sets.newTreeSet());
zkClient.create("/test/z1", new byte[0], null, null);
zkClient.create("/test/z2", new byte[0], null, null);
Thread.sleep(20);
// Wait for cache to be updated in background
while (notificationCount.get() < 2) {
Thread.sleep(1);
}
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
assertEquals(counter.toString(), "2");
assertEquals(notificationCount.get(), 2);
zkClient.delete("/test/z2", -1);
Thread.sleep(20);
while (notificationCount.get() < 3) {
Thread.sleep(1);
}
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
......@@ -145,7 +144,7 @@ public class ZookeeperCacheTest {
// Ok
}
assertEquals(counter.toString(), "3");
assertEquals(notificationCount.get(), 3);
}
@Test
......@@ -222,17 +221,9 @@ public class ZookeeperCacheTest {
};
// Create callback counter
ZooKeeperCacheListener<String> counter = new ZooKeeperCacheListener<String>() {
private int callbackCount = 0;
@Override
public void onUpdate(String path, String data, Stat stat) {
++callbackCount;
}
public String toString() {
return String.valueOf(callbackCount);
}
AtomicInteger notificationCount = new AtomicInteger(0);
ZooKeeperCacheListener<String> counter = (path, data, stat) -> {
notificationCount.incrementAndGet();
};
// Register counter twice and unregister once, so callback should be counted correctly
......@@ -248,19 +239,21 @@ public class ZookeeperCacheTest {
String newValue = "test2";
// case 1: update and create znode directly and verify that the cache is retrieving the correct data
assertEquals(counter.toString(), "0");
assertEquals(notificationCount.get(), 0);
zkClient.setData("/my_test", newValue.getBytes(), -1);
zkClient.create("/my_test2", value.getBytes(), null, null);
// Wait for the watch to be triggered
Thread.sleep(100);
while (notificationCount.get() < 1) {
Thread.sleep(1);
}
// retrieve the data from the cache and verify it is the updated/new data
assertEquals(zkCache.get("/my_test"), newValue);
assertEquals(zkCache.get("/my_test2"), value);
// The callback method should be called just only once
assertEquals(counter.toString(), "1");
assertEquals(notificationCount.get(), 1);
// case 2: force the ZooKeeper session to be expired and verify that the data is still accessible
zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
......@@ -283,16 +276,12 @@ public class ZookeeperCacheTest {
// case 4: directly delete the znode while the session is not re-connected yet. Verify that the deletion is not
// seen by the cache
zkClient.failAfter(-1, Code.OK);
zkClient.delete("/my_test2", -1, (rc, path, ctx) -> {
}, null);
zkClient.delete("/my_test2", -1);
// Make sure it has not been updated yet
assertEquals(zkCache.get("/my_test2"), value);
zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.SyncConnected, null));
assertEquals(zkCache.get("/other"), newValue);
// add sleep to avoid sporadic failures
Thread.sleep(100);
// Make sure that the value is now directly from ZK and deleted
try {
zkCache.get("/my_test2");
......@@ -315,6 +304,6 @@ public class ZookeeperCacheTest {
executor.shutdown();
// Update shouldn't happen after the last check
assertEquals(counter.toString(), "1");
assertEquals(notificationCount.get(), 1);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册