master.go 1.7 KB
Newer Older
H
Helin Wang 已提交
1 2 3 4 5 6 7
package main

import (
	"net"
	"net/http"
	"net/rpc"
	"strconv"
8 9
	"strings"
	"sync"
H
Helin Wang 已提交
10 11
	"time"

H
Helin Wang 已提交
12
	"github.com/namsral/flag"
13
	log "github.com/sirupsen/logrus"
H
Helin Wang 已提交
14

15
	"github.com/PaddlePaddle/Paddle/go/master"
H
Helin Wang 已提交
16 17
)

18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
type inMemStore struct {
	mu  sync.Mutex
	buf []byte
}

func (m *inMemStore) Save(b []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.buf = b
	return nil
}

func (m *inMemStore) Load() ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	return m.buf, nil
}

H
Helin Wang 已提交
38
func main() {
H
Helin Wang 已提交
39
	port := flag.Int("port", 8080, "port of the master server.")
40

41 42
	ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
	endpoints := flag.String("endpoints", "", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
H
Helin Wang 已提交
43 44 45
	taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
	taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
	chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
H
Helin Wang 已提交
46 47
	flag.Parse()

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
	if *endpoints == "" {
		log.Warningln("-endpoints not set, fault tolerance not be enabled.")
	}

	var store master.Store
	if *endpoints != "" {
		eps := strings.Split(*endpoints, ",")
		var err error
		store, err = master.NewEtcdStore(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec)
		if err != nil {
			log.Fatal(err)
		}
	} else {
		store = &inMemStore{}
	}
H
Helin Wang 已提交
63

64 65 66
	s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
	if err != nil {
		log.Fatal(err)
H
Helin Wang 已提交
67 68
	}

69
	err = rpc.Register(s)
H
Helin Wang 已提交
70
	if err != nil {
71
		log.Fatal(err)
H
Helin Wang 已提交
72 73 74 75 76
	}

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
	if err != nil {
77
		log.Fatal(err)
H
Helin Wang 已提交
78 79 80 81
	}

	err = http.Serve(l, nil)
	if err != nil {
82
		log.Fatal(err)
H
Helin Wang 已提交
83 84
	}
}