From 479efeeba70530ddfb1f2d49f242e16f43675dc8 Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 8 Mar 2021 13:36:26 +0800
Subject: [PATCH] update

---
 .../meta_optimizers/sharding_optimizer.py | 42 ++++++++++++-------
 python/paddle/fluid/optimizer.py          | 14 ++++---
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
index 621e9459dca..755e390a0bf 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py
@@ -206,13 +206,17 @@ class ShardingOptimizer(MetaOptimizerBase):
             #     if self._shard.has_param(param_name):
             #         param_list.append(param_name)
             #pp_optimizer._clear_gradients(main_block, param_list)
-            accumulated_gradient_names, first_optimize_op_index = pp_optimizer._accumulate_gradients(
+            accumulated_grad_names = pp_optimizer._accumulate_gradients(
+                main_block)
+            accumulated_grad_names = sorted(accumulated_grad_names)
+            print(accumulated_grad_names)
+            first_optimize_op_index = get_first_check_finite_and_unscale_op_idx(
                 main_block)
             insert_reduce_ops(
                 main_block,
                 first_optimize_op_index,
                 self.sharding_ring_id,
-                accumulated_gradient_names,
+                accumulated_grad_names,
                 self._shard,
                 OpRole.Optimize,
                 use_calc_stream=True)
@@ -466,15 +470,26 @@ class ShardingOptimizer(MetaOptimizerBase):
             self._main_program.global_block())
 
     def _wait(self, ):
-        # only the first parallelsm group that init nccl need to be wait.
-        if self._as_outer_parallelism:
-            endpoints = self.global_group_endpoints[:]
-        else:
-            endpoints = self.sharding_group_endpoints[:]
+        endpoints = self.role_maker._get_trainer_endpoints()
         current_endpoint = endpoints[self.role_maker._worker_index()]
-        if self.sharding_rank == 0:
+        if self.role_maker._worker_index() == 0:
             self._collective_helper._wait(current_endpoint, endpoints)
 
+    # def _wait(self, ):
+    #     # only the first parallelism group that inits nccl needs to wait.
+    #     if self._as_outer_parallelism:
+    #         endpoints = self.role_maker._get_trainer_endpoints()
+    #     else:
+    #         endpoints = self.sharding_group_endpoints[:]
+    #     current_endpoint = endpoints[self.role_maker._worker_index()]
+
+    #     if self._as_outer_parallelism:
+    #         if self.role_maker._worker_index() == 0:
+    #             self._collective_helper._wait(current_endpoint, endpoints)
+    #     else:
+    #         if self.sharding_rank == 0:
+    #             self._collective_helper._wait(current_endpoint, endpoints)
+
     def _split_program(self, block):
         for op_idx, op in reversed(list(enumerate(block.ops))):
             if int(op.attr('op_role')) != int(OpRole.Optimize):
@@ -804,10 +819,10 @@ class ShardingOptimizer(MetaOptimizerBase):
 
     def _init_comm(self):
         # sharding alone mode
-        self.sharding_ring_id = 0
-        self.sharding_rank = self.global_rank
-        self.sharding_group_endpoints = self.endpoints[:]
-        self.sharding_group_size = len(self.endpoints)
+        # self.sharding_ring_id = 0
+        # self.sharding_rank = self.global_rank
+        # self.sharding_group_endpoints = self.endpoints[:]
+        # self.sharding_group_size = len(self.endpoints)
 
         if self.hybrid_dp:
             assert self._as_outer_parallelism == False, "hybrid dp is conflict when using sharding as outer parallelism"
@@ -828,8 +843,7 @@ class ShardingOptimizer(MetaOptimizerBase):
                 ep for idx, ep in enumerate(self.endpoints)
                 if (idx % self.sharding_group_size) == self.sharding_rank
             ]
-            self.global_group_endpoints = self.role_maker._get_trainer_endpoints(
-            )[:]
+            # self.global_group_endpoints = self.role_maker._get_trainer_endpoints()[:]
 
             assert self.global_word_size > self.sharding_group_size, \
                 "global_word_size: {} should be larger than sharding_group_size: {}".format(self.global_word_size, self.sharding_group_size)
diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index dc677849dce..4507635f8c4 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -4843,7 +4843,7 @@ class PipelineOptimizer(object):
         Accumulate the gradients generated in microbatch to the one in mini-batch.
         """
         # the name of real grad vars that should be allreduce
-        accumulated_gradient_names = []
+        # accumulated_gradient_names = []
         first_optimize_op_index = None
         accumulated_grad_names = []
 
@@ -4875,15 +4875,16 @@ class PipelineOptimizer(object):
                 for i in range(0, len(op_role_var), 2):
                     offset = 0
                     param_name = op_role_var[i]
-                    if not block.has_var(param_name): continue
+                    # if not block.has_var(param_name): continue
                     if '@BroadCast' in param_name:
                         param_name = param_name[0:param_name.find('@BroadCast')]
                         # clear gradient
                         param_grad_name = self._append_grad_suffix(param_name)
                         accumulated_grad_names.append(param_grad_name)
                         if not block.has_var(param_grad_name):
-                            self._create_var(block, block.vars[param_name],
-                                             param_grad_name)
+                            self._create_var(
+                                block, self.origin_main_block.vars[param_name],
+                                param_grad_name)
                         assert block.has_var(param_grad_name)
                         param_grad_var = block.var(param_grad_name)
                         param_grad_var.persistable = True
@@ -4924,7 +4925,7 @@ class PipelineOptimizer(object):
                                 #self._op_role_var_key: op_role_var
                             })
                         #offset += 1
-                        accumulated_gradient_names.append(real_grad_var.name)
+                        # accumulated_gradient_names.append(param_grad_var.name)
                     else:
                         grad_name = op_role_var[i + 1]  # with _0 suffix
                         grad_var = block.vars[grad_name]
@@ -4961,7 +4962,7 @@ class PipelineOptimizer(object):
                                 # self._op_role_var_key: op_role_var
                             })
                         offset += 1
-                        accumulated_gradient_names.append(fp32_grad_var.name)
+                        # accumulated_gradient_names.append(param_grad_var.name)
                         #real_grad_name = grad_name[0:grad_name.find(
                         #    '@GRAD')] + '@GRAD'
                         #real_grad_var = block.vars[
@@ -5150,6 +5151,7 @@ class PipelineOptimizer(object):
                  parameter_list=None,
                  no_grad_set=None):
         main_block = loss.block
+        self.origin_main_block = main_block
         if startup_program is None:
             startup_program = default_startup_program()
         optimize_ops, params_grads = self._optimizer.minimize(
--
GitLab