diff --git a/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.cc b/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.cc
index 813119e4101f400f00ca4f9a43a240627adcc5ed..3c837e7ac6edc8c4e774ce8f3ff17b0275a3efda 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.cc
@@ -37,6 +37,19 @@ InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector(
   }
 }
 
+InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector(
+    const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction) {
+  WorkQueueOptions options(/*name*/ "GarbageCollector",
+                           /*num_threads*/ 1,
+                           /*allow_spinning*/ true,
+                           /*track_task*/ false);
+  queue_ = CreateSingleThreadedWorkQueue(options);
+  for (auto& instruc : vec_instruction) {
+    gc_event_.emplace_back(instruc->DeviceContext().GetPlace(),
+                           platform::GenerateDeviceEventFlag());
+  }
+}
+
 InterpreterCoreEventGarbageCollector::~InterpreterCoreEventGarbageCollector() {
   queue_.reset(nullptr);
 }
@@ -53,6 +66,18 @@ void InterpreterCoreEventGarbageCollector::Add(Variable* var,
   Add(var, &gc_event_.at(instr.Id()), &instr.DeviceContext());
 }
 
+void InterpreterCoreEventGarbageCollector::Add(Variable* var,
+                                               const InstructionBase* instr) {
+  PADDLE_ENFORCE_LT(instr->Id(),
+                    gc_event_.size(),
+                    platform::errors::OutOfRange(
+                        "The index should be less than the size of gc event "
+                        ", but got index is %d and size is %d",
+                        instr->Id(),
+                        gc_event_.size()));
+  Add(var, &gc_event_.at(instr->Id()), &instr->DeviceContext());
+}
+
 void InterpreterCoreEventGarbageCollector::Add(
     Variable* var,
     platform::DeviceEvent* event,
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h b/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h
index 305dbb598b2cf030e26b3998a7281af4e36a0c37..87a620fa80c5e22171da120bea063346d0a6a423 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h
+++ b/paddle/fluid/framework/new_executor/garbage_collector/event_garbage_collector.h
@@ -26,9 +26,16 @@ class InterpreterCoreEventGarbageCollector
  public:
   InterpreterCoreEventGarbageCollector(
       const std::vector<Instruction>& vec_instruction);
+
+  InterpreterCoreEventGarbageCollector(
+      const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction);
+
   ~InterpreterCoreEventGarbageCollector();
+
   void Add(Variable* var, const Instruction& instruction) override;
 
+  void Add(Variable* var, const InstructionBase* instruction) override;
+
  private:
   void Add(Variable* var,
            platform::DeviceEvent* event,
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.cc b/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.cc
index 9d3984908ac1fa8f553e978e30f22bf9ab123ecc..09b004e65b411876cd2ad274c519dd75bb9bab0d 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.cc
@@ -22,6 +22,11 @@ void InterpreterCoreFastGarbageCollector::Add(Variable* var,
   Add(var);
 }
 
+void InterpreterCoreFastGarbageCollector::Add(Variable* var,
+                                              const InstructionBase*) {
+  Add(var);
+}
+
 void InterpreterCoreFastGarbageCollector::Add(Variable* var) {
   if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
     return;
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h b/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h
index 07034a4f299834748196e728ac12696210ebd072..f508b8cd55d14b9ed6526b52be8cdae0f3c124d5 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h
+++ b/paddle/fluid/framework/new_executor/garbage_collector/fast_garbage_collector.h
@@ -23,6 +23,8 @@ class InterpreterCoreFastGarbageCollector
  public:
   void Add(Variable* var, const Instruction& instr) override;
 
+  void Add(Variable* var, const InstructionBase* instr) override;
+
  private:
   void Add(Variable* var);
   void Add(Garbage garbage);
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.cc b/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.cc
index ed2d48f4b1d851e75782d36f6b7c6272db5e3ce2..42f5d7e765106d254466c82e69fad3a41f8ee912 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.cc
@@ -49,6 +49,36 @@ InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() {
   cur_memory_size_ = 0;
 }
 
+std::unique_ptr<InterpreterCoreGarbageCollector>
+CreateInterpreterCoreGarbageCollector(
+    const platform::Place& place,
+    const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction) {
+  if (platform::is_gpu_place(place)) {
+    if (IsInterpretercoreFastGCEnabled()) {
+      return std::unique_ptr<InterpreterCoreGarbageCollector>(
+          new InterpreterCoreFastGarbageCollector());
+    } else {
+      return std::unique_ptr<InterpreterCoreGarbageCollector>(
+          new InterpreterCoreEventGarbageCollector(vec_instruction));
+    }
+  } else if (platform::is_xpu_place(place)) {
+    // Because there is no multi-stream on XPU device, fast GC can
+    // be used.
+    // Previously, XPU used no_event GC. But `Wait` in no_event GC
+    // may delay GC and cause an out-of-memory problem.
+    // TODO(pangyoki): Multi-stream allocator and multi-stream GC
+    // are needed to be adapted for XPU.
+    return std::unique_ptr<InterpreterCoreGarbageCollector>(
+        new InterpreterCoreFastGarbageCollector());
+  } else if (platform::is_ipu_place(place)) {
+    return std::unique_ptr<InterpreterCoreGarbageCollector>(
+        new InterpreterCoreNoEventGarbageCollector());
+  } else {
+    return std::unique_ptr<InterpreterCoreGarbageCollector>(
+        new InterpreterCoreEventGarbageCollector(vec_instruction));
+  }
+}
+
 std::unique_ptr<InterpreterCoreGarbageCollector>
 CreateInterpreterCoreGarbageCollector(
     const platform::Place& place,
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h b/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h
index 2e8e1792cd139ec3582e0643d94bc3cbba2f8e38..fb697e887216eeb891bc39010645ecb4ba771dc5 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h
+++ b/paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h
@@ -15,6 +15,7 @@
 #include
 
+#include "paddle/fluid/framework/new_executor/instruction/instruction_base.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/memory/allocation/spin_lock.h"
 #include "paddle/fluid/platform/device_event.h"
@@ -34,6 +35,8 @@ class InterpreterCoreGarbageCollector {
   virtual void Add(Variable* var, const Instruction& instruction) = 0;
 
+  virtual void Add(Variable* var, const InstructionBase* instruction) = 0;
+
   DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector);
 
  protected:
@@ -50,5 +53,10 @@ CreateInterpreterCoreGarbageCollector(
     const platform::Place& place,
     const std::vector<Instruction>& vec_instruction);
 
+std::unique_ptr<InterpreterCoreGarbageCollector>
+CreateInterpreterCoreGarbageCollector(
+    const platform::Place& place,
+    const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction);
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.cc b/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.cc
index ca51b904822f2470d5f9d7270798a2a02f361567..fc39bcaa5de36a14943f8c66644b854982bf7b0c 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.cc
@@ -36,6 +36,11 @@ void InterpreterCoreNoEventGarbageCollector::Add(Variable* var,
   Add(var, &instr.DeviceContext());
 }
 
+void InterpreterCoreNoEventGarbageCollector::Add(Variable* var,
+                                                 const InstructionBase* instr) {
+  Add(var, &instr->DeviceContext());
+}
+
 void InterpreterCoreNoEventGarbageCollector::Add(
     Variable* var, const platform::DeviceContext* ctx) {
   if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
diff --git a/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.h b/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.h
index 36c8adec367ad6cc00e5d03a2c51ddf805665967..0d610f29578172acdd188dcfbedfc9db065a1089 100644
--- a/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.h
+++ b/paddle/fluid/framework/new_executor/garbage_collector/no_event_garbage_collector.h
@@ -28,6 +28,8 @@ class InterpreterCoreNoEventGarbageCollector
   ~InterpreterCoreNoEventGarbageCollector();
   void Add(Variable* var, const Instruction& instr) override;
 
+  void Add(Variable* var, const InstructionBase* instr) override;
+
  private:
   void Add(Variable* var, const platform::DeviceContext* ctx);
   void Add(Garbage garbage, const platform::DeviceContext* ctx);
diff --git a/paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.cc b/paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.cc
index 39e791aca3f8ac077c522fc861a2c6e5111003f4..71081e1725252fdd51a0ee841376d29dcd800e98 100644
--- a/paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.cc
+++ b/paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.cc
@@ -246,6 +246,7 @@ PhiKernelInstruction::PhiKernelInstruction(
   kernel_context_.SetDeviceContext(phi::DeviceContextPool::Instance().Get(
       phi::TransToPhiPlace(kernel_key.backend())));
   VLOG(6) << "finish process kernel context";
+
   SetDeviceContext(
       ParseDeviceContext(op,
                          phi::DeviceContextPool::Instance().Get(
diff --git a/paddle/fluid/framework/new_executor/new_ir_interpreter.cc b/paddle/fluid/framework/new_executor/new_ir_interpreter.cc
index b9060cb16e0d88e04baa2ee92217c06e73d91646..df94728559a79bbf1d3fd9129eb6b8c7160b1447 100644
--- a/paddle/fluid/framework/new_executor/new_ir_interpreter.cc
+++ b/paddle/fluid/framework/new_executor/new_ir_interpreter.cc
@@ -38,6 +38,7 @@
 #include "paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.h"
 
 #include "paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.h"
+#include "paddle/ir/core/builtin_attribute.h"
 
 namespace paddle {
 namespace framework {
@@ -90,6 +91,17 @@ NewIRInterpreter::NewIRInterpreter(const platform::Place& place,
     return lhs_scheduling_priority > rhs_scheduling_priority;
   };
 
+  ir_instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) {
+    SchedulingPriority lhs_scheduling_priority =
+        vec_instruction_base_[lhs]->GetSchedulingPriority();
+    SchedulingPriority rhs_scheduling_priority =
+        vec_instruction_base_[rhs]->GetSchedulingPriority();
+    if (lhs_scheduling_priority == rhs_scheduling_priority) {
+      return lhs < rhs;
+    }
+    return lhs_scheduling_priority > rhs_scheduling_priority;
+  };
+
   PrepareForCUDAGraphCapture();
 }
 
@@ -196,13 +208,6 @@ FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
                    &variable_list_);
   VLOG(4) << DebugValueInfo();
 
-  // NOTE(zhangbo): Iterative version, gradually replacing BuildOpFuncList()
-  // and Convert() by:
-  // [1] BuildInstruction();
-  // [2] BuildInstructionDependences();
-  // [3] ir_stream_analyzer_.ConstructEvents(&vec_instruction_base_);
-  // [4] GC();
-
   std::vector<paddle::framework::OpFuncNode> op_func_nodes;
   interpreter::BuildOpFuncList(place_,
                                ir_program_->block(),
@@ -247,73 +252,12 @@ FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
   }
 }
 
-FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
-                                    bool need_fetch) {
-  SetDeviceId(place_);
-  if (!is_build_) {
-    LOG_FIRST_N(INFO, 1) << "New Executor is BetaRunning.";
-    std::stringstream ss;
-    ss << this;
-    ::ir::BuildScope(*ir_program_->block(),
-                     InnerScope(),
-                     ss.str(),
-                     &value_2_var_name_,
-                     &variable_2_var_name_,
-                     &var_name_2_id_,
-                     &variable_list_);
-    VLOG(4) << DebugValueInfo();
-
-    BuildInstruction();
-
-    BuildInstructionDependences();
-
-    ir_stream_analyzer_.ConstructEvents(vec_instruction_base_);
-    // add event for the input var of jit program, since there are async copied
-    // from gpu_pinned place to gpu place on compute stream.
-    for (size_t i = 0; i < dependecy_count_.size(); ++i) {
-      if (dependecy_count_[i] == 0) {
-        InstructionBase* inst = vec_instruction_base_[i].get();
-        if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) {
-          for (auto& item : inst->Inputs()) {
-            for (auto var_id : item.second) {
-              auto name = GetNameById(var_id);
-              if (JitInputVars().count(name)) {
-                auto device_event = std::make_shared<platform::DeviceEvent>(
-                    place_, platform::GenerateDeviceEventFlag());
-                VLOG(4) << "Add input event for input: " << name << " of "
-                        << inst->Name();
-                inst->AddEventToWait(
-                    i, device_event, ir_stream_analyzer_.GetWaiterType(inst));
-              }
-            }
-          }
-        }
-      }
-    }
-
-    for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
-         ++instr_id) {
-      vec_instruction_base_[instr_id]->Run();
-    }
-  } else {
-    for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
-         ++instr_id) {
-      vec_instruction_base_[instr_id]->Run();
-    }
-  }
-  if (HasLocalScope()) {
-    ClearLoDTensorArrayInLocalScope();
-  }
-
-  // return Fetch Tensors
-  Scope* inner_scope = InnerScope();
-  auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
-  if (fetch_var && need_fetch) {
-    auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
-    return fetch_list;
-  } else {
-    return {};
+int NewIRInterpreter::GetIdByName(const std::string& name) const {
+  auto it = var_name_2_id_.find(name);
+  if (it != var_name_2_id_.end()) {
+    return it->second;
   }
+  return -1;
 }
 
 void NewIRInterpreter::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
@@ -896,7 +840,8 @@ void NewIRInterpreter::Convert(
         vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i)));
   }
 
-  AnalyseExecuteOrderForTrace();
+  AnalyseExecuteOrderForTrace(dependency_builder_.OpDownstreamMap(),
+                              instruction_scheduling_priority_less);
 }
 
 void NewIRInterpreter::BuildSkipShareLoDInfo() {
@@ -1087,7 +1032,6 @@ void NewIRInterpreter::RunInstruction(const Instruction& instr_node) {
                            &(op_func_node->infer_meta_context_));
   }
   VLOG(5) << "after run infer meta";
-
   if (op_func_node->fluid_op) {  // run fluid op
     ExecutionContext exe_ctx(*(op_func_node->operator_base_.get()),
@@ -1411,6 +1355,15 @@ void NewIRInterpreter::CheckGC(const Instruction& instr) {
   }
 }
 
+::ir::Value NewIRInterpreter::GetValueByName(const std::string& var_name) {
+  for (auto kv : value_2_var_name_) {
+    if (kv.second == var_name) {
+      return kv.first;
+    }
+  }
+  return nullptr;
+}
+
 void NewIRInterpreter::Prepare(
     const std::vector<std::string>& feed_names,
     const std::vector<phi::DenseTensor>& feed_tensors,
@@ -1576,12 +1529,12 @@ void NewIRInterpreter::UpdateSyncOpNum() {
 // ->(sync_run)-> OP(B) OP(O) ->(direct_run)-> OP(C) ->(direct_run)-> OP(D) If B
 // is run before C, B may always block to wait for A to finish executing, but in
 // fact, C can be executed first during this time.
-void NewIRInterpreter::AnalyseExecuteOrderForTrace() {
+void NewIRInterpreter::AnalyseExecuteOrderForTrace(
+    std::map<size_t, std::set<size_t>> op_downstream_map,
+    InstructionSchedulingPriorityLess compare) {
   VLOG(4) << "Analyze the execution order of Trace scheduling mode.";
   interpreter::ResetAtomicGuard guard(&deps_, &refs_);
-
-  auto op_downstream_map = dependency_builder_.OpDownstreamMap();
-
+  VLOG(4) << "1";
   auto IsReady = [this](size_t next_id) {
     VLOG(4) << "op_id: " << next_id
             << ", remain deps: " << deps_[next_id]->DynamicDep();
@@ -1589,7 +1542,7 @@
   };
 
   std::vector<size_t> trace_order;
-  SchedulingQueue ready_ops(instruction_scheduling_priority_less);
+  SchedulingQueue ready_ops(compare);
 
   for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
     if (dependecy_count_[instr_id] == 0) {
@@ -1618,6 +1571,14 @@
                         "trace_order size should be equal to dependecy_count_."));
 
   trace_execute_order_ = trace_order;
+
+  std::stringstream ss;
+  ss << "trace order: ";
+  for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
+    ss << trace_execute_order_[idx] << " -> ";
+  }
+  ss << "end\n";
+  VLOG(6) << ss.str();
 }
 
 /// ======================== ///
@@ -1725,5 +1686,455 @@ void NewIRInterpreter::BuildInstructionDependences() {
   }
 }
 
+void NewIRInterpreter::RecordMemcpyD2H(InstructionBase* instr_node) {
+  // NOTE(zhiqiu): hot fix for jit input var
+  if (instr_node->Name() == "pd.memcpy_d2h") {
+    platform::DeviceContextPool& pool =
+        platform::DeviceContextPool::Instance();
+    auto* default_dev_ctx = pool.Get(place_);
+    for (auto& event : instr_node->EventsToWait()) {
+      platform::RecordEvent record(
+          "RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
+      VLOG(3) << "Record event on default stream in jit_input_var at op: "
+              << instr_node->Name();
+      event.event_->Record(default_dev_ctx);
+    }
+  }
+}
+
+void NewIRInterpreter::RecordStreamForGC(InstructionBase* instr) {
+#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP)
+  PADDLE_THROW(platform::errors::Unimplemented(
+      "RecordStreamForGC is only implemented when compiled with GPU."));
+#else
+  if (!IsInterpretercoreFastGCEnabled() ||
+      instr->KernelType() != OpFuncType::kGpuAsync) {
+    return;
+  }
+  if (instr->DeviceContext().GetPlace().GetType() ==
+      phi::AllocationType::CUSTOM) {
+    return;
+  }
+  platform::RecordEvent record(
+      "RecordStreamForGC", platform::TracerEventType::UserDefined, 10);
+
+  gpuStream_t stream =
+      reinterpret_cast<const phi::GPUContext&>(instr->DeviceContext()).stream();
+  auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) {
+    auto allocation = tensor.Holder();
+    if (allocation == nullptr) {
+      return;
+    }
+
+    const platform::Place& place = allocation->place();
+    if (platform::is_gpu_place(place)) {
+      memory::RecordStream(allocation, stream);
+    } else if (platform::is_cuda_pinned_place(place)) {
+      // TODO(Ruibiao): Here we should do something to make sure that the
+      // tensor is not freed until the H2D copies are done. However, simply
+      // launching a CUDA runtime callback to the H2D stream may lead to high
+      // performance overhead. As all the cases we meet in H2D are copies from
+      // CPUPlace at present, we just log a WARNING here. A better design is
+      // required.
+ LOG(WARNING) << "Copy data from a CUDAPinned tensor in an asynchronous " + "manner may lead a data inconsistent"; + } else { + // memory copies involve CPUPlace are always synchronous, so just do + // nothing here + } + }; + + /* NOTE(Ruibiao):Cross-stream tensor synchronization is required only when + * all the following conditions are satisfied: + * 1. The tensor will be GC after running the instruction, i.e., in + * instr.GCCheckVars. + * 2. The stream which initializes this tensor is different from the stream + * which the instruction run in. + * 3. The tensor is the instruction's input, cause we assume that + * instruction will initialize all output tensors with its running stream. + * 4. In the OP function of this instruction, the tensor is an input of a + * async CUDA kernel. + * + * Here we only process the first condition, because: + * 1. Since the RecordStream function will directly return when the recored + * stream is equal to the owning stream, recording a stream same as which + * initialized this tensor has less time overhead. Conversely, it may take + * more time if we try to extract those cross-stream input vars from + * instr.GCCheckVars. + * 2. Now the instruction has no idea of which vars involving async running + * in OP function, and thus we can not recognize condition 4. It should be + * supported later. + */ + for (int var_id : instr->GCCheckVars()) { + VLOG(4) << "GC sync " << GetNameById(var_id); + + // persistable var will be ignore while GC + ::ir::Value value = GetValueByName(GetNameById(var_id)); + if (value && value.GetDefiningOp()->attributes().count("is_persisable") && + value.GetDefiningOp() + ->attributes() + .at("is_persisable") + .dyn_cast<::ir::BoolAttribute>() + .data()) { + continue; + } + + paddle::framework::Variable* var = variable_list_[var_id]; + if (var == nullptr) { + continue; + } + + if (var->IsType()) { + TensorRecordStream(*(var->GetMutable())); + } else if (var->IsType< + operators::reader:: + OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) { + // do nothing + } else if (var->IsType()) { + TensorRecordStream( + *(var->GetMutable()->mutable_value())); + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& tensor : *tensor_arr) { + TensorRecordStream(tensor); + } + } else if (var->IsType>()) { + // do nothing + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } + } +#endif +} + +void NewIRInterpreter::CheckGC(InstructionBase* instr) { + platform::RecordEvent record( + "CheckGC", platform::TracerEventType::UserDefined, 10); + +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + RecordStreamForGC(instr); +#endif + + for (auto var_id : instr->GCCheckVars()) { + VLOG(4) << "GC:" << GetNameById(var_id) << ", id:" << var_id + << ", ref:" << refs_[var_id]->DynamicRef(); + bool is_ready = refs_[var_id]->CheckAndDecrease(); + // ignore all persistable var while GCphi + ::ir::Value value = GetValueByName(GetNameById(var_id)); + if (value && value.GetDefiningOp()->attributes().count("is_persisable") && + value.GetDefiningOp() + ->attributes() + .at("is_persisable") + .dyn_cast<::ir::BoolAttribute>() + .data()) { + continue; + } + if (is_ready) { + VLOG(6) << "Async delete variable with name : " << GetNameById(var_id); + gc_->Add(refs_[var_id]->Var(), instr); + } + } +} + +void NewIRInterpreter::CalculateLastLiveOps() { + // calculate last_live_ops_ + for (size_t op_idx = 0; op_idx < 
+  for (size_t op_idx = 0; op_idx < vec_instruction_base_.size(); ++op_idx) {
+    InstructionBase* instr = vec_instruction_base_[op_idx].get();
+    std::set<size_t> gc_check_vars;
+
+    const std::unordered_map<::ir::Value, std::vector<int>>& ins =
+        instr->Inputs();
+    const std::unordered_map<::ir::Value, std::vector<int>>& outs =
+        instr->Outputs();
+    std::unordered_multimap<::ir::Value, std::vector<int>> ins_and_outs{
+        ins.begin(), ins.end()};
+    ins_and_outs.insert(outs.begin(), outs.end());
+
+    for (auto& item : ins_and_outs) {
+      for (auto var_id : item.second) {
+        // skip no_need_buffer input vars
+        if (ins.count(item.first) && instr->NoNeedBuffer().count(item.first)) {
+          continue;
+        }
+        gc_check_vars.insert(var_id);
+      }
+    }
+
+    for (auto var_id : gc_check_vars) {
+      Scope* inner_scope = InnerScope();
+      paddle::framework::Variable* var =
+          inner_scope->FindVar(GetNameById(var_id));
+      if (var->IsType<phi::DenseTensor>() || var->IsType<phi::SelectedRows>() ||
+          var->IsType<LoDTensorArray>()) {
+        last_live_ops_[var_id].insert(op_idx);
+      } else {
+        VLOG(4) << "not clear " << GetNameById(var_id) << " after "
+                << instr->Name() << " because its type is "
+                << framework::ToTypeName(var->Type());
+      }
+    }
+  }
+  // clear the last_live_ops list for all vars in skip_gc_vars
+  for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) {
+    int var_id = GetIdByName(skip_gc_var);
+    if (var_id != -1) {
+      last_live_ops_[var_id].clear();
+      VLOG(8) << "Skip gc for var: " << skip_gc_var;
+    }
+  }
+  VLOG(4) << "calculate last_live_ops_";
+
+  // shrink, find the downstream op that has no other op in the
+  // downstream list happening before it
+  // For example,
+  // b = op1(a)
+  // c = op2(a, b)
+  // in this case, a is the input of op1 and op2, we only need to check
+  // a after op2, because op2 always uses a after op1.
+  var_ref_count_.resize(variable_list_.size());
+  VLOG(4) << "last_live_ops_.size() : " << last_live_ops_.size();
+  for (auto kv : last_live_ops_) {
+    for (auto val : kv.second) {
+      VLOG(4) << "var: " << kv.first << " -> op: " << val;
+    }
+  }
+  VLOG(4) << "var_ref_count_.size() : " << var_ref_count_.size();
+  for (size_t i = 0; i < last_live_ops_.size(); ++i) {
+    std::set<size_t> minumum_last_live_ops;
+    for (auto val : last_live_ops_[i]) {
+      VLOG(4) << "last_live_ops_: " << val;
+    }
+    for (size_t item : last_live_ops_[i]) {
+      bool not_before_any = true;
+      // find the op that is not executed before any other op
+      for (size_t other_item : last_live_ops_[i]) {
+        if (ir_dependency_builder_.OpHappensBefore(item, other_item)) {
+          VLOG(6) << "happens_before: " << item << "->" << other_item
+                  << ", so skip " << item;
+          not_before_any = false;
+          break;
+        }
+      }
+      if (not_before_any) {
+        VLOG(6) << "last live op of var " << i << " " << GetNameById(i) << " : "
+                << item << " " << vec_instruction_base_[item]->Name();
+        minumum_last_live_ops.insert(item);
+        vec_instruction_base_[item]->AddGCCheckVar(i);
+      }
+    }
+    last_live_ops_[i] = minumum_last_live_ops;
+    var_ref_count_[i] = last_live_ops_[i].size();
+  }
+  VLOG(4) << "calculate last_live_ops_ 2";
+
+  for (auto& dep : dependecy_count_) {
+    deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
+  }
+  for (size_t i = 0; i < variable_list_.size(); ++i) {
+    refs_.emplace_back(std::make_shared<interpreter::VarRefInfo>(
+        var_ref_count_[i], variable_list_[i]));
+  }
+  VLOG(4) << "calculate last_live_ops_ 3";
+}
+
+void NewIRInterpreter::ConstructEventForJitInput() {
+  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
+    if (dependecy_count_[i] == 0) {
+      InstructionBase* inst = vec_instruction_base_[i].get();
+      if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) {
+        for (auto& item : inst->Inputs()) {
+          for (auto var_id : item.second) {
+            auto name = GetNameById(var_id);
+            if (JitInputVars().count(name)) {
+              auto device_event = std::make_shared<platform::DeviceEvent>(
+                  place_, platform::GenerateDeviceEventFlag());
+              VLOG(4) << "Add input event for input: " << name << " of "
+                      << inst->Name();
+              inst->AddEventToWait(
+                  i, device_event, ir_stream_analyzer_.GetWaiterType(inst));
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
+                                    bool need_fetch) {
+  SetDeviceId(place_);
+  CheckCUDAGraphBeforeRun(feed_names);
+
+#ifdef PADDLE_WITH_MKLDNN
+  platform::AttachPointerHashToMKLDNNKey(this, place_);
+#endif
+
+  if (!is_build_) {
+    LOG_FIRST_N(INFO, 1) << "New Executor is BetaRunning.";
+    // Build
+    std::stringstream ss;
+    ss << this;
+    ::ir::BuildScope(*ir_program_->block(),
+                     InnerScope(),
+                     ss.str(),
+                     &value_2_var_name_,
+                     &variable_2_var_name_,
+                     &var_name_2_id_,
+                     &variable_list_);
+    VLOG(4) << DebugValueInfo();
+
+    BuildInstruction();
+    VLOG(4) << "Done BuildInstruction";
+
+    PreAnalysis();
+    VLOG(4) << "Done PreAnalysis";
+
+    // Run
+    BetaRunImpl();
+  } else {
+    BetaRunImpl();
+  }
+
+  if (HasLocalScope()) {
+    ClearLoDTensorArrayInLocalScope();
+  }
+
+  // return Fetch Tensors
+  Scope* inner_scope = InnerScope();
+  auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
+  if (fetch_var && need_fetch) {
+    auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
+#ifdef PADDLE_WITH_CUDA
+    if (platform::IsCUDAGraphCapturing()) {
+      PADDLE_ENFORCE_EQ(fetch_list.empty(),
+                        true,
+                        platform::errors::InvalidArgument(
+                            "Cannot fetch data when using CUDA Graph."));
+    }
+#endif
+    return fetch_list;
+  } else {
+    return {};
+  }
+}
+
+void NewIRInterpreter::NewIrLoopRunImpl() {
+  for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
+       ++instr_id) {
+    vec_instruction_base_[instr_id]->Run();
+  }
+}
+
+void NewIRInterpreter::BetaRunImpl() {
+  // lazy initialization of gc, do not create gc if the program only runs once
+  if (!gc_) {
+    gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_);
+  }
+
+  interpreter::ResetAtomicGuard guard(&deps_, &refs_);
+  VLOG(4) << "Tracing Instruction List";
+
+  TraceInstructionList(vec_instruction_base_);
+  VLOG(4) << "Done BetaRunImpl";
+}
+
+void NewIRInterpreter::TraceInstructionList(
+    const std::vector<std::unique_ptr<InstructionBase>>& vec_instr) {
+  unfinished_op_number_ = vec_instr.size();
+  if (unfinished_op_number_ == 0) {
+    VLOG(4) << "No op to run, return";
+    return;
+  }
+
+  exception_holder_.Clear();
+
+  for (size_t i = 0; i < dependecy_count_.size(); ++i) {
+    if (dependecy_count_[i] == 0) {
+      // NOTE(zhiqiu): hot fix for jit input var
+      RecordMemcpyD2H(vec_instr.at(i).get());
+    }
+  }
+
+  for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
+    auto instr_id = trace_execute_order_[idx];
+    InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get();
+
+    VLOG(6) << "Run InstructionBase " << instr_id;
+    RunInstructionBase(instr_node);
+
+    if (UNLIKELY(exception_holder_.IsCaught())) {
+      VLOG(4) << "Exception caught";
+      break;
+    }
+  }
+
+  if (UNLIKELY(exception_holder_.IsCaught())) {
+    VLOG(1) << "Exception caught " << exception_holder_.Type();
+    PADDLE_ENFORCE_EQ(
+        main_thread_blocker_.Clear(),
+        0,
+        platform::errors::PreconditionNotMet(
+            "main_thread_blocker_.Clear() return -1, clear failed"));
+    VLOG(4) << "clear ok";
+    exception_holder_.ReThrow();
+  }
+  VLOG(4) << "Done TraceInstructionList";
+}
+
+void NewIRInterpreter::RunInstructionBase(InstructionBase* instr_node) {
+  platform::RecordEvent instruction_event(
+      instr_node->Name(), platform::TracerEventType::Operator, 1);
+
+  SetDeviceId(instr_node->DeviceContext().GetPlace());
+
+  try {
+    instr_node->WaitEvent(place_);
+
+    VLOG(5) << "begin to run op " << instr_node->Name();
+    if (!instr_node->IsArtificial()) {
+      instr_node->Run();
+      VLOG(4) << "done instruction node run";
+      CheckGC(instr_node);
+      VLOG(4) << "done CheckGC";
+      interpreter::LogDeviceMemoryStats(place_);
+    }
+    VLOG(5) << "after run kernel";
+    instr_node->RecordEvent(place_);
+  } catch (platform::EnforceNotMet& ex) {
+    LOG(WARNING) << instr_node->Name() << " raises an EnforceNotMet exception "
+                 << platform::demangle(typeid(ex).name()) << ", " << ex.what();
+    exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
+  } catch (platform::EOFException&) {
+    exception_holder_.Catch(std::current_exception());
+  } catch (std::exception& ex) {
+    LOG(WARNING) << instr_node->Name() << " raises an exception "
+                 << platform::demangle(typeid(ex).name()) << ", " << ex.what();
+    exception_holder_.Catch(std::current_exception());
+  } catch (...) {
+    LOG(WARNING) << instr_node->Name() << " raises an unknown exception";
+    exception_holder_.Catch(std::current_exception());
+  }
+}
+
+void NewIRInterpreter::PreAnalysis() {
+  BuildInstructionDependences();
+  VLOG(4) << "Done BuildInstructionDependences";
+
+  ir_stream_analyzer_.ConstructEvents(vec_instruction_base_);
+  VLOG(4) << "Done ConstructEvents";
+
+  // add event for the input var of jit program, since there are async copies
+  // from gpu_pinned place to gpu place on compute stream.
+  ConstructEventForJitInput();
+  VLOG(4) << "AddEventToWait for JitInputVars";
+
+  CalculateLastLiveOps();
+  VLOG(4) << "Done CalculateLastLiveOps";
+
+  AnalyseExecuteOrderForTrace(ir_dependency_builder_.OpDownstreamMap(),
+                              ir_instruction_scheduling_priority_less);
+  VLOG(4) << "Done AnalyseExecuteOrderForTrace";
+}
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/new_ir_interpreter.h b/paddle/fluid/framework/new_executor/new_ir_interpreter.h
index 8011811c44f2fc1f4adbf1fb8b8139dcff58fcfb..28e670afae3e15deae1a25772175664ba6cd90d3 100644
--- a/paddle/fluid/framework/new_executor/new_ir_interpreter.h
+++ b/paddle/fluid/framework/new_executor/new_ir_interpreter.h
@@ -16,6 +16,7 @@
 #include
 
 #include "paddle/fluid/framework/new_executor/instruction/instruction_base.h"
 #include "paddle/fluid/framework/new_executor/interpreter_base_impl.h"
+#include "paddle/ir/core/value.h"
 
 namespace ir {
 class Program;
@@ -86,6 +87,8 @@ class NewIRInterpreter : public InterpreterBaseImpl {
 
   std::string GetNameById(int id) const;
 
+  int GetIdByName(const std::string& name) const;
+
  private:
   // build graph
   void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
@@ -93,7 +96,11 @@ class NewIRInterpreter : public InterpreterBaseImpl {
   void BuildAndCacheInstructionCtx(Instruction* instr_node);
   void BuildSkipShareLoDInfo();
   void UpdateSyncOpNum();
-  void AnalyseExecuteOrderForTrace();
+  void AnalyseExecuteOrderForTrace(
+      std::map<size_t, std::set<size_t>> op_downstream_map,
+      InstructionSchedulingPriorityLess compare);
+  void ConstructEventForJitInput();
+  void CalculateLastLiveOps();
 
   // inplace
   void BuildInplace();
@@ -201,10 +208,31 @@ class NewIRInterpreter : public InterpreterBaseImpl {
   /// ======================== ///
 
   std::string DebugValueInfo();
 
+  void PreAnalysis();
+
   void BuildInstruction();
 
   void BuildInstructionDependences();
 
+  void NewIrLoopRunImpl();
+
+  void BetaRunImpl();
+
+  void TraceInstructionList(
+      const std::vector<std::unique_ptr<InstructionBase>>& vec_instr);
+
+  void RunInstructionBase(InstructionBase* instr_node);
+
+  void RecordMemcpyD2H(InstructionBase* instr_node);
+
+  ::ir::Value GetValueByName(const std::string& var_name);
+
+  void CheckGC(InstructionBase* instr);
+
+  void RecordStreamForGC(InstructionBase* instr);
+
+  InstructionSchedulingPriorityLess ir_instruction_scheduling_priority_less;
+
   std::unique_ptr<::ir::Program> ir_program_{nullptr};
 
   std::vector<std::unique_ptr<InstructionBase>> vec_instruction_base_;
@@ -218,6 +246,8 @@ class NewIRInterpreter : public InterpreterBaseImpl {
 
   std::vector<Variable*> variable_list_;
 
+  std::vector<int> var_ref_count_;
+
   interpreter::NewIrDependencyBuilder ir_dependency_builder_;
 
   interpreter::NewIrStreamAnalyzer ir_stream_analyzer_;
diff --git a/test/cpp/new_executor/standalone_executor_new_ir_test.cc b/test/cpp/new_executor/standalone_executor_new_ir_test.cc
index b86731a1e5ba945dd9860a1a52a4bcb2a7381904..3fa9ddd6267906c15b5d2acbb7ae440234d5c347 100644
--- a/test/cpp/new_executor/standalone_executor_new_ir_test.cc
+++ b/test/cpp/new_executor/standalone_executor_new_ir_test.cc
@@ -70,23 +70,19 @@ TEST(StandaloneExecutor, run) {
   ProgramDesc prog_desc;
   InterpreterCore test_core(place, std::move(kernel_program), &scope);
 
-  VLOG(0) << "&test_core" << &test_core;
-  VLOG(0) << "&test_core.impl" << test_core.Impl();
-  VLOG(0) << "&test_core.impl.cast"
-          << reinterpret_cast<NewIRInterpreter*>(
-                 const_cast<InterpreterBaseImpl*>(test_core.Impl()));
-  test_core.BetaRun({});
   std::stringstream os;
   os << reinterpret_cast<NewIRInterpreter*>(
       const_cast<InterpreterBaseImpl*>(test_core.Impl()));
-  std::string prefix_str = os.str();
+  std::string out_name = os.str() + "_inner_var_2";
+  test_core.SetSkipGcVars({out_name});
+
+  test_core.BetaRun({});
+
   auto out_tensor =
       test_core.local_scope() == nullptr
-          ? scope.FindVar(prefix_str + "_inner_var_2")->Get<phi::DenseTensor>()
-          : test_core.local_scope()
-                ->FindVar(prefix_str + "_inner_var_2")
-                ->Get<phi::DenseTensor>();
+          ? scope.FindVar(out_name)->Get<phi::DenseTensor>()
+          : test_core.local_scope()->FindVar(out_name)->Get<phi::DenseTensor>();
 
   bool res0 = simple_cmp(out_tensor.data<float>()[0], 2.0);
   bool res1 = simple_cmp(out_tensor.data<float>()[1], 2.0);
@@ -115,18 +111,19 @@ TEST(StandaloneExecutor, run_inplace_sqrt) {
   auto place = platform::CPUPlace();
   Scope scope;
   InterpreterCore test_core(place, std::move(kernel_program), &scope);
-  test_core.BetaRun({});
 
   std::stringstream os;
   os << reinterpret_cast<NewIRInterpreter*>(
       const_cast<InterpreterBaseImpl*>(test_core.Impl()));
-  std::string prefix_str = os.str();
+  std::string out_name = os.str() + "_inner_var_0";
+  test_core.SetSkipGcVars({out_name});
+
+  test_core.BetaRun({});
+
   auto out_tensor =
       test_core.local_scope() == nullptr
-          ? scope.FindVar(prefix_str + "_inner_var_0")->Get<phi::DenseTensor>()
-          : test_core.local_scope()
-                ->FindVar(prefix_str + "_inner_var_0")
-                ->Get<phi::DenseTensor>();
+          ? scope.FindVar(out_name)->Get<phi::DenseTensor>()
+          : test_core.local_scope()->FindVar(out_name)->Get<phi::DenseTensor>();
 
   bool res0 = simple_cmp(out_tensor.data<float>()[0], 2.0);
   bool res1 = simple_cmp(out_tensor.data<float>()[1], 2.0);