pserver.go 2.9 KB
Newer Older
1
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
D
dongzhihong 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17 18 19 20
package main

import (
	"net"
	"net/http"
	"net/rpc"
21 22
	"os"
	"os/signal"
23
	"strconv"
W
wuyi05 已提交
24
	"time"
25

H
Helin Wang 已提交
26
	"github.com/namsral/flag"
27
	"github.com/topicai/candy"
H
Helin Wang 已提交
28

29
	"github.com/PaddlePaddle/Paddle/go/pserver"
30
	log "github.com/inconshreveable/log15"
31 32 33
)

func main() {
34
	port := flag.Int("port", 8001, "port of the pserver")
H
Helin Wang 已提交
35
	index := flag.Int("index", -1, "index of the pserver, set to -1 if use etcd for auto pserver index registry")
W
wuyi05 已提交
36 37
	etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
		"comma separated endpoint string for pserver to connect to etcd")
38 39
	dialTimeout := flag.Duration("dial-timeout", 5*time.Second, "dial timeout")
	etcdTTL := flag.Int("etcd-ttl", 5, "etcd time to live in seconds")
Y
yi.wu 已提交
40
	numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
D
dongzhihong 已提交
41
	checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
42
	checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
W
wuyi05 已提交
43
	logLevel := flag.String("log-level", "info",
44
		"log level, possible values: debug, info, warn, error, crit")
45 46
	flag.Parse()

47 48 49 50
	lvl, err := log.LvlFromString(*logLevel)
	if err != nil {
		panic(err)
	}
51

52 53 54
	log.Root().SetHandler(
		log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
	)
W
wuyi05 已提交
55

Q
Qiao Longfei 已提交
56
	var idx int
57

D
dongzhihong 已提交
58
	var cp pserver.Checkpoint
D
dongzhihong 已提交
59
	var e *pserver.EtcdClient
Q
Qiao Longfei 已提交
60 61 62
	if *index >= 0 {
		idx = *index
	} else {
63
		e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *dialTimeout, *etcdTTL)
64
		idx, err = e.Register(*port)
65 66
		candy.Must(err)

H
Helin Wang 已提交
67
		cp, err = pserver.LoadCheckpoint(e, idx)
Q
Qiao Longfei 已提交
68
		if err != nil {
69
			if err == pserver.ErrCheckpointNotFound {
H
Helin Wang 已提交
70
				log.Info("load checkpoint error", "error", err)
71
			} else {
H
Helin Wang 已提交
72
				panic(err)
73
			}
Q
Qiao Longfei 已提交
74
		}
75 76
	}

77
	shutdown := func() {
78
		log.Info("shutting down gracefully")
H
Helin Wang 已提交
79 80
		sErr := e.Shutdown()
		if sErr != nil {
81
			log.Error("error shutting down", log.Ctx{"error": sErr})
82 83 84 85 86 87 88 89 90
		}
	}

	// Guaranteed to run even panic happens.
	defer shutdown()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

D
dongzhihong 已提交
91
	s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
92 93
	candy.Must(err)

W
wuyi05 已提交
94
	err = rpc.Register(s)
95
	candy.Must(err)
96 97 98

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
99
	candy.Must(err)
100

101
	go func() {
H
Helin Wang 已提交
102
		log.Info("serving pserver", log.Ctx{"port": *port})
103 104 105 106 107
		err = http.Serve(l, nil)
		candy.Must(err)
	}()

	<-c
108
}