Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
94b941a4
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
94b941a4
编写于
9月 07, 2016
作者:
M
Matteo Merli
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fixed ZookeeperCacheTest timing dependent failures
上级
171a993f
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
28 addition
and
39 deletion
+28
-39
pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
...t/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
+28
-39
未找到文件。
pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
浏览文件 @
94b941a4
...
...
@@ -15,16 +15,17 @@
*/
package
com.yahoo.pulsar.zookeeper
;
import
static
org
.
testng
.
AssertJUnit
.
assertNotNull
;
import
static
org
.
testng
.
AssertJUnit
.
assertNull
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
fail
;
import
static
org
.
testng
.
AssertJUnit
.
assertNotNull
;
import
static
org
.
testng
.
AssertJUnit
.
assertNull
;
import
java.util.Set
;
import
java.util.TreeSet
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.ScheduledThreadPoolExecutor
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.bookkeeper.util.OrderedSafeExecutor
;
import
org.apache.zookeeper.KeeperException.Code
;
...
...
@@ -33,7 +34,6 @@ import org.apache.zookeeper.WatchedEvent;
import
org.apache.zookeeper.Watcher.Event
;
import
org.apache.zookeeper.Watcher.Event.KeeperState
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.apache.zookeeper.data.Stat
;
import
org.testng.Assert
;
import
org.testng.annotations.AfterMethod
;
import
org.testng.annotations.BeforeMethod
;
...
...
@@ -103,35 +103,34 @@ public class ZookeeperCacheTest {
ZooKeeperChildrenCache
cache
=
new
ZooKeeperChildrenCache
(
zkCacheService
,
"/test"
);
// Create callback counter
ZooKeeperCacheListener
<
Set
<
String
>>
counter
=
new
ZooKeeperCacheListener
<
Set
<
String
>>()
{
private
int
callbackCount
=
0
;
@Override
public
void
onUpdate
(
String
path
,
Set
<
String
>
data
,
Stat
stat
)
{
++
callbackCount
;
}
public
String
toString
()
{
return
String
.
valueOf
(
callbackCount
);
}
AtomicInteger
notificationCount
=
new
AtomicInteger
(
0
);
ZooKeeperCacheListener
<
Set
<
String
>>
counter
=
(
path
,
data
,
stat
)
->
{
notificationCount
.
incrementAndGet
();
};
// Register counter twice and unregister once, so callback should be counted correctly
cache
.
registerListener
(
counter
);
cache
.
registerListener
(
counter
);
cache
.
unregisterListener
(
counter
);
assertEquals
(
counter
.
toString
(),
"0"
);
assertEquals
(
notificationCount
.
get
(),
0
);
assertEquals
(
cache
.
get
(),
Sets
.
newTreeSet
());
zkClient
.
create
(
"/test/z1"
,
new
byte
[
0
],
null
,
null
);
zkClient
.
create
(
"/test/z2"
,
new
byte
[
0
],
null
,
null
);
Thread
.
sleep
(
20
);
// Wait for cache to be updated in background
while
(
notificationCount
.
get
()
<
2
)
{
Thread
.
sleep
(
1
);
}
assertEquals
(
cache
.
get
(),
new
TreeSet
<
String
>(
Lists
.
newArrayList
(
"z1"
,
"z2"
)));
assertEquals
(
cache
.
get
(
"/test"
),
new
TreeSet
<
String
>(
Lists
.
newArrayList
(
"z1"
,
"z2"
)));
assertEquals
(
counter
.
toString
(),
"2"
);
assertEquals
(
notificationCount
.
get
(),
2
);
zkClient
.
delete
(
"/test/z2"
,
-
1
);
Thread
.
sleep
(
20
);
while
(
notificationCount
.
get
()
<
3
)
{
Thread
.
sleep
(
1
);
}
assertEquals
(
cache
.
get
(),
new
TreeSet
<
String
>(
Lists
.
newArrayList
(
"z1"
)));
assertEquals
(
cache
.
get
(),
new
TreeSet
<
String
>(
Lists
.
newArrayList
(
"z1"
)));
...
...
@@ -145,7 +144,7 @@ public class ZookeeperCacheTest {
// Ok
}
assertEquals
(
counter
.
toString
(),
"3"
);
assertEquals
(
notificationCount
.
get
(),
3
);
}
@Test
...
...
@@ -222,17 +221,9 @@ public class ZookeeperCacheTest {
};
// Create callback counter
ZooKeeperCacheListener
<
String
>
counter
=
new
ZooKeeperCacheListener
<
String
>()
{
private
int
callbackCount
=
0
;
@Override
public
void
onUpdate
(
String
path
,
String
data
,
Stat
stat
)
{
++
callbackCount
;
}
public
String
toString
()
{
return
String
.
valueOf
(
callbackCount
);
}
AtomicInteger
notificationCount
=
new
AtomicInteger
(
0
);
ZooKeeperCacheListener
<
String
>
counter
=
(
path
,
data
,
stat
)
->
{
notificationCount
.
incrementAndGet
();
};
// Register counter twice and unregister once, so callback should be counted correctly
...
...
@@ -248,19 +239,21 @@ public class ZookeeperCacheTest {
String
newValue
=
"test2"
;
// case 1: update and create znode directly and verify that the cache is retrieving the correct data
assertEquals
(
counter
.
toString
(),
"0"
);
assertEquals
(
notificationCount
.
get
(),
0
);
zkClient
.
setData
(
"/my_test"
,
newValue
.
getBytes
(),
-
1
);
zkClient
.
create
(
"/my_test2"
,
value
.
getBytes
(),
null
,
null
);
// Wait for the watch to be triggered
Thread
.
sleep
(
100
);
while
(
notificationCount
.
get
()
<
1
)
{
Thread
.
sleep
(
1
);
}
// retrieve the data from the cache and verify it is the updated/new data
assertEquals
(
zkCache
.
get
(
"/my_test"
),
newValue
);
assertEquals
(
zkCache
.
get
(
"/my_test2"
),
value
);
// The callback method should be called just only once
assertEquals
(
counter
.
toString
(),
"1"
);
assertEquals
(
notificationCount
.
get
(),
1
);
// case 2: force the ZooKeeper session to be expired and verify that the data is still accessible
zkCacheService
.
process
(
new
WatchedEvent
(
Event
.
EventType
.
None
,
KeeperState
.
Expired
,
null
));
...
...
@@ -283,16 +276,12 @@ public class ZookeeperCacheTest {
// case 4: directly delete the znode while the session is not re-connected yet. Verify that the deletion is not
// seen by the cache
zkClient
.
failAfter
(-
1
,
Code
.
OK
);
zkClient
.
delete
(
"/my_test2"
,
-
1
,
(
rc
,
path
,
ctx
)
->
{
},
null
);
zkClient
.
delete
(
"/my_test2"
,
-
1
);
// Make sure it has not been updated yet
assertEquals
(
zkCache
.
get
(
"/my_test2"
),
value
);
zkCacheService
.
process
(
new
WatchedEvent
(
Event
.
EventType
.
None
,
KeeperState
.
SyncConnected
,
null
));
assertEquals
(
zkCache
.
get
(
"/other"
),
newValue
);
// add sleep to avoid sporadic failures
Thread
.
sleep
(
100
);
// Make sure that the value is now directly from ZK and deleted
try
{
zkCache
.
get
(
"/my_test2"
);
...
...
@@ -315,6 +304,6 @@ public class ZookeeperCacheTest {
executor
.
shutdown
();
// Update shouldn't happen after the last check
assertEquals
(
counter
.
toString
(),
"1"
);
assertEquals
(
notificationCount
.
get
(),
1
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录