diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4cd8eb12f6b23c67e8fb22f43d57afd4a96770fd..61b989dc698798eca932516e558c63f107ef2754 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,3 +21,10 @@ sha: 28c0ea8a67a3e2dbbf4822ef44e85b63a0080a29 hooks: - id: clang-formater +- repo: https://github.com/dnephin/pre-commit-golang + sha: e4693a4c282b4fc878eda172a929f7a6508e7d16 + hooks: + - id: go-fmt + files: (.*\.go) + - id: go-lint + files: (.*\.go) diff --git a/.travis.yml b/.travis.yml index 16432dac0cf9121a74f6a7ecae462b22973461a1..498674469b27f585af798b95f30b74ebed99e32c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,16 +33,17 @@ addons: - ccache before_install: - if [[ "$JOB" == "check_style" ]]; then sudo ln -s /usr/bin/clang-format-3.8 /usr/bin/clang-format; fi - # Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python + # Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python # protobuf version. - pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker - pip install rarfile + - curl https://glide.sh/get | bash - eval "$(GIMME_GO_VERSION=1.8.3 gimme)" - | function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } script: - | - export WITH_GOLANG=ON && timeout 2580 paddle/scripts/travis/${JOB}.sh # 43min timeout + timeout 2580 paddle/scripts/travis/${JOB}.sh # 43min timeout RESULT=$?; if [ $RESULT -eq 0 ] || [ $RESULT -eq 142 ]; then true; else false; fi; notifications: email: diff --git a/cmake/external/protobuf.cmake b/cmake/external/protobuf.cmake index 3c74944bc21a131fe90e61777d3dce8b3f21900a..e629d61585c2d2ff916187ee28d4fd089a5bd857 100644 --- a/cmake/external/protobuf.cmake +++ b/cmake/external/protobuf.cmake @@ -17,6 +17,65 @@ INCLUDE(ExternalProject) FIND_PACKAGE(Protobuf QUIET) SET(PROTOBUF_FOUND "OFF") +if(NOT COMMAND protobuf_generate_python) # before cmake 3.4, protobuf_genrerate_python is not defined. 
+ function(protobuf_generate_python SRCS) + # shameless copy from https://github.com/Kitware/CMake/blob/master/Modules/FindProtobuf.cmake + if(NOT ARGN) + message(SEND_ERROR "Error: PROTOBUF_GENERATE_PYTHON() called without any proto files") + return() + endif() + + if(PROTOBUF_GENERATE_CPP_APPEND_PATH) + # Create an include path for each file specified + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(ABS_PATH ${ABS_FIL} PATH) + list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) + if(${_contains_already} EQUAL -1) + list(APPEND _protobuf_include_path -I ${ABS_PATH}) + endif() + endforeach() + else() + set(_protobuf_include_path -I ${CMAKE_CURRENT_SOURCE_DIR}) + endif() + + if(DEFINED PROTOBUF_IMPORT_DIRS AND NOT DEFINED Protobuf_IMPORT_DIRS) + set(Protobuf_IMPORT_DIRS "${PROTOBUF_IMPORT_DIRS}") + endif() + + if(DEFINED Protobuf_IMPORT_DIRS) + foreach(DIR ${Protobuf_IMPORT_DIRS}) + get_filename_component(ABS_PATH ${DIR} ABSOLUTE) + list(FIND _protobuf_include_path ${ABS_PATH} _contains_already) + if(${_contains_already} EQUAL -1) + list(APPEND _protobuf_include_path -I ${ABS_PATH}) + endif() + endforeach() + endif() + + set(${SRCS}) + foreach(FIL ${ARGN}) + get_filename_component(ABS_FIL ${FIL} ABSOLUTE) + get_filename_component(FIL_WE ${FIL} NAME_WE) + if(NOT PROTOBUF_GENERATE_CPP_APPEND_PATH) + get_filename_component(FIL_DIR ${FIL} DIRECTORY) + if(FIL_DIR) + set(FIL_WE "${FIL_DIR}/${FIL_WE}") + endif() + endif() + + list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py") + add_custom_command( + OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}_pb2.py" + COMMAND ${Protobuf_PROTOC_EXECUTABLE} --python_out ${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL} + DEPENDS ${ABS_FIL} ${Protobuf_PROTOC_EXECUTABLE} + COMMENT "Running Python protocol buffer compiler on ${FIL}" + VERBATIM ) + endforeach() + + set(${SRCS} ${${SRCS}} PARENT_SCOPE) + endfunction() +endif() # Print and set the protobuf library information, # finish this cmake process and exit from this file. diff --git a/cmake/generic.cmake b/cmake/generic.cmake index cae9524b2fe1417f7792cbfd959280206f473797..8f65decda77946878daffda0afffab8e0b74b83d 100644 --- a/cmake/generic.cmake +++ b/cmake/generic.cmake @@ -99,25 +99,44 @@ function(merge_static_libs TARGET_NAME) set(libs ${ARGN}) list(REMOVE_DUPLICATES libs) - # First get the file names of the libraries to be merged + # Get all propagation dependencies from the merged libraries foreach(lib ${libs}) - set(libfiles ${libfiles} $) + list(APPEND libs_deps ${${lib}_LIB_DEPENDS}) endforeach() if(APPLE) # Use OSX's libtool to merge archives + # To produce a library we need at least one source file. + # It is created by add_custom_command below and will helps + # also help to track dependencies. set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}_dummy.c) + + # Make the generated dummy source file depended on all static input + # libs. If input lib changes,the source file is touched + # which causes the desired effect (relink). 
+ add_custom_command(OUTPUT ${dummyfile} + COMMAND ${CMAKE_COMMAND} -E touch ${dummyfile} + DEPENDS ${libs}) + + # Generate dummy staic lib file(WRITE ${dummyfile} "const char * dummy = \"${dummyfile}\";") add_library(${TARGET_NAME} STATIC ${dummyfile}) + target_link_libraries(${TARGET_NAME} ${libs_deps}) + + foreach(lib ${libs}) + # Get the file names of the libraries to be merged + set(libfiles ${libfiles} $) + endforeach() add_custom_command(TARGET ${TARGET_NAME} POST_BUILD COMMAND rm "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a" COMMAND /usr/bin/libtool -static -o "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a" ${libfiles}) - else() # general UNIX: use "ar" to extract objects and re-add to a common lib + else() # general UNIX: use "ar" to extract objects and re-add to a common lib foreach(lib ${libs}) set(objlistfile ${lib}.objlist) # list of objects in the input library set(objdir ${lib}.objdir) add_custom_command(OUTPUT ${objdir} - COMMAND ${CMAKE_COMMAND} -E make_directory ${objdir}) + COMMAND ${CMAKE_COMMAND} -E make_directory ${objdir} + DEPENDS ${lib}) add_custom_command(OUTPUT ${objlistfile} COMMAND ${CMAKE_AR} -x "$" @@ -125,27 +144,27 @@ function(merge_static_libs TARGET_NAME) DEPENDS ${lib} ${objdir} WORKING_DIRECTORY ${objdir}) - # Empty dummy source file that goes into merged library - set(mergebase ${lib}.mergebase.c) - add_custom_command(OUTPUT ${mergebase} - COMMAND ${CMAKE_COMMAND} -E touch ${mergebase} - DEPENDS ${objlistfile}) + # Empty dummy source file that goes into merged library + set(mergebase ${lib}.mergebase.c) + add_custom_command(OUTPUT ${mergebase} + COMMAND ${CMAKE_COMMAND} -E touch ${mergebase} + DEPENDS ${objlistfile}) list(APPEND mergebases "${mergebase}") endforeach() - # We need a target for the output merged library add_library(${TARGET_NAME} STATIC ${mergebases}) + target_link_libraries(${TARGET_NAME} ${libs_deps}) + + # Get the file name of the generated library set(outlibfile "$") foreach(lib ${libs}) add_custom_command(TARGET ${TARGET_NAME} POST_BUILD - COMMAND ${CMAKE_AR} ru ${outlibfile} @"../${lib}.objlist" - WORKING_DIRECTORY ${lib}.objdir) + COMMAND ${CMAKE_AR} cr ${outlibfile} *.o + COMMAND ${CMAKE_RANLIB} ${outlibfile} + WORKING_DIRECTORY ${lib}.objdir) endforeach() - - add_custom_command(TARGET ${TARGET_NAME} POST_BUILD - COMMAND ${CMAKE_RANLIB} ${outlibfile}) endif() endfunction(merge_static_libs) @@ -194,7 +213,7 @@ function(cc_test TARGET_NAME) add_executable(${TARGET_NAME} ${cc_test_SRCS}) target_link_libraries(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main) add_dependencies(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main) - add_test(${TARGET_NAME} ${TARGET_NAME}) + add_test(NAME ${TARGET_NAME} COMMAND ${TARGET_NAME} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) endif() endfunction(cc_test) @@ -281,10 +300,11 @@ function(go_library TARGET_NAME) file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + # FIXME: link path add_custom_command(TARGET ${TARGET_NAME} POST_BUILD COMMAND rm "${${TARGET_NAME}_LIB_PATH}" # Golang build source code - COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} + COMMAND GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} -o "${${TARGET_NAME}_LIB_PATH}" "./${CMAKE_CURRENT_SOURCE_REL_DIR}/${GO_SOURCE}" # must run under GOPATH @@ -299,11 +319,13 @@ function(go_binary TARGET_NAME) cmake_parse_arguments(go_binary "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) 
string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + # FIXME: link path add_custom_command(OUTPUT ${TARGET_NAME}_timestamp - COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build + COMMAND env LIBRARY_PATH=${CMAKE_BINARY_DIR}/go/pserver/client/c/:$ENV{LIBRARY_PATH} + GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build -o "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}" "./${CMAKE_CURRENT_SOURCE_REL_DIR}/${go_binary_SRCS}" - WORKING_DIRECTORY "${PADDLE_IN_GOPATH}/go") + WORKING_DIRECTORY "${PADDLE_IN_GOPATH}/go") # TODO: don't know what ${TARGET_NAME}_link does add_custom_target(${TARGET_NAME} ALL DEPENDS go_vendor ${TARGET_NAME}_timestamp ${go_binary_DEPS}) install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME} DESTINATION bin) @@ -332,3 +354,12 @@ function(proto_library TARGET_NAME) protobuf_generate_cpp(proto_srcs proto_hdrs ${proto_library_SRCS}) cc_library(${TARGET_NAME} SRCS ${proto_srcs} DEPS ${proto_library_DEPS} protobuf) endfunction() + +function(py_proto_compile TARGET_NAME) + set(oneValueArgs "") + set(multiValueArgs SRCS) + cmake_parse_arguments(py_proto_compile "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + set(py_srcs) + protobuf_generate_python(py_srcs ${py_proto_compile_SRCS}) + add_custom_target(${TARGET_NAME} ALL DEPENDS ${py_srcs}) +endfunction() \ No newline at end of file diff --git a/go/cmd/master/CMakeLists.txt b/go/cmd/master/CMakeLists.txt index 9e149967e71c9439bb00b973aa8723a809604aaf..1058ffa86b3f00b5e9525edca39a843da9b62db8 100644 --- a/go/cmd/master/CMakeLists.txt +++ b/go/cmd/master/CMakeLists.txt @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -go_binary(master SRC master.go) +go_binary(master SRC master.go DEPS paddle_go_optimizer) diff --git a/go/cmd/pserver/CMakeLists.txt b/go/cmd/pserver/CMakeLists.txt index bc1da3348cc21377421ce3db21ab8d4a8ee05894..51db6dff043db362460dcbb7328c99d3cdd51a9b 100644 --- a/go/cmd/pserver/CMakeLists.txt +++ b/go/cmd/pserver/CMakeLists.txt @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -go_binary(pserver SRCS pserver.go) +go_binary(pserver SRCS pserver.go DEPS paddle_go_optimizer) diff --git a/go/master/c/client.go b/go/master/c/client.go index 9e35e986002c0ae3b7593150ece96dba29a1521b..31f431197454c2ec6a25624d37b60876d99f0087 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -104,11 +104,22 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int return C.PADDLE_MASTER_OK } +// return value: +// 0:ok +// -1:error //export paddle_next_record func paddle_next_record(client C.paddle_master_client, record **C.uchar) C.int { c := get(client) - r := c.NextRecord() + r, err := c.NextRecord() + if err != nil { + // Error + // TODO: return the type of error? + *record = (*C.uchar)(nullPtr) + return -1 + } + if len(r) == 0 { + // Empty record *record = (*C.uchar)(nullPtr) return 0 } diff --git a/go/master/client.go b/go/master/client.go index d3bea49d0a8166420e83478076cc7bc81e48598d..05383f1bf40c0e2b1f974e802ee8fd6aac109b00 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -11,7 +11,12 @@ import ( // Client is the client of the master server. type Client struct { conn *connection.Conn - ch chan []byte + ch chan record +} + +type record struct { + r []byte + err error } // NewClient creates a new Client. 
@@ -21,7 +26,7 @@ type Client struct { func NewClient(addrCh <-chan string, bufSize int) *Client { c := &Client{} c.conn = connection.New() - c.ch = make(chan []byte, bufSize) + c.ch = make(chan record, bufSize) go c.monitorMaster(addrCh) go c.getRecords() return c @@ -46,10 +51,11 @@ func (c *Client) getRecords() { s := recordio.NewRangeScanner(f, &chunk.Index, -1, -1) for s.Scan() { - c.ch <- s.Record() + c.ch <- record{s.Record(), nil} } if s.Err() != nil { + c.ch <- record{nil, s.Err()} log.Errorln(err, chunk.Path) } @@ -116,6 +122,7 @@ func (c *Client) taskFinished(taskID int) error { // // NextRecord will block until the next record is available. It is // thread-safe. -func (c *Client) NextRecord() []byte { - return <-c.ch +func (c *Client) NextRecord() ([]byte, error) { + r := <-c.ch + return r.r, r.err } diff --git a/go/master/client_test.go b/go/master/client_test.go index c00aeebfd5d1fef6de4a8c67bf7f998a42ee863b..6666d3860c412daa8711fbfa2d729a261b3fd887 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -68,12 +68,17 @@ func TestNextRecord(t *testing.T) { for pass := 0; pass < 50; pass++ { received := make(map[byte]bool) for i := 0; i < total; i++ { - r := c.NextRecord() + r, err := c.NextRecord() + if err != nil { + t.Fatal(pass, i, "Read error:", err) + } + if len(r) != 1 { - t.Fatal("Length should be 1.", r) + t.Fatal(pass, i, "Length should be 1.", r) } + if received[r[0]] { - t.Fatal("Received duplicate.", received, r) + t.Fatal(pass, i, "Received duplicate.", received, r) } received[r[0]] = true } diff --git a/go/pserver/client/c/CMakeLists.txt b/go/pserver/client/c/CMakeLists.txt index a3fcaeef190a178c1eed806f3e03a14ced780eef..93a0a27f858f8654e0a6114abe7e326b086b8bf9 100644 --- a/go/pserver/client/c/CMakeLists.txt +++ b/go/pserver/client/c/CMakeLists.txt @@ -1,5 +1,8 @@ cc_library(paddle_go_optimizer DEPS paddle_optimizer paddle_proto glog gflags protobuf) +target_link_libraries(paddle_go_optimizer stdc++ m) go_library(paddle_pserver_cclient STATIC DEPS paddle_go_optimizer) if(WITH_TESTING) - add_subdirectory(test) + # FIXME: this test requires pserver which is not managed by the test + # we need some kind of e2e testing machanism. 
+ # add_subdirectory(test) endif() diff --git a/go/pserver/client/c/test/CMakeLists.txt b/go/pserver/client/c/test/CMakeLists.txt index f287f850719afecf918f6a53f6528d1d15ff4672..dce8645ce753f6a14b298726c714be18de3834e4 100644 --- a/go/pserver/client/c/test/CMakeLists.txt +++ b/go/pserver/client/c/test/CMakeLists.txt @@ -1,2 +1,2 @@ -cc_test(test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient) +cc_test(test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient paddle_go_optimizer) add_style_check_target(test_cclient test_cclient.c) diff --git a/go/pserver/optimizer.go b/go/pserver/optimizer.go index d84f55b98742dbd00a8495b72e04d31f5bfc2782..54d108209402c27e79a9948f60ecbdadeffc7d9b 100644 --- a/go/pserver/optimizer.go +++ b/go/pserver/optimizer.go @@ -2,7 +2,7 @@ package pserver // #cgo CFLAGS: -I ../../ // //FIXME: ldflags contain "build" path -// #cgo LDFLAGS: ../../build/go/pserver/client/c/libpaddle_go_optimizer.a -lstdc++ -lm +// #cgo LDFLAGS: ${SRCDIR}/../../build/go/pserver/client/c/libpaddle_go_optimizer.a -lstdc++ -lm // #include "paddle/optimizer/optimizer.h" // #include // #include @@ -56,8 +56,8 @@ func newOptimizer(paramWithConfigs ParameterWithConfig) *optimizer { func (o *optimizer) GetWeights() []byte { var buffer unsafe.Pointer - buffer_len := C.paddle_optimizer_get_weights(o.opt, &buffer) - return cArrayToSlice(buffer, int(buffer_len)*C.sizeof_float) + bufferLen := C.paddle_optimizer_get_weights(o.opt, &buffer) + return cArrayToSlice(buffer, int(bufferLen)*C.sizeof_float) } func (o *optimizer) UpdateParameter(g Gradient) error { diff --git a/go/pserver/service.go b/go/pserver/service.go index 7711dc027e173e862f9b33e7a57224097026872c..ad16a5708d10bdcb5189a1e1e8abf13c54a72265 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -10,8 +10,10 @@ import ( type ElementType int const ( + // AlreadyInitialized is true if pserver is initialized AlreadyInitialized = "pserver already initialized" - Uninitialized = "pserver not fully initialized" + // Uninitialized is true if pserver not fully initialized + Uninitialized = "pserver not fully initialized" ) // Supported element types @@ -55,7 +57,7 @@ func NewService(idx int) (*Service, error) { s := &Service{ idx: idx, } - s.optMap = make(map[string]*optimizer) + s.optMap = make(map[string]*optimizer) s.initialized = make(chan struct{}) return s, nil } diff --git a/paddle/api/CMakeLists.txt b/paddle/api/CMakeLists.txt index 39d8aa075bc072d37dc8df67746f0d2b503418a6..84da89a1422b6095b995744cebb6a3af98a071c6 100644 --- a/paddle/api/CMakeLists.txt +++ b/paddle/api/CMakeLists.txt @@ -66,6 +66,7 @@ SWIG_LINK_LIBRARIES(swig_paddle paddle_trainer_lib paddle_network paddle_parameter + paddle_optimizer paddle_math paddle_utils paddle_proto diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt index dcd70d285174a600b77523b606fbffc832ea68c3..970b2b9abde7be17a2427911dc9e3a4fa638a327 100644 --- a/paddle/framework/CMakeLists.txt +++ b/paddle/framework/CMakeLists.txt @@ -9,6 +9,9 @@ cc_test(enforce_test SRCS enforce_test.cc) proto_library(attr_type SRCS attr_type.proto) proto_library(op_proto SRCS op_proto.proto DEPS attr_type) cc_test(op_proto_test SRCS op_proto_test.cc DEPS op_proto protobuf) - proto_library(op_desc SRCS op_desc.proto DEPS attr_type) cc_test(op_desc_test SRCS op_desc_test.cc DEPS op_desc protobuf) +py_proto_compile(framework_py_proto SRCS attr_type.proto op_proto.proto op_desc.proto) +# Generate an empty __init__.py to make framework_py_proto as a valid python module. 
+add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) +add_dependencies(framework_py_proto framework_py_proto_init) diff --git a/paddle/gserver/layers/AverageLayer.h b/paddle/gserver/layers/AverageLayer.h index 332552a30479a368c24db10e5ef3a9d59408c8ef..db4a17bfb07de98fc092621a378c4fc23fa3adab 100644 --- a/paddle/gserver/layers/AverageLayer.h +++ b/paddle/gserver/layers/AverageLayer.h @@ -25,6 +25,10 @@ namespace paddle { * If SequenceLevel = kNonSeq: * Output: output size is the number of input sequences (NOT input instances) * output[i] = average_{for each instance in this sequence}{input[i]} + * If stride_ > 0: + * Output: a shorten sequence. Stride is the step size by which we slide a + * window upon the input sequence, and the average pooling + * operation is then applied to each interval independently. * If SequenceLevel = kSeq: * Check input sequence must has sub-sequence * Output: output size is the number of input sub-sequences diff --git a/paddle/gserver/layers/CrossChannelNormLayer.cpp b/paddle/gserver/layers/CrossChannelNormLayer.cpp index 3fbccc11032caa4878ce8dcfe7c34a261acee68b..d72503217f1c9533d0c78a2a1a853559f2a1294f 100644 --- a/paddle/gserver/layers/CrossChannelNormLayer.cpp +++ b/paddle/gserver/layers/CrossChannelNormLayer.cpp @@ -36,6 +36,16 @@ MatrixPtr CrossChannelNormLayer::createSpatialMatrix(MatrixPtr data, data->getData() + iter * spatialDim, 1, spatialDim, false, useGpu_); } +bool CrossChannelNormLayer::init(const LayerMap& layerMap, + const ParameterMap& parameterMap) { + Layer::init(layerMap, parameterMap); + CHECK(parameters_[0]); + const NormConfig& conf = config_.inputs(0).norm_conf(); + channels_ = conf.channels(); + scale_.reset(new Weight(channels_, 1, parameters_[0])); + return true; +} + void CrossChannelNormLayer::forward(PassType passType) { Layer::forward(passType); MatrixPtr inV = getInputValue(0); @@ -51,9 +61,7 @@ void CrossChannelNormLayer::forward(PassType passType) { Matrix::resizeOrCreate(dataBuffer_, batchSize, dataDim, false, useGpu_); Matrix::resizeOrCreate(spatialBuffer_, 1, spatialDim, false, useGpu_); Matrix::resizeOrCreate(normBuffer_, batchSize, spatialDim, false, useGpu_); - normBuffer_->zeroMem(); - // add eps to avoid overflow - normBuffer_->addScalar(*normBuffer_, 1e-6); + inV->square2(*dataBuffer_); for (size_t i = 0; i < batchSize; i++) { const MatrixPtr inVTmp = createSampleMatrix(inV, i, spatialDim); @@ -63,6 +71,8 @@ void CrossChannelNormLayer::forward(PassType passType) { // compute norm. 
spatialBuffer_->sumCols(*dataTmp, 1, 0); + // add eps to avoid overflow + spatialBuffer_->add(1e-6); spatialBuffer_->sqrt2(*spatialBuffer_); normTmp->copyFrom(*spatialBuffer_); outVTmp->copyFrom(*inVTmp); @@ -82,6 +92,9 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) { size_t dataDim = inG->getWidth(); size_t spatialDim = dataDim / channels_; + MatrixPtr inGBuffer; + Matrix::resizeOrCreate(inGBuffer, channels_, spatialDim, false, useGpu_); + dataBuffer_->dotMul(*outG, *outV); Matrix::resizeOrCreate(scaleDiff_, channels_, 1, false, useGpu_); Matrix::resizeOrCreate(channelBuffer_, channels_, 1, false, useGpu_); @@ -100,22 +113,24 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) { scaleDiff_->add(*channelBuffer_, 1.); sampleBuffer_->dotMul(*inVTmp, *outGTmp); - spatialBuffer_->sumCols(*sampleBuffer_, 1., 1.); + spatialBuffer_->sumCols(*sampleBuffer_, 1., 0.); // scale the grad - inGTmp->copyFrom(*inVTmp); - inGTmp->mulRowVector(*spatialBuffer_); + inGBuffer->copyFrom(*inVTmp); + inGBuffer->mulRowVector(*spatialBuffer_); // divide by square of norm spatialBuffer_->dotMul(*normTmp, *normTmp); - inGTmp->divRowVector(*spatialBuffer_); + inGBuffer->divRowVector(*spatialBuffer_); // subtract - inGTmp->add(*outGTmp, -1, 1); + inGBuffer->add(*outGTmp, -1, 1); // divide by norm - inGTmp->divRowVector(*normTmp); + inGBuffer->divRowVector(*normTmp); // scale the diff - inGTmp->mulColVector(*scale_->getW()); + inGBuffer->mulColVector(*scale_->getW()); + + inGTmp->add(*inGBuffer); } // updata scale - if (scale_->getWGrad()) scale_->getWGrad()->copyFrom(*scaleDiff_); + if (scale_->getWGrad()) scale_->getWGrad()->add(*scaleDiff_); scale_->getParameterPtr()->incUpdate(callback); } diff --git a/paddle/gserver/layers/MaxLayer.h b/paddle/gserver/layers/MaxLayer.h index baa58ca2d7a6970f0d2f3ef6f8609404c82efa30..fa536fce2b4818337520478a6590bae36b26d09a 100644 --- a/paddle/gserver/layers/MaxLayer.h +++ b/paddle/gserver/layers/MaxLayer.h @@ -26,6 +26,10 @@ namespace paddle { * If SequenceLevel = kNonSeq: * Output: output size is the number of input sequences (NOT input instances) * output[i] = max_{for each instance in this sequence}{input[i]} + * If stride_ > 0: + * Output: a shorten sequence. Stride is the step size by which we slide a + * window upon the input sequence, and the max pooling operation is + * then applied to each interval independently. 
* If SequenceLevel = kSeq: * Check input sequence must has sub-sequence * Output: output size is the number of input sub-sequences diff --git a/paddle/gserver/layers/NormLayer.cpp b/paddle/gserver/layers/NormLayer.cpp index e094078bfe86e30c06e1b80ebc04c8213fe9abcf..caef7100929c7e3df4904b577cb7c2178466ddfc 100644 --- a/paddle/gserver/layers/NormLayer.cpp +++ b/paddle/gserver/layers/NormLayer.cpp @@ -56,14 +56,4 @@ bool ResponseNormLayer::init(const LayerMap& layerMap, return true; } -bool CrossChannelNormLayer::init(const LayerMap& layerMap, - const ParameterMap& parameterMap) { - Layer::init(layerMap, parameterMap); - CHECK(parameters_[0]); - const NormConfig& conf = config_.inputs(0).norm_conf(); - channels_ = conf.channels(); - scale_.reset(new Weight(channels_, 1, parameters_[0])); - return true; -} - } // namespace paddle diff --git a/paddle/gserver/layers/SequenceLastInstanceLayer.cpp b/paddle/gserver/layers/SequenceLastInstanceLayer.cpp index 944c7051668dccf39dd2ace14986d43c8a14e452..323cc47df199a6cb5e8e87cad4aaf51a92c36f81 100644 --- a/paddle/gserver/layers/SequenceLastInstanceLayer.cpp +++ b/paddle/gserver/layers/SequenceLastInstanceLayer.cpp @@ -26,10 +26,9 @@ namespace paddle { * If SequenceLevel = kNonseq: * Output: a sequence containing only the last instance of the input sequence * If stride_ > 0: - * Output: a shorten sequence. The operation of getting last instance of a - * sequence is independently performed on every slice of the input - * sequence, which is obtained by sliding a window with the window - * size set to stride_. + * Output: a shorten sequence. Stride is the step size by which we slide a + * window upon the input sequence, and getting last instance + * operation is then applied to each interval independently. * If SequenceLevel = kSeq: * Check input sequence must has sub-sequence * Output: a sequence containing only the last instance of each sub-sequence @@ -73,8 +72,7 @@ bool SequenceLastInstanceLayer::init(const LayerMap& layerMap, void SequenceLastInstanceLayer::forward(PassType passType) { SequencePoolLayer::forward(passType); - auto starts = (stride_ > 0) ? 
stridePositions_->getData() - : startPositions_->getData(false); + auto starts = startPositions_->getData(false); MatrixPtr inputValue = getInputValue(0); MatrixPtr outputValue = getOutputValue(); diff --git a/paddle/gserver/layers/SequencePoolLayer.cpp b/paddle/gserver/layers/SequencePoolLayer.cpp index 4179a9e7e0cb58fcb49bff712e62b9f3fea373bd..2a693b110a562ce3938643c919bfb1a4d3cd1f80 100644 --- a/paddle/gserver/layers/SequencePoolLayer.cpp +++ b/paddle/gserver/layers/SequencePoolLayer.cpp @@ -72,9 +72,8 @@ void SequencePoolLayer::forward(PassType passType) { if (stride_ > 0) { CHECK_EQ(input.hasSubseq(), 0UL) << "sequence stride pooling is invalid for hasSubseq now"; - output_.poolSequenceWithStride( - input, stride_, &stridePositions_, reversed_); - newBatchSize_ = stridePositions_->getSize() - 1; + output_.poolSequenceWithStride(input, stride_, &startPositions_, reversed_); + newBatchSize_ = startPositions_->getSize() - 1; } resetOutput(newBatchSize_, dim); diff --git a/paddle/gserver/layers/SequencePoolLayer.h b/paddle/gserver/layers/SequencePoolLayer.h index 293d1bf27823ffb0ebddba95461883d646f159ae..e207afd1dce80e646b220c5be601fd3a6bd36bac 100644 --- a/paddle/gserver/layers/SequencePoolLayer.h +++ b/paddle/gserver/layers/SequencePoolLayer.h @@ -28,8 +28,9 @@ namespace paddle { * sequence}{input[i]} * If stride_ > 0: * Check input sequence must not have sub-sequence - * Output: a shorten sequence, pooling is performed upon a small local - * area + * Output: a shorten sequence. Stride is the step size by which we slide + * a window upon the input sequence, and the pooling operation + * is then applied to each interval independently. * If SequenceLevel = kSeq: * Check input sequence must has sub-sequence * Output: output size is the number of input sub-sequences @@ -47,8 +48,6 @@ protected: size_t newBatchSize_; ICpuGpuVectorPtr startPositions_; int stride_; - // Store the start position of each window. - IVectorPtr stridePositions_; // Whether the input sequence is reversed or not. 
bool reversed_ = false; diff --git a/paddle/gserver/tests/LayerGradUtil.cpp b/paddle/gserver/tests/LayerGradUtil.cpp index e3591ba4df88f547e48bf07d4339d5f25db95e81..9eca58f1a1baa6fb1c404a91a345bc7f9d6b4acc 100644 --- a/paddle/gserver/tests/LayerGradUtil.cpp +++ b/paddle/gserver/tests/LayerGradUtil.cpp @@ -241,7 +241,7 @@ void testBatchState(LayerPtr testLayer, std::vector args; args.push_back(out); - EXPECT_EQ(0, Argument::sum(args)) << "testBatchState failed"; + ASSERT_NEAR(0, Argument::sum(args), 1e-5) << "testBatchState failed"; for (size_t seqId = 0; seqId < numSequences; ++seqId) { start[seqId] += seqLens[seqId]; } @@ -465,7 +465,6 @@ void initTestLayer(TestConfig testConf, ParameterConfig paraConfig) { paraConfig.set_name(paraName); paraConfig.set_size(paraSize); - paraConfig.set_initial_std(1); paraConfig.set_is_static(isStatic); auto para = std::make_shared(paraConfig, FLAGS_use_gpu, initialize); @@ -499,6 +498,9 @@ void initTestLayer(TestConfig testConf, paraConfig.add_dims((*layerMap)[input.input_layer_name()]->getSize()); paraConfig.add_dims(testConf.layerConfig.size()); } + CHECK_GE(testConf.paramInitialStd, 0); + paraConfig.set_initial_mean(testConf.paramInitialMean); + paraConfig.set_initial_std(testConf.paramInitialStd); initParameter(paraName, paraSize, inputDef.isStatic, false, paraConfig); } } diff --git a/paddle/gserver/tests/LayerGradUtil.h b/paddle/gserver/tests/LayerGradUtil.h index 18a6525a145fbf7539e8e84bd162a3b4345394dc..d299b4dd09418589514d99a72f83e1103ace7de1 100644 --- a/paddle/gserver/tests/LayerGradUtil.h +++ b/paddle/gserver/tests/LayerGradUtil.h @@ -125,12 +125,16 @@ struct TestConfig { LayerConfig layerConfig; std::vector inputDefs; size_t biasSize; + real paramInitialMean; + real paramInitialStd; bool testAccumulate; bool testState; bool staticBias; bool testBatchState; TestConfig() : biasSize(0), + paramInitialMean(0.0), + paramInitialStd(1.0), testAccumulate(true), testState(false), staticBias(false), diff --git a/paddle/gserver/tests/test_LayerGrad.cpp b/paddle/gserver/tests/test_LayerGrad.cpp index 59d1e9273d42d6a53ec284c6ed684096b3f42321..67251f08e34faff57d9e6fd6a1163ba655619a8b 100644 --- a/paddle/gserver/tests/test_LayerGrad.cpp +++ b/paddle/gserver/tests/test_LayerGrad.cpp @@ -845,8 +845,12 @@ void testDegradeLayer(bool hasSubseq, TEST(Layer, MaxLayer) { testDegradeLayer(false, "max", "non-seq", -1); // seq max to non-seq - testDegradeLayer(true, "max", "non-seq", -1); // hasSubseq max to non-seq - testDegradeLayer(true, "max", "seq", -1); // hasSubseq max to seq + testDegradeLayer(false, + "max", + "non-seq", + 5); // seq max to a shorten seq, stride window = 5 + testDegradeLayer(true, "max", "non-seq", -1); // hasSubseq max to non-seq + testDegradeLayer(true, "max", "seq", -1); // hasSubseq max to seq } TEST(Layer, SequenceLastInstanceLayer) { @@ -868,6 +872,10 @@ TEST(Layer, SequenceLastInstanceLayer) { TEST(Layer, AverageLayer) { testDegradeLayer(false, "average", "non-seq", -1); // seq average to non-seq + testDegradeLayer(false, + "average", + "non-seq", + 5); // seq average to a shorten seq, stride window = 5 testDegradeLayer( true, "average", "non-seq", -1); // hasSubseq average to non-seq testDegradeLayer(true, "average", "seq", -1); // hasSubseq average to seq @@ -1661,6 +1669,8 @@ TEST(Layer, PadLayer) { TEST(Layer, CrossChannelNormLayer) { TestConfig config; + config.paramInitialMean = 1.; + config.paramInitialStd = 0.; config.layerConfig.set_type("norm"); config.layerConfig.set_size(100); LayerInputConfig* input = 
config.layerConfig.add_inputs(); @@ -1674,7 +1684,7 @@ TEST(Layer, CrossChannelNormLayer) { config.inputDefs.push_back({INPUT_DATA, "layer_0", 100, 10}); for (auto useGpu : {false, true}) { - testLayerGrad(config, "cross-channel-norm", 10, false, useGpu, false, 5); + testLayerGrad(config, "cross-channel-norm", 10, false, useGpu, false); } } diff --git a/paddle/parameter/Argument.cpp b/paddle/parameter/Argument.cpp index 5beced3bb5a1050078f88dfd4350a2df71d27f35..ef72b973c1a465a8ac03cae1070429160eac0ac1 100644 --- a/paddle/parameter/Argument.cpp +++ b/paddle/parameter/Argument.cpp @@ -561,7 +561,7 @@ void Argument::degradeSequence(const Argument& input) { void Argument::poolSequenceWithStride(const Argument& input, size_t stride, - IVectorPtr* stridePostions, + ICpuGpuVectorPtr* stridePostions, bool reversed) { // If input.sequenceStartPositions = [0, 9, 14, 17, 30] and stride = 5, // then sequenceStartPositions = [0, 2, 3, 4, 7]. @@ -598,8 +598,8 @@ void Argument::poolSequenceWithStride(const Argument& input, stridePos.emplace_back(starts[numSequences]); int size = stridePos.size(); CHECK_EQ(size - 1, tgtBuf[numSequences]); - IVector::resizeOrCreate(*stridePostions, size, false); - (*stridePostions)->copyFrom(stridePos.data(), size); + ICpuGpuVector::resizeOrCreate(*stridePostions, size, false); + (*stridePostions)->getMutableVector(false)->copyFrom(stridePos.data(), size); } void Argument::getValueString( diff --git a/paddle/parameter/Argument.h b/paddle/parameter/Argument.h index 09bd633616730dc9475edc596128166f4f70b0cd..0ccdef802e71b659788cfd24f28ebe43e1917db1 100644 --- a/paddle/parameter/Argument.h +++ b/paddle/parameter/Argument.h @@ -299,7 +299,7 @@ struct Argument { */ void poolSequenceWithStride(const Argument& input, size_t stride, - IVectorPtr* stridePositions, + ICpuGpuVectorPtr* stridePositions, bool reversed = false); /** * @brief getValueString will return the argument's output in string. There diff --git a/paddle/parameter/tests/test_argument.cpp b/paddle/parameter/tests/test_argument.cpp index 98ab013548734059060eb06ce1a7cec23dbf1b72..19df6ea95745609a4eb7487d422e61d2f0b269cc 100644 --- a/paddle/parameter/tests/test_argument.cpp +++ b/paddle/parameter/tests/test_argument.cpp @@ -31,7 +31,7 @@ TEST(Argument, poolSequenceWithStride) { int strideResultReversed[] = {0, 4, 9, 14, 17, 20, 25, 30}; for (auto reversed : {false, true}) { - IVectorPtr stridePositions; + ICpuGpuVectorPtr stridePositions; output.poolSequenceWithStride( input, 5 /* stride */, &stridePositions, reversed); @@ -45,7 +45,7 @@ TEST(Argument, poolSequenceWithStride) { CHECK_EQ(stridePositions->getSize(), 8UL); auto result = reversed ? strideResultReversed : strideResult; for (int i = 0; i < 8; i++) { - CHECK_EQ(stridePositions->getData()[i], result[i]); + CHECK_EQ(stridePositions->getData(false)[i], result[i]); } } } diff --git a/paddle/parameter/tests/test_common.cpp b/paddle/parameter/tests/test_common.cpp index 8bab5a6289e2bb9f634e8cce4557de55f7704447..64d204aea10c8a7905d90fac6ebccde3c9da1edc 100644 --- a/paddle/parameter/tests/test_common.cpp +++ b/paddle/parameter/tests/test_common.cpp @@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) { EXPECT_EQ((int)0, nums[i]); } } - -TEST_F(CommonTest, barrierStat) { - const int threadNum = 10; - - SyncThreadPool pool(threadNum); - -#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...) 
\ - pool.exec([&](int tid, size_t numThreads) { \ - struct timeval time; \ - gettimeofday(&time, nullptr); \ - uint64_t usec = timeToMicroSecond(time); \ - std::srand(usec); \ - auto value = std::rand() % 100000; \ - usleep(value); \ - REGISTER_SLOW_NODES_PROBE( \ - globalStat, statName, numConnThreads, tid, __VA_ARGS__); \ - }); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER_RANDOM("synThreadBarrier1", threadNum); - TEST_BARRIER_RANDOM("synThreadBarrier2", threadNum); - } - - globalStat.printAllStatus(); - globalStat.reset(); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER_RANDOM("synThreadBarrier3", threadNum, "tag0"); - TEST_BARRIER_RANDOM("synThreadBarrier4", threadNum, "tag1"); - } - - globalStat.printAllStatus(); - globalStat.reset(); - -// use it to test accurate barrier gap -#define TEST_BARRIER(statName, numConnThreads, ...) \ - pool.exec([&](int tid, size_t numThreads) { \ - usleep(tid * 10000); \ - REGISTER_SLOW_NODES_PROBE( \ - globalStat, statName, numConnThreads, tid, __VA_ARGS__); \ - }); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER("synThreadBarrier3", threadNum, "tag0"); - TEST_BARRIER("synThreadBarrier4", threadNum, "tag1"); - } - - globalStat.printAllStatus(); - globalStat.reset(); -} diff --git a/paddle/pserver/LightNetwork.cpp b/paddle/pserver/LightNetwork.cpp index 922f25734dee0a6db7fbcfcef3d29d2bad5b7858..8616fd2d5aef666f16533fe062f3f40a7a2b202d 100644 --- a/paddle/pserver/LightNetwork.cpp +++ b/paddle/pserver/LightNetwork.cpp @@ -142,7 +142,7 @@ SocketServer::SocketServer(const std::string &addr, int port, int rdmaCpu) } /// trigger to initialize RDMA lib - PCHECK(RdmaClientDaemons::get()) << "initilizate RDMA failed\n"; + CHECK(RdmaClientDaemons::get()) << "initilizate RDMA failed\n"; } SocketServer::~SocketServer() { @@ -168,7 +168,7 @@ void SocketServer::tcpServer() { /// First call to socket() function socket_ = socket(AF_INET, SOCK_STREAM, 0); - PCHECK(socket_ >= 0) << "ERROR opening socket"; + CHECK(socket_ >= 0) << "ERROR opening socket"; /// Initialize socket structure bzero((char *)&serv_addr, sizeof(serv_addr)); @@ -176,7 +176,7 @@ void SocketServer::tcpServer() { serv_addr.sin_port = htons(port_); if (!addr_.empty()) { server = gethostbyname(addr_.c_str()); - PCHECK(server) << "ERROR, no such host: " << addr_; + CHECK(server) << "ERROR, no such host: " << addr_; bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length); @@ -187,7 +187,7 @@ void SocketServer::tcpServer() { setOption(socket_); /// Now bind the host address using bind() call. 
- PCHECK(bind(socket_, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) >= 0) + CHECK(bind(socket_, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) >= 0) << "ERROR on binding " << addr_; /// Now start listening for the clients, here process will @@ -201,7 +201,7 @@ void SocketServer::tcpServer() { if (stopping_) { break; } - PCHECK(newsockfd >= 0) << "ERROR on accept"; + CHECK(newsockfd >= 0) << "ERROR on accept"; constexpr int kPeerNameLen = 128; char peerName[kPeerNameLen]; CHECK(inet_ntop(AF_INET, &cli_addr.sin_addr, peerName, kPeerNameLen)); @@ -227,14 +227,14 @@ void SocketServer::rdmaServer() { /// First call to socket() function rdmaSocket_ = rdma::ssocket(rdmaCpu_); - PCHECK(rdmaSocket_) << "ERROR opening RDMA socket"; + CHECK(rdmaSocket_) << "ERROR opening RDMA socket"; - PCHECK(rdma::bind(rdmaSocket_, rdmaUri_.c_str()) == 0) + CHECK(rdma::bind(rdmaSocket_, rdmaUri_.c_str()) == 0) << "ERROR bind RDMA socket"; /// Now start listening for the clients, here process will /// go in sleep mode and will wait for the incoming connection - PCHECK(rdma::listen(rdmaSocket_) == 0) << "ERROR listen RDMA socket"; + CHECK(rdma::listen(rdmaSocket_) == 0) << "ERROR listen RDMA socket"; while (true) { /// Accept actual connection from the client @@ -242,7 +242,7 @@ void SocketServer::rdmaServer() { if (stopping_) { break; } - PCHECK(newsock) << "ERROR on accept"; + CHECK(newsock) << "ERROR on accept"; constexpr int kPeerNameLen = 128; char peerName[kPeerNameLen]; @@ -290,7 +290,7 @@ RdmaClientDaemons::RdmaClientDaemons() { onlineCpus_ = rdma::numCpus(); for (auto i = 0; i < onlineCpus_; i++) { socket = rdma::csocket(i); - PCHECK(socket) << "ERROR open client socket daemon"; + CHECK(socket) << "ERROR open client socket daemon"; rdmaClientSocket_.push_back(socket); } @@ -355,7 +355,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) { /// Create a socket point int sockfd = socket(AF_INET, SOCK_STREAM, 0); - PCHECK(sockfd >= 0) << "ERROR opening socket"; + CHECK(sockfd >= 0) << "ERROR opening socket"; #if defined(__OSX__) || defined(__APPLE__) server = getipnodebyname(serverAddr.c_str(), AF_INET, AI_DEFAULT, &errRet); @@ -396,8 +396,8 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) { } std::this_thread::sleep_for(std::chrono::seconds(1)); } else { - PCHECK(errno != 0) << "ERROR connecting to " << serverAddr << ":" - << serverPort << "errorno: " << errno; + CHECK(errno != 0) << "ERROR connecting to " << serverAddr << ":" + << serverPort << "errorno: " << errno; } } while (errno == ECONNREFUSED); @@ -426,7 +426,7 @@ void SocketClient::RdmaClient(const std::string &serverAddr, int serverPort) { /// connect to server with socket daemon sock = rdma::connect(socketDaemon_, rdmaUri.c_str()); - PCHECK(sock) << "ERROR connect to server" << rdmaUri; + CHECK(sock) << "ERROR connect to server" << rdmaUri; std::vector seg; str::split(rdmaUri, '/', &seg); diff --git a/paddle/pserver/ParameterServer2.cpp b/paddle/pserver/ParameterServer2.cpp index 41ac15336d3150417da1cf1631319604584991ec..d7c1d4f788f44c6bfcec040ba24bdc454348c911 100644 --- a/paddle/pserver/ParameterServer2.cpp +++ b/paddle/pserver/ParameterServer2.cpp @@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request, SetConfigResponse response; callback(response); - - /// always defined, barrier slowest node function need it. 
- statSet_.reset(new StatSet("ParameterServer" + - str::to_string(static_cast(serverId_)))); } real bufferSum(const std::vector& buffers) { @@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, std::vector* outputBuffers) { VLOG(1) << "pserver: addGradient"; - // forwardbackward delta from all trainers - // indicate the fluctuation caused by forwardbackward. - if (!numPassFinishClients_) { - REGISTER_BARRIER_DELTA_SERVER_SET( - *statSet_, - "forwardbackwardDelta", - FLAGS_num_gradient_servers, - request.trainer_id(), - request.forwardbackward_time(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - { - /// approximately pure network overhead - REGISTER_TIMER_DYNAMIC_SET( - "pushRecv", timeToMicroSecond(*handleRequestBegin_), -1, *statSet_); - } - -#ifndef PADDLE_DISABLE_TIMER - gettimeofday(&(*addGradBegin_), nullptr); -#endif - - /// barrier fluctuation caused by network and previous forwardbackward - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER_SET( - *statSet_, - "handleReqBegin", - FLAGS_num_gradient_servers, - request.trainer_id(), - (*handleRequestBegin_), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "addGradBegin", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - { - REGISTER_TIMER_DYNAMIC("addGradCore", -1, *statSet_); ReadLockGuard guard(parameterMutex_); int bufferIndex = 0; for (const auto& block : request.blocks()) { @@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, std::lock_guard guard(*info.lock); simd::addTo(gradientSumBuffer, gradientBuffer, size); } - - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "addGradCoreFinish", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } } if (request.batch_status() == BATCH_FINISH || request.batch_status() == BATCH_START_AND_FINISH) { @@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, VLOG(1) << "num samples: " << numSamplesProcessed_ << ", new cost:" << cost_; - /// numPassFinishClients_ means some trainer has entered finishPass - if (!numPassFinishClients_) { - REGISTER_SLOW_NODES_PROBE( - *statSet_, - "SLOW_NODES", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - /// notify doOperation gradient ready gradientReadyBarrier_.wait(); - /// if wait pass finish does not start, do check - if (!numPassFinishClients_) { - CHECK_BARRIER_TIMER(*statSet_, - "SLOW_NODES", - FLAGS_num_gradient_servers, - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - /// barrier performance while all parameter add is finished - /// can indicate the fluctation caused by computation at pserver. - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "paraReady", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } /// wait doOperation finish parameterReadyBarrier_.wait(); VLOG(1) << "start send back"; - { - /// total time except overhead of network. 
- REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecvNoSend", - timeToMicroSecond(*addGradBegin_), - -1, - *statSet_); - } } } @@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat( return commitGradient; } -void ParameterServer2::printAsyncGradientCommitStatAndReset() { - std::stringstream statFormat; - if (asyncUpdateSteps_) { - statFormat << "async discard gradients stat: " << std::endl; - statFormat << "serverId: " << serverId_ - << " serverType: " << isSparseServer_ - << " total updates: " << asyncUpdateSteps_ - << " discard updates: " << asyncLaggedGradientsNum_ - << " discard ratio: " - << (real)asyncLaggedGradientsNum_ / (real)asyncUpdateSteps_; - statFormat << std::endl; - statFormat << std::endl; - - statFormat << "Async Gradient Update Steps distribution: " << std::endl - << "Sample: 1:1912(0.00284449) means " - << "the updates step=1 count 1912 times " - << "and account for 0.284449% of total updates" << std::endl; - size_t index = 0; - for (const auto& stat : asyncUpdateStat_) { - statFormat << index << ":" << stat << "(" - << (real)stat / (real)asyncUpdateSteps_ << ") "; - index++; - } - statFormat << std::endl; - statFormat << std::endl; - - statFormat << "Async Gradient Discard based on trainer_id: " << std::endl - << "Sample: 2:22(0.0016363) means " - << "total discarded updates from trainer_id=2 count 22 " - << "and account for 0.16363% of all updates from trainer_id=2" - << std::endl; - for (auto i = 0; i < FLAGS_num_gradient_servers; i++) { - real ratio = - (real)asyncTrainerDiscardStat_[i] / - (real)(asyncTrainerCommitStat_[i] + asyncTrainerDiscardStat_[i]); - statFormat << i << ":" << asyncTrainerDiscardStat_[i] << "(" << ratio - << ")" - << " "; - } - LOG(INFO) << statFormat.str(); - - /// reset stat - asyncUpdateSteps_ = 0; - asyncTrainerSteps_.assign(asyncTrainerSteps_.size(), 0); - asyncLaggedGradientsNum_ = 0; - asyncUpdateStat_.assign(asyncUpdateStat_.size(), 0); - asyncTrainerDiscardStat_.assign(asyncTrainerDiscardStat_.size(), 0); - asyncTrainerCommitStat_.assign(asyncTrainerCommitStat_.size(), 0); - } -} - static ThreadLocal> localBlockBitset_; void ParameterServer2::asyncSGD(const SendParameterRequest& request, @@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request, if (request.trainer_id() == 0) { /// batchId_ is approximately equal to "real batchId_" batchId_++; - tuningAsyncsgdMidOutput(); } } @@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request, } (*requestVec_).clear(); (*callbackVec_).clear(); - - /// barrier perfromance while all data are send finished. - /// indicates network flucatuation for big message. - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "sendParamFinish", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - /// all time exhausted in parameterServer for big message. - /// it contains network and computation at pserver. - { - /// total time including overhead of network. - REGISTER_TIMER_DYNAMIC_SET("sendParaTotal", - timeToMicroSecond(*handleRequestBegin_), - -1, - *statSet_); - } - /// all time exhausted in pserverServer except recieve network. 
- { - /// total time except overhead of network receive - REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecv", - timeToMicroSecond(*addGradBegin_), - -1, - *statSet_); - } } break; case PSERVER_UPDATE_MODE_SET_PARAM: @@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation, } { - REGISTER_TIMER_DYNAMIC("op_SGD", -1, *statSet_); - parallelExecForEachBlock([&](int64_t blockId, const VectorPtr vecs[]) { BlockInfo& info = blockInfos_[blockId]; const ParameterConfig& config = getParameterConfig(blockId); @@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation, } batchId_++; - tuningSgdMidOutput(); } void ParameterServer2::op_start_pass(const Operation& operation, @@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation, /// finish pass info.optimizer->finishPass(); }); - - tuningSgdFinished(); batchId_ = 0; } @@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request, callback(SynchronizeResponse()); if (request.trainer_id() == 0) { - tuningAsyncsgdFinished(); batchId_ = 0; } } @@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request, callback(response); } -void ParameterServer2::tuningSgdMidOutput() { - if (batchId_ && batchId_ % FLAGS_log_period_server == 0) { - LOG(INFO) << "======== Batch=" << batchId_ << "======="; - statSet_->setThreadInfo(true); - statSet_->printAllStatus(); - /// not reset raw data for reducing the overhead of performance tuning - statSet_->reset(false); - } -} - -void ParameterServer2::tuningSgdFinished() { - LOG(INFO) << "======== Batch=" << batchId_ << " pass END" - << "======="; - statSet_->setThreadInfo(true); - statSet_->printAllStatus(); - /** - * reset raw data at end of pass since some raw data could be not - * complete. Otherwise the raw data will pollute next pass performance - * tuning - */ - statSet_->reset(); -} - -void ParameterServer2::tuningAsyncsgdMidOutput() { -#ifndef PADDLE_DISABLE_TIMER - if (batchId_ && batchId_ % FLAGS_log_period_server == 0) { - LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << "======="; - printAsyncGradientCommitStatAndReset(); - } -#endif -} - -void ParameterServer2::tuningAsyncsgdFinished() { - LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << " pass END" - << "======="; - printAsyncGradientCommitStatAndReset(); -} - } // namespace paddle diff --git a/paddle/pserver/ParameterServer2.h b/paddle/pserver/ParameterServer2.h index 0f5a5895907b20a0cf882b6fa6fb74bd52dce058..f7d3587b88c4ab1d4e37a259c622fc7c2d5532a3 100644 --- a/paddle/pserver/ParameterServer2.h +++ b/paddle/pserver/ParameterServer2.h @@ -298,24 +298,6 @@ protected: /// barrier performance tuning sync-sgd required std::atomic batchId_; - /// the beginning of addGradient without network overhead - ThreadLocal addGradBegin_; - - /** - * tuning barrier performance - * to better control log for sparse and dense parameter, - * we use different log entities for different parameterServer - * objects. - * it will output lots of performance stats to perceive the - * overhead of network, fluctuation of computation from - * forwardbackward and network, computation from optimization - * at pserver end, barrier overhead, etc. 
to understand tuning - * data, focus on the synchronization between addGradient and - * doOperation which indirectly call op_SGD operation controlled - * by remote updater controller - */ - std::unique_ptr statSet_; - public: struct Buffer { real* base; @@ -325,7 +307,6 @@ public: protected: /// async gradient commit control bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request); - void printAsyncGradientCommitStatAndReset(); public: /// disable default parameter for overloading @@ -710,36 +691,6 @@ public: void op_load(const Operation& operation, OperationResult* result); void op_save(const Operation& operation, OperationResult* result); - - /** - * @brief output log in at the middle stage of training - * - * @note flush log histroy and state at the end for sgd - */ - void tuningSgdMidOutput(); - - /** - * @brief output log in at the end stage of training - * - * @note flush log histroy and state at the end for sgd. it will also - * flush some stateful stat for next pass. - */ - void tuningSgdFinished(); - - /** - * @brief output log in at the middle stage of training - * - * @note flush log histroy and state at the end for async-sgd. - * it will log some performance log if some lagged node are found - */ - void tuningAsyncsgdMidOutput(); - - /** - * @brief output log in at the end stage of training - * - * @note flush log histroy and state at the end for async-sgd. - */ - void tuningAsyncsgdFinished(); }; } // namespace paddle diff --git a/paddle/pserver/SocketChannel.cpp b/paddle/pserver/SocketChannel.cpp index 05998891649cee30e23e556d9311c3a383f43e10..12e3bc6552fcf26d8ccb32ca43d23142e3aba8e0 100644 --- a/paddle/pserver/SocketChannel.cpp +++ b/paddle/pserver/SocketChannel.cpp @@ -51,7 +51,7 @@ size_t SocketChannel::read(void* buf, size_t size) { else len = rdma::read(rdmaSocket_, (char*)buf + total, size - total); - PCHECK(len >= 0) << " peer=" << peerName_; + CHECK(len >= 0) << " peer=" << peerName_; if (len <= 0) { return total; } @@ -69,7 +69,7 @@ size_t SocketChannel::write(const void* buf, size_t size) { else len = rdma::write(rdmaSocket_, (char*)buf + total, size - total); - PCHECK(len >= 0) << " peer=" << peerName_; + CHECK(len >= 0) << " peer=" << peerName_; if (len <= 0) { return total; } @@ -98,10 +98,10 @@ static size_t readwritev(IOFunc iofunc, while (size < total) { ssize_t len = iofunc(socket, &iovs[curIov], std::min(iovcnt - curIov, maxiovs)); - PCHECK(len > 0) << " peer=" << peerName << " curIov=" << curIov - << " iovCnt=" << iovcnt - << " iovs[curIov].base=" << iovs[curIov].iov_base - << " iovs[curIov].iov_len=" << iovs[curIov].iov_len; + CHECK(len > 0) << " peer=" << peerName << " curIov=" << curIov + << " iovCnt=" << iovcnt + << " iovs[curIov].base=" << iovs[curIov].iov_base + << " iovs[curIov].iov_len=" << iovs[curIov].iov_len; size += len; /// restore iovs[curIov] to the original value @@ -183,7 +183,7 @@ void SocketChannel::writeMessage(const std::vector& userIovs) { header.totalLength += iov.iov_len; } - PCHECK(writev(iovs) == (size_t)header.totalLength); + CHECK(writev(iovs) == (size_t)header.totalLength); } std::unique_ptr SocketChannel::readMessage() { @@ -194,7 +194,7 @@ std::unique_ptr SocketChannel::readMessage() { return nullptr; } - PCHECK(len == sizeof(header)); + CHECK(len == sizeof(header)); std::unique_ptr msgReader(new MsgReader(this, header.numIovs)); @@ -209,7 +209,7 @@ std::unique_ptr SocketChannel::readMessage() { MsgReader::MsgReader(SocketChannel* channel, size_t numBlocks) : channel_(channel), blockLengths_(numBlocks), 
currentBlockIndex_(0) { size_t size = numBlocks * sizeof(blockLengths_[0]); - PCHECK(channel_->read(&blockLengths_[0], size) == size); + CHECK(channel_->read(&blockLengths_[0], size) == size); } void MsgReader::readBlocks(const std::vector& bufs) { @@ -223,12 +223,12 @@ void MsgReader::readBlocks(const std::vector& bufs) { ++currentBlockIndex_; } - PCHECK(channel_->readv(&iovs) == totalLength); + CHECK(channel_->readv(&iovs) == totalLength); } void MsgReader::readNextBlock(void* buf) { CHECK_LT(currentBlockIndex_, blockLengths_.size()); - PCHECK(channel_->read(buf, getNextBlockLength()) == getNextBlockLength()); + CHECK(channel_->read(buf, getNextBlockLength()) == getNextBlockLength()); ++currentBlockIndex_; } diff --git a/paddle/pserver/test/SocketTest.cpp b/paddle/pserver/test/SocketTest.cpp index 066a6c02939695e7050a7693365d7c449f70e723..6f6c9e596cfb7a2547d5b6c5de69381eb9c29132 100644 --- a/paddle/pserver/test/SocketTest.cpp +++ b/paddle/pserver/test/SocketTest.cpp @@ -113,7 +113,7 @@ void SocketServer::run() { /* First call to socket() function */ socket_ = socket(AF_INET, SOCK_STREAM, 0); - PCHECK(socket_ >= 0) << "ERROR opening socket"; + CHECK(socket_ >= 0) << "ERROR opening socket"; /* Initialize socket structure */ bzero((char*)&serv_addr, sizeof(serv_addr)); @@ -122,7 +122,7 @@ void SocketServer::run() { serv_addr.sin_port = htons(port_); /* Now bind the host address using bind() call.*/ - PCHECK(bind(socket_, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) + CHECK(bind(socket_, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) << "ERROR on binding"; /* Now start listening for the clients, here process will @@ -134,7 +134,7 @@ void SocketServer::run() { while (true) { /* Accept actual connection from the client */ newsockfd = accept(socket_, (struct sockaddr*)&cli_addr, &clilen); - PCHECK(newsockfd >= 0) << "ERROR on accept"; + CHECK(newsockfd >= 0) << "ERROR on accept"; SocketWorker* worker = new SocketWorker(newsockfd); worker->start(); @@ -146,17 +146,17 @@ void SocketWorker::run() { while (true) { int64_t n = channel_.readAll(&header, sizeof(header)); - PCHECK(n == sizeof(header)) << "ERROR reading from socket"; + CHECK(n == sizeof(header)) << "ERROR reading from socket"; buffer_.resize(header.dataLength); n = channel_.readAll(&buffer_[0], header.dataLength); - PCHECK(n == header.dataLength) << "ERROR reading from socket"; + CHECK(n == header.dataLength) << "ERROR reading from socket"; /* Write a response to the client */ n = channel_.writeAll(&header, sizeof(header)); - PCHECK(n == sizeof(header)) << "ERROR reading from socket"; + CHECK(n == sizeof(header)) << "ERROR reading from socket"; n = channel_.writeAll(buffer_.data(), buffer_.size()); - PCHECK(n == header.dataLength) << "ERROR writing to socket"; + CHECK(n == header.dataLength) << "ERROR writing to socket"; } } @@ -177,9 +177,9 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) { /* Create a socket point */ int sockfd = socket(AF_INET, SOCK_STREAM, 0); - PCHECK(sockfd >= 0) << "ERROR opening socket"; + CHECK(sockfd >= 0) << "ERROR opening socket"; server = gethostbyname(serverAddr.c_str()); - PCHECK(server) << "ERROR, no such host: " << serverAddr; + CHECK(server) << "ERROR, no such host: " << serverAddr; bzero((char*)&serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; @@ -189,7 +189,7 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) { serv_addr.sin_port = htons(serverPort); /* Now connect to the server */ - PCHECK(connect(sockfd, 
(sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) + CHECK(connect(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) << "ERROR connecting"; channel_.reset(new SocketChannel(sockfd)); @@ -234,18 +234,18 @@ int main(int argc, char** argv) { cpuGrad.copyFrom(gpuGrad); header.dataLength = dataSize; - PCHECK(channel->writeAll(&header, sizeof(header)) == sizeof(header)) + CHECK(channel->writeAll(&header, sizeof(header)) == sizeof(header)) << "Client write header error"; - PCHECK(channel->writeAll(cpuGrad.getData(), dataSize) == dataSize) + CHECK(channel->writeAll(cpuGrad.getData(), dataSize) == dataSize) << "Client write data error"; /* Now read server response */ - PCHECK(channel->readAll(&header, sizeof(header)) == sizeof(header)) + CHECK(channel->readAll(&header, sizeof(header)) == sizeof(header)) << "Client read header error"; CHECK_EQ((uint64_t)header.dataLength, dataSize); - PCHECK(channel->readAll(cpuParam.getData(), dataSize) == dataSize) + CHECK(channel->readAll(cpuParam.getData(), dataSize) == dataSize) << "Client read data error"; gpuParam.copyFrom(cpuParam); diff --git a/paddle/scripts/docker/build.sh b/paddle/scripts/docker/build.sh index 1ccee686df4faa930dcaba0d38f7571c9aaadd4a..ab60f1a38dd4cd1d9799c0019dccae5f1c7d4310 100644 --- a/paddle/scripts/docker/build.sh +++ b/paddle/scripts/docker/build.sh @@ -78,7 +78,7 @@ paddle version # PaddlePaddle. This awkwardness is due to # https://github.com/PaddlePaddle/Paddle/issues/1854. It also # describes a solution. -if [ ${WITH_DOC} == "ON" ]; then +if [[ ${WITH_DOC} == "ON" ]]; then cat <getHeight(); for (size_t i = 0; i < sampleNum; ++i) { diff --git a/paddle/utils/BarrierStat.cpp b/paddle/utils/BarrierStat.cpp deleted file mode 100644 index a6dbdcae3f32c894d35e8114488d4a3264c6c5f2..0000000000000000000000000000000000000000 --- a/paddle/utils/BarrierStat.cpp +++ /dev/null @@ -1,340 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
*/ - -#include "paddle/utils/BarrierStat.h" -#include -#include -#include -#include -#include "paddle/utils/Flags.h" -#include "paddle/utils/Stat.h" - -DEFINE_bool(log_barrier_abstract, - true, - "if true, show abstract of barrier performance"); -DEFINE_int32(log_barrier_lowest_nodes, - 5, - "how many lowest node will be logged"); -DEFINE_bool(log_barrier_show_log, - false, // for performance tuning insight - "if true, always show barrier abstract even with little gap"); - -namespace paddle { - -std::ostream &operator<<(std::ostream &output, const BarrierStatBase &stat) { - if (FLAGS_log_barrier_abstract) { - std::lock_guard guard(stat.lock_); - stat.showAbstract(output); - } - return output; -} - -BarrierStatBase::BarrierStatBase(uint16_t numConnThreads, - const std::string &name) - : totSamples_(0), numConnThreads_(numConnThreads), name_(name) { - abstract_.resize(numConnThreads_); - if (FLAGS_log_barrier_show_log) { - rateThreshold_ = 0.0; - } else { - /* probablity of abnormal node - * p = 1/n + (n/8)/(n+1), n = nodes, n > 1 - * if the freq of lowest trainerId larger than p, - * output FLAGS_log_barrier_lowest_nodes lastTrainerId. - * numConnThreads_ indicates nodes - */ - float n = (float)numConnThreads; - rateThreshold_ = 1.0 / n + (n / 8.0) / (n + 1.0); - } -} - -BarrierEndStat::BarrierEndStat(uint16_t numConnThreads, const std::string &name) - : BarrierStatBase(numConnThreads, name) { - timeVector_.reset(new TimeVectorEnd(numConnThreads_)); - reset(true); - LOG(INFO) << " create barrierEndStat: " << name - << " endBarrier warning rate: " << rateThreshold_; -} - -/* - * Note: - * the design different pserver entity owns different statSet to obey - * the background that different pserver runs separately. - */ -void BarrierEndStat::updateStat(struct timeval &cur, int32_t trainerId) { - CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier"; - - std::lock_guard guard(lock_); - timeVector_->addTimeval(cur, trainerId); - - if (timeVector_->full()) { - std::lock_guard abstractGuard(abstractLock_); - auto id = timeVector_->getLastTrainerId(); - auto delta = timeToMicroSecond(timeVector_->getDelta()); - auto secondDelta = timeToMicroSecond(timeVector_->get1NDelta()); - auto lastTwoDelta = timeToMicroSecond(timeVector_->getMinus1NDelta()); - auto midDelta = timeToMicroSecond(timeVector_->getMidNDelta()); - // discard first sample, since first sample probably is abnormal. 
- if (totSamples_) { - abstract_[id].freq++; - - if (delta < abstract_[id].minDelta) { - abstract_[id].minDelta = delta; - } - if (delta > abstract_[id].maxDelta) { - abstract_[id].maxDelta = delta; - } - abstract_[id].totDelta += delta; - abstract_[id].totSecondDelta += secondDelta; - abstract_[id].totLastTwoDelta += lastTwoDelta; - abstract_[id].totMidDelta += midDelta; - - // update totAbstract_ - totAbstract_.freq++; - if (delta < totAbstract_.minDelta) { - totAbstract_.minDelta = delta; - } - if (delta > totAbstract_.maxDelta) { - totAbstract_.maxDelta = delta; - } - totAbstract_.totDelta += delta; - totAbstract_.totSecondDelta += secondDelta; - totAbstract_.totLastTwoDelta += lastTwoDelta; - totAbstract_.totMidDelta += midDelta; - } - - totSamples_++; - timeVector_->reset(); - } -} - -void BarrierEndStat::reset(bool clearRawData) { - int32_t i = 0; - - totSamples_ = 0; - - std::lock_guard guard(abstractLock_); - - if (clearRawData) { - timeVector_->reset(); - } - - for (auto &abstract : abstract_) { - memset((void *)&abstract, 0, sizeof(abstract)); - abstract.minDelta = UINT64_MAX; - abstract.trainerId = i++; - } - memset((void *)&totAbstract_, 0, sizeof(Abstract)); - totAbstract_.minDelta = UINT64_MAX; -} - -void BarrierEndStat::showAbstract(std::ostream &output) const { - // do not support the case "<=2 pserver" - if (numConnThreads_ <= 2 || !totSamples_) { - return; - } - - // duplicate freq info - std::vector outputAbstract = abstract_; - std::sort(outputAbstract.begin(), - outputAbstract.end(), - [](const struct Abstract &a, const struct Abstract &b) { - return a.freq > b.freq; - }); - - auto rate = (float)outputAbstract[0].freq / (float)totSamples_; - if (rate < rateThreshold_) { - return; - } - - output << std::setw(20) << name_ << std::endl; - - /* - * Note: - * avgGap: the average delta between 1 -- n arriving trainers - * avgSecondGap: the average delta between 2 -- n arriving trainers - * avgLastTwoGap: the average delta between n-1 -- n arriving trainers - * avgMidGap: the average delta between n/2 -- n arriving trainers - * rato: samples / totSamples - * - * the stat is based on per trainer if trainer_id is set, totAbstract is - * stat based on all trainers scope. 
- */ - output << std::setw(42) << " " << std::setw(15) << "trainerId" - << std::setw(15) << "avgGap" << std::setw(15) << "avgSecondGap" - << std::setw(15) << "avgLastTwoGap" << std::setw(15) << "avgMidGap" - << std::setw(10) << "rate" << std::setw(10) << "samples" - << std::setw(10) << "totSamples" << std::endl; - // show totAbstract, it's valuable when lastTrainerId is even-distributed' - if (!totAbstract_.freq) return; - output << std::setw(42) << " " << std::setw(15) << "totAbstract" - << std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totSecondDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totLastTwoDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totMidDelta / totAbstract_.freq) * 0.001 - << std::setw(10) << (float)totAbstract_.freq / (float)totSamples_ - << std::setw(10) << (float)totAbstract_.freq << std::setw(10) - << (float)totSamples_ << std::endl; - - // show lastTrainerId abstract - int count = 0; - for (auto &abstract : outputAbstract) { - if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) { - break; - } - // output format control - output << std::setw(42) << " " << std::setw(15) << abstract.trainerId - << std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001 - << std::setw(15) << (abstract.totSecondDelta / abstract.freq) * 0.001 - << std::setw(15) - << (abstract.totLastTwoDelta / abstract.freq) * 0.001 - << std::setw(15) << (abstract.totMidDelta / abstract.freq) * 0.001 - << std::setw(10) << (float)abstract.freq / (float)totSamples_ - << std::setw(10) << (float)abstract.freq << std::setw(10) - << (float)totSamples_ << std::endl; - } -} - -BarrierDeltaStat::BarrierDeltaStat(uint16_t numConnThreads, - const std::string &name) - : BarrierStatBase(numConnThreads, name) { - timeVector_.reset(new TimeVectorDelta(numConnThreads_)); - reset(true); - LOG(INFO) << " create barrierDeltaStat: " << name - << " barrierDelta warning rate: " << rateThreshold_; -} - -void BarrierDeltaStat::updateStat(uint64_t delta, int32_t trainerId) { - CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier"; - - std::lock_guard guard(lock_); - timeVector_->addTimeval(delta, trainerId); - - if (timeVector_->full()) { - std::lock_guard abstractGuard(abstractLock_); - auto id = timeVector_->getMaxTrainerId(); - auto delta = timeVector_->getDelta(); - // discard first sample, since first sample probably is abnormal. 
- if (totSamples_) { - abstract_[id].freq++; - - if (delta < abstract_[id].minDelta) { - abstract_[id].minDelta = delta; - } - if (delta > abstract_[id].maxDelta) { - abstract_[id].maxDelta = delta; - } - abstract_[id].totDelta += delta; - - // update totAbstract_ - totAbstract_.freq++; - if (delta < totAbstract_.minDelta) { - totAbstract_.minDelta = delta; - } - if (delta > totAbstract_.maxDelta) { - totAbstract_.maxDelta = delta; - } - totAbstract_.totDelta += delta; - } - - totSamples_++; - timeVector_->reset(); - } -} - -void BarrierDeltaStat::reset(bool clearRawData) { - int32_t i = 0; - - totSamples_ = 0; - - std::lock_guard guard(abstractLock_); - - if (clearRawData) { - timeVector_->reset(); - } - - for (auto &abstract : abstract_) { - memset((void *)&abstract, 0, sizeof(abstract)); - abstract.minDelta = UINT64_MAX; - abstract.trainerId = i++; - } - memset((void *)&totAbstract_, 0, sizeof(Abstract)); - totAbstract_.minDelta = UINT64_MAX; -} - -void BarrierDeltaStat::showAbstract(std::ostream &output) const { - // do not support the case "<=2 pserver" - if (numConnThreads_ <= 2 || !totSamples_) { - return; - } - - // duplicate freq info - std::vector outputAbstract = abstract_; - std::sort(outputAbstract.begin(), - outputAbstract.end(), - [](const struct Abstract &a, const struct Abstract &b) { - return a.freq > b.freq; - }); - - auto rate = (float)outputAbstract[0].freq / (float)totSamples_; - if (rate < rateThreshold_) { - return; - } - - output << std::setw(20) << name_ << std::endl; - - /* Note: - * Gap means the delta from all trainers' forwardbackward - * avgGap: average Gap in log_period batches - * minGap: min Gap in log_period batches - * maxGap: max Gap in log_period batches - * trainerId: the slowest trainer_id - * - * the stat is based on per trainer if trainer_id is set, totAbstract is - * stat based on all trainers scope. 
- */ - output << std::setw(42) << " " << std::setw(15) << "trainerId" - << std::setw(15) << "avgGap" << std::setw(10) << "minGap" - << std::setw(10) << "maxGap" << std::setw(10) << "rate" - << std::setw(10) << "samples" << std::setw(10) << "totSamples" - << std::endl; - // show totAbstract, it's valuable when lastTrainerId is even-distributed' - if (!totAbstract_.freq) return; - output << std::setw(42) << " " << std::setw(15) << "totAbstract" - << std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001 - << std::setw(10) << totAbstract_.minDelta * 0.001 << std::setw(10) - << totAbstract_.maxDelta * 0.001 << std::setw(10) - << (float)totAbstract_.freq / (float)totSamples_ << std::setw(10) - << (float)totAbstract_.freq << std::setw(10) << (float)totSamples_ - << std::endl; - - // show lastTrainerId abstract - int count = 0; - for (auto &abstract : outputAbstract) { - if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) { - break; - } - // output format control - output << std::setw(42) << " " << std::setw(15) << abstract.trainerId - << std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001 - << std::setw(10) << abstract.minDelta * 0.001 << std::setw(10) - << abstract.maxDelta * 0.001 << std::setw(10) - << (float)abstract.freq / (float)totSamples_ << std::setw(10) - << (float)abstract.freq << std::setw(10) << (float)totSamples_ - << std::endl; - } -} -} // namespace paddle diff --git a/paddle/utils/BarrierStat.h b/paddle/utils/BarrierStat.h deleted file mode 100644 index a9c925eff66838d58d540d7be5476e6207a30bec..0000000000000000000000000000000000000000 --- a/paddle/utils/BarrierStat.h +++ /dev/null @@ -1,425 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "Locks.h" -#include "Logging.h" -#include "ThreadLocal.h" - -namespace paddle { - -inline uint64_t timeToMicroSecond(struct timeval time) { - return time.tv_sec * 1000000LU + time.tv_usec; -} - -class TimeVectorEnd { - /* - * help class for gathering all barrier performance data - * which shows time point property. - * freqently used in barrier performance tuning API, such - * as tuning which is slowest node in sync-sgd mode training. 
- */ -public: - explicit TimeVectorEnd(uint16_t size) : size_(size) { - index_ = 0; - timeArray_.resize(size); - trainerIds_.resize(size); - } - ~TimeVectorEnd() {} - - uint16_t size() { return size_; } - - bool full() { return index_ == size_; } - - bool empty() { return index_ == 0; } - - void reset() { index_ = 0; } - - void addTimeval(struct timeval time, int32_t trainerId) { - timeArray_[index_] = time; - trainerIds_[index_] = trainerId; - index_++; - } - - struct timeval getDelta() const { - struct timeval delta; - CHECK_GT(size_, 1) << "not support with 1 pserver"; - timersub(&timeArray_[size_ - 1], &timeArray_[0], &delta); - return delta; - } - - /* 2, n delta */ - struct timeval get1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[1], &delta); - return delta; - } - - /* n-1, n delta */ - struct timeval getMinus1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[size_ - 2], &delta); - return delta; - } - - /* n/2, n delta */ - struct timeval getMidNDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[size_ / 2], &delta); - return delta; - } - - int32_t getLastTrainerId() const { return trainerIds_[index_ - 1]; } - -private: - uint16_t size_; - uint16_t index_; - std::vector timeArray_; - std::vector trainerIds_; -}; - -class TimeVectorDelta { - /* - * help class for gathering performance data which shows time - * delta property, such as tuning the time distribution of - * forwardBackward time from all cluster nodes. - */ -public: - explicit TimeVectorDelta(uint16_t size) - : size_(size), min_(UINT64_MAX), max_(0) { - index_ = 0; - timeArray_.resize(size); - } - ~TimeVectorDelta() {} - - uint16_t size() { return size_; } - - bool full() { return index_ == size_; } - - bool empty() { return index_ == 0; } - - void reset() { - index_ = 0; - min_ = UINT64_MAX; - max_ = 0; - } - - void addTimeval(uint64_t delta, int32_t trainerId) { - timeArray_[index_] = delta; - index_++; - if (delta < min_) { - min_ = delta; - } - if (delta > max_) { - max_ = delta; - maxTrainerId_ = trainerId; - } - } - - uint64_t getDelta() const { - CHECK_GT(size_, 1) << "not support with 1 pserver"; - return max_ - min_; - } - - /* 2, n delta */ - uint64_t get1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - /* n-1, n delta */ - uint64_t getMinus1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - /* n/2, n delta */ - uint64_t getMidNDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - int32_t getMaxTrainerId() const { return maxTrainerId_; } - -private: - uint16_t size_; - uint16_t index_; - std::vector timeArray_; - -private: - uint64_t min_; - uint64_t max_; - int32_t maxTrainerId_; -}; - -// total samples stats, us -struct Abstract { - // last trainerId for barrier end, maxDelta trainerId for barrier delta - int32_t trainerId; - uint64_t minDelta; - uint64_t maxDelta; - uint64_t totDelta; - // first one is probably itself, so discard it. - uint64_t totSecondDelta; - // to confirm if last node destroy barrier performance. 
- uint64_t totLastTwoDelta; - // n/2-n delta - uint64_t totMidDelta; - uint64_t freq; -}; - -// barrier performance tunning stats -class BarrierStatBase { -public: - BarrierStatBase(uint16_t numConnThreads, const std::string &name); - - virtual ~BarrierStatBase() {} - - // if called at pserver end, then trainId means trainer's id. - // by default trainer does not use trainerId, so set it to -1 - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) = 0; - virtual void updateStat(uint64_t delta, int32_t trainerId = -1) = 0; - - const std::string &getName() { return name_; } - - virtual void reset(bool clearRawData = true) {} - // since the timeVector_ is not stateful, so it's not clear whether the - // the barrier delta is correct. if one timestamp was lost, the all data - // from barrier stat becomes rubbish. -_- - virtual bool checkPassBarrier() { - LOG(INFO) << "bug implementation found"; - return false; - } - -protected: - virtual void showAbstract(std::ostream &output) const {} - friend std::ostream &operator<<(std::ostream &output, - const BarrierStatBase &stat); - -protected: - mutable std::mutex lock_; - std::mutex abstractLock_; // see note on updaterStat - // each freqency for each barrier trainer - std::vector abstract_; - // it is valuable when do perf-tuining, if lastTrainerId acts uniform - // distribution - struct Abstract totAbstract_; - uint64_t totSamples_; - -protected: - uint16_t numConnThreads_; // total updates needed - float rateThreshold_; - std::string name_; -}; - -// the end-time of arriving real/forged barrier position -class BarrierEndStat : public BarrierStatBase { -public: - BarrierEndStat(uint16_t numConnThreads, const std::string &name); - ~BarrierEndStat() {} - - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1); - virtual void updateStat(uint64_t delta, int32_t trainerId = -1) { - LOG(INFO) << "have no delta updateStat in BarrierEndStat"; - } - virtual void reset(bool clearRawData = true); - virtual bool checkPassBarrier() { return timeVector_->empty(); } - -protected: - /* - * LOG: - * readAllBlocks_denseUpdater - * trainerId avgGap avgSecondGap avgLastTwoGap avgMidGap rate - * 44 86.702 81.022 9.984 50.472 0.144737 - * 46 87.723 82.939 8.737 50.019 0.118421 - * 35 100.923 96.752 14.305 61.979 - * 0.0657895 - * log_barrier_abstract, log_barrier_lowest_nodes, log_barrier_threshold - * control details. - */ - virtual void showAbstract(std::ostream &output) const; - -private: - std::unique_ptr timeVector_; -}; - -// the delta-time from different trainers, -// eg, find the degree of imbalance of BP time at pserver end -// the entry value in timerVector_ is BP delta, do evaluation to BP delta. -class BarrierDeltaStat : public BarrierStatBase { -public: - BarrierDeltaStat(uint16_t numConnThreads, const std::string &name); - ~BarrierDeltaStat() {} - - virtual void updateStat(uint64_t delta, int32_t trainerId = -1); - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) { - LOG(INFO) << "have no timeval updateStat in BarrierDeltaStat"; - } - - virtual void reset(bool clearRawData = true); - - virtual bool checkPassBarrier() { return timeVector_->empty(); } - -protected: - virtual void showAbstract(std::ostream &outPut) const; - -private: - // store delta time in uint64_t, eg BP time of all trainers - std::unique_ptr timeVector_; -}; - -// to distinguish different contexts for same parallel threads, and different -// threads with same code-sgement, just use tagName to tag the run-time -// position. 
-// in Sparse, sendParallel threads can not only run in the stage of push&pull -// with same thread group, but also run in the stage of pull&push with different -// thread group, tag will be used to distinguish different run-time barrier -// position. -// trainerId in REGISTER_BARRIER_TIMER_SERVER is used to retreive lowest trainer -// nodes. - -// end barrier -#define __REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - struct timeval cur; \ - gettimeofday(&cur, nullptr); \ - __stat->updateStat(cur, trainerId); \ - } \ - } while (0); - -// end barrier with user-defined timer -#define __REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - __stat->updateStat(cur, trainerId); \ - } \ - } while (0); - -// delta barrier -#define __REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, delta, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_DELTA); \ - __stat->updateStat(delta, trainerId); \ - } \ - } while (0); - -// check end barrier -#define __CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \ - do { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - PCHECK(__stat->checkPassBarrier()) << internalName \ - << ": invalid barrier data"; \ - } while (0); - -/* - * Note: - * with sync-sgd algriothm in cluster mode, lots of synchronize action exsit at - * pserve end. these synchronizaton actions have impact on the efficiency of - * parameter exchange. the synchronizaton(barrier) GAP is composed of lots of - * factors, such as the forwardBackward variance, network fluncation. we try - * to have a quantitative analysis on these factor, so we design lots of barrier - * time to capture these performance. these barrier also can be placed at - * implict barrier position. - * - * example: - * in sync-sgd algorithm, each parameter server waits for all gradients from - * all trainers, thus, an explict barrier point exsit before doing optimization. - * the barrier timer located before the point can sense the barrier condition. - * - */ - -// try to capture which trainer is slowest node in sync-sgd at pserver. -#define REGISTER_SLOW_NODES_PROBE( \ - set, statName, numConnThreads, trainerId, ...) \ - __REGISTER_BARRIER_TIMER_SERVER( \ - (set), statName, numConnThreads, trainerId, __VA_ARGS__) -// try to check if all threads or trainers have passed barriers for data -// accuracy. -#define CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \ - __CHECK_BARRIER_TIMER((set), statName, numConnThreads, __VA_ARGS__) - -#ifdef PADDLE_DISABLE_TIMER - -#define REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) -#define REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) 
-#define REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) - -#else - -/* - * sensing barrier time distribution for all parallelization threads. - * it provides low API for slow node check(REGISTER_SLOW_NODES_PROBE) - */ -#define REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) \ - __REGISTER_BARRIER_TIMER_SERVER( \ - (set), statName, numConnThreads, trainerId, __VA_ARGS__) - -/* - * sensing barrier time distribution for all parallelization threads. - * but time point for barrier performance is set by user. - * eg, with this api, you can get implict barrier point such as the beginning - * time distribution - * for receiving data. - */ -#define REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) \ - __REGISTER_BARRIER_TIMER_SERVER_SET( \ - (set), statName, numConnThreads, trainerId, cur, __VA_ARGS__) - -// try to capture time delta from all trainers, such as forwardBackward time -// which implies -// computation fluctuation -#define REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, delta, ...) \ - __REGISTER_BARRIER_DELTA_SERVER_SET( \ - (set), statName, numConnThreads, trainerId, delta, __VA_ARGS__) - -#endif // DISABLE_TIMER -} // namespace paddle diff --git a/paddle/utils/Stat.cpp b/paddle/utils/Stat.cpp index c7194d3bf1271a8bf05379f78adfb18f0f64db29..ff1b1bf888f3915f14752cb89115f7c9ed98d67f 100644 --- a/paddle/utils/Stat.cpp +++ b/paddle/utils/Stat.cpp @@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) { return outPut; } -BarrierStatPtr StatSet::getStat(uint16_t numConnThreads, - const std::string& name, - BarrierStatType bType) { - { - ReadLockGuard guard(lock_); - auto it = barrierStatSet_.find(name); - if (it != barrierStatSet_.end()) { - return it->second; - } - } - - std::lock_guard guard(lock_); - // test again with lock_guard - auto it = barrierStatSet_.find(name); - if (it != barrierStatSet_.end()) { - return it->second; - } - - BarrierStatPtr stat; - if (bType == BARRIER_END) { - stat = std::make_shared(numConnThreads, name); - } else if (bType == BARRIER_DELTA) { - stat = std::make_shared(numConnThreads, name); - } - auto ret = barrierStatSet_.insert(std::make_pair(name, stat)); - return ret.first->second; -} - void StatSet::printSegTimerStatus() { ReadLockGuard guard(lock_); LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') @@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() { } } -void StatSet::printBarrierTimerStatus() { - ReadLockGuard guard(lock_); - if (barrierStatSet_.empty()) { - return; - } - // control barrierAbstact in runtime, so enable compliation - LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') - << "======= BarrierStatSet status ======" << std::endl; - for (auto& stat : barrierStatSet_) { - LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') - << *(stat.second); - } -} - void StatSet::printAllStatus() { #ifndef PADDLE_DISABLE_TIMER printSegTimerStatus(); #endif - printBarrierTimerStatus(); LOG(INFO) << std::setiosflags(std::ios::left) << "--------------------------------------------------" << std::endl; } -void StatSet::printStatus(const std::string& name) { - ReadLockGuard guard(lock_); - auto iter = statSet_.find(name); - CHECK(iter != statSet_.end()) << name << " is not registed in " << name_; - LOG(INFO) << *(iter->second); -} - void StatSet::reset(bool clearRawData) { ReadLockGuard guard(lock_); for (auto& stat : statSet_) { 
stat.second->reset(); } - // reset barrierStat - for (auto& stat : barrierStatSet_) { - stat.second->reset(clearRawData); - } } void StatSet::setThreadInfo(const std::string& name, bool flag) { @@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) { iter->second->setThreadInfo(flag); } -void StatSet::deleteStat(const std::string& name) { - std::lock_guard guard(lock_); - auto iter = statSet_.find(name); - CHECK(iter != statSet_.end()) << name << " is not registed in " << name_; - statSet_.erase(iter); -} - StatInfo::~StatInfo() { if (stat_) { std::lock_guard guard(stat_->lock_); diff --git a/paddle/utils/Stat.h b/paddle/utils/Stat.h index d9cc6e413a7415d9f42508019d435992b93cf47f..79fd3b8cf043e62922dfd046754ee8ac261990c5 100644 --- a/paddle/utils/Stat.h +++ b/paddle/utils/Stat.h @@ -23,7 +23,6 @@ limitations under the License. */ #include #include -#include "BarrierStat.h" #include "Locks.h" #include "Logging.h" #include "ThreadLocal.h" @@ -60,12 +59,6 @@ public: class Stat; typedef std::shared_ptr StatPtr; -typedef std::shared_ptr BarrierStatPtr; - -enum BarrierStatType { - BARRIER_END = 0, - BARRIER_DELTA = 1, -}; class StatSet { public: @@ -74,11 +67,8 @@ public: // print to LOG(INFO) void printSegTimerStatus(); - void printBarrierTimerStatus(); void printAllStatus(); - void printStatus(const std::string& name); - StatPtr getStat(const std::string& name) { { ReadLockGuard guard(lock_); @@ -93,12 +83,6 @@ public: return ret.first->second; } - BarrierStatPtr getStat(uint16_t numConnThreads, - const std::string& name, - BarrierStatType bType); - - void deleteStat(const std::string& name); - // true for showing stats for each thread // false for showing stats aggragated over threads void setThreadInfo(const std::string& name, bool flag); @@ -120,7 +104,6 @@ public: private: std::unordered_map statSet_; - std::unordered_map barrierStatSet_; const std::string name_; RWLock lock_; }; diff --git a/paddle/utils/ThreadLocal.h b/paddle/utils/ThreadLocal.h index a4987c9ec261a2ee57e62d1640e2a21c7f804c99..b5e2862546212041a774599ec664b95e56224a07 100644 --- a/paddle/utils/ThreadLocal.h +++ b/paddle/utils/ThreadLocal.h @@ -51,7 +51,7 @@ template class ThreadLocal { public: ThreadLocal() { - PCHECK(pthread_key_create(&threadSpecificKey_, dataDestructor) == 0); + CHECK(pthread_key_create(&threadSpecificKey_, dataDestructor) == 0); } ~ThreadLocal() { pthread_key_delete(threadSpecificKey_); } @@ -65,7 +65,7 @@ public: if (!p && createLocal) { p = new T(); int ret = pthread_setspecific(threadSpecificKey_, p); - PCHECK(ret == 0); + CHECK(ret == 0); } return p; } @@ -79,7 +79,7 @@ public: if (T* q = get(false)) { dataDestructor(q); } - PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); + CHECK(pthread_setspecific(threadSpecificKey_, p) == 0); } /** @@ -112,7 +112,7 @@ private: template class ThreadLocalD { public: - ThreadLocalD() { PCHECK(pthread_key_create(&threadSpecificKey_, NULL) == 0); } + ThreadLocalD() { CHECK(pthread_key_create(&threadSpecificKey_, NULL) == 0); } ~ThreadLocalD() { pthread_key_delete(threadSpecificKey_); for (auto t : threadMap_) { @@ -127,7 +127,7 @@ public: T* p = (T*)pthread_getspecific(threadSpecificKey_); if (!p) { p = new T(); - PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); + CHECK(pthread_setspecific(threadSpecificKey_, p) == 0); updateMap(p); } return p; @@ -141,7 +141,7 @@ public: if (T* q = (T*)pthread_getspecific(threadSpecificKey_)) { dataDestructor(q); } - PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); + 
CHECK(pthread_setspecific(threadSpecificKey_, p) == 0); updateMap(p); } diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 361e764e25ba1801bd22f785bc282e51f058aae6..13a1802ee3790b1255fc11f5b2053e3342c61914 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -29,7 +29,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND ${CMAKE_COMMAND} -E touch ${OUTPUT_DIR}/.timestamp - DEPENDS gen_proto_py ${PY_FILES} ${external_project_dependencies} ${COPY_PADDLE_MASTER}) + DEPENDS gen_proto_py framework_py_proto ${PY_FILES} ${external_project_dependencies} ${COPY_PADDLE_MASTER}) add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) @@ -43,6 +43,7 @@ if (WITH_TESTING) add_subdirectory(paddle/v2/tests) add_subdirectory(paddle/v2/reader/tests) add_subdirectory(paddle/v2/plot/tests) + add_subdirectory(paddle/v2/framework/tests) endif() endif() install(DIRECTORY ${PADDLE_PYTHON_PACKAGE_DIR} diff --git a/python/paddle/trainer/config_parser.py b/python/paddle/trainer/config_parser.py index 370529ed97b1f1427ebc088a3031437a7f65e0cf..826ba2834a820d11e69feec5569ef3537194e3c3 100644 --- a/python/paddle/trainer/config_parser.py +++ b/python/paddle/trainer/config_parser.py @@ -1353,7 +1353,8 @@ class LayerBase(object): device=None, active_type="", drop_rate=0., - coeff=None): + coeff=None, + error_clipping_threshold=None): config_assert('@' not in name, "layer name: %s contain special character @" % name) global g_current_submodel @@ -1387,6 +1388,9 @@ class LayerBase(object): elif g_default_device is not None: self.config.device = g_default_device + if error_clipping_threshold is not None: + self.config.error_clipping_threshold = error_clipping_threshold + for input_index in xrange(len(self.inputs)): input = self.inputs[input_index] input_config = None @@ -2466,10 +2470,14 @@ class MaxLayer(LayerBase): trans_type='non-seq', bias=False, output_max_index=None, + stride=-1, **xargs): super(MaxLayer, self).__init__(name, 'max', 0, inputs=inputs, **xargs) config_assert(len(self.inputs) == 1, 'MaxLayer must have 1 input') + if trans_type == 'seq': + config_assert(stride == -1, 'subseq does not support stride window') self.config.trans_type = trans_type + self.config.seq_pool_stride = stride for input_index in xrange(len(self.inputs)): input_layer = self.get_input_layer(input_index) self.set_layer_size(input_layer.size) @@ -2731,11 +2739,15 @@ class AverageLayer(LayerBase): average_strategy='average', trans_type='non-seq', bias=False, + stride=-1, **xargs): super(AverageLayer, self).__init__( name, 'average', 0, inputs=inputs, **xargs) self.config.average_strategy = average_strategy + if trans_type == 'seq': + config_assert(stride == -1, 'subseq does not support stride window') self.config.trans_type = trans_type + self.config.seq_pool_stride = stride config_assert(len(inputs) == 1, 'AverageLayer must have 1 input') for input_index in xrange(len(self.inputs)): input_layer = self.get_input_layer(input_index) @@ -2774,13 +2786,7 @@ class TensorLayer(LayerBase): @config_layer('mixed') class MixedLayer(LayerBase): - def __init__(self, - name, - inputs, - size=0, - bias=True, - error_clipping_threshold=None, - **xargs): + def __init__(self, name, inputs, size=0, bias=True, **xargs): config_assert(inputs, 'inputs cannot be empty') super(MixedLayer, self).__init__( name, 'mixed', size, inputs=inputs, **xargs) @@ -2862,9 +2868,6 @@ class 
MixedLayer(LayerBase): self.config.bias_size = psize self.create_bias_parameter(bias, psize) - if error_clipping_threshold is not None: - self.config.error_clipping_threshold = error_clipping_threshold - # like MixedLayer, but no bias parameter @config_func diff --git a/python/paddle/trainer_config_helpers/layers.py b/python/paddle/trainer_config_helpers/layers.py index 206de1f8e1c7d3f9f977b4ca97522065c9ed0cab..0a5dd49bb48c25f268aa273314f92c092305664a 100755 --- a/python/paddle/trainer_config_helpers/layers.py +++ b/python/paddle/trainer_config_helpers/layers.py @@ -1246,10 +1246,19 @@ def pooling_layer(input, name=None, bias_attr=None, agg_level=AggregateLevel.TO_NO_SEQUENCE, + stride=-1, layer_attr=None): """ Pooling layer for sequence inputs, not used for Image. + If stride > 0, this layer slides a window whose size is determined by stride, + and returns the pooling value of the window as the output. Thus, a long sequence + will be shortened. + + The parameter stride specifies the intervals at which to apply the pooling + operation. Note that a sequence with sub-sequences does not support stride, + so stride must keep its default value of -1. + The example usage is: .. code-block:: python @@ -1268,6 +1277,8 @@ def pooling_layer(input, :param pooling_type: Type of pooling, MaxPooling(default), AvgPooling, SumPooling, SquareRootNPooling. :type pooling_type: BasePoolingType|None + :param stride: The step size between successive pooling regions. + :type stride: Int :param bias_attr: Bias parameter attribute. False if no bias. :type bias_attr: ParameterAttribute|None|False :param layer_attr: The Extra Attributes for layer, such as dropout. @@ -1285,12 +1296,16 @@ def pooling_layer(input, extra_dict['output_max_index'] = pooling_type.output_max_index extra_dict.update(ExtraLayerAttribute.to_kwargs(layer_attr)) + if agg_level == AggregateLevel.TO_SEQUENCE: + assert stride == -1 + Layer( name=name, type=pooling_type.name, inputs=[Input(input.name)], bias=ParamAttr.to_bias(bias_attr), trans_type=agg_level, + stride=stride, **extra_dict) return LayerOutput( @@ -1552,7 +1567,7 @@ def last_seq(input, :type name: basestring :param input: Input layer name. :type input: LayerOutput - :param stride: window size. + :param stride: The step size between successive pooling regions. :type stride: Int :param layer_attr: extra layer attributes. :type layer_attr: ExtraLayerAttribute. @@ -1608,7 +1623,7 @@ def first_seq(input, :type name: basestring :param input: Input layer name. :type input: LayerOutput - :param stride: window size. + :param stride: The step size between successive pooling regions. :type stride: Int :param layer_attr: extra layer attributes. :type layer_attr: ExtraLayerAttribute.
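A minimal usage sketch of the new stride argument (names such as din, whole_seq and windowed are illustrative; it assumes the usual trainer_config_helpers imports and a 100-dimensional sequence input like the dat_in layer in the updated test config). As the new docstring describes, stride > 0 pools each successive window of stride time steps instead of the whole sequence:

.. code-block:: python

    from paddle.trainer_config_helpers import *

    din = data_layer(name='dat_in', size=100)

    # stride keeps its default of -1: pool over the whole sequence (previous behaviour).
    whole_seq = pooling_layer(input=din, pooling_type=MaxPooling())

    # stride=5: pool over windows of 5 time steps, so a long sequence is shortened.
    windowed = pooling_layer(
        input=din,
        agg_level=AggregateLevel.TO_NO_SEQUENCE,
        pooling_type=MaxPooling(),
        stride=5)

Note that agg_level=AggregateLevel.TO_SEQUENCE still requires stride == -1, as enforced by the new assert in layers.py and the config_assert in config_parser.py.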
diff --git a/python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr b/python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr index 5a217f5544a8a3b4704b158dfeb92f747b7bd94b..8989561df04a60c906c06432fd857227a3814194 100644 --- a/python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr +++ b/python/paddle/trainer_config_helpers/tests/configs/protostr/test_sequence_pooling.protostr @@ -14,6 +14,7 @@ layers { input_layer_name: "dat_in" } trans_type: "seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_1__" @@ -24,6 +25,7 @@ layers { input_layer_name: "dat_in" } trans_type: "non-seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_2__" @@ -35,6 +37,7 @@ layers { } average_strategy: "average" trans_type: "seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_3__" @@ -46,6 +49,7 @@ layers { } average_strategy: "average" trans_type: "non-seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_4__" @@ -57,6 +61,7 @@ layers { } average_strategy: "sum" trans_type: "seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_5__" @@ -68,6 +73,7 @@ layers { } average_strategy: "sum" trans_type: "non-seq" + seq_pool_stride: -1 } layers { name: "__seq_pooling_6__" @@ -77,8 +83,44 @@ layers { inputs { input_layer_name: "dat_in" } + trans_type: "non-seq" + seq_pool_stride: 5 +} +layers { + name: "__seq_pooling_7__" + type: "average" + size: 100 + active_type: "" + inputs { + input_layer_name: "dat_in" + } + average_strategy: "average" + trans_type: "non-seq" + seq_pool_stride: 5 +} +layers { + name: "__seq_pooling_8__" + type: "average" + size: 100 + active_type: "" + inputs { + input_layer_name: "dat_in" + } + average_strategy: "sum" + trans_type: "non-seq" + seq_pool_stride: 5 +} +layers { + name: "__seq_pooling_9__" + type: "max" + size: 100 + active_type: "" + inputs { + input_layer_name: "dat_in" + } output_max_index: true trans_type: "non-seq" + seq_pool_stride: -1 } input_layer_names: "dat_in" output_layer_names: "__seq_pooling_0__" @@ -88,6 +130,9 @@ output_layer_names: "__seq_pooling_3__" output_layer_names: "__seq_pooling_4__" output_layer_names: "__seq_pooling_5__" output_layer_names: "__seq_pooling_6__" +output_layer_names: "__seq_pooling_7__" +output_layer_names: "__seq_pooling_8__" +output_layer_names: "__seq_pooling_9__" sub_models { name: "root" layer_names: "dat_in" @@ -98,6 +143,9 @@ sub_models { layer_names: "__seq_pooling_4__" layer_names: "__seq_pooling_5__" layer_names: "__seq_pooling_6__" + layer_names: "__seq_pooling_7__" + layer_names: "__seq_pooling_8__" + layer_names: "__seq_pooling_9__" input_layer_names: "dat_in" output_layer_names: "__seq_pooling_0__" output_layer_names: "__seq_pooling_1__" @@ -106,6 +154,9 @@ sub_models { output_layer_names: "__seq_pooling_4__" output_layer_names: "__seq_pooling_5__" output_layer_names: "__seq_pooling_6__" + output_layer_names: "__seq_pooling_7__" + output_layer_names: "__seq_pooling_8__" + output_layer_names: "__seq_pooling_9__" is_recurrent_layer_group: false } diff --git a/python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py b/python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py index 3c49eb56c1363a6a3f365fe56e16a8b484c8a004..3c205eabd80492a68383fdbecd14a7d6db3e16eb 100644 --- a/python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py +++ b/python/paddle/trainer_config_helpers/tests/configs/test_sequence_pooling.py @@ -14,6 +14,14 @@ for pt in 
POOL_TYPE: for al in AGG_LEVEL: opts.append(pooling_layer(input=din, agg_level=al, pooling_type=pt())) +for pt in POOL_TYPE: + opts.append( + pooling_layer( + input=din, + agg_level=AggregateLevel.TO_NO_SEQUENCE, + pooling_type=pt(), + stride=5)) + opts.append( pooling_layer( input=din, pooling_type=MaxPooling(output_max_index=True))) diff --git a/python/paddle/v2/framework/__init__.py b/python/paddle/v2/framework/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c942373c667733f8aabe63026998a8915618130a --- /dev/null +++ b/python/paddle/v2/framework/__init__.py @@ -0,0 +1 @@ +__all__ = ['proto'] diff --git a/python/paddle/v2/framework/tests/CMakeLists.txt b/python/paddle/v2/framework/tests/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..8cb0c5c3765a00b45177117925e320e61a1b609a --- /dev/null +++ b/python/paddle/v2/framework/tests/CMakeLists.txt @@ -0,0 +1 @@ +add_python_test(test_framework test_protobuf.py) diff --git a/python/paddle/v2/framework/tests/test_protobuf.py b/python/paddle/v2/framework/tests/test_protobuf.py new file mode 100644 index 0000000000000000000000000000000000000000..f0e60191991e2f24b0a1972afb0f6cbd3aaa4008 --- /dev/null +++ b/python/paddle/v2/framework/tests/test_protobuf.py @@ -0,0 +1,26 @@ +import paddle.v2.framework.proto.op_proto_pb2 +import paddle.v2.framework.proto.attr_type_pb2 +import unittest + + +class TestFrameworkProto(unittest.TestCase): + def test_all(self): + op_proto_lib = paddle.v2.framework.proto.op_proto_pb2 + attr_type_lib = paddle.v2.framework.proto.attr_type_pb2 + op_proto = op_proto_lib.OpProto() + ipt0 = op_proto.inputs.add() + ipt0.name = "a" + ipt0.comment = "the input of cosine op" + ipt1 = op_proto.inputs.add() + ipt1.name = "b" + ipt1.comment = "the other input of cosine op" + opt = op_proto.outputs.add() + opt.name = "output" + opt.comment = "the output of cosine op" + op_proto.comment = "cosine op, output = scale*cos(a, b)" + attr = op_proto.attrs.add() + attr.name = "scale" + attr.comment = "scale of cosine op" + attr.type = attr_type_lib.FLOAT + op_proto.type = "cos" + self.assertTrue(op_proto.IsInitialized()) diff --git a/python/paddle/v2/master/client.py b/python/paddle/v2/master/client.py index de8e9bb88e1064e41a80e0ef7838e307089a1331..70f9e43c9683033233d48a750668771a4c7ba045 100644 --- a/python/paddle/v2/master/client.py +++ b/python/paddle/v2/master/client.py @@ -26,14 +26,22 @@ class client(object): holder[idx] = c_ptr lib.paddle_set_dataset(self.c, holder, len(paths)) + # return format: (record, errno) + # errno = 0: ok + # < 0: error def next_record(self): p = ctypes.c_char_p() ret = ctypes.pointer(p) size = lib.paddle_next_record(self.c, ret) + if size < 0: + # Error + return None, size + if size == 0: # Empty record - return "" + return "", 0 + record = ret.contents.value[:size] # Memory created from C should be freed. 
lib.mem_free(ret.contents) - return record + return record, 0 diff --git a/python/paddle/v2/reader/creator.py b/python/paddle/v2/reader/creator.py index 9f888b16d6b2fbf457ee4f4fe94fcb51b6f37fc9..61b5cc134fba875955bdbfddc2bb1e083241940d 100644 --- a/python/paddle/v2/reader/creator.py +++ b/python/paddle/v2/reader/creator.py @@ -57,17 +57,20 @@ def text_file(path): return reader -def recordio(path): +def recordio_local(paths, buf_size=100): """ - Creates a data reader that outputs record one one by one from given recordio file - :path: path of recordio file - :returns: data reader of recordio file + Creates a data reader from given RecordIO file paths separated by ",", + glob patterns are supported. + :paths: paths of recordio files. + :returns: data reader of recordio files. """ import recordio as rec + import paddle.v2.reader.decorator as dec def reader(): - f = rec.reader(path) + a = ','.join(paths) + f = rec.reader(a) while True: r = f.read() if r is None: @@ -75,4 +78,38 @@ def recordio(path): yield r f.close() + return dec.buffered(reader, buf_size) + + +def recordio(paths, buf_size=100): + """ + Creates a data reader that outputs records one by one + from the given local or cloud recordio paths. + :paths: paths of recordio files. + :returns: data reader of recordio files. + """ + import os + import paddle.v2.master.client as cloud + + if "KUBERNETES_SERVICE_HOST" not in os.environ.keys(): + return recordio_local(paths) + + host_name = "MASTER_SERVICE_HOST" + if host_name not in os.environ.keys(): + raise Exception('cannot find ' + host_name + ' in environ.') + + addr = os.environ[host_name] + + def reader(): + c = cloud.client(addr, buf_size) + c.set_dataset(paths) + + while True: + r, err = c.next_record() + if err < 0: + break + yield r + + c.close() + return reader diff --git a/python/paddle/v2/reader/tests/creator_test.py b/python/paddle/v2/reader/tests/creator_test.py index ba4f558874a0155d276fcb0e0d2d9258f0903f0e..b42d273ecfe6c4bc5706ec52617960b83496d70d 100644 --- a/python/paddle/v2/reader/tests/creator_test.py +++ b/python/paddle/v2/reader/tests/creator_test.py @@ -38,7 +38,7 @@ class TestRecordIO(unittest.TestCase): def test_recordio(self): path = os.path.join( os.path.dirname(__file__), "test_recordio_creator.dat") - reader = paddle.v2.reader.creator.recordio(path) + reader = paddle.v2.reader.creator.recordio([path]) for idx, r in enumerate(reader()): self.assertSequenceEqual(r, str(idx)) diff --git a/python/setup.py.in b/python/setup.py.in index dae01664876a913b49403d3f95001f009721f73b..eeffbfe80e3b4b5473b06f7372addd9870034e77 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -9,7 +9,9 @@ packages=['paddle', 'paddle.v2.dataset', 'paddle.v2.reader', 'paddle.v2.master', - 'paddle.v2.plot'] + 'paddle.v2.plot', + 'paddle.v2.framework', + 'paddle.v2.framework.proto'] setup_requires=["requests", "numpy", @@ -27,8 +29,11 @@ setup(name='paddle', description='Parallel Distributed Deep Learning', install_requires=setup_requires, packages=packages, - package_data={'paddle.v2.master': ['${paddle_master_LIB_NAME}'], }, + package_data={'paddle.v2.master': ['libpaddle_master.so'], }, package_dir={ - '': '${CMAKE_CURRENT_SOURCE_DIR}' + '': '${CMAKE_CURRENT_SOURCE_DIR}', + # The paddle.v2.framework.proto package is generated while compiling, + # so that package points to another directory. + 'paddle.v2.framework.proto': '${CMAKE_BINARY_DIR}/paddle/framework' }, )
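For orientation, a rough end-to-end sketch of the updated reader API; the file paths and master address below are placeholders, and the client constructor arguments simply mirror the client construction inside creator.py's recordio() rather than a documented signature:

.. code-block:: python

    import paddle.v2.reader.creator as creator
    import paddle.v2.master.client as master

    # Local case: recordio() now takes a list of RecordIO paths and, outside
    # Kubernetes, falls back to recordio_local(), which buffers the reads.
    reader = creator.recordio(["./part-00000.recordio", "./part-00001.recordio"])
    for record in reader():
        pass  # each record is a raw string, as exercised in creator_test.py

    # Cloud case: the master client reports errors through the (record, errno)
    # tuple returned by next_record(); a negative errno signals an error.
    c = master.client("127.0.0.1:8080", 100)  # placeholder address and buf_size
    c.set_dataset(["./part-00000.recordio"])
    while True:
        record, err = c.next_record()
        if err < 0:
            break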