// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"net"
	"net/http"
	"net/rpc"
	"strconv"
	"time"

	"github.com/namsral/flag"
	"github.com/topicai/candy"

	"github.com/PaddlePaddle/Paddle/go/pserver"
	log "github.com/sirupsen/logrus"
)

func main() {
H
Helin Wang 已提交
32
	port := flag.Int("port", 0, "port of the pserver")
Q
Qiao Longfei 已提交
33
	index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
W
wuyi05 已提交
34 35
	etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
		"comma separated endpoint string for pserver to connect to etcd")
36
	etcdTimeout := flag.Duration("etcd-timeout", 5*time.Second, "timeout for etcd calls")
Y
yi.wu 已提交
37
	numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
D
dongzhihong 已提交
38
	checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
39
	checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
W
wuyi05 已提交
40 41
	logLevel := flag.String("log-level", "info",
		"log level, possible values: debug, info, warning, error, fatal, panic")
42 43
	flag.Parse()

W
wuyi05 已提交
44
	level, err := log.ParseLevel(*logLevel)
45 46
	candy.Must(err)

W
wuyi05 已提交
47 48
	log.SetLevel(level)

Q
Qiao Longfei 已提交
49
	var idx int
50

D
dongzhihong 已提交
51
	var cp pserver.Checkpoint
D
dongzhihong 已提交
52
	var e *pserver.EtcdClient
Q
Qiao Longfei 已提交
53 54 55
	if *index >= 0 {
		idx = *index
	} else {
56
		e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout)
57
		idx, err = e.Register(*port)
58 59 60
		candy.Must(err)

		cp, err = pserver.NewCheckpointFromFile(*checkpointPath, idx, e)
Q
Qiao Longfei 已提交
61
		if err != nil {
62 63 64 65 66
			if err == pserver.ErrCheckpointNotFound {
				log.Infof("Could not find the pserver checkpoint.")
			} else {
				log.Errorf("Fetch checkpoint failed, %s", err)
			}
Q
Qiao Longfei 已提交
67
		}
68 69
	}

D
dongzhihong 已提交
70
	s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
71 72
	candy.Must(err)

W
wuyi05 已提交
73
	err = rpc.Register(s)
74
	candy.Must(err)
75 76 77

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
78
	candy.Must(err)
79

W
wuyi05 已提交
80
	log.Infof("start pserver at port %d", *port)
81
	err = http.Serve(l, nil)
82
	candy.Must(err)
83
}