提交 98378968 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #2740 from reyoung/feature/remove_buggy_barrier_stats

Remove buggy BarrierStat
......@@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) {
EXPECT_EQ((int)0, nums[i]);
}
}
TEST_F(CommonTest, barrierStat) {
const int threadNum = 10;
SyncThreadPool pool(threadNum);
#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
struct timeval time; \
gettimeofday(&time, nullptr); \
uint64_t usec = timeToMicroSecond(time); \
std::srand(usec); \
auto value = std::rand() % 100000; \
usleep(value); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for (auto i = 0; i < 10; i++) {
TEST_BARRIER_RANDOM("synThreadBarrier1", threadNum);
TEST_BARRIER_RANDOM("synThreadBarrier2", threadNum);
}
globalStat.printAllStatus();
globalStat.reset();
for (auto i = 0; i < 10; i++) {
TEST_BARRIER_RANDOM("synThreadBarrier3", threadNum, "tag0");
TEST_BARRIER_RANDOM("synThreadBarrier4", threadNum, "tag1");
}
globalStat.printAllStatus();
globalStat.reset();
// use it to test accurate barrier gap
#define TEST_BARRIER(statName, numConnThreads, ...) \
pool.exec([&](int tid, size_t numThreads) { \
usleep(tid * 10000); \
REGISTER_SLOW_NODES_PROBE( \
globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
});
for (auto i = 0; i < 10; i++) {
TEST_BARRIER("synThreadBarrier3", threadNum, "tag0");
TEST_BARRIER("synThreadBarrier4", threadNum, "tag1");
}
globalStat.printAllStatus();
globalStat.reset();
}
......@@ -217,10 +217,6 @@ void ParameterServer2::setConfig(const SetConfigRequest& request,
SetConfigResponse response;
callback(response);
/// always defined, barrier slowest node function need it.
statSet_.reset(new StatSet("ParameterServer" +
str::to_string(static_cast<int>(serverId_))));
}
real bufferSum(const std::vector<ParameterServer2::Buffer>& buffers) {
......@@ -369,50 +365,7 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std::vector<Buffer>* outputBuffers) {
VLOG(1) << "pserver: addGradient";
// forwardbackward delta from all trainers
// indicate the fluctuation caused by forwardbackward.
if (!numPassFinishClients_) {
REGISTER_BARRIER_DELTA_SERVER_SET(
*statSet_,
"forwardbackwardDelta",
FLAGS_num_gradient_servers,
request.trainer_id(),
request.forwardbackward_time(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
{
/// approximately pure network overhead
REGISTER_TIMER_DYNAMIC_SET(
"pushRecv", timeToMicroSecond(*handleRequestBegin_), -1, *statSet_);
}
#ifndef PADDLE_DISABLE_TIMER
gettimeofday(&(*addGradBegin_), nullptr);
#endif
/// barrier fluctuation caused by network and previous forwardbackward
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER_SET(
*statSet_,
"handleReqBegin",
FLAGS_num_gradient_servers,
request.trainer_id(),
(*handleRequestBegin_),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"addGradBegin",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
{
REGISTER_TIMER_DYNAMIC("addGradCore", -1, *statSet_);
ReadLockGuard guard(parameterMutex_);
int bufferIndex = 0;
for (const auto& block : request.blocks()) {
......@@ -444,15 +397,6 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
std::lock_guard<std::mutex> guard(*info.lock);
simd::addTo(gradientSumBuffer, gradientBuffer, size);
}
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"addGradCoreFinish",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
}
if (request.batch_status() == BATCH_FINISH ||
request.batch_status() == BATCH_START_AND_FINISH) {
......@@ -461,47 +405,12 @@ void ParameterServer2::addGradient(const SendParameterRequest& request,
VLOG(1) << "num samples: " << numSamplesProcessed_
<< ", new cost:" << cost_;
/// numPassFinishClients_ means some trainer has entered finishPass
if (!numPassFinishClients_) {
REGISTER_SLOW_NODES_PROBE(
*statSet_,
"SLOW_NODES",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// notify doOperation gradient ready
gradientReadyBarrier_.wait();
/// if wait pass finish does not start, do check
if (!numPassFinishClients_) {
CHECK_BARRIER_TIMER(*statSet_,
"SLOW_NODES",
FLAGS_num_gradient_servers,
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// barrier performance while all parameter add is finished
/// can indicate the fluctation caused by computation at pserver.
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"paraReady",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// wait doOperation finish
parameterReadyBarrier_.wait();
VLOG(1) << "start send back";
{
/// total time except overhead of network.
REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecvNoSend",
timeToMicroSecond(*addGradBegin_),
-1,
*statSet_);
}
}
}
......@@ -543,57 +452,6 @@ bool ParameterServer2::asyncGrdientCommitCheckAndStat(
return commitGradient;
}
void ParameterServer2::printAsyncGradientCommitStatAndReset() {
std::stringstream statFormat;
if (asyncUpdateSteps_) {
statFormat << "async discard gradients stat: " << std::endl;
statFormat << "serverId: " << serverId_
<< " serverType: " << isSparseServer_
<< " total updates: " << asyncUpdateSteps_
<< " discard updates: " << asyncLaggedGradientsNum_
<< " discard ratio: "
<< (real)asyncLaggedGradientsNum_ / (real)asyncUpdateSteps_;
statFormat << std::endl;
statFormat << std::endl;
statFormat << "Async Gradient Update Steps distribution: " << std::endl
<< "Sample: 1:1912(0.00284449) means "
<< "the updates step=1 count 1912 times "
<< "and account for 0.284449% of total updates" << std::endl;
size_t index = 0;
for (const auto& stat : asyncUpdateStat_) {
statFormat << index << ":" << stat << "("
<< (real)stat / (real)asyncUpdateSteps_ << ") ";
index++;
}
statFormat << std::endl;
statFormat << std::endl;
statFormat << "Async Gradient Discard based on trainer_id: " << std::endl
<< "Sample: 2:22(0.0016363) means "
<< "total discarded updates from trainer_id=2 count 22 "
<< "and account for 0.16363% of all updates from trainer_id=2"
<< std::endl;
for (auto i = 0; i < FLAGS_num_gradient_servers; i++) {
real ratio =
(real)asyncTrainerDiscardStat_[i] /
(real)(asyncTrainerCommitStat_[i] + asyncTrainerDiscardStat_[i]);
statFormat << i << ":" << asyncTrainerDiscardStat_[i] << "(" << ratio
<< ")"
<< " ";
}
LOG(INFO) << statFormat.str();
/// reset stat
asyncUpdateSteps_ = 0;
asyncTrainerSteps_.assign(asyncTrainerSteps_.size(), 0);
asyncLaggedGradientsNum_ = 0;
asyncUpdateStat_.assign(asyncUpdateStat_.size(), 0);
asyncTrainerDiscardStat_.assign(asyncTrainerDiscardStat_.size(), 0);
asyncTrainerCommitStat_.assign(asyncTrainerCommitStat_.size(), 0);
}
}
static ThreadLocal<std::vector<bool>> localBlockBitset_;
void ParameterServer2::asyncSGD(const SendParameterRequest& request,
......@@ -695,7 +553,6 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
if (request.trainer_id() == 0) {
/// batchId_ is approximately equal to "real batchId_"
batchId_++;
tuningAsyncsgdMidOutput();
}
}
......@@ -881,34 +738,6 @@ void ParameterServer2::sendParameter(const SendParameterRequest& request,
}
(*requestVec_).clear();
(*callbackVec_).clear();
/// barrier perfromance while all data are send finished.
/// indicates network flucatuation for big message.
if (!numPassFinishClients_) {
REGISTER_BARRIER_TIMER_SERVER(
*statSet_,
"sendParamFinish",
FLAGS_num_gradient_servers,
request.trainer_id(),
isSparseServer_ ? "_sparseUpdater" : "_denseUpdater");
}
/// all time exhausted in parameterServer for big message.
/// it contains network and computation at pserver.
{
/// total time including overhead of network.
REGISTER_TIMER_DYNAMIC_SET("sendParaTotal",
timeToMicroSecond(*handleRequestBegin_),
-1,
*statSet_);
}
/// all time exhausted in pserverServer except recieve network.
{
/// total time except overhead of network receive
REGISTER_TIMER_DYNAMIC_SET("sendParaNoRecv",
timeToMicroSecond(*addGradBegin_),
-1,
*statSet_);
}
}
break;
case PSERVER_UPDATE_MODE_SET_PARAM:
......@@ -1088,8 +917,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
}
{
REGISTER_TIMER_DYNAMIC("op_SGD", -1, *statSet_);
parallelExecForEachBlock([&](int64_t blockId, const VectorPtr vecs[]) {
BlockInfo& info = blockInfos_[blockId];
const ParameterConfig& config = getParameterConfig(blockId);
......@@ -1113,7 +940,6 @@ void ParameterServer2::op_SGD(const Operation& operation,
}
batchId_++;
tuningSgdMidOutput();
}
void ParameterServer2::op_start_pass(const Operation& operation,
......@@ -1146,8 +972,6 @@ void ParameterServer2::op_finish_pass(const Operation& operation,
/// finish pass
info.optimizer->finishPass();
});
tuningSgdFinished();
batchId_ = 0;
}
......@@ -1515,7 +1339,6 @@ void ParameterServer2::asyncFinishPass(const SynchronizeRequest& request,
callback(SynchronizeResponse());
if (request.trainer_id() == 0) {
tuningAsyncsgdFinished();
batchId_ = 0;
}
}
......@@ -1574,42 +1397,4 @@ void ParameterServer2::releaseMatrix(const ReleaseMatrixRequest& request,
callback(response);
}
void ParameterServer2::tuningSgdMidOutput() {
if (batchId_ && batchId_ % FLAGS_log_period_server == 0) {
LOG(INFO) << "======== Batch=" << batchId_ << "=======";
statSet_->setThreadInfo(true);
statSet_->printAllStatus();
/// not reset raw data for reducing the overhead of performance tuning
statSet_->reset(false);
}
}
void ParameterServer2::tuningSgdFinished() {
LOG(INFO) << "======== Batch=" << batchId_ << " pass END"
<< "=======";
statSet_->setThreadInfo(true);
statSet_->printAllStatus();
/**
* reset raw data at end of pass since some raw data could be not
* complete. Otherwise the raw data will pollute next pass performance
* tuning
*/
statSet_->reset();
}
void ParameterServer2::tuningAsyncsgdMidOutput() {
#ifndef PADDLE_DISABLE_TIMER
if (batchId_ && batchId_ % FLAGS_log_period_server == 0) {
LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << "=======";
printAsyncGradientCommitStatAndReset();
}
#endif
}
void ParameterServer2::tuningAsyncsgdFinished() {
LOG(INFO) << "======== [not accurate] Batch=" << batchId_ << " pass END"
<< "=======";
printAsyncGradientCommitStatAndReset();
}
} // namespace paddle
......@@ -298,24 +298,6 @@ protected:
/// barrier performance tuning sync-sgd required
std::atomic<int64_t> batchId_;
/// the beginning of addGradient without network overhead
ThreadLocal<struct timeval> addGradBegin_;
/**
* tuning barrier performance
* to better control log for sparse and dense parameter,
* we use different log entities for different parameterServer
* objects.
* it will output lots of performance stats to perceive the
* overhead of network, fluctuation of computation from
* forwardbackward and network, computation from optimization
* at pserver end, barrier overhead, etc. to understand tuning
* data, focus on the synchronization between addGradient and
* doOperation which indirectly call op_SGD operation controlled
* by remote updater controller
*/
std::unique_ptr<StatSet> statSet_;
public:
struct Buffer {
real* base;
......@@ -325,7 +307,6 @@ public:
protected:
/// async gradient commit control
bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request);
void printAsyncGradientCommitStatAndReset();
public:
/// disable default parameter for overloading
......@@ -710,36 +691,6 @@ public:
void op_load(const Operation& operation, OperationResult* result);
void op_save(const Operation& operation, OperationResult* result);
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for sgd
*/
void tuningSgdMidOutput();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for sgd. it will also
* flush some stateful stat for next pass.
*/
void tuningSgdFinished();
/**
* @brief output log in at the middle stage of training
*
* @note flush log histroy and state at the end for async-sgd.
* it will log some performance log if some lagged node are found
*/
void tuningAsyncsgdMidOutput();
/**
* @brief output log in at the end stage of training
*
* @note flush log histroy and state at the end for async-sgd.
*/
void tuningAsyncsgdFinished();
};
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/BarrierStat.h"
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <iomanip>
#include "paddle/utils/Flags.h"
#include "paddle/utils/Stat.h"
DEFINE_bool(log_barrier_abstract,
true,
"if true, show abstract of barrier performance");
DEFINE_int32(log_barrier_lowest_nodes,
5,
"how many lowest node will be logged");
DEFINE_bool(log_barrier_show_log,
false, // for performance tuning insight
"if true, always show barrier abstract even with little gap");
namespace paddle {
std::ostream &operator<<(std::ostream &output, const BarrierStatBase &stat) {
if (FLAGS_log_barrier_abstract) {
std::lock_guard<std::mutex> guard(stat.lock_);
stat.showAbstract(output);
}
return output;
}
BarrierStatBase::BarrierStatBase(uint16_t numConnThreads,
const std::string &name)
: totSamples_(0), numConnThreads_(numConnThreads), name_(name) {
abstract_.resize(numConnThreads_);
if (FLAGS_log_barrier_show_log) {
rateThreshold_ = 0.0;
} else {
/* probablity of abnormal node
* p = 1/n + (n/8)/(n+1), n = nodes, n > 1
* if the freq of lowest trainerId larger than p,
* output FLAGS_log_barrier_lowest_nodes lastTrainerId.
* numConnThreads_ indicates nodes
*/
float n = (float)numConnThreads;
rateThreshold_ = 1.0 / n + (n / 8.0) / (n + 1.0);
}
}
BarrierEndStat::BarrierEndStat(uint16_t numConnThreads, const std::string &name)
: BarrierStatBase(numConnThreads, name) {
timeVector_.reset(new TimeVectorEnd(numConnThreads_));
reset(true);
LOG(INFO) << " create barrierEndStat: " << name
<< " endBarrier warning rate: " << rateThreshold_;
}
/*
* Note:
* the design different pserver entity owns different statSet to obey
* the background that different pserver runs separately.
*/
void BarrierEndStat::updateStat(struct timeval &cur, int32_t trainerId) {
CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier";
std::lock_guard<std::mutex> guard(lock_);
timeVector_->addTimeval(cur, trainerId);
if (timeVector_->full()) {
std::lock_guard<std::mutex> abstractGuard(abstractLock_);
auto id = timeVector_->getLastTrainerId();
auto delta = timeToMicroSecond(timeVector_->getDelta());
auto secondDelta = timeToMicroSecond(timeVector_->get1NDelta());
auto lastTwoDelta = timeToMicroSecond(timeVector_->getMinus1NDelta());
auto midDelta = timeToMicroSecond(timeVector_->getMidNDelta());
// discard first sample, since first sample probably is abnormal.
if (totSamples_) {
abstract_[id].freq++;
if (delta < abstract_[id].minDelta) {
abstract_[id].minDelta = delta;
}
if (delta > abstract_[id].maxDelta) {
abstract_[id].maxDelta = delta;
}
abstract_[id].totDelta += delta;
abstract_[id].totSecondDelta += secondDelta;
abstract_[id].totLastTwoDelta += lastTwoDelta;
abstract_[id].totMidDelta += midDelta;
// update totAbstract_
totAbstract_.freq++;
if (delta < totAbstract_.minDelta) {
totAbstract_.minDelta = delta;
}
if (delta > totAbstract_.maxDelta) {
totAbstract_.maxDelta = delta;
}
totAbstract_.totDelta += delta;
totAbstract_.totSecondDelta += secondDelta;
totAbstract_.totLastTwoDelta += lastTwoDelta;
totAbstract_.totMidDelta += midDelta;
}
totSamples_++;
timeVector_->reset();
}
}
void BarrierEndStat::reset(bool clearRawData) {
int32_t i = 0;
totSamples_ = 0;
std::lock_guard<std::mutex> guard(abstractLock_);
if (clearRawData) {
timeVector_->reset();
}
for (auto &abstract : abstract_) {
memset((void *)&abstract, 0, sizeof(abstract));
abstract.minDelta = UINT64_MAX;
abstract.trainerId = i++;
}
memset((void *)&totAbstract_, 0, sizeof(Abstract));
totAbstract_.minDelta = UINT64_MAX;
}
void BarrierEndStat::showAbstract(std::ostream &output) const {
// do not support the case "<=2 pserver"
if (numConnThreads_ <= 2 || !totSamples_) {
return;
}
// duplicate freq info
std::vector<struct Abstract> outputAbstract = abstract_;
std::sort(outputAbstract.begin(),
outputAbstract.end(),
[](const struct Abstract &a, const struct Abstract &b) {
return a.freq > b.freq;
});
auto rate = (float)outputAbstract[0].freq / (float)totSamples_;
if (rate < rateThreshold_) {
return;
}
output << std::setw(20) << name_ << std::endl;
/*
* Note:
* avgGap: the average delta between 1 -- n arriving trainers
* avgSecondGap: the average delta between 2 -- n arriving trainers
* avgLastTwoGap: the average delta between n-1 -- n arriving trainers
* avgMidGap: the average delta between n/2 -- n arriving trainers
* rato: samples / totSamples
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output << std::setw(42) << " " << std::setw(15) << "trainerId"
<< std::setw(15) << "avgGap" << std::setw(15) << "avgSecondGap"
<< std::setw(15) << "avgLastTwoGap" << std::setw(15) << "avgMidGap"
<< std::setw(10) << "rate" << std::setw(10) << "samples"
<< std::setw(10) << "totSamples" << std::endl;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if (!totAbstract_.freq) return;
output << std::setw(42) << " " << std::setw(15) << "totAbstract"
<< std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totSecondDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totLastTwoDelta / totAbstract_.freq) * 0.001
<< std::setw(15)
<< (totAbstract_.totMidDelta / totAbstract_.freq) * 0.001
<< std::setw(10) << (float)totAbstract_.freq / (float)totSamples_
<< std::setw(10) << (float)totAbstract_.freq << std::setw(10)
<< (float)totSamples_ << std::endl;
// show lastTrainerId abstract
int count = 0;
for (auto &abstract : outputAbstract) {
if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) {
break;
}
// output format control
output << std::setw(42) << " " << std::setw(15) << abstract.trainerId
<< std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001
<< std::setw(15) << (abstract.totSecondDelta / abstract.freq) * 0.001
<< std::setw(15)
<< (abstract.totLastTwoDelta / abstract.freq) * 0.001
<< std::setw(15) << (abstract.totMidDelta / abstract.freq) * 0.001
<< std::setw(10) << (float)abstract.freq / (float)totSamples_
<< std::setw(10) << (float)abstract.freq << std::setw(10)
<< (float)totSamples_ << std::endl;
}
}
BarrierDeltaStat::BarrierDeltaStat(uint16_t numConnThreads,
const std::string &name)
: BarrierStatBase(numConnThreads, name) {
timeVector_.reset(new TimeVectorDelta(numConnThreads_));
reset(true);
LOG(INFO) << " create barrierDeltaStat: " << name
<< " barrierDelta warning rate: " << rateThreshold_;
}
void BarrierDeltaStat::updateStat(uint64_t delta, int32_t trainerId) {
CHECK_LT(trainerId, numConnThreads_) << "trainerId is invalid in barrier";
std::lock_guard<std::mutex> guard(lock_);
timeVector_->addTimeval(delta, trainerId);
if (timeVector_->full()) {
std::lock_guard<std::mutex> abstractGuard(abstractLock_);
auto id = timeVector_->getMaxTrainerId();
auto delta = timeVector_->getDelta();
// discard first sample, since first sample probably is abnormal.
if (totSamples_) {
abstract_[id].freq++;
if (delta < abstract_[id].minDelta) {
abstract_[id].minDelta = delta;
}
if (delta > abstract_[id].maxDelta) {
abstract_[id].maxDelta = delta;
}
abstract_[id].totDelta += delta;
// update totAbstract_
totAbstract_.freq++;
if (delta < totAbstract_.minDelta) {
totAbstract_.minDelta = delta;
}
if (delta > totAbstract_.maxDelta) {
totAbstract_.maxDelta = delta;
}
totAbstract_.totDelta += delta;
}
totSamples_++;
timeVector_->reset();
}
}
void BarrierDeltaStat::reset(bool clearRawData) {
int32_t i = 0;
totSamples_ = 0;
std::lock_guard<std::mutex> guard(abstractLock_);
if (clearRawData) {
timeVector_->reset();
}
for (auto &abstract : abstract_) {
memset((void *)&abstract, 0, sizeof(abstract));
abstract.minDelta = UINT64_MAX;
abstract.trainerId = i++;
}
memset((void *)&totAbstract_, 0, sizeof(Abstract));
totAbstract_.minDelta = UINT64_MAX;
}
void BarrierDeltaStat::showAbstract(std::ostream &output) const {
// do not support the case "<=2 pserver"
if (numConnThreads_ <= 2 || !totSamples_) {
return;
}
// duplicate freq info
std::vector<struct Abstract> outputAbstract = abstract_;
std::sort(outputAbstract.begin(),
outputAbstract.end(),
[](const struct Abstract &a, const struct Abstract &b) {
return a.freq > b.freq;
});
auto rate = (float)outputAbstract[0].freq / (float)totSamples_;
if (rate < rateThreshold_) {
return;
}
output << std::setw(20) << name_ << std::endl;
/* Note:
* Gap means the delta from all trainers' forwardbackward
* avgGap: average Gap in log_period batches
* minGap: min Gap in log_period batches
* maxGap: max Gap in log_period batches
* trainerId: the slowest trainer_id
*
* the stat is based on per trainer if trainer_id is set, totAbstract is
* stat based on all trainers scope.
*/
output << std::setw(42) << " " << std::setw(15) << "trainerId"
<< std::setw(15) << "avgGap" << std::setw(10) << "minGap"
<< std::setw(10) << "maxGap" << std::setw(10) << "rate"
<< std::setw(10) << "samples" << std::setw(10) << "totSamples"
<< std::endl;
// show totAbstract, it's valuable when lastTrainerId is even-distributed'
if (!totAbstract_.freq) return;
output << std::setw(42) << " " << std::setw(15) << "totAbstract"
<< std::setw(15) << (totAbstract_.totDelta / totAbstract_.freq) * 0.001
<< std::setw(10) << totAbstract_.minDelta * 0.001 << std::setw(10)
<< totAbstract_.maxDelta * 0.001 << std::setw(10)
<< (float)totAbstract_.freq / (float)totSamples_ << std::setw(10)
<< (float)totAbstract_.freq << std::setw(10) << (float)totSamples_
<< std::endl;
// show lastTrainerId abstract
int count = 0;
for (auto &abstract : outputAbstract) {
if (!abstract.freq || count++ >= FLAGS_log_barrier_lowest_nodes) {
break;
}
// output format control
output << std::setw(42) << " " << std::setw(15) << abstract.trainerId
<< std::setw(15) << (abstract.totDelta / abstract.freq) * 0.001
<< std::setw(10) << abstract.minDelta * 0.001 << std::setw(10)
<< abstract.maxDelta * 0.001 << std::setw(10)
<< (float)abstract.freq / (float)totSamples_ << std::setw(10)
<< (float)abstract.freq << std::setw(10) << (float)totSamples_
<< std::endl;
}
}
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <stdint.h>
#include <sys/time.h>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include "Locks.h"
#include "Logging.h"
#include "ThreadLocal.h"
namespace paddle {
inline uint64_t timeToMicroSecond(struct timeval time) {
return time.tv_sec * 1000000LU + time.tv_usec;
}
class TimeVectorEnd {
/*
* help class for gathering all barrier performance data
* which shows time point property.
* freqently used in barrier performance tuning API, such
* as tuning which is slowest node in sync-sgd mode training.
*/
public:
explicit TimeVectorEnd(uint16_t size) : size_(size) {
index_ = 0;
timeArray_.resize(size);
trainerIds_.resize(size);
}
~TimeVectorEnd() {}
uint16_t size() { return size_; }
bool full() { return index_ == size_; }
bool empty() { return index_ == 0; }
void reset() { index_ = 0; }
void addTimeval(struct timeval time, int32_t trainerId) {
timeArray_[index_] = time;
trainerIds_[index_] = trainerId;
index_++;
}
struct timeval getDelta() const {
struct timeval delta;
CHECK_GT(size_, 1) << "not support with 1 pserver";
timersub(&timeArray_[size_ - 1], &timeArray_[0], &delta);
return delta;
}
/* 2, n delta */
struct timeval get1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[1], &delta);
return delta;
}
/* n-1, n delta */
struct timeval getMinus1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[size_ - 2], &delta);
return delta;
}
/* n/2, n delta */
struct timeval getMidNDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
struct timeval delta;
timersub(&timeArray_[size_ - 1], &timeArray_[size_ / 2], &delta);
return delta;
}
int32_t getLastTrainerId() const { return trainerIds_[index_ - 1]; }
private:
uint16_t size_;
uint16_t index_;
std::vector<struct timeval> timeArray_;
std::vector<int32_t> trainerIds_;
};
class TimeVectorDelta {
/*
* help class for gathering performance data which shows time
* delta property, such as tuning the time distribution of
* forwardBackward time from all cluster nodes.
*/
public:
explicit TimeVectorDelta(uint16_t size)
: size_(size), min_(UINT64_MAX), max_(0) {
index_ = 0;
timeArray_.resize(size);
}
~TimeVectorDelta() {}
uint16_t size() { return size_; }
bool full() { return index_ == size_; }
bool empty() { return index_ == 0; }
void reset() {
index_ = 0;
min_ = UINT64_MAX;
max_ = 0;
}
void addTimeval(uint64_t delta, int32_t trainerId) {
timeArray_[index_] = delta;
index_++;
if (delta < min_) {
min_ = delta;
}
if (delta > max_) {
max_ = delta;
maxTrainerId_ = trainerId;
}
}
uint64_t getDelta() const {
CHECK_GT(size_, 1) << "not support with 1 pserver";
return max_ - min_;
}
/* 2, n delta */
uint64_t get1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
/* n-1, n delta */
uint64_t getMinus1NDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
/* n/2, n delta */
uint64_t getMidNDelta() const {
CHECK_GT(size_, 2) << "not support with less than 2 pservers";
LOG(FATAL) << "Not implemented";
}
int32_t getMaxTrainerId() const { return maxTrainerId_; }
private:
uint16_t size_;
uint16_t index_;
std::vector<uint64_t> timeArray_;
private:
uint64_t min_;
uint64_t max_;
int32_t maxTrainerId_;
};
// total samples stats, us
struct Abstract {
// last trainerId for barrier end, maxDelta trainerId for barrier delta
int32_t trainerId;
uint64_t minDelta;
uint64_t maxDelta;
uint64_t totDelta;
// first one is probably itself, so discard it.
uint64_t totSecondDelta;
// to confirm if last node destroy barrier performance.
uint64_t totLastTwoDelta;
// n/2-n delta
uint64_t totMidDelta;
uint64_t freq;
};
// barrier performance tunning stats
class BarrierStatBase {
public:
BarrierStatBase(uint16_t numConnThreads, const std::string &name);
virtual ~BarrierStatBase() {}
// if called at pserver end, then trainId means trainer's id.
// by default trainer does not use trainerId, so set it to -1
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) = 0;
virtual void updateStat(uint64_t delta, int32_t trainerId = -1) = 0;
const std::string &getName() { return name_; }
virtual void reset(bool clearRawData = true) {}
// since the timeVector_ is not stateful, so it's not clear whether the
// the barrier delta is correct. if one timestamp was lost, the all data
// from barrier stat becomes rubbish. -_-
virtual bool checkPassBarrier() {
LOG(INFO) << "bug implementation found";
return false;
}
protected:
virtual void showAbstract(std::ostream &output) const {}
friend std::ostream &operator<<(std::ostream &output,
const BarrierStatBase &stat);
protected:
mutable std::mutex lock_;
std::mutex abstractLock_; // see note on updaterStat
// each freqency for each barrier trainer
std::vector<struct Abstract> abstract_;
// it is valuable when do perf-tuining, if lastTrainerId acts uniform
// distribution
struct Abstract totAbstract_;
uint64_t totSamples_;
protected:
uint16_t numConnThreads_; // total updates needed
float rateThreshold_;
std::string name_;
};
// the end-time of arriving real/forged barrier position
class BarrierEndStat : public BarrierStatBase {
public:
BarrierEndStat(uint16_t numConnThreads, const std::string &name);
~BarrierEndStat() {}
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1);
virtual void updateStat(uint64_t delta, int32_t trainerId = -1) {
LOG(INFO) << "have no delta updateStat in BarrierEndStat";
}
virtual void reset(bool clearRawData = true);
virtual bool checkPassBarrier() { return timeVector_->empty(); }
protected:
/*
* LOG:
* readAllBlocks_denseUpdater
* trainerId avgGap avgSecondGap avgLastTwoGap avgMidGap rate
* 44 86.702 81.022 9.984 50.472 0.144737
* 46 87.723 82.939 8.737 50.019 0.118421
* 35 100.923 96.752 14.305 61.979
* 0.0657895
* log_barrier_abstract, log_barrier_lowest_nodes, log_barrier_threshold
* control details.
*/
virtual void showAbstract(std::ostream &output) const;
private:
std::unique_ptr<TimeVectorEnd> timeVector_;
};
// the delta-time from different trainers,
// eg, find the degree of imbalance of BP time at pserver end
// the entry value in timerVector_ is BP delta, do evaluation to BP delta.
class BarrierDeltaStat : public BarrierStatBase {
public:
BarrierDeltaStat(uint16_t numConnThreads, const std::string &name);
~BarrierDeltaStat() {}
virtual void updateStat(uint64_t delta, int32_t trainerId = -1);
virtual void updateStat(struct timeval &cur, int32_t trainerId = -1) {
LOG(INFO) << "have no timeval updateStat in BarrierDeltaStat";
}
virtual void reset(bool clearRawData = true);
virtual bool checkPassBarrier() { return timeVector_->empty(); }
protected:
virtual void showAbstract(std::ostream &outPut) const;
private:
// store delta time in uint64_t, eg BP time of all trainers
std::unique_ptr<TimeVectorDelta> timeVector_;
};
// to distinguish different contexts for same parallel threads, and different
// threads with same code-sgement, just use tagName to tag the run-time
// position.
// in Sparse, sendParallel threads can not only run in the stage of push&pull
// with same thread group, but also run in the stage of pull&push with different
// thread group, tag will be used to distinguish different run-time barrier
// position.
// trainerId in REGISTER_BARRIER_TIMER_SERVER is used to retreive lowest trainer
// nodes.
// end barrier
#define __REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
struct timeval cur; \
gettimeofday(&cur, nullptr); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// end barrier with user-defined timer
#define __REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
__stat->updateStat(cur, trainerId); \
} \
} while (0);
// delta barrier
#define __REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
do { \
if (numConnThreads > 2) { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_DELTA); \
__stat->updateStat(delta, trainerId); \
} \
} while (0);
// check end barrier
#define __CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
do { \
std::string internalName = \
std::string(statName) + std::string(__VA_ARGS__); \
BarrierStatPtr __stat = \
(set).getStat(numConnThreads, internalName, BARRIER_END); \
PCHECK(__stat->checkPassBarrier()) << internalName \
<< ": invalid barrier data"; \
} while (0);
/*
* Note:
* with sync-sgd algriothm in cluster mode, lots of synchronize action exsit at
* pserve end. these synchronizaton actions have impact on the efficiency of
* parameter exchange. the synchronizaton(barrier) GAP is composed of lots of
* factors, such as the forwardBackward variance, network fluncation. we try
* to have a quantitative analysis on these factor, so we design lots of barrier
* time to capture these performance. these barrier also can be placed at
* implict barrier position.
*
* example:
* in sync-sgd algorithm, each parameter server waits for all gradients from
* all trainers, thus, an explict barrier point exsit before doing optimization.
* the barrier timer located before the point can sense the barrier condition.
*
*/
// try to capture which trainer is slowest node in sync-sgd at pserver.
#define REGISTER_SLOW_NODES_PROBE( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
// try to check if all threads or trainers have passed barriers for data
// accuracy.
#define CHECK_BARRIER_TIMER(set, statName, numConnThreads, ...) \
__CHECK_BARRIER_TIMER((set), statName, numConnThreads, __VA_ARGS__)
#ifdef PADDLE_DISABLE_TIMER
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...)
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...)
#else
/*
* sensing barrier time distribution for all parallelization threads.
* it provides low API for slow node check(REGISTER_SLOW_NODES_PROBE)
*/
#define REGISTER_BARRIER_TIMER_SERVER( \
set, statName, numConnThreads, trainerId, ...) \
__REGISTER_BARRIER_TIMER_SERVER( \
(set), statName, numConnThreads, trainerId, __VA_ARGS__)
/*
* sensing barrier time distribution for all parallelization threads.
* but time point for barrier performance is set by user.
* eg, with this api, you can get implict barrier point such as the beginning
* time distribution
* for receiving data.
*/
#define REGISTER_BARRIER_TIMER_SERVER_SET( \
set, statName, numConnThreads, trainerId, cur, ...) \
__REGISTER_BARRIER_TIMER_SERVER_SET( \
(set), statName, numConnThreads, trainerId, cur, __VA_ARGS__)
// try to capture time delta from all trainers, such as forwardBackward time
// which implies
// computation fluctuation
#define REGISTER_BARRIER_DELTA_SERVER_SET( \
set, statName, numConnThreads, trainerId, delta, ...) \
__REGISTER_BARRIER_DELTA_SERVER_SET( \
(set), statName, numConnThreads, trainerId, delta, __VA_ARGS__)
#endif // DISABLE_TIMER
} // namespace paddle
......@@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) {
return outPut;
}
BarrierStatPtr StatSet::getStat(uint16_t numConnThreads,
const std::string& name,
BarrierStatType bType) {
{
ReadLockGuard guard(lock_);
auto it = barrierStatSet_.find(name);
if (it != barrierStatSet_.end()) {
return it->second;
}
}
std::lock_guard<RWLock> guard(lock_);
// test again with lock_guard
auto it = barrierStatSet_.find(name);
if (it != barrierStatSet_.end()) {
return it->second;
}
BarrierStatPtr stat;
if (bType == BARRIER_END) {
stat = std::make_shared<BarrierEndStat>(numConnThreads, name);
} else if (bType == BARRIER_DELTA) {
stat = std::make_shared<BarrierDeltaStat>(numConnThreads, name);
}
auto ret = barrierStatSet_.insert(std::make_pair(name, stat));
return ret.first->second;
}
void StatSet::printSegTimerStatus() {
ReadLockGuard guard(lock_);
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
......@@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() {
}
}
void StatSet::printBarrierTimerStatus() {
ReadLockGuard guard(lock_);
if (barrierStatSet_.empty()) {
return;
}
// control barrierAbstact in runtime, so enable compliation
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
<< "======= BarrierStatSet status ======" << std::endl;
for (auto& stat : barrierStatSet_) {
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
<< *(stat.second);
}
}
void StatSet::printAllStatus() {
#ifndef PADDLE_DISABLE_TIMER
printSegTimerStatus();
#endif
printBarrierTimerStatus();
LOG(INFO) << std::setiosflags(std::ios::left)
<< "--------------------------------------------------"
<< std::endl;
}
void StatSet::printStatus(const std::string& name) {
ReadLockGuard guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
LOG(INFO) << *(iter->second);
}
void StatSet::reset(bool clearRawData) {
ReadLockGuard guard(lock_);
for (auto& stat : statSet_) {
stat.second->reset();
}
// reset barrierStat
for (auto& stat : barrierStatSet_) {
stat.second->reset(clearRawData);
}
}
void StatSet::setThreadInfo(const std::string& name, bool flag) {
......@@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) {
iter->second->setThreadInfo(flag);
}
void StatSet::deleteStat(const std::string& name) {
std::lock_guard<RWLock> guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
statSet_.erase(iter);
}
StatInfo::~StatInfo() {
if (stat_) {
std::lock_guard<std::mutex> guard(stat_->lock_);
......
......@@ -23,7 +23,6 @@ limitations under the License. */
#include <string>
#include <unordered_map>
#include "BarrierStat.h"
#include "Locks.h"
#include "Logging.h"
#include "ThreadLocal.h"
......@@ -60,12 +59,6 @@ public:
class Stat;
typedef std::shared_ptr<Stat> StatPtr;
typedef std::shared_ptr<BarrierStatBase> BarrierStatPtr;
enum BarrierStatType {
BARRIER_END = 0,
BARRIER_DELTA = 1,
};
class StatSet {
public:
......@@ -74,11 +67,8 @@ public:
// print to LOG(INFO)
void printSegTimerStatus();
void printBarrierTimerStatus();
void printAllStatus();
void printStatus(const std::string& name);
StatPtr getStat(const std::string& name) {
{
ReadLockGuard guard(lock_);
......@@ -93,12 +83,6 @@ public:
return ret.first->second;
}
BarrierStatPtr getStat(uint16_t numConnThreads,
const std::string& name,
BarrierStatType bType);
void deleteStat(const std::string& name);
// true for showing stats for each thread
// false for showing stats aggragated over threads
void setThreadInfo(const std::string& name, bool flag);
......@@ -120,7 +104,6 @@ public:
private:
std::unordered_map<std::string, StatPtr> statSet_;
std::unordered_map<std::string, BarrierStatPtr> barrierStatSet_;
const std::string name_;
RWLock lock_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册