From 80992253b3f3268a3641f5bd5fc478d544062a97 Mon Sep 17 00:00:00 2001
From: Leo Chen
Date: Wed, 20 Apr 2022 20:03:45 +0800
Subject: [PATCH] [cherry-pick] Cherry pick pr of new-exec (#42009)

* [new-exec] shrink downstream map (#41471)

* shrink downstream map

* shrink last live ops of var

* add comment

* fix bug

* add dependency for send/recv to support pp parallel (#41652)

* [new-exec] clear the scope listener after run (#41947)

* clear the listener after run

* only sync variables in program

* refine code

* fit for lod_tensor_blocking_queue
---
 .../framework/new_executor/interpretercore.cc |  94 +++++++----
 .../framework/new_executor/interpretercore.h  |   5 +
 .../new_executor/interpretercore_util.cc      | 158 +++++++++++++++---
 .../new_executor/interpretercore_util.h       |   3 +-
 .../new_executor/new_executor_defs.cc         |  22 +++
 .../new_executor/new_executor_defs.h          |   4 +
 .../new_executor/standalone_executor.cc       |  26 +--
 paddle/fluid/framework/scope.cc               |   5 +
 paddle/fluid/framework/scope.h                |   2 +
 9 files changed, 254 insertions(+), 65 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 29aa7b13a27..67c79c9bf97 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -121,6 +121,9 @@ paddle::framework::FetchList InterpreterCore::Run(
   Prepare(feed_names, feed_tensors, is_build);
 
   if (is_build) {
+    // add the listener before run when the program is already built
+    global_scope_->ResetListener();
+
     ExecuteInstructionList(vec_instruction_);
   }
 
@@ -128,6 +131,9 @@ paddle::framework::FetchList InterpreterCore::Run(
     ClearLoDTensorArrayInLocalScope();
   }
 
+  // clear the listener after run
+  global_scope_->ClearListener();
+
   // return Fetch Tensors
   auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
   return std::move(*fetch_var->GetMutable<framework::FetchList>());
@@ -162,6 +168,9 @@ paddle::framework::FetchList InterpreterCore::Run(
     Convert(&op_func_nodes);
 
   } else {
+    // add the listener before run when the program is already built
+    global_scope_->ResetListener();
+
     ExecuteInstructionList(vec_instruction_);
   }
 
@@ -169,6 +178,9 @@ paddle::framework::FetchList InterpreterCore::Run(
     ClearLoDTensorArrayInLocalScope();
   }
 
+  // clear the listener after run
+  global_scope_->ClearListener();
+
   // return Fetch Tensors
   auto* fetch_var = global_scope_->Var(interpreter::kFetchVarName);
   return std::move(*fetch_var->GetMutable<framework::FetchList>());
@@ -192,7 +204,8 @@ void InterpreterCore::BuildOperatorDependences() {
   // Schedule
   auto op_nums = vec_instruction_.size();
   dependecy_count_.resize(op_nums);
-  auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_);
+  auto op2downstream = interpreter::build_op_downstream_map(
+      vec_instruction_, &op_happens_before_);
   for (size_t op = 0; op < vec_instruction_.size(); ++op) {
     auto op_list = op2downstream[op];
     std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
@@ -213,18 +226,21 @@ void InterpreterCore::Convert(
   auto op_nums = nodes.size();
   vec_instruction_.reserve(op_nums);
-
   for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
     auto& op_func_node = nodes[op_idx];
     auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node);
-
     vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_);
-    auto& instr = vec_instruction_.back();
+  }
+
+  BuildOperatorDependences();
 
+  // calculate last_live_ops_
+  for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
+    auto& instr = vec_instruction_[op_idx];
     OpInOutInfo info;
-    std::vector<size_t> gc_check_input_list;
+    std::set<size_t> gc_check_inputs;
 
-    for (auto& item : op_func_node.input_index) {
+    for (auto& item : instr.Inputs()) {
       for (auto id : item.second) {
         if (id == kEmptyVarIndex) {
           continue;
         }
@@ -232,38 +248,24 @@ void InterpreterCore::Convert(
         input_var2op_info_.at(id).push_back(op_idx);
         // var can be gc-ed
         if (!info.IsBuilt()) {
-          info.Build(op_func_node.operator_base_.get());
+          info.Build(instr.OpBase());
         }
         auto* var_desc = global_scope_->VarDesc(id);
         if (var_desc) {
           if (info.IsInArgBufferNeeded(var_desc->Name())) {
-            gc_check_input_list.push_back(id);
+            gc_check_inputs.insert(id);
           }
         } else {
-          gc_check_input_list.push_back(id);
+          gc_check_inputs.insert(id);
         }
       }
     }
-    std::sort(gc_check_input_list.begin(), gc_check_input_list.end());
-    auto last =
-        std::unique(gc_check_input_list.begin(), gc_check_input_list.end());
-    gc_check_input_list.erase(last, gc_check_input_list.end());
-    for (auto var_id : gc_check_input_list) {
+
+    for (auto var_id : gc_check_inputs) {
       paddle::framework::Variable* var = global_scope_->Var(var_id);
       if (var->IsType<LoDTensor>() || var->IsType<phi::SelectedRows>() ||
          var->IsType<LoDTensorArray>()) {
-        vec_meta_info[var_id].var_ref_count_++;
-        // TODO(zhiqiu): not all var needs to be checked, var need to be checked
-        // only
-        // after the last_live_op. For example,
-        // b = op1(a)
-        // c = op2(a, b)
-        // in this case, a is the input of op1 and op2, we only need to check
-        // a after op2, because op2 always uses a after op1.
-        instr.AddGCCheckVar(var_id);
-        VLOG(4) << "clear " << global_scope_->GetNameById(var_id) << " after "
-                << instr.OpBase()->Type();
+        last_live_ops_[var_id].insert(op_idx);
       } else {
         VLOG(4) << "not clear " << global_scope_->GetNameById(var_id)
                 << " after " << instr.OpBase()->Type()
@@ -276,19 +278,45 @@ void InterpreterCore::Convert(
   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
     // checkout ouput
     for (auto& item : vec_instruction_[i].Outputs()) {
-      for (auto id : item.second) {
-        if (input_var2op_info_.at(id).size() == 0) {
-          // output var not be used by any kernel
-          vec_instruction_[i].AddGCCheckVar(id);
-          VLOG(4) << "clear " << global_scope_->GetNameById(id) << " after "
-                  << vec_instruction_[i].OpBase()->Type();
-          vec_meta_info[id].var_ref_count_++;
+      for (auto var_id : item.second) {
+        if (input_var2op_info_.at(var_id).size() == 0) {
+          last_live_ops_[var_id].insert(i);
         }
       }
     }
   }
 
-  BuildOperatorDependences();
+  // shrink last_live_ops_: keep only the ops that do not happen before any
+  // other op in the same set, since checking the var after those ops is enough
+  // For example,
+  // b = op1(a)
+  // c = op2(a, b)
+  // in this case, a is the input of op1 and op2, we only need to check
+  // a after op2, because op2 always uses a after op1.
+  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
+    std::set<size_t> minimum_last_live_ops;
+    for (size_t item : last_live_ops_[i]) {
+      bool not_before_any = true;
+      // keep the op only if it is not executed before any other op in the set
+      for (size_t other_item : last_live_ops_[i]) {
+        if (op_happens_before_[item][other_item]) {
+          VLOG(8) << "happens_before: " << item << "->" << other_item
+                  << ", so skip " << item;
+          not_before_any = false;
+          break;
+        }
+      }
+      if (not_before_any) {
+        VLOG(8) << "last live op of var " << i << " "
+                << global_scope_->GetNameById(i) << " : " << item << " "
+                << vec_instruction_[item].OpBase()->Type();
+        minimum_last_live_ops.insert(item);
+        vec_instruction_[item].AddGCCheckVar(i);
+      }
+    }
+    last_live_ops_[i] = minimum_last_live_ops;
+    vec_meta_info[i].var_ref_count_ = last_live_ops_[i].size();
+  }
 
   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
     BuildAndCacheInstructionCtx(&vec_instruction_[i]);
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index c1ade85e138..3af0ddb675a 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -109,6 +109,11 @@ class InterpreterCore {
 
   std::vector<Instruction> vec_instruction_;  // deconstruct before OpFuncNode
 
+  // op_happens_before_[i][j] == true means op[i] happens before op[j]
+  std::vector<std::vector<bool>> op_happens_before_;
+  // last_live_ops_[i] contains the ids of the operators that last access var[i]
+  std::map<size_t, std::set<size_t>> last_live_ops_;
+
   std::vector<size_t> dependecy_count_;
   std::atomic<size_t> unfinished_op_numer_{0};
   std::vector<std::vector<size_t>> input_var2op_info_;
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 59703332efe..ed813c78bc3 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -172,6 +172,8 @@ void build_variable_scope(const framework::BlockDesc& block,
       auto* ptr = inner_scope->Var(var_name);
 
       VLOG(3) << "Initialize Variable " << var_name;
+      // NOTE(zhiqiu): if var exists in scope and the type is right,
+      // InitializeVariable will not create a new variable.
       InitializeVariable(ptr, var_desc->GetType());
       VLOG(3) << "Create Variable " << var_name << " global, which pointer is "
               << ptr << " type is " << static_cast<int>(var_desc->GetType());
@@ -614,23 +616,125 @@ void update_var_min_rw_op(const std::map<int, std::set<int>>& op2dependences,
 }
 
 std::map<int, std::list<int>> get_downstream_map(
-    const std::map<int, std::set<int>>& op2dependences) {
-  // op2dependences is op -> it's dependences. we want to get op -> [ops] map,
+    const std::map<int, std::set<int>>& op2dependences,
+    std::vector<std::vector<bool>>* op_happens_before) {
+  // step1: convert op2dependences to downstream_map directly
+  // op2dependences is op -> its dependencies.
+  // we want to get op -> [next ops] map,
   // where ops is the next instruction of op.
-  std::map<int, std::list<int>> result;
+  std::map<int, std::list<int>> downstream;
   for (auto& item : op2dependences) {
     int op = item.first;
     for (auto dep_op : item.second) {
-      if (result.find(dep_op) == result.end())
-        result[dep_op] = std::list<int>();
-      result[dep_op].push_back(op);
+      if (downstream.find(dep_op) == downstream.end())
+        downstream[dep_op] = std::list<int>();
+      downstream[dep_op].push_back(op);
     }
   }
-  return std::move(result);
+
+  auto downstream_map_to_str = [&]() -> std::string {
+    std::ostringstream oss;
+    for (auto pair : downstream) {
+      oss << pair.first << " -> ";
+      std::copy(pair.second.begin(), pair.second.end(),
+                std::ostream_iterator<int>(oss, " "));
+      oss << std::endl;
+    }
+    return oss.str();
+  };
+
+  auto downstream_map_count = [&]() -> size_t {
+    size_t count = 0;
+    for (auto pair : downstream) {
+      count += pair.second.size();
+    }
+    return count;
+  };
+
+  VLOG(6) << "downstream count: " << downstream_map_count();
+  VLOG(6) << "downstream_map: " << std::endl << downstream_map_to_str();
+
+  // step2: remove unnecessary downstream ops
+  // for example, a->b->c
+  // a: b, c
+  // b: c
+  // =>
+  // a: b
+  // b: c
+
+  // NOTE(zhiqiu): the size of downstream != size of op2dependences,
+  // since there are some ops that have no downstream op.
+  auto op_num = op2dependences.size();
+  // happens_before[i][j] means i should be executed before j
+  op_happens_before->resize(op_num);
+  for (size_t i = 0; i < op_num; ++i) {
+    (*op_happens_before)[i].resize(op_num);
+    std::fill((*op_happens_before)[i].begin(), (*op_happens_before)[i].end(),
+              false);
+  }
+
+  // bfs to get all next ops
+  auto bfs = [&](size_t op_idx) {
+    std::queue<size_t> q;
+    std::vector<bool> visited(op_num, false);
+    q.push(op_idx);
+    while (!q.empty()) {
+      size_t op = q.front();
+      q.pop();
+      visited[op] = true;
+      if (!downstream.count(op)) {
+        continue;
+      }
+      for (auto next : downstream[op]) {
+        if (!visited[next]) {
+          PADDLE_ENFORCE_EQ((*op_happens_before)[next][op_idx], false,
+                            paddle::platform::errors::AlreadyExists(
+                                "There exists a cycle in the graph, expected "
+                                "%d->%d, but already got %d->%d",
+                                op_idx, next, next, op_idx));
+          (*op_happens_before)[op_idx][next] = true;
+          VLOG(8) << "happens before: " << op_idx << " " << next;
+          q.push(next);
+        }
+      }
+    }
+  };
+
+  for (size_t i = 0; i < op_num; ++i) {
+    bfs(i);
+  }
+
+  // shrink: keep only the downstream ops that no other op in the
+  // downstream list happens before
+  for (size_t i = 0; i < op_num; ++i) {
+    std::list<int> minimum_nexts;
+    for (size_t item : downstream[i]) {
+      bool not_after_any = true;
+      // keep the op only if it is not executed after any other op in the list
+      for (size_t other_item : downstream[i]) {
+        if ((*op_happens_before)[other_item][item]) {
+          VLOG(8) << "happens_before: " << other_item << "->" << item
+                  << ", so skip " << item;
+          not_after_any = false;
+          break;
+        }
+      }
+      if (not_after_any) {
+        VLOG(8) << "downstream op of " << i << ": " << item;
+        minimum_nexts.push_back(item);
+      }
+    }
+    downstream[i] = minimum_nexts;
+  }
+  VLOG(6) << "downstream count: " << downstream_map_count();
+  VLOG(6) << "downstream_map: " << std::endl << downstream_map_to_str();
+
+  return std::move(downstream);
 }
 
 std::map<int, std::list<int>> build_op_downstream_map(
-    const std::vector<Instruction>& vec_instruction) {
+    const std::vector<Instruction>& vec_instruction,
+    std::vector<std::vector<bool>>* op_happens_before) {
   auto var2min_rw_op = std::map<
       int, std::list<int>>();  // # map from variable id to read / write op id.
   auto var2recent_write_op =
@@ -709,8 +813,13 @@ std::map<int, std::list<int>> build_op_downstream_map(
   // add dependences for random op, make sure that the random op is scheduled
   // sequentially
   const std::set<std::string> random_op_set = {
-      "bernoulli", "poisson", "multinomial", "gaussian_random",
-      "uniform_random", "randint", "randperm", "exponential"};
+      "bernoulli", "poisson", "multinomial", "gaussian_random",
+      "truncated_gaussian_random", "uniform_random", "randint", "randperm",
+      "exponential",
+      "sampling_id", "dropout",
+      "class_center_sample",
+  };
 
   int dependence_op_idx = -1;
   for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
@@ -723,13 +832,26 @@ std::map<int, std::list<int>> build_op_downstream_map(
   }
 
   // add dependency for communication op
-  const std::string communication_op_prefix = "c_";
+  auto is_comm_op = [](std::string op) -> bool {
+    const std::set<std::string> special_comm_op_set = {
+        "send", "recv", "send_v2", "recv_v2",
+    };
+    const std::string communication_op_prefix = "c_";
+    if (op.find(communication_op_prefix) != std::string::npos ||
+        special_comm_op_set.count(op)) {
+      return true;
+    }
+    return false;
+  };
+
   dependence_op_idx = -1;
   for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
-    if (vec_instruction[op_idx].OpBase()->Type().find(
-            communication_op_prefix) != std::string::npos) {
+    if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
       if (dependence_op_idx != -1) {
         op2dependences[op_idx].insert(dependence_op_idx);
+        VLOG(4) << "Add depend from "
+                << vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
+                << vec_instruction[op_idx].OpBase()->Type();
       }
       dependence_op_idx = op_idx;
     }
@@ -833,10 +955,8 @@ std::map<int, std::list<int>> build_op_downstream_map(
     for (size_t j = first_read_fused_out_op + 1; j < vec_instruction.size();
          ++j) {
       if (j == target + 1 &&
-          vec_instruction[target].OpBase()->Type().find(
-              communication_op_prefix) != std::string::npos &&
-          vec_instruction[j].OpBase()->Type().find(communication_op_prefix) !=
-              std::string::npos) {
+          is_comm_op(vec_instruction[target].OpBase()->Type()) &&
+          is_comm_op(vec_instruction[j].OpBase()->Type())) {
         VLOG(4) << "Found consecutive communication ops, "
                 << vec_instruction[target].OpBase()->Type() << " -> "
                 << vec_instruction[j].OpBase()->Type();
@@ -857,13 +977,13 @@ std::map<int, std::list<int>> build_op_downstream_map(
     }
   }
   for (auto pair : op2dependences) {
-    VLOG(10) << pair.first << " Depends on " << pair.second.size();
     std::ostringstream oss;
+    oss << pair.first << " Depends on " << pair.second.size() << " ops: ";
     std::copy(pair.second.begin(), pair.second.end(),
              std::ostream_iterator<int>(oss, " "));
     VLOG(10) << oss.str();
   }
-  return std::move(get_downstream_map(op2dependences));
+  return std::move(get_downstream_map(op2dependences, op_happens_before));
 }
 
 }  // namespace interpreter
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 044a9ea368c..56683330ee6 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -116,7 +116,8 @@ void build_op_func_list(const platform::Place& place,
                         VariableScope* var_scope, bool use_local_scope = true);
 
 std::map<int, std::list<int>> build_op_downstream_map(
-    const std::vector<Instruction>& vec_instruction);
+    const std::vector<Instruction>& vec_instruction,
+    std::vector<std::vector<bool>>* op_happens_before);
 
 void add_fetch(const std::vector<std::string>& fetch_names,
                framework::BlockDesc* block);
diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.cc b/paddle/fluid/framework/new_executor/new_executor_defs.cc
index 089e68fe48c..ac1a654df47 100644
--- a/paddle/fluid/framework/new_executor/new_executor_defs.cc
+++ b/paddle/fluid/framework/new_executor/new_executor_defs.cc
@@ -641,6 +641,28 @@ void VariableScope::CheckExist(const std::string& name) const {
                                         "%s not in VariableScope.", name));
 }
 
+void VariableScope::ClearListener() {
+  if (scope_ && listener_ && scope_->HasListener(listener_)) {
+    VLOG(4) << "Clear listener " << listener_ << " for " << scope_;
+    scope_->DelListener(listener_);
+  }
+  if (local_scope_ && listener_ && local_scope_->HasListener(listener_)) {
+    VLOG(4) << "Clear listener " << listener_ << " for " << local_scope_;
+    local_scope_->DelListener(listener_);
+  }
+}
+
+void VariableScope::ResetListener() {
+  if (scope_ && listener_ && !scope_->HasListener(listener_)) {
+    VLOG(4) << "Add listener " << listener_ << " for " << scope_;
+    scope_->AddListener(listener_);
+  }
+  if (local_scope_ && listener_ && !local_scope_->HasListener(listener_)) {
+    VLOG(4) << "Add listener " << listener_ << " for " << local_scope_;
+    local_scope_->AddListener(listener_);
+  }
+}
+
 VariableScopeListener::VariableScopeListener(VariableScope* var_scope) {
   var_scope_ = var_scope;
 }
diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h
index aab32cfa06d..b223a2ad769 100644
--- a/paddle/fluid/framework/new_executor/new_executor_defs.h
+++ b/paddle/fluid/framework/new_executor/new_executor_defs.h
@@ -238,6 +238,10 @@ class VariableScope : public ScopeBase {
 
   bool GetVarSikpInplace(int id) const;
 
+  void ClearListener();
+
+  void ResetListener();
+
   friend class VariableScopeListener;
 
  private:
diff --git a/paddle/fluid/framework/new_executor/standalone_executor.cc b/paddle/fluid/framework/new_executor/standalone_executor.cc
index a2250231475..72eb90dd727 100644
--- a/paddle/fluid/framework/new_executor/standalone_executor.cc
+++ b/paddle/fluid/framework/new_executor/standalone_executor.cc
@@ -24,19 +24,21 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
       startup_prog_(startup_prog),
       main_prog_(main_prog),
       global_scope_(VariableScope(scope)) {
-  // NOTE(zhiqiu): it is needed to sync thhe variables in scope to
-  // variable_scope,
-  // since the some variable only exists in startup program, e.g,
-  // lod_tensor_blocking_queue_0 used in dataloader.
-  // These variables may be created in scope during runing startup program with
-  // original executor.
+  // NOTE(zhiqiu): it is needed to sync the variables in scope to
+  // variable_scope, since some variables only exist in scope.
+  // For example, 'lod_tensor_blocking_queue_0' used in dataloader.
+  // These variables may be created in scope, and do not exist as
+  // variables in the program.
   if (scope) {
-    auto name_list = scope->LocalVarNames();
-    for (auto name : name_list) {
-      VLOG(4) << "Sync Variable from variable scope: " << name;
-      auto v = scope->Var(name);
-      if (!global_scope_.HasVar(name)) {
-        global_scope_.AddVar(name, *v);
+    const std::string blocking_queue_prefix = "lod_tensor_blocking_queue";
+    auto vars = scope->LocalVarNames();
+    for (const auto& name : vars) {
+      if (name.find(blocking_queue_prefix) != std::string::npos) {
+        if (!global_scope_.HasVar(name)) {
+          auto* v = scope->Var(name);
+          VLOG(4) << "Sync Variable from scope to variable scope: " << name;
+          global_scope_.AddVar(name, *v);
+        }
       }
     }
   }
diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc
index 0463f5788f1..c95159d5727 100644
--- a/paddle/fluid/framework/scope.cc
+++ b/paddle/fluid/framework/scope.cc
@@ -289,6 +289,11 @@ void Scope::DelListener(const std::shared_ptr<ScopeListener>& listener) {
   listeners_.remove(listener);
 }
 
+bool Scope::HasListener(const std::shared_ptr<ScopeListener>& listener) {
+  auto it = std::find(listeners_.begin(), listeners_.end(), listener);
+  return it != listeners_.end();
+}
+
 void Scope::EraseVarsExcept(const std::unordered_set<Variable*>& vars) {
   SCOPE_VARS_WRITER_LOCK
   for (auto iter = vars_.begin(); iter != vars_.end();) {
diff --git a/paddle/fluid/framework/scope.h b/paddle/fluid/framework/scope.h
index 1669fba1327..9231ec90e8f 100644
--- a/paddle/fluid/framework/scope.h
+++ b/paddle/fluid/framework/scope.h
@@ -154,6 +154,8 @@ class Scope : public ScopeBase {
 
   void DelListener(const std::shared_ptr<ScopeListener>& listener);
 
+  bool HasListener(const std::shared_ptr<ScopeListener>& listener);
+
 protected:
  struct KeyHasher {
    std::size_t operator()(const std::string& key) const {
-- 
GitLab
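For readers following the dependency-shrinking logic in this patch, the standalone C++ sketch below mirrors the idea behind get_downstream_map: a happens-before relation is built by BFS over the downstream map, and a downstream list is then shrunk to the ops that no other op in the same list happens before. It is illustrative only, not part of the patch, and the names (DownstreamMap, BuildHappensBefore, Shrink) are hypothetical rather than Paddle APIs.

// Illustrative sketch, assuming an acyclic graph of op indices 0..op_num-1
// given as a map from op to its direct downstream ops.
#include <iostream>
#include <map>
#include <queue>
#include <set>
#include <vector>

using DownstreamMap = std::map<int, std::set<int>>;

// BFS from every op to mark transitive successors, like the `bfs` lambda
// added to get_downstream_map in this patch.
std::vector<std::vector<bool>> BuildHappensBefore(const DownstreamMap& down,
                                                  size_t op_num) {
  std::vector<std::vector<bool>> happens_before(
      op_num, std::vector<bool>(op_num, false));
  for (size_t src = 0; src < op_num; ++src) {
    std::queue<size_t> q;
    std::vector<bool> visited(op_num, false);
    q.push(src);
    while (!q.empty()) {
      size_t op = q.front();
      q.pop();
      visited[op] = true;
      auto it = down.find(static_cast<int>(op));
      if (it == down.end()) continue;
      for (int next : it->second) {
        if (!visited[next]) {
          happens_before[src][next] = true;  // src is executed before next
          q.push(next);
        }
      }
    }
  }
  return happens_before;
}

// Keep only the ops in `ops` that no other member of `ops` happens before;
// an edge to any other op is redundant because transitivity already covers it.
std::set<int> Shrink(const std::set<int>& ops,
                     const std::vector<std::vector<bool>>& happens_before) {
  std::set<int> kept;
  for (int item : ops) {
    bool redundant = false;
    for (int other : ops) {
      if (other != item && happens_before[other][item]) {
        redundant = true;
        break;
      }
    }
    if (!redundant) kept.insert(item);
  }
  return kept;
}

int main() {
  // a(0) -> b(1) -> c(2), plus the direct edge a -> c
  DownstreamMap down{{0, {1, 2}}, {1, {2}}};
  auto happens_before = BuildHappensBefore(down, 3);
  auto shrunk = Shrink(down[0], happens_before);
  for (int op : shrunk) std::cout << "0 -> " << op << "\n";  // prints "0 -> 1"
  return 0;
}

The shrink of last_live_ops_ in InterpreterCore::Convert uses the same relation in the opposite direction: it keeps the ops that do not happen before any other op accessing the variable, so the variable is garbage-collection-checked only after its truly last users.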