Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
171a993f
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
171a993f
编写于
9月 07, 2016
作者:
M
Matteo Merli
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fixed concurrent tests intermittently failing
上级
d79c33f2
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
56 addition
and
65 deletion
+56
-65
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
...bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+51
-63
managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
.../org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+5
-2
未找到文件。
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
浏览文件 @
171a993f
...
...
@@ -15,15 +15,15 @@
*/
package
org.apache.bookkeeper.mledger.impl
;
import
static
org
.
apache
.
bookkeeper
.
mledger
.
util
.
SafeRun
.
safeRun
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertFalse
;
import
static
org
.
testng
.
Assert
.
assertNull
;
import
java.util.List
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.CyclicBarrier
;
import
java.util.concurrent.Executor
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.concurrent.atomic.AtomicReference
;
...
...
@@ -47,7 +47,6 @@ import com.google.common.collect.Lists;
public
class
ManagedCursorConcurrencyTest
extends
MockedBookKeeperTestCase
{
Executor
executor
=
Executors
.
newCachedThreadPool
();
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
ManagedCursorConcurrencyTest
.
class
);
private
final
AsyncCallbacks
.
DeleteCallback
deleteCallback
=
new
AsyncCallbacks
.
DeleteCallback
()
{
...
...
@@ -208,7 +207,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertEquals
(
closeFuture
.
get
(),
CLOSED
);
}
@Test
@Test
(
timeOut
=
30000
)
public
void
testAckAndClose
()
throws
Exception
{
ManagedLedger
ledger
=
factory
.
open
(
"my_test_ledger_test_ack_and_close"
,
new
ManagedLedgerConfig
().
setMaxEntriesPerLedger
(
2
));
...
...
@@ -226,51 +225,44 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
final
CountDownLatch
counter
=
new
CountDownLatch
(
2
);
final
AtomicBoolean
gotException
=
new
AtomicBoolean
(
false
);
Thread
deleter
=
new
Thread
()
{
public
void
run
()
{
try
{
barrier
.
await
();
// Deleter thread
cachedExecutor
.
execute
(()
->
{
try
{
barrier
.
await
();
for
(
Position
position
:
addedEntries
)
{
cursor
.
asyncDelete
(
position
,
deleteCallback
,
position
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
for
(
Position
position
:
addedEntries
)
{
cursor
.
asyncDelete
(
position
,
deleteCallback
,
position
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
}
};
}
)
;
Thread
reader
=
new
Thread
()
{
public
void
run
()
{
try
{
barrier
.
await
();
// Reader thread
cachedExecutor
.
execute
(()
->
{
try
{
barrier
.
await
();
for
(
int
i
=
0
;
i
<
1000
;
i
++)
{
cursor
.
readEntries
(
1
).
forEach
(
e
->
e
.
release
());
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
for
(
int
i
=
0
;
i
<
1000
;
i
++)
{
cursor
.
readEntries
(
1
).
forEach
(
e
->
e
.
release
());
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
}
};
System
.
out
.
println
(
"starting deleter and reader.."
+
System
.
currentTimeMillis
());
deleter
.
start
();
reader
.
start
();
});
counter
.
await
();
assertEquals
(
gotException
.
get
(),
false
);
System
.
out
.
println
(
"Finished.."
+
System
.
currentTimeMillis
());
}
@Test
@Test
(
timeOut
=
30000
)
public
void
testConcurrentIndividualDeletes
()
throws
Exception
{
ManagedLedger
ledger
=
factory
.
open
(
"my_test_ledger"
,
new
ManagedLedgerConfig
().
setMaxEntriesPerLedger
(
100
));
...
...
@@ -291,24 +283,22 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
for
(
int
thread
=
0
;
thread
<
Threads
;
thread
++)
{
final
int
myThread
=
thread
;
executor
.
execute
(
new
Runnable
()
{
public
void
run
()
{
try
{
barrier
.
await
();
for
(
int
i
=
0
;
i
<
N
;
i
++)
{
int
threadId
=
i
%
Threads
;
if
(
threadId
==
myThread
)
{
cursor
.
delete
(
addedEntries
.
get
(
i
));
}
}
cachedExecutor
.
execute
(()
->
{
try
{
barrier
.
await
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
()
;
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
for
(
int
i
=
0
;
i
<
N
;
i
++
)
{
int
threadId
=
i
%
Threads
;
if
(
threadId
==
myThread
)
{
cursor
.
delete
(
addedEntries
.
get
(
i
));
}
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
counter
.
countDown
();
}
});
}
...
...
@@ -319,7 +309,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertEquals
(
cursor
.
getMarkDeletedPosition
(),
addedEntries
.
get
(
addedEntries
.
size
()
-
1
));
}
@Test
@Test
(
timeOut
=
30000
)
public
void
testConcurrentReadOfSameEntry
()
throws
Exception
{
ManagedLedger
ledger
=
factory
.
open
(
"testConcurrentReadOfSameEntry"
,
new
ManagedLedgerConfig
());
final
int
numCursors
=
5
;
...
...
@@ -346,7 +336,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
for
(
int
i
=
0
;
i
<
numCursors
;
i
++)
{
final
int
cursorIndex
=
i
;
final
ManagedCursor
cursor
=
cursors
.
get
(
cursorIndex
);
e
xecutor
.
execute
(()
->
{
cachedE
xecutor
.
execute
(()
->
{
try
{
barrier
.
await
();
for
(
int
j
=
0
;
j
<
N
;
j
++)
{
...
...
@@ -369,7 +359,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
assertNull
(
result
.
get
());
}
@Test
@Test
(
timeOut
=
30000
)
public
void
testConcurrentIndividualDeletesWithGetNthEntry
()
throws
Exception
{
ManagedLedger
ledger
=
factory
.
open
(
"my_test_ledger"
,
new
ManagedLedgerConfig
().
setMaxEntriesPerLedger
(
100
).
setThrottleMarkDelete
(
0.5
));
...
...
@@ -390,12 +380,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
final
AtomicInteger
iteration
=
new
AtomicInteger
(
0
);
for
(
int
i
=
0
;
i
<
deleteEntries
;
i
++)
{
executor
.
execute
(()
->
{
executor
.
submit
(
safeRun
(()
->
{
try
{
cursor
.
asyncDelete
(
addedEntries
.
get
(
iteration
.
getAndIncrement
()),
new
DeleteCallback
()
{
@Override
public
void
deleteComplete
(
Object
ctx
)
{
log
.
info
(
"Successfully deleted cursor"
);
// Ok
}
@Override
...
...
@@ -410,8 +400,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
}
finally
{
counter
.
countDown
();
}
});
}));
}
counter
.
await
();
...
...
@@ -426,6 +415,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
public
void
readEntryComplete
(
Entry
entry
,
Object
ctx
)
{
successReadEntries
.
getAndIncrement
();
entry
.
release
();
readCounter
.
countDown
();
}
@Override
...
...
@@ -437,14 +427,12 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
gotException
.
set
(
true
);
}
finally
{
readCounter
.
countDown
();
}
}
readCounter
.
await
();
assert
Equals
(
gotException
.
get
(),
false
);
assertEquals
(
readEntries
,
successReadEntries
.
get
()
);
assert
False
(
gotException
.
get
()
);
assertEquals
(
successReadEntries
.
get
(),
readEntries
);
}
}
managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
浏览文件 @
171a993f
...
...
@@ -16,6 +16,8 @@
package
org.apache.bookkeeper.test
;
import
java.lang.reflect.Method
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
org.apache.bookkeeper.client.MockBookKeeper
;
import
org.apache.bookkeeper.conf.ClientConfiguration
;
...
...
@@ -47,6 +49,7 @@ public abstract class MockedBookKeeperTestCase {
protected
ClientConfiguration
baseClientConf
=
new
ClientConfiguration
();
protected
OrderedSafeExecutor
executor
;
protected
ExecutorService
cachedExecutor
;
public
MockedBookKeeperTestCase
()
{
// By default start a 3 bookies cluster
...
...
@@ -69,19 +72,19 @@ public abstract class MockedBookKeeperTestCase {
}
executor
=
new
OrderedSafeExecutor
(
2
,
"test"
);
cachedExecutor
=
Executors
.
newCachedThreadPool
();
factory
=
new
ManagedLedgerFactoryImpl
(
bkc
,
zkc
);
}
@AfterMethod
public
void
tearDown
(
Method
method
)
throws
Exception
{
LOG
.
info
(
"@@@@@@@@@ stopping "
+
method
);
System
.
gc
();
factory
.
shutdown
();
factory
=
null
;
stopBookKeeper
();
stopZooKeeper
();
executor
.
shutdown
();
System
.
gc
();
cachedExecutor
.
shutdown
();
LOG
.
info
(
"--------- stopped {}"
,
method
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录