未验证 提交 2ee32028 编写于 作者: R Ruibiao Chen 提交者: GitHub

Add dependency for read op in standalone executor (#44362)

* Add dependency for read op in standalone executor

* Fix CI errors

* Add UT

* add_dependency -> dependency_utils

* Fix CI errors
上级 3ed53280
add_subdirectory(workqueue)
add_subdirectory(garbage_collector) add_subdirectory(garbage_collector)
add_subdirectory(interpreter)
add_subdirectory(workqueue)
set(STANDALONE_EXECUTOR_SRCS set(STANDALONE_EXECUTOR_SRCS
data_transfer.cc data_transfer.cc
...@@ -11,8 +12,9 @@ set(STANDALONE_EXECUTOR_SRCS ...@@ -11,8 +12,9 @@ set(STANDALONE_EXECUTOR_SRCS
standalone_executor.cc) standalone_executor.cc)
set(STANDALONE_EXECUTOR_DEPS set(STANDALONE_EXECUTOR_DEPS
op_registry dependency_utils
device_context device_context
op_registry
scope scope
framework_proto framework_proto
data_feed_proto data_feed_proto
...@@ -36,9 +38,6 @@ set(STANDALONE_EXECUTOR_DEPS ...@@ -36,9 +38,6 @@ set(STANDALONE_EXECUTOR_DEPS
enforce enforce
scope scope
glog glog
enforce
glog
scope
workqueue workqueue
interpretercore_event_garbage_collector interpretercore_event_garbage_collector
${DEVICE_EVENT_LIBS} ${DEVICE_EVENT_LIBS}
......
# Helpers that add implicit inter-op dependencies (e.g. for read ops) which
# cannot be expressed by the Program itself; see dependency_utils.h/.cc.
cc_library(
dependency_utils
SRCS dependency_utils.cc
DEPS operator)
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h"

#include <queue>
#include <unordered_set>
namespace paddle {
namespace framework {
namespace interpreter {
// Records the edge prior_op_idx -> posterior_op_idx in op_downstream_map.
// When op_happens_before is provided, the edge is skipped if some existing
// downstream op of prior_op_idx already happens before posterior_op_idx,
// since the dependency is then transitively implied.
void AddDownstreamOp(int prior_op_idx,
                     int posterior_op_idx,
                     std::map<int, std::list<int>>* op_downstream_map,
                     const std::vector<std::vector<bool>>* op_happens_before) {
  auto iter = op_downstream_map->find(prior_op_idx);
  if (iter == op_downstream_map->end()) {
    // First downstream op recorded for prior_op_idx.
    iter =
        op_downstream_map->emplace(prior_op_idx, std::list<int>()).first;
  } else if (op_happens_before != nullptr) {
    // Check whether an existing downstream op already implies this edge.
    for (int existing_idx : iter->second) {
      if (op_happens_before->at(existing_idx).at(posterior_op_idx)) {
        VLOG(7) << "Find dependencies " << prior_op_idx << "->" << existing_idx
                << "->" << posterior_op_idx << ", skip adding "
                << prior_op_idx << "->" << posterior_op_idx;
        return;
      }
    }
  }
  iter->second.push_back(posterior_op_idx);
}
// check whether exists prior_op -> ... -> posterior_op to avoid building loops
//
// BFS over downstream_map starting from prior_op_idx. A visited set prevents
// re-enqueueing nodes reachable through multiple paths; without it, DAGs with
// diamond-shaped sharing cause exponentially many enqueues (and a cycle would
// loop forever).
bool IsDependency(int prior_op_idx,
                  int posterior_op_idx,
                  const std::map<int, std::list<int>>& downstream_map) {
  std::queue<int> q;
  std::unordered_set<int> visited;
  q.push(prior_op_idx);
  visited.insert(prior_op_idx);
  while (!q.empty()) {
    int op_idx = q.front();
    q.pop();
    auto it = downstream_map.find(op_idx);
    if (it != downstream_map.end()) {
      for (int downstream_op_idx : it->second) {
        if (downstream_op_idx == posterior_op_idx) {
          return true;
        }
        // Enqueue each node at most once.
        if (visited.insert(downstream_op_idx).second) {
          q.push(downstream_op_idx);
        }
      }
    }
  }
  return false;
}
// Makes every "read" op an implicit upstream of each startup op (an op that
// appears in no downstream list, i.e. has no recorded upstream dependency),
// unless the edge would create a cycle. Equivalent to the
// add_reader_dependency_pass of the old executor.
//
// Fix: the inner `if` previously had no braces, so the VLOG line ran for
// every (read, startup) pair even when no dependency was added, producing
// misleading logs. The braces now scope the log to the actual addition.
void AddDependencyForReadOp(
    const std::vector<Instruction>& vec_instruction,
    std::map<int, std::list<int>>* downstream_map,
    const std::vector<std::vector<bool>>* op_happens_before) {
  size_t op_num = vec_instruction.size();
  // An op is a startup op iff it never occurs as a downstream op.
  std::vector<bool> is_startup_ops(op_num, true);
  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
    auto it = downstream_map->find(op_idx);
    if (it != downstream_map->end()) {
      for (size_t downstream_op_idx : it->second) {
        is_startup_ops[downstream_op_idx] = false;
      }
    }
  }
  std::vector<size_t> read_ops;
  std::vector<size_t> startup_ops;
  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
    if (vec_instruction[op_idx].OpBase()->Type() == "read") {
      read_ops.push_back(op_idx);
    }
    if (is_startup_ops[op_idx]) {
      startup_ops.push_back(op_idx);
    }
  }
  for (size_t read_op_idx : read_ops) {
    for (size_t downstream_op_idx : startup_ops) {
      // Skip self-edges and edges that would close a cycle
      // (startup op already reaches the read op).
      if (read_op_idx != downstream_op_idx &&
          !IsDependency(downstream_op_idx, read_op_idx, *downstream_map)) {
        AddDownstreamOp(
            read_op_idx, downstream_op_idx, downstream_map, op_happens_before);
        VLOG(4) << "Add depend from "
                << vec_instruction[read_op_idx].OpBase()->Type() << "("
                << read_op_idx << ") to "
                << vec_instruction[downstream_op_idx].OpBase()->Type() << "("
                << downstream_op_idx << ")";
      }
    }
  }
}
} // namespace interpreter
} // namespace framework
} // namespace paddle
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file provides some dependency adding function to handle the implicit
// dependency that cannot be explicitly expressed by a Program. It is a
// compromise of the incomplete expression ability of the Program. Do not add
// too many functions here at will, that will bring great burden to the
// Interpretercore.
// TODO(Ruibiao):
// 1. Move other dependency adding codes from interpretercore_util.cc to
// dependency_utils.cc
// 2. Move other Interpretercore related codes to directory
// new_executor/interpreter
// 3. Try to remove parameter op_happens_before from the dependency adding
// function
#pragma once
#include <map>
#include <vector>
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
namespace paddle {
namespace framework {
namespace interpreter {
// equivalent to add_reader_dependency_pass: makes each "read" op an implicit
// upstream of every startup op (an op with no recorded upstream dependency),
// skipping pairs that would introduce a cycle.
// - vec_instruction: all instructions of the program, indexed by op index.
// - downstream_map: op index -> downstream op indices; updated in place.
// - op_happens_before: optional transitive happens-before matrix used to
//   skip edges that are already implied.
void AddDependencyForReadOp(
const std::vector<Instruction>& vec_instruction,
std::map<int, std::list<int>>* downstream_map,
const std::vector<std::vector<bool>>* op_happens_before = nullptr);
// Appends posterior_op_idx to prior_op_idx's downstream list. If
// op_happens_before is provided and some existing downstream op of
// prior_op_idx already happens before posterior_op_idx, the edge is
// redundant and is not added.
void AddDownstreamOp(
int prior_op_idx,
int posterior_op_idx,
std::map<int, std::list<int>>* op_downstream_map,
const std::vector<std::vector<bool>>* op_happens_before = nullptr);
} // namespace interpreter
} // namespace framework
} // namespace paddle
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "paddle/fluid/framework/details/nan_inf_utils.h" #include "paddle/fluid/framework/details/nan_inf_utils.h"
#include "paddle/fluid/framework/executor_gc_helper.h" #include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/new_executor/data_transfer.h" #include "paddle/fluid/framework/new_executor/data_transfer.h"
#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
#include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h"
#include "paddle/fluid/operators/controlflow/while_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h"
...@@ -704,33 +705,6 @@ void update_var_min_rw_op(const std::map<int, std::set<int>>& op2dependences, ...@@ -704,33 +705,6 @@ void update_var_min_rw_op(const std::map<int, std::set<int>>& op2dependences,
var2min_rw_op->at(rw_var).push_back(cur_op); var2min_rw_op->at(rw_var).push_back(cur_op);
} }
void AddDownstreamOp(int prior_op_idx,
int posterior_op_idx,
std::map<int, std::list<int>>* op_downstream_map) {
if (op_downstream_map->find(prior_op_idx) == op_downstream_map->end()) {
op_downstream_map->emplace(std::make_pair(prior_op_idx, std::list<int>()));
}
op_downstream_map->at(prior_op_idx).push_back(posterior_op_idx);
}
void AddDownstreamOp(int prior_op_idx,
int posterior_op_idx,
std::map<int, std::list<int>>* op_downstream_map,
const std::vector<std::vector<bool>>& op_happens_before) {
if (op_downstream_map->find(prior_op_idx) != op_downstream_map->end()) {
for (int op_idx : op_downstream_map->at(prior_op_idx)) {
if (op_happens_before[op_idx][posterior_op_idx]) {
VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx
<< "->" << posterior_op_idx << ", skip adding " << prior_op_idx
<< "->" << posterior_op_idx;
return;
}
}
}
AddDownstreamOp(prior_op_idx, posterior_op_idx, op_downstream_map);
}
size_t CountDownstreamMap(const std::map<int, std::list<int>>& downstream_map) { size_t CountDownstreamMap(const std::map<int, std::list<int>>& downstream_map) {
size_t count = 0; size_t count = 0;
for (auto pair : downstream_map) { for (auto pair : downstream_map) {
...@@ -972,7 +946,7 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -972,7 +946,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) { if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) { if (dependence_op_idx != -1) {
AddDownstreamOp( AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
} }
dependence_op_idx = op_idx; dependence_op_idx = op_idx;
} }
...@@ -999,7 +973,7 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -999,7 +973,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) { if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) { if (dependence_op_idx != -1) {
AddDownstreamOp( AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
VLOG(4) << "Add depend from " VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << " to " << vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
<< vec_instruction[op_idx].OpBase()->Type(); << vec_instruction[op_idx].OpBase()->Type();
...@@ -1028,7 +1002,7 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -1028,7 +1002,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
<< vec_instruction[dependence_op_idx].OpBase()->Type() << " to " << vec_instruction[dependence_op_idx].OpBase()->Type() << " to "
<< vec_instruction[op_idx].OpBase()->Type(); << vec_instruction[op_idx].OpBase()->Type();
AddDownstreamOp( AddDownstreamOp(
dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
} }
} }
} }
...@@ -1088,7 +1062,7 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -1088,7 +1062,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
AddDownstreamOp(j, AddDownstreamOp(j,
first_read_fused_out_op, first_read_fused_out_op,
&op_downstream_map, &op_downstream_map,
*op_happens_before); op_happens_before);
VLOG(4) << j << " -> " << first_read_fused_out_op; VLOG(4) << j << " -> " << first_read_fused_out_op;
VLOG(4) VLOG(4)
<< "Add depend from " << vec_instruction[j].OpBase()->Type() << "Add depend from " << vec_instruction[j].OpBase()->Type()
...@@ -1121,7 +1095,7 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -1121,7 +1095,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
for (auto var_id : outputs) { for (auto var_id : outputs) {
if (is_read(vec_instruction[j], var_id)) { if (is_read(vec_instruction[j], var_id)) {
AddDownstreamOp(target, j, &op_downstream_map, *op_happens_before); AddDownstreamOp(target, j, &op_downstream_map, op_happens_before);
VLOG(4) << target << " -> " << j; VLOG(4) << target << " -> " << j;
VLOG(4) << "Add depend from " VLOG(4) << "Add depend from "
<< vec_instruction[target].OpBase()->Type() << " to " << vec_instruction[target].OpBase()->Type() << " to "
...@@ -1137,10 +1111,8 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -1137,10 +1111,8 @@ std::map<int, std::list<int>> build_op_downstream_map(
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) { for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (!IsCpuOp(vec_instruction[op_idx])) { if (!IsCpuOp(vec_instruction[op_idx])) {
if (dependence_op_idx != -1) { if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, AddDownstreamOp(
op_idx, dependence_op_idx, op_idx, &op_downstream_map, op_happens_before);
&op_downstream_map,
*op_happens_before);
VLOG(4) << "Add depend from " VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << "(" << vec_instruction[dependence_op_idx].OpBase()->Type() << "("
<< dependence_op_idx << ") to " << dependence_op_idx << ") to "
...@@ -1152,6 +1124,10 @@ std::map<int, std::list<int>> build_op_downstream_map( ...@@ -1152,6 +1124,10 @@ std::map<int, std::list<int>> build_op_downstream_map(
} }
} }
AddDependencyForReadOp(
vec_instruction, &op_downstream_map, op_happens_before);
VLOG(8) << "build_op_downstream_map finished";
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map); VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map);
VLOG(8) << "downstream_map: " << std::endl VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map); << StringizeDownstreamMap(op_downstream_map);
......
...@@ -12,12 +12,6 @@ ...@@ -12,12 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
/*************************************************************************
> File Name: interpretercore_util.h
> Author: guanshanshan@baidu.com
> Created Time: Fri 23 Jul 2021 06:19:19 AM UTC
************************************************************************/
#pragma once #pragma once
#include <chrono> #include <chrono>
...@@ -48,7 +42,6 @@ using AtomicVectorSizeT = std::vector<std::atomic<size_t>>; ...@@ -48,7 +42,6 @@ using AtomicVectorSizeT = std::vector<std::atomic<size_t>>;
namespace paddle { namespace paddle {
namespace framework { namespace framework {
namespace interpreter { namespace interpreter {
class AsyncWorkQueue { class AsyncWorkQueue {
......
...@@ -1713,3 +1713,10 @@ if(WITH_CINN AND WITH_TESTING) ...@@ -1713,3 +1713,10 @@ if(WITH_CINN AND WITH_TESTING)
FLAGS_allow_cinn_ops="conv2d;conv2d_grad;elementwise_add;elementwise_add_grad;relu;relu_grad;sum" FLAGS_allow_cinn_ops="conv2d;conv2d_grad;elementwise_add;elementwise_add_grad;relu;relu_grad;sum"
) )
endif() endif()
py_test_modules(
test_add_reader_dependency_for_interpretercore MODULES
test_add_reader_dependency ENVS FLAGS_CONVERT_GRAPH_TO_PROGRAM=true)
set_tests_properties(test_add_reader_dependency_for_interpretercore
PROPERTIES TIMEOUT 120)
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
disable_wingpu_test="^test_model$|\ disable_wingpu_test="^test_model$|\
^test_dataloader_early_reset$|\ ^test_dataloader_early_reset$|\
^test_add_reader_dependency$|\ ^test_add_reader_dependency$|\
^test_add_reader_dependency_for_interpretercore$|\
^test_decoupled_py_reader$|\ ^test_decoupled_py_reader$|\
^test_generator_dataloader$|\ ^test_generator_dataloader$|\
^test_parallel_dygraph_sync_batch_norm$|\ ^test_parallel_dygraph_sync_batch_norm$|\
...@@ -152,6 +153,7 @@ disable_win_inference_test="^trt_quant_int8_yolov3_r50_test$|\ ...@@ -152,6 +153,7 @@ disable_win_inference_test="^trt_quant_int8_yolov3_r50_test$|\
^test_multiprocess_dataloader_iterable_dataset_dynamic$|\ ^test_multiprocess_dataloader_iterable_dataset_dynamic$|\
^test_multiprocess_dataloader_iterable_dataset_static$|\ ^test_multiprocess_dataloader_iterable_dataset_static$|\
^test_add_reader_dependency$|\ ^test_add_reader_dependency$|\
^test_add_reader_dependency_for_interpretercore$|\
^test_compat$|\ ^test_compat$|\
^test_decoupled_py_reader$|\ ^test_decoupled_py_reader$|\
^test_generator_dataloader$|\ ^test_generator_dataloader$|\
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册