Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
武汉红喜
whatsmars
提交
9dfad682
W
whatsmars
项目概览
武汉红喜
/
whatsmars
通知
3
Star
0
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
W
whatsmars
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
9dfad682
编写于
1月 19, 2019
作者:
武汉红喜
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ConfigServer简单实现
上级
a715dbcd
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
919 addition
and
4 deletion
+919
-4
whatsmars-zk/README.md
whatsmars-zk/README.md
+3
-0
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/CommonClient.java
...k/src/main/java/org/hongxi/whatsmars/zk/CommonClient.java
+178
-0
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigClient.java
...rc/main/java/org/hongxi/whatsmars/zk/cs/ConfigClient.java
+234
-0
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigManager.java
...c/main/java/org/hongxi/whatsmars/zk/cs/ConfigManager.java
+201
-0
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigServer.java
...rc/main/java/org/hongxi/whatsmars/zk/cs/ConfigServer.java
+219
-0
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/Constants.java
...k/src/main/java/org/hongxi/whatsmars/zk/cs/Constants.java
+7
-0
whatsmars-zk/src/test/java/org/hongxi/whatsmars/zk/cs/ConfigServerTest.java
...est/java/org/hongxi/whatsmars/zk/cs/ConfigServerTest.java
+65
-0
whatsmars-zk/src/test/java/org/hongxi/whatsmars/zk/curator/CuratorTest.java
...est/java/org/hongxi/whatsmars/zk/curator/CuratorTest.java
+3
-1
whatsmars-zk/src/test/java/org/hongxi/whatsmars/zk/curator/DubboTest.java
.../test/java/org/hongxi/whatsmars/zk/curator/DubboTest.java
+6
-2
whatsmars-zk/src/test/java/org/hongxi/whatsmars/zk/curator/WatcherTest.java
...est/java/org/hongxi/whatsmars/zk/curator/WatcherTest.java
+3
-1
未找到文件。
whatsmars-zk/README.md
0 → 100644
浏览文件 @
9dfad682
# Zookeeper
-
curator demo
-
ConfigServer简单实现
\ No newline at end of file
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/CommonClient.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk
;
import
java.io.IOException
;
import
java.util.concurrent.locks.Condition
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.WatchedEvent
;
import
org.apache.zookeeper.Watcher
;
import
org.apache.zookeeper.Watcher.Event.EventType
;
import
org.apache.zookeeper.ZooDefs.Ids
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.apache.zookeeper.ZooKeeper.States
;
/**
* zookeeper中常用特性,模拟多客户端场景
*/
public
class
CommonClient
implements
Watcher
{
// 控制zkClient实例化过程
private
ReentrantLock
initLock
=
new
ReentrantLock
();
private
Condition
initCondition
=
initLock
.
newCondition
();
private
String
servers
;
private
int
sessionTimeout
=
15000
;
// default
private
ZooKeeper
zkClient
;
private
boolean
canReadOnly
;
public
CommonClient
(
String
servers
,
int
sessionTimeout
,
boolean
canReadOnly
)
throws
IOException
,
InterruptedException
{
this
.
servers
=
servers
;
this
.
sessionTimeout
=
sessionTimeout
;
this
.
canReadOnly
=
canReadOnly
;
initZkClient
();
}
public
String
create
(
String
path
,
byte
[]
data
)
throws
Exception
{
if
(!
this
.
isAlive
())
{
getZkClient
(
true
);
}
return
this
.
zkClient
.
create
(
path
,
data
,
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
}
private
void
initZkClient
()
throws
IOException
,
InterruptedException
{
// 实例化
initLock
.
lockInterruptibly
();
try
{
while
(!
this
.
isRunning
())
{
// 异步操作,此处,我们让此操作“变成”同步
zkClient
=
new
ZooKeeper
(
servers
,
sessionTimeout
,
this
,
canReadOnly
);
initCondition
.
await
();
// 在实例化成功之前,client将不能为用户服务。
}
}
catch
(
IOException
e
)
{
throw
e
;
}
finally
{
initLock
.
unlock
();
}
}
/**
* 获得zookeeper实例
*
* @param rebuild 是否可以自动重建实例,当发生session过期时或者链接被关闭
* @return
* @throws IOException
* @throws InterruptedException
*/
public
ZooKeeper
getZkClient
(
boolean
rebuild
)
throws
IOException
,
InterruptedException
{
if
(
this
.
isRunning
())
{
return
zkClient
;
}
if
(!
rebuild
)
{
if
(
zkClient
!=
null
)
{
zkClient
.
close
();
zkClient
=
null
;
}
return
null
;
}
this
.
initZkClient
();
return
zkClient
;
}
public
boolean
isRunning
()
{
if
(
this
.
zkClient
==
null
)
{
return
false
;
}
States
state
=
this
.
zkClient
.
getState
();
switch
(
state
)
{
case
NOT_CONNECTED:
// first time
case
CLOSED:
// error,or expired
case
AUTH_FAILED:
return
false
;
default
:
return
true
;
}
}
private
boolean
isAlive
()
{
if
(
zkClient
==
null
)
{
return
false
;
}
States
state
=
zkClient
.
getState
();
switch
(
state
)
{
case
CONNECTED:
case
CONNECTEDREADONLY:
case
CONNECTING:
return
true
;
default
:
return
false
;
}
}
/**
* 处理当前zkClient中关于“链接状态迁移”的事件
*/
public
void
process
(
WatchedEvent
event
)
{
// 如果是“数据变更”事件
if
(
event
.
getType
()
!=
EventType
.
None
)
{
//
processExt
(
event
);
}
else
{
// 如果是链接状态迁移
// 参见keeperState
switch
(
event
.
getState
())
{
case
SyncConnected:
// 链接成功
initLock
.
lock
();
initCondition
.
notifyAll
();
initLock
.
unlock
();
break
;
case
Expired:
// 链接成功
initLock
.
lock
();
if
(
zkClient
!=
null
)
{
try
{
zkClient
.
close
();
}
catch
(
Exception
e
)
{
//
}
}
zkClient
=
null
;
initCondition
.
notifyAll
();
initLock
.
unlock
();
break
;
// session过期
case
Disconnected:
// 链接断开,或session迁移
System
.
out
.
println
(
"Connecting...."
);
break
;
default
:
break
;
}
}
}
public
void
processExt
(
WatchedEvent
event
)
{
EventType
et
=
event
.
getType
();
if
(
et
==
EventType
.
None
)
{
return
;
}
switch
(
et
)
{
case
NodeChildrenChanged:
case
NodeDataChanged:
case
NodeCreated:
case
NodeDeleted:
// self
break
;
default
:
break
;
}
}
}
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigClient.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk.cs
;
import
java.util.Collections
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.zookeeper.WatchedEvent
;
import
org.apache.zookeeper.Watcher
;
import
org.apache.zookeeper.Watcher.Event.EventType
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.apache.zookeeper.ZooKeeper.States
;
/**
* configServer注册的数据,configClient消费。设计思路和configServer一致。
* 针对client获取数据的方式很多,如下是2中思路 1) 使用zk watch,当数据变更时即使获取 2) 开启守护线程,间歇性读取
* 这两中方式各有优缺点,使用watch,间接的增加了zk环境的事件push的压力和“波及面”,当客户端为N,每个客户端wath的节点数为M
* 那么在极端情况下,zk需要分发的watch个数为M*N,而且可能因为configServer的数据变更较多,导致watch处罚次数较多。
* 同时因为网络问题,client可能丢失某些事件而导致无法及时获取数据。
* <p>
* 如果使用2),直接避免了1)所带来的问题,但是因为间歇性的读取,可能导致zk数据变更无法被即使获得。同时还有其他的问题,比如如果当前
* client所关注的serverType集合较大,而且数据尺寸较大,可能会导致每次全量读取都会消耗较长的时间和网络IO,如果“间歇时间”较短 +
* 数据较大, 也会对整个环境有很大影响。
* <p>
* 不过,此实例假设configServer所注册的数据较小,configClient与zk之间的网络情况较佳。因此我决定采取2)
*/
public
class
ConfigClient
{
private
ZooKeeper
zkClient
;
// inner cache;key:serverType,value:serverList
private
Map
<
String
,
Set
<
String
>>
servers
=
new
ConcurrentHashMap
<
String
,
Set
<
String
>>();
// 当前configClient需要获取的数据分类。即当前client对何种serverType感兴趣
private
Set
<
String
>
serverTypes
=
new
HashSet
<
String
>();
private
Watcher
dw
=
new
InnerZK
();
// 只关注链接状态迁移事件,区别于configServer
private
ReentrantLock
lock
=
new
ReentrantLock
();
// 对于首次链接,或者网络异常进行一次阻塞方式的数据同步,将阻塞其他线程对client的操作。
private
Object
tag
=
new
Object
();
private
boolean
init
=
false
;
// 是否已经初始化
private
Thread
thread
=
new
DumpThread
();
// 数据同步线程
public
ConfigClient
(
String
...
types
)
{
if
(
types
==
null
||
types
.
length
==
0
)
{
throw
new
RuntimeException
(
"ConfigClient,serverTypes cant be empty..please check!"
);
}
for
(
String
type
:
types
)
{
if
(
type
==
null
||
type
.
isEmpty
()
||
type
.
contains
(
"/"
))
{
System
.
out
.
println
(
"ConfigClient,ignore :"
+
type
);
continue
;
}
serverTypes
.
add
(
type
);
}
thread
.
setDaemon
(
true
);
thread
.
start
();
}
/**
* 获得指定serverType的节点数据
*
* @param serverType
* @return
*/
public
Set
<
String
>
getServers
(
String
serverType
)
{
synchronized
(
tag
)
{
while
(!
init
)
{
try
{
// 阻塞直到成功,在链接异常时的dump期间,所有客户端访问需要阻塞;在dumpThread中dump,不会阻塞。
// 当然你可以设计为不阻塞。
// 不过需要注意“首次实例化一定要阻塞”,因为configClient实例化zk是在dumpThread中,
// 如果执行new ConfigClient()之后,立即调用getServers方法,可能导致一个调用者获得空集合;
tag
.
wait
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
break
;
}
}
}
if
(
servers
.
containsKey
(
serverType
))
{
return
Collections
.
unmodifiableSet
(
servers
.
get
(
serverType
));
}
return
null
;
}
public
Set
<
String
>
getServerTypes
()
{
return
serverTypes
;
}
// ///////////////////////////////////////////inner
// work////////////////////////////
/**
* 和zk同步数据
*/
private
void
dump
()
{
lock
.
lock
();
try
{
for
(
String
serverType
:
serverTypes
)
{
dump
(
serverType
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
lock
.
unlock
();
}
}
/**
* 同步制定serverType的数据
*
* @param serverType
*/
private
void
dump
(
String
serverType
)
{
lock
.
lock
();
try
{
String
parent
=
"/"
+
serverType
;
List
<
String
>
children
=
zkClient
.
getChildren
(
parent
,
false
,
null
);
// 注册watch
if
(
children
==
null
||
children
.
isEmpty
())
{
return
;
}
Set
<
String
>
snap
=
new
HashSet
<
String
>();
;
for
(
String
path
:
children
)
{
byte
[]
data
=
zkClient
.
getData
(
parent
+
"/"
+
path
,
false
,
null
);
snap
.
add
(
new
String
(
data
));
}
servers
.
put
(
serverType
,
snap
);
// 直接替换
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
lock
.
unlock
();
}
}
class
InnerZK
implements
Watcher
{
public
void
process
(
WatchedEvent
event
)
{
// 如果是“数据变更”事件,不关注数据变更事件,事实上,我们也不会注册此类型事件
if
(
event
.
getType
()
!=
EventType
.
None
)
{
return
;
}
// 如果是链接状态迁移
// 参见keeperState
switch
(
event
.
getState
())
{
case
SyncConnected:
System
.
out
.
println
(
"Connected..."
);
init
=
false
;
dump
();
// 每次链接重建,都需要手动dump一下数据
init
=
true
;
synchronized
(
tag
)
{
tag
.
notifyAll
();
}
break
;
case
Expired:
System
.
out
.
println
(
"Expired..."
);
// 将在DumpThread中自动创建
break
;
// session过期
case
Disconnected:
// 链接断开,或session迁移
System
.
out
.
println
(
"Connecting...."
);
break
;
case
AuthFailed:
init
=
true
;
synchronized
(
tag
)
{
tag
.
notifyAll
();
}
if
(
thread
.
isAlive
())
{
thread
.
interrupt
();
servers
.
clear
();
}
default
:
break
;
}
}
}
class
DumpThread
extends
Thread
{
@Override
public
void
run
()
{
try
{
Random
r
=
new
Random
();
int
i
=
0
;
while
(
true
)
{
System
.
out
.
println
(
"Client handler,running...tid: "
+
Thread
.
currentThread
().
getId
());
// 如果zk尚未实例化,或者链接异常
if
(
zkClient
==
null
||
(
zkClient
.
getState
()
==
States
.
NOT_CONNECTED
||
zkClient
.
getState
()
==
States
.
CLOSED
))
{
lock
.
lock
();
try
{
// 回话重建等异常行为
zkClient
=
new
ZooKeeper
(
Constants
.
connectString
,
10000
,
dw
,
true
);
System
.
out
.
println
(
"Reconnected success!..."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
i
++;
// 惰性延迟,每失败一次,多休眠100ms
Thread
.
sleep
(
2000
+
i
*
100
);
}
finally
{
lock
.
unlock
();
}
continue
;
}
if
(
zkClient
.
getState
().
isConnected
())
{
// 休眠,为了避免client网络“大规模”故障时,同时访问zk带来的性能波动
Thread
.
sleep
(
1000
+
r
.
nextInt
(
6000
));
dump
();
i
=
0
;
// reset
}
}
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"Exit..."
);
if
(
zkClient
!=
null
)
{
try
{
zkClient
.
close
();
}
catch
(
Exception
ze
)
{
ze
.
printStackTrace
();
}
}
}
}
}
}
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigManager.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk.cs
;
import
java.util.Set
;
import
java.util.concurrent.CopyOnWriteArraySet
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.KeeperException.NodeExistsException
;
import
org.apache.zookeeper.WatchedEvent
;
import
org.apache.zookeeper.Watcher
;
import
org.apache.zookeeper.Watcher.Event.EventType
;
import
org.apache.zookeeper.ZooDefs.Ids
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.apache.zookeeper.ZooKeeper.States
;
import
org.apache.zookeeper.data.Stat
;
/**
* 负责管理所有的“serverType”,对于zk而言,负责创建/删除一级节点。每个一级节点表示一个“serverType”。
* 每个serverType都有多个子节点,子节点由configServer实例负责注册。
*/
public
class
ConfigManager
{
Set
<
String
>
serverTypes
=
new
CopyOnWriteArraySet
<
String
>();
private
ZooKeeper
zkClient
;
private
ReentrantLock
lock
=
new
ReentrantLock
();
// 同步锁,事实上本例可以不用。。仅供参考
// 当zk环境故障时,是否自动重连,自动重连就意味着开启守护线程检测zk环境,
// 此方式适用于zk client不关心session过期,“session重建”带来的数据变更(例如临时节点)不会造成系统异常情况下
private
boolean
autoReconnected
=
false
;
private
Thread
thread
=
null
;
private
Watcher
dw
=
new
InnerZK
();
// default watcher
private
boolean
outdate
=
false
;
// 数据是否过期,在autoReconnected情况下使用,如果没有采用“自动重连”,在session过期后,将不会重建session,
// 并把outdate标记为true
public
ConfigManager
()
{
this
(
false
);
}
/**
* 首次链接必须正常,自动重连,将不会对“首次链接”起作用
*
* @param autoReconneted
*/
public
ConfigManager
(
boolean
autoReconneted
)
{
this
.
autoReconnected
=
autoReconneted
;
if
(
this
.
autoReconnected
)
{
thread
=
new
Thread
(
new
FailureHandler
());
thread
.
setDaemon
(
true
);
thread
.
start
();
}
else
{
try
{
// 回话重建等异常行为
zkClient
=
new
ZooKeeper
(
Constants
.
connectString
,
3000
,
dw
,
false
);
System
.
out
.
println
(
"Reconnected success!..."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
throw
new
RuntimeException
(
e
);
}
}
}
public
void
add
(
String
path
)
{
serverTypes
.
add
(
"/"
+
path
);
rebuild
();
}
public
void
remote
(
String
path
)
{
serverTypes
.
remove
(
path
);
}
public
Set
<
String
>
getServerTypes
()
{
return
serverTypes
;
}
public
boolean
isOutdate
()
{
return
outdate
;
}
////////////////////////////////////////////////inner work//////////////////////////////////
/**
* 创建所有serverType的跟节点,比如/cache-server,所有一级节点由此类统一负责创建。
*/
private
void
rebuild
()
{
lock
.
lock
();
if
(
zkClient
==
null
||
!
zkClient
.
getState
().
isConnected
())
{
return
;
}
for
(
String
path
:
serverTypes
)
{
try
{
Stat
stat
=
zkClient
.
exists
(
path
,
false
);
if
(
stat
==
null
)
{
try
{
zkClient
.
create
(
path
,
null
,
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
}
catch
(
NodeExistsException
ne
)
{
// 如果多个manager同时创建节点,可能会导致此异常,此处忽略它。
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
lock
.
unlock
();
}
}
}
/**
* watcher,负责处理事件或者异步操作(本代码实例,未展示异步操作)
*/
class
InnerZK
implements
Watcher
{
public
void
process
(
WatchedEvent
event
)
{
// 如果是“数据变更”事件,忽略
if
(
event
.
getType
()
!=
EventType
.
None
)
{
return
;
}
// 如果是链接状态迁移
// 参见keeperState
switch
(
event
.
getState
())
{
case
SyncConnected:
System
.
out
.
println
(
"Connected..."
);
rebuild
();
// 每次重连,都检测一下数据状态。
outdate
=
false
;
break
;
case
Expired:
System
.
out
.
println
(
"Expired..."
);
// session重建
outdate
=
true
;
break
;
// session过期
case
Disconnected:
// 链接断开,或session迁移
System
.
out
.
println
(
"Connecting...."
);
break
;
case
AuthFailed:
if
(
autoReconnected
&&
thread
.
isAlive
())
{
thread
.
interrupt
();
}
throw
new
RuntimeException
(
"ZK Connection auth failed..."
);
default
:
break
;
}
}
}
class
FailureHandler
implements
Runnable
{
/**
* zk故障担保线程,如果需要故障检测或者容错,请将此实例交付给单独线程执行
* 比如:因为网络问题,zk实例将可能长时间处于无法链接状态,或者其它异常,导致zk实例化出错等
*/
public
void
run
()
{
try
{
int
i
=
0
;
int
l
=
100
;
// 每次重建,将时间延迟100ms
while
(
true
)
{
System
.
out
.
println
(
"Manager handler,running...tid: "
+
Thread
.
currentThread
().
getId
());
if
(
zkClient
==
null
||
(
zkClient
.
getState
()
==
States
.
NOT_CONNECTED
||
zkClient
.
getState
()
==
States
.
CLOSED
))
{
lock
.
lock
();
try
{
// 回话重建等异常行为
zkClient
=
new
ZooKeeper
(
Constants
.
connectString
,
3000
,
dw
,
false
);
System
.
out
.
println
(
"Reconnected success!..."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
i
++;
Thread
.
sleep
(
3000
+
i
*
l
);
// 在zk环境异常情况下,每3秒重试一次
}
finally
{
lock
.
unlock
();
}
continue
;
}
if
(
zkClient
.
getState
().
isConnected
())
{
Thread
.
sleep
(
3000
);
// 如果被“中断”,直接退出
i
=
0
;
}
}
}
catch
(
InterruptedException
e
)
{
System
.
out
.
println
(
"Exit..."
);
if
(
zkClient
!=
null
)
{
try
{
zkClient
.
close
();
}
catch
(
Exception
ze
)
{
ze
.
printStackTrace
();
}
}
}
}
}
}
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/ConfigServer.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk.cs
;
import
java.util.Random
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.KeeperException.NodeExistsException
;
import
org.apache.zookeeper.WatchedEvent
;
import
org.apache.zookeeper.Watcher
;
import
org.apache.zookeeper.Watcher.Event.EventType
;
import
org.apache.zookeeper.ZooDefs.Ids
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.apache.zookeeper.ZooKeeper.States
;
import
org.apache.zookeeper.data.Stat
;
/**
* configServer,负责向zk注册当前server的信息,可被configClient获得信息。
*/
public
class
ConfigServer
{
private
ZooKeeper
zkClient
;
private
String
path
;
private
String
serverType
;
// 当前configServer的类型,我们假设一个configServer实例持有一种“serverType”
private
ReentrantLock
lock
=
new
ReentrantLock
();
private
boolean
autoReconnected
=
false
;
private
Thread
thread
=
null
;
private
Watcher
dw
=
new
InnerZK
();
// default watcher
private
boolean
outdate
=
false
;
// 数据是否过期,在autoReconnected情况下使用,如果没有采用“自动重连”,在session过期后,将不会重建session,并把outdate标记为true
// 控制首次访问
private
Object
tag
=
new
Object
();
private
boolean
init
=
false
;
public
ConfigServer
(
String
serverType
)
{
this
(
serverType
,
false
);
}
public
ConfigServer
(
String
serverType
,
boolean
autoReconnected
)
{
this
.
serverType
=
serverType
;
this
.
autoReconnected
=
autoReconnected
;
if
(
this
.
autoReconnected
)
{
thread
=
new
Thread
(
new
FailureHandler
());
thread
.
setDaemon
(
true
);
thread
.
start
();
}
else
{
try
{
// 回话重建等异常行为
zkClient
=
new
ZooKeeper
(
Constants
.
connectString
,
3000
,
dw
,
false
);
System
.
out
.
println
(
"Reconnected success!..."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
throw
new
RuntimeException
(
e
);
}
}
}
public
boolean
isOutdate
()
{
return
outdate
;
}
/**
* 注册server信息,从zk角度来说,就是创建serverType的一个子节点。
*
* @return
*/
private
boolean
register
()
{
lock
.
lock
();
init
=
false
;
try
{
Stat
stat
=
zkClient
.
exists
(
"/"
+
serverType
,
true
);
// 注册“父节点”watch,跟踪父节点的创建/删除
// 创建跟节点:/cache-server
// 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。
if
(
stat
==
null
)
{
return
false
;
}
// 创建临时子节点:/cache-server/cs01;
Random
r
=
new
Random
();
String
data
=
"127.0.0.1:"
+
r
.
nextInt
(
65535
);
// tmp data,模拟一个ip +
// port参数
path
=
zkClient
.
create
(
"/"
+
serverType
+
"/id_"
,
data
.
getBytes
(),
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
EPHEMERAL_SEQUENTIAL
);
System
.
out
.
println
(
"Register path:"
+
path
);
init
=
true
;
synchronized
(
tag
)
{
tag
.
notifyAll
();
}
}
catch
(
NodeExistsException
ne
)
{
// ignore.
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
lock
.
unlock
();
}
return
true
;
}
public
String
getPath
()
{
synchronized
(
tag
)
{
while
(!
init
)
{
try
{
tag
.
wait
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
break
;
}
}
}
return
path
;
}
class
InnerZK
implements
Watcher
{
public
void
process
(
WatchedEvent
event
)
{
// 如果是“数据变更”事件
if
(
event
.
getType
()
!=
EventType
.
None
)
{
switch
(
event
.
getType
())
{
// 如果其父节点(/serverType)被创建,
// 此时configServer也开始注册其子节点信息,watcher在下文中SyncConnected中注册。
case
NodeCreated:
register
();
break
;
case
NodeDeleted:
// 如果父节点被删除,那么此后子节点也将不复存在
path
=
null
;
register
();
// 注册watch,检测父节点/serverType再次创建。
break
;
default
:
break
;
}
return
;
}
// 如果是链接状态迁移
// 参见keeperState
switch
(
event
.
getState
())
{
case
SyncConnected:
System
.
out
.
println
(
"Connected..."
);
// 如果path == null,则表明是首次链接或者session重建。
if
(
path
==
null
)
{
try
{
register
();
// 创建子节点,并对其父节点注册watch。
outdate
=
false
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
break
;
case
Expired:
System
.
out
.
println
(
"Expired..."
);
outdate
=
true
;
init
=
true
;
synchronized
(
tag
)
{
tag
.
notifyAll
();
}
break
;
// session过期
case
Disconnected:
// 链接断开,或session迁移
System
.
out
.
println
(
"Connecting...."
);
break
;
case
AuthFailed:
init
=
true
;
synchronized
(
tag
)
{
tag
.
notifyAll
();
}
if
(
autoReconnected
&&
thread
.
isAlive
())
{
thread
.
interrupt
();
}
throw
new
RuntimeException
(
"ZK Connection auth failed..."
);
default
:
break
;
}
}
}
class
FailureHandler
implements
Runnable
{
public
void
run
()
{
try
{
int
i
=
0
;
int
l
=
10
;
while
(
true
)
{
System
.
out
.
println
(
"Server handler,running...tid: "
+
Thread
.
currentThread
().
getId
());
if
(
zkClient
==
null
||
(
zkClient
.
getState
()
==
States
.
NOT_CONNECTED
||
zkClient
.
getState
()
==
States
.
CLOSED
))
{
lock
.
lock
();
try
{
// 回话重建等异常行为
zkClient
=
new
ZooKeeper
(
Constants
.
connectString
,
3000
,
dw
,
false
);
System
.
out
.
println
(
"Reconnected success!..."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
i
++;
Thread
.
sleep
(
3000
+
i
*
l
);
// 在zk环境异常情况下,每3秒重试一次
}
finally
{
lock
.
unlock
();
}
continue
;
}
if
(
zkClient
.
getState
().
isConnected
())
{
Thread
.
sleep
(
3000
);
// 如果被“中断”,直接退出
i
=
0
;
}
}
}
catch
(
InterruptedException
e
)
{
System
.
out
.
println
(
"Exit..."
);
if
(
zkClient
!=
null
)
{
try
{
zkClient
.
close
();
}
catch
(
Exception
ze
)
{
ze
.
printStackTrace
();
}
}
}
}
}
}
whatsmars-zk/src/main/java/org/hongxi/whatsmars/zk/cs/Constants.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk.cs
;
public
class
Constants
{
static
final
String
connectString
=
"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
;
}
whatsmars-zk/src/test/java/org/hongxi/whatsmars/zk/cs/ConfigServerTest.java
0 → 100644
浏览文件 @
9dfad682
package
org.hongxi.whatsmars.zk.cs
;
import
org.junit.Test
;
import
java.util.Set
;
/**
* 假设一种场景:服务A向ZK注册自己的服务信息,比如IP + Port;客户端B向ZK获取服务的列表,并使用服务。
* 比如CacheServer向zk注册ip和客户端port;其他client端可以向zk获取cacheserver的ip + port,以便此后建立链接。
*/
public
class
ConfigServerTest
{
@Test
public
void
testConfigServer
()
{
String
serverType
=
"cache-server"
;
try
{
ConfigManager
manager
=
new
ConfigManager
(
true
);
manager
.
add
(
serverType
);
ConfigServer
s1
=
new
ConfigServer
(
serverType
);
ConfigServer
s2
=
new
ConfigServer
(
serverType
,
true
);
ConfigServer
s3
=
new
ConfigServer
(
serverType
);
ConfigClient
c1
=
new
ConfigClient
(
serverType
);
ConfigClient
c2
=
new
ConfigClient
(
serverType
);
ConfigClient
c3
=
new
ConfigClient
(
serverType
);
Thread
.
sleep
(
3000
);
System
.
out
.
println
(
"+++++++++++++++++++++++++"
);
System
.
out
.
println
(
"S1"
+
s1
.
getPath
());
// 注意zk链接,是异步的,有可能此处为 null
System
.
out
.
println
(
"S2"
+
s2
.
getPath
());
System
.
out
.
println
(
"S3"
+
s3
.
getPath
());
while
(
true
)
{
System
.
out
.
println
(
"-------------------------"
);
Set
<
String
>
l1
=
c1
.
getServers
(
serverType
);
if
(
l1
==
null
)
{
System
.
out
.
println
(
"l1 is null..."
);
}
else
{
for
(
String
path
:
l1
)
{
System
.
out
.
println
(
"l1:"
+
path
);
}
}
Set
<
String
>
l2
=
c2
.
getServers
(
serverType
);
if
(
l2
==
null
)
{
System
.
out
.
println
(
"l2 is null..."
);
}
else
{
for
(
String
path
:
l2
)
{
System
.
out
.
println
(
"l2:"
+
path
);
}
}
Set
<
String
>
l3
=
c3
.
getServers
(
serverType
);
if
(
l3
==
null
)
{
System
.
out
.
println
(
"l3 is null..."
);
}
else
{
for
(
String
path
:
l3
)
{
System
.
out
.
println
(
"l3:"
+
path
);
}
}
Thread
.
sleep
(
2000
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
whatsmars-zk/src/
main
/java/org/hongxi/whatsmars/zk/curator/CuratorTest.java
→
whatsmars-zk/src/
test
/java/org/hongxi/whatsmars/zk/curator/CuratorTest.java
浏览文件 @
9dfad682
...
...
@@ -5,13 +5,15 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import
org.apache.curator.retry.RetryNTimes
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.data.Stat
;
import
org.junit.Test
;
/**
* Created by shenhongxi on 2018/10/27.
*/
public
class
CuratorTest
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
@Test
public
void
testCurator
()
throws
Exception
{
CuratorFramework
client
=
CuratorFrameworkFactory
.
newClient
(
"127.0.0.1:2181"
,
new
RetryNTimes
(
10
,
5000
));
client
.
start
();
System
.
out
.
println
(
"zk client started!"
);
...
...
whatsmars-zk/src/
main
/java/org/hongxi/whatsmars/zk/curator/DubboTest.java
→
whatsmars-zk/src/
test
/java/org/hongxi/whatsmars/zk/curator/DubboTest.java
浏览文件 @
9dfad682
...
...
@@ -3,6 +3,8 @@ package org.hongxi.whatsmars.zk.curator;
import
org.apache.curator.framework.CuratorFramework
;
import
org.apache.curator.framework.CuratorFrameworkFactory
;
import
org.apache.curator.retry.RetryNTimes
;
import
org.junit.Before
;
import
org.junit.Test
;
import
java.util.ArrayList
;
import
java.util.List
;
...
...
@@ -14,11 +16,13 @@ public class DubboTest {
private
static
CuratorFramework
client
=
CuratorFrameworkFactory
.
newClient
(
"127.0.0.1:2181"
,
new
RetryNTimes
(
10
,
5000
));
static
{
@Before
public
void
start
()
{
client
.
start
();
}
public
static
void
main
(
String
[]
args
)
{
@Test
public
void
testDubbo
()
{
String
path
=
"/dubbo"
;
print
(
path
);
final
String
path2
=
"/dubbo/org.hongxi.whatsmars.dubbo.demo.api.DemoService"
;
...
...
whatsmars-zk/src/
main
/java/org/hongxi/whatsmars/zk/curator/WatcherTest.java
→
whatsmars-zk/src/
test
/java/org/hongxi/whatsmars/zk/curator/WatcherTest.java
浏览文件 @
9dfad682
...
...
@@ -5,13 +5,15 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import
org.apache.curator.framework.recipes.cache.ChildData
;
import
org.apache.curator.framework.recipes.cache.PathChildrenCache
;
import
org.apache.curator.retry.RetryNTimes
;
import
org.junit.Test
;
/**
* Created by shenhongxi on 2018/10/27.
*/
public
class
WatcherTest
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
@Test
public
void
testWatcher
()
throws
Exception
{
CuratorFramework
client
=
CuratorFrameworkFactory
.
newClient
(
"127.0.0.1:2181"
,
new
RetryNTimes
(
10
,
5000
));
client
.
start
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录