From 9b97ac7076cc5c902f8e0da89265ddb6869cee2f Mon Sep 17 00:00:00 2001
From: Ruibiao Chen
Date: Tue, 2 Aug 2022 10:01:17 +0800
Subject: [PATCH] Refactor build_op_downstream_map for standalone executor
 (#44729)

* Refactor build_op_downstream_map for standalone executor

* Add some comments
---
 .../framework/new_executor/CMakeLists.txt     |   2 +-
 .../new_executor/interpreter/CMakeLists.txt   |   4 +-
 .../interpreter/dependency_builder.cc         | 590 ++++++++++++++++++
 .../interpreter/dependency_builder.h          |  73 +++
 .../interpreter/dependency_utils.cc           | 116 ----
 .../interpreter/dependency_utils.h            |  54 --
 .../framework/new_executor/interpretercore.cc |   5 +-
 .../framework/new_executor/interpretercore.h  |   5 +-
 .../new_executor/interpretercore_util.cc      | 456 --------------
 .../new_executor/interpretercore_util.h       |   4 -
 10 files changed, 671 insertions(+), 638 deletions(-)
 create mode 100644 paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc
 create mode 100644 paddle/fluid/framework/new_executor/interpreter/dependency_builder.h
 delete mode 100644 paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc
 delete mode 100644 paddle/fluid/framework/new_executor/interpreter/dependency_utils.h

diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt
index 0422b151ac9..f72a8157970 100644
--- a/paddle/fluid/framework/new_executor/CMakeLists.txt
+++ b/paddle/fluid/framework/new_executor/CMakeLists.txt
@@ -12,7 +12,7 @@ set(STANDALONE_EXECUTOR_SRCS
     standalone_executor.cc)

 set(STANDALONE_EXECUTOR_DEPS
-    dependency_utils
+    dependency_builder
     device_context
     op_registry
     scope
diff --git a/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt b/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt
index 4a569575c73..86d34bf9839 100644
--- a/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt
+++ b/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt
@@ -1,4 +1,4 @@
 cc_library(
-  dependency_utils
-  SRCS dependency_utils.cc
+  dependency_builder
+  SRCS dependency_builder.cc
   DEPS operator)
diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc
new file mode 100644
index 00000000000..4d005d0e883
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.cc
@@ -0,0 +1,590 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
+
+#include <queue>
+
+// The difference between "sequential_run" and "serial_run":
+// "sequential_run" dispatches OPs one by one according to the sequence in the
+// Program, while "serial_run" ensures that all Ops are scheduled in a single
+// thread. In standalone executor, "sequential_run" is also "serial_run", while
+// "serial_run" is not necessarily "sequential_run".
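+//
+// As an illustration: exported flags like this one can typically be toggled
+// through the environment before launch, e.g.
+// `FLAGS_new_executor_sequential_run=true python train.py`, assuming the
+// usual PADDLE_DEFINE_EXPORTED_bool flag-loading mechanism.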
+PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run,
+                            false,
+                            "Enable sequential execution for standalone "
+                            "executor, only applied to GPU OPs.");
+
+namespace paddle {
+namespace framework {
+namespace interpreter {
+
+size_t CountDownstreamMap(
+    const std::map<int, std::set<int>>& downstream_map) {
+  size_t count = 0;
+  for (auto pair : downstream_map) {
+    count += pair.second.size();
+  }
+  return count;
+}
+
+bool IsCommunicationOp(const std::string& op_name) {
+  const std::set<std::string> special_comm_op_set = {
+      "send",
+      "recv",
+      "send_v2",
+      "recv_v2",
+  };
+  const std::string communication_op_prefix = "c_";
+  if (op_name.find(communication_op_prefix) != std::string::npos ||
+      special_comm_op_set.count(op_name)) {
+    return true;
+  }
+  return false;
+}
+
+// check whether exists prior_op -> ... -> posterior_op to avoid building loops
+bool IsDependency(int prior_op_idx,
+                  int posterior_op_idx,
+                  const std::map<int, std::set<int>>& downstream_map) {
+  std::queue<int> q;
+  q.push(prior_op_idx);
+
+  while (!q.empty()) {
+    int op_idx = q.front();
+    q.pop();
+
+    auto it = downstream_map.find(op_idx);
+    if (it != downstream_map.end()) {
+      for (int downstream_op_idx : it->second) {
+        if (downstream_op_idx == posterior_op_idx) {
+          return true;
+        }
+
+        // no need for double enqueue checking since DAG is assumed
+        q.push(downstream_op_idx);
+      }
+    }
+  }
+
+  return false;
+}
+
+const std::string StringizeDownstreamMap(
+    const std::map<int, std::set<int>>& downstream_map) {
+  std::ostringstream oss;
+  for (auto pair : downstream_map) {
+    oss << pair.first << " -> ";
+    std::copy(pair.second.begin(),
+              pair.second.end(),
+              std::ostream_iterator<int>(oss, " "));
+    oss << std::endl;
+  }
+  return oss.str();
+}
+
+const std::map<int, std::set<int>>& DependencyBuilder::Build(
+    const std::vector<Instruction>& instructions) {
+  PADDLE_ENFORCE_EQ(
+      is_build_,
+      false,
+      phi::errors::AlreadyExists("The op dependency has been built"));
+
+  instructions_ = &instructions;
+  op_num_ = instructions_->size();
+
+  BuildDownstreamMap();
+  BuildOpHappensBefore();
+  ShrinkDownstreamMap();
+
+  AddDependencyForCoalesceTensorOp();
+  AddDependencyForCommunicationOp();
+  AddDependencyForRandomOp();
+  AddDependencyForReadOp();
+
+  if (FLAGS_new_executor_sequential_run) {
+    AddDependencyForSequentialRun();
+  }
+
+  is_build_ = true;
+
+  VLOG(8) << "Finish build dependency";
+  VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
+  VLOG(8) << "downstream_map: " << std::endl
+          << StringizeDownstreamMap(op_downstream_map_);
+
+  return op_downstream_map_;
+}
+
+bool DependencyBuilder::OpHappensBefore(int prior_op_idx,
+                                        int posterior_op_idx) {
+  PADDLE_ENFORCE_GE(
+      op_happens_before_.size(),
+      0,
+      phi::errors::Unavailable("op_happens_before_ is not yet built"));
+  return op_happens_before_.at(prior_op_idx).at(posterior_op_idx);
+}
+
+void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
+  const std::string kCoalesceTensor = "coalesce_tensor";
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (instructions_->at(op_idx).OpBase()->Type() == kCoalesceTensor) {
+      VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
+      auto fused_out = instructions_->at(op_idx).Outputs().at("FusedOutput")[0];
+      auto outputs = instructions_->at(op_idx).Outputs().at("Output");
+
+      auto is_read = [](const Instruction& inst, int var_id) -> bool {
+        for (auto pair : inst.Inputs()) {
+          for (auto item : pair.second) {
+            if (item == var_id) {
+              return true;
+            }
+          }
+        }
+        return false;
+      };
+
+      auto is_write = [](const Instruction& inst, int var_id) -> bool {
+        for (auto pair : inst.Outputs()) {
+          for (auto item : pair.second) {
+            if (item == var_id) {
+              return true;
+            }
+          }
+        }
+        return false;
+      };
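+
+      // Background for the ordering below: coalesce_tensor allocates one
+      // continuous buffer (FusedOutput) and the vars in Output are views into
+      // slices of that buffer, so accesses to FusedOutput and to the Output
+      // vars may alias each other even though they are different variables.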
+
+      // find first op that reads fused_out
+      auto first_read_fused_out_op = -1;
+      for (auto j = op_idx + 1; j < op_num_; ++j) {
+        if (is_read(instructions_->at(j), fused_out)) {
+          first_read_fused_out_op = j;
+          break;
+        }
+      }
+
+      if (UNLIKELY(first_read_fused_out_op == -1)) {
+        VLOG(4) << "No op read FusedOutput";
+        continue;
+      }
+
+      // find ops that write 'outputs' between (op_index,
+      // first_read_fused_out_op)
+      // add depend: them -> first_read_fused_out_op
+      for (auto j = op_idx + 1;
+           j < static_cast<size_t>(first_read_fused_out_op);
+           ++j) {
+        for (auto var_id : outputs) {
+          if (is_write(instructions_->at(j), var_id)) {
+            AddDownstreamOp(j, first_read_fused_out_op);
+          }
+        }
+      }
+
+      // find first op that reads 'outputs' between (first_read_fused_out_op,
+      // end)
+      // add depend: first_read_fused_out_op -> first op that reads 'outputs'
+
+      // special case for consecutive communication ops, for example,
+      // FusedOutput = c_sync_calc_stream(FusedOutput)
+      // FusedOutput = c_allreduce_sum(FusedOutput)
+      // FusedOutput = c_sync_comm_stream(FusedOutput)
+      // we should take the last one to add depend instead of
+      // 'first_read_fused_out_op'
+      size_t target = first_read_fused_out_op;
+      for (size_t j = first_read_fused_out_op + 1; j < op_num_; ++j) {
+        if (j == target + 1 &&
+            IsCommunicationOp(instructions_->at(target).OpBase()->Type()) &&
+            IsCommunicationOp(instructions_->at(j).OpBase()->Type())) {
+          VLOG(4) << "Found consecutive communication ops, "
+                  << instructions_->at(target).OpBase()->Type() << " -> "
+                  << instructions_->at(j).OpBase()->Type();
+          target = j;
+          continue;
+        }
+
+        for (auto var_id : outputs) {
+          if (is_read(instructions_->at(j), var_id)) {
+            AddDownstreamOp(target, j);
+          }
+        }
+      }
+    }
+  }
+}
+
+void DependencyBuilder::AddDependencyForCommunicationOp() {
+  auto IsCommunicationOp = [](std::string op) -> bool {
+    const std::set<std::string> special_comm_op_set = {
+        "send",
+        "recv",
+        "send_v2",
+        "recv_v2",
+    };
+    const std::string communication_op_prefix = "c_";
+    if (op.find(communication_op_prefix) != std::string::npos ||
+        special_comm_op_set.count(op)) {
+      return true;
+    }
+    return false;
+  };
+
+  int dependence_op_idx = -1;
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (IsCommunicationOp(instructions_->at(op_idx).OpBase()->Type())) {
+      if (dependence_op_idx != -1) {
+        AddDownstreamOp(dependence_op_idx, op_idx);
+      }
+      dependence_op_idx = op_idx;
+    }
+  }
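+
+  // Chaining communication ops this way matters because collective ops on
+  // the same communicator must be issued in the same order on every rank;
+  // otherwise the underlying communication library (e.g. NCCL) may hang.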
+
+  // TODO(zhiqiu): there still some cases not handled
+  // add dependency for c_sync_comm_stream
+
+  // in program, we can add only one c_sync_comm_stream to sync all
+  // communication ops.
+  // c_allreduce_sum(a)
+  // c_allreduce_sum(b)
+  // c_allreduce_sum(c)
+  // c_sync_comm_stream(a)
+  const std::string kSyncComm = "c_sync_comm_stream";
+  dependence_op_idx = -1;
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (instructions_->at(op_idx).OpBase()->Type() == kSyncComm) {
+      dependence_op_idx = op_idx;
+    } else {
+      if (dependence_op_idx != -1) {
+        AddDownstreamOp(dependence_op_idx, op_idx);
+      }
+    }
+  }
+}
+
+// make sure that the random op is scheduled sequentially
+void DependencyBuilder::AddDependencyForRandomOp() {
+  const std::set<std::string> random_op_set = {
+      "bernoulli",
+      "poisson",
+      "multinomial",
+      "gaussian_random",
+      "truncated_gaussian_random",
+      "uniform_random",
+      "randint",
+      "randperm",
+      "exponential",
+      "sampling_id",
+      "dropout",
+      "class_center_sample",
+  };
+
+  int dependence_op_idx = -1;
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (random_op_set.count(instructions_->at(op_idx).OpBase()->Type())) {
+      if (dependence_op_idx != -1) {
+        AddDownstreamOp(dependence_op_idx, op_idx);
+      }
+      dependence_op_idx = op_idx;
+    }
+  }
+}
+
+// equivalent to add_reader_dependency_pass
+void DependencyBuilder::AddDependencyForReadOp() {
+  std::vector<bool> is_startup_ops(op_num_, true);
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    auto it = op_downstream_map_.find(op_idx);
+    if (it != op_downstream_map_.end()) {
+      for (size_t downstream_op_idx : it->second) {
+        is_startup_ops[downstream_op_idx] = false;
+      }
+    }
+  }
+
+  std::vector<size_t> read_ops;
+  std::vector<size_t> startup_ops;
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (instructions_->at(op_idx).OpBase()->Type() == "read") {
+      read_ops.push_back(op_idx);
+    }
+
+    if (is_startup_ops[op_idx]) {
+      startup_ops.push_back(op_idx);
+    }
+  }
+
+  for (size_t read_op_idx : read_ops) {
+    for (size_t downstream_op_idx : startup_ops) {
+      if (read_op_idx != downstream_op_idx &&
+          !IsDependency(downstream_op_idx, read_op_idx, op_downstream_map_))
+        AddDownstreamOp(read_op_idx, downstream_op_idx);
+    }
+  }
+}
+
+void DependencyBuilder::AddDependencyForSequentialRun() {
+  int dependence_op_idx = -1;
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    if (!IsCpuOp(instructions_->at(op_idx))) {
+      if (dependence_op_idx != -1) {
+        AddDownstreamOp(dependence_op_idx, op_idx);
+      }
+      dependence_op_idx = op_idx;
+    }
+  }
+}
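+
+// Add the dependency that op[prior_op_idx] must be dispatched before
+// op[posterior_op_idx]. The edge is skipped when it is already implied
+// transitively, and op_happens_before_ is kept consistent once it has been
+// built.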
+void DependencyBuilder::AddDownstreamOp(int prior_op_idx,
+                                        int posterior_op_idx) {
+  std::set<int>& downstream_ops = op_downstream_map_[prior_op_idx];
+
+  if (op_happens_before_.size() != 0) {
+    PADDLE_ENFORCE_EQ(
+        op_happens_before_[posterior_op_idx][prior_op_idx],
+        false,
+        phi::errors::Unavailable(
+            "Can not add dependency %d->%d because %d is run before %d",
+            prior_op_idx,
+            posterior_op_idx,
+            posterior_op_idx,
+            prior_op_idx));
+
+    for (int op_idx : downstream_ops) {
+      if (op_happens_before_[op_idx][posterior_op_idx]) {
+        VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
+                << "->" << posterior_op_idx << ", skip adding " << prior_op_idx
+                << "->" << posterior_op_idx;
+        return;
+      }
+    }
+  }
+
+  downstream_ops.insert(posterior_op_idx);
+
+  if (op_happens_before_.size() != 0) {
+    for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+      if (op_happens_before_[posterior_op_idx][op_idx]) {
+        op_happens_before_[prior_op_idx][op_idx] = true;
+      }
+    }
+  }
+  VLOG(8) << prior_op_idx << "->" << posterior_op_idx;
+  VLOG(8) << "Add dependency from "
+          << instructions_->at(prior_op_idx).OpBase()->Type() << "("
+          << prior_op_idx << ") to "
+          << instructions_->at(posterior_op_idx).OpBase()->Type() << "("
+          << posterior_op_idx << ")";
+}
+
+void DependencyBuilder::BuildDownstreamMap() {
+  auto var2min_rw_op =
+      std::map<int, std::list<int>>();  // # map from variable id to read /
+                                        // write op id.
+  auto var2recent_write_op =
+      std::map<int, int>();  // # map from variable to recent write op.
+  auto op2dependences =
+      std::map<int, std::set<int>>();  // # map from op to the dependence list,
+                                       // op must run after the dependence.
+  std::set<int>
+      remove_duplicate;  // remove the duplicate between inputs and outputs
+
+  // reserve
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    op2dependences[op_idx] = std::set<int>();
+  }
+
+  auto update_var_min_rw_op =
+      [](const std::map<int, std::set<int>>& op2dependences,
+         std::map<int, std::list<int>>* var2min_rw_op,
+         int cur_op,
+         int rw_var) {
+        // rw_var is inputs or outputs of cur_op
+        // this function updates the var2min_rw_op set.
+        if (var2min_rw_op->find(rw_var) == var2min_rw_op->end()) {
+          (*var2min_rw_op)[rw_var] = std::list<int>();
+        }
+        for (auto dep_op : op2dependences.at(cur_op)) {
+          var2min_rw_op->at(rw_var).remove(dep_op);
+        }
+        var2min_rw_op->at(rw_var).push_back(cur_op);
+      };
+
+  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
+    remove_duplicate.clear();
+    // step1: update the op2dependences structure
+    for (auto& item :
+         instructions_->at(op_idx).Inputs()) {  // for all inputs(read only)
+      for (auto var : item.second) {
+        if (var2recent_write_op.count(var))
+          op2dependences[op_idx].insert(var2recent_write_op[var]);
+      }
+    }
+
+    for (auto& item :
+         instructions_->at(op_idx).Outputs()) {  // for all write vars
+      for (auto var : item.second) {
+        if (var2min_rw_op.count(var)) {
+          for (auto dep_op : var2min_rw_op[var]) {
+            op2dependences[op_idx].insert(dep_op);
+          }
+        }
+      }
+    }
+    // the original output of an inplace op is also changed.
+    if (!instructions_->at(op_idx).InplaceBackMap().empty()) {
+      auto& m = instructions_->at(op_idx).InplaceBackMap();
+      for (auto& p : m) {
+        auto& var = p.second;
+        if (var2min_rw_op.count(var)) {
+          for (auto dep_op : var2min_rw_op[var]) {
+            op2dependences[op_idx].insert(dep_op);
+          }
+        }
+      }
+    }
+
+    // step2: update the two var2xxx data structures
+    for (auto& item :
+         instructions_->at(op_idx).Outputs()) {  // for all write vars
+      for (auto var : item.second) {
+        var2recent_write_op[var] = op_idx;
+        var2min_rw_op[var] = {static_cast<int>(op_idx)};
+        remove_duplicate.insert(var);
+      }
+    }
+
+    // NOTE(zhiqiu): The inplace op with `transfer` also changes
+    // original output after that so add original output as well
+    // original: a->op->a
+    // after: a->data_transfer->a'->op->a'->transfer_back->a
+    // which means op writes a and a'
+    if (!instructions_->at(op_idx).InplaceBackMap().empty()) {
+      auto& m = instructions_->at(op_idx).InplaceBackMap();
+      for (auto& p : m) {
+        auto var = p.second;
+        var2recent_write_op[var] = op_idx;
+        var2min_rw_op[var] = {static_cast<int>(op_idx)};
+        remove_duplicate.insert(var);
+      }
+    }
+
+    for (auto& item :
+         instructions_->at(op_idx).Inputs()) {  // for all inputs(read only)
+      for (auto var : item.second) {
+        if (remove_duplicate.count(var) ==
+            0) {  // var is only read by this op, not written, so update its
+                  // min rw op
+          update_var_min_rw_op(op2dependences, &var2min_rw_op, op_idx, var);
+        }
+      }
+    }
+  }
+
+  // convert op2dependences to downstream_map directly. op2dependences is op ->
+  // its dependences, we want to get op -> [next ops] map, where ops is the
+  // next instruction of op. The size of downstream != size of op2dependences
+  // since there are some ops that have no downstream-op.
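+  // For example, op2dependences = {5: {2, 3}} (op 5 must run after ops 2 and
+  // 3) converts to downstream_map = {2: {5}, 3: {5}}.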
+  for (auto& item : op2dependences) {
+    int op = item.first;
+    for (auto dep_op : item.second) {
+      AddDownstreamOp(dep_op, op);
+    }
+  }
+
+  VLOG(6) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
+  VLOG(6) << "downstream_map: " << std::endl
+          << StringizeDownstreamMap(op_downstream_map_);
+}
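+
+// Compute the transitive closure of op_downstream_map_ into a dense
+// op_num_ x op_num_ boolean matrix via BFS from every op; note this costs
+// O(op_num_^2) memory, with op_num_ the number of ops in the block.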
+void DependencyBuilder::BuildOpHappensBefore() {
+  // happens_before[i][j] means i should be executed before j
+  op_happens_before_.assign(op_num_, std::vector<bool>(op_num_, false));
+
+  // bfs to get all next ops
+  auto bfs = [&](size_t op_idx) {
+    std::queue<size_t> q;
+    std::vector<bool> visited(op_num_, false);
+    q.push(op_idx);
+    while (!q.empty()) {
+      size_t op = q.front();
+      q.pop();
+      visited[op] = true;
+      if (!op_downstream_map_.count(op)) {
+        continue;
+      }
+      for (auto next : op_downstream_map_.at(op)) {
+        if (!visited[next]) {
+          PADDLE_ENFORCE_EQ(op_happens_before_[next][op_idx],
+                            false,
+                            paddle::platform::errors::AlreadyExists(
+                                "There exists a cycle in the graph: expected "
+                                "%d->%d, but already got %d->%d",
+                                op_idx,
+                                next,
+                                next,
+                                op_idx));
+          op_happens_before_[op_idx][next] = true;
+          VLOG(8) << "happens before: " << op_idx << " " << next;
+          q.push(next);
+        }
+      }
+    }
+  };
+
+  for (size_t i = 0; i < op_num_; ++i) {
+    bfs(i);
+  }
+}
+
+void DependencyBuilder::ShrinkDownstreamMap() {
+  // remove unnecessary downstream ops
+  // for example, a->b->c
+  // a: b, c
+  // b: c
+  // =>
+  // a: b
+  // b: c
+
+  // shrink: keep only the downstream ops that no other op in the
+  // downstream list happens before
+  for (size_t i = 0; i < op_num_; ++i) {
+    if (op_downstream_map_.find(i) == op_downstream_map_.end()) {
+      continue;
+    }
+
+    std::set<int> minimum_nexts;
+    for (size_t item : op_downstream_map_.at(i)) {
+      bool not_after_any = true;
+      // find the op that is not executed after any
+      for (size_t other_item : op_downstream_map_.at(i)) {
+        if (op_happens_before_[other_item][item]) {
+          VLOG(8) << "happens_before: " << other_item << "->" << item
+                  << ", so skip " << item;
+          not_after_any = false;
+          break;
+        }
+      }
+      if (not_after_any) {
+        VLOG(8) << "downstream op of " << i << ": " << item;
+        minimum_nexts.insert(item);
+      }
+    }
+    op_downstream_map_.at(i) = minimum_nexts;
+  }
+  VLOG(6) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
+  VLOG(6) << "downstream_map: " << std::endl
+          << StringizeDownstreamMap(op_downstream_map_);
+}
+
+}  // namespace interpreter
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h
new file mode 100644
index 00000000000..9d579014a11
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/interpreter/dependency_builder.h
@@ -0,0 +1,73 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
+
+namespace paddle {
+namespace framework {
+namespace interpreter {
+
+// DependencyBuilder provides some dependency adding functions to handle the
+// dependencies that cannot be explicitly expressed by a Program. It is a
+// compromise of the incomplete expression ability of the Program. Do not add
+// too many functions here at will, that will bring great burden to the
+// Interpretercore.
+
+class DependencyBuilder {
+ public:
+  DependencyBuilder() : is_build_(false), instructions_(nullptr) {}
+
+  // build op dependencies and return the mapping from op to its downstream-op
+  // set
+  const std::map<int, std::set<int>>& Build(
+      const std::vector<Instruction>& instructions);
+
+  bool OpHappensBefore(int prior_op_idx, int posterior_op_idx);
+
+ private:
+  void AddDependencyForCoalesceTensorOp();
+  void AddDependencyForCommunicationOp();
+  void AddDependencyForRandomOp();
+  void AddDependencyForReadOp();
+  void AddDependencyForSequentialRun();
+
+  void AddDownstreamOp(int prior_op_idx, int posterior_op_idx);
+
+  void BuildDownstreamMap();
+
+  void BuildOpHappensBefore();
+
+  void ShrinkDownstreamMap();
+
+  bool is_build_;
+  const std::vector<Instruction>* instructions_;  // not_own
+  size_t op_num_;
+
+  // op_happens_before_[i][j] == true means op[i] happens before op[j]
+  std::vector<std::vector<bool>> op_happens_before_;
+
+  // op_downstream_map_ is the mapping from op to its downstream-op set, that
+  // is to say, op_downstream_map_[i] == {a, b, c} means op[a], op[b] and
+  // op[c] should be dispatched after op[i]
+  std::map<int, std::set<int>> op_downstream_map_;
+};
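+
+// A minimal usage sketch (illustrative; see InterpreterCore for the real
+// call site):
+//
+//   DependencyBuilder builder;
+//   const auto& downstream_map = builder.Build(vec_instruction_);
+//   // downstream_map.at(i) holds the ops to dispatch after op[i] finishes
+//   if (builder.OpHappensBefore(i, j)) { /* op[i] runs before op[j] */ }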
+
+}  // namespace interpreter
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc
deleted file mode 100644
index e3f88d54d91..00000000000
--- a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h"
-
-#include <queue>
-
-namespace paddle {
-namespace framework {
-namespace interpreter {
-
-void AddDownstreamOp(int prior_op_idx,
-                     int posterior_op_idx,
-                     std::map<int, std::list<int>>* op_downstream_map,
-                     const std::vector<std::vector<bool>>* op_happens_before) {
-  if (op_downstream_map->find(prior_op_idx) == op_downstream_map->end()) {
-    op_downstream_map->emplace(std::make_pair(prior_op_idx, std::list<int>()));
-  } else {
-    if (op_happens_before != nullptr) {
-      for (int op_idx : op_downstream_map->at(prior_op_idx)) {
-        if (op_happens_before->at(op_idx).at(posterior_op_idx)) {
-          VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
-                  << "->" << posterior_op_idx << ", skip adding "
-                  << prior_op_idx << "->" << posterior_op_idx;
-          return;
-        }
-      }
-    }
-  }
-
-  op_downstream_map->at(prior_op_idx).push_back(posterior_op_idx);
-}
-
-// check whether exists prior_op -> ... -> posterior_op to avoid building loops
-bool IsDependency(int prior_op_idx,
-                  int posterior_op_idx,
-                  const std::map<int, std::list<int>>& downstream_map) {
-  std::queue<int> q;
-  q.push(prior_op_idx);
-
-  while (!q.empty()) {
-    int op_idx = q.front();
-    q.pop();
-
-    auto it = downstream_map.find(op_idx);
-    if (it != downstream_map.end()) {
-      for (int downstream_op_idx : it->second) {
-        if (downstream_op_idx == posterior_op_idx) {
-          return true;
-        }
-
-        // no need for double enqueue checking since DAG is assumed
-        q.push(downstream_op_idx);
-      }
-    }
-  }
-
-  return false;
-}
-
-void AddDependencyForReadOp(
-    const std::vector<Instruction>& vec_instruction,
-    std::map<int, std::list<int>>* downstream_map,
-    const std::vector<std::vector<bool>>* op_happens_before) {
-  size_t op_num = vec_instruction.size();
-  std::vector<bool> is_startup_ops(op_num, true);
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    auto it = downstream_map->find(op_idx);
-    if (it != downstream_map->end()) {
-      for (size_t downstream_op_idx : it->second) {
-        is_startup_ops[downstream_op_idx] = false;
-      }
-    }
-  }
-
-  std::vector<size_t> read_ops;
-  std::vector<size_t> startup_ops;
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    if (vec_instruction[op_idx].OpBase()->Type() == "read") {
-      read_ops.push_back(op_idx);
-    }
-
-    if (is_startup_ops[op_idx]) {
-      startup_ops.push_back(op_idx);
-    }
-  }
-
-  for (size_t read_op_idx : read_ops) {
-    for (size_t downstream_op_idx : startup_ops) {
-      if (read_op_idx != downstream_op_idx &&
-          !IsDependency(downstream_op_idx, read_op_idx, *downstream_map))
-        AddDownstreamOp(
-            read_op_idx, downstream_op_idx, downstream_map, op_happens_before);
-      VLOG(4) << "Add depend from "
-              << vec_instruction[read_op_idx].OpBase()->Type() << "("
-              << read_op_idx << ") to "
-              << vec_instruction[downstream_op_idx].OpBase()->Type() << "("
-              << downstream_op_idx << ")";
-    }
-  }
-}
-
-}  // namespace interpreter
-}  // namespace framework
-}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h
deleted file mode 100644
index 206a519f55a..00000000000
--- a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// This file provides some dependency adding function to handle the implicit
-// dependency that cannot be explicitly expresed by a Program. It is a
-// compromise of the incomplete expression ability of the Program. Do not add
-// too many functions here at will, that will bring great burden to the
-// Interpretercore.
-
-// TODO(Ruibiao):
-// 1. Move other dependency adding codes from interpretercore_util.cc to
-// dependency_utils.cc
-// 2. Move other Interpretercore related codes to directory
-// new_executor/interpreter
-// 3. Try to remove parameter op_happens_before from the dependency adding
-// function
-
-#pragma once
-
-#include <map>
-#include <vector>
-
-#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
-
-namespace paddle {
-namespace framework {
-namespace interpreter {
-
-// equivalent to add_reader_dependency_pass
-void AddDependencyForReadOp(
-    const std::vector<Instruction>& vec_instruction,
-    std::map<int, std::list<int>>* downstream_map,
-    const std::vector<std::vector<bool>>* op_happens_before = nullptr);
-
-void AddDownstreamOp(
-    int prior_op_idx,
-    int posterior_op_idx,
-    std::map<int, std::list<int>>* op_downstream_map,
-    const std::vector<std::vector<bool>>* op_happens_before = nullptr);
-
-}  // namespace interpreter
-}  // namespace framework
-}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 4b72d6bea34..b9a235bafac 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -333,8 +333,7 @@ void InterpreterCore::BuildOperatorDependences() {
   // Schedule
   auto op_nums = vec_instruction_.size();
   dependecy_count_.resize(op_nums);
-  auto op2downstream = interpreter::build_op_downstream_map(
-      vec_instruction_, &op_happens_before_);
+  auto op2downstream = dependency_builder_.Build(vec_instruction_);
   for (size_t op = 0; op < vec_instruction_.size(); ++op) {
     auto op_list = op2downstream[op];
     std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
@@ -450,7 +449,7 @@ void InterpreterCore::Convert(
         bool not_before_any = true;
         // find the op that is not executed before any
         for (size_t other_item : last_live_ops_[i]) {
-          if (op_happens_before_[item][other_item]) {
+          if (dependency_builder_.OpHappensBefore(item, other_item)) {
             VLOG(8) << "happens_before: " << item << "->" << other_item
                     << ", so skip " << item;
             not_before_any = false;
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index 5efd1d9385b..7069be5af16 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -22,6 +22,7 @@
 #include "paddle/fluid/framework/details/exception_holder.h"
 #include "paddle/fluid/framework/new_executor/event_manager.h"
 #include "paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h"
+#include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/framework/new_executor/profiler.h"
"paddle/fluid/framework/new_executor/profiler.h" @@ -106,6 +107,8 @@ class InterpreterCore { const BlockDesc& block_; // not owned const std::set skip_gc_vars_; + interpreter::DependencyBuilder dependency_builder_; + // NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will // copy a new program and block, the copy_program_ here is used to // hold the program, otherwise block_ maybe not valid after the @@ -119,8 +122,6 @@ class InterpreterCore { std::vector vec_instruction_; // deconstruct before OpFuncNode - // op_happens_before_[i][j] == true means op[i] happens before op[j] - std::vector> op_happens_before_; // last_live_ops_[i] contains the id of operatos that last access var[i] std::map> last_live_ops_; diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index b58a74a659c..f1ef875e4db 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -18,7 +18,6 @@ #include "paddle/fluid/framework/details/nan_inf_utils.h" #include "paddle/fluid/framework/executor_gc_helper.h" #include "paddle/fluid/framework/new_executor/data_transfer.h" -#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h" #include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" @@ -29,16 +28,6 @@ #include "paddle/fluid/platform/mkldnn_helper.h" #endif -// The difference between "sequential_run" and "serial_run": -// "sequential_run" dispatches OPs one by one according to the sequence in the -// Program, while "serial_run" ensures that all Ops are scheduled in a singal -// thread. In standalone executor, "sequential_run" is also "serial_run", while -// "serial_run" is not necessarily "sequential_run". -PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run, - false, - "Enable sequential execution for standalone " - "executor, only applied to GPU OPs."); - PADDLE_DEFINE_EXPORTED_bool( new_executor_serial_run, false, @@ -718,451 +707,6 @@ std::vector merge_vector(const std::vector& first, return out; } -void update_var_min_rw_op(const std::map>& op2dependences, - std::map>* var2min_rw_op, - int cur_op, - int rw_var) { - // rw_var is inputs or outputs of cur_op - // this function update the var2min_rw_op set . - if (var2min_rw_op->find(rw_var) == var2min_rw_op->end()) { - (*var2min_rw_op)[rw_var] = std::list(); - } - for (auto dep_op : op2dependences.at(cur_op)) { - var2min_rw_op->at(rw_var).remove(dep_op); - } - var2min_rw_op->at(rw_var).push_back(cur_op); -} - -size_t CountDownstreamMap(const std::map>& downstream_map) { - size_t count = 0; - for (auto pair : downstream_map) { - count += pair.second.size(); - } - return count; -} - -const std::string StringizeDownstreamMap( - const std::map>& downstream_map) { - std::ostringstream oss; - for (auto pair : downstream_map) { - oss << pair.first << " -> "; - std::copy(pair.second.begin(), - pair.second.end(), - std::ostream_iterator(oss, " ")); - oss << std::endl; - } - return oss.str(); -} - -// convert op2dependences to downstream_map directly. op2dependences is op -> -// it's dependences, we want to get op -> [next ops] map, where ops is the next -// instruction of op. 
-std::map<int, std::list<int>> GetDownstreamMap(
-    const std::map<int, std::set<int>>& op2dependences) {
-  std::map<int, std::list<int>> downstream_map;
-  for (auto& item : op2dependences) {
-    int op = item.first;
-    for (auto dep_op : item.second) {
-      AddDownstreamOp(dep_op, op, &downstream_map);
-    }
-  }
-
-  VLOG(6) << "downstream count: " << CountDownstreamMap(downstream_map);
-  VLOG(6) << "downstream_map: " << std::endl
-          << StringizeDownstreamMap(downstream_map);
-
-  return downstream_map;
-}
-
-void ShrinkDownstreamMap(std::map<int, std::list<int>>* downstream_map,
-                         std::vector<std::vector<bool>>* op_happens_before,
-                         size_t op_num) {
-  // remove unnecessary downstream ops
-  // for example, a->b->c
-  // a: b, c
-  // b: c
-  // =>
-  // a: b
-  // b: c
-
-  // happens_before[i][j] means i should be executed before j
-  op_happens_before->assign(op_num, std::vector<bool>(op_num, false));
-
-  // bfs to get all next ops
-  auto bfs = [&](size_t op_idx) {
-    std::queue<size_t> q;
-    std::vector<bool> visited(op_num, false);
-    q.push(op_idx);
-    while (!q.empty()) {
-      size_t op = q.front();
-      q.pop();
-      visited[op] = true;
-      if (!downstream_map->count(op)) {
-        continue;
-      }
-      for (auto next : downstream_map->at(op)) {
-        if (!visited[next]) {
-          PADDLE_ENFORCE_EQ((*op_happens_before)[next][op_idx],
-                            false,
-                            paddle::platform::errors::AlreadyExists(
-                                "There exists circle in graph, expected "
-                                "%d->%d, but already got %d->%d",
-                                op_idx,
-                                next,
-                                next,
-                                op_idx));
-          (*op_happens_before)[op_idx][next] = true;
-          VLOG(8) << "happens before: " << op_idx << " " << next;
-          q.push(next);
-        }
-      }
-    }
-  };
-
-  for (size_t i = 0; i < op_num; ++i) {
-    bfs(i);
-  }
-
-  // shrink, find the downstream op that has no other op in the
-  // downstream list happens before it
-  for (size_t i = 0; i < op_num; ++i) {
-    if (downstream_map->find(i) == downstream_map->end()) {
-      continue;
-    }
-
-    std::list<int> minumum_nexts;
-    for (size_t item : downstream_map->at(i)) {
-      bool not_after_any = true;
-      // find the op that is not executed after any
-      for (size_t other_item : downstream_map->at(i)) {
-        if ((*op_happens_before)[other_item][item]) {
-          VLOG(8) << "happens_before: " << other_item << "->" << item
-                  << ", so skip " << item;
-          not_after_any = false;
-          break;
-        }
-      }
-      if (not_after_any) {
-        VLOG(8) << "downstream op of " << i << ": " << item;
-        minumum_nexts.push_back(item);
-      }
-    }
-    downstream_map->at(i) = minumum_nexts;
-  }
-  VLOG(6) << "downstream count: " << CountDownstreamMap(*downstream_map);
-  VLOG(6) << "downstream_map: " << std::endl
-          << StringizeDownstreamMap(*downstream_map);
-}
-
-std::map<int, std::list<int>> build_op_downstream_map(
-    const std::vector<Instruction>& vec_instruction,
-    std::vector<std::vector<bool>>* op_happens_before) {
-  auto var2min_rw_op =
-      std::map<int, std::list<int>>();  // # map from variable id to read /
-                                        // write op id.
-  auto var2recent_write_op =
-      std::map<int, int>();  // # map from variable to recent write op.
-  auto op2dependences =
-      std::map<int, std::set<int>>();  // # map from op to the dependence list,
-                                       // op must run after the dependence.
-  std::set<int>
-      remove_duplicate;  // remove the duplicate between inputs and outputs
-
-  size_t op_num = vec_instruction.size();
-
-  // reserve
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    op2dependences[op_idx] = std::set<int>();
-  }
-
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    remove_duplicate.clear();
-    // step1: update the op2dependences structure
-    for (auto& item :
-         vec_instruction[op_idx].Inputs()) {  // for all inputs(read only)
-      for (auto var : item.second) {
-        if (var2recent_write_op.count(var))
-          op2dependences[op_idx].insert(var2recent_write_op[var]);
-      }
-    }
-
-    for (auto& item :
-         vec_instruction[op_idx].Outputs()) {  // for all write vars
-      for (auto var : item.second) {
-        if (var2min_rw_op.count(var)) {
-          for (auto dep_op : var2min_rw_op[var]) {
-            op2dependences[op_idx].insert(dep_op);
-          }
-        }
-      }
-    }
-    // the original output of inplace op is also change.
-    if (!vec_instruction[op_idx].InplaceBackMap().empty()) {
-      auto& m = vec_instruction[op_idx].InplaceBackMap();
-      for (auto& p : m) {
-        auto& var = p.second;
-        if (var2min_rw_op.count(var)) {
-          for (auto dep_op : var2min_rw_op[var]) {
-            op2dependences[op_idx].insert(dep_op);
-          }
-        }
-      }
-    }
-
-    // step2: update 2 var2xxxx data structure
-    for (auto& item :
-         vec_instruction[op_idx].Outputs()) {  // for all write vars
-      for (auto var : item.second) {
-        var2recent_write_op[var] = op_idx;
-        var2min_rw_op[var] = {static_cast<int>(op_idx)};
-        remove_duplicate.insert(var);
-      }
-    }
-
-    // NOTE(zhiqiu): The inplace op with `transfer` also changes
-    // original output after that so add original output as well
-    // original: a->op->a
-    // after: a->data_transfer->a'->op->a'->transfer_back->a
-    // which means op writes a and a'
-    if (!vec_instruction[op_idx].InplaceBackMap().empty()) {
-      auto& m = vec_instruction[op_idx].InplaceBackMap();
-      for (auto& p : m) {
-        auto var = p.second;
-        var2recent_write_op[var] = op_idx;
-        var2min_rw_op[var] = {static_cast<int>(op_idx)};
-        remove_duplicate.insert(var);
-      }
-    }
-
-    for (auto& item :
-         vec_instruction[op_idx].Inputs()) {  // for all inputs(read only)
-      for (auto var : item.second) {
-        if (remove_duplicate.count(var) ==
-            0) {  // var in input list and in output list, so remove it.
-          update_var_min_rw_op(op2dependences, &var2min_rw_op, op_idx, var);
-        }
-      }
-    }
-  }
-
-  // NOTE(zhiqiu): the size of downstream != size of op2dependences since there
-  // are some ops that have no downstream-op.
-  std::map<int, std::list<int>> op_downstream_map =
-      GetDownstreamMap(op2dependences);
-
-  ShrinkDownstreamMap(&op_downstream_map, op_happens_before, op_num);
-
-  // add dependences for random op, make sure that the random op is scheduled
-  // sequentially
-  const std::set<std::string> random_op_set = {
-      "bernoulli",
-      "poisson",
-      "multinomial",
-      "gaussian_random",
-      "truncated_gaussian_random",
-      "uniform_random",
-      "randint",
-      "randperm",
-      "exponential",
-      "sampling_id"
-      "dropout",
-      "class_center_sample",
-  };
-
-  int dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
-      if (dependence_op_idx != -1) {
-        AddDownstreamOp(
-            dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
-      }
-      dependence_op_idx = op_idx;
-    }
-  }
-
-  // add dependency for communication op
-  auto is_comm_op = [](std::string op) -> bool {
-    const std::set<std::string> special_comm_op_set = {
-        "send",
-        "recv",
-        "send_v2",
-        "recv_v2",
-    };
-    const std::string communication_op_prefix = "c_";
-    if (op.find(communication_op_prefix) != std::string::npos ||
-        special_comm_op_set.count(op)) {
-      return true;
-    }
-    return false;
-  };
-
-  dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
-      if (dependence_op_idx != -1) {
-        AddDownstreamOp(
-            dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
-        VLOG(4) << "Add depend from "
-                << vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
-                << vec_instruction[op_idx].OpBase()->Type();
-      }
-      dependence_op_idx = op_idx;
-    }
-  }
-
-  // TODO(zhiqiu): there still some cases not handled
-  // add dependency for c_sync_comm_stream
-
-  // in program, we can add only one c_sync_comm_stream to sync all
-  // communication ops.
-  // c_allreduce_sum(a)
-  // c_allreduce_sum(b)
-  // c_allreduce_sum(c)
-  // c_sync_comm_stream(a)
-  const std::string kSyncComm = "c_sync_comm_stream";
-  dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    if (vec_instruction[op_idx].OpBase()->Type() == kSyncComm) {
-      dependence_op_idx = op_idx;
-    } else {
-      if (dependence_op_idx != -1) {
-        VLOG(4) << "Add depend from "
-                << vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
-                << vec_instruction[op_idx].OpBase()->Type();
-        AddDownstreamOp(
-            dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
-      }
-    }
-  }
-
-  // add dependency for coalesce_tensor
-  const std::string kCoalesceTensor = "coalesce_tensor";
-  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-    if (vec_instruction[op_idx].OpBase()->Type() == kCoalesceTensor) {
-      VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
-      auto fused_out = vec_instruction[op_idx].Outputs().at("FusedOutput")[0];
-      auto outputs = vec_instruction[op_idx].Outputs().at("Output");
-
-      auto is_read = [](const Instruction& inst, int var_id) -> bool {
-        for (auto pair : inst.Inputs()) {
-          for (auto item : pair.second) {
-            if (item == var_id) {
-              return true;
-            }
-          }
-        }
-        return false;
-      };
-
-      auto is_write = [](const Instruction& inst, int var_id) -> bool {
-        for (auto pair : inst.Outputs()) {
-          for (auto item : pair.second) {
-            if (item == var_id) {
-              return true;
-            }
-          }
-        }
-        return false;
-      };
-
-      // find first op that reads fused_out
-      auto first_read_fused_out_op = -1;
-      for (auto j = op_idx + 1; j < op_num; ++j) {
-        if (is_read(vec_instruction[j], fused_out)) {
-          first_read_fused_out_op = j;
-          break;
-        }
-      }
-
-      if (UNLIKELY(first_read_fused_out_op == -1)) {
-        VLOG(4) << "No op read FusedOutput";
-        continue;
-      }
-
-      // find ops that write 'outputs' between (op_index,
-      // first_read_fused_out_op)
-      // add depend: them->first_read_fused_out_op
-      for (auto j = op_idx + 1;
-           j < static_cast<size_t>(first_read_fused_out_op);
-           ++j) {
-        for (auto var_id : outputs) {
-          if (is_write(vec_instruction[j], var_id)) {
-            AddDownstreamOp(j,
-                            first_read_fused_out_op,
-                            &op_downstream_map,
-                            op_happens_before);
-            VLOG(4) << j << " -> " << first_read_fused_out_op;
-            VLOG(4)
-                << "Add depend from " << vec_instruction[j].OpBase()->Type()
-                << " to "
-                << vec_instruction[first_read_fused_out_op].OpBase()->Type();
-          }
-        }
-      }
-
-      // find first op read 'outputs' between (first_read_fused_out_op, end)
-      // add depned: first_read_fused_out_op -> first op that reads 'outputs'
-
-      // special case for consecutive communication ops, for example,
-      // FusedOutput = c_sync_calc_stream(FusedOutput)
-      // FusedOutput= c_allreduce_sum(FusedOutput)
-      // FusedOutput = c_sync_comm_stream(FusedOutput)
-      // we should take the last one to add depned instead of
-      // 'first_read_fused_out_op'
-      size_t target = first_read_fused_out_op;
-      for (size_t j = first_read_fused_out_op + 1; j < op_num; ++j) {
-        if (j == target + 1 &&
-            is_comm_op(vec_instruction[target].OpBase()->Type()) &&
-            is_comm_op(vec_instruction[j].OpBase()->Type())) {
-          VLOG(4) << "Found consecutive communication ops, "
-                  << vec_instruction[target].OpBase()->Type() << " -> "
-                  << vec_instruction[j].OpBase()->Type();
-          target = j;
-          continue;
-        }
-
-        for (auto var_id : outputs) {
-          if (is_read(vec_instruction[j], var_id)) {
-            AddDownstreamOp(target, j, &op_downstream_map, op_happens_before);
-            VLOG(4) << target << " -> " << j;
-            VLOG(4) << "Add depend from "
-                    << vec_instruction[target].OpBase()->Type() << " to "
-                    << vec_instruction[j].OpBase()->Type();
-          }
-        }
-      }
-    }
-  }
-
-  if (FLAGS_new_executor_sequential_run) {
-    dependence_op_idx = -1;
-    for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
-      if (!IsCpuOp(vec_instruction[op_idx])) {
-        if (dependence_op_idx != -1) {
-          AddDownstreamOp(
-              dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
-          VLOG(4) << "Add depend from "
-                  << vec_instruction[dependence_op_idx].OpBase()->Type() << "("
-                  << dependence_op_idx << ") to "
-                  << vec_instruction[op_idx].OpBase()->Type() << "(" << op_idx
-                  << ")";
-        }
-        dependence_op_idx = op_idx;
-      }
-    }
-  }
-
-  AddDependencyForReadOp(
-      vec_instruction, &op_downstream_map, op_happens_before);
-
-  VLOG(8) << "build_op_downstream_map finished";
-  VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map);
-  VLOG(8) << "downstream_map: " << std::endl
-          << StringizeDownstreamMap(op_downstream_map);
-
-  return op_downstream_map;
-}
-
 }  // namespace interpreter
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 6296db41db9..dfbc493d9dc 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -82,10 +82,6 @@ void build_op_func_list(const platform::Place& place,
                         VariableScope* scope,
                         bool use_local_scope = true);

-std::map<int, std::list<int>> build_op_downstream_map(
-    const std::vector<Instruction>& vec_instruction,
-    std::vector<std::vector<bool>>* op_happens_before);
-
 void add_fetch(const std::vector<std::string>& fetch_names,
                framework::BlockDesc* block);
--
GitLab