Commit 479efeeb authored by root, committed by sandyhouse

update

Parent 9ed5ae61
@@ -206,13 +206,17 @@ class ShardingOptimizer(MetaOptimizerBase):
             # if self._shard.has_param(param_name):
             #     param_list.append(param_name)
             #pp_optimizer._clear_gradients(main_block, param_list)
-            accumulated_gradient_names, first_optimize_op_index = pp_optimizer._accumulate_gradients(
+            accumulated_grad_names = pp_optimizer._accumulate_gradients(
                 main_block)
+            accumulated_grad_names = sorted(accumulated_grad_names)
+            print(accumulated_grad_names)
+            first_optimize_op_index = get_first_check_finite_and_unscale_op_idx(
+                main_block)
             insert_reduce_ops(
                 main_block,
                 first_optimize_op_index,
                 self.sharding_ring_id,
-                accumulated_gradient_names,
+                accumulated_grad_names,
                 self._shard,
                 OpRole.Optimize,
                 use_calc_stream=True)
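
The new code inserts the gradient reduce ops right before the first check_finite_and_unscale op of the optimize stage. As a rough, hedged sketch of what a lookup helper like get_first_check_finite_and_unscale_op_idx might do (the real implementation lives in the sharding utilities and may differ):

```python
def get_first_check_finite_and_unscale_op_idx(block):
    # Walk the block in program order and return the index of the first
    # check_finite_and_unscale op, i.e. the earliest point at which AMP
    # inspects the accumulated gradients; the reduce ops must come before it.
    for idx, op in enumerate(block.ops):
        if op.type == "check_finite_and_unscale":
            return idx
    raise ValueError("no check_finite_and_unscale op found in the block")
```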
@@ -466,15 +470,26 @@ class ShardingOptimizer(MetaOptimizerBase):
             self._main_program.global_block())

     def _wait(self, ):
-        # only the first parallelsm group that init nccl need to be wait.
-        if self._as_outer_parallelism:
-            endpoints = self.global_group_endpoints[:]
-        else:
-            endpoints = self.sharding_group_endpoints[:]
+        endpoints = self.role_maker._get_trainer_endpoints()
         current_endpoint = endpoints[self.role_maker._worker_index()]
-        if self.sharding_rank == 0:
+        if self.role_maker._worker_index() == 0:
             self._collective_helper._wait(current_endpoint, endpoints)

+    # def _wait(self, ):
+    #     # only the first parallelsm group that init nccl need to be wait.
+    #     if self._as_outer_parallelism:
+    #         endpoints = self.role_maker._get_trainer_endpoints()
+    #     else:
+    #         endpoints = self.sharding_group_endpoints[:]
+    #     current_endpoint = endpoints[self.role_maker._worker_index()]
+    #     if self._as_outer_parallelism:
+    #         if self.role_maker._worker_index() == 0:
+    #             self._collective_helper._wait(current_endpoint, endpoints)
+    #     else:
+    #         if self.sharding_rank == 0:
+    #             self._collective_helper._wait(current_endpoint, endpoints)
+
     def _split_program(self, block):
         for op_idx, op in reversed(list(enumerate(block.ops))):
             if int(op.attr('op_role')) != int(OpRole.Optimize):
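
The effect of this change is that _wait no longer depends on which parallelism group sharding belongs to: every worker takes the full trainer endpoint list, and only the globally first worker blocks. A hedged, framework-free sketch of the old versus new decision (function and parameter names here are illustrative, not framework code):

```python
def who_waits_old(as_outer_parallelism, sharding_rank,
                  global_group_endpoints, sharding_group_endpoints):
    # old behaviour: the endpoint list depended on whether sharding was used as
    # the outer parallelism, and the rank inside the sharding group decided who blocked
    endpoints = global_group_endpoints if as_outer_parallelism else sharding_group_endpoints
    return sharding_rank == 0, endpoints

def who_waits_new(worker_index, trainer_endpoints):
    # new behaviour: always use the full trainer endpoint list and let only the
    # globally first worker block in collective_helper._wait
    return worker_index == 0, trainer_endpoints
```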
@@ -804,10 +819,10 @@ class ShardingOptimizer(MetaOptimizerBase):
     def _init_comm(self):
         # sharding alone mode
-        self.sharding_ring_id = 0
-        self.sharding_rank = self.global_rank
-        self.sharding_group_endpoints = self.endpoints[:]
-        self.sharding_group_size = len(self.endpoints)
+        # self.sharding_ring_id = 0
+        # self.sharding_rank = self.global_rank
+        # self.sharding_group_endpoints = self.endpoints[:]
+        # self.sharding_group_size = len(self.endpoints)

         if self.hybrid_dp:
             assert self._as_outer_parallelism == False, "hybrid dp is conflict when using sharding as outer parallelism"
@@ -828,8 +843,7 @@ class ShardingOptimizer(MetaOptimizerBase):
                 ep for idx, ep in enumerate(self.endpoints)
                 if (idx % self.sharding_group_size) == self.sharding_rank
             ]
-            self.global_group_endpoints = self.role_maker._get_trainer_endpoints(
-            )[:]
+            # self.global_group_endpoints = self.role_maker._get_trainer_endpoints()[:]
             assert self.global_word_size > self.sharding_group_size, \
                 "global_word_size: {} should be larger than sharding_group_size: {}".format(self.global_word_size, self.sharding_group_size)
......
@@ -4843,7 +4843,7 @@ class PipelineOptimizer(object):
         Accumulate the gradients generated in microbatch to the one in mini-batch.
         """
         # the name of real grad vars that should be allreduce
-        accumulated_gradient_names = []
+        # accumulated_gradient_names = []
         first_optimize_op_index = None
         accumulated_grad_names = []
@@ -4875,15 +4875,16 @@ class PipelineOptimizer(object):
                 for i in range(0, len(op_role_var), 2):
                     offset = 0
                     param_name = op_role_var[i]
-                    if not block.has_var(param_name): continue
+                    # if not block.has_var(param_name): continue
                     if '@BroadCast' in param_name:
                         param_name = param_name[0:param_name.find('@BroadCast')]
                     # clear gradient
                     param_grad_name = self._append_grad_suffix(param_name)
                     accumulated_grad_names.append(param_grad_name)
                     if not block.has_var(param_grad_name):
-                        self._create_var(block, block.vars[param_name],
-                                         param_grad_name)
+                        self._create_var(
+                            block, self.origin_main_block.vars[param_name],
+                            param_grad_name)
                     assert block.has_var(param_grad_name)
                     param_grad_var = block.var(param_grad_name)
                     param_grad_var.persistable = True
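
The branch above guarantees that a persistable mini-batch gradient variable named `<param>@GRAD` exists before the per-micro-batch gradients are summed into it. A plain-Python sketch of that accumulation idea (not Paddle ops; names here are illustrative):

```python
GRAD_SUFFIX = "@GRAD"  # assumed mini-batch accumulator suffix, mirroring _append_grad_suffix

def accumulate_microbatch_grads(acc_buffers, micro_batch_grads):
    # acc_buffers: dict mapping "<param>@GRAD" -> running sum over micro-batches
    # micro_batch_grads: dict mapping "<param>" -> gradient values from one micro-batch
    for param_name, grad in micro_batch_grads.items():
        key = param_name + GRAD_SUFFIX
        if key not in acc_buffers:            # mirrors the block.has_var check above
            acc_buffers[key] = [0.0] * len(grad)
        acc_buffers[key] = [a + g for a, g in zip(acc_buffers[key], grad)]
    return acc_buffers
```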
@@ -4924,7 +4925,7 @@ class PipelineOptimizer(object):
                             #self._op_role_var_key: op_role_var
                         })
                         #offset += 1
-                        accumulated_gradient_names.append(real_grad_var.name)
+                        # accumulated_gradient_names.append(param_grad_var.name)
                     else:
                         grad_name = op_role_var[i + 1] # with _0 suffix
                         grad_var = block.vars[grad_name]
@@ -4961,7 +4962,7 @@ class PipelineOptimizer(object):
                             # self._op_role_var_key: op_role_var
                         })
                         offset += 1
-                        accumulated_gradient_names.append(fp32_grad_var.name)
+                        # accumulated_gradient_names.append(param_grad_var.name)
                         #real_grad_name = grad_name[0:grad_name.find(
                         #    '@GRAD')] + '@GRAD'
                         #real_grad_var = block.vars[
@@ -5150,6 +5151,7 @@ class PipelineOptimizer(object):
                  parameter_list=None,
                  no_grad_set=None):
         main_block = loss.block
+        self.origin_main_block = main_block
         if startup_program is None:
             startup_program = default_startup_program()
         optimize_ops, params_grads = self._optimizer.minimize(
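
Recording the block that built the loss lets _accumulate_gradients later fall back to the original parameter definition when a block is missing a `<param>@GRAD` var (the self.origin_main_block.vars lookup in the earlier hunk). A hedged sketch of that lookup pattern (helper and parameter names are illustrative):

```python
def ensure_minibatch_grad_var(section_block, origin_main_block, param_name, create_var):
    # Create the persistable accumulator in the current block, using the parameter
    # variable from the original main block as the shape/dtype template if absent here.
    grad_name = param_name + "@GRAD"
    if not section_block.has_var(grad_name):
        create_var(section_block, origin_main_block.vars[param_name], grad_name)
    grad_var = section_block.var(grad_name)
    grad_var.persistable = True
    return grad_var
```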
......