Unverified commit dc96ebc0 authored by zhangbo9674, committed by GitHub

[IR] Support GC and TraceRun for NewIr InterpreterCore (#55772)

* add interface

* add code

* add code

* add code

* add code

* fix bug

* fix bug

* add var prefix

* add code

* add code

* add code

* fix compile bug

* fix bug

* refine code

* refine code

* refine code

* refine code

* fix bug

* add code

* add code

* fix bug

* add code

* add code

* refine code

* refine code

* fix bug
Parent 6f53d3b2
@@ -37,6 +37,19 @@ InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector(
}
}
InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction) {
WorkQueueOptions options(/*name*/ "GarbageCollector",
/*num_threads*/ 1,
/*allow_spinning*/ true,
/*track_task*/ false);
queue_ = CreateSingleThreadedWorkQueue(options);
for (auto& instruc : vec_instruction) {
gc_event_.emplace_back(instruc->DeviceContext().GetPlace(),
platform::GenerateDeviceEventFlag());
}
}
InterpreterCoreEventGarbageCollector::~InterpreterCoreEventGarbageCollector() {
queue_.reset(nullptr);
}
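
A rough sketch of how the per-instruction events created above can gate deallocation (hedged: the Record/Query usage is assumed from their appearances elsewhere in this diff, and `garbage` stands for a shared allocation handle; the real release path lives in the Add overloads below):

// Record the event on the instruction's stream when a var becomes garbage;
// the single GC worker releases the buffer only once the stream has passed
// the recorded point.
platform::DeviceEvent& event = gc_event_.at(instr->Id());
event.Record(&instr->DeviceContext());
queue_->AddTask([&event, garbage]() mutable {
  while (!event.Query()) {  // spin; allow_spinning is true above
  }
  garbage.reset();          // now safe to return the allocation
});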
@@ -53,6 +66,18 @@ void InterpreterCoreEventGarbageCollector::Add(Variable* var,
Add(var, &gc_event_.at(instr.Id()), &instr.DeviceContext());
}
void InterpreterCoreEventGarbageCollector::Add(Variable* var,
const InstructionBase* instr) {
PADDLE_ENFORCE_LT(instr->Id(),
gc_event_.size(),
platform::errors::OutOfRange(
"The index should be less than the size of gc event "
", but got index is %d and size is %d",
instr->Id(),
gc_event_.size()));
Add(var, &gc_event_.at(instr->Id()), &instr->DeviceContext());
}
void InterpreterCoreEventGarbageCollector::Add(
Variable* var,
platform::DeviceEvent* event,
......
@@ -26,9 +26,16 @@ class InterpreterCoreEventGarbageCollector
public:
InterpreterCoreEventGarbageCollector(
const std::vector<Instruction>& vec_instruction);
InterpreterCoreEventGarbageCollector(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction);
~InterpreterCoreEventGarbageCollector();
void Add(Variable* var, const Instruction& instruction) override;
void Add(Variable* var, const InstructionBase* instruction) override;
private:
void Add(Variable* var,
platform::DeviceEvent* event,
......
@@ -22,6 +22,11 @@ void InterpreterCoreFastGarbageCollector::Add(Variable* var,
Add(var);
}
void InterpreterCoreFastGarbageCollector::Add(Variable* var,
const InstructionBase*) {
Add(var);
}
void InterpreterCoreFastGarbageCollector::Add(Variable* var) {
if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
return;
......
@@ -23,6 +23,8 @@ class InterpreterCoreFastGarbageCollector
public:
void Add(Variable* var, const Instruction& instr) override;
void Add(Variable* var, const InstructionBase* instr) override;
private:
void Add(Variable* var);
void Add(Garbage garbage);
......
@@ -49,6 +49,36 @@ InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() {
cur_memory_size_ = 0;
}
std::unique_ptr<InterpreterCoreGarbageCollector>
CreateInterpreterCoreGarbageCollector(
const platform::Place& place,
const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction) {
if (platform::is_gpu_place(place)) {
if (IsInterpretercoreFastGCEnabled()) {
return std::unique_ptr<InterpreterCoreGarbageCollector>(
new InterpreterCoreFastGarbageCollector());
} else {
return std::unique_ptr<InterpreterCoreGarbageCollector>(
new InterpreterCoreEventGarbageCollector(vec_instruction));
}
} else if (platform::is_xpu_place(place)) {
// Because there is no multi-stream on XPU devices, fast GC can
// be used.
// Previously, XPU used no_event GC, but `Wait` in no_event GC
// may delay garbage collection and cause out-of-memory problems.
// TODO(pangyoki): A multi-stream allocator and multi-stream GC
// still need to be adapted for XPU.
return std::unique_ptr<InterpreterCoreGarbageCollector>(
new InterpreterCoreFastGarbageCollector());
} else if (platform::is_ipu_place(place)) {
return std::unique_ptr<InterpreterCoreGarbageCollector>(
new InterpreterCoreNoEventGarbageCollector());
} else {
return std::unique_ptr<InterpreterCoreGarbageCollector>(
new InterpreterCoreEventGarbageCollector(vec_instruction));
}
}
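
For reference, the interpreter obtains its collector through this factory lazily on first run (see BetaRunImpl later in this diff), so the per-place strategy above is only chosen when the program actually executes:

// From BetaRunImpl: create the collector only when the program runs.
if (!gc_) {
  gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_);
}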
std::unique_ptr<InterpreterCoreGarbageCollector>
CreateInterpreterCoreGarbageCollector(
const platform::Place& place,
......
@@ -15,6 +15,7 @@
#include <queue>
#include "paddle/fluid/framework/new_executor/instruction/instruction_base.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"
@@ -34,6 +35,8 @@ class InterpreterCoreGarbageCollector {
virtual void Add(Variable* var, const Instruction& instruction) = 0;
virtual void Add(Variable* var, const InstructionBase* instruction) = 0;
DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector);
protected:
@@ -50,5 +53,10 @@ CreateInterpreterCoreGarbageCollector(
const platform::Place& place,
const std::vector<Instruction>& vec_instruction);
std::unique_ptr<InterpreterCoreGarbageCollector>
CreateInterpreterCoreGarbageCollector(
const platform::Place& place,
const std::vector<std::unique_ptr<InstructionBase>>& vec_instruction);
} // namespace framework
} // namespace paddle
@@ -36,6 +36,11 @@ void InterpreterCoreNoEventGarbageCollector::Add(Variable* var,
Add(var, &instr.DeviceContext());
}
void InterpreterCoreNoEventGarbageCollector::Add(Variable* var,
const InstructionBase* instr) {
Add(var, &instr->DeviceContext());
}
void InterpreterCoreNoEventGarbageCollector::Add(
Variable* var, const platform::DeviceContext* ctx) {
if (UNLIKELY(max_memory_size_ < 0) || var == nullptr) {
......
@@ -28,6 +28,8 @@ class InterpreterCoreNoEventGarbageCollector
~InterpreterCoreNoEventGarbageCollector();
void Add(Variable* var, const Instruction& instr) override;
void Add(Variable* var, const InstructionBase* instr) override;
private:
void Add(Variable* var, const platform::DeviceContext* ctx);
void Add(Garbage garbage, const platform::DeviceContext* ctx);
......
@@ -246,6 +246,7 @@ PhiKernelInstruction::PhiKernelInstruction(
kernel_context_.SetDeviceContext(phi::DeviceContextPool::Instance().Get(
phi::TransToPhiPlace(kernel_key.backend())));
VLOG(6) << "finish process kernel context";
SetDeviceContext(
ParseDeviceContext(op,
phi::DeviceContextPool::Instance().Get(
......
@@ -38,6 +38,7 @@
#include "paddle/fluid/framework/new_executor/instruction/phi_kernel_instruction.h"
#include "paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.h"
#include "paddle/ir/core/builtin_attribute.h"
namespace paddle {
namespace framework {
@@ -90,6 +91,17 @@ NewIRInterpreter::NewIRInterpreter(const platform::Place& place,
return lhs_scheduling_priority > rhs_scheduling_priority;
};
ir_instruction_scheduling_priority_less = [this](size_t lhs, size_t rhs) {
SchedulingPriority lhs_scheduling_priority =
vec_instruction_base_[lhs]->GetSchedulingPriority();
SchedulingPriority rhs_scheduling_priority =
vec_instruction_base_[rhs]->GetSchedulingPriority();
if (lhs_scheduling_priority == rhs_scheduling_priority) {
return lhs < rhs;
}
return lhs_scheduling_priority > rhs_scheduling_priority;
};
PrepareForCUDAGraphCapture();
}
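
The new ir_instruction_scheduling_priority_less mirrors the legacy comparator but reads priorities from vec_instruction_base_. A small sketch of how such a comparator drives the ready queue (assuming SchedulingQueue is a priority queue over instruction ids, as in AnalyseExecuteOrderForTrace below; the pushed ids are illustrative):

SchedulingQueue ready_ops(ir_instruction_scheduling_priority_less);
ready_ops.push(3);
ready_ops.push(7);
size_t next_instr = ready_ops.top();  // the instruction the comparator
ready_ops.pop();                      // ranks ahead pops first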
@@ -196,13 +208,6 @@ FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
&variable_list_);
VLOG(4) << DebugValueInfo();
// NOTE(zhangbo): Iterative version, gradually replacing BuildOpFuncList()
// and Convert() by:
// [1] BuildInstruction();
// [2] BuildInstructionDependences();
// [3] ir_stream_analyzer_.ConstructEvents(&vec_instruction_base_);
// [4] GC();
std::vector<paddle::framework::OpFuncNode> op_func_nodes;
interpreter::BuildOpFuncList(place_,
ir_program_->block(),
@@ -247,73 +252,12 @@ FetchList NewIRInterpreter::Run(const std::vector<std::string>& feed_names,
}
}
FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
bool need_fetch) {
SetDeviceId(place_);
if (!is_build_) {
LOG_FIRST_N(INFO, 1) << "New Executor is BetaRunning.";
std::stringstream ss;
ss << this;
::ir::BuildScope(*ir_program_->block(),
InnerScope(),
ss.str(),
&value_2_var_name_,
&variable_2_var_name_,
&var_name_2_id_,
&variable_list_);
VLOG(4) << DebugValueInfo();
BuildInstruction();
BuildInstructionDependences();
ir_stream_analyzer_.ConstructEvents(vec_instruction_base_);
// add events for the input vars of the jit program, since there are async
// copies from gpu_pinned place to gpu place on the compute stream.
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
InstructionBase* inst = vec_instruction_base_[i].get();
if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) {
for (auto& item : inst->Inputs()) {
for (auto var_id : item.second) {
auto name = GetNameById(var_id);
if (JitInputVars().count(name)) {
auto device_event = std::make_shared<platform::DeviceEvent>(
place_, platform::GenerateDeviceEventFlag());
VLOG(4) << "Add input event for input: " << name << " of "
<< inst->Name();
inst->AddEventToWait(
i, device_event, ir_stream_analyzer_.GetWaiterType(inst));
}
}
}
}
}
}
for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
++instr_id) {
vec_instruction_base_[instr_id]->Run();
}
} else {
for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
++instr_id) {
vec_instruction_base_[instr_id]->Run();
}
}
if (HasLocalScope()) {
ClearLoDTensorArrayInLocalScope();
}
// return Fetch Tensors
Scope* inner_scope = InnerScope();
auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
if (fetch_var && need_fetch) {
auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
return fetch_list;
} else {
return {};
  }
}
int NewIRInterpreter::GetIdByName(const std::string& name) const {
auto it = var_name_2_id_.find(name);
if (it != var_name_2_id_.end()) {
return it->second;
}
return -1;
}
void NewIRInterpreter::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
@@ -896,7 +840,8 @@ void NewIRInterpreter::Convert(
vec_meta_info[i].var_ref_count_, var_scope_.VarRef(i)));
}
AnalyseExecuteOrderForTrace();
AnalyseExecuteOrderForTrace(dependency_builder_.OpDownstreamMap(),
instruction_scheduling_priority_less);
}
void NewIRInterpreter::BuildSkipShareLoDInfo() {
@@ -1087,7 +1032,6 @@ void NewIRInterpreter::RunInstruction(const Instruction& instr_node) {
&(op_func_node->infer_meta_context_));
}
VLOG(5) << "after run infer meta";
if (op_func_node->fluid_op) {
// run fluid op
ExecutionContext exe_ctx(*(op_func_node->operator_base_.get()),
@@ -1411,6 +1355,15 @@ void NewIRInterpreter::CheckGC(const Instruction& instr) {
}
}
::ir::Value NewIRInterpreter::GetValueByName(const std::string& var_name) {
for (const auto& kv : value_2_var_name_) {
if (kv.second == var_name) {
return kv.first;
}
}
return nullptr;
}
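
A possible alternative, not part of this change: maintain a reverse map alongside value_2_var_name_ so the lookup is O(1) instead of a linear scan (the map name and method below are hypothetical):

std::unordered_map<std::string, ::ir::Value> var_name_2_value_;

::ir::Value GetValueByNameFast(const std::string& var_name) {
  auto it = var_name_2_value_.find(var_name);
  if (it != var_name_2_value_.end()) {
    return it->second;
  }
  return nullptr;  // same "not found" convention as above
}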
void NewIRInterpreter::Prepare(
const std::vector<std::string>& feed_names,
const std::vector<phi::DenseTensor>& feed_tensors,
@@ -1576,12 +1529,12 @@ void NewIRInterpreter::UpdateSyncOpNum() {
// ->(sync_run)-> OP(B) OP(O) ->(direct_run)-> OP(C) ->(direct_run)-> OP(D) If B
// is run before C, B may always block to wait for A to finish executing, but in
// fact, C can be executed first during this time.
void NewIRInterpreter::AnalyseExecuteOrderForTrace() {
void NewIRInterpreter::AnalyseExecuteOrderForTrace(
std::map<size_t, std::set<size_t>> op_downstream_map,
InstructionSchedulingPriorityLess compare) {
VLOG(4) << "Analyze the execution order of Trace scheduling mode.";
interpreter::ResetAtomicGuard guard(&deps_, &refs_);
auto op_downstream_map = dependency_builder_.OpDownstreamMap();
VLOG(4) << "1";
auto IsReady = [this](size_t next_id) {
VLOG(4) << "op_id: " << next_id
<< ", remain deps: " << deps_[next_id]->DynamicDep();
@@ -1589,7 +1542,7 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace() {
};
std::vector<size_t> trace_order;
SchedulingQueue ready_ops(instruction_scheduling_priority_less);
SchedulingQueue ready_ops(compare);
for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
if (dependecy_count_[instr_id] == 0) {
@@ -1618,6 +1571,14 @@ void NewIRInterpreter::AnalyseExecuteOrderForTrace() {
"trace_order size should be equal to dependecy_count_."));
trace_execute_order_ = trace_order;
std::stringstream ss;
ss << "trace order: ";
for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
ss << trace_execute_order_[idx] << " -> ";
}
ss << "end\n";
VLOG(6) << ss.str();
}
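
The loop above is a Kahn-style topological sort whose ready set is a priority queue. A self-contained sketch of the same idea (illustrative code, not Paddle's API):

#include <functional>
#include <map>
#include <queue>
#include <set>
#include <vector>

std::vector<size_t> TraceOrder(
    const std::map<size_t, std::set<size_t>>& downstream,
    std::vector<size_t> deps,  // remaining producer count per instruction
    std::function<bool(size_t, size_t)> less) {
  std::priority_queue<size_t, std::vector<size_t>,
                      std::function<bool(size_t, size_t)>>
      ready(less);
  for (size_t i = 0; i < deps.size(); ++i) {
    if (deps[i] == 0) ready.push(i);  // ops with no pending producers
  }
  std::vector<size_t> order;
  while (!ready.empty()) {
    size_t cur = ready.top();
    ready.pop();
    order.push_back(cur);
    auto it = downstream.find(cur);
    if (it == downstream.end()) continue;
    for (size_t next : it->second) {
      if (--deps[next] == 0) ready.push(next);  // all producers finished
    }
  }
  return order;  // full length iff the dependency graph is acyclic
}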
/// ======================== ///
@@ -1725,5 +1686,455 @@ void NewIRInterpreter::BuildInstructionDependences() {
}
}
void NewIRInterpreter::RecordMemcpyD2H(InstructionBase* instr_node) {
// NOTE(zhiqiu): hot fix for jit input var
if (instr_node->Name() == "pd.memcpy_d2h") {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto* default_dev_ctx = pool.Get(place_);
for (auto& event : instr_node->EventsToWait()) {
platform::RecordEvent record(
"RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(3) << "Record event on default stream in jit_input_var at op: "
<< instr_node->Name();
event.event_->Record(default_dev_ctx);
}
}
}
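
A sketch of the pairing this hot fix relies on (instr_id and default_dev_ctx are placeholders here): an event recorded on the default stream makes the d2h instruction wait until the async pinned-to-GPU input copy has finished.

auto event = std::make_shared<platform::DeviceEvent>(
    place_, platform::GenerateDeviceEventFlag());
instr_node->AddEventToWait(instr_id, event,
                           ir_stream_analyzer_.GetWaiterType(instr_node));
event->Record(default_dev_ctx);  // WaitEvent() later blocks until this point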
void NewIRInterpreter::RecordStreamForGC(InstructionBase* instr) {
#if !defined(PADDLE_WITH_CUDA) && !defined(PADDLE_WITH_HIP)
PADDLE_THROW(platform::errors::Unimplemented(
"RecordStreamForGC is only implemented when compiled with GPU."));
#else
if (!IsInterpretercoreFastGCEnabled() ||
instr->KernelType() != OpFuncType::kGpuAsync) {
return;
}
if (instr->DeviceContext().GetPlace().GetType() ==
phi::AllocationType::CUSTOM) {
return;
}
platform::RecordEvent record(
"RecordStreamForGC", platform::TracerEventType::UserDefined, 10);
gpuStream_t stream =
reinterpret_cast<const phi::GPUContext&>(instr->DeviceContext()).stream();
auto TensorRecordStream = [&stream](phi::DenseTensor& tensor) {
auto allocation = tensor.Holder();
if (allocation == nullptr) {
return;
}
const platform::Place& place = allocation->place();
if (platform::is_gpu_place(place)) {
memory::RecordStream(allocation, stream);
} else if (platform::is_cuda_pinned_place(place)) {
// TODO(Ruibiao): Something should be done here to make sure that the tensor
// is not freed until the H2D copies are done. However, simply launching a
// CUDA runtime callback on the H2D stream may lead to high performance
// overhead. As all the H2D cases we meet at present are copies from
// CPUPlace, we just log a WARNING here. A better design is required.
LOG(WARNING) << "Copying data from a CUDAPinned tensor in an asynchronous "
"manner may lead to data inconsistency";
} else {
// memory copies involve CPUPlace are always synchronous, so just do
// nothing here
}
};
/* NOTE(Ruibiao): Cross-stream tensor synchronization is required only when
* all the following conditions are satisfied:
* 1. The tensor will be GCed after running the instruction, i.e., it is in
* instr.GCCheckVars.
* 2. The stream which initializes this tensor is different from the stream
* the instruction runs in.
* 3. The tensor is an input of the instruction, because we assume that an
* instruction initializes all of its output tensors on its running stream.
* 4. In the OP function of this instruction, the tensor is an input of an
* async CUDA kernel.
*
* Here we only check the first condition, because:
* 1. Since the RecordStream function returns directly when the recorded
* stream is equal to the owning stream, recording the same stream that
* initialized this tensor has little time overhead. Conversely, it may take
* more time if we try to extract the cross-stream input vars from
* instr.GCCheckVars.
* 2. The instruction currently has no idea which vars are involved in async
* execution in its OP function, so we cannot recognize condition 4. It
* should be supported later.
*/
for (int var_id : instr->GCCheckVars()) {
VLOG(4) << "GC sync " << GetNameById(var_id);
// persistable vars will be ignored during GC
::ir::Value value = GetValueByName(GetNameById(var_id));
if (value && value.GetDefiningOp()->attributes().count("is_persisable") &&
value.GetDefiningOp()
->attributes()
.at("is_persisable")
.dyn_cast<::ir::BoolAttribute>()
.data()) {
continue;
}
paddle::framework::Variable* var = variable_list_[var_id];
if (var == nullptr) {
continue;
}
if (var->IsType<phi::DenseTensor>()) {
TensorRecordStream(*(var->GetMutable<phi::DenseTensor>()));
} else if (var->IsType<
operators::reader::
OrderedMultiDeviceLoDTensorBlockingQueueHolder>()) {
// do nothing
} else if (var->IsType<phi::SelectedRows>()) {
TensorRecordStream(
*(var->GetMutable<phi::SelectedRows>()->mutable_value()));
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& tensor : *tensor_arr) {
TensorRecordStream(tensor);
}
} else if (var->IsType<std::vector<Scope*>>()) {
// do nothing
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
#endif
}
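
The hazard handled above, in miniature (the consumer stream handle is hypothetical): a buffer produced on stream A and read on stream B must not be returned to the allocator while B is still running; RecordStream defers the reuse.

auto holder = tensor.Holder();                    // shared allocation
if (holder != nullptr && platform::is_gpu_place(holder->place())) {
  memory::RecordStream(holder, consumer_stream);  // reuse waits for B's work
}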
void NewIRInterpreter::CheckGC(InstructionBase* instr) {
platform::RecordEvent record(
"CheckGC", platform::TracerEventType::UserDefined, 10);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
RecordStreamForGC(instr);
#endif
for (auto var_id : instr->GCCheckVars()) {
VLOG(4) << "GC:" << GetNameById(var_id) << ", id:" << var_id
<< ", ref:" << refs_[var_id]->DynamicRef();
bool is_ready = refs_[var_id]->CheckAndDecrease();
// ignore all persistable vars during GC
::ir::Value value = GetValueByName(GetNameById(var_id));
if (value && value.GetDefiningOp()->attributes().count("is_persisable") &&
value.GetDefiningOp()
->attributes()
.at("is_persisable")
.dyn_cast<::ir::BoolAttribute>()
.data()) {
continue;
}
if (is_ready) {
VLOG(6) << "Async delete variable with name : " << GetNameById(var_id);
gc_->Add(refs_[var_id]->Var(), instr);
}
}
}
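
The reference-counting contract assumed by CheckGC, in miniature (a sketch, not the real interpreter::VarRefInfo): a var's dynamic ref count starts at the number of its last-live ops; each CheckGC on one of those ops decrements it, and the var goes to the collector exactly once, when the count reaches zero.

struct VarRefSketch {
  int dynamic_ref;  // initialized from var_ref_count_[i]
  bool CheckAndDecrease() { return --dynamic_ref == 0; }
};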
void NewIRInterpreter::CalculateLastLiveOps() {
// calculate last_live_ops_
for (size_t op_idx = 0; op_idx < vec_instruction_base_.size(); ++op_idx) {
InstructionBase* instr = vec_instruction_base_[op_idx].get();
std::set<size_t> gc_check_vars;
const std::unordered_map<::ir::Value, std::vector<int>>& ins =
instr->Inputs();
const std::unordered_map<::ir::Value, std::vector<int>>& outs =
instr->Outputs();
std::unordered_multimap<::ir::Value, std::vector<int>> ins_and_outs{
ins.begin(), ins.end()};
ins_and_outs.insert(outs.begin(), outs.end());
for (auto& item : ins_and_outs) {
for (auto var_id : item.second) {
// skip no_need_buffer input vars
if (ins.count(item.first) && instr->NoNeedBuffer().count(item.first)) {
continue;
}
gc_check_vars.insert(var_id);
}
}
for (auto var_id : gc_check_vars) {
Scope* inner_scope = InnerScope();
paddle::framework::Variable* var =
inner_scope->FindVar(GetNameById(var_id));
if (var->IsType<phi::DenseTensor>() || var->IsType<phi::SelectedRows>() ||
var->IsType<LoDTensorArray>()) {
last_live_ops_[var_id].insert(op_idx);
} else {
VLOG(4) << "not clear " << GetNameById(var_id) << " after "
<< instr->Name() << " because its type is "
<< framework::ToTypeName(var->Type());
}
}
}
// clear the last_live_ops list for all vars in skip_gc_vars
for (const std::string& skip_gc_var : execution_config_.skip_gc_vars) {
int var_id = GetIdByName(skip_gc_var);
if (var_id != -1) {
last_live_ops_[var_id].clear();
VLOG(8) << "Skip gc for var: " << skip_gc_var;
}
}
VLOG(4) << "calculate last_live_ops_";
// shrink: for each var, keep only the ops in its last-live list that no
// other op in that list happens before.
// For example,
// b = op1(a)
// c = op2(a, b)
// Here a is an input of both op1 and op2, but we only need to check
// a after op2, because op2 always uses a after op1.
var_ref_count_.resize(variable_list_.size());
VLOG(4) << "last_live_ops_.size() : " << last_live_ops_.size();
for (const auto& kv : last_live_ops_) {
for (auto val : kv.second) {
VLOG(4) << "var: " << kv.first << " -> op: " << val;
}
}
VLOG(4) << "var_ref_count_.size() : " << var_ref_count_.size();
for (size_t i = 0; i < last_live_ops_.size(); ++i) {
std::set<size_t> minimum_last_live_ops;
for (auto val : last_live_ops_[i]) {
VLOG(4) << "last_live_ops_: " << val;
}
for (size_t item : last_live_ops_[i]) {
bool not_before_any = true;
// keep only the ops that do not happen before any other op in the list
for (size_t other_item : last_live_ops_[i]) {
if (ir_dependency_builder_.OpHappensBefore(item, other_item)) {
VLOG(6) << "happens_before: " << item << "->" << other_item
<< ", so skip " << item;
not_before_any = false;
break;
}
}
if (not_before_any) {
VLOG(6) << "last live op of var " << i << " " << GetNameById(i) << " : "
<< item << " " << vec_instruction_base_[item]->Name();
minimum_last_live_ops.insert(item);
vec_instruction_base_[item]->AddGCCheckVar(i);
}
}
last_live_ops_[i] = minimum_last_live_ops;
var_ref_count_[i] = last_live_ops_[i].size();
}
VLOG(4) << "calculate last_live_ops_ 2";
for (auto& dep : dependecy_count_) {
deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
}
for (size_t i = 0; i < variable_list_.size(); ++i) {
refs_.emplace_back(std::make_shared<interpreter::VarRefInfo>(
var_ref_count_[i], variable_list_[i]));
}
VLOG(4) << "calculate last_live_ops_ 3";
}
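
A worked example of the shrink step above, with a hypothetical happens_before predicate: if var a's last-live set is {op1, op2} and op1 happens before op2, then op1 is dropped, the minimal set is {op2}, and a is checked (and freed) only after op2 runs.

std::set<size_t> live = {/*op1*/ 1, /*op2*/ 2};
std::set<size_t> minimal;
for (size_t op : live) {
  bool keep = true;
  for (size_t other : live) {
    if (op != other && happens_before(op, other)) {  // op1 -> op2
      keep = false;
      break;
    }
  }
  if (keep) minimal.insert(op);  // minimal == {2}; ref count becomes 1
}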
void NewIRInterpreter::ConstructEventForJitInput() {
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
InstructionBase* inst = vec_instruction_base_[i].get();
if (inst->Name() == "pd.memcpy_d2h" && platform::is_gpu_place(place_)) {
for (auto& item : inst->Inputs()) {
for (auto var_id : item.second) {
auto name = GetNameById(var_id);
if (JitInputVars().count(name)) {
auto device_event = std::make_shared<platform::DeviceEvent>(
place_, platform::GenerateDeviceEventFlag());
VLOG(4) << "Add input event for input: " << name << " of "
<< inst->Name();
inst->AddEventToWait(
i, device_event, ir_stream_analyzer_.GetWaiterType(inst));
}
}
}
}
}
}
}
FetchList NewIRInterpreter::BetaRun(const std::vector<std::string>& feed_names,
bool need_fetch) {
SetDeviceId(place_);
CheckCUDAGraphBeforeRun(feed_names);
#ifdef PADDLE_WITH_MKLDNN
platform::AttachPointerHashToMKLDNNKey(this, place_);
#endif
if (!is_build_) {
LOG_FIRST_N(INFO, 1) << "New Executor is BetaRunning.";
// Build
std::stringstream ss;
ss << this;
::ir::BuildScope(*ir_program_->block(),
InnerScope(),
ss.str(),
&value_2_var_name_,
&variable_2_var_name_,
&var_name_2_id_,
&variable_list_);
VLOG(4) << DebugValueInfo();
BuildInstruction();
VLOG(4) << "Done BuildInstruction";
PreAnalysis();
VLOG(4) << "Done PreAnalysis";
// Run
BetaRunImpl();
} else {
BetaRunImpl();
}
if (HasLocalScope()) {
ClearLoDTensorArrayInLocalScope();
}
// return Fetch Tensors
Scope* inner_scope = InnerScope();
auto* fetch_var = inner_scope->FindVar(interpreter::kFetchVarName);
if (fetch_var && need_fetch) {
auto fetch_list = std::move(*fetch_var->GetMutable<framework::FetchList>());
#ifdef PADDLE_WITH_CUDA
if (platform::IsCUDAGraphCapturing()) {
PADDLE_ENFORCE_EQ(fetch_list.empty(),
true,
platform::errors::InvalidArgument(
"Cannot fetch data when using CUDA Graph."));
}
#endif
return fetch_list;
} else {
return {};
}
}
void NewIRInterpreter::NewIrLoopRunImpl() {
for (size_t instr_id = 0; instr_id < vec_instruction_base_.size();
++instr_id) {
vec_instruction_base_[instr_id]->Run();
}
}
void NewIRInterpreter::BetaRunImpl() {
// lazy initialization of gc; do not create gc if the program only runs once
if (!gc_) {
gc_ = CreateInterpreterCoreGarbageCollector(place_, vec_instruction_base_);
}
interpreter::ResetAtomicGuard guard(&deps_, &refs_);
VLOG(4) << "Tracing Instruction List";
TraceInstructionList(vec_instruction_base_);
VLOG(4) << "Done BetaRunImpl";
}
void NewIRInterpreter::TraceInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
if (unfinished_op_number_ == 0) {
VLOG(4) << "No op to run, return";
return;
}
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i).get());
}
}
for (size_t idx = 0; idx < trace_execute_order_.size(); idx++) {
auto instr_id = trace_execute_order_[idx];
InstructionBase* instr_node = vec_instruction_base_.at(instr_id).get();
VLOG(6) << "Run InstructionBase " << instr_id;
RunInstructionBase(instr_node);
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
break;
}
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(1) << "Exception caught " << exception_holder_.Type();
PADDLE_ENFORCE_EQ(
main_thread_blocker_.Clear(),
0,
platform::errors::PreconditionNotMet(
"main_thread_blocker_.Clear() return -1, clear failed"));
VLOG(4) << "clear ok";
exception_holder_.ReThrow();
}
VLOG(4) << "Done TraceInstructionList";
}
void NewIRInterpreter::RunInstructionBase(InstructionBase* instr_node) {
platform::RecordEvent instruction_event(
instr_node->Name(), platform::TracerEventType::Operator, 1);
SetDeviceId(instr_node->DeviceContext().GetPlace());
try {
instr_node->WaitEvent(place_);
VLOG(5) << "begin to run op " << instr_node->Name();
if (!instr_node->IsArtificial()) {
instr_node->Run();
VLOG(4) << "done instruction node run";
CheckGC(instr_node);
VLOG(4) << "done CheckGC";
interpreter::LogDeviceMemoryStats(place_);
}
VLOG(5) << "after run kernel";
instr_node->RecordEvent(place_);
} catch (platform::EnforceNotMet& ex) {
LOG(WARNING) << instr_node->Name() << " raises an EnforceNotMet exception "
<< platform::demangle(typeid(ex).name()) << ", " << ex.what();
exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
} catch (platform::EOFException&) {
exception_holder_.Catch(std::current_exception());
} catch (std::exception& ex) {
LOG(WARNING) << instr_node->Name() << " raises an exception "
<< platform::demangle(typeid(ex).name()) << ", " << ex.what();
exception_holder_.Catch(std::current_exception());
} catch (...) {
LOG(WARNING) << instr_node->Name() << " raises an unknown exception";
exception_holder_.Catch(std::current_exception());
}
}
void NewIRInterpreter::PreAnalysis() {
BuildInstructionDependences();
VLOG(4) << "Done BuildInstructionDependences";
ir_stream_analyzer_.ConstructEvents(vec_instruction_base_);
VLOG(4) << "Done ConstructEvents";
// add events for the input vars of the jit program, since there are async
// copies from gpu_pinned place to gpu place on the compute stream.
ConstructEventForJitInput();
VLOG(4) << "AddEventToWait for JitInputVars";
CalculateLastLiveOps();
VLOG(4) << "Done CalculateLastLiveOps";
AnalyseExecuteOrderForTrace(ir_dependency_builder_.OpDownstreamMap(),
ir_instruction_scheduling_priority_less);
VLOG(4) << "Done AnalyseExecuteOrderForTrace";
}
} // namespace framework
} // namespace paddle
@@ -16,6 +16,7 @@
#include <memory>
#include "paddle/fluid/framework/new_executor/instruction/instruction_base.h"
#include "paddle/fluid/framework/new_executor/interpreter_base_impl.h"
#include "paddle/ir/core/value.h"
namespace ir {
class Program;
@@ -86,6 +87,8 @@ class NewIRInterpreter : public InterpreterBaseImpl {
std::string GetNameById(int id) const;
int GetIdByName(const std::string& name) const;
private:
// build graph
void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
@@ -93,7 +96,11 @@ class NewIRInterpreter : public InterpreterBaseImpl {
void BuildAndCacheInstructionCtx(Instruction* instr_node);
void BuildSkipShareLoDInfo();
void UpdateSyncOpNum();
void AnalyseExecuteOrderForTrace();
void AnalyseExecuteOrderForTrace(
std::map<size_t, std::set<size_t>> op_downstream_map,
InstructionSchedulingPriorityLess compare);
void ConstructEventForJitInput();
void CalculateLastLiveOps();
// inplace
void BuildInplace();
@@ -201,10 +208,31 @@ class NewIRInterpreter : public InterpreterBaseImpl {
/// ======================== ///
std::string DebugValueInfo();
void PreAnalysis();
void BuildInstruction();
void BuildInstructionDependences();
void NewIrLoopRunImpl();
void BetaRunImpl();
void TraceInstructionList(
const std::vector<std::unique_ptr<InstructionBase>>& vec_instr);
void RunInstructionBase(InstructionBase* instr_node);
void RecordMemcpyD2H(InstructionBase* instr_node);
::ir::Value GetValueByName(const std::string& var_name);
void CheckGC(InstructionBase* instr);
void RecordStreamForGC(InstructionBase* instr);
InstructionSchedulingPriorityLess ir_instruction_scheduling_priority_less;
std::unique_ptr<::ir::Program> ir_program_{nullptr};
std::vector<std::unique_ptr<InstructionBase>> vec_instruction_base_;
@@ -218,6 +246,8 @@ class NewIRInterpreter : public InterpreterBaseImpl {
std::vector<Variable*> variable_list_;
std::vector<int> var_ref_count_;
interpreter::NewIrDependencyBuilder ir_dependency_builder_;
interpreter::NewIrStreamAnalyzer ir_stream_analyzer_;
......
@@ -70,23 +70,19 @@ TEST(StandaloneExecutor, run) {
ProgramDesc prog_desc;
InterpreterCore test_core(place, std::move(kernel_program), &scope);
VLOG(0) << "&test_core" << &test_core;
VLOG(0) << "&test_core.impl" << test_core.Impl();
VLOG(0) << "&test_core.impl.cast"
<< reinterpret_cast<NewIRInterpreter*>(
const_cast<InterpreterBaseImpl*>(test_core.Impl()));
test_core.BetaRun({});
std::stringstream os;
os << reinterpret_cast<NewIRInterpreter*>(
const_cast<InterpreterBaseImpl*>(test_core.Impl()));
std::string prefix_str = os.str();
std::string out_name = os.str() + "_inner_var_2";
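// With GC now enabled in BetaRun, the fetched var must be marked skip-gc
// before the next run, or its buffer may be collected before the check.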
test_core.SetSkipGcVars({out_name});
test_core.BetaRun({});
auto out_tensor =
test_core.local_scope() == nullptr
? scope.FindVar(prefix_str + "_inner_var_2")->Get<phi::DenseTensor>()
: test_core.local_scope()
->FindVar(prefix_str + "_inner_var_2")
->Get<phi::DenseTensor>();
? scope.FindVar(out_name)->Get<phi::DenseTensor>()
: test_core.local_scope()->FindVar(out_name)->Get<phi::DenseTensor>();
bool res0 = simple_cmp(out_tensor.data<float>()[0], 2.0);
bool res1 = simple_cmp(out_tensor.data<float>()[1], 2.0);
@@ -115,18 +111,19 @@ TEST(StandaloneExecutor, run_inplace_sqrt) {
auto place = platform::CPUPlace();
Scope scope;
InterpreterCore test_core(place, std::move(kernel_program), &scope);
test_core.BetaRun({});
std::stringstream os;
os << reinterpret_cast<NewIRInterpreter*>(
const_cast<InterpreterBaseImpl*>(test_core.Impl()));
std::string prefix_str = os.str();
std::string out_name = os.str() + "_inner_var_0";
test_core.SetSkipGcVars({out_name});
test_core.BetaRun({});
auto out_tensor =
test_core.local_scope() == nullptr
? scope.FindVar(prefix_str + "_inner_var_0")->Get<phi::DenseTensor>()
: test_core.local_scope()
->FindVar(prefix_str + "_inner_var_0")
->Get<phi::DenseTensor>();
? scope.FindVar(out_name)->Get<phi::DenseTensor>()
: test_core.local_scope()->FindVar(out_name)->Get<phi::DenseTensor>();
bool res0 = simple_cmp(out_tensor.data<float>()[0], 2.0);
bool res1 = simple_cmp(out_tensor.data<float>()[1], 2.0);
......