提交 d378e0a0 编写于 作者: W wanghaoshuang

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into crop_layer

...@@ -192,9 +192,9 @@ function(cc_test TARGET_NAME) ...@@ -192,9 +192,9 @@ function(cc_test TARGET_NAME)
set(multiValueArgs SRCS DEPS) set(multiValueArgs SRCS DEPS)
cmake_parse_arguments(cc_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) cmake_parse_arguments(cc_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_executable(${TARGET_NAME} ${cc_test_SRCS}) add_executable(${TARGET_NAME} ${cc_test_SRCS})
target_link_libraries(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main) target_link_libraries(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main -lstdc++ -lm)
add_dependencies(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main) add_dependencies(${TARGET_NAME} ${cc_test_DEPS} gtest gtest_main)
add_test(${TARGET_NAME} ${TARGET_NAME}) add_test(NAME ${TARGET_NAME} COMMAND ${TARGET_NAME} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
endif() endif()
endfunction(cc_test) endfunction(cc_test)
...@@ -281,10 +281,11 @@ function(go_library TARGET_NAME) ...@@ -281,10 +281,11 @@ function(go_library TARGET_NAME)
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR}) string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# FIXME: link path
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND rm "${${TARGET_NAME}_LIB_PATH}" COMMAND rm "${${TARGET_NAME}_LIB_PATH}"
# Golang build source code # Golang build source code
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} COMMAND env LIBRARY_PATH=${CMAKE_BINARY_DIR}/go/pserver/client/c/:$ENV{LIBRARY_PATH} GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${${TARGET_NAME}_LIB_PATH}" -o "${${TARGET_NAME}_LIB_PATH}"
"./${CMAKE_CURRENT_SOURCE_REL_DIR}/${GO_SOURCE}" "./${CMAKE_CURRENT_SOURCE_REL_DIR}/${GO_SOURCE}"
# must run under GOPATH # must run under GOPATH
...@@ -299,8 +300,10 @@ function(go_binary TARGET_NAME) ...@@ -299,8 +300,10 @@ function(go_binary TARGET_NAME)
cmake_parse_arguments(go_binary "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) cmake_parse_arguments(go_binary "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR}) string(REPLACE "${PADDLE_GO_PATH}/" "" CMAKE_CURRENT_SOURCE_REL_DIR ${CMAKE_CURRENT_SOURCE_DIR})
# FIXME: link path
add_custom_command(OUTPUT ${TARGET_NAME}_timestamp add_custom_command(OUTPUT ${TARGET_NAME}_timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build COMMAND env LIBRARY_PATH=${CMAKE_BINARY_DIR}/go/pserver/client/c/:$ENV{LIBRARY_PATH}
GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build
-o "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}" -o "${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}"
"./${CMAKE_CURRENT_SOURCE_REL_DIR}/${go_binary_SRCS}" "./${CMAKE_CURRENT_SOURCE_REL_DIR}/${go_binary_SRCS}"
WORKING_DIRECTORY "${PADDLE_IN_GOPATH}/go") WORKING_DIRECTORY "${PADDLE_IN_GOPATH}/go")
......
...@@ -12,4 +12,4 @@ ...@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
go_binary(master SRC master.go) go_binary(master SRC master.go DEPS paddle_go_optimizer)
...@@ -12,4 +12,4 @@ ...@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
go_binary(pserver SRCS pserver.go) go_binary(pserver SRCS pserver.go DEPS paddle_go_optimizer)
cc_library(paddle_go_optimizer DEPS paddle_optimizer paddle_proto glog gflags protobuf) cc_library(paddle_go_optimizer DEPS paddle_optimizer paddle_proto glog gflags protobuf)
go_library(paddle_pserver_cclient STATIC DEPS paddle_go_optimizer) go_library(paddle_pserver_cclient STATIC DEPS paddle_go_optimizer)
if(WITH_TESTING) if(WITH_TESTING)
add_subdirectory(test) # TODO: add unit test
#add_subdirectory(test)
endif() endif()
cc_test(test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient) cc_test(test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient paddle_go_optimizer)
add_style_check_target(test_cclient test_cclient.c) add_style_check_target(test_cclient test_cclient.c)
package pserver package pserver
// #cgo CFLAGS: -I ../../ // #cgo CFLAGS: -I ../../
// //FIXME: ldflags contain "build" path // #cgo LDFLAGS: -lpaddle_go_optimizer -lstdc++ -lm
// #cgo LDFLAGS: ../../build/go/pserver/client/c/libpaddle_go_optimizer.a -lstdc++ -lm
// #include "paddle/optimizer/optimizer.h" // #include "paddle/optimizer/optimizer.h"
// #include <stdlib.h> // #include <stdlib.h>
// #include <string.h> // #include <string.h>
......
...@@ -66,6 +66,7 @@ SWIG_LINK_LIBRARIES(swig_paddle ...@@ -66,6 +66,7 @@ SWIG_LINK_LIBRARIES(swig_paddle
paddle_trainer_lib paddle_trainer_lib
paddle_network paddle_network
paddle_parameter paddle_parameter
paddle_optimizer
paddle_math paddle_math
paddle_utils paddle_utils
paddle_proto paddle_proto
......
...@@ -25,6 +25,10 @@ namespace paddle { ...@@ -25,6 +25,10 @@ namespace paddle {
* If SequenceLevel = kNonSeq: * If SequenceLevel = kNonSeq:
* Output: output size is the number of input sequences (NOT input instances) * Output: output size is the number of input sequences (NOT input instances)
* output[i] = average_{for each instance in this sequence}{input[i]} * output[i] = average_{for each instance in this sequence}{input[i]}
* If stride_ > 0:
* Output: a shorten sequence. Stride is the step size by which we slide a
* window upon the input sequence, and the average pooling
* operation is then applied to each interval independently.
* If SequenceLevel = kSeq: * If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence * Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences * Output: output size is the number of input sub-sequences
......
...@@ -36,6 +36,16 @@ MatrixPtr CrossChannelNormLayer::createSpatialMatrix(MatrixPtr data, ...@@ -36,6 +36,16 @@ MatrixPtr CrossChannelNormLayer::createSpatialMatrix(MatrixPtr data,
data->getData() + iter * spatialDim, 1, spatialDim, false, useGpu_); data->getData() + iter * spatialDim, 1, spatialDim, false, useGpu_);
} }
bool CrossChannelNormLayer::init(const LayerMap& layerMap,
const ParameterMap& parameterMap) {
Layer::init(layerMap, parameterMap);
CHECK(parameters_[0]);
const NormConfig& conf = config_.inputs(0).norm_conf();
channels_ = conf.channels();
scale_.reset(new Weight(channels_, 1, parameters_[0]));
return true;
}
void CrossChannelNormLayer::forward(PassType passType) { void CrossChannelNormLayer::forward(PassType passType) {
Layer::forward(passType); Layer::forward(passType);
MatrixPtr inV = getInputValue(0); MatrixPtr inV = getInputValue(0);
...@@ -51,9 +61,7 @@ void CrossChannelNormLayer::forward(PassType passType) { ...@@ -51,9 +61,7 @@ void CrossChannelNormLayer::forward(PassType passType) {
Matrix::resizeOrCreate(dataBuffer_, batchSize, dataDim, false, useGpu_); Matrix::resizeOrCreate(dataBuffer_, batchSize, dataDim, false, useGpu_);
Matrix::resizeOrCreate(spatialBuffer_, 1, spatialDim, false, useGpu_); Matrix::resizeOrCreate(spatialBuffer_, 1, spatialDim, false, useGpu_);
Matrix::resizeOrCreate(normBuffer_, batchSize, spatialDim, false, useGpu_); Matrix::resizeOrCreate(normBuffer_, batchSize, spatialDim, false, useGpu_);
normBuffer_->zeroMem();
// add eps to avoid overflow
normBuffer_->addScalar(*normBuffer_, 1e-6);
inV->square2(*dataBuffer_); inV->square2(*dataBuffer_);
for (size_t i = 0; i < batchSize; i++) { for (size_t i = 0; i < batchSize; i++) {
const MatrixPtr inVTmp = createSampleMatrix(inV, i, spatialDim); const MatrixPtr inVTmp = createSampleMatrix(inV, i, spatialDim);
...@@ -63,6 +71,8 @@ void CrossChannelNormLayer::forward(PassType passType) { ...@@ -63,6 +71,8 @@ void CrossChannelNormLayer::forward(PassType passType) {
// compute norm. // compute norm.
spatialBuffer_->sumCols(*dataTmp, 1, 0); spatialBuffer_->sumCols(*dataTmp, 1, 0);
// add eps to avoid overflow
spatialBuffer_->add(1e-6);
spatialBuffer_->sqrt2(*spatialBuffer_); spatialBuffer_->sqrt2(*spatialBuffer_);
normTmp->copyFrom(*spatialBuffer_); normTmp->copyFrom(*spatialBuffer_);
outVTmp->copyFrom(*inVTmp); outVTmp->copyFrom(*inVTmp);
...@@ -82,6 +92,9 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) { ...@@ -82,6 +92,9 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) {
size_t dataDim = inG->getWidth(); size_t dataDim = inG->getWidth();
size_t spatialDim = dataDim / channels_; size_t spatialDim = dataDim / channels_;
MatrixPtr inGBuffer;
Matrix::resizeOrCreate(inGBuffer, channels_, spatialDim, false, useGpu_);
dataBuffer_->dotMul(*outG, *outV); dataBuffer_->dotMul(*outG, *outV);
Matrix::resizeOrCreate(scaleDiff_, channels_, 1, false, useGpu_); Matrix::resizeOrCreate(scaleDiff_, channels_, 1, false, useGpu_);
Matrix::resizeOrCreate(channelBuffer_, channels_, 1, false, useGpu_); Matrix::resizeOrCreate(channelBuffer_, channels_, 1, false, useGpu_);
...@@ -100,22 +113,24 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) { ...@@ -100,22 +113,24 @@ void CrossChannelNormLayer::backward(const UpdateCallback& callback) {
scaleDiff_->add(*channelBuffer_, 1.); scaleDiff_->add(*channelBuffer_, 1.);
sampleBuffer_->dotMul(*inVTmp, *outGTmp); sampleBuffer_->dotMul(*inVTmp, *outGTmp);
spatialBuffer_->sumCols(*sampleBuffer_, 1., 1.); spatialBuffer_->sumCols(*sampleBuffer_, 1., 0.);
// scale the grad // scale the grad
inGTmp->copyFrom(*inVTmp); inGBuffer->copyFrom(*inVTmp);
inGTmp->mulRowVector(*spatialBuffer_); inGBuffer->mulRowVector(*spatialBuffer_);
// divide by square of norm // divide by square of norm
spatialBuffer_->dotMul(*normTmp, *normTmp); spatialBuffer_->dotMul(*normTmp, *normTmp);
inGTmp->divRowVector(*spatialBuffer_); inGBuffer->divRowVector(*spatialBuffer_);
// subtract // subtract
inGTmp->add(*outGTmp, -1, 1); inGBuffer->add(*outGTmp, -1, 1);
// divide by norm // divide by norm
inGTmp->divRowVector(*normTmp); inGBuffer->divRowVector(*normTmp);
// scale the diff // scale the diff
inGTmp->mulColVector(*scale_->getW()); inGBuffer->mulColVector(*scale_->getW());
inGTmp->add(*inGBuffer);
} }
// updata scale // updata scale
if (scale_->getWGrad()) scale_->getWGrad()->copyFrom(*scaleDiff_); if (scale_->getWGrad()) scale_->getWGrad()->add(*scaleDiff_);
scale_->getParameterPtr()->incUpdate(callback); scale_->getParameterPtr()->incUpdate(callback);
} }
......
...@@ -26,6 +26,10 @@ namespace paddle { ...@@ -26,6 +26,10 @@ namespace paddle {
* If SequenceLevel = kNonSeq: * If SequenceLevel = kNonSeq:
* Output: output size is the number of input sequences (NOT input instances) * Output: output size is the number of input sequences (NOT input instances)
* output[i] = max_{for each instance in this sequence}{input[i]} * output[i] = max_{for each instance in this sequence}{input[i]}
* If stride_ > 0:
* Output: a shorten sequence. Stride is the step size by which we slide a
* window upon the input sequence, and the max pooling operation is
* then applied to each interval independently.
* If SequenceLevel = kSeq: * If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence * Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences * Output: output size is the number of input sub-sequences
......
...@@ -56,14 +56,4 @@ bool ResponseNormLayer::init(const LayerMap& layerMap, ...@@ -56,14 +56,4 @@ bool ResponseNormLayer::init(const LayerMap& layerMap,
return true; return true;
} }
bool CrossChannelNormLayer::init(const LayerMap& layerMap,
const ParameterMap& parameterMap) {
Layer::init(layerMap, parameterMap);
CHECK(parameters_[0]);
const NormConfig& conf = config_.inputs(0).norm_conf();
channels_ = conf.channels();
scale_.reset(new Weight(channels_, 1, parameters_[0]));
return true;
}
} // namespace paddle } // namespace paddle
...@@ -26,10 +26,9 @@ namespace paddle { ...@@ -26,10 +26,9 @@ namespace paddle {
* If SequenceLevel = kNonseq: * If SequenceLevel = kNonseq:
* Output: a sequence containing only the last instance of the input sequence * Output: a sequence containing only the last instance of the input sequence
* If stride_ > 0: * If stride_ > 0:
* Output: a shorten sequence. The operation of getting last instance of a * Output: a shorten sequence. Stride is the step size by which we slide a
* sequence is independently performed on every slice of the input * window upon the input sequence, and getting last instance
* sequence, which is obtained by sliding a window with the window * operation is then applied to each interval independently.
* size set to stride_.
* If SequenceLevel = kSeq: * If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence * Check input sequence must has sub-sequence
* Output: a sequence containing only the last instance of each sub-sequence * Output: a sequence containing only the last instance of each sub-sequence
...@@ -73,8 +72,7 @@ bool SequenceLastInstanceLayer::init(const LayerMap& layerMap, ...@@ -73,8 +72,7 @@ bool SequenceLastInstanceLayer::init(const LayerMap& layerMap,
void SequenceLastInstanceLayer::forward(PassType passType) { void SequenceLastInstanceLayer::forward(PassType passType) {
SequencePoolLayer::forward(passType); SequencePoolLayer::forward(passType);
auto starts = (stride_ > 0) ? stridePositions_->getData() auto starts = startPositions_->getData(false);
: startPositions_->getData(false);
MatrixPtr inputValue = getInputValue(0); MatrixPtr inputValue = getInputValue(0);
MatrixPtr outputValue = getOutputValue(); MatrixPtr outputValue = getOutputValue();
......
...@@ -72,9 +72,8 @@ void SequencePoolLayer::forward(PassType passType) { ...@@ -72,9 +72,8 @@ void SequencePoolLayer::forward(PassType passType) {
if (stride_ > 0) { if (stride_ > 0) {
CHECK_EQ(input.hasSubseq(), 0UL) CHECK_EQ(input.hasSubseq(), 0UL)
<< "sequence stride pooling is invalid for hasSubseq now"; << "sequence stride pooling is invalid for hasSubseq now";
output_.poolSequenceWithStride( output_.poolSequenceWithStride(input, stride_, &startPositions_, reversed_);
input, stride_, &stridePositions_, reversed_); newBatchSize_ = startPositions_->getSize() - 1;
newBatchSize_ = stridePositions_->getSize() - 1;
} }
resetOutput(newBatchSize_, dim); resetOutput(newBatchSize_, dim);
......
...@@ -28,8 +28,9 @@ namespace paddle { ...@@ -28,8 +28,9 @@ namespace paddle {
* sequence}{input[i]} * sequence}{input[i]}
* If stride_ > 0: * If stride_ > 0:
* Check input sequence must not have sub-sequence * Check input sequence must not have sub-sequence
* Output: a shorten sequence, pooling is performed upon a small local * Output: a shorten sequence. Stride is the step size by which we slide
* area * a window upon the input sequence, and the pooling operation
* is then applied to each interval independently.
* If SequenceLevel = kSeq: * If SequenceLevel = kSeq:
* Check input sequence must has sub-sequence * Check input sequence must has sub-sequence
* Output: output size is the number of input sub-sequences * Output: output size is the number of input sub-sequences
...@@ -47,8 +48,6 @@ protected: ...@@ -47,8 +48,6 @@ protected:
size_t newBatchSize_; size_t newBatchSize_;
ICpuGpuVectorPtr startPositions_; ICpuGpuVectorPtr startPositions_;
int stride_; int stride_;
// Store the start position of each window.
IVectorPtr stridePositions_;
// Whether the input sequence is reversed or not. // Whether the input sequence is reversed or not.
bool reversed_ = false; bool reversed_ = false;
......
...@@ -465,7 +465,6 @@ void initTestLayer(TestConfig testConf, ...@@ -465,7 +465,6 @@ void initTestLayer(TestConfig testConf,
ParameterConfig paraConfig) { ParameterConfig paraConfig) {
paraConfig.set_name(paraName); paraConfig.set_name(paraName);
paraConfig.set_size(paraSize); paraConfig.set_size(paraSize);
paraConfig.set_initial_std(1);
paraConfig.set_is_static(isStatic); paraConfig.set_is_static(isStatic);
auto para = auto para =
std::make_shared<Parameter>(paraConfig, FLAGS_use_gpu, initialize); std::make_shared<Parameter>(paraConfig, FLAGS_use_gpu, initialize);
...@@ -499,6 +498,9 @@ void initTestLayer(TestConfig testConf, ...@@ -499,6 +498,9 @@ void initTestLayer(TestConfig testConf,
paraConfig.add_dims((*layerMap)[input.input_layer_name()]->getSize()); paraConfig.add_dims((*layerMap)[input.input_layer_name()]->getSize());
paraConfig.add_dims(testConf.layerConfig.size()); paraConfig.add_dims(testConf.layerConfig.size());
} }
CHECK_GE(testConf.paramInitialStd, 0);
paraConfig.set_initial_mean(testConf.paramInitialMean);
paraConfig.set_initial_std(testConf.paramInitialStd);
initParameter(paraName, paraSize, inputDef.isStatic, false, paraConfig); initParameter(paraName, paraSize, inputDef.isStatic, false, paraConfig);
} }
} }
......
...@@ -125,12 +125,16 @@ struct TestConfig { ...@@ -125,12 +125,16 @@ struct TestConfig {
LayerConfig layerConfig; LayerConfig layerConfig;
std::vector<InputDef> inputDefs; std::vector<InputDef> inputDefs;
size_t biasSize; size_t biasSize;
real paramInitialMean;
real paramInitialStd;
bool testAccumulate; bool testAccumulate;
bool testState; bool testState;
bool staticBias; bool staticBias;
bool testBatchState; bool testBatchState;
TestConfig() TestConfig()
: biasSize(0), : biasSize(0),
paramInitialMean(0.0),
paramInitialStd(1.0),
testAccumulate(true), testAccumulate(true),
testState(false), testState(false),
staticBias(false), staticBias(false),
......
...@@ -845,6 +845,10 @@ void testDegradeLayer(bool hasSubseq, ...@@ -845,6 +845,10 @@ void testDegradeLayer(bool hasSubseq,
TEST(Layer, MaxLayer) { TEST(Layer, MaxLayer) {
testDegradeLayer(false, "max", "non-seq", -1); // seq max to non-seq testDegradeLayer(false, "max", "non-seq", -1); // seq max to non-seq
testDegradeLayer(false,
"max",
"non-seq",
5); // seq max to a shorten seq, stride window = 5
testDegradeLayer(true, "max", "non-seq", -1); // hasSubseq max to non-seq testDegradeLayer(true, "max", "non-seq", -1); // hasSubseq max to non-seq
testDegradeLayer(true, "max", "seq", -1); // hasSubseq max to seq testDegradeLayer(true, "max", "seq", -1); // hasSubseq max to seq
} }
...@@ -868,6 +872,10 @@ TEST(Layer, SequenceLastInstanceLayer) { ...@@ -868,6 +872,10 @@ TEST(Layer, SequenceLastInstanceLayer) {
TEST(Layer, AverageLayer) { TEST(Layer, AverageLayer) {
testDegradeLayer(false, "average", "non-seq", -1); // seq average to non-seq testDegradeLayer(false, "average", "non-seq", -1); // seq average to non-seq
testDegradeLayer(false,
"average",
"non-seq",
5); // seq average to a shorten seq, stride window = 5
testDegradeLayer( testDegradeLayer(
true, "average", "non-seq", -1); // hasSubseq average to non-seq true, "average", "non-seq", -1); // hasSubseq average to non-seq
testDegradeLayer(true, "average", "seq", -1); // hasSubseq average to seq testDegradeLayer(true, "average", "seq", -1); // hasSubseq average to seq
...@@ -1661,6 +1669,8 @@ TEST(Layer, PadLayer) { ...@@ -1661,6 +1669,8 @@ TEST(Layer, PadLayer) {
TEST(Layer, CrossChannelNormLayer) { TEST(Layer, CrossChannelNormLayer) {
TestConfig config; TestConfig config;
config.paramInitialMean = 1.;
config.paramInitialStd = 0.;
config.layerConfig.set_type("norm"); config.layerConfig.set_type("norm");
config.layerConfig.set_size(100); config.layerConfig.set_size(100);
LayerInputConfig* input = config.layerConfig.add_inputs(); LayerInputConfig* input = config.layerConfig.add_inputs();
...@@ -1674,7 +1684,7 @@ TEST(Layer, CrossChannelNormLayer) { ...@@ -1674,7 +1684,7 @@ TEST(Layer, CrossChannelNormLayer) {
config.inputDefs.push_back({INPUT_DATA, "layer_0", 100, 10}); config.inputDefs.push_back({INPUT_DATA, "layer_0", 100, 10});
for (auto useGpu : {false, true}) { for (auto useGpu : {false, true}) {
testLayerGrad(config, "cross-channel-norm", 10, false, useGpu, false, 5); testLayerGrad(config, "cross-channel-norm", 10, false, useGpu, false);
} }
} }
......
...@@ -561,7 +561,7 @@ void Argument::degradeSequence(const Argument& input) { ...@@ -561,7 +561,7 @@ void Argument::degradeSequence(const Argument& input) {
void Argument::poolSequenceWithStride(const Argument& input, void Argument::poolSequenceWithStride(const Argument& input,
size_t stride, size_t stride,
IVectorPtr* stridePostions, ICpuGpuVectorPtr* stridePostions,
bool reversed) { bool reversed) {
// If input.sequenceStartPositions = [0, 9, 14, 17, 30] and stride = 5, // If input.sequenceStartPositions = [0, 9, 14, 17, 30] and stride = 5,
// then sequenceStartPositions = [0, 2, 3, 4, 7]. // then sequenceStartPositions = [0, 2, 3, 4, 7].
...@@ -598,8 +598,8 @@ void Argument::poolSequenceWithStride(const Argument& input, ...@@ -598,8 +598,8 @@ void Argument::poolSequenceWithStride(const Argument& input,
stridePos.emplace_back(starts[numSequences]); stridePos.emplace_back(starts[numSequences]);
int size = stridePos.size(); int size = stridePos.size();
CHECK_EQ(size - 1, tgtBuf[numSequences]); CHECK_EQ(size - 1, tgtBuf[numSequences]);
IVector::resizeOrCreate(*stridePostions, size, false); ICpuGpuVector::resizeOrCreate(*stridePostions, size, false);
(*stridePostions)->copyFrom(stridePos.data(), size); (*stridePostions)->getMutableVector(false)->copyFrom(stridePos.data(), size);
} }
void Argument::getValueString( void Argument::getValueString(
......
...@@ -299,7 +299,7 @@ struct Argument { ...@@ -299,7 +299,7 @@ struct Argument {
*/ */
void poolSequenceWithStride(const Argument& input, void poolSequenceWithStride(const Argument& input,
size_t stride, size_t stride,
IVectorPtr* stridePositions, ICpuGpuVectorPtr* stridePositions,
bool reversed = false); bool reversed = false);
/** /**
* @brief getValueString will return the argument's output in string. There * @brief getValueString will return the argument's output in string. There
......
...@@ -31,7 +31,7 @@ TEST(Argument, poolSequenceWithStride) { ...@@ -31,7 +31,7 @@ TEST(Argument, poolSequenceWithStride) {
int strideResultReversed[] = {0, 4, 9, 14, 17, 20, 25, 30}; int strideResultReversed[] = {0, 4, 9, 14, 17, 20, 25, 30};
for (auto reversed : {false, true}) { for (auto reversed : {false, true}) {
IVectorPtr stridePositions; ICpuGpuVectorPtr stridePositions;
output.poolSequenceWithStride( output.poolSequenceWithStride(
input, 5 /* stride */, &stridePositions, reversed); input, 5 /* stride */, &stridePositions, reversed);
...@@ -45,7 +45,7 @@ TEST(Argument, poolSequenceWithStride) { ...@@ -45,7 +45,7 @@ TEST(Argument, poolSequenceWithStride) {
CHECK_EQ(stridePositions->getSize(), 8UL); CHECK_EQ(stridePositions->getSize(), 8UL);
auto result = reversed ? strideResultReversed : strideResult; auto result = reversed ? strideResultReversed : strideResult;
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
CHECK_EQ(stridePositions->getData()[i], result[i]); CHECK_EQ(stridePositions->getData(false)[i], result[i]);
} }
} }
} }
......
...@@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) { ...@@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) {
EXPECT_EQ((int)0, nums[i]); EXPECT_EQ((int)0, nums[i]);
} }
} }
TEST_F(CommonTest, barrierStat) {
const int threadNum = 10;
SyncThreadPool pool(threadNum);
#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
struct timeval time; \
gettimeofday(&time, nullptr); \
uint64_t usec = timeToMicroSecond(time); \
std::srand(usec); \
auto value = std::rand() % 100000; \
usleep(value); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for (auto i = 0; i < 10; i++) {
TEST_BARRIER_RANDOM("synThreadBarrier1", threadNum);
TEST_BARRIER_RANDOM("synThreadBarrier2", threadNum);
}
globalStat.printAllStatus();
globalStat.reset();
for (auto i = 0; i < 10; i++) {
TEST_BARRIER_RANDOM("synThreadBarrier3", threadNum, "tag0");
TEST_BARRIER_RANDOM("synThreadBarrier4", threadNum, "tag1");
}
globalStat.printAllStatus();
globalStat.reset();
// use it to test accurate barrier gap
#define TEST_BARRIER(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
usleep(tid * 10000); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for (auto i = 0; i < 10; i++) {
TEST_BARRIER("synThreadBarrier3", threadNum, "tag0");
TEST_BARRIER("synThreadBarrier4", threadNum, "tag1");
}
globalStat.printAllStatus();
globalStat.reset();
}
...@@ -142,7 +142,7 @@ SocketServer::SocketServer(const std::string &addr, int port, int rdmaCpu) ...@@ -142,7 +142,7 @@ SocketServer::SocketServer(const std::string &addr, int port, int rdmaCpu)
} }
/// trigger to initialize RDMA lib /// trigger to initialize RDMA lib
PCHECK(RdmaClientDaemons::get()) << "initilizate RDMA failed\n"; CHECK(RdmaClientDaemons::get()) << "initilizate RDMA failed\n";
} }
SocketServer::~SocketServer() { SocketServer::~SocketServer() {
...@@ -168,7 +168,7 @@ void SocketServer::tcpServer() { ...@@ -168,7 +168,7 @@ void SocketServer::tcpServer() {
/// First call to socket() function /// First call to socket() function
socket_ = socket(AF_INET, SOCK_STREAM, 0); socket_ = socket(AF_INET, SOCK_STREAM, 0);
PCHECK(socket_ >= 0) << "ERROR opening socket"; CHECK(socket_ >= 0) << "ERROR opening socket";
/// Initialize socket structure /// Initialize socket structure
bzero((char *)&serv_addr, sizeof(serv_addr)); bzero((char *)&serv_addr, sizeof(serv_addr));
...@@ -176,7 +176,7 @@ void SocketServer::tcpServer() { ...@@ -176,7 +176,7 @@ void SocketServer::tcpServer() {
serv_addr.sin_port = htons(port_); serv_addr.sin_port = htons(port_);
if (!addr_.empty()) { if (!addr_.empty()) {
server = gethostbyname(addr_.c_str()); server = gethostbyname(addr_.c_str());
PCHECK(server) << "ERROR, no such host: " << addr_; CHECK(server) << "ERROR, no such host: " << addr_;
bcopy((char *)server->h_addr, bcopy((char *)server->h_addr,
(char *)&serv_addr.sin_addr.s_addr, (char *)&serv_addr.sin_addr.s_addr,
server->h_length); server->h_length);
...@@ -187,7 +187,7 @@ void SocketServer::tcpServer() { ...@@ -187,7 +187,7 @@ void SocketServer::tcpServer() {
setOption(socket_); setOption(socket_);
/// Now bind the host address using bind() call. /// Now bind the host address using bind() call.
PCHECK(bind(socket_, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) >= 0) CHECK(bind(socket_, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) >= 0)
<< "ERROR on binding " << addr_; << "ERROR on binding " << addr_;
/// Now start listening for the clients, here process will /// Now start listening for the clients, here process will
...@@ -201,7 +201,7 @@ void SocketServer::tcpServer() { ...@@ -201,7 +201,7 @@ void SocketServer::tcpServer() {
if (stopping_) { if (stopping_) {
break; break;
} }
PCHECK(newsockfd >= 0) << "ERROR on accept"; CHECK(newsockfd >= 0) << "ERROR on accept";
constexpr int kPeerNameLen = 128; constexpr int kPeerNameLen = 128;
char peerName[kPeerNameLen]; char peerName[kPeerNameLen];
CHECK(inet_ntop(AF_INET, &cli_addr.sin_addr, peerName, kPeerNameLen)); CHECK(inet_ntop(AF_INET, &cli_addr.sin_addr, peerName, kPeerNameLen));
...@@ -227,14 +227,14 @@ void SocketServer::rdmaServer() { ...@@ -227,14 +227,14 @@ void SocketServer::rdmaServer() {
/// First call to socket() function /// First call to socket() function
rdmaSocket_ = rdma::ssocket(rdmaCpu_); rdmaSocket_ = rdma::ssocket(rdmaCpu_);
PCHECK(rdmaSocket_) << "ERROR opening RDMA socket"; CHECK(rdmaSocket_) << "ERROR opening RDMA socket";
PCHECK(rdma::bind(rdmaSocket_, rdmaUri_.c_str()) == 0) CHECK(rdma::bind(rdmaSocket_, rdmaUri_.c_str()) == 0)
<< "ERROR bind RDMA socket"; << "ERROR bind RDMA socket";
/// Now start listening for the clients, here process will /// Now start listening for the clients, here process will
/// go in sleep mode and will wait for the incoming connection /// go in sleep mode and will wait for the incoming connection
PCHECK(rdma::listen(rdmaSocket_) == 0) << "ERROR listen RDMA socket"; CHECK(rdma::listen(rdmaSocket_) == 0) << "ERROR listen RDMA socket";
while (true) { while (true) {
/// Accept actual connection from the client /// Accept actual connection from the client
...@@ -242,7 +242,7 @@ void SocketServer::rdmaServer() { ...@@ -242,7 +242,7 @@ void SocketServer::rdmaServer() {
if (stopping_) { if (stopping_) {
break; break;
} }
PCHECK(newsock) << "ERROR on accept"; CHECK(newsock) << "ERROR on accept";
constexpr int kPeerNameLen = 128; constexpr int kPeerNameLen = 128;
char peerName[kPeerNameLen]; char peerName[kPeerNameLen];
...@@ -290,7 +290,7 @@ RdmaClientDaemons::RdmaClientDaemons() { ...@@ -290,7 +290,7 @@ RdmaClientDaemons::RdmaClientDaemons() {
onlineCpus_ = rdma::numCpus(); onlineCpus_ = rdma::numCpus();
for (auto i = 0; i < onlineCpus_; i++) { for (auto i = 0; i < onlineCpus_; i++) {
socket = rdma::csocket(i); socket = rdma::csocket(i);
PCHECK(socket) << "ERROR open client socket daemon"; CHECK(socket) << "ERROR open client socket daemon";
rdmaClientSocket_.push_back(socket); rdmaClientSocket_.push_back(socket);
} }
...@@ -355,7 +355,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) { ...@@ -355,7 +355,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) {
/// Create a socket point /// Create a socket point
int sockfd = socket(AF_INET, SOCK_STREAM, 0); int sockfd = socket(AF_INET, SOCK_STREAM, 0);
PCHECK(sockfd >= 0) << "ERROR opening socket"; CHECK(sockfd >= 0) << "ERROR opening socket";
#if defined(__OSX__) || defined(__APPLE__) #if defined(__OSX__) || defined(__APPLE__)
server = getipnodebyname(serverAddr.c_str(), AF_INET, AI_DEFAULT, &errRet); server = getipnodebyname(serverAddr.c_str(), AF_INET, AI_DEFAULT, &errRet);
...@@ -396,7 +396,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) { ...@@ -396,7 +396,7 @@ void SocketClient::TcpClient(const std::string &serverAddr, int serverPort) {
} }
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} else { } else {
PCHECK(errno != 0) << "ERROR connecting to " << serverAddr << ":" CHECK(errno != 0) << "ERROR connecting to " << serverAddr << ":"
<< serverPort << "errorno: " << errno; << serverPort << "errorno: " << errno;
} }
} while (errno == ECONNREFUSED); } while (errno == ECONNREFUSED);
...@@ -426,7 +426,7 @@ void SocketClient::RdmaClient(const std::string &serverAddr, int serverPort) { ...@@ -426,7 +426,7 @@ void SocketClient::RdmaClient(const std::string &serverAddr, int serverPort) {
/// connect to server with socket daemon /// connect to server with socket daemon
sock = rdma::connect(socketDaemon_, rdmaUri.c_str()); sock = rdma::connect(socketDaemon_, rdmaUri.c_str());
PCHECK(sock) << "ERROR connect to server" << rdmaUri; CHECK(sock) << "ERROR connect to server" << rdmaUri;
std::vector<std::string> seg; std::vector<std::string> seg;
str::split(rdmaUri, '/', &seg); str::split(rdmaUri, '/', &seg);
......
...@@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request, ...@@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request,
SetConfigResponse response; SetConfigResponse response;
callback(response); callback(response);
/// always defined, barrier slowest node function need it.
statSet_.reset(new StatSet("ParameterServer" +
str::to_string(static_cast<int>(serverId_))));
} }
real bufferSum(const std::vector<ParameterServer2::Buffer>& buffers) { real bufferSum(const std::vector<ParameterServer2::Buffer>& buffers) {
...@@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, ...@@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std::vector<Buffer>* outputBuffers) { std::vector<Buffer>* outputBuffers) {
VLOG(1) << "pserver: addGradient"; VLOG(1) << "pserver: addGradient";
// forwardbackward delta from all trainers
// indicate the fluctuation caused by forwardbackward.
if (!numPassFinishClients_) {
REGISTER_BARRIER_DELTA_SERVER_SET(
*statSet_,
"forwardbackwardDelta",
FLAGS_num_gradient_servers,
request.trainer_id(),
request.forwardbackward_time(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
{
/// approximately pure network overhead
REGISTER_TIMER_DYNAMIC_SET(
"pushRecv", timeToMicroSecond(*handleRequestBegin_), -1, *statSet_);
}
#ifndef PADDLE_DISABLE_TIMER
gettimeofday(&(*addGradBegin_), nullptr);
#endif
/// barrier fluctuation caused by network and previous forwardbackward
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER_SET(
*statSet_,
"handleReqBegin",
FLAGS_num_gradient_servers,
request.trainer_id(),
(*handleRequestBegin_),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"addGradBegin",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
{ {
REGISTER_TIMER_DYNAMIC("addGradCore", -1, *statSet_);
ReadLockGuard guard(parameterMutex_); ReadLockGuard guard(parameterMutex_);
int bufferIndex = 0; int bufferIndex = 0;
for (const auto& block : request.blocks()) { for (const auto& block : request.blocks()) {
...@@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, ...@@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std::lock_guard<std::mutex> guard(*info.lock); std::lock_guard<std::mutex> guard(*info.lock);
simd::addTo(gradientSumBuffer, gradientBuffer, size); simd::addTo(gradientSumBuffer, gradientBuffer, size);
} }
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"addGradCoreFinish",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
} }
if (request.batch_status() == BATCH_FINISH || if (request.batch_status() == BATCH_FINISH ||
request.batch_status() == BATCH_START_AND_FINISH) { request.batch_status() == BATCH_START_AND_FINISH) {
...@@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, ...@@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
VLOG(1) << "num samples: " << numSamplesProcessed_ VLOG(1) << "num samples: " << numSamplesProcessed_
<< ", new cost:" << cost_; << ", new cost:" << cost_;
/// numPassFinishClients_ means some trainer has entered finishPass
if (!numPassFinishClients_) {
REGISTER_SLOW_NODES_PROBE(
*statSet_,
"SLOW_NODES",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// notify doOperation gradient ready /// notify doOperation gradient ready
gradientReadyBarrier_.wait(); gradientReadyBarrier_.wait();
/// if wait pass finish does not start, do check
if (!numPassFinishClients_) {
CHECK_BARRIER_TIMER(*statSet_,
"SLOW_NODES",
FLAGS_num_gradient_servers,
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// barrier performance while all parameter add is finished
/// can indicate the fluctation caused by computation at pserver.
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"paraReady",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// wait doOperation finish /// wait doOperation finish
parameterReadyBarrier_.wait(); parameterReadyBarrier_.wait();
VLOG(1) << "start send back"; VLOG(1) << "start send back";
{
/// total time except overhead of network.
REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecvNoSend",
timeToMicroSecond(*addGradBegin_),
-1,
*statSet_);
}
} }
} }
...@@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat( ...@@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat(
return commitGradient; return commitGradient;
} }
void ParameterServer2::printAsyncGradientCommitStatAndReset() {
std::stringstream statFormat;
if (asyncUpdateSteps_) {
statFormat << "async discard gradients stat: " << std::endl;
statFormat << "serverId: " << serverId_
<< " serverType: " << isSparseServer_
<< " total updates: " << asyncUpdateSteps_
<< " discard updates: " << asyncLaggedGradientsNum_
<< " discard ratio: "
<< (real)asyncLaggedGradientsNum_ / (real)asyncUpdateSteps_;
statFormat << std::endl;
statFormat << std::endl;
statFormat << "Async Gradient Update Steps distribution: " << std::endl
<< "Sample: 1:1912(0.00284449) means "
<< "the updates step=1 count 1912 times "
<< "and account for 0.284449% of total updates" << std::endl;
size_t index = 0;
for (const auto& stat : asyncUpdateStat_) {
statFormat << index << ":" << stat << "("
<< (real)stat / (real)asyncUpdateSteps_ << ") ";
index++;
}
statFormat << std::endl;
statFormat << std::endl;
statFormat << "Async Gradient Discard based on trainer_id: " << std::endl
<< "Sample: 2:22(0.0016363) means "
<< "total discarded updates from trainer_id=2 count 22 "
<< "and account for 0.16363% of all updates from trainer_id=2"
<< std::endl;
for (auto i = 0; i < FLAGS_num_gradient_servers; i++) {
real ratio =
(real)asyncTrainerDiscardStat_[i] /
(real)(asyncTrainerCommitStat_[i] + asyncTrainerDiscardStat_[i]);
statFormat << i << ":" << asyncTrainerDiscardStat_[i] << "(" << ratio
<< ")"
<< " ";
}
LOG(INFO) << statFormat.str();
/// reset stat
asyncUpdateSteps_ = 0;
asyncTrainerSteps_.assign(asyncTrainerSteps_.size(), 0);
asyncLaggedGradientsNum_ = 0;
asyncUpdateStat_.assign(asyncUpdateStat_.size(), 0);
asyncTrainerDiscardStat_.assign(asyncTrainerDiscardStat_.size(), 0);
asyncTrainerCommitStat_.assign(asyncTrainerCommitStat_.size(), 0);
}
}
static ThreadLocal<std::vector<bool>> localBlockBitset_; static ThreadLocal<std::vector<bool>> localBlockBitset_;
void ParameterServer2::asyncSGD(const SendParameterRequest& request, void ParameterServer2::asyncSGD(const SendParameterRequest& request,
...@@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request, ...@@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
if (request.trainer_id() == 0) { if (request.trainer_id() == 0) {
/// batchId_ is approximately equal to "real batchId_" /// batchId_ is approximately equal to "real batchId_"
batchId_++; batchId_++;
tuningAsyncsgdMidOutput();
} }
} }
...@@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request, ...@@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request,
} }
(*requestVec_).clear(); (*requestVec_).clear();
(*callbackVec_).clear(); (*callbackVec_).clear();
/// barrier perfromance while all data are send finished.
/// indicates network flucatuation for big message.
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"sendParamFinish",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// all time exhausted in parameterServer for big message.
/// it contains network and computation at pserver.
{
/// total time including overhead of network.
REGISTER_TIMER_DYNAMIC_SET("sendParaTotal",
timeToMicroSecond(*handleRequestBegin_),
-1,
*statSet_);
}
/// all time exhausted in pserverServer except recieve network.
{
/// total time except overhead of network receive
REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecv",
timeToMicroSecond(*addGradBegin_),
-1,
*statSet_);
}
} }
break; break;
case PSERVER_UPDATE_MODE_SET_PARAM: case PSERVER_UPDATE_MODE_SET_PARAM:
...@@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation, ...@@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
} }
{ {
REGISTER_TIMER_DYNAMIC("op_SGD", -1, *statSet_);
parallelExecForEachBlock([&](int64_t blockId, const VectorPtr vecs[]) { parallelExecForEachBlock([&](int64_t blockId, const VectorPtr vecs[]) {
BlockInfo& info = blockInfos_[blockId]; BlockInfo& info = blockInfos_[blockId];
const ParameterConfig& config = getParameterConfig(blockId); const ParameterConfig& config = getParameterConfig(blockId);
...@@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation, ...@@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
} }
batchId_++; batchId_++;
tuningSgdMidOutput();
} }
void ParameterServer2::op_start_pass(const Operation& operation, void ParameterServer2::op_start_pass(const Operation& operation,
...@@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation, ...@@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation,
/// finish pass /// finish pass
info.optimizer->finishPass(); info.optimizer->finishPass();
}); });
tuningSgdFinished();
batchId_ = 0; batchId_ = 0;
} }
...@@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request, ...@@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request,
callback(SynchronizeResponse()); callback(SynchronizeResponse());
if (request.trainer_id() == 0) { if (request.trainer_id() == 0) {
tuningAsyncsgdFinished();
batchId_ = 0; batchId_ = 0;
} }
} }
...@@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request, ...@@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request,
callback(response); callback(response);
} }
void ParameterServer2::tuningSgdMidOutput() {
if (batchId_ && batchId_ % FLAGS_log_period_server == 0) {
LOG(INFO) << "======== Batch=" << batchId_ << "=======";
statSet_->setThreadInfo(true);
statSet_->printAllStatus();
/// not reset raw data for reducing the overhead of performance tuning
statSet_->reset(false);
}
}
void ParameterServer2::tuningSgdFinished() {
LOG(INFO) << "======== Batch=" << batchId_ << " pass END"
<< "=======";
statSet_->setThreadInfo(true);
statSet_->printAllStatus();
/**
* reset raw data at end of pass since some raw data could be not
* complete. Otherwise the raw data will pollute next pass performance
* tuning
*/
statSet_->reset();
}
void ParameterServer2::tuningAsyncsgdMidOutput() {
#ifndef PADDLE_DISABLE_TIMER
if (batchId_ && batchId_ % FLAGS_log_period_server == 0) {
LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << "=======";
printAsyncGradientCommitStatAndReset();
}
#endif
}
void ParameterServer2::tuningAsyncsgdFinished() {
LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << " pass END"
<< "=======";
printAsyncGradientCommitStatAndReset();
}
} // namespace paddle } // namespace paddle
...@@ -298,24 +298,6 @@ protected: ...@@ -298,24 +298,6 @@ protected:
/// barrier performance tuning sync-sgd required /// barrier performance tuning sync-sgd required
std::atomic<int64_t> batchId_; std::atomic<int64_t> batchId_;
/// the beginning of addGradient without network overhead
ThreadLocal<struct timeval> addGradBegin_;
/**
* tuning barrier performance
* to better control log for sparse and dense parameter,
* we use different log entities for different parameterServer
* objects.
* it will output lots of performance stats to perceive the
* overhead of network, fluctuation of computation from
* forwardbackward and network, computation from optimization
* at pserver end, barrier overhead, etc. to understand tuning
* data, focus on the synchronization between addGradient and
* doOperation which indirectly call op_SGD operation controlled
* by remote updater controller
*/
std::unique_ptr<StatSet> statSet_;
public: public:
struct Buffer { struct Buffer {
real* base; real* base;
...@@ -325,7 +307,6 @@ public: ...@@ -325,7 +307,6 @@ public:
protected: protected:
/// async gradient commit control /// async gradient commit control
bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request); bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request);
void printAsyncGradientCommitStatAndReset();
public: public:
/// disable default parameter for overloading /// disable default parameter for overloading
...@@ -710,36 +691,6 @@ public: ...@@ -710,36 +691,6 @@ public:
void op_load(const Operation& operation, OperationResult* result); void op_load(const Operation& operation, OperationResult* result);
void op_save(const Operation& operation, OperationResult* result); void op_save(const Operation& operation, OperationResult* result);
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for sgd
*/
void tuningSgdMidOutput();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for sgd. it will also
* flush some stateful stat for next pass.
*/
void tuningSgdFinished();
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for async-sgd.
* it will log some performance log if some lagged node are found
*/
void tuningAsyncsgdMidOutput();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for async-sgd.
*/
void tuningAsyncsgdFinished();
}; };
} // namespace paddle } // namespace paddle
...@@ -51,7 +51,7 @@ size_t SocketChannel::read(void* buf, size_t size) { ...@@ -51,7 +51,7 @@ size_t SocketChannel::read(void* buf, size_t size) {
else else
len = rdma::read(rdmaSocket_, (char*)buf + total, size - total); len = rdma::read(rdmaSocket_, (char*)buf + total, size - total);
PCHECK(len >= 0) << " peer=" << peerName_; CHECK(len >= 0) << " peer=" << peerName_;
if (len <= 0) { if (len <= 0) {
return total; return total;
} }
...@@ -69,7 +69,7 @@ size_t SocketChannel::write(const void* buf, size_t size) { ...@@ -69,7 +69,7 @@ size_t SocketChannel::write(const void* buf, size_t size) {
else else
len = rdma::write(rdmaSocket_, (char*)buf + total, size - total); len = rdma::write(rdmaSocket_, (char*)buf + total, size - total);
PCHECK(len >= 0) << " peer=" << peerName_; CHECK(len >= 0) << " peer=" << peerName_;
if (len <= 0) { if (len <= 0) {
return total; return total;
} }
...@@ -98,7 +98,7 @@ static size_t readwritev(IOFunc iofunc, ...@@ -98,7 +98,7 @@ static size_t readwritev(IOFunc iofunc,
while (size < total) { while (size < total) {
ssize_t len = ssize_t len =
iofunc(socket, &iovs[curIov], std::min(iovcnt - curIov, maxiovs)); iofunc(socket, &iovs[curIov], std::min(iovcnt - curIov, maxiovs));
PCHECK(len > 0) << " peer=" << peerName << " curIov=" << curIov CHECK(len > 0) << " peer=" << peerName << " curIov=" << curIov
<< " iovCnt=" << iovcnt << " iovCnt=" << iovcnt
<< " iovs[curIov].base=" << iovs[curIov].iov_base << " iovs[curIov].base=" << iovs[curIov].iov_base
<< " iovs[curIov].iov_len=" << iovs[curIov].iov_len; << " iovs[curIov].iov_len=" << iovs[curIov].iov_len;
...@@ -183,7 +183,7 @@ void SocketChannel::writeMessage(const std::vector<struct iovec>& userIovs) { ...@@ -183,7 +183,7 @@ void SocketChannel::writeMessage(const std::vector<struct iovec>& userIovs) {
header.totalLength += iov.iov_len; header.totalLength += iov.iov_len;
} }
PCHECK(writev(iovs) == (size_t)header.totalLength); CHECK(writev(iovs) == (size_t)header.totalLength);
} }
std::unique_ptr<MsgReader> SocketChannel::readMessage() { std::unique_ptr<MsgReader> SocketChannel::readMessage() {
...@@ -194,7 +194,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() { ...@@ -194,7 +194,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() {
return nullptr; return nullptr;
} }
PCHECK(len == sizeof(header)); CHECK(len == sizeof(header));
std::unique_ptr<MsgReader> msgReader(new MsgReader(this, header.numIovs)); std::unique_ptr<MsgReader> msgReader(new MsgReader(this, header.numIovs));
...@@ -209,7 +209,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() { ...@@ -209,7 +209,7 @@ std::unique_ptr<MsgReader> SocketChannel::readMessage() {
MsgReader::MsgReader(SocketChannel* channel, size_t numBlocks) MsgReader::MsgReader(SocketChannel* channel, size_t numBlocks)
: channel_(channel), blockLengths_(numBlocks), currentBlockIndex_(0) { : channel_(channel), blockLengths_(numBlocks), currentBlockIndex_(0) {
size_t size = numBlocks * sizeof(blockLengths_[0]); size_t size = numBlocks * sizeof(blockLengths_[0]);
PCHECK(channel_->read(&blockLengths_[0], size) == size); CHECK(channel_->read(&blockLengths_[0], size) == size);
} }
void MsgReader::readBlocks(const std::vector<void*>& bufs) { void MsgReader::readBlocks(const std::vector<void*>& bufs) {
...@@ -223,12 +223,12 @@ void MsgReader::readBlocks(const std::vector<void*>& bufs) { ...@@ -223,12 +223,12 @@ void MsgReader::readBlocks(const std::vector<void*>& bufs) {
++currentBlockIndex_; ++currentBlockIndex_;
} }
PCHECK(channel_->readv(&iovs) == totalLength); CHECK(channel_->readv(&iovs) == totalLength);
} }
void MsgReader::readNextBlock(void* buf) { void MsgReader::readNextBlock(void* buf) {
CHECK_LT(currentBlockIndex_, blockLengths_.size()); CHECK_LT(currentBlockIndex_, blockLengths_.size());
PCHECK(channel_->read(buf, getNextBlockLength()) == getNextBlockLength()); CHECK(channel_->read(buf, getNextBlockLength()) == getNextBlockLength());
++currentBlockIndex_; ++currentBlockIndex_;
} }
......
...@@ -113,7 +113,7 @@ void SocketServer::run() { ...@@ -113,7 +113,7 @@ void SocketServer::run() {
/* First call to socket() function */ /* First call to socket() function */
socket_ = socket(AF_INET, SOCK_STREAM, 0); socket_ = socket(AF_INET, SOCK_STREAM, 0);
PCHECK(socket_ >= 0) << "ERROR opening socket"; CHECK(socket_ >= 0) << "ERROR opening socket";
/* Initialize socket structure */ /* Initialize socket structure */
bzero((char*)&serv_addr, sizeof(serv_addr)); bzero((char*)&serv_addr, sizeof(serv_addr));
...@@ -122,7 +122,7 @@ void SocketServer::run() { ...@@ -122,7 +122,7 @@ void SocketServer::run() {
serv_addr.sin_port = htons(port_); serv_addr.sin_port = htons(port_);
/* Now bind the host address using bind() call.*/ /* Now bind the host address using bind() call.*/
PCHECK(bind(socket_, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) CHECK(bind(socket_, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0)
<< "ERROR on binding"; << "ERROR on binding";
/* Now start listening for the clients, here process will /* Now start listening for the clients, here process will
...@@ -134,7 +134,7 @@ void SocketServer::run() { ...@@ -134,7 +134,7 @@ void SocketServer::run() {
while (true) { while (true) {
/* Accept actual connection from the client */ /* Accept actual connection from the client */
newsockfd = accept(socket_, (struct sockaddr*)&cli_addr, &clilen); newsockfd = accept(socket_, (struct sockaddr*)&cli_addr, &clilen);
PCHECK(newsockfd >= 0) << "ERROR on accept"; CHECK(newsockfd >= 0) << "ERROR on accept";
SocketWorker* worker = new SocketWorker(newsockfd); SocketWorker* worker = new SocketWorker(newsockfd);
worker->start(); worker->start();
...@@ -146,17 +146,17 @@ void SocketWorker::run() { ...@@ -146,17 +146,17 @@ void SocketWorker::run() {
while (true) { while (true) {
int64_t n = channel_.readAll(&header, sizeof(header)); int64_t n = channel_.readAll(&header, sizeof(header));
PCHECK(n == sizeof(header)) << "ERROR reading from socket"; CHECK(n == sizeof(header)) << "ERROR reading from socket";
buffer_.resize(header.dataLength); buffer_.resize(header.dataLength);
n = channel_.readAll(&buffer_[0], header.dataLength); n = channel_.readAll(&buffer_[0], header.dataLength);
PCHECK(n == header.dataLength) << "ERROR reading from socket"; CHECK(n == header.dataLength) << "ERROR reading from socket";
/* Write a response to the client */ /* Write a response to the client */
n = channel_.writeAll(&header, sizeof(header)); n = channel_.writeAll(&header, sizeof(header));
PCHECK(n == sizeof(header)) << "ERROR reading from socket"; CHECK(n == sizeof(header)) << "ERROR reading from socket";
n = channel_.writeAll(buffer_.data(), buffer_.size()); n = channel_.writeAll(buffer_.data(), buffer_.size());
PCHECK(n == header.dataLength) << "ERROR writing to socket"; CHECK(n == header.dataLength) << "ERROR writing to socket";
} }
} }
...@@ -177,9 +177,9 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) { ...@@ -177,9 +177,9 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) {
/* Create a socket point */ /* Create a socket point */
int sockfd = socket(AF_INET, SOCK_STREAM, 0); int sockfd = socket(AF_INET, SOCK_STREAM, 0);
PCHECK(sockfd >= 0) << "ERROR opening socket"; CHECK(sockfd >= 0) << "ERROR opening socket";
server = gethostbyname(serverAddr.c_str()); server = gethostbyname(serverAddr.c_str());
PCHECK(server) << "ERROR, no such host: " << serverAddr; CHECK(server) << "ERROR, no such host: " << serverAddr;
bzero((char*)&serv_addr, sizeof(serv_addr)); bzero((char*)&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
...@@ -189,7 +189,7 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) { ...@@ -189,7 +189,7 @@ SocketClient::SocketClient(const std::string& serverAddr, int serverPort) {
serv_addr.sin_port = htons(serverPort); serv_addr.sin_port = htons(serverPort);
/* Now connect to the server */ /* Now connect to the server */
PCHECK(connect(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0) CHECK(connect(sockfd, (sockaddr*)&serv_addr, sizeof(serv_addr)) >= 0)
<< "ERROR connecting"; << "ERROR connecting";
channel_.reset(new SocketChannel(sockfd)); channel_.reset(new SocketChannel(sockfd));
...@@ -234,18 +234,18 @@ int main(int argc, char** argv) { ...@@ -234,18 +234,18 @@ int main(int argc, char** argv) {
cpuGrad.copyFrom(gpuGrad); cpuGrad.copyFrom(gpuGrad);
header.dataLength = dataSize; header.dataLength = dataSize;
PCHECK(channel->writeAll(&header, sizeof(header)) == sizeof(header)) CHECK(channel->writeAll(&header, sizeof(header)) == sizeof(header))
<< "Client write header error"; << "Client write header error";
PCHECK(channel->writeAll(cpuGrad.getData(), dataSize) == dataSize) CHECK(channel->writeAll(cpuGrad.getData(), dataSize) == dataSize)
<< "Client write data error"; << "Client write data error";
/* Now read server response */ /* Now read server response */
PCHECK(channel->readAll(&header, sizeof(header)) == sizeof(header)) CHECK(channel->readAll(&header, sizeof(header)) == sizeof(header))
<< "Client read header error"; << "Client read header error";
CHECK_EQ((uint64_t)header.dataLength, dataSize); CHECK_EQ((uint64_t)header.dataLength, dataSize);
PCHECK(channel->readAll(cpuParam.getData(), dataSize) == dataSize) CHECK(channel->readAll(cpuParam.getData(), dataSize) == dataSize)
<< "Client read data error"; << "Client read data error";
gpuParam.copyFrom(cpuParam); gpuParam.copyFrom(cpuParam);
......
...@@ -78,7 +78,7 @@ paddle version ...@@ -78,7 +78,7 @@ paddle version
# PaddlePaddle. This awkwardness is due to # PaddlePaddle. This awkwardness is due to
# https://github.com/PaddlePaddle/Paddle/issues/1854. It also # https://github.com/PaddlePaddle/Paddle/issues/1854. It also
# describes a solution. # describes a solution.
if [ ${WITH_DOC} == "ON" ]; then if [[ ${WITH_DOC} == "ON" ]]; then
cat <<EOF cat <<EOF
======================================== ========================================
Building documentation ... Building documentation ...
......
...@@ -175,7 +175,7 @@ real Tester::forwardOneBatch(const DataBatch& dataBatch, ...@@ -175,7 +175,7 @@ real Tester::forwardOneBatch(const DataBatch& dataBatch,
} }
hl_stream_synchronize(HPPL_STREAM_DEFAULT); hl_stream_synchronize(HPPL_STREAM_DEFAULT);
FILE* fp = fopen(featFile.c_str(), "ab+"); FILE* fp = fopen(featFile.c_str(), "ab+");
PCHECK(!ferror(fp)) << "Fail to open " << featFile; CHECK(!ferror(fp)) << "Fail to open " << featFile;
size_t sampleNum = featMatrices[0]->getHeight(); size_t sampleNum = featMatrices[0]->getHeight();
for (size_t i = 0; i < sampleNum; ++i) { for (size_t i = 0; i < sampleNum; ++i) {
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/BarrierStat.h"
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <iomanip>
#include "paddle/utils/Flags.h"
#include "paddle/utils/Stat.h"
DEFINE_bool(log_barrier_abstract,
true,
"if true, show abstract of barrier performance");
DEFINE_int32(log_barrier_lowest_nodes,
5,
"how many lowest node will be logged");
DEFINE_bool(log_barrier_show_log,
false, // for performance tuning insight
"if true, always show barrier abstract even with little gap");
namespace paddle {
std::ostream &operator<<(std::ostream &output, const BarrierStatBase &stat) {
if (FLAGS_log_barrier_abstract) {
std::lock_guard<std::mutex> guard(stat.lock_);
stat.showAbstract(output);
}
return output;
}
BarrierStatBase::BarrierStatBase(uint16_t numConnThreads,
const std::string &name)
: totSamples_(0), numConnThreads_(numConnThreads), name_(name) {
abstract_.resize(numConnThreads_);
if (FLAGS_log_barrier_show_log) {
rateThreshold_ = 0.0;
} else {
/* probablity of abnormal node
* p = 1/n + (n/8)/(n+1), n = nodes, n > 1
* if the freq of lowest trainerId larger than p,
* output FLAGS_log_barrier_lowest_nodes lastTrainerId.
* numConnThreads_ indicates nodes
*/
float n = (float)numConnThreads;
rateThreshold_ = 1.0 / n + (n / 8.0) / (n + 1.0);
}
}
BarrierEndStat::BarrierEndStat(uint16_t numConnThreads, const std::string &name)
: BarrierStatBase(numConnThreads, name) {
timeVector_.reset(new TimeVectorEnd(numConnThreads_));
reset(true);
LOG(INFO) << " create barrierEndStat: " << name
<< " endBarrier warning rate: " << rateThreshold_;
}
/*
* Note:
* the design different pserver entity owns different statSet to obey
* the background that different pserver runs separately.
*/
void BarrierEndStat::updateStat(struct timeval &cur, int32_t trainerId) {
CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier";
std::lock_guard<std::mutex> guard(lock_);
timeVector_->addTimeval(cur, trainerId);
if (timeVector_->full()) {
std::lock_guard<std::mutex> abstractGuard(abstractLock_);
auto id = timeVector_->getLastTrainerId();
auto delta = timeToMicroSecond(timeVector_->getDelta());
auto secondDelta = timeToMicroSecond(timeVector_->get1NDelta());
auto lastTwoDelta = timeToMicroSecond(timeVector_->getMinus1NDelta());
auto midDelta = timeToMicroSecond(timeVector_->getMidNDelta());
// discard first sample, since first sample probably is abnormal.
if (totSamples_) {
abstract_[id].freq++;
if (delta < abstract_[id].minDelta) {
abstract_[id].minDelta = delta;
}
if (delta > abstract_[id].maxDelta) {
abstract_[id].maxDelta = delta;
}
abstract_[id].totDelta += delta;
abstract_[id].totSecondDelta += secondDelta;
abstract_[id].totLastTwoDelta += lastTwoDelta;
abstract_[id].totMidDelta += midDelta;
// update totAbstract_
totAbstract_.freq++;
if (delta < totAbstract_.minDelta) {
totAbstract_.minDelta = delta;
}
if (delta > totAbstract_.maxDelta) {
totAbstract_.maxDelta = delta;
}
totAbstract_.totDelta += delta;
totAbstract_.totSecondDelta += secondDelta;
totAbstract_.totLastTwoDelta += lastTwoDelta;
totAbstract_.totMidDelta += midDelta;
}
totSamples_++;
timeVector_->reset();
}
}
void BarrierEndStat::reset(bool clearRawData) {
int32_t i = 0;
totSamples_ = 0;
std::lock_guard<std::mutex> guard(abstractLock_);
if (clearRawData) {
timeVector_->reset();
}
for (auto &abstract : abstract_) {
memset((void *)&abstract, 0, sizeof(abstract));
abstract.minDelta = UINT64_MAX;
abstract.trainerId = i++;
}
memset((void *)&totAbstract_, 0, sizeof(Abstract));
totAbstract_.minDelta = UINT64_MAX;
}
void BarrierEndStat::showAbstract(std::ostream &output) const {
// do not support the case "<=2 pserver"
if (numConnThreads_ <= 2 || !totSamples_) {
return;
}
// duplicate freq info
std::vector<struct Abstract> outputAbstract = abstract_;
std::sort(outputAbstract.begin(),
outputAbstract.end(),
[](const struct Abstract &a, const struct Abstract &b) {
return a.freq > b.freq;
});
auto rate = (float)outputAbstract[0].freq / (float)totSamples_;
if (rate < rateThreshold_) {
return;
}
output << std::setw(20) << name_ << std::endl;
/*
* Note:
* avgGap: the average delta between 1 -- n arriving trainers
* avgSecondGap: the average delta between 2 -- n arriving trainers
* avgLastTwoGap: the average delta between n-1 -- n arriving trainers
* avgMidGap: the average delta between n/2 -- n arriving trainers
* rato: samples / totSamples
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output << std::setw(42) << " " << std::setw(15) << "trainerId"
<< std::setw(15) << "avgGap" << std::setw(15) << "avgSecondGap"
<< std::setw(15) << "avgLastTwoGap" << std::setw(15) << "avgMidGap"
<< std::setw(10) << "rate" << std::setw(10) << "samples"
<< std::setw(10) << "totSamples" << std::endl;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if (!totAbstract_.freq) return;
output << std::setw(42) << " " << std::setw(15) << "totAbstract"
<< std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totSecondDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totLastTwoDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totMidDelta / totAbstract_.freq) * 0.001
<< std::setw(10) << (float)totAbstract_.freq / (float)totSamples_
<< std::setw(10) << (float)totAbstract_.freq << std::setw(10)
<< (float)totSamples_ << std::endl;
// show lastTrainerId abstract
int count = 0;
for (auto &abstract : outputAbstract) {
if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) {
break;
}
// output format control
output << std::setw(42) << " " << std::setw(15) << abstract.trainerId
<< std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001
<< std::setw(15) << (abstract.totSecondDelta / abstract.freq) * 0.001
<< std::setw(15)
<< (abstract.totLastTwoDelta / abstract.freq) * 0.001
<< std::setw(15) << (abstract.totMidDelta / abstract.freq) * 0.001
<< std::setw(10) << (float)abstract.freq / (float)totSamples_
<< std::setw(10) << (float)abstract.freq << std::setw(10)
<< (float)totSamples_ << std::endl;
}
}
BarrierDeltaStat::BarrierDeltaStat(uint16_t numConnThreads,
const std::string &name)
: BarrierStatBase(numConnThreads, name) {
timeVector_.reset(new TimeVectorDelta(numConnThreads_));
reset(true);
LOG(INFO) << " create barrierDeltaStat: " << name
<< " barrierDelta warning rate: " << rateThreshold_;
}
void BarrierDeltaStat::updateStat(uint64_t delta, int32_t trainerId) {
CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier";
std::lock_guard<std::mutex> guard(lock_);
timeVector_->addTimeval(delta, trainerId);
if (timeVector_->full()) {
std::lock_guard<std::mutex> abstractGuard(abstractLock_);
auto id = timeVector_->getMaxTrainerId();
auto delta = timeVector_->getDelta();
// discard first sample, since first sample probably is abnormal.
if (totSamples_) {
abstract_[id].freq++;
if (delta < abstract_[id].minDelta) {
abstract_[id].minDelta = delta;
}
if (delta > abstract_[id].maxDelta) {
abstract_[id].maxDelta = delta;
}
abstract_[id].totDelta += delta;
// update totAbstract_
totAbstract_.freq++;
if (delta < totAbstract_.minDelta) {
totAbstract_.minDelta = delta;
}
if (delta > totAbstract_.maxDelta) {
totAbstract_.maxDelta = delta;
}
totAbstract_.totDelta += delta;
}
totSamples_++;
timeVector_->reset();
}
}
void BarrierDeltaStat::reset(bool clearRawData) {
int32_t i = 0;
totSamples_ = 0;
std::lock_guard<std::mutex> guard(abstractLock_);
if (clearRawData) {
timeVector_->reset();
}
for (auto &abstract : abstract_) {
memset((void *)&abstract, 0, sizeof(abstract));
abstract.minDelta = UINT64_MAX;
abstract.trainerId = i++;
}
memset((void *)&totAbstract_, 0, sizeof(Abstract));
totAbstract_.minDelta = UINT64_MAX;
}
void BarrierDeltaStat::showAbstract(std::ostream &output) const {
// do not support the case "<=2 pserver"
if (numConnThreads_ <= 2 || !totSamples_) {
return;
}
// duplicate freq info
std::vector<struct Abstract> outputAbstract = abstract_;
std::sort(outputAbstract.begin(),
outputAbstract.end(),
[](const struct Abstract &a, const struct Abstract &b) {
return a.freq > b.freq;
});
auto rate = (float)outputAbstract[0].freq / (float)totSamples_;
if (rate < rateThreshold_) {
return;
}
output << std::setw(20) << name_ << std::endl;
/* Note:
* Gap means the delta from all trainers' forwardbackward
* avgGap: average Gap in log_period batches
* minGap: min Gap in log_period batches
* maxGap: max Gap in log_period batches
* trainerId: the slowest trainer_id
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output << std::setw(42) << " " << std::setw(15) << "trainerId"
<< std::setw(15) << "avgGap" << std::setw(10) << "minGap"
<< std::setw(10) << "maxGap" << std::setw(10) << "rate"
<< std::setw(10) << "samples" << std::setw(10) << "totSamples"
<< std::endl;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if (!totAbstract_.freq) return;
output << std::setw(42) << " " << std::setw(15) << "totAbstract"
<< std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001
<< std::setw(10) << totAbstract_.minDelta * 0.001 << std::setw(10)
<< totAbstract_.maxDelta * 0.001 << std::setw(10)
<< (float)totAbstract_.freq / (float)totSamples_ << std::setw(10)
<< (float)totAbstract_.freq << std::setw(10) << (float)totSamples_
<< std::endl;
// show lastTrainerId abstract
int count = 0;
for (auto &abstract : outputAbstract) {
if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) {
break;
}
// output format control
output << std::setw(42) << " " << std::setw(15) << abstract.trainerId
<< std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001
<< std::setw(10) << abstract.minDelta * 0.001 << std::setw(10)
<< abstract.maxDelta * 0.001 << std::setw(10)
<< (float)abstract.freq / (float)totSamples_ << std::setw(10)
<< (float)abstract.freq << std::setw(10) << (float)totSamples_
<< std::endl;
}
}
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <stdint.h>
#include <sys/time.h>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "Locks.h"
#include "Logging.h"
#include "ThreadLocal.h"
namespace paddle {
inline uint64_t timeToMicroSecond(struct timeval time) {
return time.tv_sec * 1000000LU + time.tv_usec;
}
class TimeVectorEnd {
/*
* help class for gathering all barrier performance data
* which shows time point property.
* freqently used in barrier performance tuning API, such
* as tuning which is slowest node in sync-sgd mode training.
*/
public:
explicit TimeVectorEnd(uint16_t size) : size_(size) {
index_ = 0;
timeArray_.resize(size);
trainerIds_.resize(size);
}
~TimeVectorEnd() {}
uint16_t size() { return size_; }
bool full() { return index_ == size_; }
bool empty() { return index_ == 0; }
void reset() { index_ = 0; }
void addTimeval(struct timeval time, int32_t trainerId) {
timeArray_[index_] = time;
trainerIds_[index_] = trainerId;
index_++;
}
struct timeval getDelta() const {
struct timeval delta;
CHECK_GT(size_, 1) << "not support with 1 pserver";
timersub(&timeArray_[size_ - 1], &timeArray_[0], &delta);
return delta;
}
/* 2, n delta */
struct timeval get1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[1], &delta);
return delta;
}
/* n-1, n delta */
struct timeval getMinus1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[size_ - 2], &delta);
return delta;
}
/* n/2, n delta */
struct timeval getMidNDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[size_ / 2], &delta);
return delta;
}
int32_t getLastTrainerId() const { return trainerIds_[index_ - 1]; }
private:
uint16_t size_;
uint16_t index_;
std::vector<struct timeval> timeArray_;
std::vector<int32_t> trainerIds_;
};
class TimeVectorDelta {
/*
* help class for gathering performance data which shows time
* delta property, such as tuning the time distribution of
* forwardBackward time from all cluster nodes.
*/
public:
explicit TimeVectorDelta(uint16_t size)
: size_(size), min_(UINT64_MAX), max_(0) {
index_ = 0;
timeArray_.resize(size);
}
~TimeVectorDelta() {}
uint16_t size() { return size_; }
bool full() { return index_ == size_; }
bool empty() { return index_ == 0; }
void reset() {
index_ = 0;
min_ = UINT64_MAX;
max_ = 0;
}
void addTimeval(uint64_t delta, int32_t trainerId) {
timeArray_[index_] = delta;
index_++;
if (delta < min_) {
min_ = delta;
}
if (delta > max_) {
max_ = delta;
maxTrainerId_ = trainerId;
}
}
uint64_t getDelta() const {
CHECK_GT(size_, 1) << "not support with 1 pserver";
return max_ - min_;
}
/* 2, n delta */
uint64_t get1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
/* n-1, n delta */
uint64_t getMinus1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
/* n/2, n delta */
uint64_t getMidNDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
int32_t getMaxTrainerId() const { return maxTrainerId_; }
private:
uint16_t size_;
uint16_t index_;
std::vector<uint64_t> timeArray_;
private:
uint64_t min_;
uint64_t max_;
int32_t maxTrainerId_;
};
// total samples stats, us
struct Abstract {
// last trainerId for barrier end, maxDelta trainerId for barrier delta
int32_t trainerId;
uint64_t minDelta;
uint64_t maxDelta;
uint64_t totDelta;
// first one is probably itself, so discard it.
uint64_t totSecondDelta;
// to confirm if last node destroy barrier performance.
uint64_t totLastTwoDelta;
// n/2-n delta
uint64_t totMidDelta;
uint64_t freq;
};
// barrier performance tunning stats
class BarrierStatBase {
public:
BarrierStatBase(uint16_t numConnThreads, const std::string &name);
virtual ~BarrierStatBase() {}
// if called at pserver end, then trainId means trainer's id.
// by default trainer does not use trainerId, so set it to -1
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) = 0;
virtual void updateStat(uint64_t delta, int32_t trainerId = -1) = 0;
const std::string &getName() { return name_; }
virtual void reset(bool clearRawData = true) {}
// since the timeVector_ is not stateful, so it's not clear whether the
// the barrier delta is correct. if one timestamp was lost, the all data
// from barrier stat becomes rubbish. -_-
virtual bool checkPassBarrier() {
LOG(INFO) << "bug implementation found";
return false;
}
protected:
virtual void showAbstract(std::ostream &output) const {}
friend std::ostream &operator<<(std::ostream &output,
const BarrierStatBase &stat);
protected:
mutable std::mutex lock_;
std::mutex abstractLock_; // see note on updaterStat
// each freqency for each barrier trainer
std::vector<struct Abstract> abstract_;
// it is valuable when do perf-tuining, if lastTrainerId acts uniform
// distribution
struct Abstract totAbstract_;
uint64_t totSamples_;
protected:
uint16_t numConnThreads_; // total updates needed
float rateThreshold_;
std::string name_;
};
// the end-time of arriving real/forged barrier position
class BarrierEndStat : public BarrierStatBase {
public:
BarrierEndStat(uint16_t numConnThreads, const std::string &name);
~BarrierEndStat() {}
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1);
virtual void updateStat(uint64_t delta, int32_t trainerId = -1) {
LOG(INFO) << "have no delta updateStat in BarrierEndStat";
}
virtual void reset(bool clearRawData = true);
virtual bool checkPassBarrier() { return timeVector_->empty(); }
protected:
/*
* LOG:
* readAllBlocks_denseUpdater
* trainerId avgGap avgSecondGap avgLastTwoGap avgMidGap rate
* 44 86.702 81.022 9.984 50.472 0.144737
* 46 87.723 82.939 8.737 50.019 0.118421
* 35 100.923 96.752 14.305 61.979
* 0.0657895
* log_barrier_abstract, log_barrier_lowest_nodes, log_barrier_threshold
* control details.
*/
virtual void showAbstract(std::ostream &output) const;
private:
std::unique_ptr<TimeVectorEnd> timeVector_;
};
// the delta-time from different trainers,
// eg, find the degree of imbalance of BP time at pserver end
// the entry value in timerVector_ is BP delta, do evaluation to BP delta.
class BarrierDeltaStat : public BarrierStatBase {
public:
BarrierDeltaStat(uint16_t numConnThreads, const std::string &name);
~BarrierDeltaStat() {}
virtual void updateStat(uint64_t delta, int32_t trainerId = -1);
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) {
LOG(INFO) << "have no timeval updateStat in BarrierDeltaStat";
}
virtual void reset(bool clearRawData = true);
virtual bool checkPassBarrier() { return timeVector_->empty(); }
protected:
virtual void showAbstract(std::ostream &outPut) const;
private:
// store delta time in uint64_t, eg BP time of all trainers
std::unique_ptr<TimeVectorDelta> timeVector_;
};
// to distinguish different contexts for same parallel threads, and different
// threads with same code-sgement, just use tagName to tag the run-time
// position.
// in Sparse, sendParallel threads can not only run in the stage of push&pull
// with same thread group, but also run in the stage of pull&push with different
// thread group, tag will be used to distinguish different run-time barrier
// position.
// trainerId in REGISTER_BARRIER_TIMER_SERVER is used to retreive lowest trainer
// nodes.
// end barrier
#define __REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
struct timeval cur; \
gettimeofday(&cur, nullptr); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// end barrier with user-defined timer
#define __REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// delta barrier
#define __REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_DELTA); \
__stat->updateStat(delta, trainerId); \
} \
} while (0);
// check end barrier
#define __CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
do { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
PCHECK(__stat->checkPassBarrier()) << internalName \
<< ": invalid barrier data"; \
} while (0);
/*
* Note:
* with sync-sgd algriothm in cluster mode, lots of synchronize action exsit at
* pserve end. these synchronizaton actions have impact on the efficiency of
* parameter exchange. the synchronizaton(barrier) GAP is composed of lots of
* factors, such as the forwardBackward variance, network fluncation. we try
* to have a quantitative analysis on these factor, so we design lots of barrier
* time to capture these performance. these barrier also can be placed at
* implict barrier position.
*
* example:
* in sync-sgd algorithm, each parameter server waits for all gradients from
* all trainers, thus, an explict barrier point exsit before doing optimization.
* the barrier timer located before the point can sense the barrier condition.
*
*/
// try to capture which trainer is slowest node in sync-sgd at pserver.
#define REGISTER_SLOW_NODES_PROBE( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
// try to check if all threads or trainers have passed barriers for data
// accuracy.
#define CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
__CHECK_BARRIER_TIMER((set), statName, numConnThreads, __VA_ARGS__)
#ifdef PADDLE_DISABLE_TIMER
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...)
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#else
/*
* sensing barrier time distribution for all parallelization threads.
* it provides low API for slow node check(REGISTER_SLOW_NODES_PROBE)
*/
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
/*
* sensing barrier time distribution for all parallelization threads.
* but time point for barrier performance is set by user.
* eg, with this api, you can get implict barrier point such as the beginning
* time distribution
* for receiving data.
*/
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
__REGISTER_BARRIER_TIMER_SERVER_SET( \
(set), statName, numConnThreads, trainerId, cur, __VA_ARGS__)
// try to capture time delta from all trainers, such as forwardBackward time
// which implies
// computation fluctuation
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
__REGISTER_BARRIER_DELTA_SERVER_SET( \
(set), statName, numConnThreads, trainerId, delta, __VA_ARGS__)
#endif // DISABLE_TIMER
} // namespace paddle
...@@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) { ...@@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) {
return outPut; return outPut;
} }
BarrierStatPtr StatSet::getStat(uint16_t numConnThreads,
const std::string& name,
BarrierStatType bType) {
{
ReadLockGuard guard(lock_);
auto it = barrierStatSet_.find(name);
if (it != barrierStatSet_.end()) {
return it->second;
}
}
std::lock_guard<RWLock> guard(lock_);
// test again with lock_guard
auto it = barrierStatSet_.find(name);
if (it != barrierStatSet_.end()) {
return it->second;
}
BarrierStatPtr stat;
if (bType == BARRIER_END) {
stat = std::make_shared<BarrierEndStat>(numConnThreads, name);
} else if (bType == BARRIER_DELTA) {
stat = std::make_shared<BarrierDeltaStat>(numConnThreads, name);
}
auto ret = barrierStatSet_.insert(std::make_pair(name, stat));
return ret.first->second;
}
void StatSet::printSegTimerStatus() { void StatSet::printSegTimerStatus() {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
...@@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() { ...@@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() {
} }
} }
void StatSet::printBarrierTimerStatus() {
ReadLockGuard guard(lock_);
if (barrierStatSet_.empty()) {
return;
}
// control barrierAbstact in runtime, so enable compliation
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
<< "======= BarrierStatSet status ======" << std::endl;
for (auto& stat : barrierStatSet_) {
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
<< *(stat.second);
}
}
void StatSet::printAllStatus() { void StatSet::printAllStatus() {
#ifndef PADDLE_DISABLE_TIMER #ifndef PADDLE_DISABLE_TIMER
printSegTimerStatus(); printSegTimerStatus();
#endif #endif
printBarrierTimerStatus();
LOG(INFO) << std::setiosflags(std::ios::left) LOG(INFO) << std::setiosflags(std::ios::left)
<< "--------------------------------------------------" << "--------------------------------------------------"
<< std::endl; << std::endl;
} }
void StatSet::printStatus(const std::string& name) {
ReadLockGuard guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
LOG(INFO) << *(iter->second);
}
void StatSet::reset(bool clearRawData) { void StatSet::reset(bool clearRawData) {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
for (auto& stat : statSet_) { for (auto& stat : statSet_) {
stat.second->reset(); stat.second->reset();
} }
// reset barrierStat
for (auto& stat : barrierStatSet_) {
stat.second->reset(clearRawData);
}
} }
void StatSet::setThreadInfo(const std::string& name, bool flag) { void StatSet::setThreadInfo(const std::string& name, bool flag) {
...@@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) { ...@@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) {
iter->second->setThreadInfo(flag); iter->second->setThreadInfo(flag);
} }
void StatSet::deleteStat(const std::string& name) {
std::lock_guard<RWLock> guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
statSet_.erase(iter);
}
StatInfo::~StatInfo() { StatInfo::~StatInfo() {
if (stat_) { if (stat_) {
std::lock_guard<std::mutex> guard(stat_->lock_); std::lock_guard<std::mutex> guard(stat_->lock_);
......
...@@ -23,7 +23,6 @@ limitations under the License. */ ...@@ -23,7 +23,6 @@ limitations under the License. */
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "BarrierStat.h"
#include "Locks.h" #include "Locks.h"
#include "Logging.h" #include "Logging.h"
#include "ThreadLocal.h" #include "ThreadLocal.h"
...@@ -60,12 +59,6 @@ public: ...@@ -60,12 +59,6 @@ public:
class Stat; class Stat;
typedef std::shared_ptr<Stat> StatPtr; typedef std::shared_ptr<Stat> StatPtr;
typedef std::shared_ptr<BarrierStatBase> BarrierStatPtr;
enum BarrierStatType {
BARRIER_END = 0,
BARRIER_DELTA = 1,
};
class StatSet { class StatSet {
public: public:
...@@ -74,11 +67,8 @@ public: ...@@ -74,11 +67,8 @@ public:
// print to LOG(INFO) // print to LOG(INFO)
void printSegTimerStatus(); void printSegTimerStatus();
void printBarrierTimerStatus();
void printAllStatus(); void printAllStatus();
void printStatus(const std::string& name);
StatPtr getStat(const std::string& name) { StatPtr getStat(const std::string& name) {
{ {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
...@@ -93,12 +83,6 @@ public: ...@@ -93,12 +83,6 @@ public:
return ret.first->second; return ret.first->second;
} }
BarrierStatPtr getStat(uint16_t numConnThreads,
const std::string& name,
BarrierStatType bType);
void deleteStat(const std::string& name);
// true for showing stats for each thread // true for showing stats for each thread
// false for showing stats aggragated over threads // false for showing stats aggragated over threads
void setThreadInfo(const std::string& name, bool flag); void setThreadInfo(const std::string& name, bool flag);
...@@ -120,7 +104,6 @@ public: ...@@ -120,7 +104,6 @@ public:
private: private:
std::unordered_map<std::string, StatPtr> statSet_; std::unordered_map<std::string, StatPtr> statSet_;
std::unordered_map<std::string, BarrierStatPtr> barrierStatSet_;
const std::string name_; const std::string name_;
RWLock lock_; RWLock lock_;
}; };
......
...@@ -51,7 +51,7 @@ template <class T> ...@@ -51,7 +51,7 @@ template <class T>
class ThreadLocal { class ThreadLocal {
public: public:
ThreadLocal() { ThreadLocal() {
PCHECK(pthread_key_create(&threadSpecificKey_, dataDestructor) == 0); CHECK(pthread_key_create(&threadSpecificKey_, dataDestructor) == 0);
} }
~ThreadLocal() { pthread_key_delete(threadSpecificKey_); } ~ThreadLocal() { pthread_key_delete(threadSpecificKey_); }
...@@ -65,7 +65,7 @@ public: ...@@ -65,7 +65,7 @@ public:
if (!p && createLocal) { if (!p && createLocal) {
p = new T(); p = new T();
int ret = pthread_setspecific(threadSpecificKey_, p); int ret = pthread_setspecific(threadSpecificKey_, p);
PCHECK(ret == 0); CHECK(ret == 0);
} }
return p; return p;
} }
...@@ -79,7 +79,7 @@ public: ...@@ -79,7 +79,7 @@ public:
if (T* q = get(false)) { if (T* q = get(false)) {
dataDestructor(q); dataDestructor(q);
} }
PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); CHECK(pthread_setspecific(threadSpecificKey_, p) == 0);
} }
/** /**
...@@ -112,7 +112,7 @@ private: ...@@ -112,7 +112,7 @@ private:
template <class T> template <class T>
class ThreadLocalD { class ThreadLocalD {
public: public:
ThreadLocalD() { PCHECK(pthread_key_create(&threadSpecificKey_, NULL) == 0); } ThreadLocalD() { CHECK(pthread_key_create(&threadSpecificKey_, NULL) == 0); }
~ThreadLocalD() { ~ThreadLocalD() {
pthread_key_delete(threadSpecificKey_); pthread_key_delete(threadSpecificKey_);
for (auto t : threadMap_) { for (auto t : threadMap_) {
...@@ -127,7 +127,7 @@ public: ...@@ -127,7 +127,7 @@ public:
T* p = (T*)pthread_getspecific(threadSpecificKey_); T* p = (T*)pthread_getspecific(threadSpecificKey_);
if (!p) { if (!p) {
p = new T(); p = new T();
PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); CHECK(pthread_setspecific(threadSpecificKey_, p) == 0);
updateMap(p); updateMap(p);
} }
return p; return p;
...@@ -141,7 +141,7 @@ public: ...@@ -141,7 +141,7 @@ public:
if (T* q = (T*)pthread_getspecific(threadSpecificKey_)) { if (T* q = (T*)pthread_getspecific(threadSpecificKey_)) {
dataDestructor(q); dataDestructor(q);
} }
PCHECK(pthread_setspecific(threadSpecificKey_, p) == 0); CHECK(pthread_setspecific(threadSpecificKey_, p) == 0);
updateMap(p); updateMap(p);
} }
......
...@@ -2511,10 +2511,14 @@ class MaxLayer(LayerBase): ...@@ -2511,10 +2511,14 @@ class MaxLayer(LayerBase):
trans_type='non-seq', trans_type='non-seq',
bias=False, bias=False,
output_max_index=None, output_max_index=None,
stride=-1,
**xargs): **xargs):
super(MaxLayer, self).__init__(name, 'max', 0, inputs=inputs, **xargs) super(MaxLayer, self).__init__(name, 'max', 0, inputs=inputs, **xargs)
config_assert(len(self.inputs) == 1, 'MaxLayer must have 1 input') config_assert(len(self.inputs) == 1, 'MaxLayer must have 1 input')
if trans_type == 'seq':
config_assert(stride == -1, 'subseq does not support stride window')
self.config.trans_type = trans_type self.config.trans_type = trans_type
self.config.seq_pool_stride = stride
for input_index in xrange(len(self.inputs)): for input_index in xrange(len(self.inputs)):
input_layer = self.get_input_layer(input_index) input_layer = self.get_input_layer(input_index)
self.set_layer_size(input_layer.size) self.set_layer_size(input_layer.size)
...@@ -2776,11 +2780,15 @@ class AverageLayer(LayerBase): ...@@ -2776,11 +2780,15 @@ class AverageLayer(LayerBase):
average_strategy='average', average_strategy='average',
trans_type='non-seq', trans_type='non-seq',
bias=False, bias=False,
stride=-1,
**xargs): **xargs):
super(AverageLayer, self).__init__( super(AverageLayer, self).__init__(
name, 'average', 0, inputs=inputs, **xargs) name, 'average', 0, inputs=inputs, **xargs)
self.config.average_strategy = average_strategy self.config.average_strategy = average_strategy
if trans_type == 'seq':
config_assert(stride == -1, 'subseq does not support stride window')
self.config.trans_type = trans_type self.config.trans_type = trans_type
self.config.seq_pool_stride = stride
config_assert(len(inputs) == 1, 'AverageLayer must have 1 input') config_assert(len(inputs) == 1, 'AverageLayer must have 1 input')
for input_index in xrange(len(self.inputs)): for input_index in xrange(len(self.inputs)):
input_layer = self.get_input_layer(input_index) input_layer = self.get_input_layer(input_index)
......
...@@ -1247,10 +1247,19 @@ def pooling_layer(input, ...@@ -1247,10 +1247,19 @@ def pooling_layer(input,
name=None, name=None,
bias_attr=None, bias_attr=None,
agg_level=AggregateLevel.TO_NO_SEQUENCE, agg_level=AggregateLevel.TO_NO_SEQUENCE,
stride=-1,
layer_attr=None): layer_attr=None):
""" """
Pooling layer for sequence inputs, not used for Image. Pooling layer for sequence inputs, not used for Image.
If stride > 0, this layer slides a window whose size is determined by stride,
and return the pooling value of the window as the output. Thus, a long sequence
will be shorten.
The parameter stride specifies the intervals at which to apply the pooling
operation. Note that for sequence with sub-sequence, the default value
of stride is -1.
The example usage is: The example usage is:
.. code-block:: python .. code-block:: python
...@@ -1269,6 +1278,8 @@ def pooling_layer(input, ...@@ -1269,6 +1278,8 @@ def pooling_layer(input,
:param pooling_type: Type of pooling, MaxPooling(default), AvgPooling, :param pooling_type: Type of pooling, MaxPooling(default), AvgPooling,
SumPooling, SquareRootNPooling. SumPooling, SquareRootNPooling.
:type pooling_type: BasePoolingType|None :type pooling_type: BasePoolingType|None
:param stride: The step size between successive pooling regions.
:type stride: Int
:param bias_attr: Bias parameter attribute. False if no bias. :param bias_attr: Bias parameter attribute. False if no bias.
:type bias_attr: ParameterAttribute|None|False :type bias_attr: ParameterAttribute|None|False
:param layer_attr: The Extra Attributes for layer, such as dropout. :param layer_attr: The Extra Attributes for layer, such as dropout.
...@@ -1286,12 +1297,16 @@ def pooling_layer(input, ...@@ -1286,12 +1297,16 @@ def pooling_layer(input,
extra_dict['output_max_index'] = pooling_type.output_max_index extra_dict['output_max_index'] = pooling_type.output_max_index
extra_dict.update(ExtraLayerAttribute.to_kwargs(layer_attr)) extra_dict.update(ExtraLayerAttribute.to_kwargs(layer_attr))
if agg_level == AggregateLevel.TO_SEQUENCE:
assert stride == -1
Layer( Layer(
name=name, name=name,
type=pooling_type.name, type=pooling_type.name,
inputs=[Input(input.name)], inputs=[Input(input.name)],
bias=ParamAttr.to_bias(bias_attr), bias=ParamAttr.to_bias(bias_attr),
trans_type=agg_level, trans_type=agg_level,
stride=stride,
**extra_dict) **extra_dict)
return LayerOutput( return LayerOutput(
...@@ -1553,7 +1568,7 @@ def last_seq(input, ...@@ -1553,7 +1568,7 @@ def last_seq(input,
:type name: basestring :type name: basestring
:param input: Input layer name. :param input: Input layer name.
:type input: LayerOutput :type input: LayerOutput
:param stride: window size. :param stride: The step size between successive pooling regions.
:type stride: Int :type stride: Int
:param layer_attr: extra layer attributes. :param layer_attr: extra layer attributes.
:type layer_attr: ExtraLayerAttribute. :type layer_attr: ExtraLayerAttribute.
...@@ -1609,7 +1624,7 @@ def first_seq(input, ...@@ -1609,7 +1624,7 @@ def first_seq(input,
:type name: basestring :type name: basestring
:param input: Input layer name. :param input: Input layer name.
:type input: LayerOutput :type input: LayerOutput
:param stride: window size. :param stride: The step size between successive pooling regions.
:type stride: Int :type stride: Int
:param layer_attr: extra layer attributes. :param layer_attr: extra layer attributes.
:type layer_attr: ExtraLayerAttribute. :type layer_attr: ExtraLayerAttribute.
......
...@@ -14,6 +14,7 @@ layers { ...@@ -14,6 +14,7 @@ layers {
input_layer_name: "dat_in" input_layer_name: "dat_in"
} }
trans_type: "seq" trans_type: "seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_1__" name: "__seq_pooling_1__"
...@@ -24,6 +25,7 @@ layers { ...@@ -24,6 +25,7 @@ layers {
input_layer_name: "dat_in" input_layer_name: "dat_in"
} }
trans_type: "non-seq" trans_type: "non-seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_2__" name: "__seq_pooling_2__"
...@@ -35,6 +37,7 @@ layers { ...@@ -35,6 +37,7 @@ layers {
} }
average_strategy: "average" average_strategy: "average"
trans_type: "seq" trans_type: "seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_3__" name: "__seq_pooling_3__"
...@@ -46,6 +49,7 @@ layers { ...@@ -46,6 +49,7 @@ layers {
} }
average_strategy: "average" average_strategy: "average"
trans_type: "non-seq" trans_type: "non-seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_4__" name: "__seq_pooling_4__"
...@@ -57,6 +61,7 @@ layers { ...@@ -57,6 +61,7 @@ layers {
} }
average_strategy: "sum" average_strategy: "sum"
trans_type: "seq" trans_type: "seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_5__" name: "__seq_pooling_5__"
...@@ -68,6 +73,7 @@ layers { ...@@ -68,6 +73,7 @@ layers {
} }
average_strategy: "sum" average_strategy: "sum"
trans_type: "non-seq" trans_type: "non-seq"
seq_pool_stride: -1
} }
layers { layers {
name: "__seq_pooling_6__" name: "__seq_pooling_6__"
...@@ -77,8 +83,44 @@ layers { ...@@ -77,8 +83,44 @@ layers {
inputs { inputs {
input_layer_name: "dat_in" input_layer_name: "dat_in"
} }
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_7__"
type: "average"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
average_strategy: "average"
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_8__"
type: "average"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
average_strategy: "sum"
trans_type: "non-seq"
seq_pool_stride: 5
}
layers {
name: "__seq_pooling_9__"
type: "max"
size: 100
active_type: ""
inputs {
input_layer_name: "dat_in"
}
output_max_index: true output_max_index: true
trans_type: "non-seq" trans_type: "non-seq"
seq_pool_stride: -1
} }
input_layer_names: "dat_in" input_layer_names: "dat_in"
output_layer_names: "__seq_pooling_0__" output_layer_names: "__seq_pooling_0__"
...@@ -88,6 +130,9 @@ output_layer_names: "__seq_pooling_3__" ...@@ -88,6 +130,9 @@ output_layer_names: "__seq_pooling_3__"
output_layer_names: "__seq_pooling_4__" output_layer_names: "__seq_pooling_4__"
output_layer_names: "__seq_pooling_5__" output_layer_names: "__seq_pooling_5__"
output_layer_names: "__seq_pooling_6__" output_layer_names: "__seq_pooling_6__"
output_layer_names: "__seq_pooling_7__"
output_layer_names: "__seq_pooling_8__"
output_layer_names: "__seq_pooling_9__"
sub_models { sub_models {
name: "root" name: "root"
layer_names: "dat_in" layer_names: "dat_in"
...@@ -98,6 +143,9 @@ sub_models { ...@@ -98,6 +143,9 @@ sub_models {
layer_names: "__seq_pooling_4__" layer_names: "__seq_pooling_4__"
layer_names: "__seq_pooling_5__" layer_names: "__seq_pooling_5__"
layer_names: "__seq_pooling_6__" layer_names: "__seq_pooling_6__"
layer_names: "__seq_pooling_7__"
layer_names: "__seq_pooling_8__"
layer_names: "__seq_pooling_9__"
input_layer_names: "dat_in" input_layer_names: "dat_in"
output_layer_names: "__seq_pooling_0__" output_layer_names: "__seq_pooling_0__"
output_layer_names: "__seq_pooling_1__" output_layer_names: "__seq_pooling_1__"
...@@ -106,6 +154,9 @@ sub_models { ...@@ -106,6 +154,9 @@ sub_models {
output_layer_names: "__seq_pooling_4__" output_layer_names: "__seq_pooling_4__"
output_layer_names: "__seq_pooling_5__" output_layer_names: "__seq_pooling_5__"
output_layer_names: "__seq_pooling_6__" output_layer_names: "__seq_pooling_6__"
output_layer_names: "__seq_pooling_7__"
output_layer_names: "__seq_pooling_8__"
output_layer_names: "__seq_pooling_9__"
is_recurrent_layer_group: false is_recurrent_layer_group: false
} }
...@@ -14,6 +14,14 @@ for pt in POOL_TYPE: ...@@ -14,6 +14,14 @@ for pt in POOL_TYPE:
for al in AGG_LEVEL: for al in AGG_LEVEL:
opts.append(pooling_layer(input=din, agg_level=al, pooling_type=pt())) opts.append(pooling_layer(input=din, agg_level=al, pooling_type=pt()))
for pt in POOL_TYPE:
opts.append(
pooling_layer(
input=din,
agg_level=AggregateLevel.TO_NO_SEQUENCE,
pooling_type=pt(),
stride=5))
opts.append( opts.append(
pooling_layer( pooling_layer(
input=din, pooling_type=MaxPooling(output_max_index=True))) input=din, pooling_type=MaxPooling(output_max_index=True)))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册