未验证 提交 d3011c75 编写于 作者: F Fan Zhang 提交者: GitHub

add retry in pull dense (#38927)

Co-authored-by: Nyaoxuefeng <yaoxuefeng@baidu.com>
上级 56943dd4
......@@ -424,6 +424,7 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
if (ret != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
#else
for (size_t index = 0; index < inputs->size(); ++index) {
......@@ -473,9 +474,36 @@ void FleetWrapper::PullDenseVarsSync(
paddle::ps::Region reg(w, tensor->numel());
regions.emplace_back(std::move(reg));
}
auto status =
pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid);
status.wait();
int32_t status = -1;
int32_t cnt = 0;
while (true) {
auto tt = pslib_ptr_->_worker_ptr->pull_dense(regions.data(),
regions.size(), tid);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull dense sync failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull dense sync failed, retry 3 times";
exit(-1);
}
if (flag) {
break;
}
}
#endif
}
......@@ -924,6 +952,7 @@ void FleetWrapper::LoadModelOneTable(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "load model of table id: " << table_id
<< ", from path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::LoadModel does nothing when no pslib";
......@@ -939,6 +968,7 @@ void FleetWrapper::LoadWithWhitelist(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "load model of table id: " << table_id
<< ", from path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::LoadWhitelist does nothing when no pslib";
......@@ -987,6 +1017,7 @@ void FleetWrapper::SaveModelOneTable(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "save model of table id: " << table_id
<< ", to path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTable does nothing when no pslib";
......@@ -1004,6 +1035,7 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "save model (with prefix) of table id: " << table_id
<< ", to path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTablePrefix does nothing when no pslib";
......@@ -1102,6 +1134,11 @@ void FleetWrapper::ShrinkSparseTable(int table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->shrink(table_id);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Shrink Sparse Table failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::ShrinkSparseTable does nothing when no pslib";
#endif
......@@ -1111,6 +1148,10 @@ void FleetWrapper::ClearModel() {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->clear();
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Clear Model failed";
}
#else
VLOG(0) << "FleetWrapper::ClearModel does nothing when no pslib";
#endif
......@@ -1120,6 +1161,10 @@ void FleetWrapper::ClearOneTable(const uint64_t table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->clear(table_id);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Clear One Table failed table_id: " << table_id;
}
#else
VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib";
#endif
......@@ -1179,6 +1224,10 @@ void FleetWrapper::ClientFlush() {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->flush();
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Client Flush failed";
}
#else
VLOG(0) << "FleetWrapper::ServerFlush does nothing when no pslib";
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册