From 550e7e410b21484552d30caeedca965ff3a540b0 Mon Sep 17 00:00:00 2001
From: chengduo
Date: Thu, 20 Dec 2018 01:36:15 -0600
Subject: [PATCH] Code Clean parallel_executor.py (#14849)

* refine parallel_executor

* remove unnecessary code
test=develop
---
 .../fluid/framework/details/build_strategy.cc |  7 +-
 .../fluid/framework/details/build_strategy.h  | 15 ++--
 .../details/multi_devices_graph_pass.cc       |  5 --
 .../details/multi_devices_graph_pass.h        |  1 -
 paddle/fluid/framework/parallel_executor.cc   |  9 +--
 paddle/fluid/framework/parallel_executor.h    |  1 -
 paddle/fluid/pybind/pybind.cc                 |  1 -
 python/paddle/fluid/parallel_executor.py      | 79 +++++++++----------
 8 files changed, 50 insertions(+), 68 deletions(-)

diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index 779a9ed5236..389366a8a98 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -131,9 +131,7 @@ std::shared_ptr<ir::PassBuilder> BuildStrategy::CreatePassesFromStrategy(
 
 std::unique_ptr<ir::Graph> BuildStrategy::Apply(
     const ProgramDesc &main_program, const std::vector<platform::Place> &places,
-    const std::string &loss_var_name,
-    const std::unordered_set<std::string> &param_names,
-    const std::vector<Scope *> &local_scopes,
+    const std::string &loss_var_name, const std::vector<Scope *> &local_scopes,
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
     const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const {
 #else
@@ -149,9 +147,6 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
       pass->SetNotOwned<const std::vector<platform::Place>>("places", &places);
       pass->Erase("loss_var_name");
       pass->SetNotOwned<const std::string>("loss_var_name", &loss_var_name);
-      pass->Erase("params");
-      pass->SetNotOwned<const std::unordered_set<std::string>>("params",
-                                                               &param_names);
       pass->Erase("local_scopes");
       pass->SetNotOwned<const std::vector<Scope *>>("local_scopes",
                                                     &local_scopes);
diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h
index 29396501dc0..11db184cb4e 100644
--- a/paddle/fluid/framework/details/build_strategy.h
+++ b/paddle/fluid/framework/details/build_strategy.h
@@ -106,16 +106,15 @@ struct BuildStrategy {
 
   // Apply the passes built by the pass_builder_. The passes will be
   // applied to the Program and output an ir::Graph.
-  std::unique_ptr<ir::Graph> Apply(
-      const ProgramDesc &main_program,
-      const std::vector<platform::Place> &places,
-      const std::string &loss_var_name,
-      const std::unordered_set<std::string> &param_names,
-      const std::vector<Scope *> &local_scopes,
+  std::unique_ptr<ir::Graph> Apply(const ProgramDesc &main_program,
+                                   const std::vector<platform::Place> &places,
+                                   const std::string &loss_var_name,
+                                   const std::vector<Scope *> &local_scopes,
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-      const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const;
+                                   const bool use_cuda,
+                                   platform::NCCLContextMap *nccl_ctxs) const;
 #else
-      const bool use_cuda) const;
+                                   const bool use_cuda) const;
 #endif
 
  private:
diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
index f216aad9d9f..7e320a08942 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc
+++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc
@@ -130,7 +130,6 @@ void AddOutputToLeafOps(ir::Graph *graph) {
 
 static const char kLossVarName[] = "loss_var_name";
 static const char kPlaces[] = "places";
-static const char kParams[] = "params";
 static const char kLocalScopes[] = "local_scopes";
 static const char kStrategy[] = "strategy";
 static const char kNumTrainers[] = "num_trainers";
@@ -147,9 +146,6 @@ void MultiDevSSAGraphBuilder::Init() const {
   nccl_ctxs_ = &Get<platform::NCCLContextMap>("nccl_ctxs");
 #endif
 
-  for (auto &p : Get<const std::unordered_set<std::string>>(kParams)) {
-    grad_names_.insert(GradVarName(p));
-  }
   balance_vars_.resize(places_.size(), 0);
   if (strategy_.enable_data_balance_ && places_.size() == 1) {
     LOG(WARNING) << "It is no need to enable data balance when there is only "
@@ -898,7 +894,6 @@ REGISTER_PASS(multi_devices_pass,
               paddle::framework::details::MultiDevSSAGraphBuilder)
     .RequirePassAttr(paddle::framework::details::kLossVarName)
     .RequirePassAttr(paddle::framework::details::kPlaces)
-    .RequirePassAttr(paddle::framework::details::kParams)
     .RequirePassAttr(paddle::framework::details::kLocalScopes)
     .RequirePassAttr(paddle::framework::details::kStrategy)
     .RequirePassAttr(paddle::framework::details::kNumTrainers);
diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h
index 1428fe83059..5736102ddc1 100644
--- a/paddle/fluid/framework/details/multi_devices_graph_pass.h
+++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h
@@ -103,7 +103,6 @@ class MultiDevSSAGraphBuilder : public ir::Pass {
   mutable std::string loss_var_name_;
   mutable std::vector<platform::Place> places_;
   mutable std::vector<Scope *> local_scopes_;
-  mutable std::unordered_set<std::string> grad_names_;
 
   mutable BuildStrategy strategy_;
   mutable std::unordered_map<std::string, VarDesc *> all_vars_;
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 7e3fe02eaf5..a921f469f5e 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -190,7 +190,6 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
 
 ParallelExecutor::ParallelExecutor(
     const std::vector<platform::Place> &places,
-    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
     const ProgramDesc &main_program, const std::string &loss_var_name,
     Scope *scope, const std::vector<Scope *> &local_scopes,
@@ -209,7 +208,7 @@ ParallelExecutor::ParallelExecutor(
         "the number of places must be greater than 1.");
   }
 
-  // Step 1. Bcast the params to devs.
+  // Step 1. Bcast the bcast_vars to devs.
   // Create local scopes
   if (local_scopes.empty()) {
     member_->own_local_scope_ = true;
@@ -249,12 +248,12 @@ ParallelExecutor::ParallelExecutor(
   // ncclOp
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
-      main_program, member_->places_, loss_var_name, params,
-      member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
+      main_program, member_->places_, loss_var_name, member_->local_scopes_,
+      member_->use_cuda_, member_->nccl_ctxs_.get());
 #else
   std::unique_ptr<ir::Graph> graph =
       build_strategy.Apply(main_program, member_->places_, loss_var_name,
-                           params, member_->local_scopes_, member_->use_cuda_);
+                           member_->local_scopes_, member_->use_cuda_);
 #endif
   auto max_memory_size = GetEagerDeletionThreshold();
   if (max_memory_size >= 0) {
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 1fc17a0d64d..5f6c2159aa2 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -41,7 +41,6 @@ class ParallelExecutor {
 
  public:
   explicit ParallelExecutor(const std::vector<platform::Place> &places,
-                            const std::unordered_set<std::string> &params,
                             const std::unordered_set<std::string> &bcast_vars,
                             const ProgramDesc &main_program,
                             const std::string &loss_var_name, Scope *scope,
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 737ae2dd9c3..f8a5c9deb06 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -977,7 +977,6 @@ All parameter, weight, gradient are variables in Paddle.
         cannot be updated after being finalized.)DOC");
 
   pe.def(py::init<const std::vector<platform::Place> &,
-                  const std::unordered_set<std::string> &,
                   const std::unordered_set<std::string> &, const ProgramDesc &,
                   const std::string &, Scope *, std::vector<Scope *> &,
                   const ExecutionStrategy &, const BuildStrategy &, size_t,
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index c54c3963a15..74cf76da951 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -92,35 +92,27 @@ class ParallelExecutor(object):
                  num_trainers=1,
                  trainer_id=0,
                  scope=None):
+        # step1: get places, the places are used in run too.
         self._places = []
-        self._act_places = []
         if use_cuda:
-            gpus = []
             gpus_env = os.getenv("FLAGS_selected_gpus")
             if gpus_env:
                 gpus = [int(s) for s in gpus_env.split(",")]
             else:
-                for i in six.moves.range(core.get_cuda_device_count()):
-                    gpus.append(i)
-            for i in gpus:
-                p = core.Place()
-                self._act_places.append(core.CUDAPlace(i))
-                p.set_place(self._act_places[-1])
-                self._places.append(p)
+                gpus = [
+                    i for i in six.moves.range(core.get_cuda_device_count())
+                ]
+            self._places = [core.CUDAPlace(i) for i in gpus]
         else:
             cpu_num = int(
                 os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-            for i in six.moves.range(cpu_num):
-                p = core.Place()
-                self._act_places.append(core.CPUPlace())
-                p.set_place(self._act_places[-1])
-                self._places.append(p)
+            self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
         assert self._places, "no place for execution"
 
+        # step2: init exec_strategy
         if exec_strategy is None:
             exec_strategy = ExecutionStrategy()
         exec_strategy.use_cuda = use_cuda
-
         if exec_strategy.num_threads == 0:
             if use_cuda:
                 # Experiments on se-resnext shows that too many threads hurt
@@ -131,49 +123,54 @@ class ParallelExecutor(object):
                     os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
                 exec_strategy.num_threads = cpu_num * 2
 
+        # step3: init build_strategy
         if build_strategy is None:
             build_strategy = BuildStrategy()
-
         build_strategy.num_trainers = num_trainers
         build_strategy.trainer_id = trainer_id
-        main = main_program
-        main = main if main else framework.default_main_program()
 
+        # step4: get main_program, scope, local_scopes
+        main = main_program if main_program \
+            else framework.default_main_program()
+        scope = scope if scope is not None else executor.global_scope()
+
+        if share_vars_from and not isinstance(share_vars_from,
+                                              ParallelExecutor):
+            raise TypeError("share_vars_from must be ParallelExecutor.")
+
+        local_scopes = share_vars_from.executor.local_scopes()\
+            if share_vars_from else []
+        # step5: check trainers_endpoints, it is used for distribution.
         trainers_endpoints = main._trainers_endpoints
         if num_trainers > 1 and trainers_endpoints:
             assert num_trainers == len(
                 trainers_endpoints), "num_trainers == len(end_points)"
             build_strategy.trainers_endpoints = trainers_endpoints
 
-        if scope == None:
-            scope = executor.global_scope()
-
-        if share_vars_from and not isinstance(share_vars_from,
-                                              ParallelExecutor):
-            raise TypeError("share_vars_from must be ParallelExecutor.")
-
-        local_scopes = share_vars_from.executor.local_scopes(
-        ) if share_vars_from else []
-
-        self.persistable_vars = [
-            v.name for v in [
+        # step5: get persistable_vars, parameter_vars, places. persistable_vars
+        # need be broadcast to other local_scope.
+        persistable_vars = set([
+            cpt.to_text(v.name) for v in [
                 var for var in main.list_vars()
                 if var.persistable and var.type != core.VarDesc.VarType.RAW
             ]
-        ]
+        ])
+
+        def place_obj(place):
+            p = core.Place()
+            p.set_place(place)
+            return p
+
+        places = list(map(place_obj, self._places))
+
+        # step6: init ParallelExecutor
         self.executor = core.ParallelExecutor(
-            self._places,
-            set([
-                cpt.to_text(p.name)
-                for p in main.global_block().iter_parameters()
-                if not p.stop_gradient
-            ]),
-            set(cpt.to_text(var) for var in self.persistable_vars), main.desc,
+            places, persistable_vars, main.desc,
             cpt.to_text(loss_name) if loss_name else six.u(''), scope,
             local_scopes, exec_strategy, build_strategy, num_trainers,
             trainer_id)
+
         self.scope = scope
 
     def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
@@ -261,7 +258,7 @@ class ParallelExecutor(object):
             self.executor.feed_and_split_tensor_into_local_scopes(
                 feed_tensor_dict)
         elif isinstance(feed, list) or isinstance(feed, tuple):
-            if len(feed) != len(self._act_places):
+            if len(feed) != len(self._places):
                 raise ValueError(
                     "Feed a list of tensor, the list should be the same size as places"
                 )
@@ -277,7 +274,7 @@ class ParallelExecutor(object):
                     tensor = each[feed_name]
                     if not isinstance(tensor, core.LoDTensor):
                         tmp = core.LoDTensor()
-                        tmp.set(tensor, self._act_places[i])
+                        tmp.set(tensor, self._places[i])
                         tensor = tmp
                     res_dict[feed_name] = tensor
                 res.append(res_dict)
@@ -294,4 +291,4 @@ class ParallelExecutor(object):
 
     @property
     def device_count(self):
-        return len(self._act_places)
+        return len(self._places)
-- 
GitLab
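
Note: this cleanup does not change the public fluid.ParallelExecutor Python
interface; only the internal core.ParallelExecutor construction in __init__
stops passing a separate parameter-name set. A minimal usage sketch, assuming
a toy regression program run on two CPU places (the network, variable names,
and the CPU_NUM value below are illustrative only, not part of this patch):

    import os
    import numpy as np
    import paddle.fluid as fluid

    os.environ['CPU_NUM'] = '2'  # assumed: execute on two CPU places

    # Hypothetical toy program; any program with a loss works the same way.
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.mean(
        fluid.layers.square_error_cost(input=pred, label=y))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

    # Run startup once, then build the ParallelExecutor on the default program.
    fluid.Executor(fluid.CPUPlace()).run(fluid.default_startup_program())

    # The constructor seen by users is the same before and after this patch.
    pe = fluid.ParallelExecutor(use_cuda=False, loss_name=loss.name)
    loss_val, = pe.run(fetch_list=[loss.name],
                       feed={'x': np.random.rand(4, 13).astype('float32'),
                             'y': np.random.rand(4, 1).astype('float32')})
    print(loss_val)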