From d3011c751d44d6718bc9c94fb7ef52f38cb5e635 Mon Sep 17 00:00:00 2001 From: Fan Zhang Date: Mon, 17 Jan 2022 16:56:15 +0800 Subject: [PATCH] add retry in pull dense (#38927) Co-authored-by: yaoxuefeng --- paddle/fluid/framework/fleet/fleet_wrapper.cc | 55 ++++++++++++++++++- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 8d8d5133678..c56fb54cb40 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -424,6 +424,7 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim, if (ret != 0) { LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]"; sleep(sleep_seconds_before_fail_exit_); + exit(-1); } #else for (size_t index = 0; index < inputs->size(); ++index) { @@ -473,9 +474,36 @@ void FleetWrapper::PullDenseVarsSync( paddle::ps::Region reg(w, tensor->numel()); regions.emplace_back(std::move(reg)); } - auto status = - pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid); - status.wait(); + int32_t status = -1; + int32_t cnt = 0; + while (true) { + auto tt = pslib_ptr_->_worker_ptr->pull_dense(regions.data(), + regions.size(), tid); + bool flag = true; + + tt.wait(); + + try { + status = tt.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull dense sync failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull dense sync failed, retry 3 times"; + exit(-1); + } + + if (flag) { + break; + } + } #endif } @@ -924,6 +952,7 @@ void FleetWrapper::LoadModelOneTable(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "load model of table id: " << table_id << ", from path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::LoadModel does nothing when no pslib"; @@ -939,6 +968,7 @@ void FleetWrapper::LoadWithWhitelist(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "load model of table id: " << table_id << ", from path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::LoadWhitelist does nothing when no pslib"; @@ -987,6 +1017,7 @@ void FleetWrapper::SaveModelOneTable(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "save model of table id: " << table_id << ", to path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::SaveModelOneTable does nothing when no pslib"; @@ -1004,6 +1035,7 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "save model (with prefix) of table id: " << table_id << ", to path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::SaveModelOneTablePrefix does nothing when no pslib"; @@ -1102,6 +1134,11 @@ void FleetWrapper::ShrinkSparseTable(int table_id) { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->shrink(table_id); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Shrink Sparse Table failed"; + exit(-1); + } #else VLOG(0) << "FleetWrapper::ShrinkSparseTable does nothing when no pslib"; #endif @@ -1111,6 +1148,10 @@ void FleetWrapper::ClearModel() { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->clear(); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Clear Model failed"; + } #else VLOG(0) << "FleetWrapper::ClearModel does nothing when no pslib"; #endif @@ -1120,6 +1161,10 @@ void FleetWrapper::ClearOneTable(const uint64_t table_id) { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->clear(table_id); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Clear One Table failed table_id: " << table_id; + } #else VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib"; #endif @@ -1179,6 +1224,10 @@ void FleetWrapper::ClientFlush() { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->flush(); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Client Flush failed"; + } #else VLOG(0) << "FleetWrapper::ServerFlush does nothing when no pslib"; #endif -- GitLab