From 0a7cb901b432395154eb3595f7928c9142f268ac Mon Sep 17 00:00:00 2001 From: yaoxuefeng Date: Mon, 10 Jan 2022 23:15:09 +0800 Subject: [PATCH] add retry on pull dense sync (#38793) --- paddle/fluid/framework/fleet/fleet_wrapper.cc | 56 ++++++++++++++++++- paddle/fluid/framework/fleet/heter_context.h | 1 - 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 225c2656fb..f900275563 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -632,6 +632,7 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim, if (ret != 0) { LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]"; sleep(sleep_seconds_before_fail_exit_); + exit(-1); } #else for (size_t index = 0; index < inputs->size(); ++index) { @@ -685,9 +686,36 @@ void FleetWrapper::PullDenseVarsSync( paddle::ps::Region reg(w, tensor->numel()); regions.emplace_back(std::move(reg)); } - auto status = - pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid); - status.wait(); + int32_t status = -1; + int32_t cnt = 0; + while (true) { + auto tt = pslib_ptr_->_worker_ptr->pull_dense(regions.data(), + regions.size(), tid); + bool flag = true; + + tt.wait(); + + try { + status = tt.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull dense sync failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull dense sync failed, retry 3 times"; + exit(-1); + } + + if (flag) { + break; + } + } #endif } @@ -1248,6 +1276,7 @@ void FleetWrapper::LoadModelOneTable(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "load model of table id: " << table_id << ", from path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::LoadModel does nothing when no pslib"; @@ -1263,6 +1292,7 @@ void FleetWrapper::LoadWithWhitelist(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "load model of table id: " << table_id << ", from path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::LoadWhitelist does nothing when no pslib"; @@ -1311,6 +1341,7 @@ void FleetWrapper::SaveModelOneTable(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "save model of table id: " << table_id << ", to path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::SaveModelOneTable does nothing when no pslib"; @@ -1328,6 +1359,7 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id, if (ret.get() != 0) { LOG(ERROR) << "save model (with prefix) of table id: " << table_id << ", to path: " << path << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::SaveModelOneTablePrefix does nothing when no pslib"; @@ -1351,6 +1383,7 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) { ret.wait(); if (ret.get() != 0) { LOG(ERROR) << "setdate : " << date << " failed"; + exit(-1); } #else VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib-gpu"; @@ -1463,6 +1496,11 @@ void FleetWrapper::ShrinkSparseTable(int table_id) { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->shrink(table_id); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Shrink Sparse Table failed"; + exit(-1); + } #else VLOG(0) << "FleetWrapper::ShrinkSparseTable does nothing when no pslib"; #endif @@ -1472,6 +1510,10 @@ void FleetWrapper::ClearModel() { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->clear(); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Clear Model failed"; + } #else VLOG(0) << "FleetWrapper::ClearModel does nothing when no pslib"; #endif @@ -1481,6 +1523,10 @@ void FleetWrapper::ClearOneTable(const uint64_t table_id) { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->clear(table_id); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Clear One Table failed table_id: " << table_id; + } #else VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib"; #endif @@ -1541,6 +1587,10 @@ void FleetWrapper::ClientFlush() { #ifdef PADDLE_WITH_PSLIB auto ret = pslib_ptr_->_worker_ptr->flush(); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Client Flush failed"; + } #else VLOG(0) << "FleetWrapper::ServerFlush does nothing when no pslib"; #endif diff --git a/paddle/fluid/framework/fleet/heter_context.h b/paddle/fluid/framework/fleet/heter_context.h index 45f9b04383..3e8b0cfbc3 100644 --- a/paddle/fluid/framework/fleet/heter_context.h +++ b/paddle/fluid/framework/fleet/heter_context.h @@ -235,7 +235,6 @@ class HeterContext { } VLOG(3) << "heter_context unique keys with dynamic mf dimention"; } - for (std::thread& t : threads) { t.join(); } -- GitLab