diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
index 6b47095dbe1b438e840c3816ce9ee46d73986510..6b2808914116a48bdea15f2610bfda90464fcea6 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.cc
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -17,6 +17,7 @@
 #include "paddle/fluid/distributed/fleet_executor/interceptor_message_service.h"
 #include "paddle/fluid/distributed/fleet_executor/message_bus.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
+#include "paddle/fluid/framework/scope.h"
 
 namespace paddle {
 namespace distributed {
@@ -24,10 +25,16 @@ namespace distributed {
 USE_INTERCEPTOR(Compute);
 
 void Carrier::Init(
-    const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node) {
+    const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
+    framework::Scope* minibatch_scope,
+    const std::vector<framework::Scope*>& microbatch_scopes,
+    const platform::Place& place) {
   PADDLE_ENFORCE_EQ(is_init_, false, platform::errors::AlreadyExists(
                                          "Carrier is already init."));
   interceptor_id_to_node_ = interceptor_id_to_node;
+  minibatch_scope_ = minibatch_scope;
+  microbatch_scopes_ = microbatch_scopes;
+  place_ = place;
   CreateInterceptors();
   is_init_ = true;
 }
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h
index 6ad5aee128a83fbb0458b00c33bbc31b57bccd76..ee6d3158bf8c04d8ebf9622806e430ad0f9eb18b 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.h
+++ b/paddle/fluid/distributed/fleet_executor/carrier.h
@@ -26,8 +26,13 @@
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/errors.h"
 #include "paddle/fluid/platform/macros.h"
+#include "paddle/fluid/platform/place.h"
 
 namespace paddle {
+namespace framework {
+class Scope;
+}
+
 namespace distributed {
 
 class TaskNode;
@@ -42,7 +47,10 @@ class Carrier final {
   }
 
   void Init(
-      const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node);
+      const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
+      framework::Scope* minibatch_scope,
+      const std::vector<framework::Scope*>& microbatch_scopes,
+      const platform::Place& place);
 
   ~Carrier();
 
@@ -89,6 +97,9 @@ class Carrier final {
   std::mutex running_mutex_;
   std::condition_variable cond_var_;
+  std::vector<framework::Scope*> microbatch_scopes_;
+  framework::Scope* minibatch_scope_;
+  paddle::platform::Place place_;
 };
 
 }  // namespace distributed
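The carrier changes above thread a scope hierarchy and a device place through `Carrier::Init`. Below is a minimal sketch, not part of the patch, of the layout those parameters describe, assuming only the `Scope` calls that appear elsewhere in this diff (`NewScope`, `DropKids`); the function name is hypothetical. `FleetExecutor::Init` in the next file builds exactly this shape before handing it to the carrier.

```cpp
#include <cstdint>
#include <vector>

#include "paddle/fluid/framework/scope.h"

// Hypothetical illustration (not in the patch): a root scope owns one
// minibatch scope, which owns one child scope per micro-batch.
void ScopeHierarchySketch(int64_t num_micro_batches) {
  paddle::framework::Scope root;  // plays the role of root_scope_
  paddle::framework::Scope* minibatch = &root.NewScope();
  std::vector<paddle::framework::Scope*> microbatch(num_micro_batches);
  for (int64_t i = 0; i < num_micro_batches; ++i) {
    microbatch[i] = &minibatch->NewScope();  // one child per micro-batch
  }
  // Tear-down mirrors FleetExecutor::Release(): dropping the kids of the
  // root scope releases the minibatch scope and its micro-batch children.
  root.DropKids();
}
```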
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 868e23280be86d3f5bb6e7cf9f5be8747dabe0fc..2483b4a545df462dc58e87a7c2f0fc0340f95992 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -19,6 +19,8 @@
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 #include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/variable_helper.h"
 
 namespace paddle {
 namespace distributed {
@@ -33,8 +35,21 @@ FleetExecutor::~FleetExecutor() {
   // Destroy Executor
 }
 
-void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
+void FleetExecutor::Init(const framework::ProgramDesc& program_desc,
+                         framework::Scope* scope,
+                         const platform::Place& place) {
   runtime_graph_ = std::make_unique<RuntimeGraph>(program_desc, exe_desc_);
+  root_scope_ = scope;
+  place_ = place;
+  PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(
+                                           "root_scope_ can not be nullptr"));
+  minibatch_scope_ = &root_scope_->NewScope();
+  int64_t num_micro_batches = exe_desc_.num_micro_batches();
+  microbatch_scopes_.resize(num_micro_batches);
+  for (int i = 0; i < num_micro_batches; ++i) {
+    microbatch_scopes_[i] = &minibatch_scope_->NewScope();
+    CopyParameters(i, program_desc);
+  }
   VLOG(5) << runtime_graph_->DebugString();
   InitCarrier();
   InitMessageBus();
@@ -43,7 +58,8 @@
 void FleetExecutor::InitCarrier() {
   Carrier& carrier_instance = Carrier::Instance();
   if (!carrier_instance.IsInit()) {
-    carrier_instance.Init(runtime_graph_->intercepter_id_to_node());
+    carrier_instance.Init(runtime_graph_->intercepter_id_to_node(),
+                          minibatch_scope_, microbatch_scopes_, place_);
   }
 }
 
@@ -97,8 +113,25 @@ void FleetExecutor::Run() {
   carrier_instance.Start();
 }
 
-void FleetExecutor::Release() {
-  // Release
+void FleetExecutor::Release() { root_scope_->DropKids(); }
+
+void FleetExecutor::CopyParameters(int microbatch_id,
+                                   const framework::ProgramDesc& program) {
+  auto& global_block = program.Block(0);
+
+  for (auto& var : global_block.AllVars()) {
+    if (var->Persistable() && microbatch_id == 0) {
+      auto* ptr = root_scope_->Var(var->Name());
+      InitializeVariable(ptr, var->GetType());
+      VLOG(5) << "Create persistable var: " << var->Name()
+              << ", which pointer is " << ptr;
+    } else if (!var->Persistable()) {
+      auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
+      VLOG(5) << "Create variable " << var->Name() << " for microbatch "
+              << microbatch_id << ", which pointer is " << ptr << ".";
+      InitializeVariable(ptr, var->GetType());
+    }
+  }
 }
 
 }  // namespace distributed
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index 3ef3e5345492abc62e648a70a8986766a84c0464..7be18772e9ec9f85f334609529730f9b7867baab 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -18,10 +18,12 @@
 
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
 #include "paddle/fluid/platform/macros.h"
+#include "paddle/fluid/platform/place.h"
 
 namespace paddle {
 namespace framework {
 class ProgramDesc;
+class Scope;
 }
 
 namespace distributed {
@@ -34,16 +36,22 @@ class FleetExecutor final {
   FleetExecutor() = delete;
   explicit FleetExecutor(const std::string& exe_desc_str);
   ~FleetExecutor();
-  void Init(const paddle::framework::ProgramDesc& program_desc);
+  void Init(const framework::ProgramDesc& program_desc, framework::Scope* scope,
+            const platform::Place& place);
   void Run();
   void Release();
 
  private:
   DISABLE_COPY_AND_ASSIGN(FleetExecutor);
-  FleetExecutorDesc exe_desc_;
-  std::unique_ptr<RuntimeGraph> runtime_graph_;
   void InitMessageBus();
   void InitCarrier();
+  void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
+  FleetExecutorDesc exe_desc_;
+  std::unique_ptr<RuntimeGraph> runtime_graph_;
+  framework::Scope* root_scope_;
+  framework::Scope* minibatch_scope_;
+  platform::Place place_;
+  std::vector<framework::Scope*> microbatch_scopes_;
 };
 
 }  // namespace distributed
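With the new signatures in place, the executor's C++ lifecycle is construct, `Init`, `Run`, `Release`. A hedged sketch of a caller, using only the public interface declared in `fleet_executor.h` above; the function name is hypothetical, and the serialized `FleetExecutorDesc` string, program, scope, and place are assumed to be prepared by the caller (on the Python side this happens in `Executor`, shown at the end of this patch).

```cpp
#include <string>

#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/place.h"

// Hypothetical caller (not in the patch) exercising the new interface.
void RunPipelineSketch(const std::string& serialized_exe_desc,
                       const paddle::framework::ProgramDesc& program,
                       paddle::framework::Scope* root_scope,
                       const paddle::platform::Place& place) {
  paddle::distributed::FleetExecutor fleet_exe(serialized_exe_desc);
  // Init builds the minibatch/micro-batch scopes under root_scope and
  // copies parameters into them (see CopyParameters above).
  fleet_exe.Init(program, root_scope, place);
  fleet_exe.Run();      // starts the carrier and its interceptors
  fleet_exe.Release();  // drops the kid scopes created under root_scope
}
```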
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor_message.proto b/paddle/fluid/distributed/fleet_executor/interceptor_message.proto
index a2fe01cfe3822b83fe5ee4b723edc99e23fe5923..c9ab477183a318d16ceace37f2af51dc9cae6725 100644
--- a/paddle/fluid/distributed/fleet_executor/interceptor_message.proto
+++ b/paddle/fluid/distributed/fleet_executor/interceptor_message.proto
@@ -21,7 +21,7 @@ enum MessageType {
   STOP = 1;             // STOP an Interceptor
   DATA_IS_READY = 2;    // upstream data is ready
   DATE_IS_USELESS = 3;  // downstream has used the data
-  ERROR = 4;            // current Interceptor encounters error
+  ERR = 4;              // current Interceptor encounters error
   RESET = 5;            // reset the status
 }
 
diff --git a/paddle/fluid/pybind/bind_fleet_executor.cc b/paddle/fluid/pybind/bind_fleet_executor.cc
index 726c0428390f1d7a1a98e3482a002769c80b751c..115be1b8ba8b4dbf77cbaedd99552aa7393c305f 100644
--- a/paddle/fluid/pybind/bind_fleet_executor.cc
+++ b/paddle/fluid/pybind/bind_fleet_executor.cc
@@ -17,6 +17,8 @@
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 #include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/platform/place.h"
 
 namespace py = pybind11;
 
@@ -30,7 +32,8 @@ void BindFleetExecutor(py::module* m) {
   py::class_<FleetExecutor>(*m, "FleetExecutor")
       .def(py::init<const std::string&>())
       .def("init", &FleetExecutor::Init)
-      .def("run", &FleetExecutor::Run);
+      .def("run", &FleetExecutor::Run)
+      .def("release", &FleetExecutor::Release);
 
   py::class_<TaskNode>(*m, "TaskNode")
       .def(py::init())
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 0d46461dd6a5faea1b47cec43394d4e4ad44686e..dc4e3ce39af830bfa740dc801d26c7ec49e94cec 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1997,8 +1997,12 @@ class Executor(object):
         num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree
         assert nrank == num_of_gpu, "The number of rank is not equal to the number of gpu."
         fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
-        fleet_exe.init(program._pipeline_opt["section_program"].desc)
+        place = core.Place()
+        place.set_place(self.place)
+        fleet_exe.init(program._pipeline_opt["section_program"].desc, scope,
+                       place)
         fleet_exe.run()
+        fleet_exe.release()
         return None
 
     def _run_pipeline(self,