master.go 3.3 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

H
Helin Wang 已提交
15 16 17
package main

import (
18
	"fmt"
H
Helin Wang 已提交
19 20 21
	"net"
	"net/http"
	"net/rpc"
H
Helin Wang 已提交
22 23
	"os"
	"os/signal"
H
Helin Wang 已提交
24
	"strconv"
25
	"strings"
H
Helin Wang 已提交
26 27
	"time"

28
	log "github.com/inconshreveable/log15"
H
Helin Wang 已提交
29 30
	"github.com/namsral/flag"

31
	"github.com/PaddlePaddle/Paddle/go/master"
32
	"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
H
Helin Wang 已提交
33 34 35
)

func main() {
H
Helin Wang 已提交
36
	port := flag.Int("port", 8080, "port of the master server.")
37
	ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
38
	endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
39 40 41 42
	taskTimeoutDur := flag.Duration("task-timout-dur", 20*time.Minute, "task timout duration.")
	taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.")
	chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.")
	logLevel := flag.String("log-level", "info",
43
		"log level, possible values: debug, info, warn, error, crit")
H
Helin Wang 已提交
44 45
	flag.Parse()

46 47 48 49
	lvl, err := log.LvlFromString(*logLevel)
	if err != nil {
		panic(err)
	}
50

51 52 53
	log.Root().SetHandler(
		log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
	)
54

55
	if *endpoints == "" {
56
		log.Warn("-endpoints not set, fault tolerance not be enabled.")
57 58 59 60 61
	}

	var store master.Store
	if *endpoints != "" {
		eps := strings.Split(*endpoints, ",")
62 63
		ip, err := networkhelper.GetExternalIP()
		if err != nil {
64 65
			log.Crit("get external ip error", log.Ctx{"error": err})
			panic(err)
66 67 68 69
		}

		addr := fmt.Sprintf("%s:%d", ip, *port)
		store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
70
		if err != nil {
71 72
			log.Crit("error creating etcd client.", log.Ctx{"error": err})
			panic(err)
73 74
		}
	} else {
H
Helin Wang 已提交
75
		store = &master.InMemStore{}
76
	}
H
Helin Wang 已提交
77

H
Helin Wang 已提交
78
	shutdown := func() {
79
		log.Info("shutting down gracefully")
80 81
		err := store.Shutdown()
		if err != nil {
82
			log.Error("shutdown error", log.Ctx{"error": err})
H
Helin Wang 已提交
83 84 85 86 87 88 89 90 91
		}
	}

	// Guaranteed to run even panic happens.
	defer shutdown()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

92 93
	s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
	if err != nil {
94 95
		log.Crit("error creating new service.", log.Ctx{"error": err})
		panic(err)
H
Helin Wang 已提交
96 97
	}

98
	err = rpc.Register(s)
H
Helin Wang 已提交
99
	if err != nil {
100 101
		log.Crit("error registering to etcd.", log.Ctx{"error": err})
		panic(err)
H
Helin Wang 已提交
102 103 104 105 106
	}

	rpc.HandleHTTP()
	l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
	if err != nil {
107 108
		log.Crit("error listing to port", log.Ctx{"error": err, "port": *port})
		panic(err)
H
Helin Wang 已提交
109 110
	}

H
Helin Wang 已提交
111 112 113
	go func() {
		err = http.Serve(l, nil)
		if err != nil {
114 115
			log.Crit("error serving HTTP", log.Ctx{"error": err})
			panic(err)
H
Helin Wang 已提交
116 117 118 119
		}
	}()

	<-c
H
Helin Wang 已提交
120
}