From d4cf56666ed2a993b5570fd193a4fc6acf8400b6 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 20 Apr 2022 10:43:39 +0800 Subject: [PATCH] [new-exec] clear the scope listener after run (#41947) * clear the listener after run * only sync variables in program * refine code * fit for lod_tensor_blocking_queue --- .../framework/new_executor/interpretercore.cc | 12 +++++++++ .../new_executor/interpretercore_util.cc | 2 ++ .../new_executor/new_executor_defs.cc | 22 ++++++++++++++++ .../new_executor/new_executor_defs.h | 4 +++ .../new_executor/standalone_executor.cc | 26 ++++++++++--------- paddle/fluid/framework/scope.cc | 5 ++++ paddle/fluid/framework/scope.h | 2 ++ 7 files changed, 61 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 80553765df..74310a6046 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -121,6 +121,9 @@ paddle::framework::FetchList InterpreterCore::Run( Prepare(feed_names, feed_tensors, is_build); if (is_build) { + // add listener before run and is_build=true + global_scope_->ResetListener(); + ExecuteInstructionList(vec_instruction_); } @@ -128,6 +131,9 @@ paddle::framework::FetchList InterpreterCore::Run( ClearLoDTensorArrayInLocalScope(); } + // clear the listener after run + global_scope_->ClearListener(); + // return Fetch Tensors auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName); return std::move(*fetch_var->GetMutable()); @@ -162,6 +168,9 @@ paddle::framework::FetchList InterpreterCore::Run( Convert(&op_func_nodes); } else { + // add listener before run and is_build=true + global_scope_->ResetListener(); + ExecuteInstructionList(vec_instruction_); } @@ -169,6 +178,9 @@ paddle::framework::FetchList InterpreterCore::Run( ClearLoDTensorArrayInLocalScope(); } + // clear the listener after run + global_scope_->ClearListener(); + // return Fetch Tensors auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName); return std::move(*fetch_var->GetMutable()); diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index be05acd7b7..ed813c78bc 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -172,6 +172,8 @@ void build_variable_scope(const framework::BlockDesc& block, auto* ptr = inner_scope->Var(var_name); VLOG(3) << "Initialize Variable " << var_name; + // NOTE(zhiqiu): if var exists in scope and the type is right, + // InitializeVariable will not create a new variable. InitializeVariable(ptr, var_desc->GetType()); VLOG(3) << "Create Variable " << var_name << " global, which pointer is " << ptr << " type is " << static_cast(var_desc->GetType()); diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.cc b/paddle/fluid/framework/new_executor/new_executor_defs.cc index 8f6bac76e2..86d534b0b4 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.cc +++ b/paddle/fluid/framework/new_executor/new_executor_defs.cc @@ -642,6 +642,28 @@ void VariableScope::CheckExist(const std::string& name) const { "%s not in VariableScope.", name)); } +void VariableScope::ClearListener() { + if (scope_ && listener_ && scope_->HasListener(listener_)) { + VLOG(4) << "Clear listener " << listener_ << " for " << scope_; + scope_->DelListener(listener_); + } + if (local_scope_ && listener_ && local_scope_->HasListener(listener_)) { + VLOG(4) << "Clear listener " << listener_ << " for " << local_scope_; + local_scope_->DelListener(listener_); + } +} + +void VariableScope::ResetListener() { + if (scope_ && listener_ && !scope_->HasListener(listener_)) { + VLOG(4) << "Add listener " << listener_ << " for " << scope_; + scope_->AddListener(listener_); + } + if (local_scope_ && listener_ && !local_scope_->HasListener(listener_)) { + VLOG(4) << "Add listener " << listener_ << " for " << local_scope_; + local_scope_->AddListener(listener_); + } +} + VariableScopeListener::VariableScopeListener(VariableScope* var_scope) { var_scope_ = var_scope; } diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h index e257b71742..6a1e46e359 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.h +++ b/paddle/fluid/framework/new_executor/new_executor_defs.h @@ -238,6 +238,10 @@ class VariableScope : public ScopeBase { bool GetVarSikpInplace(int id) const; + void ClearListener(); + + void ResetListener(); + friend class VariableScopeListener; private: diff --git a/paddle/fluid/framework/new_executor/standalone_executor.cc b/paddle/fluid/framework/new_executor/standalone_executor.cc index 4d4f7c74cd..31315df570 100644 --- a/paddle/fluid/framework/new_executor/standalone_executor.cc +++ b/paddle/fluid/framework/new_executor/standalone_executor.cc @@ -25,19 +25,21 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place, startup_prog_(startup_prog), main_prog_(main_prog), global_scope_(VariableScope(scope)) { - // NOTE(zhiqiu): it is needed to sync thhe variables in scope to - // variable_scope, - // since the some variable only exists in startup program, e.g, - // lod_tensor_blocking_queue_0 used in dataloader. - // These variables may be created in scope during runing startup program with - // original executor. + // NOTE(zhiqiu): it is needed to sync the variables in scope to + // variable_scope, since the some variable only exists in scope. + // For example, 'lod_tensor_blocking_queue_0' used in dataloader. + // These variables may be created in scope, and it is not existed as + // variable in program. if (scope) { - auto name_list = scope->LocalVarNames(); - for (auto name : name_list) { - VLOG(4) << "Sync Variable from variable scope: " << name; - auto v = scope->Var(name); - if (!global_scope_.HasVar(name)) { - global_scope_.AddVar(name, *v); + const std::string blocking_queue_prefix = "lod_tensor_blocking_queue"; + auto vars = scope->LocalVarNames(); + for (const auto& name : vars) { + if (name.find(blocking_queue_prefix) != std::string::npos) { + if (!global_scope_.HasVar(name)) { + auto* v = scope->Var(name); + VLOG(4) << "Sync Variable from scope to variable scope: " << name; + global_scope_.AddVar(name, *v); + } } } } diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index 0463f5788f..c95159d572 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -289,6 +289,11 @@ void Scope::DelListener(const std::shared_ptr& listener) { listeners_.remove(listener); } +bool Scope::HasListener(const std::shared_ptr& listener) { + auto it = std::find(listeners_.begin(), listeners_.end(), listener); + return it != listeners_.end(); +} + void Scope::EraseVarsExcept(const std::unordered_set& vars) { SCOPE_VARS_WRITER_LOCK for (auto iter = vars_.begin(); iter != vars_.end();) { diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h index 1669fba132..9231ec90e8 100644 --- a/paddle/fluid/framework/scope.h +++ b/paddle/fluid/framework/scope.h @@ -154,6 +154,8 @@ class Scope : public ScopeBase { void DelListener(const std::shared_ptr& listener); + bool HasListener(const std::shared_ptr& listener); + protected: struct KeyHasher { std::size_t operator()(const std::string& key) const { -- GitLab