提交 108b0fad 编写于 作者: G gongweibao

fix by helin and wuyi's comments

上级 52cc601b
...@@ -39,9 +39,9 @@ type Task struct { ...@@ -39,9 +39,9 @@ type Task struct {
} }
type taskEntry struct { type taskEntry struct {
NumTimeout int Task Task
Task Task // A task fails if it times out or the trainer reports that it exited abnormally.
NumFailed int NumFailure int
} }
type taskQueues struct { type taskQueues struct {
...@@ -53,11 +53,11 @@ type taskQueues struct { ...@@ -53,11 +53,11 @@ type taskQueues struct {
// Service is the master server service. // Service is the master server service.
type Service struct { type Service struct {
chunksPerTask int chunksPerTask int
timeoutDur time.Duration timeoutDur time.Duration
failortimeoutMax int failureMax int
ready chan struct{} ready chan struct{}
store Store store Store
mu sync.Mutex mu sync.Mutex
initDone bool initDone bool
...@@ -92,11 +92,11 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry { ...@@ -92,11 +92,11 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
} }
// NewService creates a new service. // NewService creates a new service.
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failortimeoutMax int) (*Service, error) { func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failureMax int) (*Service, error) {
s := &Service{} s := &Service{}
s.chunksPerTask = chunksPerTask s.chunksPerTask = chunksPerTask
s.timeoutDur = timeoutDur s.timeoutDur = timeoutDur
s.failortimeoutMax = failortimeoutMax s.failureMax = failureMax
s.taskQueues = taskQueues{} s.taskQueues = taskQueues{}
s.taskQueues.Pending = make(map[int]taskEntry) s.taskQueues.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{}) s.ready = make(chan struct{})
...@@ -258,7 +258,7 @@ func (s *Service) SetDataset(globPaths []string, dummy *int) error { ...@@ -258,7 +258,7 @@ func (s *Service) SetDataset(globPaths []string, dummy *int) error {
return nil return nil
} }
func (s *Service) checkTaskStatus(t taskEntry, epoch int) { func (s *Service) procFailedTask(t taskEntry, epoch int) {
if t.Task.Epoch != epoch { if t.Task.Epoch != epoch {
// new epoch, task launched after the // new epoch, task launched after the
// schedule of this timeout check or failed status report. // schedule of this timeout check or failed status report.
...@@ -274,14 +274,14 @@ func (s *Service) checkTaskStatus(t taskEntry, epoch int) { ...@@ -274,14 +274,14 @@ func (s *Service) checkTaskStatus(t taskEntry, epoch int) {
delete(s.taskQueues.Pending, t.Task.ID) delete(s.taskQueues.Pending, t.Task.ID)
t.NumTimeout++ t.NumFailure++
if t.NumTimeout+t.NumFailed > s.failortimeoutMax { if t.NumFailure > s.failureMax {
log.Warningf("Task %v timed out %d times and failed %d times, discard.", t.Task, t.NumTimeout, t.NumFailed) log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
s.taskQueues.Failed = append(s.taskQueues.Failed, t) s.taskQueues.Failed = append(s.taskQueues.Failed, t)
return return
} }
log.Warningf("Task %v timed out %d times and failed %d times, discard.", t.Task, t.NumTimeout, t.NumFailed) log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
s.taskQueues.Todo = append(s.taskQueues.Todo, t) s.taskQueues.Todo = append(s.taskQueues.Todo, t)
return return
} }
...@@ -296,7 +296,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { ...@@ -296,7 +296,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
return return
} }
s.checkTaskStatus(t, epoch) s.procFailedTask(t, epoch)
} }
} }
...@@ -377,8 +377,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { ...@@ -377,8 +377,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
} }
// task finished, reset timeout // task finished, reset timeout
t.NumTimeout = 0 t.NumFailure = 0
t.NumFailed = 0
s.taskQueues.Done = append(s.taskQueues.Done, t) s.taskQueues.Done = append(s.taskQueues.Done, t)
delete(s.taskQueues.Pending, taskID) delete(s.taskQueues.Pending, taskID)
...@@ -413,6 +412,6 @@ func (s *Service) TaskFailed(taskID int, epoch int) error { ...@@ -413,6 +412,6 @@ func (s *Service) TaskFailed(taskID int, epoch int) error {
return err return err
} }
s.checkTaskStatus(t, epoch) s.procFailedTask(t, epoch)
return nil return nil
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册