diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 953618560913229cd1e47659ad61e621efc10ed1..c774eaf4c8ba82c20693964cb4036d783a7c407c 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -81,6 +81,8 @@ Scope& Scope::NewScope() const { return *child; } +Scope* Scope::NewTmpScope() const { return new Scope(this); } + Variable* Scope::Var(const std::string& name) { SCOPE_VARS_WRITER_LOCK return VarInternal(name); diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index f0915d2eee072b0bcd53f37dad5ef9d801c87172..0e9b8edeb3e5f1afdfef83520e5d602b9b7bad43 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -55,6 +55,8 @@ class Scope { /// Mark it to const because that new kid scope cannot change parent scope. Scope& NewScope() const; + Scope* NewTmpScope() const; + /// Create a variable with given name if it doesn't exist. /// Caller doesn't own the returned Variable. Variable* Var(const std::string& name); diff --git a/paddle/fluid/operators/distributed/grpc/grpc_server.cc b/paddle/fluid/operators/distributed/grpc/grpc_server.cc index 08f777e279e34da0c0ac89afd3f660fa089599fe..8bc8d5772f9a8077978d24d91e8f564582f3deae 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_server.cc @@ -107,6 +107,9 @@ class RequestSend final : public RequestBase { int trainer_id = request_->GetTrainerId(); framework::Variable* outvar = nullptr; + if (!request_handler_->sync_mode()) { + request_->ReleaseOwnershipOfLocalScope(); + } request_handler_->Handle(varname, scope, invar, &outvar, trainer_id); Finish(reply_, &responder_); } diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index c63d65348880ebb4085d83059d9fead6456216d7..9dfbc80870a940eec74a5936066d51ca858b0a7a 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -180,7 +180,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, const std::vector& height_sections, const framework::ExecutionContext& context, const framework::Scope& scope) { - auto& local_scope = scope.NewScope(); + framework::Scope* local_scope = scope.NewTmpScope(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& cpu_ctx = *pool.Get(platform::CPUPlace()); @@ -224,22 +224,22 @@ void prefetch(const std::string& id_name, const std::string& out_name, #endif } - auto splited_ids = SplitIds(ids_vector, height_sections, &local_scope); + auto splited_ids = SplitIds(ids_vector, height_sections, local_scope); SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids, - &local_scope); + local_scope); // create output var in local scope for (auto& name : out_var_names) { - local_scope.Var(name)->GetMutable(); + local_scope->Var(name)->GetMutable(); } std::vector rets; for (size_t i = 0; i < in_var_names.size(); i++) { - if (NeedSend(local_scope, in_var_names[i])) { + if (NeedSend(*local_scope, in_var_names[i])) { VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i] << " to get " << out_var_names[i] << " back"; rets.push_back(rpc_client->AsyncPrefetchVar( - epmap[i], cpu_ctx, local_scope, in_var_names[i], out_var_names[i], + epmap[i], cpu_ctx, *local_scope, in_var_names[i], out_var_names[i], table_names[i])); } else { VLOG(3) << "don't send no-initialied variable: " << out_var_names[i]; @@ -252,8 +252,8 @@ void prefetch(const std::string& id_name, const std::string& out_name, MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name, out_var_names, height_sections, splited_ids, - context, &local_scope, &actual_ctx); - scope.DeleteScope(&local_scope); + context, local_scope, &actual_ctx); + delete local_scope; } }; // namespace distributed diff --git a/paddle/fluid/operators/distributed/request_handler.h b/paddle/fluid/operators/distributed/request_handler.h index 62b24f150b41efead24c8bdbe08c9b44e160445a..f58c2bc38076a1bb3ca7642664c386a4eeb3422e 100644 --- a/paddle/fluid/operators/distributed/request_handler.h +++ b/paddle/fluid/operators/distributed/request_handler.h @@ -58,13 +58,15 @@ class VarHandle { VarHandle(const std::string ep, const std::string& method, const std::string& name, const platform::DeviceContext* p_ctx = nullptr, - const framework::Scope* p_scope = nullptr) + const framework::Scope* p_scope = nullptr, + bool delete_local_scope = false) : status_(kDefaultState) { ep_ = ep; ctx_ = p_ctx; scope_ = p_scope; name_ = name; method_ = method; + delete_local_scope_ = delete_local_scope; } virtual ~VarHandle() {} @@ -86,6 +88,7 @@ class VarHandle { std::unique_lock lk(sync_mutex_); status_ = ok ? kFinishState : kErrorState; } + if (delete_local_scope_ && scope_) delete scope_; VLOG(7) << "VarHandle finish:" << ok; wait_cond_.notify_all(); } @@ -112,6 +115,7 @@ class VarHandle { std::string name_; // RPC method name. std::string method_; + bool delete_local_scope_; protected: std::mutex sync_mutex_; diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index 9722f8c96e91d2dfbe929dcc11645a40c44afb4e..1625e55d5ad69a135ca96689c4d9df2ffa0d943d 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -53,13 +53,9 @@ bool RequestSendHandler::Handle(const std::string& varname, // Async if (!sync_mode_) { VLOG(3) << "async process var: " << varname; - try { - executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), - scope); - } catch (std::exception& e) { - LOG(ERROR) << "async: run sub program error " << e.what(); - return false; - } + executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(), + scope); + delete scope; return true; } else { // sync rpc_server_->WaitCond(kRequestSend); diff --git a/paddle/fluid/operators/distributed/variable_response.h b/paddle/fluid/operators/distributed/variable_response.h index 294cae5f44a4701c064c3669af7b4138f68659e6..3ecb6960690cbd4679af9ad310f27a878ddb7a23 100644 --- a/paddle/fluid/operators/distributed/variable_response.h +++ b/paddle/fluid/operators/distributed/variable_response.h @@ -60,14 +60,12 @@ class VariableResponse { bool create_scope = false) : scope_(scope), dev_ctx_(dev_ctx), create_scope_(create_scope) { if (create_scope) { - local_scope_ = &scope->NewScope(); + local_scope_ = scope->NewTmpScope(); } } virtual ~VariableResponse() { - if (create_scope_) { - scope_->DeleteScope(local_scope_); - } + if (local_scope_) delete local_scope_; } int Parse(Source* source, const sendrecv::VariableMessage& meta) { @@ -86,6 +84,12 @@ class VariableResponse { inline std::string Varname() const { return meta_.varname(); } inline std::string OutVarname() const { return meta_.out_varname(); } inline std::string TableName() const { return meta_.table_name(); } + inline void ReleaseOwnershipOfLocalScope() { + PADDLE_ENFORCE(create_scope_, + "only when create_scope_ is true can you release the " + "ownership of local scope"); + local_scope_ = nullptr; + } // should call parse first. framework::Variable* GetVar() {