未验证 提交 0a7cb901 编写于 作者: Y yaoxuefeng 提交者: GitHub

add retry on pull dense sync (#38793)

上级 1f8fe035
......@@ -632,6 +632,7 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
if (ret != 0) {
LOG(ERROR) << "fleet pull sparse failed, status[" << ret << "]";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
#else
for (size_t index = 0; index < inputs->size(); ++index) {
......@@ -685,9 +686,36 @@ void FleetWrapper::PullDenseVarsSync(
paddle::ps::Region reg(w, tensor->numel());
regions.emplace_back(std::move(reg));
}
auto status =
pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid);
status.wait();
int32_t status = -1;
int32_t cnt = 0;
while (true) {
auto tt = pslib_ptr_->_worker_ptr->pull_dense(regions.data(),
regions.size(), tid);
bool flag = true;
tt.wait();
try {
status = tt.get();
} catch (const std::future_error& e) {
VLOG(0) << "Caught a future_error with code" << e.code()
<< ", Message:" << e.what();
}
if (status != 0) {
VLOG(0) << "fleet pull dense sync failed, status[" << status << "]";
sleep(sleep_seconds_before_fail_exit_);
flag = false;
cnt++;
}
if (cnt > 3) {
VLOG(0) << "fleet pull dense sync failed, retry 3 times";
exit(-1);
}
if (flag) {
break;
}
}
#endif
}
......@@ -1248,6 +1276,7 @@ void FleetWrapper::LoadModelOneTable(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "load model of table id: " << table_id
<< ", from path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::LoadModel does nothing when no pslib";
......@@ -1263,6 +1292,7 @@ void FleetWrapper::LoadWithWhitelist(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "load model of table id: " << table_id
<< ", from path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::LoadWhitelist does nothing when no pslib";
......@@ -1311,6 +1341,7 @@ void FleetWrapper::SaveModelOneTable(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "save model of table id: " << table_id
<< ", to path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTable does nothing when no pslib";
......@@ -1328,6 +1359,7 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
if (ret.get() != 0) {
LOG(ERROR) << "save model (with prefix) of table id: " << table_id
<< ", to path: " << path << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTablePrefix does nothing when no pslib";
......@@ -1351,6 +1383,7 @@ void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "setdate : " << date << " failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib-gpu";
......@@ -1463,6 +1496,11 @@ void FleetWrapper::ShrinkSparseTable(int table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->shrink(table_id);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Shrink Sparse Table failed";
exit(-1);
}
#else
VLOG(0) << "FleetWrapper::ShrinkSparseTable does nothing when no pslib";
#endif
......@@ -1472,6 +1510,10 @@ void FleetWrapper::ClearModel() {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->clear();
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Clear Model failed";
}
#else
VLOG(0) << "FleetWrapper::ClearModel does nothing when no pslib";
#endif
......@@ -1481,6 +1523,10 @@ void FleetWrapper::ClearOneTable(const uint64_t table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->clear(table_id);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Clear One Table failed table_id: " << table_id;
}
#else
VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib";
#endif
......@@ -1541,6 +1587,10 @@ void FleetWrapper::ClientFlush() {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->flush();
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "Client Flush failed";
}
#else
VLOG(0) << "FleetWrapper::ServerFlush does nothing when no pslib";
#endif
......
......@@ -235,7 +235,6 @@ class HeterContext {
}
VLOG(3) << "heter_context unique keys with dynamic mf dimention";
}
for (std::thread& t : threads) {
t.join();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册