Commit a2241734 authored by sandyhouse

update

Parent aac8303d
@@ -3874,6 +3874,8 @@ class PipelineOptimizer(object):
         """
         prev_op = []
         for op in ops:
+            if op.type == 'c_send' or op.type == 'c_recv':
+                continue
             if op == cur_op:
                 break
             for out_var_name in op.output_arg_names:
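The two added lines make the producer lookup skip communication ops: a `c_send`/`c_recv` pair only moves a variable between devices, so it must not be mistaken for the op that actually computes it. A minimal self-contained model of the lookup (plain-Python stand-ins, not the Paddle API; `find_real_prev_op` below is an illustrative simplification of the method being patched):

    from collections import namedtuple

    # Toy stand-in for a Paddle operator: just a type and its output names.
    Op = namedtuple('Op', ['type', 'output_arg_names'])

    def find_real_prev_op(ops, cur_op, var_name):
        # Return the last non-communication op before cur_op writing var_name.
        prev_op = None
        for op in ops:
            if op.type == 'c_send' or op.type == 'c_recv':
                continue  # communication ops move data; they are not producers
            if op is cur_op:
                break
            if var_name in op.output_arg_names:
                prev_op = op
        return prev_op

    ops = [Op('mul', ['x']), Op('c_recv', ['x']), Op('relu', ['y'])]
    assert find_real_prev_op(ops, ops[2], 'x').type == 'mul'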
@@ -4089,7 +4091,7 @@ class PipelineOptimizer(object):
         assert sorted_device_specs == device_specs
         return device_specs
 
-    def _insert_sendrecv_ops_for_boundaries(self, block, origin_block):
+    def _insert_sendrecv_ops_for_boundaries(self, block):
         """
         Insert a pair of send and recv ops for every two
         consecutive ops on different devices.
@@ -4100,7 +4102,7 @@ class PipelineOptimizer(object):
         # avoiding multiple send and recv ops.
         var_devspec = dict()
 
-        for index, op in list(enumerate(origin_block.ops)):
+        for index, op in enumerate(list(block.ops)):
             # skips lr-related ops and vars, as we will process them later.
             if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
                 continue
@@ -4111,12 +4113,11 @@ class PipelineOptimizer(object):
             for var_name in op.input_arg_names:
                 # i.e., lod_tensor_blocking_queue created by DataLoader,
                 # which only exists in startup program.
-                if not var_name in origin_block.vars: continue
+                if not var_name in block.vars: continue
                 var = block.var(var_name)
                 # skip data, because we will process it later
                 if var.is_data: continue
-                prev_op = self._find_real_prev_op(origin_block.ops, op,
-                                                  var_name)
+                prev_op = self._find_real_prev_op(block.ops, op, var_name)
                 if prev_op is None:
                     continue
                 prev_device_spec = prev_op.attr(self._op_device_key)
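With `origin_block` gone, the method now reads producers from the same block it mutates, relying on the `list(...)` snapshot above. The surrounding logic, sketched with plain data (hypothetical names, not Paddle's implementation): walk the ops, and whenever an input was produced on a different device, plan one send/recv pair, deduplicated per `(var, device)` by `var_devspec`:

    def plan_boundary_sendrecv(ops, producer_device):
        # ops: list of (op_name, device, input_var_names).
        # producer_device: var name -> device that computes it.
        # Returns (var, src_device, dst_device) triples; each triple stands
        # for one c_send on src plus one c_recv on dst.
        var_devspec = {}  # var -> devices it has already been sent to
        plan = []
        for _name, device, inputs in ops:
            for var in inputs:
                prev_device = producer_device.get(var)
                if prev_device is None or prev_device == device:
                    continue  # same device or no producer: nothing to insert
                if device in var_devspec.setdefault(var, set()):
                    continue  # already sent to this device; avoid duplicates
                var_devspec[var].add(device)
                plan.append((var, prev_device, device))
        return plan

    ops = [('matmul', 'gpu:0', ['x']),
           ('relu', 'gpu:1', ['h']),
           ('softmax', 'gpu:1', ['h'])]
    assert plan_boundary_sendrecv(ops, {'x': 'gpu:0', 'h': 'gpu:0'}) == \
        [('h', 'gpu:0', 'gpu:1')]  # h crosses the boundary exactly once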
@@ -4160,7 +4161,6 @@ class PipelineOptimizer(object):
         """
         for param_name in self._param_device_map:
             grad_name = self._append_grad_suffix(param_name)
-            param_var = main_block.vars[param_name]
             grad_var = main_block.vars[grad_name]
             device = self._param_device_map[param_name]
             main_block._insert_op(
@@ -4173,6 +4173,7 @@ class PipelineOptimizer(object):
                     'dtype': grad_var.dtype,
                     'value': float(0),
                     self._op_device_key: device,
+                    # a trick to run this op once per mini-batch
                     self._op_role_key: self._op_role.Optimize.LRSched,
                 })
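The new comment pins down why a zeroing `fill_constant` is tagged `Optimize.LRSched` rather than plain `Optimize`: in the pipeline schedule, LRSched-role ops execute once per mini-batch, while ordinary ops execute once per micro-batch, so the gradient buffer is cleared exactly once before accumulation starts. A toy scheduler showing the effect (illustrative only; Paddle's runtime does this inside the section worker):

    def run_mini_batch(ops, num_microbatches):
        # ops: list of (name, role). LRSched-role ops run once per mini-batch;
        # all other ops run once per micro-batch.
        trace = [name for name, role in ops if role == 'LRSched']
        for _ in range(num_microbatches):
            trace += [name for name, role in ops if role != 'LRSched']
        return trace

    trace = run_mini_batch([('zero_w@GRAD', 'LRSched'),
                            ('acc_w@GRAD', 'Backward')], num_microbatches=3)
    # The gradient is zeroed once, then accumulated three times.
    assert trace == ['zero_w@GRAD', 'acc_w@GRAD', 'acc_w@GRAD', 'acc_w@GRAD']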
@@ -4182,7 +4183,7 @@ class PipelineOptimizer(object):
         We also scale the loss corresponding to number of micro-batches at
         the same time.
         """
-        for index, op in reversed(list(enumerate(block.ops))):
+        for index, op in reversed(enumerate(list(block.ops))):
             offset = index
             device = op.attr(self._op_device_key)
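One caveat worth noting on the `+` side: `reversed()` only accepts a sequence, and `enumerate()` returns a plain iterator, so `reversed(enumerate(list(block.ops)))` raises a `TypeError` at runtime; the materialized form on the `-` side is the one that executes. A two-line check:

    ops = ['fwd', 'relu_grad', 'mul_grad', 'sgd']
    # reversed() needs a sequence; enumerate() alone is not one.
    for index, op in reversed(list(enumerate(ops))):
        print(index, op)  # visits 'sgd' first and 'fwd' last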
@@ -4355,18 +4356,11 @@ class PipelineOptimizer(object):
         self._add_default_opdevice_attr(main_block)
         device_specs = self._check_validation(main_block)
+        if len(device_specs) == 1:
+            print("Warn: Run on one device, pipeline is disabled.")
 
         # Step3: add send and recv ops between section boundaries
-        origin_prog = main_block.program.clone(for_test=False)
-        origin_main_block = origin_prog.global_block()
-        self._insert_sendrecv_ops_for_boundaries(main_block, origin_main_block)
-
-        # Step4: clear gradients before each mini-batch and
-        # accumulate gradients during backward
-        self._clear_gradients(main_block)
-        self._accumulate_gradients(main_block)
-
-        main_program = main_block.program
+        self._insert_sendrecv_ops_for_boundaries(main_block)
 
         place_list = []
         place_id_list = []
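This hunk is the heart of the commit: the cloned `origin_prog` bookkeeping is replaced by iterating a snapshot, `enumerate(list(block.ops))`, of the very block being rewritten. The snapshot keeps the walk stable while send/recv ops are inserted into the live op list, which is also why `_find_real_prev_op` now has to skip `c_send`/`c_recv` explicitly. The idiom in isolation (a plain list standing in for `block.ops`):

    ops = ['embedding', 'fc', 'softmax']
    for i, op in enumerate(list(ops)):  # snapshot: inserts below stay invisible
        if op == 'fc':
            ops.insert(i, 'c_recv')     # mutate the live list mid-walk
    assert ops == ['embedding', 'c_recv', 'fc', 'softmax']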
@@ -4381,22 +4375,21 @@ class PipelineOptimizer(object):
             else:
                 raise ValueError("Unknown device type: %s", dev_spec)
 
-        # Step5: split program into sections and add pairs of
+        # Step4: split program into sections and add pairs of
         # send and recv ops for data var.
-        if len(place_list) <= 1:
-            raise ValueError("Run on one device, do not use pipeline.")
+        main_program = main_block.program
         program_list = self._split_program(main_program, device_specs)
         for p in program_list:
             self._create_vars(p["program"].block(0), main_program)
         self._insert_sendrecv_for_data_var(main_block, program_list,
                                            startup_program, device_specs)
 
-        # Step6: Special Case: process persistable vars that exist in
+        # Step5: Special Case: process persistable vars that exist in
         # multiple sections
         self._process_persistable_vars_in_multi_sections(
             main_program, startup_program, program_list)
 
-        # Step7: Add sub blocks for section programs
+        # Step6: Add sub blocks for section programs
         self._add_sub_blocks(main_block, program_list)
 
         assert (main_program._pipeline_opt and
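`_split_program` partitions the single program into one section per entry of `device_specs`, keyed by each op's device attribute. A toy partitioner with the same shape of output as `program_list` above (a list of dicts; a simplification, not Paddle's implementation):

    def split_by_device(ops, device_specs):
        # ops: list of (op_name, device). Keeps per-device program order.
        sections = {dev: [] for dev in device_specs}
        for name, dev in ops:
            sections[dev].append(name)
        return [{'device': dev, 'program': sections[dev]}
                for dev in device_specs]

    program_list = split_by_device(
        [('read', 'gpu:0'), ('fc', 'gpu:0'), ('softmax', 'gpu:1')],
        device_specs=['gpu:0', 'gpu:1'])
    assert program_list[1] == {'device': 'gpu:1', 'program': ['softmax']}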
@@ -4404,7 +4397,8 @@ class PipelineOptimizer(object):
                 'local_rank' in main_program._pipeline_opt), \
             "You must use pipeline with fleet"
         local_rank = main_program._pipeline_opt['local_rank']
-        # Step8: Split startup program
+
+        # Step7: Split startup program
         startup_program = self._split_startup_program(
             startup_program, program_list[local_rank]['program'])
         with open("startup_prog_%d" % local_rank, 'w') as f:
@@ -4412,6 +4406,13 @@ class PipelineOptimizer(object):
         with open("main_prog_%d" % local_rank, 'w') as f:
             f.writelines(str(program_list[local_rank]['program']))
 
+        # Step8: clear gradients before each mini-batch and
+        # accumulate gradients during backward
+        self._clear_gradients(program_list[local_rank]['program'].global_block(
+        ))
+        self._accumulate_gradients(program_list[local_rank]['program']
+                                   .global_block())
+
         main_program._pipeline_opt = {
             "trainer": "PipelineTrainer",
             "device_worker": "Section",
@@ -4421,6 +4422,7 @@ class PipelineOptimizer(object):
             "sync_steps": -1,
             "num_microbatches": self._num_microbatches,
             "start_cpu_core_id": self._start_cpu_core_id,
+            "startup_program": startup_program
         }
         return optimize_ops, params_grads, program_list