master.go 2.8 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

H
Helin Wang 已提交
15 16 17
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/namsral/flag"
	log "github.com/sirupsen/logrus"
	"github.com/topicai/candy"

	"github.com/PaddlePaddle/Paddle/go/master"
	"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
)

func main() {
H
Helin Wang 已提交
37
	port := flag.Int("port", 8080, "port of the master server.")
38
	ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
39
	endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
40 41 42 43 44
	taskTimeoutDur := flag.Duration("task-timout-dur", 20*time.Minute, "task timout duration.")
	taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.")
	chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.")
	logLevel := flag.String("log-level", "info",
		"log level, possible values: debug, info, warning, error, fatal, panic")
H
Helin Wang 已提交
45 46
	flag.Parse()

47 48 49 50 51
	level, e := log.ParseLevel(*logLevel)
	candy.Must(e)

	log.SetLevel(level)

52 53 54 55 56 57 58
	if *endpoints == "" {
		log.Warningln("-endpoints not set, fault tolerance not be enabled.")
	}

	var store master.Store
	if *endpoints != "" {
		eps := strings.Split(*endpoints, ",")
59 60 61 62 63 64 65
		ip, err := networkhelper.GetExternalIP()
		if err != nil {
			log.Fatal(err)
		}

		addr := fmt.Sprintf("%s:%d", ip, *port)
		store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
66 67 68 69
		if err != nil {
			log.Fatal(err)
		}
	} else {
H
Helin Wang 已提交
70
		store = &master.InMemStore{}
71
	}
H
Helin Wang 已提交
72

H
Helin Wang 已提交
73 74
	shutdown := func() {
		log.Infoln("shutting down gracefully")
75 76 77
		err := store.Shutdown()
		if err != nil {
			log.Errorln(err)
H
Helin Wang 已提交
78 79 80 81 82 83 84 85 86
		}
	}

	// Guaranteed to run even panic happens.
	defer shutdown()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

87 88 89
	s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
	if err != nil {
		log.Fatal(err)
H
Helin Wang 已提交
90 91
	}

92
	err = rpc.Register(s)
H
Helin Wang 已提交
93
	if err != nil {
94
		log.Fatal(err)
H
Helin Wang 已提交
95 96 97 98 99
	}

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
	if err != nil {
100
		log.Fatal(err)
H
Helin Wang 已提交
101 102
	}

H
Helin Wang 已提交
103 104 105 106 107 108 109 110
	go func() {
		err = http.Serve(l, nil)
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-c
H
Helin Wang 已提交
111
}