diff --git a/CMakeLists.txt b/CMakeLists.txt index c2218be5efb90142ef7f16b788e141e025105771..2b6a80ca43cf131c6886455cb5a86a61246ac17c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,7 @@ endif(WITH_GPU) add_subdirectory(proto) add_subdirectory(paddle) +add_subdirectory(go/master/c) add_subdirectory(python) add_subdirectory(go/pserver/cclient) diff --git a/go/cmake/golang.cmake b/go/cmake/golang.cmake index 7c85fb6298d02b85ea87af9b6f6e960463443b7d..a5a43886f887e495500fa26b3c26fa69c63eded0 100644 --- a/go/cmake/golang.cmake +++ b/go/cmake/golang.cmake @@ -26,27 +26,23 @@ function(GO_LIBRARY NAME BUILD_TYPE) # automatically get all dependencies specified in the source code # for given target. - add_custom_target(goGet env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ${rel}/...) + add_custom_target(${NAME}_goGet env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ${rel}/...) # make a symlink that references Paddle inside $GOPATH, so go get # will use the local changes in Paddle rather than checkout Paddle # in github. 
- add_custom_target(copyPaddle + add_custom_target(${NAME}_copyPaddle COMMAND rm -rf ${PADDLE_IN_GOPATH}/Paddle COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}/Paddle) - add_dependencies(goGet copyPaddle) + add_dependencies(${NAME}_goGet ${NAME}_copyPaddle) add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} - -gcflags=-shared -asmflags=-shared -installsuffix=_shared -a -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" ${CMAKE_GO_FLAGS} ${GO_SOURCE} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) - add_dependencies(${NAME} goGet) + add_dependencies(${NAME} ${NAME}_goGet) - if(NOT BUILD_TYPE STREQUAL "STATIC") - install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION bin) - endif() endfunction(GO_LIBRARY) diff --git a/go/connection/conn.go b/go/connection/conn.go index bc9b5f0617e35f049c3e14f0b441aca2033f9645..977e8cc123707dbcf055bb77399adbc232c575a0 100644 --- a/go/connection/conn.go +++ b/go/connection/conn.go @@ -2,9 +2,10 @@ package connection import ( "errors" - "log" "net/rpc" "sync" + + log "github.com/sirupsen/logrus" ) // TODO(helin): add TCP re-connect logic @@ -65,7 +66,7 @@ func (c *Conn) Connect(addr string) error { } else { err := client.Close() if err != nil { - log.Println(err) + log.Errorln(err) } return errors.New("client already set from a concurrent goroutine") diff --git a/go/master/c/CMakeLists.txt b/go/master/c/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..acce698051ec7217d60a40b3d9cdc98fb1499653 --- /dev/null +++ b/go/master/c/CMakeLists.txt @@ -0,0 +1,21 @@ +cmake_minimum_required(VERSION 3.0) + +get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") + +project(cxx_go C Go) + +include(golang) +include(flags) + 
+set(MASTER_LIB_NAME "paddle_master") +go_library(${MASTER_LIB_NAME} SHARED) + +if(PROJ_ROOT) + add_custom_command(OUTPUT ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so + COMMAND rm ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.h + COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.so ${PROJ_ROOT}/python/paddle/v2/master/ + DEPENDS ${MASTER_LIB_NAME}) + add_custom_target(paddle_master_shared ALL DEPENDS ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so) +endif(PROJ_ROOT) diff --git a/go/master/c/client.go b/go/master/c/client.go new file mode 100644 index 0000000000000000000000000000000000000000..b186474dc33138aeb02a2ffe34418b379b7a2db0 --- /dev/null +++ b/go/master/c/client.go @@ -0,0 +1,110 @@ +package main + +/* +#include <stdlib.h> +#include <string.h> +#include <stdio.h> + +#define PADDLE_MASTER_OK 0 +#define PADDLE_MASTER_ERROR -1 + +typedef int paddle_master_client; +*/ +import "C" + +import ( + "sync" + "unsafe" + + "github.com/PaddlePaddle/Paddle/go/master" + log "github.com/sirupsen/logrus" +) + +var nullPtr = unsafe.Pointer(uintptr(0)) +var mu sync.Mutex +var handleMap = make(map[C.paddle_master_client]*master.Client) +var curHandle C.paddle_master_client + +func add(c *master.Client) C.paddle_master_client { + mu.Lock() + defer mu.Unlock() + client := curHandle + curHandle++ + handleMap[client] = c + return client +} + +func get(client C.paddle_master_client) *master.Client { + mu.Lock() + defer mu.Unlock() + return handleMap[client] +} + +func remove(client C.paddle_master_client) *master.Client { + mu.Lock() + defer mu.Unlock() + h := handleMap[client] + delete(handleMap, client) + return h +} + +type addresser string + +func (a addresser) Address() string { + return string(a) +} + +//export paddle_new_master_client +func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client { + a := C.GoString(addr) + c := master.NewClient(addresser(a), bufSize) + return add(c) +} + +//export paddle_release_master_client +func 
paddle_release_master_client(client C.paddle_master_client) { + remove(client) +} + +//export paddle_set_dataset +func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int) C.int { + c := get(client) + var paths []string + for i := 0; i < int(size); i++ { + ptr := (**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(path)) + uintptr(i)*unsafe.Sizeof(*path))) + str := C.GoString(*ptr) + paths = append(paths, str) + } + err := c.SetDataset(paths) + if err != nil { + log.Errorln(err) + return C.PADDLE_MASTER_ERROR + } + + return C.PADDLE_MASTER_OK +} + +//export paddle_next_record +func paddle_next_record(client C.paddle_master_client, record **C.uchar) C.int { + c := get(client) + r := c.NextRecord() + if len(r) == 0 { + *record = (*C.uchar)(nullPtr) + return 0 + } + + size := C.size_t(len(r)) + *record = (*C.uchar)(C.malloc(size)) + C.memcpy(unsafe.Pointer(*record), unsafe.Pointer(&r[0]), size) + return C.int(size) +} + +//export mem_free +func mem_free(p unsafe.Pointer) { + // "free" may be a better name for this function, but doing so + // will cause calling any function of this library from Python + // ctypes hanging. + C.free(p) +} + +func main() {} diff --git a/go/master/client.go b/go/master/client.go index 20c66340dc28bc514b6c51583dd94830c42a41bf..8451820c1963dd5a4eff0c3ab7763eb6a8e05ba4 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -1,10 +1,12 @@ package master import ( - "log" + "os" "time" "github.com/PaddlePaddle/Paddle/go/connection" + "github.com/PaddlePaddle/recordio" + log "github.com/sirupsen/logrus" ) // Addresser provide the address of the master server. @@ -15,16 +17,61 @@ type Addresser interface { // Client is the client of the master server. type Client struct { conn *connection.Conn + ch chan []byte } // NewClient creates a new Client. -func NewClient(addr Addresser) *Client { +// +// bufSize is the record buffer size. NextRecord will read from this +// buffer. 
+func NewClient(addr Addresser, bufSize int) *Client { c := &Client{} c.conn = connection.New() + c.ch = make(chan []byte, bufSize) go c.monitorMaster(addr) + go c.getRecords() return c } +func (c *Client) getRecords() { + for { + t, err := c.getTask() + if err != nil { + // TODO(helin): wait before move on with next + // getTask call. + log.Errorln(err) + continue + } + + for _, chunk := range t.Chunks { + f, err := os.Open(chunk.Path) + if err != nil { + log.Errorln(err) + continue + } + + s := recordio.NewRangeScanner(f, &chunk.Index, -1, -1) + for s.Scan() { + c.ch <- s.Record() + } + + if s.Err() != nil { + log.Errorln(s.Err(), chunk.Path) + } + + err = f.Close() + if err != nil { + log.Errorln(err) + } + } + + // We treat a task as finished whenever the last data + // instance of the task is read. This is not exactly + // correct, but a reasonable approximation. + c.taskFinished(t.ID) + } +} + func (c *Client) monitorMaster(addr Addresser) { lastMaster := "" monitor := func() { @@ -35,12 +82,12 @@ func (c *Client) monitorMaster(addr Addresser) { if curMaster == "" { err := c.conn.Close() if err != nil { - log.Println(err) + log.Errorln(err) } } else { err := c.conn.Connect(curMaster) if err != nil { - log.Println(err) + log.Errorln(err) // connect to addr failed, set // to last known addr in order @@ -69,14 +116,22 @@ func (c *Client) SetDataset(globPaths []string) error { return c.conn.Call("Service.SetDataset", globPaths, nil) } -// GetTask gets a new task from the master server. -func (c *Client) GetTask() (Task, error) { +// getTask gets a new task from the master server. +func (c *Client) getTask() (Task, error) { var t Task err := c.conn.Call("Service.GetTask", 0, &t) return t, err } // TaskFinished tells the master server a task is finished. -func (c *Client) TaskFinished(taskID int) error { +func (c *Client) taskFinished(taskID int) error { return c.conn.Call("Service.TaskFinished", taskID, nil) } + +// NextRecord returns next record in the dataset. 
+// +// NextRecord will block until the next record is available. It is +// thread-safe. +func (c *Client) NextRecord() []byte { + return <-c.ch +} diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go new file mode 100644 index 0000000000000000000000000000000000000000..00fcca0e2cf44d0f4855fd366a8f80895abf8865 --- /dev/null +++ b/go/master/client_internal_test.go @@ -0,0 +1,121 @@ +package master + +import ( + "fmt" + "net" + "net/http" + "net/rpc" + "os" + "strconv" + "strings" + "testing" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/PaddlePaddle/Paddle/go/connection" + "github.com/PaddlePaddle/recordio" +) + +const ( + totalTask = 20 + chunkPerTask = 10 +) + +func init() { + log.SetLevel(log.ErrorLevel) +} + +type TestAddresser string + +func (a TestAddresser) Address() string { + return string(a) +} + +func TestGetFinishTask(t *testing.T) { + const path = "/tmp/master_client_test_0" + + l, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + + ss := strings.Split(l.Addr().String(), ":") + p, err := strconv.Atoi(ss[len(ss)-1]) + if err != nil { + panic(err) + } + + go func(l net.Listener) { + s := NewService(chunkPerTask, time.Second, 1) + server := rpc.NewServer() + err := server.Register(s) + if err != nil { + panic(err) + } + + mux := http.NewServeMux() + mux.Handle(rpc.DefaultRPCPath, server) + err = http.Serve(l, mux) + if err != nil { + panic(err) + } + }(l) + + f, err := os.Create(path) + if err != nil { + panic(err) + } + + for i := 0; i < totalTask*chunkPerTask; i++ { + w := recordio.NewWriter(f, -1, -1) + w.Write(nil) + // call Close to force RecordIO writing a chunk. 
+ w.Close() + } + f.Close() + + // Manually initialize client to avoid calling c.getRecords() + c := &Client{} + c.conn = connection.New() + go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p))) + c.SetDataset([]string{path}) + + checkOnePass := func(i int) { + var tasks []Task + for idx := 0; idx < totalTask; idx++ { + task, err := c.getTask() + if err != nil { + t.Fatalf("Error: %v, pass: %d\n", err, i) + } + tasks = append(tasks, task) + } + + _, err = c.getTask() + if err == nil { + t.Fatalf("Should get error, pass: %d\n", i) + } + + err = c.taskFinished(tasks[0].ID) + if err != nil { + t.Fatalf("Error: %v, pass: %d\n", err, i) + } + tasks = tasks[1:] + task, err := c.getTask() + if err != nil { + t.Fatal(err) + } + tasks = append(tasks, task) + + for _, task := range tasks { + err = c.taskFinished(task.ID) + if err != nil { + t.Fatalf("Error: %v, pass: %d\n", err, i) + } + } + } + + for i := 0; i < 10; i++ { + checkOnePass(i) + } +} diff --git a/go/master/client_test.go b/go/master/client_test.go index df708ad7912c07205ae0cee8d2ab1c06d65223cc..2b3f873ecf3a650cd91d1d9c20b414b05bbb0cd6 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -11,21 +11,15 @@ import ( "testing" "time" - log "github.com/sirupsen/logrus" - "github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/recordio" ) -const ( - totalTask = 20 - chunkPerTask = 10 -) - -var port int - -func init() { - log.SetLevel(log.ErrorLevel) +func TestNextRecord(t *testing.T) { + const ( + path = "/tmp/master_client_TestFull" + total = 50 + ) l, err := net.Listen("tcp", ":0") if err != nil { @@ -37,10 +31,9 @@ func init() { if err != nil { panic(err) } - port = p go func(l net.Listener) { - s := master.NewService(chunkPerTask, time.Second, 1) + s := master.NewService(10, time.Second, 1) server := rpc.NewServer() err := server.Register(s) if err != nil { @@ -54,67 +47,33 @@ func init() { panic(err) } }(l) -} -type addresser string - -func (a addresser) Address() string { - 
return string(a) -} - -func TestClientFull(t *testing.T) { - const p = "/tmp/master_client_test_0" - f, err := os.Create(p) + f, err := os.Create(path) if err != nil { panic(err) } - for i := 0; i < totalTask*chunkPerTask; i++ { - w := recordio.NewWriter(f, -1, -1) - w.Write(nil) - // call Close to force RecordIO writing a chunk. - w.Close() + w := recordio.NewWriter(f, -1, -1) + for i := 0; i < total; i++ { + w.Write([]byte{byte(i)}) } + w.Close() f.Close() - c := master.NewClient(addresser(fmt.Sprintf(":%d", port))) - c.SetDataset([]string{p}) + c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10) + c.SetDataset([]string{path}) - checkOnePass := func(i int) { - var tasks []master.Task - for i := 0; i < totalTask; i++ { - task, err := c.GetTask() - if err != nil { - t.Fatal(i, err) + for pass := 0; pass < 50; pass++ { + received := make(map[byte]bool) + for i := 0; i < total; i++ { + r := c.NextRecord() + if len(r) != 1 { + t.Fatal("Length should be 1.", r) } - tasks = append(tasks, task) - } - - _, err = c.GetTask() - if err == nil { - t.Fatal(i, "should get error.") - } - - err = c.TaskFinished(tasks[0].ID) - if err != nil { - t.Fatal(err) - } - tasks = tasks[1:] - task, err := c.GetTask() - if err != nil { - t.Fatal(err) - } - tasks = append(tasks, task) - - for _, task := range tasks { - err = c.TaskFinished(task.ID) - if err != nil { - t.Fatal(i, err) + if received[r[0]] { + t.Fatal("Received duplicate.", received, r) } + received[r[0]] = true } } - - for i := 0; i < 10; i++ { - checkOnePass(i) - } } diff --git a/go/master/service.go b/go/master/service.go index 1e2a34972bb4763688d7df97d768320cd6f9e9d4..55e1e2d1a4a5cd6f5d5797b247e2ebe433607576 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -207,16 +207,26 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { t.NumTimeout++ if t.NumTimeout > s.timeoutMax { - log.Warningf("Task %v failed %d times, discard.\n", t.Task, t.NumTimeout) + log.Warningf("Task %v timed 
out %d times, discard.\n", t.Task, t.NumTimeout) s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task) return } - log.Warningf("Task %v failed %d times, retry.\n", t.Task, t.NumTimeout) + log.Warningf("Task %v timed out %d times, retry.\n", t.Task, t.NumTimeout) s.taskQueues.Todo = append(s.taskQueues.Todo, t) } } +// must be called with lock held. +func (s *Service) logFields() log.Fields { + return log.Fields{ + "todoLen": len(s.taskQueues.Todo), + "pendingLen": len(s.taskQueues.Pending), + "doneLen": len(s.taskQueues.Done), + "failedLen": len(s.taskQueues.Failed), + } +} + // GetTask gets a new task from the service. func (s *Service) GetTask(dummy int, task *Task) error { select { @@ -230,7 +240,7 @@ func (s *Service) GetTask(dummy int, task *Task) error { if len(s.taskQueues.Done) == 0 { if len(s.taskQueues.Pending) == 0 { err := errors.New("all task failed") - log.Warningln(err) + log.WithFields(s.logFields()).Warningln("All tasks failed.") return err } @@ -243,12 +253,12 @@ func (s *Service) GetTask(dummy int, task *Task) error { // in package. So we need to figure out a way // for client to check this error correctly. err := errors.New("no more available task") - log.Warningln(err) + log.WithFields(s.logFields()).Warningln("No more available task.") return err } s.taskQueues.Todo = s.taskQueues.Done s.taskQueues.Done = nil - log.Infoln("No more todo task, but trainer is requesting task to do. Move all done task to todo.") + log.WithFields(s.logFields()).Infoln("No more todo task, but trainer is requesting task to do. 
Move all done task to todo.") } t := s.taskQueues.Todo[0] @@ -261,7 +271,7 @@ func (s *Service) GetTask(dummy int, task *Task) error { } *task = t.Task - log.Infof("Task #%d dispatched\n", task.ID) + log.WithFields(s.logFields()).Infof("Task #%d dispatched.", task.ID) time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.ID, t.Epoch)) return nil @@ -276,12 +286,10 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { s.mu.Lock() defer s.mu.Unlock() - log.Infof("Task %d finished\n", taskID) - t, ok := s.taskQueues.Pending[taskID] if !ok { err := errors.New("pending task not found") - log.Warningln(err) + log.WithFields(s.logFields()).Warningf("Pending task #%d not found.", taskID) return err } @@ -290,8 +298,10 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { s.taskQueues.Done = append(s.taskQueues.Done, t) delete(s.taskQueues.Pending, taskID) + log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID) + if len(s.taskQueues.Pending) == 0 && len(s.taskQueues.Todo) == 0 { - log.Infoln("No more todo and pending task, start a new pass.") + log.WithFields(s.logFields()).Infoln("No more todo and pending task, start a new pass.") s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Done...) 
s.taskQueues.Done = nil } diff --git a/go/pserver/cclient/cclient.go b/go/pserver/cclient/cclient.go index 4476e762dad04009833421056aa5a49efd44ddaa..92a41b7f5434842c6318704dd85adf9e51c19944 100644 --- a/go/pserver/cclient/cclient.go +++ b/go/pserver/cclient/cclient.go @@ -1,7 +1,6 @@ package main /* -#include #include typedef enum { PADDLE_ELEMENT_TYPE_INT32 = 0, @@ -26,12 +25,12 @@ typedef int paddle_pserver_client; import "C" import ( - "log" "strings" "sync" "unsafe" "github.com/PaddlePaddle/Paddle/go/pserver" + log "github.com/sirupsen/logrus" ) var nullPtr = unsafe.Pointer(uintptr(0)) @@ -134,10 +133,10 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter, if err != nil { if err.Error() == pserver.AlreadyInitialized { - log.Printf("parameter %s already initialized, treat paddle_init_param as sucessful.\n", name) + log.Warningf("parameter %s already initialized, treat paddle_init_param as sucessful.\n", name) return C.PSERVER_OK } - log.Println(err) + log.Errorln(err) return C.PSERVER_ERROR } @@ -150,11 +149,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int { err := c.FinishInitParams() if err != nil { if err.Error() == pserver.AlreadyInitialized { - log.Println("parameters already initialized, treat paddle_finish_init_params as sucessful.") + log.Warningln("parameters already initialized, treat paddle_finish_init_params as sucessful.") return C.PSERVER_OK } - log.Println(err) + log.Errorln(err) return C.PSERVER_ERROR } @@ -175,7 +174,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient c := get(client) err := c.SendGrads(gs) if err != nil { - log.Println(err) + log.Errorln(err) return C.PSERVER_ERROR } @@ -192,7 +191,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, c := get(client) ps, err := c.GetParams(ns) if err != nil { - log.Println(err) + log.Errorln(err) return C.PSERVER_ERROR } @@ -201,7 +200,7 @@ func paddle_get_params(client 
C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Printf("pserver returned wrong number of parameters. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) return C.PSERVER_ERROR } @@ -211,7 +210,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Printf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) return C.PSERVER_ERROR } } @@ -221,14 +220,14 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst)))) if unsafe.Pointer(param) == nullPtr { - log.Println("must pre-allocate parameter.") + log.Errorln("must pre-allocate parameter.") return C.PSERVER_ERROR - } else { - if unsafe.Pointer(param.content) != nullPtr { - if int(param.content_len) != len(p.Content) { - log.Printf("the pre-allocated content len does not match parameter content len. Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content)) - return C.PSERVER_ERROR - } + } + + if unsafe.Pointer(param.content) != nullPtr { + if int(param.content_len) != len(p.Content) { + log.Errorf("the pre-allocated content len does not match parameter content len. 
Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content)) + return C.PSERVER_ERROR } } @@ -246,7 +245,7 @@ func paddle_save_model(client C.paddle_pserver_client, path *C.char) C.int { c := get(client) err := c.Save(p) if err != nil { - log.Println(err) + log.Errorln(err) return C.PSERVER_ERROR } diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index 07e1b86b43299f72e24bb83621847980eaaaf06e..03f749d4e46c4890c6dcfa25af572dab4a053c86 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -1,4 +1,5 @@ #include +#include #include "libpaddle_pserver_cclient.h" diff --git a/go/pserver/client.go b/go/pserver/client.go index afe1eecd015b84684329c0e624f3753852d7a8ce..dda915977282d4880ddcc8c18ef6fd80ede9e01b 100644 --- a/go/pserver/client.go +++ b/go/pserver/client.go @@ -2,11 +2,11 @@ package pserver import ( "hash/fnv" - "log" "sort" "time" "github.com/PaddlePaddle/Paddle/go/connection" + log "github.com/sirupsen/logrus" ) // TODO(helin): add RPC call retry logic @@ -64,7 +64,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) { if curServers[i].Addr == "" { err := c.pservers[i].Close() if err != nil { - log.Println(err) + log.Errorln(err) } continue @@ -72,7 +72,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) { err := c.pservers[i].Connect(curServers[i].Addr) if err != nil { - log.Println(err) + log.Errorln(err) // connect to addr failed, set // to last known addr in order diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 3640dd3a75ea212a84255ea7f6369b63606482ab..0e17c42d34f147db190ac5e5ccd5339360cc35bb 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -18,7 +18,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND ${CMAKE_COMMAND} -E touch ${OUTPUT_DIR}/.timestamp - DEPENDS gen_proto_py ${PY_FILES} 
${external_project_dependencies}) + DEPENDS gen_proto_py ${PY_FILES} ${external_project_dependencies} paddle_master_shared) add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) diff --git a/python/paddle/v2/__init__.py b/python/paddle/v2/__init__.py index b9d0a7f29138cae281236b26509a56738f3801f4..102331c0bb6477cbeb618f015aad76a0414723ba 100644 --- a/python/paddle/v2/__init__.py +++ b/python/paddle/v2/__init__.py @@ -26,6 +26,7 @@ import evaluator from . import dataset from . import reader from . import plot +from . import master import attr import op import pooling @@ -37,9 +38,26 @@ import plot import image __all__ = [ - 'optimizer', 'layer', 'activation', 'parameters', 'init', 'trainer', - 'event', 'data_type', 'attr', 'pooling', 'data_feeder', 'dataset', 'reader', - 'topology', 'networks', 'infer', 'plot', 'evaluator', 'image' + 'optimizer', + 'layer', + 'activation', + 'parameters', + 'init', + 'trainer', + 'event', + 'data_type', + 'attr', + 'pooling', + 'data_feeder', + 'dataset', + 'reader', + 'topology', + 'networks', + 'infer', + 'plot', + 'evaluator', + 'image', + 'master', ] diff --git a/python/paddle/v2/master/.gitignore b/python/paddle/v2/master/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..a3ac6e1a33e74631136fc95574532284db7cd7cd --- /dev/null +++ b/python/paddle/v2/master/.gitignore @@ -0,0 +1,3 @@ +*.whl +*.so +*.pyc diff --git a/python/paddle/v2/master/__init__.py b/python/paddle/v2/master/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c8975b5d4a33cbecb4fa5a144bc610c36591d629 --- /dev/null +++ b/python/paddle/v2/master/__init__.py @@ -0,0 +1,3 @@ +from client import * + +__all__ = ['client'] diff --git a/python/paddle/v2/master/client.py b/python/paddle/v2/master/client.py new file mode 100644 index 0000000000000000000000000000000000000000..de8e9bb88e1064e41a80e0ef7838e307089a1331 --- /dev/null +++ b/python/paddle/v2/master/client.py @@ -0,0 +1,39 @@ +import ctypes 
+import os + +path = os.path.join(os.path.dirname(__file__), "libpaddle_master.so") +lib = ctypes.cdll.LoadLibrary(path) + + +class client(object): + """ + client is a client to the master server. + """ + + def __init__(self, addr, buf_size): + self.c = lib.paddle_new_master_client(addr, buf_size) + + def close(self): + lib.paddle_release_master_client(self.c) + self.c = None + + def set_dataset(self, paths): + holder_type = ctypes.c_char_p * len(paths) + holder = holder_type() + print paths + for idx, path in enumerate(paths): + c_ptr = ctypes.c_char_p(path) + holder[idx] = c_ptr + lib.paddle_set_dataset(self.c, holder, len(paths)) + + def next_record(self): + p = ctypes.c_char_p() + ret = ctypes.pointer(p) + size = lib.paddle_next_record(self.c, ret) + if size == 0: + # Empty record + return "" + record = ctypes.string_at(ret.contents, size) + # Memory created from C should be freed. + lib.mem_free(ret.contents) + return record diff --git a/python/setup.py.in b/python/setup.py.in index 93724f918801ea706517a1df158ceb78a1c2335c..8fe1cfd8b338b9b2e47edcec6d66bbcdd38b5198 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -1,6 +1,5 @@ from setuptools import setup - packages=['paddle', 'paddle.proto', 'paddle.trainer', @@ -9,7 +8,8 @@ packages=['paddle', 'paddle.v2', 'paddle.v2.dataset', 'paddle.v2.reader', - 'paddle.v2.plot'] + 'paddle.v2.plot', + 'paddle.v2.master'] setup_requires=["requests", "numpy", @@ -25,7 +25,8 @@ setup(name='paddle', description='Parallel Distributed Deep Learning', install_requires=setup_requires, packages=packages, + package_data={'paddle.v2.master': ['libpaddle_master.so'], }, package_dir={ '': '${CMAKE_CURRENT_SOURCE_DIR}' - } + }, )