diff --git a/go/master/service.go b/go/master/service.go index c47319317ad7c086ff4832da1636a978ee94aacf..29ff63bcc90dbcd81af9771d39d443500ac79cb1 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -39,9 +39,9 @@ type Task struct { } type taskEntry struct { - NumTimeout int - Task Task - NumFailed int + Task Task + // A task fails if it's timeout or trainer reports it exits unnormally. + NumFailure int } type taskQueues struct { @@ -53,11 +53,11 @@ type taskQueues struct { // Service is the master server service. type Service struct { - chunksPerTask int - timeoutDur time.Duration - failortimeoutMax int - ready chan struct{} - store Store + chunksPerTask int + timeoutDur time.Duration + failureMax int + ready chan struct{} + store Store mu sync.Mutex initDone bool @@ -92,11 +92,11 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry { } // NewService creates a new service. -func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failortimeoutMax int) (*Service, error) { +func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failureMax int) (*Service, error) { s := &Service{} s.chunksPerTask = chunksPerTask s.timeoutDur = timeoutDur - s.failortimeoutMax = failortimeoutMax + s.failureMax = failureMax s.taskQueues = taskQueues{} s.taskQueues.Pending = make(map[int]taskEntry) s.ready = make(chan struct{}) @@ -258,7 +258,7 @@ func (s *Service) SetDataset(globPaths []string, dummy *int) error { return nil } -func (s *Service) checkTaskStatus(t taskEntry, epoch int) { +func (s *Service) procFailedTask(t taskEntry, epoch int) { if t.Task.Epoch != epoch { // new epoch, task launched after the // schedule of this timeout check or failed status report. @@ -274,14 +274,14 @@ func (s *Service) checkTaskStatus(t taskEntry, epoch int) { delete(s.taskQueues.Pending, t.Task.ID) - t.NumTimeout++ - if t.NumTimeout+t.NumFailed > s.failortimeoutMax { - log.Warningf("Task %v timed out %d times and failed %d times, discard.", t.Task, t.NumTimeout, t.NumFailed) + t.NumFailure++ + if t.NumFailure > s.failureMax { + log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure) s.taskQueues.Failed = append(s.taskQueues.Failed, t) return } - log.Warningf("Task %v timed out %d times and failed %d times, discard.", t.Task, t.NumTimeout, t.NumFailed) + log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure) s.taskQueues.Todo = append(s.taskQueues.Todo, t) return } @@ -296,7 +296,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { return } - s.checkTaskStatus(t, epoch) + s.procFailedTask(t, epoch) } } @@ -377,8 +377,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { } // task finished, reset timeout - t.NumTimeout = 0 - t.NumFailed = 0 + t.NumFailure = 0 s.taskQueues.Done = append(s.taskQueues.Done, t) delete(s.taskQueues.Pending, taskID) @@ -413,6 +412,6 @@ func (s *Service) TaskFailed(taskID int, epoch int) error { return err } - s.checkTaskStatus(t, epoch) + s.procFailedTask(t, epoch) return nil }