提交 544c7db7 编写于 作者: H helinwang 提交者: GitHub

Merge pull request #3223 from helinwang/master_timeout

Master persist more states to etcd, schedule pending timeout after lo…
...@@ -77,11 +77,12 @@ type taskEntry struct { ...@@ -77,11 +77,12 @@ type taskEntry struct {
NumFailure int NumFailure int
} }
type taskQueues struct { type masterState struct {
Todo []taskEntry Todo []taskEntry
Pending map[int]taskEntry // map from task ID to task entry Pending map[int]taskEntry // map from task ID to task entry
Done []taskEntry Done []taskEntry
Failed []taskEntry Failed []taskEntry
CurPass int
} }
// Service is the master server service. // Service is the master server service.
...@@ -94,11 +95,11 @@ type Service struct { ...@@ -94,11 +95,11 @@ type Service struct {
ready chan struct{} ready chan struct{}
initDone bool initDone bool
mu sync.Mutex mu sync.Mutex
taskQueues taskQueues // State to be persisted to snapshot.
currPass int state masterState
jobTasks []taskEntry // The trainer that is currently saving model. This state is
// transient, does not need to be persisted to snapshot.
savingTrainer string savingTrainer string
} }
...@@ -141,8 +142,8 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failur ...@@ -141,8 +142,8 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failur
s.chunksPerTask = chunksPerTask s.chunksPerTask = chunksPerTask
s.timeoutDur = timeoutDur s.timeoutDur = timeoutDur
s.failureMax = failureMax s.failureMax = failureMax
s.taskQueues = taskQueues{} s.state = masterState{}
s.taskQueues.Pending = make(map[int]taskEntry) s.state.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{}) s.ready = make(chan struct{})
s.store = store s.store = store
recovered, err := s.recover() recovered, err := s.recover()
...@@ -180,7 +181,7 @@ func (s *Service) recover() (bool, error) { ...@@ -180,7 +181,7 @@ func (s *Service) recover() (bool, error) {
} }
dec := gob.NewDecoder(gr) dec := gob.NewDecoder(gr)
var tqs taskQueues var tqs masterState
err = dec.Decode(&tqs) err = dec.Decode(&tqs)
if err != nil { if err != nil {
return false, err return false, err
...@@ -193,7 +194,12 @@ func (s *Service) recover() (bool, error) { ...@@ -193,7 +194,12 @@ func (s *Service) recover() (bool, error) {
log.Errorln(err) log.Errorln(err)
} }
s.taskQueues = tqs s.state = tqs
log.WithFields(s.logFields()).Infof("Master recovered from snapshot, scheduling pending task timeout check.")
for _, t := range s.state.Pending {
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
}
return true, nil return true, nil
} }
...@@ -208,7 +214,7 @@ func (s *Service) snapshot() error { ...@@ -208,7 +214,7 @@ func (s *Service) snapshot() error {
var buf bytes.Buffer var buf bytes.Buffer
gw := gzip.NewWriter(&buf) gw := gzip.NewWriter(&buf)
enc := gob.NewEncoder(gw) enc := gob.NewEncoder(gw)
err := enc.Encode(s.taskQueues) err := enc.Encode(s.state)
if err != nil { if err != nil {
return err return err
} }
...@@ -290,8 +296,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error { ...@@ -290,8 +296,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
return err return err
} }
s.jobTasks = partition(chunks, s.chunksPerTask) s.state.Todo = partition(chunks, s.chunksPerTask)
s.taskQueues.Todo = s.jobTasks
err = s.snapshot() err = s.snapshot()
if err != nil { if err != nil {
...@@ -319,17 +324,17 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) { ...@@ -319,17 +324,17 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
} }
}() }()
delete(s.taskQueues.Pending, t.Task.Meta.ID) delete(s.state.Pending, t.Task.Meta.ID)
t.NumFailure++ t.NumFailure++
if t.NumFailure > s.failureMax { if t.NumFailure > s.failureMax {
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure) log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
s.taskQueues.Failed = append(s.taskQueues.Failed, t) s.state.Failed = append(s.state.Failed, t)
return return
} }
log.Warningf("Task %v failed %d times, re-dispatch.", t.Task, t.NumFailure) log.Warningf("Task %v failed %d times, re-dispatch.", t.Task, t.NumFailure)
s.taskQueues.Todo = append(s.taskQueues.Todo, t) s.state.Todo = append(s.state.Todo, t)
return return
} }
...@@ -338,7 +343,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { ...@@ -338,7 +343,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
t, ok := s.taskQueues.Pending[taskID] t, ok := s.state.Pending[taskID]
if !ok { if !ok {
return return
} }
...@@ -350,10 +355,11 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { ...@@ -350,10 +355,11 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
// must be called with lock held. // must be called with lock held.
func (s *Service) logFields() log.Fields { func (s *Service) logFields() log.Fields {
return log.Fields{ return log.Fields{
"todoLen": len(s.taskQueues.Todo), "todoLen": len(s.state.Todo),
"pendingLen": len(s.taskQueues.Pending), "pendingLen": len(s.state.Pending),
"doneLen": len(s.taskQueues.Done), "doneLen": len(s.state.Done),
"failedLen": len(s.taskQueues.Failed), "failedLen": len(s.state.Failed),
"curPass": s.state.CurPass,
} }
} }
...@@ -366,17 +372,17 @@ func (s *Service) GetTask(passID int, task *Task) error { ...@@ -366,17 +372,17 @@ func (s *Service) GetTask(passID int, task *Task) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if passID < s.currPass { if passID < s.state.CurPass {
return ErrPassBefore return ErrPassBefore
} }
if passID > s.currPass { if passID > s.state.CurPass {
// Client may get run to pass after master when one client faster than the // Client may get run to pass after master when one client faster than the
// other // other
return ErrPassAfter return ErrPassAfter
} }
if len(s.taskQueues.Todo) == 0 { if len(s.state.Todo) == 0 {
if len(s.taskQueues.Done) == 0 && len(s.taskQueues.Pending) == 0 { if len(s.state.Done) == 0 && len(s.state.Pending) == 0 {
log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass") log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass")
return ErrAllTaskFailed return ErrAllTaskFailed
} }
...@@ -384,10 +390,10 @@ func (s *Service) GetTask(passID int, task *Task) error { ...@@ -384,10 +390,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
return ErrNoMoreAvailable return ErrNoMoreAvailable
} }
t := s.taskQueues.Todo[0] t := s.state.Todo[0]
t.Task.Meta.Epoch++ t.Task.Meta.Epoch++
s.taskQueues.Todo = s.taskQueues.Todo[1:] s.state.Todo = s.state.Todo[1:]
s.taskQueues.Pending[t.Task.Meta.ID] = t s.state.Pending[t.Task.Meta.ID] = t
err := s.snapshot() err := s.snapshot()
if err != nil { if err != nil {
return err return err
...@@ -409,7 +415,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { ...@@ -409,7 +415,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
t, ok := s.taskQueues.Pending[taskID] t, ok := s.state.Pending[taskID]
if !ok { if !ok {
log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID) log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID)
return nil return nil
...@@ -417,18 +423,18 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { ...@@ -417,18 +423,18 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
// task finished, reset timeout // task finished, reset timeout
t.NumFailure = 0 t.NumFailure = 0
s.taskQueues.Done = append(s.taskQueues.Done, t) s.state.Done = append(s.state.Done, t)
delete(s.taskQueues.Pending, taskID) delete(s.state.Pending, taskID)
log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID) log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID)
if len(s.taskQueues.Todo) == 0 && len(s.taskQueues.Pending) == 0 { if len(s.state.Todo) == 0 && len(s.state.Pending) == 0 {
// increase master side pass count if all tasks finished // increase master side pass count if all tasks finished
s.currPass++ s.state.CurPass++
s.taskQueues.Todo = s.jobTasks s.state.Todo = append(s.state.Done, s.state.Failed...)
s.taskQueues.Done = []taskEntry{} s.state.Done = []taskEntry{}
// TODO(typhoonzero): deal with failed tasks // TODO(typhoonzero): deal with failed tasks
s.taskQueues.Failed = []taskEntry{} s.state.Failed = []taskEntry{}
log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.currPass) log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.state.CurPass)
} }
err := s.snapshot() err := s.snapshot()
...@@ -447,7 +453,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error { ...@@ -447,7 +453,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
t, ok := s.taskQueues.Pending[meta.ID] t, ok := s.state.Pending[meta.ID]
if !ok { if !ok {
log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta) log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta)
return nil return nil
......
...@@ -103,7 +103,7 @@ func (p *EtcdClient) List() []Server { ...@@ -103,7 +103,7 @@ func (p *EtcdClient) List() []Server {
time.Sleep(p.timeout) time.Sleep(p.timeout)
continue continue
} }
log.Infof("got value (%s) for key: %s", psAddr, psKey) log.Debugf("got value (%s) for key: %s", psAddr, psKey)
servers[i].Index = i servers[i].Index = i
servers[i].Addr = psAddr servers[i].Addr = psAddr
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册