diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index d7087c5e1c730b3ebbb86c48e5f7721c5a8bf3da..cbed021ee82161bdbbde560da3bca26c7621fafc 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -130,31 +130,29 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, } } for (auto& var : global_block.AllVars()) { - bool is_grad = false; bool is_param_grad = false; size_t pos = 0; if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) { - is_grad = true; auto prefix_name = var->Name().substr(0, pos); if (param_map.find(prefix_name) != param_map.end()) { is_param_grad = true; } } VLOG(3) << "Var name: " << var->Name(); - if ((var->Persistable() || is_grad) && microbatch_id == 0) { + if ((var->Persistable() || is_param_grad) && microbatch_id == 0) { auto* ptr = root_scope_->FindVar(var->Name()); auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); VLOG(3) << "Create persistable var " << var->Name() << " for minibatch " << section_id << ", which pointer is " << new_ptr; InitializeVariable(new_ptr, var->GetType()); - if (!var->Persistable() && !is_param_grad) { + if (is_param_grad) { continue; } const LoDTensor& root_tensor = ptr->Get(); LoDTensor* minibatch_tensor = new_ptr->GetMutable(); TensorCopy(*static_cast(&root_tensor), place, static_cast(minibatch_tensor)); - } else if (!var->Persistable() && !is_grad) { + } else if (!var->Persistable() && !is_param_grad) { auto* ptr = microbatch_scopes_[section_id][microbatch_id]->Var(var->Name()); VLOG(3) << "Create variable " << var->Name() << " for section " diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index cb5e5c815052420cfb4afa0297fb26cff0c9c4ee..39252b0e05a6b8d1613b520bc7dc4b9e2b89d689 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -3703,7 +3703,7 @@ class PipelineOptimizer(object): self._op_role_key = op_maker.kOpRoleAttrName() self._op_role_var_key = op_maker.kOpRoleVarAttrName() self._op_device_key = op_maker.kOpDeviceAttrName() - self._param_device_map = dict() + self._param_device_map = None def _create_vars(self, block, main_program): # Create vars for block, copied from main_program's global block @@ -3742,9 +3742,10 @@ class PipelineOptimizer(object): return 'Param' in op.input_names and 'Grad' in op.input_names and ( "LearningRate" in op.input_names) - def _split_program(self, main_program): + def _split_program(self, main_program, devices): """ Split a program into sections according to devices that ops run on. + The ops of the role LRSched are copied to all sections. Args: main_program (Program): the main program @@ -3752,18 +3753,27 @@ class PipelineOptimizer(object): programs = [] # Map from device to its corresponding section program info device_program_map = dict() - block = main_program.block(0) + for device in devices: + p = {'program': Program()} + device_program_map[device] = p + block = main_program.block(0) for op in block.ops: device = op.attr(self._op_device_key) - - if device not in device_program_map: - program = {"program": Program()} - device_program_map[device] = program - program = device_program_map[device] - op_desc = op.desc - ap_op = program["program"].block(0).desc.append_op() - ap_op.copy_from(op_desc) + op_role = op.attr(self._op_role_key) + if int(op_role) & int(self._op_role.LRSched): + # Copy ops of the role LRSched to all sections. + for device in device_program_map.keys(): + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + ap_op._set_attr(self._op_device_key, device) + else: + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) for key in sorted(device_program_map.keys()): program = device_program_map[key] @@ -3948,18 +3958,6 @@ class PipelineOptimizer(object): """ return name + core.grad_var_suffix() - def _update_param_device_map(self, params_grads, block): - for param_grad in params_grads: - if not param_grad[0].trainable: continue - param_name = param_grad[0].name - ops = block.ops - for op in ops: - input_arg_names = op.input_arg_names - if param_name in input_arg_names: - self._param_device_map[param_name] = op.attr( - self._op_device_key) - break - def _add_opdevice_attr_for_regularization_clip(self, block): """ Add op_device attribute for regulization and clip ops. @@ -4043,6 +4041,8 @@ class PipelineOptimizer(object): "{} has not been set.".format(op.type)) if not dev_spec in device_specs: device_specs.append(dev_spec) + sorted_device_specs = sorted(device_specs) + assert sorted_device_specs == device_specs return device_specs def _insert_enq_deq_ops_for_boundaries(self, block, origin_block, @@ -4059,6 +4059,11 @@ class PipelineOptimizer(object): var_devspec = dict() for index, op in list(enumerate(origin_block.ops)): + # skips lr-related op and vars, as we will process them later. + if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched): + continue + if self._is_update_op(op): continue + cur_device_spec = op.attr(self._op_device_key) for var_name in op.input_arg_names: # i.e., lod_tensor_blocking_queue created by DataLoader, @@ -4114,51 +4119,25 @@ class PipelineOptimizer(object): }) extra_index += 1 - def _initialize_gradients(self, startup_block, main_block): + def _clear_gradients(self, main_block): """ - Initialize gradients before run. + Clear gradients at the begining of each run of a minibatch. """ for param_name in self._param_device_map: grad_name = self._append_grad_suffix(param_name) - param_var = startup_block.vars[param_name] - grad_var = self._create_var(startup_block, param_var, grad_name) - main_grad_var = self._create_var(main_block, param_var, grad_name) - grad_var.persistable = True - main_grad_var.persistable = True - startup_block.append_op( + param_var = main_block.vars[param_name] + grad_var = main_block.vars[grad_name] + device = self._param_device_map[param_name] + main_block._insert_op( + index=0, type='fill_constant', inputs={}, outputs={'Out':[grad_var]}, - attrs={ - 'shape': grad_var.shape, - 'dtype': grad_var.dtype, - 'value': float(0) - }) - - def _clear_gradients(self, block): - """ - Clear gradients after update. - """ - for index, op in reversed(list(enumerate(block.ops))): - device = op.attr(self._op_device_key) - if not self._is_update_op(op): continue - assert self._op_role_var_key in op.attr_names - op_role_var = op.all_attrs()[self._op_role_var_key] - assert len(op_role_var) == 2 - grad_name = op_role_var[1] - grad_var = block.var(grad_name) - - block.append_op( - type='fill_constant', - inputs={}, - outputs={'Out': [grad_var]}, attrs={ 'shape': grad_var.shape, 'dtype': grad_var.dtype, 'value': float(0), - 'force_cpu': False, self._op_device_key: device, - self._op_role_key: self._op_role.Optimize }) def _accumulate_gradients(self, block): @@ -4338,13 +4317,14 @@ class PipelineOptimizer(object): startup_program = default_startup_program() optimize_ops, params_grads = self._optimizer.minimize( loss, startup_program, parameter_list, no_grad_set) - self._update_param_device_map(params_grads, main_block) + self._param_device_map = self._optimizer._param_device_map # Step1: add default op_device attribute for regulization and clip ops self._add_opdevice_attr_for_regularization_clip(main_block) # Step2: add default op_device attribute for ops whose op_device - # attribute have not been set yet. + # attribute have not been set yet. Then check all ops have the + # op_device attribute. self._add_default_opdevice_attr(main_block) device_specs = self._check_validation(main_block) @@ -4356,9 +4336,9 @@ class PipelineOptimizer(object): # Step4: accumulate gradients during backward # and clear them after update - self._initialize_gradients(startup_program.global_block(), main_block) - self._accumulate_gradients(main_block) self._clear_gradients(main_block) + self._accumulate_gradients(main_block) + #self._clear_gradients(main_block) main_program = main_block.program @@ -4377,18 +4357,11 @@ class PipelineOptimizer(object): # Step5: split program into sections and add pairs of # enqueue and dequeue ops for data var. - if len(place_list) == 0: - program_list = [] - ptmp = { - "program": main_program, - "input_set": set(), - "output_set": set() - } - program_list.append(ptmp) - else: - program_list = self._split_program(main_program) - for p in program_list: - self._create_vars(p["program"].block(0), main_program) + if len(place_list) <= 1: + raise ValueError("Run on one device, do not use pipeline.") + program_list = self._split_program(main_program, device_specs) + for p in program_list: + self._create_vars(p["program"].block(0), main_program) self._insert_enq_deq_for_data_var(main_block, program_list, startup_program, device_specs)