pserver.go 1.8 KB
Newer Older
1 2 3 4 5 6 7
package main

import (
	"net"
	"net/http"
	"net/rpc"
	"strconv"
W
wuyi05 已提交
8
	"time"
9

H
Helin Wang 已提交
10
	"github.com/namsral/flag"
11
	"github.com/topicai/candy"
H
Helin Wang 已提交
12

13
	"github.com/PaddlePaddle/Paddle/go/pserver"
W
wuyi05 已提交
14
	log "github.com/sirupsen/logrus"
15 16 17
)

func main() {
H
Helin Wang 已提交
18
	port := flag.Int("port", 0, "port of the pserver")
Q
Qiao Longfei 已提交
19
	index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
W
wuyi05 已提交
20 21
	etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
		"comma separated endpoint string for pserver to connect to etcd")
22
	etcdTimeout := flag.Duration("etcd-timeout", 5*time.Second, "timeout for etcd calls")
Y
yi.wu 已提交
23
	numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
D
dongzhihong 已提交
24
	checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
25
	checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
W
wuyi05 已提交
26 27
	logLevel := flag.String("log-level", "info",
		"log level, possible values: debug, info, warning, error, fatal, panic")
28 29
	flag.Parse()

W
wuyi05 已提交
30
	level, err := log.ParseLevel(*logLevel)
31 32
	candy.Must(err)

W
wuyi05 已提交
33 34
	log.SetLevel(level)

Q
Qiao Longfei 已提交
35
	var idx int
36

D
dongzhihong 已提交
37
	var cp pserver.Checkpoint
D
dongzhihong 已提交
38
	var e *pserver.EtcdClient
Q
Qiao Longfei 已提交
39 40 41
	if *index >= 0 {
		idx = *index
	} else {
42
		e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout)
43
		idx, err = e.Register(*port)
44 45 46
		candy.Must(err)

		cp, err = pserver.NewCheckpointFromFile(*checkpointPath, idx, e)
Q
Qiao Longfei 已提交
47
		if err != nil {
48
			log.Errorf("Fetch checkpoint failed, %s", err)
Q
Qiao Longfei 已提交
49
		}
50 51
	}

D
dongzhihong 已提交
52
	s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
53 54
	candy.Must(err)

W
wuyi05 已提交
55
	err = rpc.Register(s)
56
	candy.Must(err)
57 58 59

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
60
	candy.Must(err)
61

W
wuyi05 已提交
62
	log.Infof("start pserver at port %d", *port)
63
	err = http.Serve(l, nil)
64
	candy.Must(err)
65
}