From a868d010658642c3612ca37b488123d63623a967 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 25 May 2017 20:53:13 -0400 Subject: [PATCH] add cgo wrapper for recordio, make go_cmake automatically download go dependency --- paddle/go/CMakeLists.txt | 11 - paddle/go/adder.go | 10 - paddle/go/cclient/CMakeLists.txt | 31 +-- paddle/go/cclient/cclient.go | 7 +- paddle/go/cgo_test.cc | 5 - .../cmake/CMakeDetermineGoCompiler.cmake | 2 +- .../cmake/CMakeGoCompiler.cmake.in | 0 .../cmake/CMakeGoInformation.cmake | 0 .../cmake/CMakeTestGoCompiler.cmake | 0 paddle/go/{cclient => }/cmake/flags.cmake | 4 +- paddle/go/{cclient => }/cmake/golang.cmake | 40 ++-- paddle/go/crecordio/CMakeLists.txt | 12 + paddle/go/crecordio/crecordio.go | 208 ++++++++++++++++++ paddle/go/crecordio/register.go | 61 +++++ paddle/go/crecordio/test/CMakeLists.txt | 8 + paddle/go/crecordio/test/test.c | 31 +++ paddle/go/recordio/README.md | 5 +- 17 files changed, 361 insertions(+), 74 deletions(-) delete mode 100644 paddle/go/CMakeLists.txt delete mode 100644 paddle/go/adder.go delete mode 100644 paddle/go/cgo_test.cc rename paddle/go/{cclient => }/cmake/CMakeDetermineGoCompiler.cmake (94%) rename paddle/go/{cclient => }/cmake/CMakeGoCompiler.cmake.in (100%) rename paddle/go/{cclient => }/cmake/CMakeGoInformation.cmake (100%) rename paddle/go/{cclient => }/cmake/CMakeTestGoCompiler.cmake (100%) rename paddle/go/{cclient => }/cmake/flags.cmake (95%) rename paddle/go/{cclient => }/cmake/golang.cmake (50%) create mode 100644 paddle/go/crecordio/CMakeLists.txt create mode 100644 paddle/go/crecordio/crecordio.go create mode 100644 paddle/go/crecordio/register.go create mode 100644 paddle/go/crecordio/test/CMakeLists.txt create mode 100644 paddle/go/crecordio/test/test.c diff --git a/paddle/go/CMakeLists.txt b/paddle/go/CMakeLists.txt deleted file mode 100644 index 51c5252d663..00000000000 --- a/paddle/go/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -include_directories(${CMAKE_CURRENT_BINARY_DIR}) - -go_library(adder SRCS adder.go) - -if (WITH_TESTING) - cc_test(cgo_test - SRCS - cgo_test.cc - DEPS - adder) -endif() diff --git a/paddle/go/adder.go b/paddle/go/adder.go deleted file mode 100644 index e14f40fd9fe..00000000000 --- a/paddle/go/adder.go +++ /dev/null @@ -1,10 +0,0 @@ -package main - -import "C" - -//export GoAdder -func GoAdder(x, y int) int { - return x + y -} - -func main() {} // Required but ignored diff --git a/paddle/go/cclient/CMakeLists.txt b/paddle/go/cclient/CMakeLists.txt index c85ff3db09d..e3e9fa9f1a4 100644 --- a/paddle/go/cclient/CMakeLists.txt +++ b/paddle/go/cclient/CMakeLists.txt @@ -1,31 +1,12 @@ cmake_minimum_required(VERSION 3.0) -if(GTEST_INCLUDE_DIR AND GTEST_LIBRARIES) - message("-- Found gtest (include: ${GTEST_INCLUDE_DIR}, library: ${GTEST_LIBRARIES})") -else() - # find cmake directory modules - get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) - get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) - get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") - set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") +project(cxx_go C Go) - # enable c++11 - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +include(golang) +include(flags) - # enable gtest - set(THIRD_PARTY_PATH ./third_party) - set(WITH_TESTING ON) - include(external/gtest) -endif() - -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") - -project(cxx_go CXX C Go) - -include(cmake/golang.cmake) -include(cmake/flags.cmake) - -ExternalGoProject_Add(pserver github.com/PaddlePaddle/Paddle/paddle/go/pserver) -add_go_library(client STATIC pserver) +add_go_library(client STATIC) add_subdirectory(test) diff --git a/paddle/go/cclient/cclient.go b/paddle/go/cclient/cclient.go index dc86d47e8d0..654b6f68a4f 100644 --- a/paddle/go/cclient/cclient.go +++ b/paddle/go/cclient/cclient.go @@ -78,8 +78,11 @@ func cArrayToSlice(p unsafe.Pointer, len int) []byte { return nil } - // create a Go clice backed by a C array, - // reference: https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices + // create a Go clice backed by a C array, reference: + // https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices + // + // Go garbage collector will not interact with this data, need + // to be freed from C side. return (*[1 << 30]byte)(p)[:len:len] } diff --git a/paddle/go/cgo_test.cc b/paddle/go/cgo_test.cc deleted file mode 100644 index 64efa606fff..00000000000 --- a/paddle/go/cgo_test.cc +++ /dev/null @@ -1,5 +0,0 @@ -#include -#include "gtest/gtest.h" -#include "libadder.h" - -TEST(Cgo, Invoke) { EXPECT_EQ(GoAdder(30, 12), 42); } diff --git a/paddle/go/cclient/cmake/CMakeDetermineGoCompiler.cmake b/paddle/go/cmake/CMakeDetermineGoCompiler.cmake similarity index 94% rename from paddle/go/cclient/cmake/CMakeDetermineGoCompiler.cmake rename to paddle/go/cmake/CMakeDetermineGoCompiler.cmake index b3f8fbe271d..a9bb6906c74 100644 --- a/paddle/go/cclient/cmake/CMakeDetermineGoCompiler.cmake +++ b/paddle/go/cmake/CMakeDetermineGoCompiler.cmake @@ -38,7 +38,7 @@ endif() mark_as_advanced(CMAKE_Go_COMPILER) -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeGoCompiler.cmake.in +configure_file(${CMAKE_MODULE_PATH}/CMakeGoCompiler.cmake.in ${CMAKE_PLATFORM_INFO_DIR}/CMakeGoCompiler.cmake @ONLY) set(CMAKE_Go_COMPILER_ENV_VAR "GO_COMPILER") diff --git a/paddle/go/cclient/cmake/CMakeGoCompiler.cmake.in b/paddle/go/cmake/CMakeGoCompiler.cmake.in similarity index 100% rename from paddle/go/cclient/cmake/CMakeGoCompiler.cmake.in rename to paddle/go/cmake/CMakeGoCompiler.cmake.in diff --git a/paddle/go/cclient/cmake/CMakeGoInformation.cmake b/paddle/go/cmake/CMakeGoInformation.cmake similarity index 100% rename from paddle/go/cclient/cmake/CMakeGoInformation.cmake rename to paddle/go/cmake/CMakeGoInformation.cmake diff --git a/paddle/go/cclient/cmake/CMakeTestGoCompiler.cmake b/paddle/go/cmake/CMakeTestGoCompiler.cmake similarity index 100% rename from paddle/go/cclient/cmake/CMakeTestGoCompiler.cmake rename to paddle/go/cmake/CMakeTestGoCompiler.cmake diff --git a/paddle/go/cclient/cmake/flags.cmake b/paddle/go/cmake/flags.cmake similarity index 95% rename from paddle/go/cclient/cmake/flags.cmake rename to paddle/go/cmake/flags.cmake index 062d5ab660d..a167c432a92 100644 --- a/paddle/go/cclient/cmake/flags.cmake +++ b/paddle/go/cmake/flags.cmake @@ -21,7 +21,7 @@ function(CheckCompilerCXX11Flag) if (${CMAKE_CXX_COMPILER_VERSION} VERSION_LESS 3.3) message(FATAL_ERROR "Unsupported Clang version. Clang >= 3.3 required.") endif() - endif() + endif() endif() endfunction() @@ -42,4 +42,4 @@ if (CUDA_VERSION VERSION_GREATER "8.0" OR CUDA_VERSION VERSION_EQUAL "8.0") list(APPEND __arch_flags " -gencode arch=compute_60,code=sm_60") endif() -set(CUDA_NVCC_FLAGS ${__arch_flags} ${CUDA_NVCC_FLAGS}) \ No newline at end of file +set(CUDA_NVCC_FLAGS ${__arch_flags} ${CUDA_NVCC_FLAGS}) diff --git a/paddle/go/cclient/cmake/golang.cmake b/paddle/go/cmake/golang.cmake similarity index 50% rename from paddle/go/cclient/cmake/golang.cmake rename to paddle/go/cmake/golang.cmake index 5d39868bfdf..caddaae1bf4 100644 --- a/paddle/go/cclient/cmake/golang.cmake +++ b/paddle/go/cmake/golang.cmake @@ -1,22 +1,7 @@ set(GOPATH "${CMAKE_CURRENT_BINARY_DIR}/go") file(MAKE_DIRECTORY ${GOPATH}) - -function(ExternalGoProject_Add TARG) - add_custom_target(${TARG} env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get ${ARGN}) -endfunction(ExternalGoProject_Add) - -function(add_go_executable NAME) - file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") - add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp - COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build - -o "${CMAKE_CURRENT_BINARY_DIR}/${NAME}" - ${CMAKE_GO_FLAGS} ${GO_SOURCE} - WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) - - add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) - install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${NAME} DESTINATION bin) -endfunction(add_go_executable) - +set(PADDLE_IN_GOPATH "${GOPATH}/src/github.com/PaddlePaddle") +file(MAKE_DIRECTORY ${PADDLE_IN_GOPATH}) function(ADD_GO_LIBRARY NAME BUILD_TYPE) if(BUILD_TYPE STREQUAL "STATIC") @@ -32,6 +17,26 @@ function(ADD_GO_LIBRARY NAME BUILD_TYPE) endif() file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") + file(RELATIVE_PATH rel ${CMAKE_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) + + # find Paddle directory. + get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) + get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) + get_filename_component(PADDLE_DIR ${PARENT_DIR} DIRECTORY) + + # automatically get all dependencies specified in the source code + # for given target. + add_custom_target(goGet env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ${rel}/...) + + # make a symlink that references Paddle inside $GOPATH, so go get + # will use the local changes in Paddle rather than checkout Paddle + # in github. + if(NOT EXISTS ${PADDLE_IN_GOPATH}) + add_custom_target(copyPaddle + COMMAND ln -s ${PADDLE_DIR} ${PADDLE_IN_GOPATH}) + add_dependencies(goGet copyPaddle) + endif() + add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" @@ -39,6 +44,7 @@ function(ADD_GO_LIBRARY NAME BUILD_TYPE) WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) + add_dependencies(${NAME} goGet) if(NOT BUILD_TYPE STREQUAL "STATIC") install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION bin) diff --git a/paddle/go/crecordio/CMakeLists.txt b/paddle/go/crecordio/CMakeLists.txt new file mode 100644 index 00000000000..db8f556e50b --- /dev/null +++ b/paddle/go/crecordio/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.0) + +get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") + +project(cxx_go C Go) + +include(golang) +include(flags) + +add_go_library(recordio STATIC) +add_subdirectory(test) diff --git a/paddle/go/crecordio/crecordio.go b/paddle/go/crecordio/crecordio.go new file mode 100644 index 00000000000..3335d0795f0 --- /dev/null +++ b/paddle/go/crecordio/crecordio.go @@ -0,0 +1,208 @@ +package main + +/* +#include + +typedef int reader; +typedef int writer; +*/ +import "C" + +import ( + "io" + "log" + "os" + "path/filepath" + "strings" + "unsafe" + + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" +) + +var nullPtr = unsafe.Pointer(uintptr(0)) + +type writer struct { + w *recordio.Writer + f *os.File +} + +type reader struct { + buffer chan []byte + cancel chan struct{} +} + +func read(paths []string, buffer chan<- []byte, cancel chan struct{}) { + var curFile *os.File + var curScanner *recordio.Scanner + var pathIdx int + + var nextFile func() bool + nextFile = func() bool { + if pathIdx >= len(paths) { + return false + } + + path := paths[pathIdx] + pathIdx++ + f, err := os.Open(path) + if err != nil { + return nextFile() + } + + idx, err := recordio.LoadIndex(f) + if err != nil { + log.Println(err) + err = f.Close() + if err != nil { + log.Println(err) + } + + return nextFile() + } + + curFile = f + curScanner = recordio.NewScanner(f, idx, 0, -1) + return true + } + + more := nextFile() + if !more { + close(buffer) + return + } + + closeFile := func() { + err := curFile.Close() + if err != nil { + log.Println(err) + } + curFile = nil + } + + for { + for curScanner.Scan() { + select { + case buffer <- curScanner.Record(): + case <-cancel: + close(buffer) + closeFile() + return + } + } + + if err := curScanner.Error(); err != nil && err != io.EOF { + log.Println(err) + } + + closeFile() + more := nextFile() + if !more { + close(buffer) + return + } + } +} + +//export paddle_new_writer +func paddle_new_writer(path *C.char) C.writer { + p := C.GoString(path) + f, err := os.Create(p) + if err != nil { + log.Println(err) + return -1 + } + + w := recordio.NewWriter(f, -1, -1) + writer := &writer{f: f, w: w} + return addWriter(writer) +} + +func cArrayToSlice(p unsafe.Pointer, len int) []byte { + if p == nullPtr { + return nil + } + + // create a Go clice backed by a C array, reference: + // https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices + // + // Go garbage collector will not interact with this data, need + // to be freed from C side. + return (*[1 << 30]byte)(p)[:len:len] +} + +//export paddle_writer_write +func paddle_writer_write(writer C.writer, buf *C.uchar, size C.int) int { + w := getWriter(writer) + b := cArrayToSlice(unsafe.Pointer(buf), int(size)) + _, err := w.w.Write(b) + if err != nil { + log.Println(err) + return -1 + } + + return 0 +} + +//export paddle_writer_release +func paddle_writer_release(writer C.writer) { + w := removeWriter(writer) + w.w.Close() + w.f.Close() +} + +//export paddle_new_reader +func paddle_new_reader(path *C.char, bufferSize C.int) C.reader { + p := C.GoString(path) + ss := strings.Split(p, ",") + var paths []string + for _, s := range ss { + match, err := filepath.Glob(s) + if err != nil { + log.Printf("error applying glob to %s: %v\n", s, err) + return -1 + } + + paths = append(paths, match...) + } + + if len(paths) == 0 { + log.Println("no valid path provided.", p) + return -1 + } + + buffer := make(chan []byte, int(bufferSize)) + cancel := make(chan struct{}) + r := &reader{buffer: buffer, cancel: cancel} + go read(paths, buffer, cancel) + return addReader(r) +} + +//export paddle_reader_next_item +func paddle_reader_next_item(reader C.reader, size *C.int) *C.uchar { + r := getReader(reader) + buf, ok := <-r.buffer + if !ok { + // channel closed and empty, reached EOF. + *size = -1 + return (*C.uchar)(nullPtr) + } + + if len(buf) == 0 { + // empty item + *size = 0 + return (*C.uchar)(nullPtr) + } + + ptr := C.malloc(C.size_t(len(buf))) + C.memcpy(ptr, unsafe.Pointer(&buf[0]), C.size_t(len(buf))) + *size = C.int(len(buf)) + return (*C.uchar)(ptr) +} + +//export paddle_reader_release +func paddle_reader_release(reader C.reader) { + r := removeReader(reader) + close(r.cancel) +} + +func main() {} // Required but ignored diff --git a/paddle/go/crecordio/register.go b/paddle/go/crecordio/register.go new file mode 100644 index 00000000000..61dfdbd4ab6 --- /dev/null +++ b/paddle/go/crecordio/register.go @@ -0,0 +1,61 @@ +package main + +/* +typedef int reader; +typedef int writer; +*/ +import "C" + +import "sync" + +var mu sync.Mutex +var handleMap = make(map[C.reader]*reader) +var curHandle C.reader +var writerMap = make(map[C.writer]*writer) +var curWriterHandle C.writer + +func addReader(r *reader) C.reader { + mu.Lock() + defer mu.Unlock() + reader := curHandle + curHandle++ + handleMap[reader] = r + return reader +} + +func getReader(reader C.reader) *reader { + mu.Lock() + defer mu.Unlock() + return handleMap[reader] +} + +func removeReader(reader C.reader) *reader { + mu.Lock() + defer mu.Unlock() + r := handleMap[reader] + delete(handleMap, reader) + return r +} + +func addWriter(w *writer) C.writer { + mu.Lock() + defer mu.Unlock() + writer := curWriterHandle + curWriterHandle++ + writerMap[writer] = w + return writer +} + +func getWriter(writer C.writer) *writer { + mu.Lock() + defer mu.Unlock() + return writerMap[writer] +} + +func removeWriter(writer C.writer) *writer { + mu.Lock() + defer mu.Unlock() + w := writerMap[writer] + delete(writerMap, writer) + return w +} diff --git a/paddle/go/crecordio/test/CMakeLists.txt b/paddle/go/crecordio/test/CMakeLists.txt new file mode 100644 index 00000000000..bac1006ae12 --- /dev/null +++ b/paddle/go/crecordio/test/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.0) + +include_directories(${CMAKE_BINARY_DIR}) + +add_executable(recordio_test test.c) +add_dependencies(recordio_test recordio) +set (CMAKE_EXE_LINKER_FLAGS "-pthread") +target_link_libraries(recordio_test ${CMAKE_BINARY_DIR}/librecordio.a) diff --git a/paddle/go/crecordio/test/test.c b/paddle/go/crecordio/test/test.c new file mode 100644 index 00000000000..bbf5964fd39 --- /dev/null +++ b/paddle/go/crecordio/test/test.c @@ -0,0 +1,31 @@ +#include +#include + +#include "librecordio.h" + +void panic() { + // TODO(helin): fix: gtest using cmake is not working, using this + // hacky way for now. + *(void*)0; +} + +int main() { + writer w = paddle_new_writer("/tmp/test"); + paddle_writer_write(w, "hello", 6); + paddle_writer_write(w, "hi", 3); + paddle_writer_release(w); + + reader r = paddle_new_reader("/tmp/test", 10); + int size; + unsigned char* item = paddle_reader_next_item(r, &size); + if (!strcmp(item, "hello") || size != 6) { + panic(); + } + free(item); + + item = paddle_reader_next_item(r, &size); + if (!strcmp(item, "hi") || size != 2) { + panic(); + } + free(item); +} diff --git a/paddle/go/recordio/README.md b/paddle/go/recordio/README.md index 8b0b9308b1a..fbf568ceba4 100644 --- a/paddle/go/recordio/README.md +++ b/paddle/go/recordio/README.md @@ -8,6 +8,7 @@ w := recordio.NewWriter(f) w.Write([]byte("Hello")) w.Write([]byte("World!")) w.Close() +f.Close() ``` ## Read @@ -18,6 +19,7 @@ w.Close() f, e := os.Open("a_file.recordio") idx, e := recordio.LoadIndex(f) fmt.Println("Total records: ", idx.Len()) + f.Close() ``` 2. Create one or more scanner to read a range of records. The @@ -30,7 +32,8 @@ w.Close() for s.Scan() { fmt.Println(string(s.Record())) } - if s.Err() != nil && s.Err() != io.EOF { + if s.Error() != nil && s.Error() != io.EOF { log.Fatalf("Something wrong with scanning: %v", e) } + f.Close() ``` -- GitLab