pserver.go 2.8 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17 18 19 20
package main

import (
	"net"
	"net/http"
	"net/rpc"
21 22
	"os"
	"os/signal"
23
	"strconv"
W
wuyi05 已提交
24
	"time"
25

H
Helin Wang 已提交
26
	"github.com/namsral/flag"
27
	"github.com/topicai/candy"
H
Helin Wang 已提交
28

29
	"github.com/PaddlePaddle/Paddle/go/pserver"
W
wuyi05 已提交
30
	log "github.com/sirupsen/logrus"
31 32 33
)

func main() {
H
Helin Wang 已提交
34
	port := flag.Int("port", 0, "port of the pserver")
Q
Qiao Longfei 已提交
35
	index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
W
wuyi05 已提交
36 37
	etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
		"comma separated endpoint string for pserver to connect to etcd")
38 39
	dialTimeout := flag.Duration("dial-timeout", 5*time.Second, "dial timeout")
	etcdTTL := flag.Int("etcd-ttl", 5, "etcd time to live in seconds")
Y
yi.wu 已提交
40
	numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
D
dongzhihong 已提交
41
	checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
42
	checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
W
wuyi05 已提交
43 44
	logLevel := flag.String("log-level", "info",
		"log level, possible values: debug, info, warning, error, fatal, panic")
45 46
	flag.Parse()

W
wuyi05 已提交
47
	level, err := log.ParseLevel(*logLevel)
48 49
	candy.Must(err)

W
wuyi05 已提交
50 51
	log.SetLevel(level)

Q
Qiao Longfei 已提交
52
	var idx int
53

D
dongzhihong 已提交
54
	var cp pserver.Checkpoint
D
dongzhihong 已提交
55
	var e *pserver.EtcdClient
Q
Qiao Longfei 已提交
56 57 58
	if *index >= 0 {
		idx = *index
	} else {
59
		e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *dialTimeout, *etcdTTL)
60
		idx, err = e.Register(*port)
61 62 63
		candy.Must(err)

		cp, err = pserver.NewCheckpointFromFile(*checkpointPath, idx, e)
Q
Qiao Longfei 已提交
64
		if err != nil {
65 66 67 68 69
			if err == pserver.ErrCheckpointNotFound {
				log.Infof("Could not find the pserver checkpoint.")
			} else {
				log.Errorf("Fetch checkpoint failed, %s", err)
			}
Q
Qiao Longfei 已提交
70
		}
71 72
	}

73 74
	shutdown := func() {
		log.Infoln("shutting down gracefully")
H
Helin Wang 已提交
75 76 77
		sErr := e.Shutdown()
		if sErr != nil {
			log.Errorln(sErr)
78 79 80 81 82 83 84 85 86
		}
	}

	// Guaranteed to run even panic happens.
	defer shutdown()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

D
dongzhihong 已提交
87
	s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
88 89
	candy.Must(err)

W
wuyi05 已提交
90
	err = rpc.Register(s)
91
	candy.Must(err)
92 93 94

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
95
	candy.Must(err)
96

97 98 99 100 101 102 103
	go func() {
		log.Infof("start pserver at port %d", *port)
		err = http.Serve(l, nil)
		candy.Must(err)
	}()

	<-c
104
}