diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt index fb884f54c5534e5af20626a847e814f81809e50a..0422b151ac9f4ba22768614d6cd70a782989b31e 100644 --- a/paddle/fluid/framework/new_executor/CMakeLists.txt +++ b/paddle/fluid/framework/new_executor/CMakeLists.txt @@ -1,5 +1,6 @@ -add_subdirectory(workqueue) add_subdirectory(garbage_collector) +add_subdirectory(interpreter) +add_subdirectory(workqueue) set(STANDALONE_EXECUTOR_SRCS data_transfer.cc @@ -11,8 +12,9 @@ set(STANDALONE_EXECUTOR_SRCS standalone_executor.cc) set(STANDALONE_EXECUTOR_DEPS - op_registry + dependency_utils device_context + op_registry scope framework_proto data_feed_proto @@ -36,9 +38,6 @@ set(STANDALONE_EXECUTOR_DEPS enforce scope glog - enforce - glog - scope workqueue interpretercore_event_garbage_collector ${DEVICE_EVENT_LIBS} diff --git a/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt b/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..4a569575c73aca8670d962ca8a7be4a80ac95569 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpreter/CMakeLists.txt @@ -0,0 +1,4 @@ +cc_library( + dependency_utils + SRCS dependency_utils.cc + DEPS operator) diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc new file mode 100644 index 0000000000000000000000000000000000000000..e3f88d54d91e16fbee0c4b13f61ff9115e8dc565 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h" + +#include + +namespace paddle { +namespace framework { +namespace interpreter { + +void AddDownstreamOp(int prior_op_idx, + int posterior_op_idx, + std::map>* op_downstream_map, + const std::vector>* op_happens_before) { + if (op_downstream_map->find(prior_op_idx) == op_downstream_map->end()) { + op_downstream_map->emplace(std::make_pair(prior_op_idx, std::list())); + } else { + if (op_happens_before != nullptr) { + for (int op_idx : op_downstream_map->at(prior_op_idx)) { + if (op_happens_before->at(op_idx).at(posterior_op_idx)) { + VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx + << "->" << posterior_op_idx << ", skip adding " + << prior_op_idx << "->" << posterior_op_idx; + return; + } + } + } + } + + op_downstream_map->at(prior_op_idx).push_back(posterior_op_idx); +} + +// check whether exists prior_op -> ... -> posterior_op to avoid building loops +bool IsDependency(int prior_op_idx, + int posterior_op_idx, + const std::map>& downstream_map) { + std::queue q; + q.push(prior_op_idx); + + while (!q.empty()) { + int op_idx = q.front(); + q.pop(); + + auto it = downstream_map.find(op_idx); + if (it != downstream_map.end()) { + for (int downstream_op_idx : it->second) { + if (downstream_op_idx == posterior_op_idx) { + return true; + } + + // no need for double enqueue checking since DAG is assumed + q.push(downstream_op_idx); + } + } + } + + return false; +} + +void AddDependencyForReadOp( + const std::vector& vec_instruction, + std::map>* downstream_map, + const std::vector>* op_happens_before) { + size_t op_num = vec_instruction.size(); + std::vector is_startup_ops(op_num, true); + for (size_t op_idx = 0; op_idx < op_num; ++op_idx) { + auto it = downstream_map->find(op_idx); + if (it != downstream_map->end()) { + for (size_t downstream_op_idx : it->second) { + is_startup_ops[downstream_op_idx] = false; + } + } + } + + std::vector read_ops; + std::vector startup_ops; + for (size_t op_idx = 0; op_idx < op_num; ++op_idx) { + if (vec_instruction[op_idx].OpBase()->Type() == "read") { + read_ops.push_back(op_idx); + } + + if (is_startup_ops[op_idx]) { + startup_ops.push_back(op_idx); + } + } + + for (size_t read_op_idx : read_ops) { + for (size_t downstream_op_idx : startup_ops) { + if (read_op_idx != downstream_op_idx && + !IsDependency(downstream_op_idx, read_op_idx, *downstream_map)) + AddDownstreamOp( + read_op_idx, downstream_op_idx, downstream_map, op_happens_before); + VLOG(4) << "Add depend from " + << vec_instruction[read_op_idx].OpBase()->Type() << "(" + << read_op_idx << ") to " + << vec_instruction[downstream_op_idx].OpBase()->Type() << "(" + << downstream_op_idx << ")"; + } + } +} + +} // namespace interpreter +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h new file mode 100644 index 0000000000000000000000000000000000000000..206a519f55a04f1a12c0575009c36e95e7f0bfe1 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpreter/dependency_utils.h @@ -0,0 +1,54 @@ +// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file provides some dependency adding function to handle the implicit +// dependency that cannot be explicitly expresed by a Program. It is a +// compromise of the incomplete expression ability of the Program. Do not add +// too many functions here at will, that will bring great burden to the +// Interpretercore. + +// TODO(Ruibiao): +// 1. Move other dependency adding codes from interpretercore_util.cc to +// dependency_utils.cc +// 2. Move other Interpretercore related codes to directory +// new_executor/interpreter +// 3. Try to remove parameter op_happens_before from the dependency adding +// function + +#pragma once + +#include +#include + +#include "paddle/fluid/framework/new_executor/new_executor_defs.h" + +namespace paddle { +namespace framework { +namespace interpreter { + +// equivalent to add_reader_dependency_pass +void AddDependencyForReadOp( + const std::vector& vec_instruction, + std::map>* downstream_map, + const std::vector>* op_happens_before = nullptr); + +void AddDownstreamOp( + int prior_op_idx, + int posterior_op_idx, + std::map>* op_downstream_map, + const std::vector>* op_happens_before = nullptr); + +} // namespace interpreter +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index acbcf1da4c5e3e4fddf1e5aad074f3e4d2ca8fdf..35f02189ca97ed198a041df6da10e88061253d01 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -18,6 +18,7 @@ #include "paddle/fluid/framework/details/nan_inf_utils.h" #include "paddle/fluid/framework/executor_gc_helper.h" #include "paddle/fluid/framework/new_executor/data_transfer.h" +#include "paddle/fluid/framework/new_executor/interpreter/dependency_utils.h" #include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" @@ -704,33 +705,6 @@ void update_var_min_rw_op(const std::map>& op2dependences, var2min_rw_op->at(rw_var).push_back(cur_op); } -void AddDownstreamOp(int prior_op_idx, - int posterior_op_idx, - std::map>* op_downstream_map) { - if (op_downstream_map->find(prior_op_idx) == op_downstream_map->end()) { - op_downstream_map->emplace(std::make_pair(prior_op_idx, std::list())); - } - op_downstream_map->at(prior_op_idx).push_back(posterior_op_idx); -} - -void AddDownstreamOp(int prior_op_idx, - int posterior_op_idx, - std::map>* op_downstream_map, - const std::vector>& op_happens_before) { - if (op_downstream_map->find(prior_op_idx) != op_downstream_map->end()) { - for (int op_idx : op_downstream_map->at(prior_op_idx)) { - if (op_happens_before[op_idx][posterior_op_idx]) { - VLOG(7) << "Find dependencies " << prior_op_idx << "->" << op_idx - << "->" << posterior_op_idx << ", skip adding " << prior_op_idx - << "->" << posterior_op_idx; - return; - } - } - } - - AddDownstreamOp(prior_op_idx, posterior_op_idx, op_downstream_map); -} - size_t CountDownstreamMap(const std::map>& downstream_map) { size_t count = 0; for (auto pair : downstream_map) { @@ -972,7 +946,7 @@ std::map> build_op_downstream_map( if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) { if (dependence_op_idx != -1) { AddDownstreamOp( - dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); + dependence_op_idx, op_idx, &op_downstream_map, op_happens_before); } dependence_op_idx = op_idx; } @@ -999,7 +973,7 @@ std::map> build_op_downstream_map( if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) { if (dependence_op_idx != -1) { AddDownstreamOp( - dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); + dependence_op_idx, op_idx, &op_downstream_map, op_happens_before); VLOG(4) << "Add depend from " << vec_instruction[dependence_op_idx].OpBase()->Type() << " to " << vec_instruction[op_idx].OpBase()->Type(); @@ -1028,7 +1002,7 @@ std::map> build_op_downstream_map( << vec_instruction[dependence_op_idx].OpBase()->Type() << " to " << vec_instruction[op_idx].OpBase()->Type(); AddDownstreamOp( - dependence_op_idx, op_idx, &op_downstream_map, *op_happens_before); + dependence_op_idx, op_idx, &op_downstream_map, op_happens_before); } } } @@ -1088,7 +1062,7 @@ std::map> build_op_downstream_map( AddDownstreamOp(j, first_read_fused_out_op, &op_downstream_map, - *op_happens_before); + op_happens_before); VLOG(4) << j << " -> " << first_read_fused_out_op; VLOG(4) << "Add depend from " << vec_instruction[j].OpBase()->Type() @@ -1121,7 +1095,7 @@ std::map> build_op_downstream_map( for (auto var_id : outputs) { if (is_read(vec_instruction[j], var_id)) { - AddDownstreamOp(target, j, &op_downstream_map, *op_happens_before); + AddDownstreamOp(target, j, &op_downstream_map, op_happens_before); VLOG(4) << target << " -> " << j; VLOG(4) << "Add depend from " << vec_instruction[target].OpBase()->Type() << " to " @@ -1137,10 +1111,8 @@ std::map> build_op_downstream_map( for (size_t op_idx = 0; op_idx < op_num; ++op_idx) { if (!IsCpuOp(vec_instruction[op_idx])) { if (dependence_op_idx != -1) { - AddDownstreamOp(dependence_op_idx, - op_idx, - &op_downstream_map, - *op_happens_before); + AddDownstreamOp( + dependence_op_idx, op_idx, &op_downstream_map, op_happens_before); VLOG(4) << "Add depend from " << vec_instruction[dependence_op_idx].OpBase()->Type() << "(" << dependence_op_idx << ") to " @@ -1152,6 +1124,10 @@ std::map> build_op_downstream_map( } } + AddDependencyForReadOp( + vec_instruction, &op_downstream_map, op_happens_before); + + VLOG(8) << "build_op_downstream_map finished"; VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map); VLOG(8) << "downstream_map: " << std::endl << StringizeDownstreamMap(op_downstream_map); diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index 7234cccb57b481408260f4cd953ae752a233cd90..6296db41db9cb1a33aae3d0293fad2bd63e95c0d 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -12,12 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -/************************************************************************* - > File Name: interpretercore_util.h - > Author: guanshanshan@baidu.com - > Created Time: Fri 23 Jul 2021 06:19:19 AM UTC - ************************************************************************/ - #pragma once #include @@ -48,7 +42,6 @@ using AtomicVectorSizeT = std::vector>; namespace paddle { namespace framework { - namespace interpreter { class AsyncWorkQueue { diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 28bd796efdcee30639cb8ee04a56c82544567fd6..d8f4ecf6731de071ee2ee4f7a9d750dfb9aa8a8e 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1713,3 +1713,10 @@ if(WITH_CINN AND WITH_TESTING) FLAGS_allow_cinn_ops="conv2d;conv2d_grad;elementwise_add;elementwise_add_grad;relu;relu_grad;sum" ) endif() + +py_test_modules( + test_add_reader_dependency_for_interpretercore MODULES + test_add_reader_dependency ENVS FLAGS_CONVERT_GRAPH_TO_PROGRAM=true) + +set_tests_properties(test_add_reader_dependency_for_interpretercore + PROPERTIES TIMEOUT 120) diff --git a/tools/windows/run_unittests.sh b/tools/windows/run_unittests.sh index 34c3d4156a818e4591b140d56ad6517a477514c1..3c1a95f3691d7542b7dd9e35fec33362ae93c916 100644 --- a/tools/windows/run_unittests.sh +++ b/tools/windows/run_unittests.sh @@ -18,6 +18,7 @@ disable_wingpu_test="^test_model$|\ ^test_dataloader_early_reset$|\ ^test_add_reader_dependency$|\ +^test_add_reader_dependency_for_interpretercore$|\ ^test_decoupled_py_reader$|\ ^test_generator_dataloader$|\ ^test_parallel_dygraph_sync_batch_norm$|\ @@ -152,6 +153,7 @@ disable_win_inference_test="^trt_quant_int8_yolov3_r50_test$|\ ^test_multiprocess_dataloader_iterable_dataset_dynamic$|\ ^test_multiprocess_dataloader_iterable_dataset_static$|\ ^test_add_reader_dependency$|\ +^test_add_reader_dependency_for_interpretercore$|\ ^test_compat$|\ ^test_decoupled_py_reader$|\ ^test_generator_dataloader$|\