Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
10794cf4
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
10794cf4
编写于
8月 04, 2017
作者:
H
Helin Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Master persist more states to etcd, schedule pending timeout after load pending state.
上级
9bb57153
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
50 addition
and
43 deletion
+50
-43
go/master/service.go
go/master/service.go
+49
-42
go/pserver/client/etcd_client.go
go/pserver/client/etcd_client.go
+1
-1
未找到文件。
go/master/service.go
浏览文件 @
10794cf4
...
...
@@ -77,11 +77,13 @@ type taskEntry struct {
NumFailure
int
}
type
taskQueues
struct
{
type
masterState
struct
{
Todo
[]
taskEntry
Pending
map
[
int
]
taskEntry
// map from task ID to task entry
Done
[]
taskEntry
Failed
[]
taskEntry
CurPass
int
JobTasks
[]
taskEntry
}
// Service is the master server service.
...
...
@@ -95,10 +97,10 @@ type Service struct {
initDone
bool
mu
sync
.
Mutex
taskQueues
taskQueues
currPass
int
jobTasks
[]
taskEntry
// State to be persisted to snapshot.
state
masterState
// The trainer that is currently saving model. This state is
// transient, does not need to be persisted to snapshot.
savingTrainer
string
}
...
...
@@ -141,8 +143,8 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failur
s
.
chunksPerTask
=
chunksPerTask
s
.
timeoutDur
=
timeoutDur
s
.
failureMax
=
failureMax
s
.
taskQueues
=
taskQueues
{}
s
.
taskQueues
.
Pending
=
make
(
map
[
int
]
taskEntry
)
s
.
state
=
masterState
{}
s
.
state
.
Pending
=
make
(
map
[
int
]
taskEntry
)
s
.
ready
=
make
(
chan
struct
{})
s
.
store
=
store
recovered
,
err
:=
s
.
recover
()
...
...
@@ -180,7 +182,7 @@ func (s *Service) recover() (bool, error) {
}
dec
:=
gob
.
NewDecoder
(
gr
)
var
tqs
taskQueues
var
tqs
masterState
err
=
dec
.
Decode
(
&
tqs
)
if
err
!=
nil
{
return
false
,
err
...
...
@@ -193,7 +195,12 @@ func (s *Service) recover() (bool, error) {
log
.
Errorln
(
err
)
}
s
.
taskQueues
=
tqs
s
.
state
=
tqs
log
.
WithFields
(
s
.
logFields
())
.
Infof
(
"Master recovered from snapshot, scheduling pending task timeout check."
)
for
_
,
t
:=
range
s
.
state
.
Pending
{
time
.
AfterFunc
(
s
.
timeoutDur
,
s
.
checkTimeoutFunc
(
t
.
Task
.
Meta
.
ID
,
t
.
Task
.
Meta
.
Epoch
))
}
return
true
,
nil
}
...
...
@@ -208,7 +215,7 @@ func (s *Service) snapshot() error {
var
buf
bytes
.
Buffer
gw
:=
gzip
.
NewWriter
(
&
buf
)
enc
:=
gob
.
NewEncoder
(
gw
)
err
:=
enc
.
Encode
(
s
.
taskQueues
)
err
:=
enc
.
Encode
(
s
.
state
)
if
err
!=
nil
{
return
err
}
...
...
@@ -290,8 +297,8 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
return
err
}
s
.
j
obTasks
=
partition
(
chunks
,
s
.
chunksPerTask
)
s
.
taskQueues
.
Todo
=
s
.
j
obTasks
s
.
state
.
J
obTasks
=
partition
(
chunks
,
s
.
chunksPerTask
)
s
.
state
.
Todo
=
s
.
state
.
J
obTasks
err
=
s
.
snapshot
()
if
err
!=
nil
{
...
...
@@ -319,17 +326,17 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
}
}()
delete
(
s
.
taskQueues
.
Pending
,
t
.
Task
.
Meta
.
ID
)
delete
(
s
.
state
.
Pending
,
t
.
Task
.
Meta
.
ID
)
t
.
NumFailure
++
if
t
.
NumFailure
>
s
.
failureMax
{
log
.
Warningf
(
"Task %v failed %d times, discard."
,
t
.
Task
,
t
.
NumFailure
)
s
.
taskQueues
.
Failed
=
append
(
s
.
taskQueues
.
Failed
,
t
)
s
.
state
.
Failed
=
append
(
s
.
state
.
Failed
,
t
)
return
}
log
.
Warningf
(
"Task %v failed %d times, re-dispatch."
,
t
.
Task
,
t
.
NumFailure
)
s
.
taskQueues
.
Todo
=
append
(
s
.
taskQueues
.
Todo
,
t
)
s
.
state
.
Todo
=
append
(
s
.
state
.
Todo
,
t
)
return
}
...
...
@@ -338,7 +345,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
s
.
mu
.
Lock
()
defer
s
.
mu
.
Unlock
()
t
,
ok
:=
s
.
taskQueues
.
Pending
[
taskID
]
t
,
ok
:=
s
.
state
.
Pending
[
taskID
]
if
!
ok
{
return
}
...
...
@@ -350,10 +357,10 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
// must be called with lock held.
func
(
s
*
Service
)
logFields
()
log
.
Fields
{
return
log
.
Fields
{
"todoLen"
:
len
(
s
.
taskQueues
.
Todo
),
"pendingLen"
:
len
(
s
.
taskQueues
.
Pending
),
"doneLen"
:
len
(
s
.
taskQueues
.
Done
),
"failedLen"
:
len
(
s
.
taskQueues
.
Failed
),
"todoLen"
:
len
(
s
.
state
.
Todo
),
"pendingLen"
:
len
(
s
.
state
.
Pending
),
"doneLen"
:
len
(
s
.
state
.
Done
),
"failedLen"
:
len
(
s
.
state
.
Failed
),
}
}
...
...
@@ -366,17 +373,17 @@ func (s *Service) GetTask(passID int, task *Task) error {
s
.
mu
.
Lock
()
defer
s
.
mu
.
Unlock
()
if
passID
<
s
.
cur
rPass
{
if
passID
<
s
.
state
.
Cu
rPass
{
return
ErrPassBefore
}
if
passID
>
s
.
cur
rPass
{
if
passID
>
s
.
state
.
Cu
rPass
{
// Client may get run to pass after master when one client faster than the
// other
return
ErrPassAfter
}
if
len
(
s
.
taskQueues
.
Todo
)
==
0
{
if
len
(
s
.
taskQueues
.
Done
)
==
0
&&
len
(
s
.
taskQueues
.
Pending
)
==
0
{
if
len
(
s
.
state
.
Todo
)
==
0
{
if
len
(
s
.
state
.
Done
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
log
.
WithFields
(
s
.
logFields
())
.
Warningln
(
"All tasks failed, may start next pass"
)
return
ErrAllTaskFailed
}
...
...
@@ -384,10 +391,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
return
ErrNoMoreAvailable
}
t
:=
s
.
taskQueues
.
Todo
[
0
]
t
:=
s
.
state
.
Todo
[
0
]
t
.
Task
.
Meta
.
Epoch
++
s
.
taskQueues
.
Todo
=
s
.
taskQueues
.
Todo
[
1
:
]
s
.
taskQueues
.
Pending
[
t
.
Task
.
Meta
.
ID
]
=
t
s
.
state
.
Todo
=
s
.
state
.
Todo
[
1
:
]
s
.
state
.
Pending
[
t
.
Task
.
Meta
.
ID
]
=
t
err
:=
s
.
snapshot
()
if
err
!=
nil
{
return
err
...
...
@@ -409,7 +416,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s
.
mu
.
Lock
()
defer
s
.
mu
.
Unlock
()
t
,
ok
:=
s
.
taskQueues
.
Pending
[
taskID
]
t
,
ok
:=
s
.
state
.
Pending
[
taskID
]
if
!
ok
{
log
.
WithFields
(
s
.
logFields
())
.
Warningln
(
"Pending task #%d not found."
,
taskID
)
return
nil
...
...
@@ -417,18 +424,18 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
// task finished, reset timeout
t
.
NumFailure
=
0
s
.
taskQueues
.
Done
=
append
(
s
.
taskQueues
.
Done
,
t
)
delete
(
s
.
taskQueues
.
Pending
,
taskID
)
s
.
state
.
Done
=
append
(
s
.
state
.
Done
,
t
)
delete
(
s
.
state
.
Pending
,
taskID
)
log
.
WithFields
(
s
.
logFields
())
.
Infof
(
"Task #%d finished."
,
taskID
)
if
len
(
s
.
taskQueues
.
Todo
)
==
0
&&
len
(
s
.
taskQueues
.
Pending
)
==
0
{
if
len
(
s
.
state
.
Todo
)
==
0
&&
len
(
s
.
state
.
Pending
)
==
0
{
// increase master side pass count if all tasks finished
s
.
cur
rPass
++
s
.
taskQueues
.
Todo
=
s
.
j
obTasks
s
.
taskQueues
.
Done
=
[]
taskEntry
{}
s
.
state
.
Cu
rPass
++
s
.
state
.
Todo
=
s
.
state
.
J
obTasks
s
.
state
.
Done
=
[]
taskEntry
{}
// TODO(typhoonzero): deal with failed tasks
s
.
taskQueues
.
Failed
=
[]
taskEntry
{}
log
.
WithFields
(
s
.
logFields
())
.
Warningf
(
"all task finished, add new pass data, newpass: %d."
,
s
.
cur
rPass
)
s
.
state
.
Failed
=
[]
taskEntry
{}
log
.
WithFields
(
s
.
logFields
())
.
Warningf
(
"all task finished, add new pass data, newpass: %d."
,
s
.
state
.
Cu
rPass
)
}
err
:=
s
.
snapshot
()
...
...
@@ -447,7 +454,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
s
.
mu
.
Lock
()
defer
s
.
mu
.
Unlock
()
t
,
ok
:=
s
.
taskQueues
.
Pending
[
meta
.
ID
]
t
,
ok
:=
s
.
state
.
Pending
[
meta
.
ID
]
if
!
ok
{
log
.
WithFields
(
s
.
logFields
())
.
Warningln
(
"TaskFailed:Pending task #%v not found."
,
t
.
Task
.
Meta
)
return
nil
...
...
go/pserver/client/etcd_client.go
浏览文件 @
10794cf4
...
...
@@ -103,7 +103,7 @@ func (p *EtcdClient) List() []Server {
time
.
Sleep
(
p
.
timeout
)
continue
}
log
.
Info
f
(
"got value (%s) for key: %s"
,
psAddr
,
psKey
)
log
.
Debug
f
(
"got value (%s) for key: %s"
,
psAddr
,
psKey
)
servers
[
i
]
.
Index
=
i
servers
[
i
]
.
Addr
=
psAddr
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录