service.go 9.7 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17
package master

import (
18 19 20
	"bytes"
	"compress/gzip"
	"encoding/gob"
21
	"errors"
22 23
	"os"
	"path/filepath"
24 25 26
	"sync"
	"time"

H
Helin Wang 已提交
27 28
	log "github.com/sirupsen/logrus"

29
	"github.com/PaddlePaddle/recordio"
30 31
)

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
// dialTimeout is a five second timeout. Judging by the name it bounds
// dial attempts; no use of it is visible in this part of the file.
const dialTimeout = 5 * time.Second

// Store abstracts the backend that persists the master state.
type Store interface {
	// Save persists the serialized master state.
	Save(state []byte) error
	// Load returns the previously saved state. The master treats a
	// nil slice with a nil error as "nothing saved yet".
	Load() (state []byte, err error)
}

// Chunk identifies one chunk of a dataset file. A chunk consists of
// several data instances.
type Chunk struct {
	// Path is the path of the file the chunk was read from.
	Path  string
	// Index is the value obtained from the file's recordio index via
	// ChunkIndex for this chunk.
	Index recordio.Index // chunk index
}

G
gongweibao 已提交
48 49 50 51 52 53
// TaskMeta is a struct which stores task's meta info.
type TaskMeta struct {
	// ID identifies the task; partition assigns IDs sequentially,
	// starting at 0.
	ID    int
	// Epoch is incremented by GetTask every time the task is
	// dispatched. Timeout checks and failure reports carry the epoch
	// they were issued for, and are ignored when it no longer matches.
	Epoch int
}

54 55
// Task is the basic unit of data instances assigned to trainers.
type Task struct {
G
gongweibao 已提交
56
	Meta   TaskMeta
57 58 59 60
	Chunks []Chunk
}

type taskEntry struct {
G
gongweibao 已提交
61 62 63
	Task Task
	// A task fails if it's timeout or trainer reports it exits unnormally.
	NumFailure int
64 65 66 67 68 69
}

type taskQueues struct {
	Todo    []taskEntry
	Pending map[int]taskEntry // map from task ID to task entry
	Done    []taskEntry
G
gongweibao 已提交
70
	Failed  []taskEntry
71 72
}

73 74
// Service is the master server service.
type Service struct {
G
gongweibao 已提交
75 76 77 78 79
	chunksPerTask int
	timeoutDur    time.Duration
	failureMax    int
	ready         chan struct{}
	store         Store
80 81

	mu         sync.Mutex
H
Helin Wang 已提交
82
	initDone   bool
83 84 85
	taskQueues taskQueues
}

H
Helin Wang 已提交
86
func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
87
	id := 0
H
Helin Wang 已提交
88 89
	if chunksPerTask <= 0 {
		chunksPerTask = 1
90 91 92 93 94
	}

	var result []taskEntry
	var cur taskEntry
	for i, c := range chunks {
H
Helin Wang 已提交
95
		if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 {
G
gongweibao 已提交
96
			cur.Task.Meta.ID = id
97 98 99 100 101 102 103 104 105
			id++
			result = append(result, cur)
			cur.Task.Chunks = nil
		}

		cur.Task.Chunks = append(cur.Task.Chunks, c)
	}

	if len(cur.Task.Chunks) > 0 {
G
gongweibao 已提交
106
		cur.Task.Meta.ID = id
107 108 109 110 111 112 113
		result = append(result, cur)
	}

	return result
}

// NewService creates a new service.
G
gongweibao 已提交
114
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failureMax int) (*Service, error) {
115
	s := &Service{}
116
	s.chunksPerTask = chunksPerTask
117
	s.timeoutDur = timeoutDur
G
gongweibao 已提交
118
	s.failureMax = failureMax
119 120
	s.taskQueues = taskQueues{}
	s.taskQueues.Pending = make(map[int]taskEntry)
121
	s.ready = make(chan struct{})
122 123 124 125 126
	s.store = store
	recovered, err := s.recover()
	if err != nil {
		return nil, err
	}
127

128 129 130 131 132
	if recovered {
		// Recovered. Now the state is already initialized,
		// and the master is ready.
		s.initDone = true
		close(s.ready)
133
		log.Info("Master recovered from saved state.")
134
	}
135

136
	return s, nil
137 138
}

139 140 141 142 143 144
// recover recovers service state from etcd.
func (s *Service) recover() (bool, error) {
	state, err := s.store.Load()
	if err != nil {
		return false, err
	}
145

146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
	if state == nil {
		log.Infoln("No state exists, not recovered.")
		return false, nil
	}

	log.Infof("Loaded snapshot of size: %d bytes.", len(state))
	gr, err := gzip.NewReader(bytes.NewReader(state))
	if err != nil {
		return false, err
	}

	dec := gob.NewDecoder(gr)
	var tqs taskQueues
	err = dec.Decode(&tqs)
	if err != nil {
		return false, err
	}

	err = gr.Close()
	if err != nil {
		// Only close failed, recover actually succeed, so
		// just log error.
		log.Errorln(err)
	}

	s.taskQueues = tqs
	return true, nil
173 174
}

175
// snapshot *must* be called with s.mu being held.
176
func (s *Service) snapshot() error {
177
	// TODO(helin): etcd request has a size limit, so the snapshot
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
	// size is limited by the max request size. We should either
	// divide the snapshot into smaller chunks and save under
	// different keys, or configure the request size to be big
	// enough:
	// https://github.com/coreos/etcd/blob/2f84f3d8d8ed8f9537ab6ffa44a3a1c7eddfa9b1/embed/config.go#L44
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	enc := gob.NewEncoder(gw)
	err := enc.Encode(s.taskQueues)
	if err != nil {
		return err
	}
	err = gw.Close()
	if err != nil {
		return err
	}

	state := buf.Bytes()
	log.Infof("Saving snapshot of size: %d bytes.", len(state))
	return s.store.Save(state)
198 199
}

H
Helin Wang 已提交
200
func readChunks(globPaths []string) ([]Chunk, error) {
201 202 203 204 205 206
	var chunks []Chunk
	var paths []string

	for _, s := range globPaths {
		match, err := filepath.Glob(s)
		if err != nil {
H
Helin Wang 已提交
207
			return nil, err
208 209 210 211 212
		}
		paths = append(paths, match...)
	}

	if len(paths) == 0 {
H
Helin Wang 已提交
213
		return nil, errors.New("no valid dataset specified")
214 215 216 217 218
	}

	for _, path := range paths {
		f, err := os.Open(path)
		if err != nil {
H
Helin Wang 已提交
219
			return nil, err
220 221 222 223
		}

		index, err := recordio.LoadIndex(f)
		if err != nil {
H
Helin Wang 已提交
224
			return nil, err
225 226 227
		}
		err = f.Close()
		if err != nil {
H
Helin Wang 已提交
228
			return nil, err
229 230 231
		}

		count := index.NumChunks()
232
		log.Infof("readChunks: file %s has %d chunks", path, count)
233 234 235 236 237 238 239 240 241
		for i := 0; i < count; i++ {
			chunk := Chunk{
				Path:  path,
				Index: *index.ChunkIndex(i),
			}
			chunks = append(chunks, chunk)
		}
	}

H
Helin Wang 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
	return chunks, nil
}

// SetDataset sets dataset to dispatch for the master server.
//
// SetDataset can be call multiple times. But only the first call will
// be honored.
func (s *Service) SetDataset(globPaths []string, dummy *int) error {
	if len(globPaths) == 0 {
		return errors.New("no dataset specified")
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.initDone {
		// Already initialized. All trainer will call
		// SetDataset, but we only handle the first one. Treat
		// other calls as successful but do nothing.
		return nil
	}

H
Helin Wang 已提交
263
	chunks, err := readChunks(globPaths)
H
Helin Wang 已提交
264 265 266 267
	if err != nil {
		return err
	}

268 269
	s.taskQueues.Todo = partition(chunks, s.chunksPerTask)

H
Helin Wang 已提交
270
	err = s.snapshot()
271
	if err != nil {
H
Helin Wang 已提交
272
		log.Errorln(err)
273 274 275 276
		return err
	}

	close(s.ready)
H
Helin Wang 已提交
277
	s.initDone = true
278 279 280
	return nil
}

G
gongweibao 已提交
281 282
func (s *Service) processFailedTask(t taskEntry, epoch int) {
	if t.Task.Meta.Epoch != epoch {
G
gongweibao 已提交
283 284 285 286 287 288 289 290 291 292 293 294
		// new epoch, task launched after the
		// schedule of this timeout check or failed status report.
		return
	}

	defer func() {
		err := s.snapshot()
		if err != nil {
			log.Errorln(err)
		}
	}()

G
gongweibao 已提交
295
	delete(s.taskQueues.Pending, t.Task.Meta.ID)
G
gongweibao 已提交
296

G
gongweibao 已提交
297 298 299
	t.NumFailure++
	if t.NumFailure > s.failureMax {
		log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
G
gongweibao 已提交
300 301 302 303
		s.taskQueues.Failed = append(s.taskQueues.Failed, t)
		return
	}

G
gongweibao 已提交
304
	log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
G
gongweibao 已提交
305 306 307
	s.taskQueues.Todo = append(s.taskQueues.Todo, t)
}

H
Helin Wang 已提交
308 309 310 311 312 313 314 315 316 317
func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
	return func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		t, ok := s.taskQueues.Pending[taskID]
		if !ok {
			return
		}

G
gongweibao 已提交
318
		s.processFailedTask(t, epoch)
H
Helin Wang 已提交
319 320 321
	}
}

H
Helin Wang 已提交
322 323 324 325 326 327 328 329 330 331
// logFields reports the current length of every task queue as
// structured log fields. The caller must hold s.mu.
func (s *Service) logFields() log.Fields {
	queues := &s.taskQueues

	fields := log.Fields{}
	fields["todoLen"] = len(queues.Todo)
	fields["pendingLen"] = len(queues.Pending)
	fields["doneLen"] = len(queues.Done)
	fields["failedLen"] = len(queues.Failed)
	return fields
}

332 333
// GetTask gets a new task from the service.
func (s *Service) GetTask(dummy int, task *Task) error {
334 335 336 337
	select {
	case <-s.ready:
	}

338 339 340 341
	s.mu.Lock()
	defer s.mu.Unlock()

	if len(s.taskQueues.Todo) == 0 {
342 343
		if len(s.taskQueues.Done) == 0 {
			if len(s.taskQueues.Pending) == 0 {
H
Helin Wang 已提交
344
				err := errors.New("all task failed")
H
Helin Wang 已提交
345
				log.WithFields(s.logFields()).Warningln("All tasks failed.")
H
Helin Wang 已提交
346
				return err
347 348 349 350 351
			}

			// TODO(helin): client need to retry in this
			// error case. Gotcha: RPC client can't
			// compare returned error with predefined
H
Helin Wang 已提交
352 353 354 355 356
			// errors like io.EOF, because the error
			// instance deserialized from RPC is a
			// different instance than the error defined
			// in package. So we need to figure out a way
			// for client to check this error correctly.
H
Helin Wang 已提交
357
			err := errors.New("no more available task")
H
Helin Wang 已提交
358
			log.WithFields(s.logFields()).Warningln("No more available task.")
H
Helin Wang 已提交
359
			return err
360 361
		}
		s.taskQueues.Todo = s.taskQueues.Done
H
Helin Wang 已提交
362
		s.taskQueues.Done = nil
H
Helin Wang 已提交
363
		log.WithFields(s.logFields()).Infoln("No more todo task, but trainer is requesting task to do. Move all done task to todo.")
364 365 366
	}

	t := s.taskQueues.Todo[0]
G
gongweibao 已提交
367
	t.Task.Meta.Epoch++
368
	s.taskQueues.Todo = s.taskQueues.Todo[1:]
G
gongweibao 已提交
369
	s.taskQueues.Pending[t.Task.Meta.ID] = t
370 371 372 373 374
	err := s.snapshot()
	if err != nil {
		return err
	}

375
	*task = t.Task
G
gongweibao 已提交
376
	log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta)
377

G
gongweibao 已提交
378
	time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
379 380 381 382 383
	return nil
}

// TaskFinished tell the service that a task is finished.
func (s *Service) TaskFinished(taskID int, dummy *int) error {
384 385 386 387
	select {
	case <-s.ready:
	}

388 389 390 391 392
	s.mu.Lock()
	defer s.mu.Unlock()

	t, ok := s.taskQueues.Pending[taskID]
	if !ok {
H
Helin Wang 已提交
393
		log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID)
G
gongweibao 已提交
394
		return nil
395 396 397
	}

	// task finished, reset timeout
G
gongweibao 已提交
398
	t.NumFailure = 0
399 400
	s.taskQueues.Done = append(s.taskQueues.Done, t)
	delete(s.taskQueues.Pending, taskID)
401

H
Helin Wang 已提交
402 403
	log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID)

H
Helin Wang 已提交
404
	if len(s.taskQueues.Pending) == 0 && len(s.taskQueues.Todo) == 0 {
H
Helin Wang 已提交
405
		log.WithFields(s.logFields()).Infoln("No more todo and pending task, start a new pass.")
406
		s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Done...)
407 408 409
		s.taskQueues.Done = nil
	}

H
Helin Wang 已提交
410 411 412 413 414
	err := s.snapshot()
	if err != nil {
		log.Errorln(err)
	}
	return err
415
}
G
gongweibao 已提交
416

G
gongweibao 已提交
417
// TaskFailed tells the service that a task is failed.
G
gongweibao 已提交
418
func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
G
gongweibao 已提交
419 420 421 422 423 424 425
	select {
	case <-s.ready:
	}

	s.mu.Lock()
	defer s.mu.Unlock()

G
gongweibao 已提交
426
	t, ok := s.taskQueues.Pending[meta.ID]
G
gongweibao 已提交
427
	if !ok {
G
gongweibao 已提交
428
		log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta)
G
gongweibao 已提交
429
		return nil
G
gongweibao 已提交
430 431
	}

G
gongweibao 已提交
432
	s.processFailedTask(t, meta.Epoch)
G
gongweibao 已提交
433 434
	return nil
}