From bb1fffd66015b3552ee8b1bb727519ab0e442166 Mon Sep 17 00:00:00 2001
From: zhangbo9674 <82555433+zhangbo9674@users.noreply.github.com>
Date: Mon, 28 Nov 2022 11:50:36 +0800
Subject: [PATCH] Add trace mode for interpretercore (#48370)

* add trace mode for interpretercore

* fix bug

* add a ctrl flag

* add record for memcpyd2h

* polish code

* polish code
---
 .../framework/new_executor/interpretercore.cc | 244 ++++++++++++++----
 .../framework/new_executor/interpretercore.h  |  12 +-
 2 files changed, 204 insertions(+), 52 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 81cbea8efaa..c792aa393b5 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -206,7 +206,12 @@ paddle::framework::FetchList InterpreterCore::Run(
     gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_);
   }
 
-  ExecuteInstructionList(vec_instruction_);
+  if (execution_config_.used_for_jit && (sync_op_num_ == 0)) {
+    VLOG(4) << "Tracing Instruction List";
+    TraceInstructionList(vec_instruction_);
+  } else {
+    ExecuteInstructionList(vec_instruction_);
+  }
 #ifdef PADDLE_WITH_ASCEND_CL
   if (platform::is_npu_place(place_)) {
     platform::DeviceContextPool::Instance().Get(place_)->Wait();
@@ -257,6 +262,7 @@ paddle::framework::FetchList InterpreterCore::Run(
     // convert vec func_list to graph
     Convert(&op_func_nodes);
     is_build_ = true;
+    UpdateSyncOpNum();
   } else {
     // For the program that only run once, it is no need to
     // create work_queue, so the async_work_queue_ is created
@@ -268,7 +274,12 @@ paddle::framework::FetchList InterpreterCore::Run(
       gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_);
     }
 
-    ExecuteInstructionList(vec_instruction_);
+    if (execution_config_.used_for_jit && (sync_op_num_ == 0)) {
+      VLOG(4) << "Tracing Instruction List";
+      TraceInstructionList(vec_instruction_);
+    } else {
+      ExecuteInstructionList(vec_instruction_);
+    }
 #ifdef PADDLE_WITH_ASCEND_CL
     if (platform::is_npu_place(place_)) {
       platform::DeviceContextPool::Instance().Get(place_)->Wait();
@@ -719,6 +730,8 @@ void InterpreterCore::Convert(
     refs_.emplace_back(std::make_shared<interpreter::VarRefInfo>(
         vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i)));
   }
+
+  AnalyseExecuteOrderForTrace();
 }
 
 void InterpreterCore::BuildSkipShareLoDInfo() {
@@ -741,7 +754,7 @@ void InterpreterCore::BuildSkipShareLoDInfo() {
   }
 }
 
-void InterpreterCore::RunInstruction(const Instruction& instr_node) {
+void InterpreterCore::RunOperator(const Instruction& instr_node) {
   auto* op = instr_node.OpBase();
   auto place = instr_node.DeviceContext().GetPlace();
   Scope* local_scope = HasLocalScope() ? var_scope_.GetMutableLocalScope()
@@ -865,6 +878,45 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   }
 }
 
+void InterpreterCore::RunInstruction(const Instruction& instr_node) {
+  VLOG(5) << __func__ << " OP id:" << instr_node.Id()
+          << " name:" << instr_node.OpBase()->Type() << " type:"
+          << (instr_node.KernelType() == OpFuncType::kCpuSync
+                  ? "kCpuSync"
+                  : (instr_node.KernelType() == OpFuncType::kGpuSync
+                         ? "kGpuSync"
"kGpuSync" + : "kGpuAsync")) + << " runs on " << platform::GetCurrentThreadName(); + + auto* op = instr_node.OpBase(); + platform::RecordEvent instruction_event( + op->Type(), platform::TracerEventType::Operator, 1); + + try { + instr_node.WaitEvent(place_); + + if (!instr_node.IsArtificial()) { + RunOperator(instr_node); + CheckGC(instr_node); + interpreter::LogDeviceMemoryStats(place_); + } + + instr_node.RecordEvent(place_); + } catch (platform::EnforceNotMet& ex) { + framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex); + exception_holder_.Catch(std::make_exception_ptr(std::move(ex))); + } catch (platform::EOFException&) { + exception_holder_.Catch(std::current_exception()); + } catch (std::exception& ex) { + LOG(WARNING) << op->Type() << " raises an exception " + << platform::demangle(typeid(ex).name()) << ", " << ex.what(); + exception_holder_.Catch(std::current_exception()); + } catch (...) { + LOG(WARNING) << op->Type() << " raises an unknown exception"; + exception_holder_.Catch(std::current_exception()); + } +} + void InterpreterCore::ExecuteInstructionList( const std::vector& vec_instr) { interpreter::ResetAtomicGuard guard(&deps_, &refs_); @@ -879,18 +931,7 @@ void InterpreterCore::ExecuteInstructionList( for (size_t i = 0; i < dependecy_count_.size(); ++i) { if (dependecy_count_[i] == 0) { // NOTE(zhiqiu): hot fix for jit input var - if (vec_instr.at(i).OpBase()->Type() == interpreter::kMemcpyD2H) { - platform::DeviceContextPool& pool = - platform::DeviceContextPool::Instance(); - auto* default_dev_ctx = pool.Get(place_); - for (auto& event : vec_instr.at(i).EventsToWait()) { - platform::RecordEvent record( - "RecordStreamEvent", platform::TracerEventType::UserDefined, 10); - VLOG(3) << "Record event on default stream in jit_input_var at op: " - << vec_instr.at(i).OpBase()->Type(); - event.event_->Record(default_dev_ctx); - } - } + RecordMemcpyD2H(vec_instr.at(i)); async_work_queue_->AddTask(vec_instr.at(i).KernelType(), [this, i] { RunInstructionAsync(i); }); } @@ -955,43 +996,8 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) { instr_id = ready_ops.front(); ready_ops.pop_front(); auto& instr_node = vec_instruction_.at(instr_id); - VLOG(5) << __func__ << " OP id:" << instr_node.Id() - << " name:" << instr_node.OpBase()->Type() << " type:" - << (instr_node.KernelType() == OpFuncType::kCpuSync - ? "kCpuSync" - : (instr_node.KernelType() == OpFuncType::kGpuSync - ? "kGpuSync" - : "kGpuAsync")) - << " runs on " << platform::GetCurrentThreadName(); - - auto* op = instr_node.OpBase(); - platform::RecordEvent instruction_event( - op->Type(), platform::TracerEventType::Operator, 1); - - try { - instr_node.WaitEvent(place_); - - if (!instr_node.IsArtificial()) { - RunInstruction(instr_node); - CheckGC(instr_node); - interpreter::LogDeviceMemoryStats(place_); - } - instr_node.RecordEvent(place_); - } catch (platform::EnforceNotMet& ex) { - framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex); - exception_holder_.Catch(std::make_exception_ptr(std::move(ex))); - } catch (platform::EOFException&) { - exception_holder_.Catch(std::current_exception()); - } catch (std::exception& ex) { - LOG(WARNING) << op->Type() << " raises an exception " - << platform::demangle(typeid(ex).name()) << ", " - << ex.what(); - exception_holder_.Catch(std::current_exception()); - } catch (...) 
-      LOG(WARNING) << op->Type() << " raises an unknown exception";
-      exception_holder_.Catch(std::current_exception());
-    }
+
+    RunInstruction(instr_node);
 
     if (UNLIKELY(exception_holder_.IsCaught())) {
       VLOG(4) << "Exception caught";
@@ -1176,6 +1182,7 @@ void InterpreterCore::Prepare(const std::vector<std::string>& feed_names,
     SetFeedVarsInplaceSkip(feed_names);
     // convert vec func_list to graph
     Convert(&op_func_nodes);
+    UpdateSyncOpNum();
    is_build_ = true;
   }
   // NOTE: Because feed_tensor will be GC after
@@ -1213,5 +1220,140 @@ std::shared_ptr<InterpreterCore> CreateInterpreterCore(
   return core;
 }
 
+// Note(zhangbo):
+// (1) What is "Trace"?
+// By default, InterpreterCore schedules OPs in a multi-threaded mode (see
+// ExecuteInstructionList): it maintains a high-performance thread pool and
+// distributes the execution of OPs to the sub threads of that pool, leaving
+// the main thread with no tasks. In Trace mode, the executor instead runs
+// OPs directly in the main thread, following a precomputed OP sequence
+// (trace_execute_order_), rather than dispatching them to the thread pool.
+// (2) When do we use "Trace"?
+// In dygraph-to-static, multi-threaded scheduling means that the forward and
+// backward OPs and the dygraph optimizer cannot run in the same thread, and
+// the resulting thread switches may cause CPU cache misses. Moreover, when a
+// model consists entirely of asynchronous (kGpuAsync) OPs, every OP is
+// dispatched to the DeviceThread anyway, so multi-threaded scheduling brings
+// no benefit. Therefore, in dygraph-to-static, when the number of
+// synchronous OPs is 0, we choose Trace mode.
+void InterpreterCore::TraceInstructionList(
+    const std::vector<Instruction>& vec_instr) {
+  unfinished_op_number_ = vec_instr.size();
+  if (unfinished_op_number_ == 0) {
+    VLOG(4) << "No op to run, return";
+    return;
+  }
+
+  exception_holder_.Clear();
+
+  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
+    if (dependecy_count_[i] == 0) {
+      // NOTE(zhiqiu): hot fix for jit input var
+      RecordMemcpyD2H(vec_instr.at(i));
+    }
+  }
+
+  for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
+    auto instr_id = trace_execute_order_[idx];
+    auto& instr_node = vec_instruction_.at(instr_id);
+
+    RunInstruction(instr_node);
+
+    if (UNLIKELY(exception_holder_.IsCaught())) {
+      VLOG(4) << "Exception caught";
+      break;
+    }
+  }
+
+  if (UNLIKELY(exception_holder_.IsCaught())) {
+    VLOG(1) << "Exception caught " << exception_holder_.Type();
+    PADDLE_ENFORCE_EQ(
+        main_thread_blocker_.Clear(),
+        0,
+        platform::errors::PreconditionNotMet(
+            "main_thread_blocker_.Clear() return -1, clear failed"));
+    VLOG(4) << "clear ok";
+    exception_holder_.ReThrow();
+  }
+}
+
+void InterpreterCore::RecordMemcpyD2H(const Instruction& instr_node) {
+  // NOTE(zhiqiu): hot fix for jit input var
+  if (instr_node.OpBase()->Type() == interpreter::kMemcpyD2H) {
+    platform::DeviceContextPool& pool =
+        platform::DeviceContextPool::Instance();
+    auto* default_dev_ctx = pool.Get(place_);
+    for (auto& event : instr_node.EventsToWait()) {
+      platform::RecordEvent record(
+          "RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
+      VLOG(3) << "Record event on default stream in jit_input_var at op: "
+              << instr_node.OpBase()->Type();
+      event.event_->Record(default_dev_ctx);
+    }
+  }
+}
+
+void InterpreterCore::UpdateSyncOpNum() {
+  int64_t sync_op_num = 0;
+  for (size_t i = 0; i < vec_instruction_.size(); ++i) {
+    if (vec_instruction_[i].KernelType() == OpFuncType::kCpuSync ||
+        vec_instruction_[i].KernelType() == OpFuncType::kGpuSync) {
+      sync_op_num = sync_op_num + 1;
+    }
+  }
+  sync_op_num_ = sync_op_num;
+  VLOG(4) << "Update sync op num, sync op num is: " << sync_op_num_;
+}
+
+// Note(zhangbo):
+// When the model contains a synchronous (kCpuSync or kGpuSync) OP, breadth
+// traversal is better than depth traversal. For example:
+//   OP(O) ->(direct_run)-> OP(A) ->(sync_run)-> OP(B)
+//   OP(O) ->(direct_run)-> OP(C) ->(direct_run)-> OP(D)
+// If B is scheduled before C, B may block waiting for A to finish executing,
+// even though C could have been executed during that time.
+void InterpreterCore::AnalyseExecuteOrderForTrace() {
+  VLOG(4) << "Analyze the execution order of Trace scheduling mode.";
+  interpreter::ResetAtomicGuard guard(&deps_, &refs_);
+
+  auto op_downstream_map = dependency_builder_.OpDownstreamMap();
+
+  auto IsReady = [this](size_t next_id) {
+    VLOG(4) << "op_id: " << next_id
+            << ", remain deps: " << deps_[next_id]->DynamicDep();
+    return deps_[next_id]->CheckAndDecrease();
+  };
+
+  std::vector<size_t> trace_order;
+  std::deque<size_t> ready_ops;
+
+  for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
+    if (dependecy_count_[instr_id] == 0) {
+      ready_ops.push_back(instr_id);
+    }
+  }
+
+  while (!ready_ops.empty()) {
+    auto now_id = ready_ops.front();
+    ready_ops.pop_front();
+    trace_order.push_back(now_id);
+
+    auto next_op_set = op_downstream_map[now_id];
+
+    for (size_t next_op_id : next_op_set) {
+      if (IsReady(next_op_id)) {
+        ready_ops.push_back(next_op_id);
+      }
+    }
+  }
+
+  PADDLE_ENFORCE_EQ(
+      trace_order.size(),
+      dependecy_count_.size(),
+      platform::errors::PreconditionNotMet(
+          "trace_order size should be equal to dependecy_count_."));
+
+  trace_execute_order_ = trace_order;
+}
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index ceb5ad2c727..2c25a0dee9c 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -83,6 +83,8 @@ class InterpreterCore {
   void BuildOperatorDependences();
   void BuildAndCacheInstructionCtx(Instruction* instr_node);
   void BuildSkipShareLoDInfo();
+  void UpdateSyncOpNum();
+  void AnalyseExecuteOrderForTrace();
 
   // inplace
   void BuildInplace();
@@ -96,11 +98,17 @@ class InterpreterCore {
   void RunInstruction(const Instruction& instr_node);
   void RunNextInstructions(const Instruction& instr_id,
                            std::deque<size_t>* reserved_next_ops);
+  void RunOperator(const Instruction& instr_node);
+  // Trace
+  void TraceInstructionList(const std::vector<Instruction>& vec_instr);
+
   // only used when program contains no feed op
   void Prepare(const std::vector<std::string>& feed_names,
                const std::vector<phi::DenseTensor>& feed_tensors,
                bool prepare_feed);
+  void RecordMemcpyD2H(const Instruction& instr_node);
+
   // gc
   void RecordStreamForGC(const Instruction& instr);
   void CheckGC(const Instruction& instr);
@@ -159,7 +167,9 @@ class InterpreterCore {
   std::vector<std::shared_ptr<interpreter::OpDepInfo>> deps_;
   std::vector<std::shared_ptr<interpreter::VarRefInfo>> refs_;
 
-  // for jit
+  // used for Trace
+  int64_t sync_op_num_{-1};
+  std::vector<size_t> trace_execute_order_;
 };
 
 std::shared_ptr<InterpreterCore> CreateInterpreterCore(
-- 
GitLab
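
Reviewer note: AnalyseExecuteOrderForTrace above is Kahn's topological sort with a FIFO ready queue, which is what produces the breadth-first order described in the Note. Below is a minimal standalone sketch of that analysis, detached from InterpreterCore; BuildTraceOrder, deps, and downstream are illustrative names, not part of Paddle's API, and the atomic CheckAndDecrease bookkeeping is reduced to a plain counter decrement.

#include <cstddef>
#include <deque>
#include <map>
#include <set>
#include <vector>

// Breadth-first (Kahn-style) linearization of an op dependency graph.
// deps[i] is op i's remaining upstream-dependency count; downstream maps an
// op to the set of ops that consume its outputs. Ops become ready when their
// count reaches zero; a FIFO queue yields the breadth-first visit order.
std::vector<size_t> BuildTraceOrder(
    std::vector<size_t> deps,
    const std::map<size_t, std::set<size_t>>& downstream) {
  std::vector<size_t> trace_order;
  std::deque<size_t> ready_ops;

  for (size_t id = 0; id < deps.size(); ++id) {
    if (deps[id] == 0) ready_ops.push_back(id);
  }

  while (!ready_ops.empty()) {
    size_t now_id = ready_ops.front();
    ready_ops.pop_front();
    trace_order.push_back(now_id);

    auto it = downstream.find(now_id);
    if (it == downstream.end()) continue;
    for (size_t next_id : it->second) {
      // Counterpart of deps_[next_id]->CheckAndDecrease() in the patch.
      if (--deps[next_id] == 0) {
        ready_ops.push_back(next_id);
      }
    }
  }

  // trace_order.size() < deps.size() would indicate a dependency cycle; the
  // patch enforces equality via PADDLE_ENFORCE_EQ.
  return trace_order;
}

For the example in the Note (O=0, A=1, B=2, C=3, D=4), calling BuildTraceOrder with deps = {0, 1, 1, 1, 1} and downstream = {0:{1,3}, 1:{2}, 3:{4}} yields the order O, A, C, B, D: the independent C is no longer stuck behind the potentially blocking B, which is exactly why the FIFO (breadth-first) queue is preferred over a depth-first order here.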