From 752670e21b66ff1c121c423c630b5d95f6e086be Mon Sep 17 00:00:00 2001
From: Ruibiao Chen
Date: Fri, 16 Jun 2023 19:04:22 +0800
Subject: [PATCH] Run plan in standalone executor (#54394)

* Run plan in standalone executor

* Update codes

* Update atol and rtol for py3-CI

* Add scope to cache key

* Fix CI errors

* Fix code style

* Update codes

* Remove fetch_name in standalone executor

* Fix UT

* Update codes

* Fix new IR bug
---
 .../framework/new_executor/CMakeLists.txt     |   4 +-
 .../new_executor/feed_fetch_utils.cc          |  48 ++++
 .../framework/new_executor/feed_fetch_utils.h |  29 +++
 .../framework/new_executor/interpreter/job.h  |  40 ++-
 .../new_executor/interpreter/plan.cc          |  32 ++-
 .../framework/new_executor/interpreter/plan.h |  26 +-
 .../framework/new_executor/interpretercore.cc |   4 +-
 .../framework/new_executor/interpretercore.h  |   4 +-
 .../new_executor/standalone_executor.cc       | 167 ++++++-------
 .../new_executor/standalone_executor.h        |  29 +--
 .../ir/phi_kernel_adaptor/phi_kernel_util.cc  |   2 +-
 paddle/fluid/pybind/pybind.cc                 |  46 ++--
 .../passes/pipeline_scheduler_pass.py         |  10 +-
 python/paddle/fluid/executor.py               |  60 ++---
 .../standalone_executor_new_ir_test.cc        |   2 +-
 .../new_executor/standalone_executor_test.cc  |  28 ++-
 test/ir/new_ir/test_standalone_new_ir.py      |   4 +-
 ...t_standalone_executor_multi_micro_batch.py | 232 ++++++++++++++++++
 .../test_standalone_executor_multi_program.py | 184 --------------
 .../test_standalone_executor_plan.py          |  11 +-
 tools/windows/run_unittests.sh                |   2 +
 21 files changed, 562 insertions(+), 402 deletions(-)
 create mode 100644 paddle/fluid/framework/new_executor/feed_fetch_utils.cc
 create mode 100644 paddle/fluid/framework/new_executor/feed_fetch_utils.h
 create mode 100644 test/standalone_executor/test_standalone_executor_multi_micro_batch.py
 delete mode 100644 test/standalone_executor/test_standalone_executor_multi_program.py

diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt
index bfa2fc3bbb1..eedafa96d23 100644
--- a/paddle/fluid/framework/new_executor/CMakeLists.txt
+++ b/paddle/fluid/framework/new_executor/CMakeLists.txt
@@ -2,8 +2,8 @@ add_subdirectory(garbage_collector)
 add_subdirectory(interpreter)
 add_subdirectory(workqueue)
 
-set(STANDALONE_EXECUTOR_SRCS interpretercore.cc new_executor_defs.cc
-    standalone_executor.cc)
+set(STANDALONE_EXECUTOR_SRCS feed_fetch_utils.cc interpretercore.cc
+    new_executor_defs.cc standalone_executor.cc)
 
 set(STANDALONE_EXECUTOR_DEPS
     interpreter
diff --git a/paddle/fluid/framework/new_executor/feed_fetch_utils.cc b/paddle/fluid/framework/new_executor/feed_fetch_utils.cc
new file mode 100644
index 00000000000..fb9f4ebcfe9
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/feed_fetch_utils.cc
@@ -0,0 +1,48 @@
+// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/framework/new_executor/feed_fetch_utils.h"
+
+#include <map>
+#include <vector>
+
+#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
+#include "paddle/fluid/framework/program_desc.h"
+
+namespace paddle {
+namespace framework {
+
+void SetColAttrForFetchOps(const interpreter::Job& job,
+                           std::shared_ptr<ProgramDesc> program_desc) {
+  const std::set<std::string>& valid_feed_fetch_op_types = {"fetch",
+                                                            "fetch_v2"};
+
+  const std::vector<int> all_op_ids = job.AllFetchOpIds();
+  for (int op_id : all_op_ids) {
+    int col_attr = job.ColAttrForFetchOp(op_id);
+    OpDesc* op_desc = program_desc->MutableBlock(0)->Op(op_id);
+    PADDLE_ENFORCE(valid_feed_fetch_op_types.find(op_desc->Type()) !=
+                       valid_feed_fetch_op_types.end(),
+                   phi::errors::InvalidArgument(
+                       "Op (%s) corresponding to feed_fetch_op_id (%d) is not "
+                       "in valid_feed_fetch_op_types.",
+                       op_desc->Type(),
+                       op_id));
+
+    op_desc->SetAttr("col", col_attr);
+  }
+}
+
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/feed_fetch_utils.h b/paddle/fluid/framework/new_executor/feed_fetch_utils.h
new file mode 100644
index 00000000000..09364afd80c
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/feed_fetch_utils.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
+#include "paddle/fluid/framework/program_desc.h"
+
+namespace paddle {
+namespace framework {
+
+void SetColAttrForFetchOps(const interpreter::Job& job,
+                           std::shared_ptr<ProgramDesc> program_desc);
+
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpreter/job.h b/paddle/fluid/framework/new_executor/interpreter/job.h
index 2eab9bc9387..0342f632164 100644
--- a/paddle/fluid/framework/new_executor/interpreter/job.h
+++ b/paddle/fluid/framework/new_executor/interpreter/job.h
@@ -13,29 +13,57 @@
 // limitations under the License.
 
 #pragma once
+#include <unordered_map>
+
+#include "paddle/phi/core/enforce.h"
+#include "paddle/phi/core/errors.h"
 #include "paddle/phi/core/macros.h"
 
 namespace paddle {
 namespace framework {
+namespace interpreter {
 
 class Job final {
  public:
-  explicit Job(const std::string& type) : type_(type), micro_batch_id_(-1) {}
+  explicit Job(const std::string& type) : type_(type), micro_batch_id_(0) {}
   ~Job() = default;
 
-  const std::string& GetJobType() const { return type_; }
-  int64_t GetMicroBatchId() const { return micro_batch_id_; }
+  const std::string& Type() const { return type_; }
+
+  int ColAttrForFetchOp(int fetch_op_id) const {
+    return fetch_op_id_to_col_attr_.at(fetch_op_id);
+  }
+
+  int64_t MicroBatchId() const { return micro_batch_id_; }
+
+  std::vector<int> AllFetchOpIds() const {
+    std::vector<int> fetch_op_ids;
+    fetch_op_ids.reserve(fetch_op_id_to_col_attr_.size());
+    for (auto& item : fetch_op_id_to_col_attr_) {
+      fetch_op_ids.push_back(item.first);
+    }
+    return fetch_op_ids;
+  }
+
+  void SetColAttrForFetchOp(int fetch_op_id, int col_attr) {
+    fetch_op_id_to_col_attr_[fetch_op_id] = col_attr;
+  }
 
   void SetMicroBatchId(int64_t micro_batch_id) {
+    PADDLE_ENFORCE_GE(
+        micro_batch_id,
+        0,
+        phi::errors::InvalidArgument(
+            "The micro_batch_id should be greater than or equal to 0."));
     micro_batch_id_ = micro_batch_id;
   }
 
  private:
-  DISABLE_COPY_AND_ASSIGN(Job);
-
-  std::string type_;
+  const std::string type_;
   int64_t micro_batch_id_;
+  std::unordered_map<int, int> fetch_op_id_to_col_attr_;
 };
 
+}  // namespace interpreter
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpreter/plan.cc b/paddle/fluid/framework/new_executor/interpreter/plan.cc
index 0a4e465bd35..c6b0851b7d9 100644
--- a/paddle/fluid/framework/new_executor/interpreter/plan.cc
+++ b/paddle/fluid/framework/new_executor/interpreter/plan.cc
@@ -13,20 +13,42 @@
 // limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/plan.h" -#include "paddle/fluid/framework/new_executor/interpreter/job.h" + #include "paddle/fluid/framework/program_desc.h" namespace paddle { namespace framework { +namespace interpreter { + +Plan::Plan(const std::vector>& job_list, + const std::unordered_map& type_to_program) + : job_list_(job_list), + type_to_program_(type_to_program), + micro_batch_num_(1) { + for (size_t i = 0; i < job_list_.size(); ++i) { + const auto& job = job_list_[i]; + PADDLE_ENFORCE(type_to_program_.find(job->Type()) != type_to_program_.end(), + phi::errors::InvalidArgument( + "The %d-th job (type:%s, micro_batch_id:%d) has no " + "corresponding Program.", + i, + job->Type(), + job->MicroBatchId())); -const std::vector>& Plan::GetJobList() const { + micro_batch_num_ = std::max(micro_batch_num_, job->MicroBatchId() + 1); + } +} + +const std::vector>& Plan::JobList() const { return job_list_; } -const std::unordered_map& Plan::GetTypeToProgram() - const { - return type_to_program_; +const ProgramDesc* Plan::Program(const std::string& job_type) const { + return type_to_program_.at(job_type); } +int64_t Plan::MicroBatchNum() const { return micro_batch_num_; } + +} // namespace interpreter } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpreter/plan.h b/paddle/fluid/framework/new_executor/interpreter/plan.h index 6c50b46f685..8837272a314 100644 --- a/paddle/fluid/framework/new_executor/interpreter/plan.h +++ b/paddle/fluid/framework/new_executor/interpreter/plan.h @@ -17,30 +17,34 @@ #include #include #include + +#include "paddle/fluid/framework/new_executor/interpreter/job.h" + +#include "paddle/fluid/framework/program_desc.h" #include "paddle/phi/core/macros.h" namespace paddle { namespace framework { - -class ProgramDesc; -class Job; +namespace interpreter { class Plan final { public: Plan(const std::vector>& job_list, - const std::unordered_map& type_to_program) - : job_list_(job_list), type_to_program_(type_to_program) {} + const std::unordered_map& type_to_program); ~Plan() = default; - const std::vector>& GetJobList() const; - const std::unordered_map& GetTypeToProgram() const; + const std::vector>& JobList() const; - private: - DISABLE_COPY_AND_ASSIGN(Plan); + const ProgramDesc* Program(const std::string& job_type) const; + + int64_t MicroBatchNum() const; - std::vector> job_list_; - std::unordered_map type_to_program_; + private: + const std::vector> job_list_; + const std::unordered_map type_to_program_; + int64_t micro_batch_num_; }; +} // namespace interpreter } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index f2f3b9f48fc..0bb1113e0d8 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -151,14 +151,14 @@ InterpreterCore::InterpreterCore(const platform::Place& place, InterpreterCore::InterpreterCore(const platform::Place& place, const BlockDesc& block, framework::Scope* scope, - ::ir::Program* ir_prog, + std::unique_ptr<::ir::Program> ir_prog, const ExecutionConfig& execution_config) : place_(place), block_(block), stream_analyzer_(place), execution_config_(execution_config), var_scope_(scope), - ir_program_(ir_prog) { + ir_program_(std::move(ir_prog)) { VLOG(4) << "InterpreterCore(): " << this << " on " << place_; static_build_ = FLAGS_new_executor_static_build && diff --git 
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index afaa09db516..e676c0ea709 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -57,7 +57,7 @@ class InterpreterCore {
   InterpreterCore(const platform::Place& place,
                   const BlockDesc& block,
                   Scope* scope,
-                  ::ir::Program* ir_prog,
+                  std::unique_ptr<::ir::Program> ir_prog,
                   const ExecutionConfig& execution_config = ExecutionConfig());
 
   ~InterpreterCore();
@@ -201,7 +201,7 @@ class InterpreterCore {
   std::vector<HookFunc> hookfuncs_;
 
   // The next only for new IR
-  ::ir::Program* ir_program_{nullptr};
+  std::unique_ptr<::ir::Program> ir_program_{nullptr};
 
   std::unordered_map<::ir::Value, std::string> value_2_var_name_map_;
 };
diff --git a/paddle/fluid/framework/new_executor/standalone_executor.cc b/paddle/fluid/framework/new_executor/standalone_executor.cc
index 0a8a38b794b..73bb7568435 100644
--- a/paddle/fluid/framework/new_executor/standalone_executor.cc
+++ b/paddle/fluid/framework/new_executor/standalone_executor.cc
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include "paddle/fluid/framework/new_executor/standalone_executor.h"
+#include "paddle/fluid/framework/new_executor/feed_fetch_utils.h"
 #include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
 #include "paddle/fluid/platform/profiler/event_tracing.h"
 
@@ -25,112 +26,98 @@ PHI_DECLARE_bool(enable_new_ir_in_executor);
 
 namespace paddle {
 namespace framework {
 StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
-                                       const std::vector<ProgramDesc>& programs)
-    : place_(place), programs_(programs) {
-  if (FLAGS_enable_new_ir_in_executor) {
-    for (size_t i = 0; i < programs_.size(); ++i) {
-      VLOG(6) << "begin to translate" << std::endl;
-      auto base_progrm = paddle::TranslateLegacyProgramToProgram(programs_[i]);
+                                       const interpreter::Plan& plan,
+                                       Scope* scope)
+    : place_(place), plan_(plan), scope_(scope) {
+  int64_t micro_batch_num = plan_.MicroBatchNum();
+  for (int64_t i = 0; i < micro_batch_num; ++i) {
+    micro_batch_scopes_.emplace_back(&scope->NewScope());
+  }
+
+  std::stringstream ss;
+  ss << "Create " << micro_batch_num << " micro_batch_scopes for scope "
+     << scope_ << " : ";
+  for (Scope* scope : micro_batch_scopes_) {
+    ss << scope << ", ";
+  }
+  VLOG(6) << ss.str();
+
+  const auto& jobs = plan_.JobList();
+  for (const auto& job : jobs) {
+    const std::string& job_type = job->Type();
+    std::shared_ptr<ProgramDesc> program =
+        std::make_shared<ProgramDesc>(*(plan_.Program(job_type)));
+    SetColAttrForFetchOps(*job, program);
+
+    int64_t micro_batch_id = job->MicroBatchId();
+    PADDLE_ENFORCE(
+        micro_batch_id >= 0 && micro_batch_id < micro_batch_num,
+        phi::errors::Unavailable("The micro batch id (%lld) is out of bound, "
+                                 "which should be in the range of [0, %lld].",
+                                 micro_batch_id,
+                                 micro_batch_num));
+
+    interpreter::ExecutionConfig execution_config;
+    execution_config.create_local_scope = false;
+    // TODO(Ruibiao): hack to skip gc of all vars for multiple jobs, improve it later
+    if (jobs.size() > 1) {
+      for (VarDesc* var : program->Block(0).AllVars()) {
+        execution_config.skip_gc_vars.insert(var->Name());
+      }
+    }
 
+    if (FLAGS_enable_new_ir_in_executor) {
+      VLOG(6) << "begin to translate" << std::endl;
+      auto base_progrm = paddle::TranslateLegacyProgramToProgram(*program);
       auto kernel_program =
           paddle::dialect::PdOpLowerToKernelPass(base_progrm.get());
-
-      ir_programs_.emplace_back(std::move(kernel_program));
+      interpretercores_.emplace_back(
+          std::make_unique<InterpreterCore>(place_,
+                                            program->Block(0),
+                                            scope_,
+                                            std::move(kernel_program),
+                                            execution_config));
+    } else {
+      interpretercores_.emplace_back(
+          std::make_unique<InterpreterCore>(place_,
+                                            program->Block(0),
+                                            micro_batch_scopes_[micro_batch_id],
+                                            execution_config));
+      interpretercores_.back()->SetCopyProgram(program);
     }
   }
 }
 
 paddle::framework::FetchList StandaloneExecutor::Run(
-    Scope* scope,
-    const std::vector<std::string>& feed_names,
-    const std::vector<std::string>& fetch_names) {
+    const std::vector<std::string>& feed_names) {
   platform::RecordEvent record_event(
       "StandaloneExecutor::run", platform::TracerEventType::UserDefined, 1);
 
-  // TODO(Ruibiao): Unified single and multiple program run
-  if (programs_.size() == 1) {  // run single program
-    VLOG(6) << "Run single program";
-    auto core = GetInterpreterCore(scope,
-                                   programs_.at(0),
-                                   feed_names,
-                                   fetch_names,
-                                   0,
-                                   interpreter::ExecutionConfig());
-    VLOG(4) << "StandaloneExecutor: " << this << ", InterpreterCore: " << core;
-
-    return core->Run(feed_names);
-  } else {  // run multiple programs
-    VLOG(6) << "Run multiple program, programs_.size() " << programs_.size();
-    FetchList merged_fetch_list;
-    for (size_t program_idx = 0; program_idx < programs_.size();
-         ++program_idx) {
-      const ProgramDesc& program = programs_[program_idx];
-
-      interpreter::ExecutionConfig execution_config;
-      execution_config.create_local_scope = false;
-      // TODO(Ruibiao): hack skip gc for all vars, improve it later
-      std::set<std::string> skip_gc_vars;
-      for (VarDesc* var : program.Block(0).AllVars()) {
-        execution_config.skip_gc_vars.insert(var->Name());
-      }
-
-      // TODO(Ruibiao): ONLY support feeds data in the first program for now
-      const std::vector<std::string>& real_feed_names =
-          (program_idx == 0 ? feed_names : std::vector<std::string>());
-      auto core = GetInterpreterCore(scope,
-                                     program,
-                                     real_feed_names,
-                                     fetch_names,
-                                     program_idx,
-                                     execution_config);
-      const FetchList& fetch_list = core->Run(real_feed_names);
-      std::move(fetch_list.begin(),
-                fetch_list.end(),
-                std::back_inserter(merged_fetch_list));
-    }
-    return merged_fetch_list;
+  if (plan_.MicroBatchNum() > 1) {
+    PADDLE_ENFORCE_EQ(feed_names.size(),
+                      0,
+                      phi::errors::Unimplemented(
+                          "Unsupported feed data for multiple micro_batch, "
+                          "please use non-iterative DataLoader for now."));
   }
-}
 
-std::shared_ptr<InterpreterCore> StandaloneExecutor::GetInterpreterCore(
-    Scope* scope,
-    const ProgramDesc& program,
-    const std::vector<std::string>& feed_names,
-    const std::vector<std::string>& fetch_names,
-    size_t program_idx,
-    interpreter::ExecutionConfig execution_config) {
-  std::ostringstream oss;
-  oss << "prog_idx:" << program_idx << ",";
-  oss << "feed:";
-  for (auto& feedname : feed_names) {
-    oss << feedname << ",";
-  }
-  oss << "fetch:";
-  for (auto& fetchname : fetch_names) {
-    oss << fetchname << ",";
-  }
-  oss << "scope:" << scope;
+  const auto& jobs = plan_.JobList();
+  for (size_t job_idx = 0; job_idx < jobs.size(); ++job_idx) {
+    const auto& job = jobs[job_idx];
+    const std::string& job_type = job->Type();
 
-  auto iter = interpretercores_.find(oss.str());
+    VLOG(6) << "Run job (" << job_idx << "), type = " << job_type
+            << ", micro_batch_id = " << job->MicroBatchId();
 
-  if (iter == interpretercores_.end()) {
-    VLOG(3) << "create interpreter_core for " << oss.str() << " on place "
-            << place_;
-    std::shared_ptr<InterpreterCore> core = nullptr;
-    if (FLAGS_enable_new_ir_in_executor) {
-      core = std::make_shared<InterpreterCore>(place_,
-                                               program.Block(0),
-                                               scope,
-                                               ir_programs_[program_idx].get(),
-                                               execution_config);
-    } else {
-      core = std::make_shared<InterpreterCore>(
-          place_, program.Block(0), scope, execution_config);
-    }
-    interpretercores_.emplace(oss.str(), core);
-    return core;
+    interpretercores_[job_idx]->Run(feed_names, /*need_fetch = */ false);
+  }
+
+  // return Fetch Tensors
+  auto* fetch_var = scope_->FindVar(interpreter::kFetchVarName);
+  if (fetch_var) {
+    return std::move(*fetch_var->GetMutable<framework::FetchList>());
   } else {
-    return iter->second;
+    return {};
   }
 }
diff --git a/paddle/fluid/framework/new_executor/standalone_executor.h b/paddle/fluid/framework/new_executor/standalone_executor.h
index ee3a9af5b6f..aa841bbffd0 100644
--- a/paddle/fluid/framework/new_executor/standalone_executor.h
+++ b/paddle/fluid/framework/new_executor/standalone_executor.h
@@ -19,6 +19,7 @@
 #include <vector>
 
 #include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
 #include "paddle/fluid/framework/new_executor/interpretercore.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
 #include "paddle/fluid/framework/program_desc.h"
@@ -33,33 +34,21 @@ class InterpreterCore;
 
 class StandaloneExecutor {
  public:
   StandaloneExecutor(const platform::Place& place,
-                     const std::vector<ProgramDesc>& programs);
+                     const interpreter::Plan& plan_,
+                     Scope* scope);
 
   ~StandaloneExecutor() {}
 
-  // NOTE(zhiqiu): feed_names are only used for caching interpretercore.
-  // fetch_names are used for caching interpretercore and inserting fetch ops,
-  // the latter can be moved to python side.
-  paddle::framework::FetchList Run(Scope* scope,
-                                   const std::vector<std::string>& feed_names,
-                                   const std::vector<std::string>& fetch_names);
+  paddle::framework::FetchList Run(const std::vector<std::string>& feed_names);
 
  private:
-  std::shared_ptr<InterpreterCore> GetInterpreterCore(
-      Scope* scope,
-      const ProgramDesc& prog,
-      const std::vector<std::string>& feed_names,
-      const std::vector<std::string>& fetch_names,
-      size_t program_idx,
-      interpreter::ExecutionConfig execution_config);
-
   const platform::Place place_;
-  const std::vector<ProgramDesc> programs_;
-  std::vector<std::unique_ptr<::ir::Program>> ir_programs_;
-  std::vector<Scope*> microbatch_scopes_;
+  const interpreter::Plan plan_;
+
+  std::vector<Scope*> micro_batch_scopes_;
+  std::vector<std::unique_ptr<InterpreterCore>> interpretercores_;
 
-  std::unordered_map<std::string, std::shared_ptr<InterpreterCore>>
-      interpretercores_;
+  Scope* scope_;
 };
 
 }  // namespace framework
diff --git a/paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.cc b/paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.cc
index 6273d703971..cb92831d40c 100644
--- a/paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.cc
+++ b/paddle/fluid/ir/phi_kernel_adaptor/phi_kernel_util.cc
@@ -49,7 +49,7 @@ void BuildScope(ir::Block* block,
     auto attr_map = (*it)->attributes();
     std::string op_name = (*it)->name();
     if (attr_map.count("op_name")) {
-      auto op_name = attr_map.at("op_name").dyn_cast<ir::StrAttribute>().data();
+      op_name = attr_map.at("op_name").dyn_cast<ir::StrAttribute>().data();
     }
     if (op_name == "pd.fetch") {
       // fetch is a very special op, with no output
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 2ceff1cca17..1c2914ee7ab 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1157,6 +1157,8 @@ All parameter, weight, gradient are variables in Paddle.
       _Scope
           .def("_remove_from_pool",
                [](Scope &self) { ScopePool::Instance().Remove(&self); })
+          .def("raw_address",
+               [](Scope &self) { return reinterpret_cast<uint64_t>(&self); })
           .def(
               "var",
               [](Scope &self, const std::string &name) -> Variable * {
@@ -1852,35 +1854,39 @@
       });
 
   py::class_<framework::StandaloneExecutor>(m, "StandaloneExecutor")
-      .def(
-          py::init<const platform::Place &,
-                   const std::vector<ProgramDesc> &>())
+      .def(py::init<const platform::Place &,
+                    const interpreter::Plan &,
+                    Scope *>())
       .def("run",
-           [](StandaloneExecutor &self,
-              Scope *scope,
-              std::vector<std::string> feed_names,
-              std::vector<std::string> fetch_names) {
+           [](StandaloneExecutor &self, std::vector<std::string> feed_names) {
             paddle::framework::FetchList ret;
             {
               pybind11::gil_scoped_release release;
-              ret = self.Run(scope, feed_names, fetch_names);
+              ret = self.Run(feed_names);
             }
             return py::cast(std::move(ret));
           });
 
-  py::class_<framework::Job, std::shared_ptr<framework::Job>>(m, "job")
+  py::class_<framework::interpreter::Job,
+             std::shared_ptr<framework::interpreter::Job>>(m, "Job")
       .def(py::init<const std::string &>(), py::arg("type"))
-      .def("type", &framework::Job::GetJobType)
-      .def("micro_batch_id", &framework::Job::GetMicroBatchId)
-      .def("set_micro_batch_id", &framework::Job::SetMicroBatchId);
-
-  py::class_<framework::Plan>(m, "plan")
-      .def(py::init<const std::vector<std::shared_ptr<framework::Job>> &,
-                    const std::unordered_map<std::string, framework::ProgramDesc *> &>(),
-           py::arg("job_list"),
-           py::arg("type_to_program"))
-      .def("job_list", &framework::Plan::GetJobList)
-      .def("type_to_program", &framework::Plan::GetTypeToProgram);
+      .def("micro_batch_id", &framework::interpreter::Job::MicroBatchId)
+      .def("type", &framework::interpreter::Job::Type)
+      .def("set_col_attr_for_fetch_op",
+           &framework::interpreter::Job::SetColAttrForFetchOp)
+      .def("set_micro_batch_id", &framework::interpreter::Job::SetMicroBatchId);
+
+  py::class_<framework::interpreter::Plan>(m, "Plan")
+      .def(
+          py::init<
+              const std::vector<std::shared_ptr<framework::interpreter::Job>> &,
+              const std::unordered_map<std::string, framework::ProgramDesc *>
+                  &>(),
+          py::arg("job_list"),
+          py::arg("type_to_program"))
+      .def("job_list", &framework::interpreter::Plan::JobList)
+      .def("micro_batch_num", &framework::interpreter::Plan::MicroBatchNum)
+      .def("program", &framework::interpreter::Plan::Program);
 
   m.def("init_gflags", framework::InitGflags);
   m.def("init_glog", framework::InitGLOG);
diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
index 516d30dd29e..8ff5f2b35e7 100644
--- a/python/paddle/distributed/passes/pipeline_scheduler_pass.py
+++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
@@ -270,19 +270,19 @@ class PipelineFThenBPass(PassBase):
 
     def _create_job_list(self):
         job_list = []
-        lr_job = core.job("lr")
+        lr_job = core.Job("lr")
         job_list.append(lr_job)
 
         for i in range(self._micro_batch_size):
-            forward_job = core.job("forward")
+            forward_job = core.Job("forward")
             forward_job.set_micro_batch_id(i)
             job_list.append(forward_job)
 
         for i in range(self._micro_batch_size):
-            backward_job = core.job("backward")
+            backward_job = core.Job("backward")
             backward_job.set_micro_batch_id(i)
             job_list.append(backward_job)
 
-        opt_job = core.job("optimizer")
+        opt_job = core.Job("optimizer")
         job_list.append(opt_job)
         return job_list
 
@@ -294,5 +294,5 @@ class PipelineFThenBPass(PassBase):
         type_to_program = _program_for_fthenb_and_1f1b(self._program)
         job_list = self._create_job_list()
 
-        plan = core.plan(job_list, type_to_program)
+        plan = core.Plan(job_list, type_to_program)
         context.set_attr("plan", plan)
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 4ff61f94a24..d1445e22722 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -511,9 +511,11 @@ def _prepare_fleet_executor():
     return fleet_exe
 
 
-def _get_strong_program_cache_key_for_new_exe(program, feed, fetch_list):
-    return program.desc.cached_hash_str() + _get_program_cache_key(
-        feed, fetch_list
+def _get_strong_program_cache_key_for_new_exe(program, scope, feed, fetch_list):
+    return (
+        program.desc.cached_hash_str()
+        + str(scope.raw_address())
+        + _get_program_cache_key(feed, fetch_list)
     )
 
 
@@ -632,14 +634,14 @@ handler = FetchHandlerExample(var_dict=var_dict)
 
 
 class _StandaloneExecutor:
-    def __init__(self, place, programs, scope):
+    def __init__(self, place, plan, scope):
         self._place = core.Place()
         self._place.set_place(place)
-        self._programs = programs
+        self._plan = plan
         self._scope = scope
         self._new_exe = self._create_new_executor()
 
-    def run(self, scope, feed_names, fetch_list, return_numpy=True):
+    def run(self, feed_names, return_numpy=True):
         """
         Args:
             feed_names(list): This parameter represents the input names of the model.
@@ -649,41 +651,17 @@ class _StandaloneExecutor:
                 (the Tensor specified in the fetch list) to numpy.ndarray. if it is False,
                 the type of the return value is a list of :code:`LoDTensor`. The default is True.
         """
-        fetch_list = self._check_fetch(fetch_list)
-
-        tensors = self._new_exe.run(
-            scope, feed_names, fetch_list
-        )._move_to_list()
+        tensors = self._new_exe.run(feed_names)._move_to_list()
         if return_numpy:
             return as_numpy(tensors, copy=True)
         else:
             return tensors
 
     def _create_new_executor(self):
-        new_exe = core.StandaloneExecutor(
-            self._place, [program.desc for program in self._programs]
-        )
+        new_exe = core.StandaloneExecutor(self._place, self._plan, self._scope)
 
         return new_exe
 
-    def _check_fetch(self, fetch_list):
-        if fetch_list is None:
-            fetch_list = []
-
-        res = []
-        for fetch_var in fetch_list:
-            if isinstance(fetch_var, Variable):
-                fetch_var = fetch_var.name
-            elif not isinstance(fetch_var, str):
-                raise TypeError(
-                    "Required fetch_var shall be str|Variable, but received {}".format(
-                        type(fetch_var).__name__
-                    )
-                )
-
-            res.append(fetch_var)
-        return res
-
 
 class _ExecutorCache:
     class _CachedData:
@@ -716,13 +694,16 @@ class _ExecutorCache:
                     ).to_program()
                     self.key = hash(
                         _get_strong_program_cache_key_for_new_exe(
-                            self.program._program, feed, fetch_list
+                            self.program._program,
+                            self.scope,
+                            self.feed,
+                            self.fetch_list,
                         )
                     )
                 else:
                     self.key = hash(
                         _get_strong_program_cache_key_for_new_exe(
-                            self.program, feed, fetch_list
+                            self.program, self.scope, self.feed, self.fetch_list
                         )
                     )
@@ -850,9 +831,12 @@ class _ExecutorCache:
             _apply_inplace_addto_pass(
                 program, enable_inplace, enable_addto, skip_var_names
             )
-
         new_program = program.clone()
-        new_exe = _StandaloneExecutor(place, [new_program], scope)
+        new_exe = _StandaloneExecutor(
+            place,
+            core.Plan([core.Job("default")], {"default": new_program.desc}),
+            scope,
+        )
 
         return new_program, new_exe
 
@@ -1614,9 +1598,7 @@ class Executor:
                     else:
                         tensor._copy_from(cpu_tensor, self.place)
 
-            ret = new_exe.run(
-                scope, list(feed.keys()), fetch_list, return_numpy
-            )
+            ret = new_exe.run(list(feed.keys()), return_numpy)
             set_flags(stored_flag)
             return ret
 
diff --git a/test/cpp/new_executor/standalone_executor_new_ir_test.cc b/test/cpp/new_executor/standalone_executor_new_ir_test.cc
index f878b41cec2..4aee1ccc554 100644
--- a/test/cpp/new_executor/standalone_executor_new_ir_test.cc
+++ b/test/cpp/new_executor/standalone_executor_new_ir_test.cc
@@ -70,7 +70,7 @@ TEST(StandaloneExecutor, run) {
 
   ProgramDesc prog_desc;
   InterpreterCore test_core(
-      place, prog_desc.Block(0), &scope, kernel_program.get());
+      place, prog_desc.Block(0), &scope, std::move(kernel_program));
 
   test_core.Run({});
 
diff --git a/test/cpp/new_executor/standalone_executor_test.cc b/test/cpp/new_executor/standalone_executor_test.cc
index a9026ba2468..51abf476179 100644
--- a/test/cpp/new_executor/standalone_executor_test.cc
+++ b/test/cpp/new_executor/standalone_executor_test.cc
@@ -18,8 +18,10 @@
 #include <chrono>
 #include <iostream>
+#include <memory>
 #include <string>
 
"paddle/fluid/framework/new_executor/interpreter/plan.h" #include "paddle/phi/core/kernel_registry.h" USE_OP_ITSELF(fill_constant); @@ -106,6 +108,9 @@ PD_DECLARE_KERNEL(add_n, GPU, ALL_LAYOUT); namespace paddle { namespace framework { +using Job = interpreter::Job; +using Plan = interpreter::Plan; + ProgramDesc load_from_file(const std::string& file_name) { std::ifstream fin(file_name, std::ios::in | std::ios::binary); fin.seekg(0, std::ios::end); @@ -146,11 +151,22 @@ TEST(StandaloneExecutor, run) { ProgramDesc main_prog = GetLmMainProgram(); Scope scope; - StandaloneExecutor startup_exec(place, - std::vector{startup_prog}); - startup_exec.Run(&scope, {}, {}); - StandaloneExecutor exec(place, std::vector{main_prog}); - exec.Run(&scope, {}, {}); + std::shared_ptr startup_job = std::make_shared(Job("startup")); + StandaloneExecutor startup_exec( + place, + Plan(std::vector>({startup_job}), + std::unordered_map( + {{startup_job->Type(), &startup_prog}})), + &scope); + startup_exec.Run({}); + + std::shared_ptr main_job = std::make_shared(Job("main")); + StandaloneExecutor exec(place, + Plan(std::vector>({main_job}), + std::unordered_map( + {{main_job->Type(), &main_prog}})), + &scope); + exec.Run({}); auto start = std::chrono::steady_clock::now(); for (size_t i = 0; i < 10; ++i) { @@ -158,7 +174,7 @@ TEST(StandaloneExecutor, run) { std::cout << i << std::endl; } - exec.Run(&scope, {}, {}); + exec.Run({}); } auto end = std::chrono::steady_clock::now(); diff --git a/test/ir/new_ir/test_standalone_new_ir.py b/test/ir/new_ir/test_standalone_new_ir.py index a2a4b0c2d46..781ebd73e9a 100644 --- a/test/ir/new_ir/test_standalone_new_ir.py +++ b/test/ir/new_ir/test_standalone_new_ir.py @@ -40,9 +40,7 @@ class TestNewIr(unittest.TestCase): self.assertEqual( np.array_equal( np.array( - paddle.static.global_scope() - .find_var("inner_var_2") - .get_tensor() + paddle.static.global_scope().find_var(z.name).get_tensor() ), gold_res, ), diff --git a/test/standalone_executor/test_standalone_executor_multi_micro_batch.py b/test/standalone_executor/test_standalone_executor_multi_micro_batch.py new file mode 100644 index 00000000000..8421e77fb48 --- /dev/null +++ b/test/standalone_executor/test_standalone_executor_multi_micro_batch.py @@ -0,0 +1,232 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+import unittest
+
+import numpy as np
+
+import paddle
+from paddle.distributed.passes.pass_utils import split_program
+from paddle.fluid import core
+from paddle.fluid.core import Job, Plan
+from paddle.fluid.executor import _add_feed_fetch_ops, _StandaloneExecutor
+from paddle.nn import TransformerEncoderLayer
+
+paddle.enable_static()
+
+
+class TestEncoderMultiMicroBatchRun(unittest.TestCase):
+    def setUp(self):
+        self.place_desc = (
+            paddle.CUDAPlace(0)
+            if core.is_compiled_with_cuda()
+            else paddle.CPUPlace()
+        )
+        self.place = core.Place()
+        self.place.set_place(self.place_desc)
+
+        self.batch_size = 2
+        self.src_len = 4
+        self.d_model = 128
+        self.n_head = 2
+        self.run_step = 3
+
+        self.enc_input_data, self.attn_mask_data = self.get_random_data(
+            self.batch_size,
+            self.src_len,
+            self.d_model,
+            self.n_head,
+            self.run_step,
+        )
+
+    def get_random_data(self, batch_size, src_len, d_model, n_head, run_step):
+        np.random.seed(2022)
+
+        enc_input_data = np.random.rand(
+            run_step, batch_size, src_len, d_model
+        ).astype(np.float32)
+        attn_mask_data = np.random.rand(
+            run_step, batch_size, n_head, src_len, src_len
+        ).astype(np.float32)
+
+        return enc_input_data, attn_mask_data
+
+    def batch_generator_creator(self, micro_batch_size):
+        def __reader__():
+            for i in range(self.run_step):
+                for offset in range(0, self.batch_size, micro_batch_size):
+                    enc_input = self.enc_input_data[i][
+                        offset : offset + micro_batch_size
+                    ]
+                    attn_mask = self.attn_mask_data[i][
+                        offset : offset + micro_batch_size
+                    ]
+                    yield enc_input, attn_mask
+
+        return __reader__
+
+    def build_program(self, micro_batch_size, src_len, d_model, n_head):
+        startup_program = paddle.static.Program()
+        main_program = paddle.static.Program()
+
+        with paddle.static.program_guard(main_program, startup_program):
+            enc_input = paddle.static.data(
+                name="enc_input",
+                shape=[micro_batch_size, src_len, d_model],
+                dtype="float32",
+            )
+            attn_mask = paddle.static.data(
+                name="attn_mask",
+                shape=[micro_batch_size, n_head, src_len, src_len],
+                dtype="float32",
+            )
+
+            loader = paddle.fluid.io.DataLoader.from_generator(
+                feed_list=[enc_input, attn_mask],
+                use_double_buffer=False,
+                capacity=16,
+                iterable=False,
+            )
+            loader.set_batch_generator(
+                self.batch_generator_creator(micro_batch_size)
+            )
+
+            encoder_layer = TransformerEncoderLayer(
+                d_model, n_head, dim_feedforward=512
+            )
+            attn_mask = paddle.nn.layer.transformer._convert_attention_mask(
+                attn_mask, enc_input.dtype
+            )
+
+            enc_output = encoder_layer(enc_input, attn_mask)
+
+            split_op_indices = [len(main_program.block(0).ops)]
+
+            enc_output = encoder_layer(enc_output, attn_mask)
+
+            fetch_list = [enc_output.name]
+
+            return (
+                startup_program,
+                main_program,
+                split_op_indices,
+                loader,
+                fetch_list,
+            )
+
+    def avoid_randomness(self, program):
+        for op in program.block(0).ops:
+            if op.type == "dropout":
+                op._set_attr("dropout_prob", 0)
+
+    def run_train(self, split=False, micro_batch_num=1):
+        paddle.seed(2022)
+
+        scope = paddle.static.Scope()
+
+        with paddle.static.scope_guard(scope):
+            (
+                startup_program,
+                main_program,
+                split_op_indices,
+                loader,
+                fetch_list,
+            ) = self.build_program(
+                self.batch_size // micro_batch_num,
+                self.src_len,
+                self.d_model,
+                self.n_head,
+            )
+
+            self.avoid_randomness(main_program)
+
+            startup_exe = _StandaloneExecutor(
+                self.place,
+                Plan([Job("startup")], {"startup": startup_program.desc}),
+                scope,
+            )
+            startup_exe.run([])
+
+            programs = [main_program]
+            fetch_op_num = len(fetch_list)
+            fetch_op_indices = []
+            if split:
+                programs, _, _ = split_program(main_program, split_op_indices)
+                # hack: add fetch ops in the last program
+                programs[-1] = _add_feed_fetch_ops(
+                    programs[-1], [], fetch_list, "feed", "fetch"
+                )
+                op_num = len(programs[-1].block(0).ops)
+                fetch_op_indices = list(range(op_num - fetch_op_num, op_num))
+            else:
+                programs[0] = _add_feed_fetch_ops(
+                    programs[0], [], fetch_list, "feed", "fetch"
+                )
+                op_num = len(programs[0].block(0).ops)
+                fetch_op_indices = list(range(op_num - fetch_op_num, op_num))
+
+            job_list = []
+            program_num = len(programs)
+
+            for micro_batch_id in range(micro_batch_num):
+                for program_id in range(program_num):
+                    job = Job(f"P{program_id}")
+                    job.set_micro_batch_id(micro_batch_id)
+                    # Set col_attr info for fetch_op to fetch the correct data after running multiple micro batches
+                    if program_id == program_num - 1:
+                        fetch_op_id_to_col_attr = {}
+                        for i in range(fetch_op_num):
+                            job.set_col_attr_for_fetch_op(
+                                fetch_op_indices[i],
+                                i * micro_batch_num + micro_batch_id,
+                            )
+                    job_list.append(job)
+
+            type_to_program = {}
+            for program_id in range(program_num):
+                type_to_program[f"P{program_id}"] = programs[program_id].desc
+
+            plan = Plan(job_list, type_to_program)
+
+            main_exe = _StandaloneExecutor(self.place, plan, scope)
+
+            loader.start()
+            res = []
+            for i in range(self.run_step):
+                fetch_res = main_exe.run(feed_names=[])
+                res.append(
+                    np.array(fetch_res).reshape(
+                        self.batch_size, self.src_len, self.d_model
+                    )
+                )
+
+            return res
+
+    def test_multi_micro_batch_run(self):
+        last_res = None
+
+        for split in [True, False]:
+            for micro_batch_num in [1, 2]:
+                res = self.run_train(split, micro_batch_num)
+                if last_res:
+                    for i in range(len(res)):
+                        np.testing.assert_allclose(
+                            last_res[i], res[i], atol=1e-6, rtol=1e-6
+                        )
+                last_res = res
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/standalone_executor/test_standalone_executor_multi_program.py b/test/standalone_executor/test_standalone_executor_multi_program.py
deleted file mode 100644
index 5eeb64f5fac..00000000000
--- a/test/standalone_executor/test_standalone_executor_multi_program.py
+++ /dev/null
@@ -1,184 +0,0 @@
-# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import unittest
-
-import numpy as np
-
-import paddle
-from paddle.distributed.passes.pass_utils import split_program
-from paddle.fluid import core
-from paddle.fluid.executor import (
-    _add_feed_fetch_ops,
-    _as_lodtensor,
-    _StandaloneExecutor,
-    check_feed_shape_type,
-)
-from paddle.nn import TransformerEncoderLayer
-
-paddle.enable_static()
-
-
-class TestMulitProgramRun(unittest.TestCase):
-    def setUp(self):
-        self.place_desc = (
-            paddle.CUDAPlace(0)
-            if core.is_compiled_with_cuda()
-            else paddle.CPUPlace()
-        )
-        self.place = core.Place()
-        self.place.set_place(self.place_desc)
-
-    def build_program(self):
-        batch_size = 2
-        src_len = 4
-        d_model = 128
-        n_head = 2
-
-        startup_program = paddle.static.Program()
-        main_program = paddle.static.Program()
-
-        with paddle.static.program_guard(main_program, startup_program):
-            enc_input = paddle.static.data(
-                name="enc_input",
-                shape=[batch_size, src_len, d_model],
-                dtype="float32",
-            )
-            attn_mask = paddle.static.data(
-                name="attn_mask",
-                shape=[batch_size, n_head, src_len, src_len],
-                dtype="float32",
-            )
-            encoder_layer = TransformerEncoderLayer(
-                d_model, n_head, dim_feedforward=512
-            )
-            attn_mask = paddle.nn.layer.transformer._convert_attention_mask(
-                attn_mask, enc_input.dtype
-            )
-
-            enc_output = encoder_layer(enc_input, attn_mask)
-
-            split_op_indics = [len(main_program.block(0).ops)]
-
-            enc_output = encoder_layer(enc_output, attn_mask)
-
-            np.random.seed(2022)
-            feed = {
-                enc_input.name: np.random.rand(
-                    batch_size, src_len, d_model
-                ).astype(np.float32),
-                attn_mask.name: np.random.rand(
-                    batch_size, n_head, src_len, src_len
-                ).astype(np.float32),
-            }
-            fetch_list = [enc_output.name]
-
-            return (
-                startup_program,
-                main_program,
-                split_op_indics,
-                feed,
-                fetch_list,
-            )
-
-    def feed_data(self, program, feed, feed_var_name, scope):
-        # feed var to framework
-        global_block = program.global_block()
-        for op in global_block.ops:
-            if op.desc.type() == 'feed':
-                feed_target_name = op.desc.output('Out')[0]
-                cur_feed = feed[feed_target_name]
-                var = global_block.var(feed_target_name)
-                if var.dtype != core.VarDesc.VarType.STRINGS:
-                    if not isinstance(cur_feed, core.LoDTensor):
-                        cur_feed = _as_lodtensor(
-                            cur_feed, self.place_desc, var.dtype
-                        )
-                    check_feed_shape_type(var, cur_feed)
-                idx = op.desc.attr('col')
-                core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
-            else:
-                break
-
-    def run_program(
-        self,
-        startup_program,
-        main_program,
-        feed,
-        fetch_list,
-        scope,
-        run_step,
-        split_op_indics=None,
-    ):
-        paddle.seed(2022)
-
-        startup_exe = _StandaloneExecutor(self.place, [startup_program], scope)
-        startup_exe.run(scope, [], [])
-
-        programs = [main_program]
-        if split_op_indics is not None:
-            programs, _, _ = split_program(main_program, split_op_indics)
-            # hack add feed ops in the first program and fetch ops in the last program
-            programs[0] = _add_feed_fetch_ops(
-                programs[0], feed, [], "feed", "fetch"
-            )
-            programs[-1] = _add_feed_fetch_ops(
-                programs[-1], [], fetch_list, "feed", "fetch"
-            )
-        else:
-            programs[0] = _add_feed_fetch_ops(
-                programs[0], feed, fetch_list, "feed", "fetch"
-            )
-
-        self.feed_data(programs[0], feed, "feed", scope)
-
-        main_exe = _StandaloneExecutor(self.place, programs, scope)
-
-        res = []
-        for i in range(run_step):
-            res += main_exe.run(scope, list(feed.keys()), fetch_list)
-        return res
-
-    def test_multi_program_run(self):
-        (
-            startup_program,
-            main_program,
-            split_op_indics,
-            feed,
-            fetch_list,
-        ) = self.build_program()
-        run_step = 3
-        res = self.run_program(
-            startup_program,
-            main_program,
-            feed,
-            fetch_list,
-            paddle.static.Scope(),
-            run_step,
-        )
-
-        splited_res = self.run_program(
-            startup_program,
-            main_program,
-            feed,
-            fetch_list,
-            paddle.static.Scope(),
-            run_step,
-            split_op_indics,
-        )
-        np.testing.assert_array_equal(res, splited_res)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/test/standalone_executor/test_standalone_executor_plan.py b/test/standalone_executor/test_standalone_executor_plan.py
index e10c4ba58b1..72fc4ab9bb7 100644
--- a/test/standalone_executor/test_standalone_executor_plan.py
+++ b/test/standalone_executor/test_standalone_executor_plan.py
@@ -21,9 +21,9 @@ from paddle.fluid import core
 class TestStandaloneExecutorPlan(unittest.TestCase):
     def test_standalone_executor_plan(self):
         micro_batch_id = 0
-        forward_job = core.job("forward")
-        backward_job = core.job("backward")
-        optimizer_job = core.job("optimizer")
+        forward_job = core.Job("forward")
+        backward_job = core.Job("backward")
+        optimizer_job = core.Job("optimizer")
         forward_job.set_micro_batch_id(micro_batch_id)
         backward_job.set_micro_batch_id(micro_batch_id)
         optimizer_job.set_micro_batch_id(micro_batch_id)
@@ -39,9 +39,10 @@ class TestStandaloneExecutorPlan(unittest.TestCase):
             "backward": backward_program.desc,
             "optimizer": optimizer_program.desc,
         }
-        plan = core.plan(job_list, type_to_program)
+        plan = core.Plan(job_list, type_to_program)
         self.assertEqual(plan.job_list(), job_list)
-        self.assertEqual(plan.type_to_program(), type_to_program)
+        for type in type_to_program.keys():
+            self.assertEqual(plan.program(type), type_to_program[type])
 
 
 if __name__ == '__main__':
diff --git a/tools/windows/run_unittests.sh b/tools/windows/run_unittests.sh
index 03fb4b02543..e7c05f2768a 100644
--- a/tools/windows/run_unittests.sh
+++ b/tools/windows/run_unittests.sh
@@ -44,6 +44,7 @@ disable_wingpu_test="^test_model$|\
 ^test_dataloader_unkeep_order$|\
 ^test_multiprocess_dataloader_iterable_dataset_static$|\
 ^test_fuse_bn_act_pass$|\
+^test_fuse_bn_act_pass_static_build$|\
 ^test_fuse_bn_add_act_pass$|\
 ^test_gather_op$|\
 ^test_activation_op$|\
@@ -165,6 +166,7 @@ disable_win_inference_test="^trt_quant_int8_yolov3_r50_test$|\
 ^test_sync_batch_norm_op$|\
 ^test_sync_batch_norm_op_static_build$|\
 ^test_fuse_bn_act_pass$|\
+^test_fuse_bn_act_pass_static_build$|\
 ^test_fuse_bn_add_act_pass$|\
 ^test_decoupled_py_reader_data_check$|\
 ^test_parallel_dygraph_sync_batch_norm$|\
-- 
GitLab
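
Usage sketch (not part of the patch): the snippet below drives the new Job/Plan bindings end to end, mirroring test_standalone_executor_plan.py and the _StandaloneExecutor changes above. The trivial paddle.full/paddle.add program and the empty feed list are illustrative assumptions; without fetch ops inserted (see _add_feed_fetch_ops in the tests), run() returns an empty fetch list.

    import paddle
    from paddle.fluid import core
    from paddle.fluid.executor import _StandaloneExecutor

    paddle.enable_static()

    # A trivial program to wrap in a single-job plan.
    startup_program = paddle.static.Program()
    main_program = paddle.static.Program()
    with paddle.static.program_guard(main_program, startup_program):
        x = paddle.full([2, 2], 1.0)
        out = paddle.add(x, x)

    # One job of type "default", mapped to the ProgramDesc that runs it.
    job = core.Job("default")
    plan = core.Plan([job], {"default": main_program.desc})

    place = core.Place()
    place.set_place(paddle.CPUPlace())
    scope = paddle.static.Scope()

    # The executor is now constructed from (place, plan, scope); run() takes
    # only feed names, since fetch handling moved into per-job fetch-op col
    # attributes.
    exe = _StandaloneExecutor(place, plan, scope)
    results = exe.run([])  # empty: no fetch ops were inserted into the program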