diff --git a/paddle/parameter/tests/test_common.cpp b/paddle/parameter/tests/test_common.cpp index 8bab5a6289e2bb9f634e8cce4557de55f7704447..64d204aea10c8a7905d90fac6ebccde3c9da1edc 100644 --- a/paddle/parameter/tests/test_common.cpp +++ b/paddle/parameter/tests/test_common.cpp @@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) { EXPECT_EQ((int)0, nums[i]); } } - -TEST_F(CommonTest, barrierStat) { - const int threadNum = 10; - - SyncThreadPool pool(threadNum); - -#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...) \ - pool.exec([&](int tid, size_t numThreads) { \ - struct timeval time; \ - gettimeofday(&time, nullptr); \ - uint64_t usec = timeToMicroSecond(time); \ - std::srand(usec); \ - auto value = std::rand() % 100000; \ - usleep(value); \ - REGISTER_SLOW_NODES_PROBE( \ - globalStat, statName, numConnThreads, tid, __VA_ARGS__); \ - }); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER_RANDOM("synThreadBarrier1", threadNum); - TEST_BARRIER_RANDOM("synThreadBarrier2", threadNum); - } - - globalStat.printAllStatus(); - globalStat.reset(); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER_RANDOM("synThreadBarrier3", threadNum, "tag0"); - TEST_BARRIER_RANDOM("synThreadBarrier4", threadNum, "tag1"); - } - - globalStat.printAllStatus(); - globalStat.reset(); - -// use it to test accurate barrier gap -#define TEST_BARRIER(statName, numConnThreads, ...) \ - pool.exec([&](int tid, size_t numThreads) { \ - usleep(tid * 10000); \ - REGISTER_SLOW_NODES_PROBE( \ - globalStat, statName, numConnThreads, tid, __VA_ARGS__); \ - }); - - for (auto i = 0; i < 10; i++) { - TEST_BARRIER("synThreadBarrier3", threadNum, "tag0"); - TEST_BARRIER("synThreadBarrier4", threadNum, "tag1"); - } - - globalStat.printAllStatus(); - globalStat.reset(); -} diff --git a/paddle/pserver/ParameterServer2.cpp b/paddle/pserver/ParameterServer2.cpp index 41ac15336d3150417da1cf1631319604584991ec..d7c1d4f788f44c6bfcec040ba24bdc454348c911 100644 --- a/paddle/pserver/ParameterServer2.cpp +++ b/paddle/pserver/ParameterServer2.cpp @@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request, SetConfigResponse response; callback(response); - - /// always defined, barrier slowest node function need it. - statSet_.reset(new StatSet("ParameterServer" + - str::to_string(static_cast(serverId_)))); } real bufferSum(const std::vector& buffers) { @@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, std::vector* outputBuffers) { VLOG(1) << "pserver: addGradient"; - // forwardbackward delta from all trainers - // indicate the fluctuation caused by forwardbackward. - if (!numPassFinishClients_) { - REGISTER_BARRIER_DELTA_SERVER_SET( - *statSet_, - "forwardbackwardDelta", - FLAGS_num_gradient_servers, - request.trainer_id(), - request.forwardbackward_time(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - { - /// approximately pure network overhead - REGISTER_TIMER_DYNAMIC_SET( - "pushRecv", timeToMicroSecond(*handleRequestBegin_), -1, *statSet_); - } - -#ifndef PADDLE_DISABLE_TIMER - gettimeofday(&(*addGradBegin_), nullptr); -#endif - - /// barrier fluctuation caused by network and previous forwardbackward - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER_SET( - *statSet_, - "handleReqBegin", - FLAGS_num_gradient_servers, - request.trainer_id(), - (*handleRequestBegin_), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "addGradBegin", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - { - REGISTER_TIMER_DYNAMIC("addGradCore", -1, *statSet_); ReadLockGuard guard(parameterMutex_); int bufferIndex = 0; for (const auto& block : request.blocks()) { @@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, std::lock_guard guard(*info.lock); simd::addTo(gradientSumBuffer, gradientBuffer, size); } - - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "addGradCoreFinish", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } } if (request.batch_status() == BATCH_FINISH || request.batch_status() == BATCH_START_AND_FINISH) { @@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request, VLOG(1) << "num samples: " << numSamplesProcessed_ << ", new cost:" << cost_; - /// numPassFinishClients_ means some trainer has entered finishPass - if (!numPassFinishClients_) { - REGISTER_SLOW_NODES_PROBE( - *statSet_, - "SLOW_NODES", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - /// notify doOperation gradient ready gradientReadyBarrier_.wait(); - /// if wait pass finish does not start, do check - if (!numPassFinishClients_) { - CHECK_BARRIER_TIMER(*statSet_, - "SLOW_NODES", - FLAGS_num_gradient_servers, - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - - /// barrier performance while all parameter add is finished - /// can indicate the fluctation caused by computation at pserver. - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "paraReady", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } /// wait doOperation finish parameterReadyBarrier_.wait(); VLOG(1) << "start send back"; - { - /// total time except overhead of network. - REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecvNoSend", - timeToMicroSecond(*addGradBegin_), - -1, - *statSet_); - } } } @@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat( return commitGradient; } -void ParameterServer2::printAsyncGradientCommitStatAndReset() { - std::stringstream statFormat; - if (asyncUpdateSteps_) { - statFormat << "async discard gradients stat: " << std::endl; - statFormat << "serverId: " << serverId_ - << " serverType: " << isSparseServer_ - << " total updates: " << asyncUpdateSteps_ - << " discard updates: " << asyncLaggedGradientsNum_ - << " discard ratio: " - << (real)asyncLaggedGradientsNum_ / (real)asyncUpdateSteps_; - statFormat << std::endl; - statFormat << std::endl; - - statFormat << "Async Gradient Update Steps distribution: " << std::endl - << "Sample: 1:1912(0.00284449) means " - << "the updates step=1 count 1912 times " - << "and account for 0.284449% of total updates" << std::endl; - size_t index = 0; - for (const auto& stat : asyncUpdateStat_) { - statFormat << index << ":" << stat << "(" - << (real)stat / (real)asyncUpdateSteps_ << ") "; - index++; - } - statFormat << std::endl; - statFormat << std::endl; - - statFormat << "Async Gradient Discard based on trainer_id: " << std::endl - << "Sample: 2:22(0.0016363) means " - << "total discarded updates from trainer_id=2 count 22 " - << "and account for 0.16363% of all updates from trainer_id=2" - << std::endl; - for (auto i = 0; i < FLAGS_num_gradient_servers; i++) { - real ratio = - (real)asyncTrainerDiscardStat_[i] / - (real)(asyncTrainerCommitStat_[i] + asyncTrainerDiscardStat_[i]); - statFormat << i << ":" << asyncTrainerDiscardStat_[i] << "(" << ratio - << ")" - << " "; - } - LOG(INFO) << statFormat.str(); - - /// reset stat - asyncUpdateSteps_ = 0; - asyncTrainerSteps_.assign(asyncTrainerSteps_.size(), 0); - asyncLaggedGradientsNum_ = 0; - asyncUpdateStat_.assign(asyncUpdateStat_.size(), 0); - asyncTrainerDiscardStat_.assign(asyncTrainerDiscardStat_.size(), 0); - asyncTrainerCommitStat_.assign(asyncTrainerCommitStat_.size(), 0); - } -} - static ThreadLocal> localBlockBitset_; void ParameterServer2::asyncSGD(const SendParameterRequest& request, @@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request, if (request.trainer_id() == 0) { /// batchId_ is approximately equal to "real batchId_" batchId_++; - tuningAsyncsgdMidOutput(); } } @@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request, } (*requestVec_).clear(); (*callbackVec_).clear(); - - /// barrier perfromance while all data are send finished. - /// indicates network flucatuation for big message. - if (!numPassFinishClients_) { - REGISTER_BARRIER_TIMER_SERVER( - *statSet_, - "sendParamFinish", - FLAGS_num_gradient_servers, - request.trainer_id(), - isSparseServer_ ? "_sparseUpdater" : "_denseUpdater"); - } - /// all time exhausted in parameterServer for big message. - /// it contains network and computation at pserver. - { - /// total time including overhead of network. - REGISTER_TIMER_DYNAMIC_SET("sendParaTotal", - timeToMicroSecond(*handleRequestBegin_), - -1, - *statSet_); - } - /// all time exhausted in pserverServer except recieve network. - { - /// total time except overhead of network receive - REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecv", - timeToMicroSecond(*addGradBegin_), - -1, - *statSet_); - } } break; case PSERVER_UPDATE_MODE_SET_PARAM: @@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation, } { - REGISTER_TIMER_DYNAMIC("op_SGD", -1, *statSet_); - parallelExecForEachBlock([&](int64_t blockId, const VectorPtr vecs[]) { BlockInfo& info = blockInfos_[blockId]; const ParameterConfig& config = getParameterConfig(blockId); @@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation, } batchId_++; - tuningSgdMidOutput(); } void ParameterServer2::op_start_pass(const Operation& operation, @@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation, /// finish pass info.optimizer->finishPass(); }); - - tuningSgdFinished(); batchId_ = 0; } @@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request, callback(SynchronizeResponse()); if (request.trainer_id() == 0) { - tuningAsyncsgdFinished(); batchId_ = 0; } } @@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request, callback(response); } -void ParameterServer2::tuningSgdMidOutput() { - if (batchId_ && batchId_ % FLAGS_log_period_server == 0) { - LOG(INFO) << "======== Batch=" << batchId_ << "======="; - statSet_->setThreadInfo(true); - statSet_->printAllStatus(); - /// not reset raw data for reducing the overhead of performance tuning - statSet_->reset(false); - } -} - -void ParameterServer2::tuningSgdFinished() { - LOG(INFO) << "======== Batch=" << batchId_ << " pass END" - << "======="; - statSet_->setThreadInfo(true); - statSet_->printAllStatus(); - /** - * reset raw data at end of pass since some raw data could be not - * complete. Otherwise the raw data will pollute next pass performance - * tuning - */ - statSet_->reset(); -} - -void ParameterServer2::tuningAsyncsgdMidOutput() { -#ifndef PADDLE_DISABLE_TIMER - if (batchId_ && batchId_ % FLAGS_log_period_server == 0) { - LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << "======="; - printAsyncGradientCommitStatAndReset(); - } -#endif -} - -void ParameterServer2::tuningAsyncsgdFinished() { - LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << " pass END" - << "======="; - printAsyncGradientCommitStatAndReset(); -} - } // namespace paddle diff --git a/paddle/pserver/ParameterServer2.h b/paddle/pserver/ParameterServer2.h index 0f5a5895907b20a0cf882b6fa6fb74bd52dce058..f7d3587b88c4ab1d4e37a259c622fc7c2d5532a3 100644 --- a/paddle/pserver/ParameterServer2.h +++ b/paddle/pserver/ParameterServer2.h @@ -298,24 +298,6 @@ protected: /// barrier performance tuning sync-sgd required std::atomic batchId_; - /// the beginning of addGradient without network overhead - ThreadLocal addGradBegin_; - - /** - * tuning barrier performance - * to better control log for sparse and dense parameter, - * we use different log entities for different parameterServer - * objects. - * it will output lots of performance stats to perceive the - * overhead of network, fluctuation of computation from - * forwardbackward and network, computation from optimization - * at pserver end, barrier overhead, etc. to understand tuning - * data, focus on the synchronization between addGradient and - * doOperation which indirectly call op_SGD operation controlled - * by remote updater controller - */ - std::unique_ptr statSet_; - public: struct Buffer { real* base; @@ -325,7 +307,6 @@ public: protected: /// async gradient commit control bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request); - void printAsyncGradientCommitStatAndReset(); public: /// disable default parameter for overloading @@ -710,36 +691,6 @@ public: void op_load(const Operation& operation, OperationResult* result); void op_save(const Operation& operation, OperationResult* result); - - /** - * @brief output log in at the middle stage of training - * - * @note flush log histroy and state at the end for sgd - */ - void tuningSgdMidOutput(); - - /** - * @brief output log in at the end stage of training - * - * @note flush log histroy and state at the end for sgd. it will also - * flush some stateful stat for next pass. - */ - void tuningSgdFinished(); - - /** - * @brief output log in at the middle stage of training - * - * @note flush log histroy and state at the end for async-sgd. - * it will log some performance log if some lagged node are found - */ - void tuningAsyncsgdMidOutput(); - - /** - * @brief output log in at the end stage of training - * - * @note flush log histroy and state at the end for async-sgd. - */ - void tuningAsyncsgdFinished(); }; } // namespace paddle diff --git a/paddle/utils/BarrierStat.cpp b/paddle/utils/BarrierStat.cpp deleted file mode 100644 index a6dbdcae3f32c894d35e8114488d4a3264c6c5f2..0000000000000000000000000000000000000000 --- a/paddle/utils/BarrierStat.cpp +++ /dev/null @@ -1,340 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "paddle/utils/BarrierStat.h" -#include -#include -#include -#include -#include "paddle/utils/Flags.h" -#include "paddle/utils/Stat.h" - -DEFINE_bool(log_barrier_abstract, - true, - "if true, show abstract of barrier performance"); -DEFINE_int32(log_barrier_lowest_nodes, - 5, - "how many lowest node will be logged"); -DEFINE_bool(log_barrier_show_log, - false, // for performance tuning insight - "if true, always show barrier abstract even with little gap"); - -namespace paddle { - -std::ostream &operator<<(std::ostream &output, const BarrierStatBase &stat) { - if (FLAGS_log_barrier_abstract) { - std::lock_guard guard(stat.lock_); - stat.showAbstract(output); - } - return output; -} - -BarrierStatBase::BarrierStatBase(uint16_t numConnThreads, - const std::string &name) - : totSamples_(0), numConnThreads_(numConnThreads), name_(name) { - abstract_.resize(numConnThreads_); - if (FLAGS_log_barrier_show_log) { - rateThreshold_ = 0.0; - } else { - /* probablity of abnormal node - * p = 1/n + (n/8)/(n+1), n = nodes, n > 1 - * if the freq of lowest trainerId larger than p, - * output FLAGS_log_barrier_lowest_nodes lastTrainerId. - * numConnThreads_ indicates nodes - */ - float n = (float)numConnThreads; - rateThreshold_ = 1.0 / n + (n / 8.0) / (n + 1.0); - } -} - -BarrierEndStat::BarrierEndStat(uint16_t numConnThreads, const std::string &name) - : BarrierStatBase(numConnThreads, name) { - timeVector_.reset(new TimeVectorEnd(numConnThreads_)); - reset(true); - LOG(INFO) << " create barrierEndStat: " << name - << " endBarrier warning rate: " << rateThreshold_; -} - -/* - * Note: - * the design different pserver entity owns different statSet to obey - * the background that different pserver runs separately. - */ -void BarrierEndStat::updateStat(struct timeval &cur, int32_t trainerId) { - CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier"; - - std::lock_guard guard(lock_); - timeVector_->addTimeval(cur, trainerId); - - if (timeVector_->full()) { - std::lock_guard abstractGuard(abstractLock_); - auto id = timeVector_->getLastTrainerId(); - auto delta = timeToMicroSecond(timeVector_->getDelta()); - auto secondDelta = timeToMicroSecond(timeVector_->get1NDelta()); - auto lastTwoDelta = timeToMicroSecond(timeVector_->getMinus1NDelta()); - auto midDelta = timeToMicroSecond(timeVector_->getMidNDelta()); - // discard first sample, since first sample probably is abnormal. - if (totSamples_) { - abstract_[id].freq++; - - if (delta < abstract_[id].minDelta) { - abstract_[id].minDelta = delta; - } - if (delta > abstract_[id].maxDelta) { - abstract_[id].maxDelta = delta; - } - abstract_[id].totDelta += delta; - abstract_[id].totSecondDelta += secondDelta; - abstract_[id].totLastTwoDelta += lastTwoDelta; - abstract_[id].totMidDelta += midDelta; - - // update totAbstract_ - totAbstract_.freq++; - if (delta < totAbstract_.minDelta) { - totAbstract_.minDelta = delta; - } - if (delta > totAbstract_.maxDelta) { - totAbstract_.maxDelta = delta; - } - totAbstract_.totDelta += delta; - totAbstract_.totSecondDelta += secondDelta; - totAbstract_.totLastTwoDelta += lastTwoDelta; - totAbstract_.totMidDelta += midDelta; - } - - totSamples_++; - timeVector_->reset(); - } -} - -void BarrierEndStat::reset(bool clearRawData) { - int32_t i = 0; - - totSamples_ = 0; - - std::lock_guard guard(abstractLock_); - - if (clearRawData) { - timeVector_->reset(); - } - - for (auto &abstract : abstract_) { - memset((void *)&abstract, 0, sizeof(abstract)); - abstract.minDelta = UINT64_MAX; - abstract.trainerId = i++; - } - memset((void *)&totAbstract_, 0, sizeof(Abstract)); - totAbstract_.minDelta = UINT64_MAX; -} - -void BarrierEndStat::showAbstract(std::ostream &output) const { - // do not support the case "<=2 pserver" - if (numConnThreads_ <= 2 || !totSamples_) { - return; - } - - // duplicate freq info - std::vector outputAbstract = abstract_; - std::sort(outputAbstract.begin(), - outputAbstract.end(), - [](const struct Abstract &a, const struct Abstract &b) { - return a.freq > b.freq; - }); - - auto rate = (float)outputAbstract[0].freq / (float)totSamples_; - if (rate < rateThreshold_) { - return; - } - - output << std::setw(20) << name_ << std::endl; - - /* - * Note: - * avgGap: the average delta between 1 -- n arriving trainers - * avgSecondGap: the average delta between 2 -- n arriving trainers - * avgLastTwoGap: the average delta between n-1 -- n arriving trainers - * avgMidGap: the average delta between n/2 -- n arriving trainers - * rato: samples / totSamples - * - * the stat is based on per trainer if trainer_id is set, totAbstract is - * stat based on all trainers scope. - */ - output << std::setw(42) << " " << std::setw(15) << "trainerId" - << std::setw(15) << "avgGap" << std::setw(15) << "avgSecondGap" - << std::setw(15) << "avgLastTwoGap" << std::setw(15) << "avgMidGap" - << std::setw(10) << "rate" << std::setw(10) << "samples" - << std::setw(10) << "totSamples" << std::endl; - // show totAbstract, it's valuable when lastTrainerId is even-distributed' - if (!totAbstract_.freq) return; - output << std::setw(42) << " " << std::setw(15) << "totAbstract" - << std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totSecondDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totLastTwoDelta / totAbstract_.freq) * 0.001 - << std::setw(15) - << (totAbstract_.totMidDelta / totAbstract_.freq) * 0.001 - << std::setw(10) << (float)totAbstract_.freq / (float)totSamples_ - << std::setw(10) << (float)totAbstract_.freq << std::setw(10) - << (float)totSamples_ << std::endl; - - // show lastTrainerId abstract - int count = 0; - for (auto &abstract : outputAbstract) { - if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) { - break; - } - // output format control - output << std::setw(42) << " " << std::setw(15) << abstract.trainerId - << std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001 - << std::setw(15) << (abstract.totSecondDelta / abstract.freq) * 0.001 - << std::setw(15) - << (abstract.totLastTwoDelta / abstract.freq) * 0.001 - << std::setw(15) << (abstract.totMidDelta / abstract.freq) * 0.001 - << std::setw(10) << (float)abstract.freq / (float)totSamples_ - << std::setw(10) << (float)abstract.freq << std::setw(10) - << (float)totSamples_ << std::endl; - } -} - -BarrierDeltaStat::BarrierDeltaStat(uint16_t numConnThreads, - const std::string &name) - : BarrierStatBase(numConnThreads, name) { - timeVector_.reset(new TimeVectorDelta(numConnThreads_)); - reset(true); - LOG(INFO) << " create barrierDeltaStat: " << name - << " barrierDelta warning rate: " << rateThreshold_; -} - -void BarrierDeltaStat::updateStat(uint64_t delta, int32_t trainerId) { - CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier"; - - std::lock_guard guard(lock_); - timeVector_->addTimeval(delta, trainerId); - - if (timeVector_->full()) { - std::lock_guard abstractGuard(abstractLock_); - auto id = timeVector_->getMaxTrainerId(); - auto delta = timeVector_->getDelta(); - // discard first sample, since first sample probably is abnormal. - if (totSamples_) { - abstract_[id].freq++; - - if (delta < abstract_[id].minDelta) { - abstract_[id].minDelta = delta; - } - if (delta > abstract_[id].maxDelta) { - abstract_[id].maxDelta = delta; - } - abstract_[id].totDelta += delta; - - // update totAbstract_ - totAbstract_.freq++; - if (delta < totAbstract_.minDelta) { - totAbstract_.minDelta = delta; - } - if (delta > totAbstract_.maxDelta) { - totAbstract_.maxDelta = delta; - } - totAbstract_.totDelta += delta; - } - - totSamples_++; - timeVector_->reset(); - } -} - -void BarrierDeltaStat::reset(bool clearRawData) { - int32_t i = 0; - - totSamples_ = 0; - - std::lock_guard guard(abstractLock_); - - if (clearRawData) { - timeVector_->reset(); - } - - for (auto &abstract : abstract_) { - memset((void *)&abstract, 0, sizeof(abstract)); - abstract.minDelta = UINT64_MAX; - abstract.trainerId = i++; - } - memset((void *)&totAbstract_, 0, sizeof(Abstract)); - totAbstract_.minDelta = UINT64_MAX; -} - -void BarrierDeltaStat::showAbstract(std::ostream &output) const { - // do not support the case "<=2 pserver" - if (numConnThreads_ <= 2 || !totSamples_) { - return; - } - - // duplicate freq info - std::vector outputAbstract = abstract_; - std::sort(outputAbstract.begin(), - outputAbstract.end(), - [](const struct Abstract &a, const struct Abstract &b) { - return a.freq > b.freq; - }); - - auto rate = (float)outputAbstract[0].freq / (float)totSamples_; - if (rate < rateThreshold_) { - return; - } - - output << std::setw(20) << name_ << std::endl; - - /* Note: - * Gap means the delta from all trainers' forwardbackward - * avgGap: average Gap in log_period batches - * minGap: min Gap in log_period batches - * maxGap: max Gap in log_period batches - * trainerId: the slowest trainer_id - * - * the stat is based on per trainer if trainer_id is set, totAbstract is - * stat based on all trainers scope. - */ - output << std::setw(42) << " " << std::setw(15) << "trainerId" - << std::setw(15) << "avgGap" << std::setw(10) << "minGap" - << std::setw(10) << "maxGap" << std::setw(10) << "rate" - << std::setw(10) << "samples" << std::setw(10) << "totSamples" - << std::endl; - // show totAbstract, it's valuable when lastTrainerId is even-distributed' - if (!totAbstract_.freq) return; - output << std::setw(42) << " " << std::setw(15) << "totAbstract" - << std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001 - << std::setw(10) << totAbstract_.minDelta * 0.001 << std::setw(10) - << totAbstract_.maxDelta * 0.001 << std::setw(10) - << (float)totAbstract_.freq / (float)totSamples_ << std::setw(10) - << (float)totAbstract_.freq << std::setw(10) << (float)totSamples_ - << std::endl; - - // show lastTrainerId abstract - int count = 0; - for (auto &abstract : outputAbstract) { - if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) { - break; - } - // output format control - output << std::setw(42) << " " << std::setw(15) << abstract.trainerId - << std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001 - << std::setw(10) << abstract.minDelta * 0.001 << std::setw(10) - << abstract.maxDelta * 0.001 << std::setw(10) - << (float)abstract.freq / (float)totSamples_ << std::setw(10) - << (float)abstract.freq << std::setw(10) << (float)totSamples_ - << std::endl; - } -} -} // namespace paddle diff --git a/paddle/utils/BarrierStat.h b/paddle/utils/BarrierStat.h deleted file mode 100644 index a9c925eff66838d58d540d7be5476e6207a30bec..0000000000000000000000000000000000000000 --- a/paddle/utils/BarrierStat.h +++ /dev/null @@ -1,425 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "Locks.h" -#include "Logging.h" -#include "ThreadLocal.h" - -namespace paddle { - -inline uint64_t timeToMicroSecond(struct timeval time) { - return time.tv_sec * 1000000LU + time.tv_usec; -} - -class TimeVectorEnd { - /* - * help class for gathering all barrier performance data - * which shows time point property. - * freqently used in barrier performance tuning API, such - * as tuning which is slowest node in sync-sgd mode training. - */ -public: - explicit TimeVectorEnd(uint16_t size) : size_(size) { - index_ = 0; - timeArray_.resize(size); - trainerIds_.resize(size); - } - ~TimeVectorEnd() {} - - uint16_t size() { return size_; } - - bool full() { return index_ == size_; } - - bool empty() { return index_ == 0; } - - void reset() { index_ = 0; } - - void addTimeval(struct timeval time, int32_t trainerId) { - timeArray_[index_] = time; - trainerIds_[index_] = trainerId; - index_++; - } - - struct timeval getDelta() const { - struct timeval delta; - CHECK_GT(size_, 1) << "not support with 1 pserver"; - timersub(&timeArray_[size_ - 1], &timeArray_[0], &delta); - return delta; - } - - /* 2, n delta */ - struct timeval get1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[1], &delta); - return delta; - } - - /* n-1, n delta */ - struct timeval getMinus1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[size_ - 2], &delta); - return delta; - } - - /* n/2, n delta */ - struct timeval getMidNDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - struct timeval delta; - timersub(&timeArray_[size_ - 1], &timeArray_[size_ / 2], &delta); - return delta; - } - - int32_t getLastTrainerId() const { return trainerIds_[index_ - 1]; } - -private: - uint16_t size_; - uint16_t index_; - std::vector timeArray_; - std::vector trainerIds_; -}; - -class TimeVectorDelta { - /* - * help class for gathering performance data which shows time - * delta property, such as tuning the time distribution of - * forwardBackward time from all cluster nodes. - */ -public: - explicit TimeVectorDelta(uint16_t size) - : size_(size), min_(UINT64_MAX), max_(0) { - index_ = 0; - timeArray_.resize(size); - } - ~TimeVectorDelta() {} - - uint16_t size() { return size_; } - - bool full() { return index_ == size_; } - - bool empty() { return index_ == 0; } - - void reset() { - index_ = 0; - min_ = UINT64_MAX; - max_ = 0; - } - - void addTimeval(uint64_t delta, int32_t trainerId) { - timeArray_[index_] = delta; - index_++; - if (delta < min_) { - min_ = delta; - } - if (delta > max_) { - max_ = delta; - maxTrainerId_ = trainerId; - } - } - - uint64_t getDelta() const { - CHECK_GT(size_, 1) << "not support with 1 pserver"; - return max_ - min_; - } - - /* 2, n delta */ - uint64_t get1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - /* n-1, n delta */ - uint64_t getMinus1NDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - /* n/2, n delta */ - uint64_t getMidNDelta() const { - CHECK_GT(size_, 2) << "not support with less than 2 pservers"; - LOG(FATAL) << "Not implemented"; - } - - int32_t getMaxTrainerId() const { return maxTrainerId_; } - -private: - uint16_t size_; - uint16_t index_; - std::vector timeArray_; - -private: - uint64_t min_; - uint64_t max_; - int32_t maxTrainerId_; -}; - -// total samples stats, us -struct Abstract { - // last trainerId for barrier end, maxDelta trainerId for barrier delta - int32_t trainerId; - uint64_t minDelta; - uint64_t maxDelta; - uint64_t totDelta; - // first one is probably itself, so discard it. - uint64_t totSecondDelta; - // to confirm if last node destroy barrier performance. - uint64_t totLastTwoDelta; - // n/2-n delta - uint64_t totMidDelta; - uint64_t freq; -}; - -// barrier performance tunning stats -class BarrierStatBase { -public: - BarrierStatBase(uint16_t numConnThreads, const std::string &name); - - virtual ~BarrierStatBase() {} - - // if called at pserver end, then trainId means trainer's id. - // by default trainer does not use trainerId, so set it to -1 - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) = 0; - virtual void updateStat(uint64_t delta, int32_t trainerId = -1) = 0; - - const std::string &getName() { return name_; } - - virtual void reset(bool clearRawData = true) {} - // since the timeVector_ is not stateful, so it's not clear whether the - // the barrier delta is correct. if one timestamp was lost, the all data - // from barrier stat becomes rubbish. -_- - virtual bool checkPassBarrier() { - LOG(INFO) << "bug implementation found"; - return false; - } - -protected: - virtual void showAbstract(std::ostream &output) const {} - friend std::ostream &operator<<(std::ostream &output, - const BarrierStatBase &stat); - -protected: - mutable std::mutex lock_; - std::mutex abstractLock_; // see note on updaterStat - // each freqency for each barrier trainer - std::vector abstract_; - // it is valuable when do perf-tuining, if lastTrainerId acts uniform - // distribution - struct Abstract totAbstract_; - uint64_t totSamples_; - -protected: - uint16_t numConnThreads_; // total updates needed - float rateThreshold_; - std::string name_; -}; - -// the end-time of arriving real/forged barrier position -class BarrierEndStat : public BarrierStatBase { -public: - BarrierEndStat(uint16_t numConnThreads, const std::string &name); - ~BarrierEndStat() {} - - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1); - virtual void updateStat(uint64_t delta, int32_t trainerId = -1) { - LOG(INFO) << "have no delta updateStat in BarrierEndStat"; - } - virtual void reset(bool clearRawData = true); - virtual bool checkPassBarrier() { return timeVector_->empty(); } - -protected: - /* - * LOG: - * readAllBlocks_denseUpdater - * trainerId avgGap avgSecondGap avgLastTwoGap avgMidGap rate - * 44 86.702 81.022 9.984 50.472 0.144737 - * 46 87.723 82.939 8.737 50.019 0.118421 - * 35 100.923 96.752 14.305 61.979 - * 0.0657895 - * log_barrier_abstract, log_barrier_lowest_nodes, log_barrier_threshold - * control details. - */ - virtual void showAbstract(std::ostream &output) const; - -private: - std::unique_ptr timeVector_; -}; - -// the delta-time from different trainers, -// eg, find the degree of imbalance of BP time at pserver end -// the entry value in timerVector_ is BP delta, do evaluation to BP delta. -class BarrierDeltaStat : public BarrierStatBase { -public: - BarrierDeltaStat(uint16_t numConnThreads, const std::string &name); - ~BarrierDeltaStat() {} - - virtual void updateStat(uint64_t delta, int32_t trainerId = -1); - virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) { - LOG(INFO) << "have no timeval updateStat in BarrierDeltaStat"; - } - - virtual void reset(bool clearRawData = true); - - virtual bool checkPassBarrier() { return timeVector_->empty(); } - -protected: - virtual void showAbstract(std::ostream &outPut) const; - -private: - // store delta time in uint64_t, eg BP time of all trainers - std::unique_ptr timeVector_; -}; - -// to distinguish different contexts for same parallel threads, and different -// threads with same code-sgement, just use tagName to tag the run-time -// position. -// in Sparse, sendParallel threads can not only run in the stage of push&pull -// with same thread group, but also run in the stage of pull&push with different -// thread group, tag will be used to distinguish different run-time barrier -// position. -// trainerId in REGISTER_BARRIER_TIMER_SERVER is used to retreive lowest trainer -// nodes. - -// end barrier -#define __REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - struct timeval cur; \ - gettimeofday(&cur, nullptr); \ - __stat->updateStat(cur, trainerId); \ - } \ - } while (0); - -// end barrier with user-defined timer -#define __REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - __stat->updateStat(cur, trainerId); \ - } \ - } while (0); - -// delta barrier -#define __REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, delta, ...) \ - do { \ - if (numConnThreads > 2) { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_DELTA); \ - __stat->updateStat(delta, trainerId); \ - } \ - } while (0); - -// check end barrier -#define __CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \ - do { \ - std::string internalName = \ - std::string(statName) + std::string(__VA_ARGS__); \ - BarrierStatPtr __stat = \ - (set).getStat(numConnThreads, internalName, BARRIER_END); \ - PCHECK(__stat->checkPassBarrier()) << internalName \ - << ": invalid barrier data"; \ - } while (0); - -/* - * Note: - * with sync-sgd algriothm in cluster mode, lots of synchronize action exsit at - * pserve end. these synchronizaton actions have impact on the efficiency of - * parameter exchange. the synchronizaton(barrier) GAP is composed of lots of - * factors, such as the forwardBackward variance, network fluncation. we try - * to have a quantitative analysis on these factor, so we design lots of barrier - * time to capture these performance. these barrier also can be placed at - * implict barrier position. - * - * example: - * in sync-sgd algorithm, each parameter server waits for all gradients from - * all trainers, thus, an explict barrier point exsit before doing optimization. - * the barrier timer located before the point can sense the barrier condition. - * - */ - -// try to capture which trainer is slowest node in sync-sgd at pserver. -#define REGISTER_SLOW_NODES_PROBE( \ - set, statName, numConnThreads, trainerId, ...) \ - __REGISTER_BARRIER_TIMER_SERVER( \ - (set), statName, numConnThreads, trainerId, __VA_ARGS__) -// try to check if all threads or trainers have passed barriers for data -// accuracy. -#define CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \ - __CHECK_BARRIER_TIMER((set), statName, numConnThreads, __VA_ARGS__) - -#ifdef PADDLE_DISABLE_TIMER - -#define REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) -#define REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) -#define REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) - -#else - -/* - * sensing barrier time distribution for all parallelization threads. - * it provides low API for slow node check(REGISTER_SLOW_NODES_PROBE) - */ -#define REGISTER_BARRIER_TIMER_SERVER( \ - set, statName, numConnThreads, trainerId, ...) \ - __REGISTER_BARRIER_TIMER_SERVER( \ - (set), statName, numConnThreads, trainerId, __VA_ARGS__) - -/* - * sensing barrier time distribution for all parallelization threads. - * but time point for barrier performance is set by user. - * eg, with this api, you can get implict barrier point such as the beginning - * time distribution - * for receiving data. - */ -#define REGISTER_BARRIER_TIMER_SERVER_SET( \ - set, statName, numConnThreads, trainerId, cur, ...) \ - __REGISTER_BARRIER_TIMER_SERVER_SET( \ - (set), statName, numConnThreads, trainerId, cur, __VA_ARGS__) - -// try to capture time delta from all trainers, such as forwardBackward time -// which implies -// computation fluctuation -#define REGISTER_BARRIER_DELTA_SERVER_SET( \ - set, statName, numConnThreads, trainerId, delta, ...) \ - __REGISTER_BARRIER_DELTA_SERVER_SET( \ - (set), statName, numConnThreads, trainerId, delta, __VA_ARGS__) - -#endif // DISABLE_TIMER -} // namespace paddle diff --git a/paddle/utils/Stat.cpp b/paddle/utils/Stat.cpp index c7194d3bf1271a8bf05379f78adfb18f0f64db29..ff1b1bf888f3915f14752cb89115f7c9ed98d67f 100644 --- a/paddle/utils/Stat.cpp +++ b/paddle/utils/Stat.cpp @@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) { return outPut; } -BarrierStatPtr StatSet::getStat(uint16_t numConnThreads, - const std::string& name, - BarrierStatType bType) { - { - ReadLockGuard guard(lock_); - auto it = barrierStatSet_.find(name); - if (it != barrierStatSet_.end()) { - return it->second; - } - } - - std::lock_guard guard(lock_); - // test again with lock_guard - auto it = barrierStatSet_.find(name); - if (it != barrierStatSet_.end()) { - return it->second; - } - - BarrierStatPtr stat; - if (bType == BARRIER_END) { - stat = std::make_shared(numConnThreads, name); - } else if (bType == BARRIER_DELTA) { - stat = std::make_shared(numConnThreads, name); - } - auto ret = barrierStatSet_.insert(std::make_pair(name, stat)); - return ret.first->second; -} - void StatSet::printSegTimerStatus() { ReadLockGuard guard(lock_); LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') @@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() { } } -void StatSet::printBarrierTimerStatus() { - ReadLockGuard guard(lock_); - if (barrierStatSet_.empty()) { - return; - } - // control barrierAbstact in runtime, so enable compliation - LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') - << "======= BarrierStatSet status ======" << std::endl; - for (auto& stat : barrierStatSet_) { - LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') - << *(stat.second); - } -} - void StatSet::printAllStatus() { #ifndef PADDLE_DISABLE_TIMER printSegTimerStatus(); #endif - printBarrierTimerStatus(); LOG(INFO) << std::setiosflags(std::ios::left) << "--------------------------------------------------" << std::endl; } -void StatSet::printStatus(const std::string& name) { - ReadLockGuard guard(lock_); - auto iter = statSet_.find(name); - CHECK(iter != statSet_.end()) << name << " is not registed in " << name_; - LOG(INFO) << *(iter->second); -} - void StatSet::reset(bool clearRawData) { ReadLockGuard guard(lock_); for (auto& stat : statSet_) { stat.second->reset(); } - // reset barrierStat - for (auto& stat : barrierStatSet_) { - stat.second->reset(clearRawData); - } } void StatSet::setThreadInfo(const std::string& name, bool flag) { @@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) { iter->second->setThreadInfo(flag); } -void StatSet::deleteStat(const std::string& name) { - std::lock_guard guard(lock_); - auto iter = statSet_.find(name); - CHECK(iter != statSet_.end()) << name << " is not registed in " << name_; - statSet_.erase(iter); -} - StatInfo::~StatInfo() { if (stat_) { std::lock_guard guard(stat_->lock_); diff --git a/paddle/utils/Stat.h b/paddle/utils/Stat.h index d9cc6e413a7415d9f42508019d435992b93cf47f..79fd3b8cf043e62922dfd046754ee8ac261990c5 100644 --- a/paddle/utils/Stat.h +++ b/paddle/utils/Stat.h @@ -23,7 +23,6 @@ limitations under the License. */ #include #include -#include "BarrierStat.h" #include "Locks.h" #include "Logging.h" #include "ThreadLocal.h" @@ -60,12 +59,6 @@ public: class Stat; typedef std::shared_ptr StatPtr; -typedef std::shared_ptr BarrierStatPtr; - -enum BarrierStatType { - BARRIER_END = 0, - BARRIER_DELTA = 1, -}; class StatSet { public: @@ -74,11 +67,8 @@ public: // print to LOG(INFO) void printSegTimerStatus(); - void printBarrierTimerStatus(); void printAllStatus(); - void printStatus(const std::string& name); - StatPtr getStat(const std::string& name) { { ReadLockGuard guard(lock_); @@ -93,12 +83,6 @@ public: return ret.first->second; } - BarrierStatPtr getStat(uint16_t numConnThreads, - const std::string& name, - BarrierStatType bType); - - void deleteStat(const std::string& name); - // true for showing stats for each thread // false for showing stats aggragated over threads void setThreadInfo(const std::string& name, bool flag); @@ -120,7 +104,6 @@ public: private: std::unordered_map statSet_; - std::unordered_map barrierStatSet_; const std::string name_; RWLock lock_; };