diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index e20ffd06ef82150193fe979ca99973285e14a585..8d8d513367803b51dd1610393497231c13da3c9a 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -343,16 +343,36 @@ void FleetWrapper::PullSparseVarsSync( for (auto& t : *fea_values) { pull_result_ptr.push_back(t.data()); } - auto status = pslib_ptr_->_worker_ptr->pull_sparse( - pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size()); - pull_sparse_status.push_back(std::move(status)); - for (auto& t : pull_sparse_status) { - t.wait(); - auto status = t.get(); - if (status != 0) { - LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]"; - sleep(sleep_seconds_before_fail_exit_); - exit(-1); + + int32_t cnt = 0; + while (true) { + pull_sparse_status.clear(); + auto status = pslib_ptr_->_worker_ptr->pull_sparse( + pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size()); + pull_sparse_status.push_back(std::move(status)); + bool flag = true; + for (auto& t : pull_sparse_status) { + t.wait(); + int32_t status = -1; + try { + status = t.get(); + } catch (const std::future_error& e) { + VLOG(0) << "Caught a future_error with code" << e.code() + << ", Message:" << e.what(); + } + if (status != 0) { + VLOG(0) << "fleet pull sparse failed, status[" << status << "]"; + sleep(sleep_seconds_before_fail_exit_); + flag = false; + cnt++; + } + if (cnt > 3) { + VLOG(0) << "fleet pull sparse failed, retry 3 times"; + exit(-1); + } + } + if (flag) { + break; } } #endif