Unverified commit 9b97ac70, authored by Ruibiao Chen, committed by GitHub

Refactor build_op_downstream_map for standalone executor (#44729)

* Refactor build_op_downstream_map for standalone executor

* Add some comments
Parent d788e727
......@@ -12,7 +12,7 @@ set(STANDALONE_EXECUTOR_SRCS
standalone_executor.cc)
set(STANDALONE_EXECUTOR_DEPS
- dependency_utils
+ dependency_builder
device_context
op_registry
scope
......
cc_library(
- dependency_utils
- SRCS dependency_utils.cc
+ dependency_builder
+ SRCS dependency_builder.cc
DEPS operator)
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
#include <queue>
// The difference between "sequential_run" and "serial_run":
// "sequential_run" dispatches OPs one by one according to the sequence in the
// Program, while "serial_run" ensures that all Ops are scheduled in a single
// thread. In standalone executor, "sequential_run" is also "serial_run", while
// "serial_run" is not necessarily "sequential_run".
PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run,
false,
"Enable sequential execution for standalone "
"executor, only applied to GPU OPs.");
namespace paddle {
namespace framework {
namespace interpreter {
size_t CountDownstreamMap(const std::map<int, std::set<int>>& downstream_map) {
size_t count = 0;
for (auto pair : downstream_map) {
count += pair.second.size();
}
return count;
}
bool IsCommunicationOp(const std::string& op_name) {
const std::set<std::string> special_comm_op_set = {
"send",
"recv",
"send_v2",
"recv_v2",
};
const std::string communication_op_prefix = "c_";
if (op_name.find(communication_op_prefix) != std::string::npos ||
special_comm_op_set.count(op_name)) {
return true;
}
return false;
}
// check whether a path prior_op -> ... -> posterior_op exists, to avoid
// creating cycles
bool IsDependency(int prior_op_idx,
int posterior_op_idx,
const std::map<int, std::set<int>>& downstream_map) {
std::queue<int> q;
q.push(prior_op_idx);
while (!q.empty()) {
int op_idx = q.front();
q.pop();
auto it = downstream_map.find(op_idx);
if (it != downstream_map.end()) {
for (int downstream_op_idx : it->second) {
if (downstream_op_idx == posterior_op_idx) {
return true;
}
// no need for double enqueue checking since DAG is assumed
q.push(downstream_op_idx);
}
}
}
return false;
}
const std::string StringizeDownstreamMap(
const std::map<int, std::set<int>>& downstream_map) {
std::ostringstream oss;
for (auto pair : downstream_map) {
oss << pair.first << " -> ";
std::copy(pair.second.begin(),
pair.second.end(),
std::ostream_iterator<int>(oss, " "));
oss << std::endl;
}
return oss.str();
}
const std::map<int, std::set<int>>& DependencyBuilder::Build(
const std::vector<Instruction>& instructions) {
PADDLE_ENFORCE_EQ(
is_build_,
false,
phi::errors::AlreadyExists("The op dependency has been built"));
instructions_ = &instructions;
op_num_ = instructions_->size();
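// Phase 1: derive the raw downstream map from the data dependencies in the
// Program, build the happens-before matrix, and shrink the map to its minimal
// edges.
// Phase 2: add the implicit dependencies that a Program cannot express
// (coalesce_tensor, communication, random and read ops, optional sequential
// run).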
BuildDownstreamMap();
BuildOpHappensBefore();
ShrinkDownstreamMap();
AddDependencyForCoalesceTensorOp();
AddDependencyForCommunicationOp();
AddDependencyForRandomOp();
AddDependencyForReadOp();
if (FLAGS_new_executor_sequential_run) {
AddDependencyForSequentialRun();
}
is_build_ = true;
VLOG(8) << "Finish build dependency";
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map_);
return op_downstream_map_;
}
bool DependencyBuilder::OpHappensBefore(int prior_op_idx,
int posterior_op_idx) {
PADDLE_ENFORCE_GT(
op_happens_before_.size(),
0,
phi::errors::Unavailable("op_happens_before_ is not yet built"));
return op_happens_before_.at(prior_op_idx).at(posterior_op_idx);
}
void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
const std::string kCoalesceTensor = "coalesce_tensor";
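// coalesce_tensor fuses the memory of its "Output" vars into "FusedOutput",
// so reads and writes through either name touch the same storage. The plain
// variable-id analysis in BuildDownstreamMap cannot see this aliasing, hence
// the explicit dependencies added below.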
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (instructions_->at(op_idx).OpBase()->Type() == kCoalesceTensor) {
VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
auto fused_out = instructions_->at(op_idx).Outputs().at("FusedOutput")[0];
auto outputs = instructions_->at(op_idx).Outputs().at("Output");
auto is_read = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Inputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};
auto is_write = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Outputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};
// find first op that reads fused_out
auto first_read_fused_out_op = -1;
for (auto j = op_idx + 1; j < op_num_; ++j) {
if (is_read(instructions_->at(j), fused_out)) {
first_read_fused_out_op = j;
break;
}
}
if (UNLIKELY(first_read_fused_out_op == -1)) {
VLOG(4) << "No op read FusedOutput";
continue;
}
// find ops that write 'outputs' between (op_index,
// first_read_fused_out_op)
// add depend: them->first_read_fused_out_op
for (auto j = op_idx + 1;
j < static_cast<size_t>(first_read_fused_out_op);
++j) {
for (auto var_id : outputs) {
if (is_write(instructions_->at(j), var_id)) {
AddDownstreamOp(j, first_read_fused_out_op);
}
}
}
// find the first op that reads 'outputs' between (first_read_fused_out_op, end)
// add depend: first_read_fused_out_op -> first op that reads 'outputs'
// special case for consecutive communication ops, for example,
// FusedOutput = c_sync_calc_stream(FusedOutput)
// FusedOutput = c_allreduce_sum(FusedOutput)
// FusedOutput = c_sync_comm_stream(FusedOutput)
// we should take the last one to add the dependency, instead of
// 'first_read_fused_out_op'
size_t target = first_read_fused_out_op;
for (size_t j = first_read_fused_out_op + 1; j < op_num_; ++j) {
if (j == target + 1 &&
IsCommunicationOp(instructions_->at(target).OpBase()->Type()) &&
IsCommunicationOp(instructions_->at(j).OpBase()->Type())) {
VLOG(4) << "Found consecutive communication ops, "
<< instructions_->at(target).OpBase()->Type() << " -> "
<< instructions_->at(j).OpBase()->Type();
target = j;
continue;
}
for (auto var_id : outputs) {
if (is_read(instructions_->at(j), var_id)) {
AddDownstreamOp(target, j);
}
}
}
}
}
}
void DependencyBuilder::AddDependencyForCommunicationOp() {
auto IsCommunicationOp = [](std::string op) -> bool {
const std::set<std::string> special_comm_op_set = {
"send",
"recv",
"send_v2",
"recv_v2",
};
const std::string communication_op_prefix = "c_";
if (op.find(communication_op_prefix) != std::string::npos ||
special_comm_op_set.count(op)) {
return true;
}
return false;
};
int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (IsCommunicationOp(instructions_->at(op_idx).OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx);
}
dependence_op_idx = op_idx;
}
}
// TODO(zhiqiu): there are still some cases not handled
// add dependency for c_sync_comm_stream
// in program, we can add only one c_sync_comm_stream to sync all
// communication ops.
// c_allreduce_sum(a)
// c_allreduce_sum(b)
// c_allreduce_sum(c)
// c_sync_comm_stream(a)
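// The loop below therefore conservatively makes every op that follows a
// c_sync_comm_stream depend on it; edges already implied through an existing
// downstream op are skipped inside AddDownstreamOp.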
const std::string kSyncComm = "c_sync_comm_stream";
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (instructions_->at(op_idx).OpBase()->Type() == kSyncComm) {
dependence_op_idx = op_idx;
} else {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx);
}
}
}
}
// make sure that the random op is scheduled sequentially
void DependencyBuilder::AddDependencyForRandomOp() {
const std::set<std::string> random_op_set = {
"bernoulli",
"poisson",
"multinomial",
"gaussian_random",
"truncated_gaussian_random",
"uniform_random",
"randint",
"randperm",
"exponential",
"sampling_id"
"dropout",
"class_center_sample",
};
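// chain the random ops: each one is made downstream of the previous random op,
// so they are dispatched in Program order (presumably to keep random number
// generation deterministic across runs).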
int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (random_op_set.count(instructions_->at(op_idx).OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx);
}
dependence_op_idx = op_idx;
}
}
}
// equivalent to add_reader_dependency_pass
void DependencyBuilder::AddDependencyForReadOp() {
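// The idea: an op that never appears in any downstream list is a "startup op"
// (it has no known upstream dependency); make every such op depend on every
// read op so that the reader ops are scheduled first.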
std::vector<bool> is_startup_ops(op_num_, true);
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
auto it = op_downstream_map_.find(op_idx);
if (it != op_downstream_map_.end()) {
for (size_t downstream_op_idx : it->second) {
is_startup_ops[downstream_op_idx] = false;
}
}
}
std::vector<size_t> read_ops;
std::vector<size_t> startup_ops;
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (instructions_->at(op_idx).OpBase()->Type() == "read") {
read_ops.push_back(op_idx);
}
if (is_startup_ops[op_idx]) {
startup_ops.push_back(op_idx);
}
}
for (size_t read_op_idx : read_ops) {
for (size_t downstream_op_idx : startup_ops) {
if (read_op_idx != downstream_op_idx &&
!IsDependency(downstream_op_idx, read_op_idx, op_downstream_map_))
AddDownstreamOp(read_op_idx, downstream_op_idx);
}
}
}
void DependencyBuilder::AddDependencyForSequentialRun() {
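// chain all non-CPU ops: each one is made downstream of the previous non-CPU
// op, so device (e.g. GPU) ops are dispatched strictly in Program order.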
int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (!IsCpuOp(instructions_->at(op_idx))) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx);
}
dependence_op_idx = op_idx;
}
}
}
void DependencyBuilder::AddDownstreamOp(int prior_op_idx,
int posterior_op_idx) {
std::set<int>& downstream_ops = op_downstream_map_[prior_op_idx];
if (op_happens_before_.size() != 0) {
PADDLE_ENFORCE_EQ(
op_happens_before_[posterior_op_idx][prior_op_idx],
false,
phi::errors::Unavailable(
"Can not add dependency %d->%d because %d is run before %d",
prior_op_idx,
posterior_op_idx,
posterior_op_idx,
prior_op_idx));
for (int op_idx : downstream_ops) {
if (op_happens_before_[op_idx][posterior_op_idx]) {
VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
<< "->" << posterior_op_idx << ", skip adding " << prior_op_idx
<< "->" << posterior_op_idx;
return;
}
}
}
downstream_ops.insert(posterior_op_idx);
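// propagate happens-before: every op already known to run after
// posterior_op_idx is now also marked to run after prior_op_idx.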
if (op_happens_before_.size() != 0) {
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
if (op_happens_before_[posterior_op_idx][op_idx]) {
op_happens_before_[prior_op_idx][op_idx] = true;
}
}
}
VLOG(8) << prior_op_idx << "->" << posterior_op_idx;
VLOG(8) << "Add dependency from "
<< instructions_->at(prior_op_idx).OpBase()->Type() << "("
<< prior_op_idx << ") to "
<< instructions_->at(posterior_op_idx).OpBase()->Type() << "("
<< posterior_op_idx << ")";
}
void DependencyBuilder::BuildDownstreamMap() {
auto var2min_rw_op =
std::map<int, std::list<int>>(); // map from variable id to the
// read/write op ids on it
auto var2recent_write_op =
std::map<int, int>(); // map from variable id to its most recent write op
auto op2dependences =
std::map<int, std::set<int>>(); // map from op to its dependency list;
// the op must run after these dependencies
std::set<int>
remove_duplicate; // skip vars that appear in both inputs and outputs
// reserve an (empty) dependency set for every op
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
op2dependences[op_idx] = std::set<int>();
}
auto update_var_min_rw_op =
[](const std::map<int, std::set<int>>& op2dependences,
std::map<int, std::list<int>>* var2min_rw_op,
int cur_op,
int rw_var) {
// rw_var is an input or output of cur_op;
// this function updates the var2min_rw_op entry for it.
if (var2min_rw_op->find(rw_var) == var2min_rw_op->end()) {
(*var2min_rw_op)[rw_var] = std::list<int>();
}
for (auto dep_op : op2dependences.at(cur_op)) {
var2min_rw_op->at(rw_var).remove(dep_op);
}
var2min_rw_op->at(rw_var).push_back(cur_op);
};
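// In terms of the classic hazards, the loop below adds:
// - RAW edges: an op depends on the most recent writer of each of its inputs
//   (var2recent_write_op);
// - WAR/WAW edges: an op depends on the not-yet-superseded readers/writers of
//   each of its outputs (var2min_rw_op).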
for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
remove_duplicate.clear();
// step1: update the op2dependences structure
for (auto& item :
instructions_->at(op_idx).Inputs()) { // for all inputs(read only)
for (auto var : item.second) {
if (var2recent_write_op.count(var))
op2dependences[op_idx].insert(var2recent_write_op[var]);
}
}
for (auto& item :
instructions_->at(op_idx).Outputs()) { // for all write vars
for (auto var : item.second) {
if (var2min_rw_op.count(var)) {
for (auto dep_op : var2min_rw_op[var]) {
op2dependences[op_idx].insert(dep_op);
}
}
}
}
// the original output of an inplace op also changes.
if (!instructions_->at(op_idx).InplaceBackMap().empty()) {
auto& m = instructions_->at(op_idx).InplaceBackMap();
for (auto& p : m) {
auto& var = p.second;
if (var2min_rw_op.count(var)) {
for (auto dep_op : var2min_rw_op[var]) {
op2dependences[op_idx].insert(dep_op);
}
}
}
}
// step2: update 2 var2xxxx data structure
for (auto& item :
instructions_->at(op_idx).Outputs()) { // for all write vars
for (auto var : item.second) {
var2recent_write_op[var] = op_idx;
var2min_rw_op[var] = {static_cast<int>(op_idx)};
remove_duplicate.insert(var);
}
}
// NOTE(zhiqiu): An inplace op with `transfer` also changes the original
// output afterwards, so add the original output as well
// original: a->op->a
// after: a->data_transfer->a'->op->a'->transfer_back->a
// which means op writes a and a'
if (!instructions_->at(op_idx).InplaceBackMap().empty()) {
auto& m = instructions_->at(op_idx).InplaceBackMap();
for (auto& p : m) {
auto var = p.second;
var2recent_write_op[var] = op_idx;
var2min_rw_op[var] = {static_cast<int>(op_idx)};
remove_duplicate.insert(var);
}
}
for (auto& item :
instructions_->at(op_idx).Inputs()) { // for all inputs(read only)
for (auto var : item.second) {
if (remove_duplicate.count(var) ==
0) { // vars that are both input and output were already handled above
update_var_min_rw_op(op2dependences, &var2min_rw_op, op_idx, var);
}
}
}
}
// convert op2dependences to downstream_map directly. op2dependences maps an op
// to its dependencies; we want the reverse map, op -> [next ops], where
// [next ops] are the ops that must run after op. The size of downstream_map !=
// the size of op2dependences since some ops have no downstream op.
for (auto& item : op2dependences) {
int op = item.first;
for (auto dep_op : item.second) {
AddDownstreamOp(dep_op, op);
}
}
VLOG(6) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
VLOG(6) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map_);
}
void DependencyBuilder::BuildOpHappensBefore() {
// happens_before[i][j] means i should be executed before j
op_happens_before_.assign(op_num_, std::vector<bool>(op_num_, false));
// bfs to get all next ops
auto bfs = [&](size_t op_idx) {
std::queue<size_t> q;
std::vector<bool> visited(op_num_, false);
q.push(op_idx);
while (!q.empty()) {
size_t op = q.front();
q.pop();
visited[op] = true;
if (!op_downstream_map_.count(op)) {
continue;
}
for (auto next : op_downstream_map_.at(op)) {
if (!visited[next]) {
PADDLE_ENFORCE_EQ(op_happens_before_[next][op_idx],
false,
paddle::platform::errors::AlreadyExists(
"There exists circle in graph, expected "
"%d->%d, but already got %d->%d",
op_idx,
next,
next,
op_idx));
op_happens_before_[op_idx][next] = true;
VLOG(8) << "happens before: " << op_idx << " " << next;
q.push(next);
}
}
}
};
for (size_t i = 0; i < op_num_; ++i) {
bfs(i);
}
}
void DependencyBuilder::ShrinkDownstreamMap() {
// remove unnecessary downstream ops
// for example, a->b->c
// a: b, c
// b: c
// =>
// a: b
// b: c
// shrink, find the downstream op that has no other op in the
// downstream list happens before it
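// i.e. keep only the direct successors; the removed edges remain implied
// transitively and are still recorded in op_happens_before_.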
for (size_t i = 0; i < op_num_; ++i) {
if (op_downstream_map_.find(i) == op_downstream_map_.end()) {
continue;
}
std::set<int> minumum_nexts;
for (size_t item : op_downstream_map_.at(i)) {
bool not_after_any = true;
// find the op that is not executed after any
for (size_t other_item : op_downstream_map_.at(i)) {
if (op_happens_before_[other_item][item]) {
VLOG(8) << "happens_before: " << other_item << "->" << item
<< ", so skip " << item;
not_after_any = false;
break;
}
}
if (not_after_any) {
VLOG(8) << "downstream op of " << i << ": " << item;
minumum_nexts.insert(item);
}
}
op_downstream_map_.at(i) = minumum_nexts;
}
VLOG(6) << "downstream count: " << CountDownstreamMap(op_downstream_map_);
VLOG(6) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map_);
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -12,20 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// This file provides some dependency-adding functions to handle the implicit
// dependencies that cannot be explicitly expressed by a Program. It is a
// compromise for the Program's limited expressive ability. Do not add too
// many functions here at will; that would place a heavy burden on the
// InterpreterCore.
// TODO(Ruibiao):
// 1. Move other dependency adding codes from interpretercore_util.cc to
// dependency_utils.cc
// 2. Move other Interpretercore related codes to directory
// new_executor/interpreter
// 3. Try to remove parameter op_happens_before from the dependency adding
// function
#pragma once
#include <map>
......@@ -37,17 +23,50 @@ namespace paddle {
namespace framework {
namespace interpreter {
// equivalent to add_reader_dependency_pass
void AddDependencyForReadOp(
const std::vector<Instruction>& vec_instruction,
std::map<int, std::list<int>>* downstream_map,
const std::vector<std::vector<bool>>* op_happens_before = nullptr);
void AddDownstreamOp(
int prior_op_idx,
int posterior_op_idx,
std::map<int, std::list<int>>* op_downstream_map,
const std::vector<std::vector<bool>>* op_happens_before = nullptr);
// DependencyBuilder provides some dependency-adding functions to handle
// dependencies that cannot be explicitly expressed by a Program. It is a
// compromise for the Program's limited expressive ability. Do not add too
// many functions here at will; that would place a heavy burden on the
// InterpreterCore.
class DependencyBuilder {
public:
DependencyBuilder() : is_build_(false), instructions_(nullptr) {}
// build op dependencies and return the mapping from op to its downstream-op
// set
const std::map<int, std::set<int>>& Build(
const std::vector<Instruction>& instructions);
bool OpHappensBefore(int prior_op_idx, int posterior_op_idx);
private:
void AddDependencyForCoalesceTensorOp();
void AddDependencyForCommunicationOp();
void AddDependencyForRandomOp();
void AddDependencyForReadOp();
void AddDependencyForSequentialRun();
void AddDownstreamOp(int prior_op_idx, int posterior_op_idx);
void BuildDownstreamMap();
void BuildOpHappensBefore();
void ShrinkDownstreamMap();
bool is_build_;
const std::vector<Instruction>* instructions_; // not_own
size_t op_num_;
// op_happens_before_[i][j] == true means op[i] happens before op[j]
std::vector<std::vector<bool>> op_happens_before_;
// op_downstream_map_ is the mapping from op to its downstream-op set, that is
// to say, op_downstream_map_[i] == {a, b, c} means op[a], op[b] and op[c]
// should be dispatched after op[i]
std::map<int, std::set<int>> op_downstream_map_;
};
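// Typical usage (a sketch based on InterpreterCore::BuildOperatorDependences
// below):
//
//   interpreter::DependencyBuilder builder;
//   const std::map<int, std::set<int>>& op2downstream =
//       builder.Build(vec_instruction);
//   if (builder.OpHappensBefore(i, j)) { /* op[i] runs before op[j] */ }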
} // namespace interpreter
} // namespace framework
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h"
#include <queue>
namespace paddle {
namespace framework {
namespace interpreter {
void AddDownstreamOp(int prior_op_idx,
int posterior_op_idx,
std::map<int, std::list<int>>* op_downstream_map,
const std::vector<std::vector<bool>>* op_happens_before) {
if (op_downstream_map->find(prior_op_idx) == op_downstream_map->end()) {
op_downstream_map->emplace(std::make_pair(prior_op_idx, std::list<int>()));
} else {
if (op_happens_before != nullptr) {
for (int op_idx : op_downstream_map->at(prior_op_idx)) {
if (op_happens_before->at(op_idx).at(posterior_op_idx)) {
VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
<< "->" << posterior_op_idx << ", skip adding "
<< prior_op_idx << "->" << posterior_op_idx;
return;
}
}
}
}
op_downstream_map->at(prior_op_idx).push_back(posterior_op_idx);
}
// check whether exists prior_op -> ... -> posterior_op to avoid building loops
bool IsDependency(int prior_op_idx,
int posterior_op_idx,
const std::map<int, std::list<int>>& downstream_map) {
std::queue<int> q;
q.push(prior_op_idx);
while (!q.empty()) {
int op_idx = q.front();
q.pop();
auto it = downstream_map.find(op_idx);
if (it != downstream_map.end()) {
for (int downstream_op_idx : it->second) {
if (downstream_op_idx == posterior_op_idx) {
return true;
}
// no need for double enqueue checking since DAG is assumed
q.push(downstream_op_idx);
}
}
}
return false;
}
void AddDependencyForReadOp(
const std::vector<Instruction>& vec_instruction,
std::map<int, std::list<int>>* downstream_map,
const std::vector<std::vector<bool>>* op_happens_before) {
size_t op_num = vec_instruction.size();
std::vector<bool> is_startup_ops(op_num, true);
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
auto it = downstream_map->find(op_idx);
if (it != downstream_map->end()) {
for (size_t downstream_op_idx : it->second) {
is_startup_ops[downstream_op_idx] = false;
}
}
}
std::vector<size_t> read_ops;
std::vector<size_t> startup_ops;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == "read") {
read_ops.push_back(op_idx);
}
if (is_startup_ops[op_idx]) {
startup_ops.push_back(op_idx);
}
}
for (size_t read_op_idx : read_ops) {
for (size_t downstream_op_idx : startup_ops) {
if (read_op_idx != downstream_op_idx &&
!IsDependency(downstream_op_idx, read_op_idx, *downstream_map))
AddDownstreamOp(
read_op_idx, downstream_op_idx, downstream_map, op_happens_before);
VLOG(4) << "Add depend from "
<< vec_instruction[read_op_idx].OpBase()->Type() << "("
<< read_op_idx << ") to "
<< vec_instruction[downstream_op_idx].OpBase()->Type() << "("
<< downstream_op_idx << ")";
}
}
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -333,8 +333,7 @@ void InterpreterCore::BuildOperatorDependences() {
// Schedule
auto op_nums = vec_instruction_.size();
dependecy_count_.resize(op_nums);
- auto op2downstream = interpreter::build_op_downstream_map(
-     vec_instruction_, &op_happens_before_);
+ auto op2downstream = dependency_builder_.Build(vec_instruction_);
for (size_t op = 0; op < vec_instruction_.size(); ++op) {
auto op_list = op2downstream[op];
std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
......@@ -450,7 +449,7 @@ void InterpreterCore::Convert(
bool not_before_any = true;
// find the op that is not executed before any
for (size_t other_item : last_live_ops_[i]) {
- if (op_happens_before_[item][other_item]) {
+ if (dependency_builder_.OpHappensBefore(item, other_item)) {
VLOG(8) << "happens_before: " << item << "->" << other_item
<< ", so skip " << item;
not_before_any = false;
......
......@@ -22,6 +22,7 @@
#include "paddle/fluid/framework/details/exception_holder.h"
#include "paddle/fluid/framework/new_executor/event_manager.h"
#include "paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/profiler.h"
......@@ -106,6 +107,8 @@ class InterpreterCore {
const BlockDesc& block_; // not owned
const std::set<std::string> skip_gc_vars_;
interpreter::DependencyBuilder dependency_builder_;
// NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will
// copy a new program and block, the copy_program_ here is used to
// hold the program, otherwise block_ maybe not valid after the
......@@ -119,8 +122,6 @@ class InterpreterCore {
std::vector<Instruction> vec_instruction_; // deconstruct before OpFuncNode
// op_happens_before_[i][j] == true means op[i] happens before op[j]
std::vector<std::vector<bool>> op_happens_before_;
// last_live_ops_[i] contains the ids of the operators that last access var[i]
std::map<size_t, std::set<size_t>> last_live_ops_;
......
......@@ -18,7 +18,6 @@
#include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/new_executor/data_transfer.h"
#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
#include "paddle/fluid/operators/controlflow/recurrent_op_helper.h"
#include "paddle/fluid/operators/controlflow/while_op_helper.h"
......@@ -29,16 +28,6 @@
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif
// The difference between "sequential_run" and "serial_run":
// "sequential_run" dispatches OPs one by one according to the sequence in the
// Program, while "serial_run" ensures that all Ops are scheduled in a single
// thread. In standalone executor, "sequential_run" is also "serial_run", while
// "serial_run" is not necessarily "sequential_run".
PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run,
false,
"Enable sequential execution for standalone "
"executor, only applied to GPU OPs.");
PADDLE_DEFINE_EXPORTED_bool(
new_executor_serial_run,
false,
......@@ -718,451 +707,6 @@ std::vector<size_t> merge_vector(const std::vector<size_t>& first,
return out;
}
void update_var_min_rw_op(const std::map<int, std::set<int>>& op2dependences,
std::map<int, std::list<int>>* var2min_rw_op,
int cur_op,
int rw_var) {
// rw_var is inputs or outputs of cur_op
// this function update the var2min_rw_op set .
if (var2min_rw_op->find(rw_var) == var2min_rw_op->end()) {
(*var2min_rw_op)[rw_var] = std::list<int>();
}
for (auto dep_op : op2dependences.at(cur_op)) {
var2min_rw_op->at(rw_var).remove(dep_op);
}
var2min_rw_op->at(rw_var).push_back(cur_op);
}
size_t CountDownstreamMap(const std::map<int, std::list<int>>& downstream_map) {
size_t count = 0;
for (auto pair : downstream_map) {
count += pair.second.size();
}
return count;
}
const std::string StringizeDownstreamMap(
const std::map<int, std::list<int>>& downstream_map) {
std::ostringstream oss;
for (auto pair : downstream_map) {
oss << pair.first << " -> ";
std::copy(pair.second.begin(),
pair.second.end(),
std::ostream_iterator<int>(oss, " "));
oss << std::endl;
}
return oss.str();
}
// convert op2dependences to downstream_map directly. op2dependences maps an op
// to its dependencies; we want the reverse map, op -> [next ops], where
// [next ops] are the ops that must run after op.
std::map<int, std::list<int>> GetDownstreamMap(
const std::map<int, std::set<int>>& op2dependences) {
std::map<int, std::list<int>> downstream_map;
for (auto& item : op2dependences) {
int op = item.first;
for (auto dep_op : item.second) {
AddDownstreamOp(dep_op, op, &downstream_map);
}
}
VLOG(6) << "downstream count: " << CountDownstreamMap(downstream_map);
VLOG(6) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(downstream_map);
return downstream_map;
}
void ShrinkDownstreamMap(std::map<int, std::list<int>>* downstream_map,
std::vector<std::vector<bool>>* op_happens_before,
size_t op_num) {
// remove unnecessary downstream ops
// for example, a->b->c
// a: b, c
// b: c
// =>
// a: b
// b: c
// happens_before[i][j] means i should be executed before j
op_happens_before->assign(op_num, std::vector<bool>(op_num, false));
// bfs to get all next ops
auto bfs = [&](size_t op_idx) {
std::queue<size_t> q;
std::vector<bool> visited(op_num, false);
q.push(op_idx);
while (!q.empty()) {
size_t op = q.front();
q.pop();
visited[op] = true;
if (!downstream_map->count(op)) {
continue;
}
for (auto next : downstream_map->at(op)) {
if (!visited[next]) {
PADDLE_ENFORCE_EQ((*op_happens_before)[next][op_idx],
false,
paddle::platform::errors::AlreadyExists(
"There exists circle in graph, expected "
"%d->%d, but already got %d->%d",
op_idx,
next,
next,
op_idx));
(*op_happens_before)[op_idx][next] = true;
VLOG(8) << "happens before: " << op_idx << " " << next;
q.push(next);
}
}
}
};
for (size_t i = 0; i < op_num; ++i) {
bfs(i);
}
// shrink, find the downstream op that has no other op in the
// downstream list happens before it
for (size_t i = 0; i < op_num; ++i) {
if (downstream_map->find(i) == downstream_map->end()) {
continue;
}
std::list<int> minumum_nexts;
for (size_t item : downstream_map->at(i)) {
bool not_after_any = true;
// find the op that is not executed after any
for (size_t other_item : downstream_map->at(i)) {
if ((*op_happens_before)[other_item][item]) {
VLOG(8) << "happens_before: " << other_item << "->" << item
<< ", so skip " << item;
not_after_any = false;
break;
}
}
if (not_after_any) {
VLOG(8) << "downstream op of " << i << ": " << item;
minumum_nexts.push_back(item);
}
}
downstream_map->at(i) = minumum_nexts;
}
VLOG(6) << "downstream count: " << CountDownstreamMap(*downstream_map);
VLOG(6) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(*downstream_map);
}
std::map<int, std::list<int>> build_op_downstream_map(
const std::vector<Instruction>& vec_instruction,
std::vector<std::vector<bool>>* op_happens_before) {
auto var2min_rw_op =
std::map<int, std::list<int>>(); // # map from variable id to read /
// write op id.
auto var2recent_write_op =
std::map<int, int>(); // # map from variable to recent write op.
auto op2dependences =
std::map<int, std::set<int>>(); //# map from op to the dependence list,
// op must run after the dependence.
std::set<int>
remove_duplicate; // remove the duplicate between inputs and outputs
size_t op_num = vec_instruction.size();
// reserve
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
op2dependences[op_idx] = std::set<int>();
}
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
remove_duplicate.clear();
// step1: update the op2dependences structure
for (auto& item :
vec_instruction[op_idx].Inputs()) { // for all inputs(read only)
for (auto var : item.second) {
if (var2recent_write_op.count(var))
op2dependences[op_idx].insert(var2recent_write_op[var]);
}
}
for (auto& item :
vec_instruction[op_idx].Outputs()) { // for all write vars
for (auto var : item.second) {
if (var2min_rw_op.count(var)) {
for (auto dep_op : var2min_rw_op[var]) {
op2dependences[op_idx].insert(dep_op);
}
}
}
}
// the original output of an inplace op also changes.
if (!vec_instruction[op_idx].InplaceBackMap().empty()) {
auto& m = vec_instruction[op_idx].InplaceBackMap();
for (auto& p : m) {
auto& var = p.second;
if (var2min_rw_op.count(var)) {
for (auto dep_op : var2min_rw_op[var]) {
op2dependences[op_idx].insert(dep_op);
}
}
}
}
// step2: update 2 var2xxxx data structure
for (auto& item :
vec_instruction[op_idx].Outputs()) { // for all write vars
for (auto var : item.second) {
var2recent_write_op[var] = op_idx;
var2min_rw_op[var] = {static_cast<int>(op_idx)};
remove_duplicate.insert(var);
}
}
// NOTE(zhiqiu): The inplace op with `transfer` also changes
// original output after that so add original output as well
// original: a->op->a
// after: a->data_transfer->a'->op->a'->transfer_back->a
// which means op writes a and a'
if (!vec_instruction[op_idx].InplaceBackMap().empty()) {
auto& m = vec_instruction[op_idx].InplaceBackMap();
for (auto& p : m) {
auto var = p.second;
var2recent_write_op[var] = op_idx;
var2min_rw_op[var] = {static_cast<int>(op_idx)};
remove_duplicate.insert(var);
}
}
for (auto& item :
vec_instruction[op_idx].Inputs()) { // for all inputs(read only)
for (auto var : item.second) {
if (remove_duplicate.count(var) ==
0) { // var in input list and in output list, so remove it.
update_var_min_rw_op(op2dependences, &var2min_rw_op, op_idx, var);
}
}
}
}
// NOTE(zhiqiu): the size of downstream != size of op2dependences since there
// are some ops that have no downstream-op.
std::map<int, std::list<int>> op_downstream_map =
GetDownstreamMap(op2dependences);
ShrinkDownstreamMap(&op_downstream_map, op_happens_before, op_num);
// add dependences for random op, make sure that the random op is scheduled
// sequentially
const std::set<std::string> random_op_set = {
"bernoulli",
"poisson",
"multinomial",
"gaussian_random",
"truncated_gaussian_random",
"uniform_random",
"randint",
"randperm",
"exponential",
"sampling_id"
"dropout",
"class_center_sample",
};
int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
}
dependence_op_idx = op_idx;
}
}
// add dependency for communication op
auto is_comm_op = [](std::string op) -> bool {
const std::set<std::string> special_comm_op_set = {
"send",
"recv",
"send_v2",
"recv_v2",
};
const std::string communication_op_prefix = "c_";
if (op.find(communication_op_prefix) != std::string::npos ||
special_comm_op_set.count(op)) {
return true;
}
return false;
};
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
<< vec_instruction[op_idx].OpBase()->Type();
}
dependence_op_idx = op_idx;
}
}
// TODO(zhiqiu): there still some cases not handled
// add dependency for c_sync_comm_stream
// in program, we can add only one c_sync_comm_stream to sync all
// communication ops.
// c_allreduce_sum(a)
// c_allreduce_sum(b)
// c_allreduce_sum(c)
// c_sync_comm_stream(a)
const std::string kSyncComm = "c_sync_comm_stream";
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kSyncComm) {
dependence_op_idx = op_idx;
} else {
if (dependence_op_idx != -1) {
VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
<< vec_instruction[op_idx].OpBase()->Type();
AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
}
}
}
// add dependency for coalesce_tensor
const std::string kCoalesceTensor = "coalesce_tensor";
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kCoalesceTensor) {
VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
auto fused_out = vec_instruction[op_idx].Outputs().at("FusedOutput")[0];
auto outputs = vec_instruction[op_idx].Outputs().at("Output");
auto is_read = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Inputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};
auto is_write = [](const Instruction& inst, int var_id) -> bool {
for (auto pair : inst.Outputs()) {
for (auto item : pair.second) {
if (item == var_id) {
return true;
}
}
}
return false;
};
// find first op that reads fused_out
auto first_read_fused_out_op = -1;
for (auto j = op_idx + 1; j < op_num; ++j) {
if (is_read(vec_instruction[j], fused_out)) {
first_read_fused_out_op = j;
break;
}
}
if (UNLIKELY(first_read_fused_out_op == -1)) {
VLOG(4) << "No op read FusedOutput";
continue;
}
// find ops that write 'outputs' between (op_index,
// first_read_fused_out_op)
// add depend: them->first_read_fused_out_op
for (auto j = op_idx + 1;
j < static_cast<size_t>(first_read_fused_out_op);
++j) {
for (auto var_id : outputs) {
if (is_write(vec_instruction[j], var_id)) {
AddDownstreamOp(j,
first_read_fused_out_op,
&op_downstream_map,
op_happens_before);
VLOG(4) << j << " -> " << first_read_fused_out_op;
VLOG(4)
<< "Add depend from " << vec_instruction[j].OpBase()->Type()
<< " to "
<< vec_instruction[first_read_fused_out_op].OpBase()->Type();
}
}
}
// find the first op that reads 'outputs' between (first_read_fused_out_op, end)
// add depend: first_read_fused_out_op -> first op that reads 'outputs'
// special case for consecutive communication ops, for example,
// FusedOutput = c_sync_calc_stream(FusedOutput)
// FusedOutput = c_allreduce_sum(FusedOutput)
// FusedOutput = c_sync_comm_stream(FusedOutput)
// we should take the last one to add the dependency, instead of
// 'first_read_fused_out_op'
size_t target = first_read_fused_out_op;
for (size_t j = first_read_fused_out_op + 1; j < op_num; ++j) {
if (j == target + 1 &&
is_comm_op(vec_instruction[target].OpBase()->Type()) &&
is_comm_op(vec_instruction[j].OpBase()->Type())) {
VLOG(4) << "Found consecutive communication ops, "
<< vec_instruction[target].OpBase()->Type() << " -> "
<< vec_instruction[j].OpBase()->Type();
target = j;
continue;
}
for (auto var_id : outputs) {
if (is_read(vec_instruction[j], var_id)) {
AddDownstreamOp(target, j, &op_downstream_map, op_happens_before);
VLOG(4) << target << " -> " << j;
VLOG(4) << "Add depend from "
<< vec_instruction[target].OpBase()->Type() << " to "
<< vec_instruction[j].OpBase()->Type();
}
}
}
}
}
if (FLAGS_new_executor_sequential_run) {
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (!IsCpuOp(vec_instruction[op_idx])) {
if (dependence_op_idx != -1) {
AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << "("
<< dependence_op_idx << ") to "
<< vec_instruction[op_idx].OpBase()->Type() << "(" << op_idx
<< ")";
}
dependence_op_idx = op_idx;
}
}
}
AddDependencyForReadOp(
vec_instruction, &op_downstream_map, op_happens_before);
VLOG(8) << "build_op_downstream_map finished";
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map);
VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map);
return op_downstream_map;
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -82,10 +82,6 @@ void build_op_func_list(const platform::Place& place,
VariableScope* scope,
bool use_local_scope = true);
std::map<int, std::list<int>> build_op_downstream_map(
const std::vector<Instruction>& vec_instruction,
std::vector<std::vector<bool>>* op_happens_before);
void add_fetch(const std::vector<std::string>& fetch_names,
framework::BlockDesc* block);
......