From 025e7f9cb6c4865f18f66d4c6a19b60ddbea3da2 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 24 May 2017 20:08:55 -0400 Subject: [PATCH] implement basic master server --- paddle/go/cmd/master/master.go | 78 ++++++++++++++++++++++++++++++++++ paddle/go/master/service.go | 4 +- 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 paddle/go/cmd/master/master.go diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go new file mode 100644 index 00000000000..8346b42a329 --- /dev/null +++ b/paddle/go/cmd/master/master.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "net" + "net/http" + "net/rpc" + "os" + "strconv" + "strings" + "time" + + "github.com/PaddlePaddle/Paddle/paddle/go/master" + "github.com/wangkuiyi/recordio" +) + +const ( + taskTimeoutDur = 20 * time.Minute + taskTimeoutMax = 3 +) + +func main() { + port := flag.Int("p", 0, "port of the master server") + dataset := flag.String("d", "", "dataset: comma separated path to RecordIO files") + faultTolerant := flag.Bool("fault-tolerance", false, "enable fault tolerance (requires etcd).") + flag.Parse() + + if *dataset == "" { + panic("no dataset specified.") + } + + if *faultTolerant { + panic("fault tolernat not implemented.") + } + + var chunks []master.Chunk + paths := strings.Split(*dataset, ",") + idx := 0 + for _, path := range paths { + f, err := os.Open(path) + if err != nil { + panic(err) + } + + index, err := recordio.LoadIndex(f) + if err != nil { + panic(err) + } + f.Close() + + count := index.NumChunks() + for i := 0; i < count; i++ { + chunk := master.Chunk{ + Idx: idx, + Path: path, + Index: *index.ChunkIndex(i), + } + chunks = append(chunks, chunk) + } + } + + s := master.NewService(chunks, taskTimeoutDur, taskTimeoutMax) + err := rpc.Register(s) + if err != nil { + panic(err) + } + + rpc.HandleHTTP() + l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + if err != nil { + panic(err) + } + + err = http.Serve(l, nil) + if err != nil { + panic(err) + } +} diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go index ae7f9687a5e..652d345e01d 100644 --- a/paddle/go/master/service.go +++ b/paddle/go/master/service.go @@ -64,14 +64,14 @@ func partition(chunks []Chunk, targetTaskCount int) []taskEntry { } // NewService creates a new service. -func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) (*Service, error) { +func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) *Service { s := &Service{} s.timeoutDur = timeoutDur s.timeoutMax = timeoutMax s.taskQueues = taskQueues{} s.taskQueues.Pending = make(map[int]taskEntry) s.taskQueues.Todo = partition(chunks, targetTaskCount) - return s, nil + return s } // Chunk is a chunk of data consisted of several data instances. -- GitLab