diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
index 79ca6f467a38dcde2acc3920611b19b9c763c6af..3e198dc3eeea4995ad9f3e526decc5c4f65a3ac7 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.cc
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -19,7 +19,9 @@
 #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 #include "paddle/fluid/framework/garbage_collector.h"
+#include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/variable_helper.h"
 
 namespace paddle {
 namespace distributed {
@@ -43,18 +45,24 @@ void Carrier::Init(
     int64_t rank,
     const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
     const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
-    framework::Scope* root_scope, framework::Scope* minibatch_scope,
-    const std::vector<framework::Scope*>& microbatch_scopes,
-    const platform::Place& place) {
+    const framework::ProgramDesc& program, framework::Scope* scope,
+    int64_t num_micro_batches, const platform::Place& place) {
   rank_ = rank;
   interceptor_id_to_rank_ = interceptor_id_to_rank;
   interceptor_id_to_node_ = interceptor_id_to_node;
-  minibatch_scope_ = minibatch_scope;
-  microbatch_scopes_ = microbatch_scopes;
   place_ = place;
-  root_scope_ = root_scope;
+  root_scope_ = scope;
   dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
+  PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(
+                                           "root_scope can not be nullptr"));
+  minibatch_scope_ = &root_scope_->NewScope();
+  microbatch_scopes_.resize(num_micro_batches);
+  for (int i = 0; i < num_micro_batches; ++i) {
+    microbatch_scopes_[i] = &minibatch_scope_->NewScope();
+    CopyParameters(i, program);
+  }
+
   // TODO(fleet_exe dev): thread pool
   thread_num_ = 1;
   thread_pool_.SetThreadNum(thread_num_);
@@ -64,10 +72,33 @@ void Carrier::Init(
   is_init_ = true;
 }
 
-void Carrier::Release() {}
+void Carrier::Release() {
+  if (root_scope_) {
+    root_scope_->DropKids();
+  }
+}
 
 Carrier::~Carrier() { VLOG(3) << "Carrier's destructor."; }
 
+void Carrier::CopyParameters(int microbatch_id,
+                             const framework::ProgramDesc& program) {
+  auto& global_block = program.Block(0);
+
+  for (auto& var : global_block.AllVars()) {
+    if (var->Persistable() && microbatch_id == 0) {
+      auto* ptr = root_scope_->Var(var->Name());
+      InitializeVariable(ptr, var->GetType());
+      VLOG(5) << "Create persistable var: " << var->Name()
+              << ", which pointer is " << ptr;
+    } else if (!var->Persistable()) {
+      auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
+      VLOG(5) << "Create variable " << var->Name() << " for microbatch "
+              << microbatch_id << ", which pointer is " << ptr << ".";
+      InitializeVariable(ptr, var->GetType());
+    }
+  }
+}
+
 bool Carrier::EnqueueInterceptorMessage(
     const InterceptorMessage& interceptor_message) {
   PADDLE_ENFORCE_EQ(
@@ -116,6 +147,15 @@ void Carrier::Start() {
   // TODO(wangxi): async step
   Wait();
   dev_ctx_->Wait();
+  for (auto* micro_scope : microbatch_scopes_) {
+    // By default, we should delete all kid scopes after run executor because
+    // some operators may create local scope when running, such as while_op.
+    // But when while_op also create a local executor to run it's sub block,
+    // the sub scopes it created should not be dropped immediately, because
+    // while_grad_op will use some variables created during while_op run, so
+    // we need to keep the kids and wait for the outer executor to drop them.
+    micro_scope->DropKids();
+  }
 }
 
 bool Carrier::IsInit() const { return is_init_; }
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h
index 75ac07083a7968f379e7e946f2ac91fac180d65d..7762effdb9c871fe22585acf32c5977846df5d09 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.h
+++ b/paddle/fluid/distributed/fleet_executor/carrier.h
@@ -34,6 +34,7 @@ namespace paddle {
 namespace framework {
 class Scope;
+class ProgramDesc;
 }
 
 namespace distributed {
@@ -55,9 +56,10 @@ class Carrier final {
       int64_t rank,
       const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
       const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
-      framework::Scope* root_scope, framework::Scope* minibatch_scope,
-      const std::vector<framework::Scope*>& microbatch_scopes,
-      const platform::Place& place);
+      const framework::ProgramDesc& program, framework::Scope* scope,
+      int64_t num_micro_batches, const platform::Place& place);
+
+  void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
 
   void Release();
   void Wait();
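The scope setup that used to live in FleetExecutor now happens inside Carrier::Init: one minibatch scope is created under the root scope, one child scope per micro batch, and CopyParameters creates persistable variables once in the root scope (only while handling micro batch 0) while every micro-batch scope gets its own non-persistable variables. A minimal plain-Python sketch of that layout, assuming a hypothetical Scope stand-in rather than Paddle's framework::Scope:

class Scope:
    """Hypothetical stand-in for framework::Scope: named vars plus child scopes."""

    def __init__(self):
        self.vars = {}
        self.kids = []

    def new_scope(self):
        kid = Scope()
        self.kids.append(kid)
        return kid


def init_carrier_scopes(root, program_vars, num_micro_batches):
    # Mirrors the Carrier::Init + CopyParameters split: persistable vars are
    # created once in the root scope; each micro-batch scope gets its own copy
    # of the non-persistable vars.
    minibatch = root.new_scope()
    micro_scopes = [minibatch.new_scope() for _ in range(num_micro_batches)]
    for i, scope in enumerate(micro_scopes):
        for name, persistable in program_vars:
            if persistable and i == 0:
                root.vars[name] = object()
            elif not persistable:
                scope.vars[name] = object()
    return minibatch, micro_scopes


root = Scope()
_, micros = init_carrier_scopes(
    root, [("w", True), ("tmp", False)], num_micro_batches=2)
assert "w" in root.vars and all("tmp" in m.vars for m in micros)

Because the carrier now owns this hierarchy, its Release() dropping the root scope's kids replaces the DropKids call that FleetExecutor's destructor used to make.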
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index d6c1e678ad4f795afd0dbffd0f8bfb058e99076f..19c44fa521b1bffbe4baab27a4262e2c3bded6f1 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -22,8 +22,6 @@
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/framework/program_desc.h"
-#include "paddle/fluid/framework/scope.h"
-#include "paddle/fluid/framework/variable_helper.h"
 
 namespace paddle {
 namespace distributed {
@@ -38,7 +36,6 @@ FleetExecutor::FleetExecutor(const std::string& exe_desc_str) {
 }
 
 FleetExecutor::~FleetExecutor() {
-  root_scope_->DropKids();
   for (const auto& carrier_id : carrier_ids_) {
     GlobalMap<std::string, Carrier>::Get(carrier_id)->Release();
   }
@@ -47,7 +44,7 @@
 void FleetExecutor::Init(
     const std::string& carrier_id, const framework::ProgramDesc& program_desc,
     framework::Scope* scope, const platform::Place& place,
-    const std::vector<TaskNode*>& task_nodes,
+    int64_t num_micro_batches, const std::vector<TaskNode*>& task_nodes,
     const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
   PADDLE_ENFORCE_GT(task_nodes.size(), 0,
                     platform::errors::InvalidArgument(
@@ -72,31 +69,23 @@ void FleetExecutor::Init(
   for (auto& unique_op : ops) {
     unique_op.release();
   }
-  root_scope_ = scope;
-  place_ = place;
-  PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(
-                                           "root_scope_ can not be nullptr"));
-  minibatch_scope_ = &root_scope_->NewScope();
-  int64_t num_micro_batches = exe_desc_.num_micro_batches();
-  microbatch_scopes_.resize(num_micro_batches);
-  for (int i = 0; i < num_micro_batches; ++i) {
-    microbatch_scopes_[i] = &minibatch_scope_->NewScope();
-    CopyParameters(i, program_desc);
-  }
   VLOG(5) << runtime_graph_->DebugString();
   Carrier* carrier = GlobalMap<std::string, Carrier>::Create(carrier_id, carrier_id);
   carrier_ids_.insert(carrier_id);
   // Set current running carrier
   GlobalVal<std::string>::Set(new std::string(carrier_id));
-  InitCarrier(carrier);
+  InitCarrier(carrier, scope, place, num_micro_batches, program_desc);
   GlobalVal<MessageBus>::Get()->Barrier();
 }
 
-void FleetExecutor::InitCarrier(Carrier* carrier) {
+void FleetExecutor::InitCarrier(Carrier* carrier, framework::Scope* scope,
+                                const platform::Place& place,
+                                int64_t num_micro_batches,
+                                const framework::ProgramDesc& program_desc) {
   carrier->Init(exe_desc_.cur_rank(),
                 runtime_graph_->interceptor_id_to_rank(),
-                runtime_graph_->interceptor_id_to_node(), root_scope_,
-                minibatch_scope_, microbatch_scopes_, place_);
+                runtime_graph_->interceptor_id_to_node(), program_desc, scope,
+                num_micro_batches, place);
 }
 
 void FleetExecutor::InitMessageBus() {
@@ -140,34 +129,6 @@ void FleetExecutor::Run(const std::string& carrier_id) {
     GlobalVal<MessageBus>::Get()->Barrier();
   }
   carrier->Start();
-  for (auto* micro_scop : microbatch_scopes_) {
-    // By default, we should delete all kid scopes after run executor because
-    // some operators may create local scope when running, such as while_op.
-    // But when while_op also create a local executor to run it's sub block,
-    // the sub scopes it created should not be dropped immediately, because
-    // while_grad_op will use some variables created during while_op run, so
-    // we need to keep the kids and wait for the outer executor to drop them.
-    micro_scop->DropKids();
-  }
-}
-
-void FleetExecutor::CopyParameters(int microbatch_id,
-                                   const framework::ProgramDesc& program) {
-  auto& global_block = program.Block(0);
-
-  for (auto& var : global_block.AllVars()) {
-    if (var->Persistable() && microbatch_id == 0) {
-      auto* ptr = root_scope_->Var(var->Name());
-      InitializeVariable(ptr, var->GetType());
-      VLOG(5) << "Create persistable var: " << var->Name()
-              << ", which pointer is " << ptr;
-    } else if (!var->Persistable()) {
-      auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
-      VLOG(5) << "Create variable " << var->Name() << " for microbatch "
-              << microbatch_id << ", which pointer is " << ptr << ".";
-      InitializeVariable(ptr, var->GetType());
-    }
-  }
 }
 
 }  // namespace distributed
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index 89ab4c62d386f21ab46b45e8782dd1bf7e3ecc3e..b2af3e4e457c7573389d491a313a63672b71b627 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -39,7 +39,7 @@ class FleetExecutor final {
   ~FleetExecutor();
   void Init(const std::string& carrier_id,
             const framework::ProgramDesc& program_desc, framework::Scope* scope,
-            const platform::Place& place,
+            const platform::Place& place, int64_t num_micro_batches,
            const std::vector<TaskNode*>& task_nodes,
             const std::unordered_map<int64_t, int64_t>& task_id_to_rank);
   void Run(const std::string& carrier_id);
@@ -47,14 +47,11 @@ class FleetExecutor final {
  private:
   DISABLE_COPY_AND_ASSIGN(FleetExecutor);
   void InitMessageBus();
-  void InitCarrier(Carrier* carrier);
-  void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
+  void InitCarrier(Carrier* carrier, framework::Scope* scope,
+                   const platform::Place& place, int64_t num_micro_batches,
+                   const framework::ProgramDesc& program_desc);
 
   FleetExecutorDesc exe_desc_;
   std::shared_ptr<RuntimeGraph> runtime_graph_;
-  framework::Scope* root_scope_;
-  framework::Scope* minibatch_scope_;
-  platform::Place place_;
-  std::vector<framework::Scope*> microbatch_scopes_;
   std::unordered_set<std::string> carrier_ids_;
 };
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
index aa553557852a767b571b000cbf9b78069db94019..d048660774b3940259c2b4ea3d2dacd70f0bf0bc 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto
@@ -23,5 +23,4 @@ message RankInfo {
 message FleetExecutorDesc {
   optional int64 cur_rank = 1 [ default = 0 ];  // Rank id of current processor
   repeated RankInfo cluster_info = 2;
-  optional int64 num_micro_batches = 3 [ default = 1 ];
 }
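With num_micro_batches removed from FleetExecutorDesc, the descriptor only carries the current rank and the cluster's RankInfo entries, and the micro-batch count is threaded as a plain argument through FleetExecutor::Init and InitCarrier into Carrier::Init. A standalone sketch of how such a descriptor is derived from the trainer environment variables, assuming a plain dict in place of the fleet_executor_desc_pb2 messages and a hypothetical helper name:

import os


def build_executor_desc_from_env():
    # Reads the same environment variables as the new module-level
    # _prepare_fleet_executor(); a dict stands in for FleetExecutorDesc.
    endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(',')
    return {
        "cur_rank": int(os.getenv("PADDLE_TRAINER_ID", 0)),
        "cluster_info": [{"rank": rank, "ip_port": endpoint}
                         for rank, endpoint in enumerate(endpoints)],
        # No "num_micro_batches" entry any more: the count is now an explicit
        # argument of FleetExecutor::Init / Carrier::Init.
    }


os.environ.setdefault("PADDLE_TRAINER_ENDPOINTS", "127.0.0.1:6170,127.0.0.1:6171")
os.environ.setdefault("PADDLE_TRAINER_ID", "0")
print(build_executor_desc_from_env())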
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 7f282b8cea07a8ced6adb003cfb04b9b06d26569..d67d4944c69cb74689b2d3184147363b3e64b526 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -400,6 +400,23 @@ def _is_enable_standalone_executor():
     return flag
 
 
+def _prepare_fleet_executor():
+    from ..distributed.fleet.proto import fleet_executor_desc_pb2
+    trainer_endpoints_str = os.getenv("PADDLE_TRAINER_ENDPOINTS", "")
+    trainer_endpoints = trainer_endpoints_str.split(',')
+    fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
+    cur_rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
+    fleet_exe_desc.cur_rank = cur_rank
+    nrank = len(trainer_endpoints)
+    for rank, endpoint in enumerate(trainer_endpoints):
+        rank_info = fleet_executor_desc_pb2.RankInfo()
+        rank_info.rank = rank
+        rank_info.ip_port = endpoint
+        fleet_exe_desc.cluster_info.append(rank_info)
+    fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
+    return fleet_exe
+
+
 def _get_strong_program_cache_key(program, feed, fetch_list):
     # NOTE(xiongkun) id(proram) may be duplicate. So add addition var_name as cache key.
     def _get_varname_from_block(block):
@@ -692,6 +709,8 @@ class Executor(object):
         self._enable_interpreter_core = _is_enable_standalone_executor()
         self._executor_cache = _ExecutorCache(self.place)
 
+        self._fleet_executor = None
+
     def _get_scope_cache(self, program_cache_key):
         return self.scope_caches.get(program_cache_key, None)
 
@@ -1281,6 +1300,9 @@ class Executor(object):
 
         if isinstance(program, Program) and program._pipeline_opt:
             if "fleet_opt" in program._pipeline_opt:
+                # Move prepare here for port conflict with nccl in startup program
+                if self._fleet_executor is None:
+                    self._fleet_executor = _prepare_fleet_executor()
                 return self._run_using_fleet_executor(
                     program=program, feed=feed, fetch_list=fetch_list)
             if "startup_program" in program._pipeline_opt:
@@ -1960,27 +1982,16 @@ class Executor(object):
         return ctx
 
-    def _prepare_fleet_executor(self,
-                                carrier_id="",
-                                program=None,
-                                scope=None,
-                                fleet_opt=None):
-        from ..distributed.fleet.proto import fleet_executor_desc_pb2
-        assert program, "Program for fleet executor should not be None"
-        assert fleet_opt, "Configurations for fleet executor should not be None"
-        trainer_endpoints_str = os.getenv("PADDLE_TRAINER_ENDPOINTS", "")
-        trainer_endpoints = trainer_endpoints_str.split(',')
-        fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
+    def _prepare_fleet_executor_carrier(self,
+                                        carrier_id="",
+                                        program=None,
+                                        scope=None,
+                                        fleet_opt=None):
+        num_micro_batches = fleet_opt[
+            "num_micro_batches"] if "num_micro_batches" in fleet_opt else 1
         cur_rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
-        fleet_exe_desc.cur_rank = cur_rank
+        trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(',')
         nrank = len(trainer_endpoints)
-        for rank, endpoint in enumerate(trainer_endpoints):
-            rank_info = fleet_executor_desc_pb2.RankInfo()
-            rank_info.rank = rank
-            rank_info.ip_port = endpoint
-            fleet_exe_desc.cluster_info.append(rank_info)
-        if "num_micro_batches" in fleet_opt:
-            fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"]
         assert 'scheduler' in fleet_opt or 'tasks' in fleet_opt, \
             "Fleet executor need configuration for scheduler, you can choose from 1F1B or Origin. " \
@@ -2019,12 +2030,10 @@ class Executor(object):
         # NOTE: have to hold these vars, otherwise will be destructed
         fleet_opt['tasks'] = tasks
         fleet_opt['task_id_to_rank'] = task_id_to_rank
-        fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
         place = core.Place()
         place.set_place(self.place)
-        fleet_exe.init(carrier_id, program.desc, scope, place, tasks,
-                       task_id_to_rank)
-        return fleet_exe
+        self._fleet_executor.init(carrier_id, program.desc, scope, place,
+                                  num_micro_batches, tasks, task_id_to_rank)
 
     def _run_using_fleet_executor(self,
                                   program=None,
                                   feed=None,
                                   feed_var_name="feed",
                                   fetch_var_name="fetch",
                                   fetch_list=None):
-        # TODO(liyurui): Change cache strategy for multi carriers
         cache_key = _get_strong_program_cache_key(program, feed, fetch_list)
-        cached_ctx = self._get_ctx_cache(cache_key)
-        cached_scope = self._get_scope_cache(cache_key)
         cached_program = self._get_program_cache(cache_key)
-        real_feed = [] if feed is None else feed
+        cached_scope = self._get_scope_cache(cache_key)
         if cached_scope is None:
             cached_scope = global_scope()
             self._add_scope_cache(cache_key, cached_scope)
         if cached_program is None:
+            assert program._pipeline_opt, "program should have _pipeline_opt to start carrier"
+            real_feed = [] if feed is None else feed
             real_program = program
             if "section_program" in program._pipeline_opt:
                 real_program = program._pipeline_opt["section_program"]
@@ -2060,7 +2068,6 @@ class Executor(object):
                         'op_role',
                         core.op_proto_and_checker_maker.OpRole.Optimize)
             self._add_program_cache(cache_key, cached_program)
-        if cached_ctx is None:
             fleet_opt = program._pipeline_opt["fleet_opt"]
             if 'tasks' in fleet_opt:
                 # Insert feed/fetch op for cloned program in each task node,
@@ -2097,12 +2104,12 @@ class Executor(object):
                         core.op_proto_and_checker_maker.OpRole.Optimize)
                 fetch_task.set_program(fetch_program)
 
-            cached_ctx = self._prepare_fleet_executor(
+            self._prepare_fleet_executor_carrier(
                 cache_key,
                 program=cached_program,
                 scope=cached_scope,
                 fleet_opt=fleet_opt)
-            self._add_ctx_cache(cache_key, cached_ctx)
+
         if feed:
             # NOTE: don't have to traverse programs in task nodes,
             # since they all sub program of cached program and
@@ -2120,7 +2127,8 @@ class Executor(object):
                             lr_sheduler._var_name)
                 tensor.set(data, self.place)
 
-        cached_ctx.run(cache_key)
+        self._fleet_executor.run(cache_key)
+
         if fetch_list:
             arr = cached_scope.find_var(fetch_var_name).get_fetch_list()
             tensors = arr._move_to_list()