Unverified commit 889318d8, authored by Ruibiao Chen, committed by GitHub

Refactor stream analyzer (#48158)

* Move stream_analyzer to interpreter

* Refactor StreamAnalyzer

* Refactor RunNextInstructionList

* Remove no_data_transform_index

* Fix typos

* Fix data_transfer OpFuncType error

* Add event for depend_op

* Update transfer OpFuncType for heter place
Parent commit 66eeb6a6
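The core of this refactor is the split of the old OpFuncType::kQueueSync / kQueueAsync pair into kCpuSync, kGpuSync and kGpuAsync, with the work-queue index derived from the kernel type. The short standalone C++ sketch below is illustrative only (the helper name QueueIndex is hypothetical; only the OpFuncType values and the serial-run behavior come from the diff) and mirrors the queue-selection rule that AsyncWorkQueue::AddTask implements further down.

// Standalone sketch, not Paddle source: queue selection from the new OpFuncType.
#include <cstddef>
#include <iostream>

enum class OpFuncType { kCpuSync, kGpuSync, kGpuAsync };

// queue 0 : kCpuSync or kGpuSync ops (host thread)
// queue 1 : kGpuAsync ops; when serial_run is set, every op goes to queue 1
//           so that only one thread is used.
std::size_t QueueIndex(OpFuncType type, bool serial_run) {
  return (type == OpFuncType::kGpuAsync || serial_run) ? 1 : 0;
}

int main() {
  std::cout << QueueIndex(OpFuncType::kGpuSync, false) << "\n";   // 0
  std::cout << QueueIndex(OpFuncType::kGpuAsync, false) << "\n";  // 1
  std::cout << QueueIndex(OpFuncType::kCpuSync, true) << "\n";    // 1
  return 0;
}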
@@ -3,7 +3,7 @@ add_subdirectory(interpreter)
 add_subdirectory(workqueue)
 set(STANDALONE_EXECUTOR_SRCS interpretercore.cc new_executor_defs.cc
-    stream_analyzer.cc standalone_executor.cc)
+    standalone_executor.cc)
 set(STANDALONE_EXECUTOR_DEPS interpreter interpretercore_garbage_collector
     workqueue)
......
-set(INTERPRETER_SRCS data_transfer.cc dependency_builder.cc event_manager.cc
-    execution_config.cc interpreter_util.cc)
+set(INTERPRETER_SRCS data_transfer.cc dependency_builder.cc execution_config.cc
+    interpreter_util.cc stream_analyzer.cc)
 set(INTERPRETER_DEPS
     device_context
......
@@ -184,21 +184,30 @@ void DataTranferHelper::RunAndConstructOpFuncNode(
     (*new_op_func_node.phi_kernel_)(&phi_kernel_context);
   }
-  // NOTE(winter-wang): in npu device, D2H kernel is asynchronous. need to
-  // explicit synchronization.
-#ifdef PADDLE_WITH_ASCEND_CL
-  if (op_type == kMemcpyD2H && platform::is_npu_place(dev_ctx->GetPlace())) {
-    dev_ctx->Wait();
-  }
-#endif
-#ifdef PADDLE_WITH_CUSTOM_DEVICE
-  if (op_type == kMemcpyD2H && platform::is_custom_place(dev_ctx->GetPlace())) {
-    dev_ctx->Wait();
-  }
-#endif
-  // NOTE(Aurelius84): data_transform_op is expensive operation, so we tag them
-  // as kQueueSync and execute them in thread pool.
-  new_op_func_node.type_ = OpFuncType::kQueueSync;
+  const phi::Place& place = dev_ctx->GetPlace();
+  // NOTE(winter-wang): in npu and custom device, D2H kernel is asynchronous.
+  // need to explicit synchronization.
+  if ((platform::is_npu_place(place) || platform::is_custom_place(place)) &&
+      op_type == kMemcpyD2H) {
+    dev_ctx->Wait();
+  }
+  if (platform::is_cpu_place(place)) {
+    new_op_func_node.type_ = OpFuncType::kCpuSync;
+  } else if (platform::is_gpu_place(place)) {
+    // MemcpyD2H in gpu is synchronous, see
+    // https://docs.nvidia.com/cuda/cuda-runtime-api/api-sync-behavior.html#api-sync-behavior__memcpy-async
+    // for more detial.
+    new_op_func_node.type_ =
+        (op_type == kMemcpyD2H ? OpFuncType::kGpuSync : OpFuncType::kGpuAsync);
+  } else if (platform::is_xpu_place(place)) {
+    // Memcpy in xpu is synchronous
+    new_op_func_node.type_ = OpFuncType::kGpuSync;
+  } else {
+    // Memcpy in npu and custom devices is asynchronous
+    new_op_func_node.type_ = OpFuncType::kGpuAsync;
+  }
   new_op_func_node.dev_ctx_ = dev_ctx;
   new_op_func_node.operator_base_ = op;
   VLOG(3) << "Run " << op_type << " done.";
@@ -437,8 +446,6 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
   VariableNameMap new_ins(op_base->Inputs());
   VariableNameMap new_outs(op_base->Outputs());
-  // record the no need transform variable index.
-  std::unordered_set<int> no_data_transform_index;
   const std::unordered_set<std::string>* no_buffer_ins = nullptr;
   auto& no_buffer_inferer = op_base->Info().NoNeedBufferVarsInferer();
@@ -560,12 +567,6 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
         if (op_base->Type() == "fetch_v2") {
           op_base->SetAttr("deepcopy", false);
         }
-      } else {
-        // record no need data transformer input var_id
-        VLOG(3) << op_base->Type()
-                << " found no data_transform var: " << var_name
-                << " with id: " << var_scope->VarId(var_name);
-        no_data_transform_index.emplace(var_scope->VarId(var_name));
       }
     }
   }
@@ -576,7 +577,6 @@ void ApplyDataTransform(const OpKernelType& expected_kernel_key,
     op_base->Inputs() = new_ins;
     op_base->Outputs() = new_outs;
   }
-  op_func_node->no_data_transform_index = std::move(no_data_transform_index);
 }
 void HandleComplexGradToRealGrad(const OpFuncNode& op_func_node,
......
@@ -21,7 +21,8 @@ namespace paddle {
 namespace framework {
 namespace interpreter {
-size_t CountDownstreamMap(const std::map<int, std::set<int>>& downstream_map) {
+size_t CountDownstreamMap(
+    const std::map<size_t, std::set<size_t>>& downstream_map) {
   size_t count = 0;
   for (auto pair : downstream_map) {
     count += pair.second.size();
@@ -29,19 +30,19 @@ size_t CountDownstreamMap(
   return count;
 }
 const std::string StringizeDownstreamMap(
-    const std::map<int, std::set<int>>& downstream_map) {
+    const std::map<size_t, std::set<size_t>>& downstream_map) {
   std::ostringstream oss;
   for (auto pair : downstream_map) {
     oss << pair.first << " -> ";
     std::copy(pair.second.begin(),
               pair.second.end(),
-              std::ostream_iterator<int>(oss, " "));
+              std::ostream_iterator<size_t>(oss, " "));
     oss << std::endl;
   }
   return oss.str();
 }
-const std::map<int, std::set<int>>& DependencyBuilder::Build(
+const std::map<size_t, std::set<size_t>>& DependencyBuilder::Build(
     const std::vector<Instruction>& instructions, bool is_sequential_run) {
   PADDLE_ENFORCE_EQ(
       is_build_,
@@ -74,8 +75,18 @@ const std::map<int, std::set<int>>& DependencyBuilder::Build(
   return op_downstream_map_;
 }
-bool DependencyBuilder::OpHappensBefore(int prior_op_idx,
-                                        int posterior_op_idx) {
+const std::map<size_t, std::set<size_t>>& DependencyBuilder::OpDownstreamMap()
+    const {
+  PADDLE_ENFORCE_EQ(
+      is_build_,
+      true,
+      phi::errors::Unavailable(
+          "DependencyBuilder is not yet built, call Build() firstly."));
+  return op_downstream_map_;
+}
+bool DependencyBuilder::OpHappensBefore(size_t prior_op_idx,
+                                        size_t posterior_op_idx) const {
   PADDLE_ENFORCE_GE(
       op_happens_before_.size(),
       0,
@@ -90,9 +101,9 @@ void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
     auto fused_out = instructions_->at(op_idx).Outputs().at("FusedOutput")[0];
     auto outputs = instructions_->at(op_idx).Outputs().at("Output");
-    auto is_read = [](const Instruction& inst, int var_id) -> bool {
+    auto is_read = [](const Instruction& inst, size_t var_id) -> bool {
       for (auto pair : inst.Inputs()) {
-        for (auto item : pair.second) {
+        for (size_t item : pair.second) {
           if (item == var_id) {
             return true;
           }
@@ -101,9 +112,9 @@ void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
       return false;
     };
-    auto is_write = [](const Instruction& inst, int var_id) -> bool {
+    auto is_write = [](const Instruction& inst, size_t var_id) -> bool {
       for (auto pair : inst.Outputs()) {
-        for (auto item : pair.second) {
+        for (size_t item : pair.second) {
          if (item == var_id) {
            return true;
          }
@@ -113,7 +124,7 @@ void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
    };
    // find first op that reads fused_out
-    auto first_read_fused_out_op = -1;
+    auto first_read_fused_out_op = ULLONG_MAX;
    for (auto j = op_idx + 1; j < op_num_; ++j) {
      if (is_read(instructions_->at(j), fused_out)) {
        first_read_fused_out_op = j;
@@ -121,7 +132,7 @@ void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
      }
    }
-    if (UNLIKELY(first_read_fused_out_op == -1)) {
+    if (UNLIKELY(first_read_fused_out_op == ULLONG_MAX)) {
      VLOG(4) << "No op read FusedOutput";
      continue;
    }
@@ -170,10 +181,10 @@ void DependencyBuilder::AddDependencyForCoalesceTensorOp() {
 }
 void DependencyBuilder::AddDependencyForCommunicationOp() {
-  int dependence_op_idx = -1;
+  size_t dependence_op_idx = ULLONG_MAX;
  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
    if (IsCommunicationOp(instructions_->at(op_idx))) {
-      if (dependence_op_idx != -1) {
+      if (dependence_op_idx != ULLONG_MAX) {
        AddDownstreamOp(dependence_op_idx, op_idx);
      }
      dependence_op_idx = op_idx;
@@ -190,12 +201,12 @@ void DependencyBuilder::AddDependencyForCommunicationOp() {
  // c_allreduce_sum(c)
  // c_sync_comm_stream(a)
  const std::string kSyncComm = "c_sync_comm_stream";
-  dependence_op_idx = -1;
+  dependence_op_idx = ULLONG_MAX;
  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
    if (instructions_->at(op_idx).OpBase()->Type() == kSyncComm) {
      dependence_op_idx = op_idx;
    } else {
-      if (dependence_op_idx != -1) {
+      if (dependence_op_idx != ULLONG_MAX) {
        AddDownstreamOp(dependence_op_idx, op_idx);
      }
    }
@@ -217,10 +228,10 @@ void DependencyBuilder::AddDependencyForRandomOp() {
      "dropout",
      "class_center_sample"};
-  int dependence_op_idx = -1;
+  size_t dependence_op_idx = ULLONG_MAX;
  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
    if (random_op_set.count(instructions_->at(op_idx).OpBase()->Type())) {
-      if (dependence_op_idx != -1) {
+      if (dependence_op_idx != ULLONG_MAX) {
        AddDownstreamOp(dependence_op_idx, op_idx);
      }
      dependence_op_idx = op_idx;
@@ -263,10 +274,10 @@ void DependencyBuilder::AddDependencyForReadOp() {
 }
 void DependencyBuilder::AddDependencyForSequentialRun() {
-  int dependence_op_idx = -1;
+  size_t dependence_op_idx = ULLONG_MAX;
  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
    if (!IsCpuOp(instructions_->at(op_idx))) {
-      if (dependence_op_idx != -1) {
+      if (dependence_op_idx != ULLONG_MAX) {
        AddDownstreamOp(dependence_op_idx, op_idx);
      }
      dependence_op_idx = op_idx;
@@ -274,9 +285,9 @@ void DependencyBuilder::AddDependencyForSequentialRun() {
  }
 }
-void DependencyBuilder::AddDownstreamOp(int prior_op_idx,
-                                        int posterior_op_idx) {
-  std::set<int>& downstream_ops = op_downstream_map_[prior_op_idx];
+void DependencyBuilder::AddDownstreamOp(size_t prior_op_idx,
+                                        size_t posterior_op_idx) {
+  std::set<size_t>& downstream_ops = op_downstream_map_[prior_op_idx];
  if (op_happens_before_.size() != 0) {
    PADDLE_ENFORCE_EQ(
@@ -289,7 +300,7 @@ void DependencyBuilder::AddDownstreamOp(int prior_op_idx,
            posterior_op_idx,
            prior_op_idx));
-    for (int op_idx : downstream_ops) {
+    for (size_t op_idx : downstream_ops) {
      if (op_happens_before_[op_idx][posterior_op_idx]) {
        VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
                << "->" << posterior_op_idx << ", skip adding " << prior_op_idx
@@ -322,30 +333,31 @@ void DependencyBuilder::AddDownstreamOp(int prior_op_idx,
 void DependencyBuilder::BuildDownstreamMap() {
  auto var2min_rw_op =
-      std::map<int, std::list<int>>();  // # map from variable id to read /
+      std::map<size_t, std::list<size_t>>();  // # map from variable id to read
                                               // write op id.
  auto var2recent_write_op =
-      std::map<int, int>();  // # map from variable to recent write op.
+      std::map<size_t, size_t>();  // # map from variable to recent write op.
  auto op2dependences =
-      std::map<int, std::set<int>>();  //# map from op to the dependence list,
-                                       // op must run after the dependence.
-  std::set<int>
+      std::map<size_t,
+               std::set<size_t>>();  //# map from op to the dependence list,
+                                     // op must run after the dependence.
+  std::set<size_t>
      remove_duplicate;  // remove the duplicate between inputs and outputs
  // reserve
  for (size_t op_idx = 0; op_idx < op_num_; ++op_idx) {
-    op2dependences[op_idx] = std::set<int>();
+    op2dependences[op_idx] = std::set<size_t>();
  }
  auto update_var_min_rw_op =
-      [](const std::map<int, std::set<int>>& op2dependences,
-         std::map<int, std::list<int>>* var2min_rw_op,
-         int cur_op,
-         int rw_var) {
+      [](const std::map<size_t, std::set<size_t>>& op2dependences,
+         std::map<size_t, std::list<size_t>>* var2min_rw_op,
+         size_t cur_op,
+         size_t rw_var) {
        // rw_var is inputs or outputs of cur_op
        // this function update the var2min_rw_op set .
        if (var2min_rw_op->find(rw_var) == var2min_rw_op->end()) {
-          (*var2min_rw_op)[rw_var] = std::list<int>();
+          (*var2min_rw_op)[rw_var] = std::list<size_t>();
        }
        for (auto dep_op : op2dependences.at(cur_op)) {
          var2min_rw_op->at(rw_var).remove(dep_op);
@@ -392,7 +404,7 @@ void DependencyBuilder::BuildDownstreamMap() {
         instructions_->at(op_idx).Outputs()) {  // for all write vars
      for (auto var : item.second) {
        var2recent_write_op[var] = op_idx;
-        var2min_rw_op[var] = {static_cast<int>(op_idx)};
+        var2min_rw_op[var] = {static_cast<size_t>(op_idx)};
        remove_duplicate.insert(var);
      }
    }
@@ -407,7 +419,7 @@ void DependencyBuilder::BuildDownstreamMap() {
      for (auto& p : m) {
        auto var = p.second;
        var2recent_write_op[var] = op_idx;
-        var2min_rw_op[var] = {static_cast<int>(op_idx)};
+        var2min_rw_op[var] = {static_cast<size_t>(op_idx)};
        remove_duplicate.insert(var);
      }
    }
@@ -428,7 +440,7 @@ void DependencyBuilder::BuildDownstreamMap() {
  // next instruction of op. The size of downstream != size of op2dependences
  // since there are some ops that have no downstream-op.
  for (auto& item : op2dependences) {
-    int op = item.first;
+    size_t op = item.first;
    for (auto dep_op : item.second) {
      AddDownstreamOp(dep_op, op);
    }
@@ -491,7 +503,7 @@ void DependencyBuilder::ShrinkDownstreamMap() {
      continue;
    }
-    std::set<int> minumum_nexts;
+    std::set<size_t> minumum_nexts;
    for (size_t item : op_downstream_map_.at(i)) {
      bool not_after_any = true;
      // find the op that is not executed after any
......
@@ -35,10 +35,12 @@ class DependencyBuilder {
  // build op dependencies and return the mapping from op to its downstream-op
  // set
-  const std::map<int, std::set<int>>& Build(
+  const std::map<size_t, std::set<size_t>>& Build(
      const std::vector<Instruction>& instructions, bool is_sequential_run);
-  bool OpHappensBefore(int prior_op_idx, int posterior_op_idx);
+  const std::map<size_t, std::set<size_t>>& OpDownstreamMap() const;
+  bool OpHappensBefore(size_t prior_op_idx, size_t posterior_op_idx) const;
 private:
  void AddDependencyForCoalesceTensorOp();
@@ -47,7 +49,7 @@ class DependencyBuilder {
  void AddDependencyForReadOp();
  void AddDependencyForSequentialRun();
-  void AddDownstreamOp(int prior_op_idx, int posterior_op_idx);
+  void AddDownstreamOp(size_t prior_op_idx, size_t posterior_op_idx);
  void BuildDownstreamMap();
@@ -65,7 +67,7 @@ class DependencyBuilder {
  // op_downstream_map_ is the mapping from op to its downstream-op set, that is
  // to say, op_downstream_map_[i] == {a, b, c} means op[a], op[b] and op[c]
  // should be dispatched after op[i]
-  std::map<int, std::set<int>> op_downstream_map_;
+  std::map<size_t, std::set<size_t>> op_downstream_map_;
 };
 }  // namespace interpreter
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/event_manager.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
namespace interpreter {
void WaitEvent(const Instruction& instruction, const platform::Place& place) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) {
return;
}
VLOG(3) << "Deal StreamWaitEventOrSync for " << instruction.OpBase()->Type();
for (const auto& event_iter : instruction.InputEvents()) {
platform::RecordEvent record(
"WaitStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(3) << "wait var_id: " << event_iter.var_id_
<< " 's event with waiter_type: " << event_iter.waiter_type_;
event_iter.event_->Wait(event_iter.waiter_type_,
&instruction.DeviceContext());
}
}
void RecordEvent(const Instruction& instruction, const platform::Place& place) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) {
return;
}
for (const auto& event : instruction.OutputEvents()) {
platform::RecordEvent record(
"RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(3) << "Record event in out_var_id: " << event.var_id_;
event.event_->Record(&instruction.DeviceContext());
}
}
void RecordEvent(const Instruction& instruction) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(instruction.DeviceContext().GetPlace())) return;
for (auto& event : instruction.OutputEvents()) {
platform::RecordEvent record(
"RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(3) << "Record event in out_var_id: " << event.var_id_;
event.event_->Record(&instruction.DeviceContext());
}
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
namespace paddle {
namespace framework {
namespace interpreter {
void RecordEvent(const Instruction& instruction, const platform::Place& place);
void RecordEvent(const Instruction& instruction);
void WaitEvent(const Instruction& instruction, const platform::Place& place);
} // namespace interpreter
} // namespace framework
} // namespace paddle
@@ -116,14 +116,13 @@ AsyncWorkQueue::AsyncWorkQueue(size_t host_num_threads,
 void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
                              std::function<void()> fn) {
-  VLOG(4) << "Add task: " << static_cast<size_t>(op_func_type) << " ";
-  // NOTE(zhiqiu): use the second queue of size of, so only one thread is used.
-  if (FLAGS_new_executor_serial_run) {
-    queue_group_->AddTask(static_cast<size_t>(OpFuncType::kQueueAsync),
-                          std::move(fn));
-  } else {
-    queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
-  }
+  // queue_idx=0 : kCpuSync or kGpuSync
+  // queue_idx=1 : kGPUAsync
+  // when serial_run, always make queue_idx=1, so only one thread is used
+  size_t queue_idx =
+      (op_func_type == OpFuncType::kGpuAsync || FLAGS_new_executor_serial_run);
+  VLOG(8) << "Add task: " << queue_idx;
+  queue_group_->AddTask(queue_idx, std::move(fn));
 }
 bool IsCommunicationOp(const std::string& op_name) {
@@ -303,7 +302,7 @@ void BuildVariableScope(const framework::BlockDesc& block,
 OpFuncType AnalyseOpFuncType(const OpFuncNode& op_func_node,
                              const platform::Place& place) {
   if (platform::is_cpu_place(place)) {
-    return OpFuncType::kQueueSync;
+    return OpFuncType::kCpuSync;
   }
   PADDLE_ENFORCE_EQ(IsSupportedHeterPlace(place),
@@ -312,19 +311,19 @@ OpFuncType AnalyseOpFuncType(const OpFuncNode& op_func_node,
   // Some GPU OPs do not launch CUDA Kernel, but spend a lot of time on CPU
   // computing. They execute serially in device thread and block CUDA kernel
-  // launching in other GPU OPs. To improve performance, set them as kQueueSync
+  // launching in other GPU OPs. To improve performance, set them as kGpuSync
   // and so that they would be dispatched to host thread.
   std::shared_ptr<OperatorBase> op = op_func_node.operator_base_;
   if (op->Type() == kCoalesceTensor &&
       op->Attr<bool>("set_constant") == false &&
       op->Attr<bool>("copy_data") == false) {
-    return OpFuncType::kQueueSync;
+    return OpFuncType::kGpuSync;
   }
   if (op->Type() == "shape") {
-    return OpFuncType::kQueueSync;
+    return OpFuncType::kGpuSync;
   }
-  return OpFuncType::kQueueAsync;
+  return OpFuncType::kGpuAsync;
 }
 void CreateAllOps(const framework::BlockDesc& block,
@@ -478,14 +477,6 @@ void HandleOperatorBase(const platform::Place& place,
   op_func_node->type_ = AnalyseOpFuncType(*op_func_node, place);
   op_func_node->kernel_func_ = nullptr;
   op_base->Run(*local_scope, place);  // Run without data transformer.
-  std::unordered_set<int> no_data_transform_index;
-  for (auto& it : op_func_node->input_index) {
-    for (auto& id : it.second) {
-      no_data_transform_index.emplace(id);
-    }
-  }
-  op_func_node->no_data_transform_index =
-      no_data_transform_index;  // all index is no-need-transform
   op_func_node->dev_ctx_ = dev_ctx;
 }
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h"
#include <future>
#include <unordered_set>
#include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace interpreter {
using DeviceContext = platform::DeviceContext;
using DeviceEvent = platform::DeviceEvent;
class ContextManager {
public:
using DeviceContextMap =
std::map<Place, std::shared_future<std::unique_ptr<DeviceContext>>>;
static ContextManager& Instance() {
static ContextManager* ctx_manager = new ContextManager;
return *ctx_manager;
}
std::shared_future<std::unique_ptr<DeviceContext>> Get(
const std::string& type, const platform::Place& place) {
std::lock_guard<std::mutex> lk(ctx_mtx_);
VLOG(6) << "Get dev_ctx for " << type << " - " << place;
DeviceContextMap& ctxs = ctx_pool_[type];
if (ctxs.find(place) == ctxs.end()) {
platform::EmplaceDeviceContexts(
&ctxs,
{place},
/*disable_setting_default_stream_for_allocator=*/true);
}
return ctxs[place];
}
private:
ContextManager() {}
DISABLE_COPY_AND_ASSIGN(ContextManager);
std::mutex ctx_mtx_;
std::unordered_map<std::string, DeviceContextMap> ctx_pool_;
};
inline std::string RunTypeToString(DownstreamRunType run_type) {
if (run_type == DownstreamRunType::kDirectRun) {
return "DirectRun";
} else if (run_type == DownstreamRunType::kSyncRun) {
return "SyncRun";
} else {
return "EventRun";
}
}
void StreamAnalyzer::ConstructEvents(
const DependencyBuilder& dependency_builder,
std::vector<Instruction>* instructions) const {
const std::map<size_t, std::set<size_t>>& downstream_map =
dependency_builder.OpDownstreamMap();
const size_t instr_num = instructions->size();
std::vector<std::vector<std::vector<size_t>>> run_type_info(
instr_num,
std::vector<std::vector<size_t>>(
/*number_of_run_type = */ 3)); // instr_id -> run_type ->
// next_instr_id
AnalyseAllRunType(*instructions, downstream_map, &run_type_info);
std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>
event_info; // DeviceContext -> waiter_instr_id -> recorder_instr_ids
AnalyseAllEventInfo(*instructions, run_type_info, &event_info);
ShrinkEventInfo(dependency_builder, &event_info);
// Construct events
std::map<size_t, std::shared_ptr<DeviceEvent>> instr2event;
for (auto& context_item : event_info) {
for (auto& waiter_item : context_item.second) {
size_t waiter_instr_id = waiter_item.first;
std::set<size_t>& recorder_instr_ids = waiter_item.second;
for (size_t recorder_instr_id : recorder_instr_ids) {
Instruction& recorder_instr = instructions->at(recorder_instr_id);
Instruction& waiter_instr = instructions->at(waiter_instr_id);
platform::DeviceType waiter_type = GetWaiterType(waiter_instr);
if (instr2event.find(recorder_instr_id) == instr2event.end()) {
std::shared_ptr<DeviceEvent> device_event =
std::make_shared<DeviceEvent>(
recorder_instr.DeviceContext().GetPlace(),
platform::GenerateDeviceEventFlag());
recorder_instr.AddEventToRecord(device_event,
platform::kCUDA /*unused*/);
instr2event.emplace(recorder_instr_id, device_event);
}
waiter_instr.AddEventToWait(
recorder_instr_id, instr2event.at(recorder_instr_id), waiter_type);
VLOG(6) << "Add event: " << recorder_instr.OpBase()->Type() << "("
<< recorder_instr_id << ") -> " << waiter_instr.OpBase()->Type()
<< "(" << waiter_instr_id << "), waiter type = " << waiter_type;
}
}
}
}
DeviceContext* StreamAnalyzer::ParseDeviceContext(
const OpFuncNode& op_func_node) const {
auto& op = op_func_node.operator_base_;
auto& op_type = op->Type();
const std::string& execution_stream = op_func_node.execution_stream_;
ContextManager& ctx_manager = ContextManager::Instance();
// only gpu/npu need update. xpu not need, because xpu memcpy op kernel is
// synchronous.
if (platform::is_gpu_place(place_) || platform::is_npu_place(place_) ||
platform::is_custom_place(place_)) {
VLOG(6) << "Parse DeviceContext for " << op_type
<< ", execution stream = " << execution_stream;
if (execution_stream != kDefaultStream) {
return ctx_manager
.Get(std::string(kCustomStream) + "-" + execution_stream, place_)
.get()
.get();
}
if (op_type == interpreter::kMemcpyD2H) {
return ctx_manager.Get(std::string(kD2HStream), place_).get().get();
} else if (op_type == interpreter::kMemcpyH2D) {
return ctx_manager.Get(std::string(kH2DStream), place_).get().get();
}
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
// NOTE(Ruibiao): Here supports multi-stream overlap for c_allreduce_sum
// with use_cal_stream==false by returning a device context getting from the
// global NCCLCommContext instance. Because when use_calc_stream==false, in
// OP kernel, the NCCL communication will be launched to the stream directly
// getting from the global NCCLCommContext instance rather than the
// DeviceContext passed from executor (see CAllReduceOpCUDAKernel in
// c_allreduce_op.h). Now it is just a temporary solution for ONLY
// c_allreduce_sum which is used in ResNet50 distributed training.
if (op_type == "c_allreduce_sum" &&
op->Attr<bool>("use_calc_stream") == false) {
int ring_id = op->Attr<int>("ring_id");
return platform::NCCLCommContext::Instance()
.Get(ring_id, place_)
->dev_context();
}
#endif
}
return op_func_node.dev_ctx_;
}
bool StreamAnalyzer::HasDataDependency(const Instruction& cur_instr,
const Instruction& next_instr) const {
auto no_need_buffer_ins =
[](const Instruction& instr) -> const std::unordered_set<std::string> {
auto* op = instr.OpBase();
auto& inferer = op->Info().NoNeedBufferVarsInferer();
if (inferer) {
return inferer(op->Inputs(), op->Outputs(), op->Attrs());
}
return std::unordered_set<std::string>();
};
// cur_instr->var->next_instr
std::unordered_set<size_t> cur_var_ids;
for (auto& item : cur_instr.Outputs()) {
cur_var_ids.insert(item.second.begin(), item.second.end());
}
const std::unordered_set<std::string> next_instr_no_need_buffer_ins =
no_need_buffer_ins(next_instr);
for (auto& item : next_instr.Inputs()) {
if (next_instr_no_need_buffer_ins.find(item.first) !=
next_instr_no_need_buffer_ins.end()) {
continue;
}
for (auto next_var_id : item.second) {
if (cur_var_ids.find(next_var_id) != cur_var_ids.end()) {
VLOG(6) << "Found data dependency from " << cur_instr.OpBase()->Type()
<< "(" << cur_instr.Id() << ") to "
<< next_instr.OpBase()->Type() << "(" << next_instr.Id()
<< ") at variable " << item.first << "(" << next_var_id << ")";
return true;
}
}
}
// cur_instr->var && next_instr->var
// var->cur_instr && next_instr->var
const std::unordered_set<std::string> cur_instr_no_need_buffer_ins =
no_need_buffer_ins(cur_instr);
for (auto& item : cur_instr.Inputs()) {
if (cur_instr_no_need_buffer_ins.find(item.first) ==
cur_instr_no_need_buffer_ins.end()) {
cur_var_ids.insert(item.second.begin(), item.second.end());
}
}
for (auto& item : next_instr.Outputs()) {
for (auto next_var_id : item.second) {
if (cur_var_ids.find(next_var_id) != cur_var_ids.end()) {
VLOG(6) << "Found data dependency from " << cur_instr.OpBase()->Type()
<< "(" << cur_instr.Id() << ") to "
<< next_instr.OpBase()->Type() << "(" << next_instr.Id()
<< ") at variable " << item.first << "(" << next_var_id << ")";
return true;
}
}
}
return false;
}
void StreamAnalyzer::AnalyseAllEventInfo(
const std::vector<Instruction>& instructions,
const std::vector<std::vector<std::vector<size_t>>>& run_type_info,
std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>*
event_info) const {
for (size_t cur_instr_id = 0; cur_instr_id < instructions.size();
++cur_instr_id) {
const Instruction& cur_instr = instructions[cur_instr_id];
const std::vector<std::vector<size_t>>& next_instr_list =
run_type_info[cur_instr_id];
std::set<size_t> waiter_instr_ids;
std::vector<size_t> next_instr_ids =
next_instr_list[DownstreamRunType::kSyncRun];
next_instr_ids.insert(next_instr_ids.end(),
next_instr_list[DownstreamRunType::kEventRun].begin(),
next_instr_list[DownstreamRunType::kEventRun].end());
for (size_t next_instr_id : next_instr_ids) {
AnalyseEventInfoForTwoInstructions(instructions,
run_type_info,
cur_instr_id,
next_instr_id,
&waiter_instr_ids);
}
for (size_t waiter_instr_id : waiter_instr_ids) {
(*event_info)[&(cur_instr.DeviceContext())][waiter_instr_id].insert(
cur_instr_id);
}
}
}
void StreamAnalyzer::AnalyseAllRunType(
const std::vector<Instruction>& instructions,
const std::map<size_t, std::set<size_t>>& downstream_map,
std::vector<std::vector<std::vector<size_t>>>* run_type_info) const {
for (auto& item : downstream_map) {
size_t cur_instr_id = item.first;
const Instruction& cur_instr = instructions[item.first];
for (size_t next_instr_id : item.second) {
const Instruction& next_instr = instructions[next_instr_id];
DownstreamRunType run_type =
AnalyseRunTypeForTwoInstructions(cur_instr, next_instr);
(*run_type_info)[cur_instr_id][run_type].push_back(next_instr_id);
VLOG(6) << RunTypeToString(run_type) << ": " << cur_instr.OpBase()->Type()
<< "(" << cur_instr_id << ") -> " << next_instr.OpBase()->Type()
<< "(" << next_instr_id << ")";
}
}
}
// The caller should guarantee cur_instr and next_instr is kSyncRun or kEventRun
void StreamAnalyzer::AnalyseEventInfoForTwoInstructions(
const std::vector<Instruction>& instructions,
const std::vector<std::vector<std::vector<size_t>>>& run_type_info,
const size_t cur_instr_id,
const size_t next_instr_id,
std::set<size_t>* waiter_instr_ids) const {
// NOTE(Ruibiao): Though depend_op as next_instr is no_need_buffer, we should
// also wait event for it. Because depend_op is used to build dependencies for
// fused vars in some scenarios. In those cases, we do not know which vars may
// lead a implicit data dependency. For example,
// ###
// ### fused_var = fuse_op(var0, ...)
// ### var1 = op1(fused_var)
// ### var0 = depend_op(var0, fused_var)
// ### var2 = op2(var0)
// ###
// If op1 are cross-stream with depend_op and op2, then we have:
// ###
// ### event_run : op1 -> depend_op
// ### direct_run : depend_op -> op2
// ###
// There is actually a data dependency between op1 and op2 that var0 and
// fused_var share the same tensor. However, as the dependency is implicit, we
// can only add event for it with the help of depend_op.
if (HasDataDependency(instructions[cur_instr_id],
instructions[next_instr_id]) ||
run_type_info[next_instr_id][DownstreamRunType::kSyncRun].size() ||
run_type_info[next_instr_id][DownstreamRunType::kEventRun].size() ||
instructions[next_instr_id].OpBase()->Type() == "depend") {
waiter_instr_ids->insert(next_instr_id);
return;
}
// NOTE(Ruibiao): If no data dependency from cur_instr to next_instr, and
// simultaneously next_instr has neither sync_run nor event_run downstream
// instr, we try to recursively add events between cur_instr and next_instr's
// direct-run-instrs. This can delay the event wait and achieve better
// scheduling performance in some scenarios. However, when next_instr has too
// many direct-run-instrs, it may perform worse than add event directly
// between cur_instr and next_instr.
for (size_t instr_id :
run_type_info[next_instr_id][DownstreamRunType::kDirectRun]) {
AnalyseEventInfoForTwoInstructions(
instructions, run_type_info, cur_instr_id, instr_id, waiter_instr_ids);
}
}
// waiter instr should only wait events for the last recorder instrs in each
// stream
void StreamAnalyzer::ShrinkEventInfo(
const DependencyBuilder& dependency_builder,
std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>*
event_info) const {
for (auto& context_item : *event_info) {
for (auto& waiter_item : context_item.second) {
size_t waiter_instr_id = waiter_item.first;
std::set<size_t>& recorder_instr_ids = waiter_item.second;
std::set<size_t> unnecessary_recorder_instr_ids;
for (size_t cur_instr_id : recorder_instr_ids) {
for (size_t next_instr_id : recorder_instr_ids) {
if (dependency_builder.OpHappensBefore(cur_instr_id, next_instr_id)) {
unnecessary_recorder_instr_ids.insert(cur_instr_id);
break;
}
}
}
for (size_t unnecessary_recorder_instr_id :
unnecessary_recorder_instr_ids) {
VLOG(8) << "Shrink event : " << unnecessary_recorder_instr_id << " -> "
<< waiter_instr_id;
recorder_instr_ids.erase(unnecessary_recorder_instr_id);
}
}
}
}
platform::DeviceType StreamAnalyzer::GetWaiterType(
const Instruction& instr) const {
if (instr.KernelType() == OpFuncType::kCpuSync) {
return platform::kCPU;
} else {
if (platform::is_xpu_place(place_)) {
return platform::kXPU;
} else if (platform::is_npu_place(place_)) {
return platform::kNPU;
} else if (platform::is_custom_place(place_)) {
return platform::kCUSTOM_DEVICE;
}
return platform::kCUDA;
}
}
DownstreamRunType StreamAnalyzer::AnalyseRunTypeForTwoInstructions(
const Instruction& cur_instr, const Instruction& next_instr) const {
// xpu&ipu memcpy kerenl is synchronous.
if (platform::is_ipu_place(place_) || platform::is_xpu_place(place_)) {
return DownstreamRunType::kDirectRun;
}
// npu d2h kernel is asynchronous.
if (platform::is_npu_place(place_) || platform::is_custom_place(place_)) {
if (interpreter::IsCpuOp(cur_instr) ||
interpreter::IsMemcpyH2D(next_instr)) {
return DownstreamRunType::kDirectRun;
}
}
if (cur_instr.KernelType() == OpFuncType::kGpuAsync) {
if (next_instr.KernelType() == OpFuncType::kCpuSync) {
return DownstreamRunType::kSyncRun;
} else {
// cross-stream: kGpuAsync -> kGpuSync, kGpuAsync -> kGpuSync
if (&cur_instr.DeviceContext() != &next_instr.DeviceContext()) {
return DownstreamRunType::kEventRun;
}
}
}
return DownstreamRunType::kDirectRun;
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
-// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -17,45 +17,67 @@
 #include <memory>
 #include <vector>
+#include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/device_event.h"
 namespace paddle {
 namespace framework {
+namespace interpreter {
+enum DownstreamRunType { kDirectRun, kSyncRun, kEventRun };
 class StreamAnalyzer {
  public:
-  using Place = platform::Place;
   using DeviceContext = platform::DeviceContext;
+  using Place = platform::Place;
   explicit StreamAnalyzer(const Place& place) : place_(place) {}
   ~StreamAnalyzer() {}
-  void Schedule(const std::vector<size_t>& downstream_ops,
-                std::vector<Instruction>* instructions,
-                size_t op_index);
+  void ConstructEvents(const DependencyBuilder& dependency_builder,
+                       std::vector<Instruction>* instructions) const;
-  DeviceContext* ParseDeviceContext(const OpFuncNode& op_func_node);
+  platform::DeviceContext* ParseDeviceContext(
+      const OpFuncNode& op_func_node) const;
  private:
-  std::vector<size_t> GetNeedEventVarIds(const Instruction& cur_instr,
-                                         const Instruction& next_instr);
+  bool HasDataDependency(const Instruction& cur_instr,
+                         const Instruction& next_instr) const;
+  void AnalyseAllEventInfo(
+      const std::vector<Instruction>& instructions,
+      const std::vector<std::vector<std::vector<size_t>>>& run_type_info,
+      std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>*
+          event_info) const;
+  void AnalyseAllRunType(
+      const std::vector<Instruction>& instructions,
+      const std::map<size_t, std::set<size_t>>& downstream_map,
+      std::vector<std::vector<std::vector<size_t>>>* run_type_info) const;
+  void AnalyseEventInfoForTwoInstructions(
+      const std::vector<Instruction>& instructions,
+      const std::vector<std::vector<std::vector<size_t>>>& run_type_info,
+      const size_t cur_instr_id,
+      const size_t next_instr_id,
+      std::set<size_t>* waiter_instr_ids) const;
-  void ConstructEventForVar(const std::vector<size_t>& new_event_var_id,
-                            Instruction* next_instr,
-                            platform::DeviceType waiter_type,
-                            const Place& place);
+  void ShrinkEventInfo(
+      const DependencyBuilder& dependency_builder,
+      std::map<const DeviceContext*, std::map<size_t, std::set<size_t>>>*
+          event_info_map) const;
-  bool IsDirectRun(Instruction& cur_instr,  // NOLINT
-                   const Instruction& next_instr);
-  platform::DeviceType GetWaiterType(const Instruction& instr);
+  platform::DeviceType GetWaiterType(const Instruction& instr) const;
+  DownstreamRunType AnalyseRunTypeForTwoInstructions(
+      const Instruction& cur_instr, const Instruction& next_instr) const;
   const Place place_;
-  std::map<size_t, std::shared_ptr<platform::DeviceEvent>> var_id2event_;
 };
+}  // namespace interpreter
 }  // namespace framework
 }  // namespace paddle
@@ -117,8 +117,8 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
     : place_(place),
       block_(block),
       execution_config_(place, block.OpSize()),
-      var_scope_(scope),
-      stream_analyzer_(place) {
+      stream_analyzer_(place),
+      var_scope_(scope) {
   VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
   exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
@@ -488,20 +488,43 @@ void InterpreterCore::BuildInplace() {
 }
 void InterpreterCore::BuildOperatorDependences() {
-  // analysis the dependences between ops, set the dependecy_count_ and Call
-  // Schedule
-  auto op_nums = vec_instruction_.size();
-  dependecy_count_.resize(op_nums);
-  auto op2downstream = dependency_builder_.Build(
+  // analysis the dependences between ops, add next_instr_list to each instr,
+  // and set the dependecy_count_
+  size_t instr_num = vec_instruction_.size();
+  dependecy_count_.resize(instr_num);
+  auto downstream_map = dependency_builder_.Build(
       vec_instruction_,
       /*is_sequential_run=*/FLAGS_new_executor_sequential_run);
-  for (size_t op = 0; op < vec_instruction_.size(); ++op) {
-    auto op_list = op2downstream[op];
-    std::vector<size_t> downsteam_vector(op_list.begin(), op_list.end());
-    stream_analyzer_.Schedule(downsteam_vector, &vec_instruction_, op);
-    for (auto inst_id : op_list) {
-      dependecy_count_[inst_id]++;
+  for (size_t instr_id = 0; instr_id < instr_num; ++instr_id) {
+    Instruction& cur_instr = vec_instruction_[instr_id];
+    const std::set<size_t>& next_instr_ids = downstream_map[instr_id];
+    if (cur_instr.KernelType() == OpFuncType::kGpuAsync) {
+      for (size_t next_instr_id : next_instr_ids) {
+        if (vec_instruction_[next_instr_id].KernelType() ==
+            OpFuncType::kGpuAsync) {
+          cur_instr.AddNextInstrInSameThread(next_instr_id);
+        } else {
+          cur_instr.AddNextInstrInDifferentThread(next_instr_id);
+        }
+      }
+    } else {
+      bool has_instr_in_same_thread = false;
+      for (size_t next_instr_id : next_instr_ids) {
+        if (!has_instr_in_same_thread &&
+            vec_instruction_[next_instr_id].KernelType() !=
+                OpFuncType::kGpuAsync) {
+          cur_instr.AddNextInstrInSameThread(next_instr_id);
+          has_instr_in_same_thread = true;
+        } else {
+          cur_instr.AddNextInstrInDifferentThread(next_instr_id);
+        }
+      }
+    }
+    for (size_t next_instr_id : next_instr_ids) {
+      ++dependecy_count_[next_instr_id];
     }
   }
 }
@@ -538,6 +561,8 @@ void InterpreterCore::Convert(
   BuildOperatorDependences();
+  stream_analyzer_.ConstructEvents(dependency_builder_, &vec_instruction_);
   // calculate last_live_ops_
   for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) {
     Instruction& instr = vec_instruction_[op_idx];
@@ -635,7 +660,7 @@ void InterpreterCore::Convert(
   BuildSkipShareLoDInfo();
   bool inplaced = false;
-  for (auto inst : vec_instruction_) {
+  for (const Instruction& inst : vec_instruction_) {
     if (inst.OpBase()->Type() == "share_buffer" ||
         inst.OpBase()->Type() == "share_data") {
       VLOG(4) << "Already inplaced, skip inplace now.";
@@ -843,7 +868,6 @@ void InterpreterCore::RunNextInstructions(
     const Instruction& instr, std::deque<size_t>* reserved_next_ops) {
   platform::RecordEvent record(
       "RunNextInstructions", platform::TracerEventType::UserDefined, 10);
-  auto& next_instr = instr.NextInstructions();
   auto IsReady = [this](size_t next_id) {
     VLOG(4) << "op_id: " << next_id
@@ -851,67 +875,22 @@ void InterpreterCore::RunNextInstructions(
     return deps_[next_id]->CheckAndDecrease();
   };
-  if (instr.KernelType() == OpFuncType::kQueueAsync) {
-    // move all sync_ops into other threads
-    for (size_t next_id : next_instr.SyncRunIds()) {
-      if (IsReady(next_id)) {
-        async_work_queue_->AddTask(
-            vec_instruction_[next_id].KernelType(),
-            [this, next_id]() { RunInstructionAsync(next_id); });
-      }
-    }
-    // keep all async_ops running in current thread
-    for (size_t next_id : next_instr.DirectRunIds()) {
-      if (IsReady(next_id)) {
-        if (vec_instruction_[next_id].GetPriority() == Priority::kLowest) {
-          reserved_next_ops->push_back(next_id);
-        } else {
-          reserved_next_ops->push_front(next_id);
-        }
-      }
-    }
-    for (size_t next_id : next_instr.EventRunIds()) {
-      if (IsReady(next_id)) {
-        if (vec_instruction_[next_id].GetPriority() == Priority::kLowest) {
-          reserved_next_ops->push_back(next_id);
-        } else {
-          reserved_next_ops->push_front(next_id);
-        }
-      }
-    }
-  } else {
-    // move async_ops into async_thread
-    for (auto next_id : next_instr.EventRunIds()) {
-      if (IsReady(next_id)) {
-        async_work_queue_->AddTask(
-            vec_instruction_[next_id].KernelType(),
-            [this, next_id] { RunInstructionAsync(next_id); });
-      }
-    }
-    std::vector<size_t> direct_run_ops = next_instr.SyncRunIds();
-    direct_run_ops.insert(direct_run_ops.end(),
-                          next_instr.DirectRunIds().begin(),
-                          next_instr.DirectRunIds().end());
-    int64_t first_op = -1;
-    for (auto next_id : direct_run_ops) {
-      if (IsReady(next_id)) {
-        // only keep one sync op running in current thread
-        if (first_op == -1 &&
-            vec_instruction_[next_id].KernelType() == OpFuncType::kQueueSync) {
-          first_op = next_id;
-          continue;
-        }
-        // move rest ops into other threads
-        async_work_queue_->AddTask(
-            vec_instruction_[next_id].KernelType(),
-            [this, next_id] { RunInstructionAsync(next_id); });
-      }
-    }
-    if (first_op != -1) {
-      reserved_next_ops->push_front(first_op);
-    }
-  }
+  for (size_t next_instr_id : instr.NextInstrsInDifferenceThread()) {
+    if (IsReady(next_instr_id)) {
+      async_work_queue_->AddTask(
+          vec_instruction_[next_instr_id].KernelType(),
+          [this, next_instr_id]() { RunInstructionAsync(next_instr_id); });
+    }
+  }
+  for (size_t next_instr_id : instr.NextInstrsInSameThread()) {
+    if (IsReady(next_instr_id)) {
+      if (vec_instruction_[next_instr_id].GetPriority() == Priority::kLowest) {
+        reserved_next_ops->push_back(next_instr_id);
+      } else {
+        reserved_next_ops->push_front(next_instr_id);
+      }
    }
  }
 }
@@ -924,9 +903,11 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
   auto& instr_node = vec_instruction_.at(instr_id);
   VLOG(5) << __func__ << " OP id:" << instr_node.Id()
           << " name:" << instr_node.OpBase()->Type() << " type:"
-          << (instr_node.KernelType() == OpFuncType::kQueueSync
-                  ? "kQueueSync"
-                  : "kQueueAsync")
+          << (instr_node.KernelType() == OpFuncType::kCpuSync
+                  ? "kCpuSync"
+                  : (instr_node.KernelType() == OpFuncType::kGpuSync
+                         ? "kGpuSync"
+                         : "kGpuAsync"))
           << " runs on " << platform::GetCurrentThreadName();
   auto* op = instr_node.OpBase();
@@ -934,7 +915,7 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
       op->Type(), platform::TracerEventType::Operator, 1);
   try {
-    interpreter::WaitEvent(instr_node, place_);
+    instr_node.WaitEvent(place_);
     if (!instr_node.IsArtificial()) {
       RunInstruction(instr_node);
@@ -942,7 +923,7 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
       interpreter::LogDeviceMemoryStats(place_);
     }
-    interpreter::RecordEvent(instr_node, place_);
+    instr_node.RecordEvent(place_);
   } catch (platform::EnforceNotMet& ex) {
     framework::InsertCallStackInfo(op->Type(), op->Attrs(), &ex);
     exception_holder_.Catch(std::make_exception_ptr(std::move(ex)));
@@ -984,7 +965,7 @@ void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
         "RecordStreamForGC is only implemented when compiled with GPU."));
 #else
   if (!IsInterpretercoreFastGCEnabled() ||
-      instr.KernelType() != OpFuncType::kQueueAsync) {
+      instr.KernelType() != OpFuncType::kGpuAsync) {
     return;
   }
   platform::RecordEvent record(
......
@@ -22,12 +22,11 @@
 #include "paddle/fluid/framework/details/exception_holder.h"
 #include "paddle/fluid/framework/new_executor/garbage_collector/garbage_collector.h"
 #include "paddle/fluid/framework/new_executor/interpreter/dependency_builder.h"
-#include "paddle/fluid/framework/new_executor/interpreter/event_manager.h"
 #include "paddle/fluid/framework/new_executor/interpreter/execution_config.h"
 #include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
+#include "paddle/fluid/framework/new_executor/interpreter/stream_analyzer.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/framework/new_executor/profiler.h"
-#include "paddle/fluid/framework/new_executor/stream_analyzer.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/framework/variable.h"
@@ -117,6 +116,7 @@ class InterpreterCore {
   interpreter::DependencyBuilder dependency_builder_;
   interpreter::ExecutionConfig execution_config_;
+  interpreter::StreamAnalyzer stream_analyzer_;
   // NOTE(zhiqiu): when add fetch ops in GetInterpreterCore, we will
   // copy a new program and block, the copy_program_ here is used to
@@ -135,7 +135,6 @@ class InterpreterCore {
   VariableScope var_scope_;
   Scope* local_scope_{nullptr};  // not owned
-  StreamAnalyzer stream_analyzer_;
   EventsWaiter main_thread_blocker_;
   std::shared_ptr<interpreter::AsyncWorkQueue> async_work_queue_;
......
...@@ -19,10 +19,7 @@ ...@@ -19,10 +19,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "paddle/phi/core/utils/rw_lock.h" #include "paddle/fluid/platform/profiler/event_tracing.h"
#define SCOPE_VARS_READER_LOCK AutoRDLock auto_lock(&vars_lock_);
#define SCOPE_VARS_WRITER_LOCK AutoWRLock auto_lock(&vars_lock_);
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -686,7 +683,31 @@ Instruction::Instruction(size_t id, ...@@ -686,7 +683,31 @@ Instruction::Instruction(size_t id,
"Required id >= 0, but received id = %d", id)); "Required id >= 0, but received id = %d", id));
} }
size_t Instruction::Id() const { return id_; } void Instruction::WaitEvent(const Place& place) const {
// If InterpreterCore is on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) {
return;
}
VLOG(6) << "Deal StreamWaitEventOrSync for " << this->OpBase()->Type();
for (const EventInter& event_iter : events_to_wait_) {
platform::RecordEvent record(
"WaitStreamEvent", platform::TracerEventType::UserDefined, 10);
VLOG(6) << "Wait instruction: " << event_iter.instr_id_
<< " 's event with waiter_type: " << event_iter.waiter_type_;
event_iter.event_->Wait(event_iter.waiter_type_, &dev_ctx_);
}
}
void Instruction::RecordEvent(const Place& place) const {
platform::RecordEvent record(
"RecordStreamEvent", platform::TracerEventType::UserDefined, 10);
if (event_to_record_) {
VLOG(6) << "Record event at instruction: " << id_;
event_to_record_->event_->Record(&dev_ctx_);
}
}
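
// The two members above fold the cross-stream bookkeeping into Instruction
// itself: a producer records one event on its own stream, and each consumer
// waits on that event before launching. As a rough standalone analogy only,
// the sketch below shows the same record-then-wait pattern with the raw CUDA
// runtime; the stream names, buffer size, and memset/memcpy workload are
// invented for illustration and this is not the DeviceEvent implementation.

#include <cuda_runtime.h>
#include <cstdio>

int main() {
  cudaStream_t producer, consumer;
  cudaStreamCreate(&producer);
  cudaStreamCreate(&consumer);

  cudaEvent_t ready;
  cudaEventCreateWithFlags(&ready, cudaEventDisableTiming);  // no timing needed

  float *src = nullptr, *dst = nullptr;
  cudaMalloc(&src, 16 * sizeof(float));
  cudaMalloc(&dst, 16 * sizeof(float));

  // Producer work on its own stream, then record the event (the analogue of
  // event_to_record_->event_->Record(&dev_ctx_)).
  cudaMemsetAsync(src, 0, 16 * sizeof(float), producer);
  cudaEventRecord(ready, producer);

  // Consumer stream blocks on the event before its own work (the analogue of
  // event_iter.event_->Wait(waiter_type, &dev_ctx_)).
  cudaStreamWaitEvent(consumer, ready, 0);
  cudaMemcpyAsync(dst, src, 16 * sizeof(float), cudaMemcpyDeviceToDevice,
                  consumer);

  cudaStreamSynchronize(consumer);
  std::printf("cross-stream copy finished\n");

  cudaEventDestroy(ready);
  cudaStreamDestroy(producer);
  cudaStreamDestroy(consumer);
  cudaFree(src);
  cudaFree(dst);
  return 0;
}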
const std::map<std::string, std::vector<int>>& Instruction::Inputs() const { const std::map<std::string, std::vector<int>>& Instruction::Inputs() const {
return op_func_node_.input_index; return op_func_node_.input_index;
...@@ -696,10 +717,6 @@ const std::map<std::string, std::vector<int>>& Instruction::Outputs() const { ...@@ -696,10 +717,6 @@ const std::map<std::string, std::vector<int>>& Instruction::Outputs() const {
return op_func_node_.output_index; return op_func_node_.output_index;
} }
const std::unordered_set<int>& Instruction::NoDataTransformVars() const {
return op_func_node_.no_data_transform_index;
}
OpKernelComputeFunc Instruction::KernelFunc() const { OpKernelComputeFunc Instruction::KernelFunc() const {
return op_func_node_.kernel_func_; return op_func_node_.kernel_func_;
} }
...@@ -722,14 +739,6 @@ OperatorBase* Instruction::OpBase() const { ...@@ -722,14 +739,6 @@ OperatorBase* Instruction::OpBase() const {
return op_base.get(); return op_base.get();
} }
NextInstructionList& Instruction::NextInstructions() {
return next_instruction_;
}
const NextInstructionList& Instruction::NextInstructions() const {
return next_instruction_;
}
void Instruction::AddGCCheckVar(size_t id) { gc_check_vars_.push_back(id); } void Instruction::AddGCCheckVar(size_t id) { gc_check_vars_.push_back(id); }
const std::vector<size_t>& Instruction::GCCheckVars() const { const std::vector<size_t>& Instruction::GCCheckVars() const {
...@@ -786,25 +795,5 @@ void Instruction::AddInplace(Variable* in, Variable* out) { ...@@ -786,25 +795,5 @@ void Instruction::AddInplace(Variable* in, Variable* out) {
void Instruction::ClearInplace() { vec_inplace_in_to_out_.clear(); } void Instruction::ClearInplace() { vec_inplace_in_to_out_.clear(); }
const std::vector<EventInter>& Instruction::InputEvents() const {
return intput_events_;
}
const std::vector<EventInter>& Instruction::OutputEvents() const {
return output_events_;
}
void Instruction::AddInputEvent(size_t var_id,
std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type) {
intput_events_.emplace_back(var_id, event, waiter_type);
}
void Instruction::AddOutputEvent(size_t var_id,
std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type) {
output_events_.emplace_back(var_id, event, waiter_type);
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -245,39 +245,20 @@ class VariableScope { ...@@ -245,39 +245,20 @@ class VariableScope {
std::vector<std::pair<std::string, int>> data_transfer_added_vars_; std::vector<std::pair<std::string, int>> data_transfer_added_vars_;
}; };
class NextInstructionList {
public:
void AddDirectRun(size_t id) { direct_run_.push_back(id); }
void ADDEventRun(size_t id) { event_wait_run_.push_back(id); }
void AddSyncRun(size_t id) { synchronize_run_.push_back(id); }
const std::vector<size_t>& DirectRunIds() const { return direct_run_; }
const std::vector<size_t>& EventRunIds() const { return event_wait_run_; }
const std::vector<size_t>& SyncRunIds() const { return synchronize_run_; }
private:
std::vector<size_t> direct_run_;
std::vector<size_t> event_wait_run_;
std::vector<size_t> synchronize_run_;
};
struct EventInter { struct EventInter {
explicit EventInter(size_t var_id, explicit EventInter(size_t instr_id,
std::shared_ptr<platform::DeviceEvent> event, std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type) platform::DeviceType waiter_type)
: var_id_(var_id), event_(event), waiter_type_(waiter_type) {} : instr_id_(instr_id), event_(event), waiter_type_(waiter_type) {}
size_t var_id_; size_t instr_id_;
std::shared_ptr<platform::DeviceEvent> event_; std::shared_ptr<platform::DeviceEvent> event_;
platform::DeviceType waiter_type_; platform::DeviceType waiter_type_;
}; };
enum class OpFuncType { enum class OpFuncType {
kQueueSync = 0, // CPU kernel, block host kCpuSync, // CPU kernel, block host
kQueueAsync = 1, // GPU、XPU Kernel or d2h, h2d, send, recv, broadcast kGpuSync, // GPU or other device kernel without asynchronous operation
kGpuAsync // GPU or other device kernel with asynchronous operation
}; };
class RuntimeInferShapeContext; class RuntimeInferShapeContext;
...@@ -287,7 +268,6 @@ struct OpFuncNode { ...@@ -287,7 +268,6 @@ struct OpFuncNode {
std::string execution_stream_{kDefaultStream}; std::string execution_stream_{kDefaultStream};
std::map<std::string, std::vector<int>> input_index; std::map<std::string, std::vector<int>> input_index;
std::map<std::string, std::vector<int>> output_index; std::map<std::string, std::vector<int>> output_index;
std::unordered_set<int> no_data_transform_index;
std::map<int, int> inplace_back_map; std::map<int, int> inplace_back_map;
...@@ -309,7 +289,38 @@ class Instruction { ...@@ -309,7 +289,38 @@ class Instruction {
bool IsArtificial() const { return is_artificial_; } bool IsArtificial() const { return is_artificial_; }
size_t Id() const; const std::vector<size_t>& NextInstrsInDifferenceThread() const {
return next_instrs_in_different_thread;
}
const std::vector<size_t>& NextInstrsInSameThread() const {
return next_instrs_in_same_thread;
}
size_t Id() const { return id_; }
void AddEventToRecord(std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type) {
event_to_record_ = std::make_unique<EventInter>(id_, event, waiter_type);
}
void AddEventToWait(size_t instr_id,
std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type) {
events_to_wait_.emplace_back(instr_id, event, waiter_type);
}
void AddNextInstrInDifferentThread(size_t id) {
next_instrs_in_different_thread.push_back(id);
}
void AddNextInstrInSameThread(size_t id) {
next_instrs_in_same_thread.push_back(id);
}
void RecordEvent(const Place& place) const;
void WaitEvent(const Place& place) const;
const std::map<std::string, std::vector<int>>& Inputs() const; const std::map<std::string, std::vector<int>>& Inputs() const;
...@@ -327,10 +338,6 @@ class Instruction { ...@@ -327,10 +338,6 @@ class Instruction {
OperatorBase* OpBase() const; OperatorBase* OpBase() const;
NextInstructionList& NextInstructions();
const NextInstructionList& NextInstructions() const;
void AddGCCheckVar(size_t id); void AddGCCheckVar(size_t id);
const std::vector<size_t>& GCCheckVars() const; const std::vector<size_t>& GCCheckVars() const;
...@@ -357,18 +364,6 @@ class Instruction { ...@@ -357,18 +364,6 @@ class Instruction {
void ClearInplace(); void ClearInplace();
const std::vector<EventInter>& InputEvents() const;
const std::vector<EventInter>& OutputEvents() const;
void AddInputEvent(size_t var_id,
std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type);
void AddOutputEvent(size_t var_id,
std::shared_ptr<platform::DeviceEvent> event,
platform::DeviceType waiter_type);
Priority GetPriority() const { return priority_; } Priority GetPriority() const { return priority_; }
private: private:
...@@ -376,6 +371,13 @@ class Instruction { ...@@ -376,6 +371,13 @@ class Instruction {
// to assist scheduling and no need to be executed. // to assist scheduling and no need to be executed.
size_t id_; size_t id_;
std::vector<size_t> next_instrs_in_different_thread;
std::vector<size_t> next_instrs_in_same_thread;
std::unique_ptr<EventInter> event_to_record_;
std::vector<EventInter> events_to_wait_;
OpFuncNode op_func_node_; OpFuncNode op_func_node_;
const platform::DeviceContext& dev_ctx_; // not owned const platform::DeviceContext& dev_ctx_; // not owned
const Priority priority_; const Priority priority_;
...@@ -386,11 +388,6 @@ class Instruction { ...@@ -386,11 +388,6 @@ class Instruction {
std::vector<size_t> gc_check_vars_; std::vector<size_t> gc_check_vars_;
NextInstructionList next_instruction_;
std::vector<EventInter> intput_events_;
std::vector<EventInter> output_events_;
std::vector<std::pair<Variable*, Variable*>> vec_inplace_in_to_out_; std::vector<std::pair<Variable*, Variable*>> vec_inplace_in_to_out_;
}; };
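
// Compared with the removed per-variable InputEvents/OutputEvents lists, the
// refactored Instruction keeps at most one event_to_record_ for itself and a
// list of events_to_wait_ keyed by the producing instruction's id. The
// standalone sketch below mirrors that shape with invented Node/Event types so
// it compiles on its own; it illustrates the bookkeeping only and is not the
// real Instruction API.

#include <cstdio>
#include <memory>
#include <utility>
#include <vector>

// Invented stand-ins for platform::DeviceEvent and Instruction.
struct Event {
  bool recorded = false;
  void Record() { recorded = true; }
  void Wait() const { std::printf("waiting, recorded=%d\n", recorded); }
};

struct Node {
  explicit Node(size_t id) : id(id) {}

  void AddEventToRecord(std::shared_ptr<Event> e) {
    event_to_record = std::move(e);
  }
  void AddEventToWait(size_t producer_id, std::shared_ptr<Event> e) {
    events_to_wait.emplace_back(producer_id, std::move(e));
  }

  void RecordEvent() const {
    if (event_to_record) event_to_record->Record();
  }
  void WaitEvents() const {
    for (const auto& item : events_to_wait) item.second->Wait();
  }

  size_t id;
  std::shared_ptr<Event> event_to_record;
  std::vector<std::pair<size_t, std::shared_ptr<Event>>> events_to_wait;
};

int main() {
  Node producer(0), consumer(1);
  auto cross_stream = std::make_shared<Event>();

  producer.AddEventToRecord(cross_stream);
  consumer.AddEventToWait(producer.id, cross_stream);

  producer.RecordEvent();  // right after the producer's kernel is issued
  consumer.WaitEvents();   // right before the consumer's kernel is issued
  return 0;
}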
......
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/stream_analyzer.h"
#include <future>
#include <unordered_set>
#include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
class ContextManager {
public:
using DeviceContextMap =
std::map<Place,
std::shared_future<std::unique_ptr<platform::DeviceContext>>>;
static ContextManager& Instance() {
static ContextManager* ctx_manager = new ContextManager;
return *ctx_manager;
}
std::shared_future<std::unique_ptr<platform::DeviceContext>> Get(
const std::string& type, const platform::Place& place) {
std::lock_guard<std::mutex> lk(ctx_mtx_);
VLOG(6) << "Get dev_ctx for " << type << " - " << place;
DeviceContextMap& ctxs = ctx_pool_[type];
if (ctxs.find(place) == ctxs.end()) {
platform::EmplaceDeviceContexts(
&ctxs,
{place},
/*disable_setting_default_stream_for_allocator=*/true);
}
return ctxs[place];
}
private:
ContextManager() {}
DISABLE_COPY_AND_ASSIGN(ContextManager);
std::mutex ctx_mtx_;
std::unordered_map<std::string, DeviceContextMap> ctx_pool_;
};
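
// ContextManager above is a lazily-filled, mutex-guarded cache: the first
// Get() for a (stream-type, place) key constructs the DeviceContext, and every
// later caller shares the same std::shared_future. The snippet below is a
// minimal, self-contained sketch of that caching pattern using an ordinary
// string as the cached resource; the class and key names are made up for the
// example.

#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Lazily-initialized, thread-safe cache: construction happens at most once per
// key, and all callers share the result through a std::shared_future.
class LazyCache {
 public:
  static LazyCache& Instance() {
    static LazyCache cache;
    return cache;
  }

  std::shared_future<std::shared_ptr<std::string>> Get(const std::string& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(key);
    if (it == cache_.end()) {
      std::promise<std::shared_ptr<std::string>> promise;
      promise.set_value(std::make_shared<std::string>("resource for " + key));
      it = cache_.emplace(key, promise.get_future().share()).first;
    }
    return it->second;
  }

 private:
  LazyCache() = default;
  LazyCache(const LazyCache&) = delete;
  LazyCache& operator=(const LazyCache&) = delete;

  std::mutex mutex_;
  std::map<std::string, std::shared_future<std::shared_ptr<std::string>>>
      cache_;
};

int main() {
  auto ctx = LazyCache::Instance().Get("D2H@GPU:0");
  std::cout << *ctx.get() << std::endl;  // "resource for D2H@GPU:0"
  return 0;
}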
/*
* Parse the var_ids that need to be associated with an event.
* The caller should guarantee front_op and back_op satisfy the
* following conditions:
* 1. kQueueSync -> kQueueAsync
* 2. kQueueAsync -> kQueueSync
*
* For example: matmul(gpu) -> out_var -> memcpy_d2h
* out_var should be associated with an event.
*
* NOTE(zhiqiu): There are two special cases where no event is needed:
* 1. the variable is marked as NoDataTransformVar
* 2. the variable is marked as NoNeedDataBuffer
*/
std::vector<size_t> StreamAnalyzer::GetNeedEventVarIds(
const Instruction& cur_instr, const Instruction& next_instr) {
std::unordered_set<size_t> unique_var_ids;
for (auto& item : cur_instr.Outputs()) {
unique_var_ids.insert(item.second.begin(), item.second.end());
}
auto is_no_need_buffer = [&next_instr](std::string name) {
auto* op = next_instr.OpBase();
auto& inferer = op->Info().NoNeedBufferVarsInferer();
if (inferer) {
auto no_need_buffer_ins =
inferer(op->Inputs(), op->Outputs(), op->Attrs());
return no_need_buffer_ins.count(name) != 0;
}
return false;
};
auto is_shape_op = [](std::string op_name) {
if (op_name == "shape") {
return true;
}
return false;
};
bool is_memcpy =
interpreter::IsMemcpyOp(cur_instr) || interpreter::IsMemcpyOp(next_instr);
std::vector<size_t> need_event_var_ids;
for (auto& item : next_instr.Inputs()) {
for (auto var_id : item.second) {
if (unique_var_ids.count(var_id) > 0) {
if (is_memcpy || is_shape_op(next_instr.OpBase()->Type())) {
if (next_instr.NoDataTransformVars().count(var_id)) {
VLOG(4) << "Skip inserting event at variable " << item.first
<< " of operator " << next_instr.OpBase()->Type()
<< " since it is NoDataTransform";
continue;
}
if (is_no_need_buffer(item.first)) {
VLOG(4) << "Skip inserting event at variable " << item.first
<< " of operator " << next_instr.OpBase()->Type()
<< " since it is NoNeedBufferVar";
continue;
}
}
need_event_var_ids.push_back(var_id);
}
}
}
return need_event_var_ids;
}
void StreamAnalyzer::ConstructEventForVar(
const std::vector<size_t>& new_event_var_id,
Instruction* next_instr,
platform::DeviceType waiter_type,
const platform::Place& place) {
for (auto var_id : new_event_var_id) {
if (var_id2event_.count(var_id) == 0) {
auto device_event = std::make_shared<platform::DeviceEvent>(
place, platform::GenerateDeviceEventFlag());
var_id2event_.emplace(var_id, std::move(device_event));
}
// Add events for next_instr.inputs
next_instr->AddInputEvent(var_id, var_id2event_.at(var_id), waiter_type);
}
}
void StreamAnalyzer::Schedule(const std::vector<size_t>& downstream_ops,
std::vector<Instruction>* instructions,
size_t op_index) {
auto& cur_instr = instructions->at(op_index);
auto& next_instruction = cur_instr.NextInstructions();
std::vector<size_t> event_var_ids;
for (auto next_op_id : downstream_ops) {
auto& next_instr = instructions->at(next_op_id);
if (IsDirectRun(cur_instr, next_instr)) {
VLOG(4) << "DirectRun: " << cur_instr.OpBase()->Type() << "->"
<< next_instr.OpBase()->Type();
next_instruction.AddDirectRun(next_op_id);
} else {
// Always insert events between different stream
auto need_event_var_ids = GetNeedEventVarIds(cur_instr, next_instr);
event_var_ids.insert(event_var_ids.end(),
need_event_var_ids.begin(),
need_event_var_ids.end());
auto waiter_type = GetWaiterType(next_instr);
ConstructEventForVar(need_event_var_ids,
&next_instr,
waiter_type,
cur_instr.DeviceContext().GetPlace());
if (waiter_type == platform::kCPU) { // GPU -> CPU
VLOG(4) << "SyncRun: " << cur_instr.OpBase()->Type() << "->"
<< next_instr.OpBase()->Type();
next_instruction.AddSyncRun(next_op_id);
} else { // GPU -> GPU(different stream)
VLOG(4) << "EventRun: " << cur_instr.OpBase()->Type() << "->"
<< next_instr.OpBase()->Type();
next_instruction.ADDEventRun(next_op_id);
}
}
}
// Create events for these cross-stream vars
VLOG(3) << cur_instr.OpBase()->Type()
<< " event_var_ids.size: " << event_var_ids.size();
for (auto var_id : event_var_ids) {
cur_instr.AddOutputEvent(
var_id, var_id2event_.at(var_id), platform::kCUDA /*not used*/);
}
}
platform::DeviceContext* StreamAnalyzer::ParseDeviceContext(
const OpFuncNode& op_func_node) {
auto& op = op_func_node.operator_base_;
auto& op_type = op->Type();
const std::string& execution_stream = op_func_node.execution_stream_;
ContextManager& ctx_manager = ContextManager::Instance();
// Only gpu/npu need to update; xpu does not, because the xpu memcpy op
// kernel is synchronous.
if (platform::is_gpu_place(place_) || platform::is_npu_place(place_) ||
platform::is_custom_place(place_)) {
VLOG(7) << "Parse DeviceContext for " << op_type
<< ", execution stream = " << execution_stream;
if (execution_stream != kDefaultStream) {
return ctx_manager
.Get(std::string(kCustomStream) + "-" + execution_stream, place_)
.get()
.get();
}
if (op_type == interpreter::kMemcpyD2H) {
return ctx_manager.Get(std::string(kD2HStream), place_).get().get();
} else if (op_type == interpreter::kMemcpyH2D) {
return ctx_manager.Get(std::string(kH2DStream), place_).get().get();
}
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
// NOTE(Ruibiao): Here supports multi-stream overlap for c_allreduce_sum
// with use_calc_stream==false by returning a device context obtained from the
// global NCCLCommContext instance. Because when use_calc_stream==false, in
// OP kernel, the NCCL communication will be launched to the stream directly
// getting from the global NCCLCommContext instance rather than the
// DeviceContext passed from executor (see CAllReduceOpCUDAKernel in
// c_allreduce_op.h). Now it is just a temporary solution for ONLY
// c_allreduce_sum which is used in ResNet50 distributed training.
if (op_type == "c_allreduce_sum" &&
op->Attr<bool>("use_calc_stream") == false) {
int ring_id = op->Attr<int>("ring_id");
return platform::NCCLCommContext::Instance()
.Get(ring_id, place_)
->dev_context();
}
#endif
}
return op_func_node.dev_ctx_;
}
/*
* NOTE(dev): The following cases are considered as directly run:
*
* 0. in XPU place. because xpu memcpy op kernel is synchronous.
* 1. with same dev_ctx_, such as: CPU -> CPU, GPU -> GPU
* 2. CPU -> any (it is possible: CPU op->VAR->GPU op, when var is no need
* buffer or no need data transform)
* 3. D2H -> CPU
* 4. CPU -> H2D
*/
bool StreamAnalyzer::IsDirectRun(Instruction& cur_instr,
const Instruction& next_instr) {
if ((cur_instr.KernelType() == OpFuncType::kQueueSync) &&
(next_instr.KernelType() == OpFuncType::kQueueSync)) {
return true;
}
if (cur_instr.KernelType() == next_instr.KernelType() &&
(&cur_instr.DeviceContext() == &next_instr.DeviceContext())) {
return true;
}
// xpu&ipu memcpy kernel is synchronous.
if (platform::is_ipu_place(place_) || platform::is_xpu_place(place_)) {
return true;
}
// npu d2h kernel is asynchronous.
if (platform::is_npu_place(place_) || platform::is_custom_place(place_)) {
return interpreter::IsCpuOp(cur_instr) ||
interpreter::IsMemcpyH2D(next_instr);
}
// gpu or cpu
return interpreter::IsCpuOp(cur_instr) ||
interpreter::IsMemcpyD2H(cur_instr) ||
interpreter::IsMemcpyH2D(next_instr);
}
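
// IsDirectRun above is the gate that decides whether a downstream op can be
// chained on the same thread with no device event at all. As a rough,
// standalone mirror of only the first two (device-independent) cases, with an
// invented KernelKind enum and plain pointers standing in for DeviceContext
// identity; this is a sketch, not the actual implementation:

#include <cstdio>

// Invented enum standing in for OpFuncType; not the Paddle definition.
enum class KernelKind { kHostSync, kDeviceSync, kDeviceAsync };

// Two adjacent ops can run back-to-back without an event when both are
// ordered by the host thread, or when they share kernel kind and context.
bool CanRunDirectly(KernelKind cur, KernelKind next,
                    const void* cur_ctx, const void* next_ctx) {
  const bool both_host_ordered =
      cur != KernelKind::kDeviceAsync && next != KernelKind::kDeviceAsync;
  const bool same_stream = (cur == next) && (cur_ctx == next_ctx);
  return both_host_ordered || same_stream;
}

int main() {
  int ctx_a = 0, ctx_b = 0;
  std::printf("%d\n",
              CanRunDirectly(KernelKind::kDeviceAsync, KernelKind::kDeviceAsync,
                             &ctx_a, &ctx_a));  // 1: same kind, same context
  std::printf("%d\n",
              CanRunDirectly(KernelKind::kDeviceAsync, KernelKind::kHostSync,
                             &ctx_a, &ctx_b));  // 0: async producer, new context
  return 0;
}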
platform::DeviceType StreamAnalyzer::GetWaiterType(const Instruction& instr) {
if (instr.KernelType() == OpFuncType::kQueueSync) {
return platform::kCPU;
} else {
if (platform::is_xpu_place(place_)) {
return platform::kXPU;
} else if (platform::is_npu_place(place_)) {
return platform::kNPU;
} else if (platform::is_custom_place(place_)) {
return platform::kCUSTOM_DEVICE;
}
return platform::kCUDA;
}
}
} // namespace framework
} // namespace paddle