Unverified commit ee65599e, authored by Sonder, committed by GitHub

[Static Graph Performance Optimization] Reuse graph dependency information (#55389)

* add share api for DependencyBuilder

* add checks for whether build results can be shared

* add ShareBuildResultsFrom

* update ShareDependencyFrom

* fix error

* add sharing code

* fix memory error

* update according to review comments

* update notes

* fix code style

* remove const_cast

* fix code style
Parent 75517841
......@@ -60,18 +60,26 @@ const std::string StringizeDownstreamMap(
return oss.str();
}
DependencyBuilder::DependencyBuilder()
: is_build_(false), instructions_(nullptr) {
op_downstream_map_ = std::make_shared<std::map<size_t, std::set<size_t>>>();
op_happens_before_ = std::make_shared<std::vector<std::vector<bool>>>();
}
const std::map<size_t, std::set<size_t>>& DependencyBuilder::Build(
const std::vector<Instruction>& instructions) {
if (is_build_) {
return op_downstream_map_;
return *op_downstream_map_;
}
std::tie(op_downstream_map_, op_happens_before_) = GetDependency();
instructions_ = &instructions;
op_num_ = instructions_->size();
ops_before_.assign(op_num_, {});
ops_behind_.assign(op_num_, {});
op_happens_before_.assign(op_num_, std::vector<bool>(op_num_, false));
op_happens_before_->assign(op_num_, std::vector<bool>(op_num_, false));
BuildDownstreamMap();
VLOG(6) << "Finish BuildDownstreamMap";
......@@ -97,13 +105,24 @@ const std::map<size_t, std::set<size_t>>& DependencyBuilder::Build(
VLOG(6) << "Finish AddDependencyForReadOp";
VLOG(6) << "Finish build dependency";
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
VLOG(8) << "downstream count: " << CountDownstreamMap(*op_downstream_map_);
VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map_);
<< StringizeDownstreamMap(*op_downstream_map_);
is_build_ = true;
return op_downstream_map_;
return *op_downstream_map_;
}
std::tuple<std::shared_ptr<std::map<size_t, std::set<size_t>>>,
std::shared_ptr<std::vector<std::vector<bool>>>>
DependencyBuilder::GetDependency() const {
return std::make_tuple(op_downstream_map_, op_happens_before_);
}
void DependencyBuilder::ShareDependencyFrom(const DependencyBuilder& src) {
std::tie(op_downstream_map_, op_happens_before_) = src.GetDependency();
is_build_ = true;
}
const std::map<size_t, std::set<size_t>>& DependencyBuilder::OpDownstreamMap()
......@@ -113,7 +132,7 @@ const std::map<size_t, std::set<size_t>>& DependencyBuilder::OpDownstreamMap()
true,
phi::errors::Unavailable(
"DependencyBuilder is not yet built, call Build() firstly."));
return op_downstream_map_;
return *op_downstream_map_;
}
void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
......@@ -268,8 +287,8 @@ void DependencyBuilder::AddDependencyForRandomOp() {
void DependencyBuilder::AddDependencyForReadOp() {
std::vector<bool> is_startup_ops(op_num_, true);
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
auto it = op_downstream_map_.find(op_idx);
if (it != op_downstream_map_.end()) {
auto it = op_downstream_map_->find(op_idx);
if (it != op_downstream_map_->end()) {
for (size_t downstream_op_idx : it->second) {
is_startup_ops[downstream_op_idx] = false;
}
......@@ -320,8 +339,7 @@ void DependencyBuilder::AddDownstreamOp(size_t prior_op_idx,
posterior_op_idx,
posterior_op_idx,
prior_op_idx));
std::set<size_t>& downstream_ops = op_downstream_map_[prior_op_idx];
std::set<size_t>& downstream_ops = (*op_downstream_map_)[prior_op_idx];
// NOTE(Ruibiao): Here the downstream map shrinking is best-effort, therefore
// ShrinkDownstreamMap after BuildDownstreamMap is still helpful. For example,
// a->c will not be shrinked in the following case: AddDownstreamOp(a, b) ->
......@@ -342,8 +360,8 @@ void DependencyBuilder::AddDownstreamOp(size_t prior_op_idx,
auto update_op_happen_before = [this](size_t prior_op_idx,
size_t posterior_op_idx) {
if (!op_happens_before_[prior_op_idx][posterior_op_idx]) {
op_happens_before_[prior_op_idx][posterior_op_idx] = true;
if (!(*op_happens_before_)[prior_op_idx][posterior_op_idx]) {
(*op_happens_before_)[prior_op_idx][posterior_op_idx] = true;
ops_before_[posterior_op_idx].push_back(prior_op_idx);
ops_behind_[prior_op_idx].push_back(posterior_op_idx);
}
......@@ -377,8 +395,8 @@ void DependencyBuilder::BuildDownstreamMap() {
std::map<size_t, size_t>(); // # map from variable to recent write op.
auto op2dependences =
std::map<size_t,
std::set<size_t>>(); //# map from op to the dependence list,
// op must run after the dependence.
std::set<size_t>>(); // # map from op to the dependence list,
// op must run after the dependence.
std::set<size_t>
remove_duplicate; // remove the duplicate between inputs and outputs
......@@ -497,15 +515,15 @@ void DependencyBuilder::ShrinkDownstreamMap() {
// shrink, find the downstream op that has no other op in the
// downstream list happens before it
for (size_t i = 0; i < op_num_; ++i) {
if (op_downstream_map_.find(i) == op_downstream_map_.end()) {
if (op_downstream_map_->find(i) == op_downstream_map_->end()) {
continue;
}
std::set<size_t> minumum_nexts;
for (size_t item : op_downstream_map_.at(i)) {
for (size_t item : op_downstream_map_->at(i)) {
bool not_after_any = true;
// find the op that is not executed after any
for (size_t other_item : op_downstream_map_.at(i)) {
for (size_t other_item : op_downstream_map_->at(i)) {
if (OpHappensBefore(other_item, item)) {
VLOG(8) << "happens_before: " << other_item << "->" << item
<< ", so skip " << item;
......@@ -520,12 +538,12 @@ void DependencyBuilder::ShrinkDownstreamMap() {
}
// NOTE(Ruibiao): op_happens_before will not be changed when shrink
// dowstream map
op_downstream_map_.at(i) = minumum_nexts;
(*op_downstream_map_)[i] = minumum_nexts;
}
VLOG(8) << "Finish shrink downstream map";
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
VLOG(8) << "downstream count: " << CountDownstreamMap(*op_downstream_map_);
VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map_);
<< StringizeDownstreamMap(*op_downstream_map_);
}
/// ======================== ///
......
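The change above turns op_downstream_map_ and op_happens_before_ into std::shared_ptr members, so ShareDependencyFrom amounts to copying two pointers instead of re-running Build or deep-copying an op_num x op_num happens-before matrix. Below is a minimal standalone sketch of the same pattern; MiniDependencyBuilder and its toy Build are simplified stand-ins, not the Paddle classes.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <tuple>
#include <vector>

// Simplified stand-in for paddle::framework::interpreter::DependencyBuilder.
class MiniDependencyBuilder {
 public:
  MiniDependencyBuilder()
      : is_build_(false),
        op_downstream_map_(
            std::make_shared<std::map<size_t, std::set<size_t>>>()),
        op_happens_before_(
            std::make_shared<std::vector<std::vector<bool>>>()) {}

  // Pretend to analyse `op_num` ops forming a simple chain: op i -> op i+1.
  const std::map<size_t, std::set<size_t>>& Build(size_t op_num) {
    if (is_build_) return *op_downstream_map_;
    op_happens_before_->assign(op_num, std::vector<bool>(op_num, false));
    for (size_t i = 0; i + 1 < op_num; ++i) {
      (*op_downstream_map_)[i].insert(i + 1);
      (*op_happens_before_)[i][i + 1] = true;
    }
    is_build_ = true;
    return *op_downstream_map_;
  }

  // Mirrors DependencyBuilder::GetDependency(): hand out the shared results.
  std::tuple<std::shared_ptr<std::map<size_t, std::set<size_t>>>,
             std::shared_ptr<std::vector<std::vector<bool>>>>
  GetDependency() const {
    return std::make_tuple(op_downstream_map_, op_happens_before_);
  }

  // Mirrors ShareDependencyFrom(): an O(1) pointer copy, no re-analysis.
  void ShareDependencyFrom(const MiniDependencyBuilder& src) {
    std::tie(op_downstream_map_, op_happens_before_) = src.GetDependency();
    is_build_ = true;
  }

  bool OpHappensBefore(size_t prior, size_t posterior) const {
    return op_happens_before_->at(prior).at(posterior);
  }

 private:
  bool is_build_;
  std::shared_ptr<std::map<size_t, std::set<size_t>>> op_downstream_map_;
  std::shared_ptr<std::vector<std::vector<bool>>> op_happens_before_;
};

int main() {
  MiniDependencyBuilder first, second;
  first.Build(/*op_num=*/4);          // pays the analysis cost once
  second.ShareDependencyFrom(first);  // reuses the result
  assert(second.OpHappensBefore(0, 1));
  std::cout << "dependency results shared without rebuilding\n";
}

Because both builders alias the same map and matrix, the memory cost of reusing dependency information stays constant no matter how many interpreters share it.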
......@@ -34,23 +34,29 @@ namespace interpreter {
class DependencyBuilder {
public:
DependencyBuilder() : is_build_(false), instructions_(nullptr) {}
DependencyBuilder();
// build op dependencies and return the mapping from op to its downstream-op
// set
const std::map<size_t, std::set<size_t>>& Build(
const std::vector<Instruction>& instructions);
std::tuple<std::shared_ptr<std::map<size_t, std::set<size_t>>>,
std::shared_ptr<std::vector<std::vector<bool>>>>
GetDependency() const;
const std::map<size_t, std::set<size_t>>& OpDownstreamMap() const;
bool OpHappensBefore(size_t prior_op_idx, size_t posterior_op_idx) const {
PADDLE_ENFORCE_GE(
op_happens_before_.size(),
op_happens_before_->size(),
0,
phi::errors::Unavailable("op_happen_before is not yet built"));
return op_happens_before_.at(prior_op_idx).at(posterior_op_idx);
return op_happens_before_->at(prior_op_idx).at(posterior_op_idx);
}
void ShareDependencyFrom(const DependencyBuilder& src);
private:
void AddDependencyForCoalesceTensorOp();
void AddDependencyForCommunicationOp();
......@@ -76,13 +82,13 @@ class DependencyBuilder {
std::vector<std::vector<size_t>> ops_behind_;
// op_downstream_map_ is the mapping from op to its downstream-op set, that is
// to say, op_downstream_map_[i] == {a, b, c} means op[a], op[b] and op[c]
// to say, (*op_downstream_map_)[i] == {a, b, c} means op[a], op[b] and op[c]
// depend on op[i] directly.
std::map<size_t, std::set<size_t>> op_downstream_map_;
std::shared_ptr<std::map<size_t, std::set<size_t>>> op_downstream_map_;
// op_happens_before_ is a matrix form of ops_before_ and ops_behind_, it is
// used to speed up the query.
std::vector<std::vector<bool>> op_happens_before_;
std::shared_ptr<std::vector<std::vector<bool>>> op_happens_before_;
};
// /// ======================== ///
......
......@@ -80,6 +80,8 @@ class InterpreterBaseImpl {
virtual void ShareWorkQueueFrom(InterpreterBaseImpl* src) = 0;
virtual void ShareBuildResultsFrom(const InterpreterBaseImpl& src) = 0;
virtual void SetCopyProgram(std::shared_ptr<ProgramDesc> prog) = 0;
virtual void SetSkipGcVars(const std::set<std::string>& skip_gc_vars) = 0;
......@@ -97,6 +99,11 @@ class InterpreterBaseImpl {
virtual const platform::Place& GetPlace() const = 0;
virtual void SetOutputHooks(const std::vector<HookFunc>& hookfuncs) = 0;
virtual const interpreter::DependencyBuilder& GetDependencyBuilder()
const = 0;
virtual std::shared_ptr<std::vector<size_t>> GetDependencyCount() const = 0;
};
inline void SetDeviceId(const platform::Place& place) {
......
......@@ -81,6 +81,12 @@ void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
impl_->ShareWorkQueueFrom(const_cast<InterpreterBaseImpl*>(src->Impl()));
}
void InterpreterCore::ShareBuildResultsFrom(
std::shared_ptr<InterpreterCore> src) {
// ShareBuildResultsFrom required const InterpreterBaseImpl& src as input
impl_->ShareBuildResultsFrom(*src->Impl());
}
void InterpreterCore::SetCopyProgram(std::shared_ptr<ProgramDesc> prog) {
impl_->SetCopyProgram(prog);
}
......
......@@ -56,6 +56,8 @@ class InterpreterCore {
void ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src);
void ShareBuildResultsFrom(std::shared_ptr<InterpreterCore> src);
void SetCopyProgram(std::shared_ptr<ProgramDesc> prog);
void SetSkipGcVars(const std::set<std::string>& skip_gc_vars);
......
......@@ -351,6 +351,24 @@ void NewIRInterpreter::ShareWorkQueueFrom(InterpreterBaseImpl* src) {
<< ") to InterpreterCore(" << this << ")";
}
void NewIRInterpreter::ShareBuildResultsFrom(const InterpreterBaseImpl& src) {
PADDLE_THROW(platform::errors::Unimplemented(
"ShareBuildResultsFrom is not implemented in NewIRInterpreter."));
}
// op dependences
const interpreter::DependencyBuilder& NewIRInterpreter::GetDependencyBuilder()
const {
PADDLE_THROW(platform::errors::Unimplemented(
"GetDependencyBuilder is not implemented in NewIRInterpreter."));
}
std::shared_ptr<std::vector<size_t>> NewIRInterpreter::GetDependencyCount()
const {
PADDLE_THROW(platform::errors::Unimplemented(
"GetDependencyCount is not implemented in NewIRInterpreter."));
}
bool NewIRInterpreter::BuildInplaceCheckVarIsOnlyInput(
const std::vector<std::vector<size_t>>& input_var2op, size_t var_index) {
if (!var_scope_.VarDesc(var_index)) {
......
......@@ -53,6 +53,13 @@ class NewIRInterpreter : public InterpreterBaseImpl {
void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;
void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;
// op dependences
const interpreter::DependencyBuilder& GetDependencyBuilder() const override;
std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override;
void SetCopyProgram(std::shared_ptr<ProgramDesc> prog) override;
void SetSkipGcVars(const std::set<std::string>& skip_gc_vars) override;
......
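Across interpreter_base_impl.h, new_ir_interpreter.*, and interpreter_core.*, the sharing entry points follow one layering: pure virtuals on InterpreterBaseImpl, real implementations in ProgramInterpreter, and Unimplemented stubs in NewIRInterpreter. A stripped-down sketch of that layering, using toy class names (BaseImpl, ProgramImpl, NewIRImpl) rather than the Paddle types:

#include <cstddef>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

// Stand-in for InterpreterBaseImpl: only the sharing hooks are modelled.
class BaseImpl {
 public:
  virtual ~BaseImpl() = default;
  virtual void ShareBuildResultsFrom(const BaseImpl& src) = 0;
  virtual std::shared_ptr<std::vector<size_t>> GetDependencyCount() const = 0;
};

// Stand-in for ProgramInterpreter: owns build results and can reuse another's.
class ProgramImpl : public BaseImpl {
 public:
  ProgramImpl() : dependency_count_(std::make_shared<std::vector<size_t>>()) {}

  void ShareBuildResultsFrom(const BaseImpl& src) override {
    dependency_count_ = src.GetDependencyCount();  // alias, do not rebuild
    is_shared_ = true;
  }
  std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override {
    return dependency_count_;
  }

 private:
  bool is_shared_{false};
  std::shared_ptr<std::vector<size_t>> dependency_count_;
};

// Stand-in for NewIRInterpreter: sharing is not supported yet.
class NewIRImpl : public BaseImpl {
 public:
  void ShareBuildResultsFrom(const BaseImpl&) override {
    throw std::logic_error("ShareBuildResultsFrom is not implemented");
  }
  std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override {
    throw std::logic_error("GetDependencyCount is not implemented");
  }
};

int main() {
  ProgramImpl a, b;
  a.GetDependencyCount()->assign(3, 0);  // pretend `a` already built its counts
  b.ShareBuildResultsFrom(a);            // `b` now aliases `a`'s counts
  std::cout << b.GetDependencyCount()->size() << "\n";  // prints 3
}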
......@@ -53,6 +53,8 @@ ProgramInterpreter::ProgramInterpreter(const platform::Place& place,
exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
completion_notifier_ = main_thread_blocker_.RegisterEvent(kTaskCompletion);
dependecy_count_ = std::make_shared<std::vector<size_t>>();
if (!FLAGS_new_executor_use_local_scope) {
execution_config_.create_local_scope = false;
}
......@@ -288,6 +290,15 @@ void ProgramInterpreter::ShareWorkQueueFrom(InterpreterBaseImpl* src) {
<< ") to InterpreterCore(" << this << ")";
}
void ProgramInterpreter::ShareBuildResultsFrom(const InterpreterBaseImpl& src) {
// share op dependency
dependency_builder_.ShareDependencyFrom(src.GetDependencyBuilder());
dependecy_count_ = src.GetDependencyCount();
is_shared_ = true;
VLOG(8) << "Share BuildResults from InterpreterCore(" << &src
<< ") to InterpreterCore(" << this << ")";
}
bool ProgramInterpreter::BuildInplaceCheckVarIsOnlyInput(
const std::vector<std::vector<size_t>>& input_var2op, size_t var_index) {
if (!var_scope_.VarDesc(var_index)) {
......@@ -316,6 +327,16 @@ ProgramInterpreter::GetWorkQueue() {
return async_work_queue_;
}
const interpreter::DependencyBuilder& ProgramInterpreter::GetDependencyBuilder()
const {
return dependency_builder_;
}
std::shared_ptr<std::vector<size_t>> ProgramInterpreter::GetDependencyCount()
const {
return dependecy_count_;
}
void ProgramInterpreter::BuildAndCacheInstructionCtx(Instruction* instr_node) {
Scope* inner_scope =
HasLocalScope() ? local_scope_ : var_scope_.GetMutableScope();
......@@ -510,7 +531,11 @@ void ProgramInterpreter::BuildOperatorDependences() {
// analysis the dependences between ops, add next_instr_list to each instr,
// and set the dependecy_count_
size_t instr_num = vec_instruction_.size();
dependecy_count_ = std::vector<size_t>(instr_num, 0);
dependecy_count_ = GetDependencyCount();
if (!is_shared_) {
dependecy_count_->assign(instr_num, 0);
}
auto downstream_map = dependency_builder_.Build(vec_instruction_);
for (size_t instr_id = 0; instr_id < instr_num; ++instr_id) {
......@@ -546,8 +571,10 @@ void ProgramInterpreter::BuildOperatorDependences() {
}
}
for (size_t next_instr_id : next_instr_ids) {
++dependecy_count_[next_instr_id];
if (!is_shared_) {
for (size_t next_instr_id : next_instr_ids) {
++(*dependecy_count_)[next_instr_id];
}
}
}
}
......@@ -612,8 +639,8 @@ void ProgramInterpreter::Convert(
// add event for the input var of jit program, since there are async copied
// from gpu_pinned place to gpu place on compute stream.
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
auto& inst = vec_instruction_[i];
if (inst.OpBase()->Type() == interpreter::kMemcpyD2H &&
platform::is_gpu_place(place_)) {
......@@ -752,7 +779,7 @@ void ProgramInterpreter::Convert(
BuildInplace();
}
for (auto& dep : dependecy_count_) {
for (auto& dep : *dependecy_count_) {
deps_.emplace_back(std::make_shared<interpreter::OpDepInfo>(dep));
}
for (size_t i = 0; i < vec_meta_info.size(); ++i) {
......@@ -1000,8 +1027,8 @@ void ProgramInterpreter::ExecuteInstructionList(
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i));
if (FLAGS_new_executor_serial_run) {
......@@ -1356,8 +1383,8 @@ void ProgramInterpreter::TraceInstructionList(
exception_holder_.Clear();
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
RecordMemcpyD2H(vec_instr.at(i));
}
......@@ -1435,8 +1462,8 @@ void ProgramInterpreter::AnalyseExecuteOrderForTrace() {
std::vector<size_t> trace_order;
SchedulingQueue ready_ops(instruction_scheduling_priority_less);
for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) {
if (dependecy_count_[instr_id] == 0) {
for (size_t instr_id = 0; instr_id < dependecy_count_->size(); ++instr_id) {
if ((*dependecy_count_)[instr_id] == 0) {
ready_ops.push(instr_id);
}
}
......@@ -1457,7 +1484,7 @@ void ProgramInterpreter::AnalyseExecuteOrderForTrace() {
PADDLE_ENFORCE_EQ(
trace_order.size(),
dependecy_count_.size(),
dependecy_count_->size(),
platform::errors::PreconditionNotMet(
"trace_order size should be equal to dependecy_count_."));
......
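In program_interpreter.cc, dependecy_count_ becomes a std::shared_ptr<std::vector<size_t>> that BuildOperatorDependences either fills locally or takes over from the source interpreter, in which case the per-op increment loop is skipped. A minimal sketch of that control flow, with a free function standing in for the member logic:

#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <vector>

// Count, for every op, how many upstream ops it must wait for.
// When `is_shared` is true the counts came from another interpreter
// and are reused as-is instead of being recomputed.
std::shared_ptr<std::vector<size_t>> BuildDependencyCount(
    const std::map<size_t, std::set<size_t>>& downstream_map,
    size_t op_num,
    bool is_shared,
    std::shared_ptr<std::vector<size_t>> shared_counts) {
  auto counts = shared_counts;
  if (!is_shared) {
    counts->assign(op_num, 0);
    for (const auto& item : downstream_map) {
      for (size_t next_op : item.second) {
        ++(*counts)[next_op];
      }
    }
  }
  return counts;
}

int main() {
  std::map<size_t, std::set<size_t>> downstream = {{0, {1, 2}}, {1, {2}}};
  auto counts = std::make_shared<std::vector<size_t>>();
  BuildDependencyCount(downstream, /*op_num=*/3, /*is_shared=*/false, counts);
  std::cout << (*counts)[2] << "\n";  // op 2 waits for 2 predecessors

  // A second interpreter that shares the counts skips the recount entirely.
  auto reused = BuildDependencyCount(downstream, 3, /*is_shared=*/true, counts);
  std::cout << (reused == counts) << "\n";  // same underlying vector
}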
......@@ -54,6 +54,13 @@ class ProgramInterpreter : public InterpreterBaseImpl {
void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;
void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;
// op dependences
const interpreter::DependencyBuilder& GetDependencyBuilder() const override;
std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override;
void SetCopyProgram(std::shared_ptr<ProgramDesc> prog) override;
void SetSkipGcVars(const std::set<std::string>& skip_gc_vars) override;
......@@ -127,6 +134,9 @@ class ProgramInterpreter : public InterpreterBaseImpl {
bool is_build_{false};
bool static_build_{false};
// Note(sonder): share the op dependency,
// event analyzer, thread scheduling and GC.
bool is_shared_{false};
const platform::Place place_;
const BlockDesc& block_; // not owned
......@@ -167,9 +177,9 @@ class ProgramInterpreter : public InterpreterBaseImpl {
// var
std::map<size_t, std::set<size_t>> last_live_ops_;
// dependecy_count_[i] contains the number of dependencies that the i-th op
// (*dependecy_count_)[i] contains the number of dependencies that the i-th op
// need to wait
std::vector<size_t> dependecy_count_;
std::shared_ptr<std::vector<size_t>> dependecy_count_;
std::vector<std::shared_ptr<interpreter::OpDepInfo>> deps_;
std::vector<std::shared_ptr<interpreter::VarRefInfo>> refs_;
......
......@@ -96,13 +96,26 @@ paddle::framework::FetchList StandaloneExecutor::Run(
const auto& jobs = plan_.JobList();
if (!is_interpretercore_build_result_shared_) {
std::map<std::string, std::vector<size_t>> type_to_id;
for (size_t job_idx = 1; job_idx < jobs.size(); ++job_idx) {
interpretercores_[job_idx]->ShareWorkQueueFrom(interpretercores_[0]);
// TODO(Ruibiao): Share other build result, e.g., kernel choosing, data
// transfer, op dependency, thread scheduling, GC, event analyzer, and so
// on.
type_to_id[jobs[job_idx]->Type()].emplace_back(job_idx);
}
is_interpretercore_build_result_shared_ = true;
// Note(sonder): For the same type of job, share the build result of the
// first job to other jobs. The shared build result includes op dependency,
// event analyzer, thread scheduling and GC.
for (const auto& pair : type_to_id) {
const auto& ids = pair.second;
for (size_t i = 1; i < ids.size(); ++i) {
interpretercores_[ids[i]]->ShareBuildResultsFrom(
interpretercores_[ids[0]]);
}
}
}
for (size_t job_idx = 0; job_idx < jobs.size(); ++job_idx) {
......
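The Note(sonder) comment above describes the executor-level policy: group jobs by type and let every later job of a type reuse the build results of the first one. A small self-contained sketch of that grouping, where Core is a placeholder for InterpreterCore and the job types are made up for illustration:

#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Placeholder for an InterpreterCore that can reuse another core's build results.
struct Core {
  bool shared = false;
  void ShareBuildResultsFrom(const Core& src) {
    (void)src;
    shared = true;
  }
};

int main() {
  // Hypothetical job types in plan order.
  std::vector<std::string> job_types = {"forward", "backward", "forward",
                                        "backward", "optimizer"};
  std::vector<Core> cores(job_types.size());

  // Group job indices by type, as StandaloneExecutor::Run now does
  // (the Paddle code builds this map starting from job index 1, since
  // job 0 also donates its work queue to the others).
  std::map<std::string, std::vector<size_t>> type_to_id;
  for (size_t job_idx = 0; job_idx < job_types.size(); ++job_idx) {
    type_to_id[job_types[job_idx]].push_back(job_idx);
  }

  // For each type, the first job keeps its own build results and every
  // later job of the same type shares them instead of rebuilding.
  for (const auto& pair : type_to_id) {
    const auto& ids = pair.second;
    for (size_t i = 1; i < ids.size(); ++i) {
      cores[ids[i]].ShareBuildResultsFrom(cores[ids[0]]);
    }
  }

  for (size_t i = 0; i < cores.size(); ++i) {
    std::cout << "job " << i << " (" << job_types[i]
              << ") shared=" << cores[i].shared << "\n";
  }
}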