diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py
index ec46425dd830180b41256c595919a27446e5a0f3..3600deb0f184604f72d27992037caa8d7af3ac80 100644
--- a/python/paddle/fluid/optimizer.py
+++ b/python/paddle/fluid/optimizer.py
@@ -3874,6 +3874,8 @@ class PipelineOptimizer(object):
         """
         prev_op = []
         for op in ops:
+            if op.type == 'c_send' or op.type == 'c_recv':
+                continue
             if op == cur_op:
                 break
             for out_var_name in op.output_arg_names:
@@ -4089,7 +4091,7 @@ class PipelineOptimizer(object):
         assert sorted_device_specs == device_specs
         return device_specs
 
-    def _insert_sendrecv_ops_for_boundaries(self, block, origin_block):
+    def _insert_sendrecv_ops_for_boundaries(self, block):
         """
         Insert a pair of send and recv ops for every two consecutive ops on
         different devices.
@@ -4100,7 +4102,7 @@ class PipelineOptimizer(object):
         # avoiding multiple send and recv ops.
         var_devspec = dict()
 
-        for index, op in list(enumerate(origin_block.ops)):
+        for index, op in enumerate(list(block.ops)):
             # skips lr-related ops and vars, as we will process them later.
             if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
                 continue
@@ -4111,12 +4113,11 @@ class PipelineOptimizer(object):
             for var_name in op.input_arg_names:
                 # i.e., lod_tensor_blocking_queue created by DataLoader,
                 # which only exists in startup program.
-                if not var_name in origin_block.vars: continue
+                if not var_name in block.vars: continue
                 var = block.var(var_name)
                 # skip data, because we will process it later
                 if var.is_data: continue
-                prev_op = self._find_real_prev_op(origin_block.ops, op,
-                                                  var_name)
+                prev_op = self._find_real_prev_op(block.ops, op, var_name)
                 if prev_op is None:
                     continue
                 prev_device_spec = prev_op.attr(self._op_device_key)
@@ -4160,7 +4161,6 @@ class PipelineOptimizer(object):
         """
         for param_name in self._param_device_map:
             grad_name = self._append_grad_suffix(param_name)
-            param_var = main_block.vars[param_name]
             grad_var = main_block.vars[grad_name]
             device = self._param_device_map[param_name]
             main_block._insert_op(
@@ -4173,6 +4173,7 @@ class PipelineOptimizer(object):
                     'dtype': grad_var.dtype,
                     'value': float(0),
                     self._op_device_key: device,
+                    # a trick to run this op once per mini-batch
                     self._op_role_key: self._op_role.Optimize.LRSched,
                 })
 
@@ -4182,7 +4183,7 @@ class PipelineOptimizer(object):
         We also scale the loss corresponding to number of micro-batches at
         the same time.
         """
-        for index, op in reversed(list(enumerate(block.ops))):
+        for index, op in reversed(enumerate(list(block.ops))):
             offset = index
             device = op.attr(self._op_device_key)
 
@@ -4355,18 +4356,11 @@ class PipelineOptimizer(object):
         self._add_default_opdevice_attr(main_block)
 
         device_specs = self._check_validation(main_block)
+        if len(device_specs) == 1:
+            print("Warn: Run on one device, pipeline is disabled.")
 
         # Step3: add send and recv ops between section boundaries
-        origin_prog = main_block.program.clone(for_test=False)
-        origin_main_block = origin_prog.global_block()
-        self._insert_sendrecv_ops_for_boundaries(main_block, origin_main_block)
-
-        # Step4: clear gradients before each mini-batch and
-        # accumulate gradients during backward
-        self._clear_gradients(main_block)
-        self._accumulate_gradients(main_block)
-
-        main_program = main_block.program
+        self._insert_sendrecv_ops_for_boundaries(main_block)
 
         place_list = []
         place_id_list = []
@@ -4381,22 +4375,21 @@ class PipelineOptimizer(object):
             else:
                 raise ValueError("Unknown device type: %s", dev_spec)
 
-        # Step5: split program into sections and add pairs of
+        # Step4: split program into sections and add pairs of
         # send and recv ops for data var.
-        if len(place_list) <= 1:
-            raise ValueError("Run on one device, do not use pipeline.")
+        main_program = main_block.program
         program_list = self._split_program(main_program, device_specs)
         for p in program_list:
             self._create_vars(p["program"].block(0), main_program)
         self._insert_sendrecv_for_data_var(main_block, program_list,
                                            startup_program, device_specs)
 
-        # Step6: Special Case: process persistable vars that exist in
+        # Step5: Special Case: process persistable vars that exist in
         # multiple sections
         self._process_persistable_vars_in_multi_sections(
             main_program, startup_program, program_list)
 
-        # Step7: Add sub blocks for section programs
+        # Step6: Add sub blocks for section programs
         self._add_sub_blocks(main_block, program_list)
 
         assert (main_program._pipeline_opt and
@@ -4404,7 +4397,8 @@ class PipelineOptimizer(object):
                 'local_rank' in main_program._pipeline_opt), \
             "You must use pipeline with fleet"
         local_rank = main_program._pipeline_opt['local_rank']
-        # Step8: Split startup program
+
+        # Step7: Split startup program
         startup_program = self._split_startup_program(
             startup_program, program_list[local_rank]['program'])
         with open("startup_prog_%d" % local_rank, 'w') as f:
@@ -4412,6 +4406,13 @@ class PipelineOptimizer(object):
             f.writelines(str(startup_program))
         with open("main_prog_%d" % local_rank, 'w') as f:
             f.writelines(str(program_list[local_rank]['program']))
+        # Step8: clear gradients before each mini-batch and
+        # accumulate gradients during backward
+        self._clear_gradients(program_list[local_rank]['program'].global_block(
+        ))
+        self._accumulate_gradients(program_list[local_rank]['program']
+                                   .global_block())
+
         main_program._pipeline_opt = {
             "trainer": "PipelineTrainer",
             "device_worker": "Section",
@@ -4421,6 +4422,7 @@ class PipelineOptimizer(object):
             "sync_steps": -1,
             "num_microbatches": self._num_microbatches,
             "start_cpu_core_id": self._start_cpu_core_id,
+            "startup_program": startup_program
         }
 
         return optimize_ops, params_grads, program_list