Commit b1a23b82 authored by sandyhouse

initialize gradients at the beginning of each run instead of in the startup program, test=develop

Parent 0360e583
@@ -130,31 +130,29 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id,
}
}
for (auto& var : global_block.AllVars()) {
bool is_grad = false;
bool is_param_grad = false;
size_t pos = 0;
if ((pos = var->Name().find(kGradVarSuffix)) != std::string::npos) {
is_grad = true;
auto prefix_name = var->Name().substr(0, pos);
if (param_map.find(prefix_name) != param_map.end()) {
is_param_grad = true;
}
}
VLOG(3) << "Var name: " << var->Name();
if ((var->Persistable() || is_grad) && microbatch_id == 0) {
if ((var->Persistable() || is_param_grad) && microbatch_id == 0) {
auto* ptr = root_scope_->FindVar(var->Name());
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name());
VLOG(3) << "Create persistable var " << var->Name() << " for minibatch "
<< section_id << ", which pointer is " << new_ptr;
InitializeVariable(new_ptr, var->GetType());
if (!var->Persistable() && !is_param_grad) {
if (is_param_grad) {
continue;
}
const LoDTensor& root_tensor = ptr->Get<LoDTensor>();
LoDTensor* minibatch_tensor = new_ptr->GetMutable<LoDTensor>();
TensorCopy(*static_cast<const Tensor*>(&root_tensor), place,
static_cast<Tensor*>(minibatch_tensor));
} else if (!var->Persistable() && !is_grad) {
} else if (!var->Persistable() && !is_param_grad) {
auto* ptr =
microbatch_scopes_[section_id][microbatch_id]->Var(var->Name());
VLOG(3) << "Create variable " << var->Name() << " for section "
......
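For reference, the classification the C++ loop above performs can be sketched in a few lines of Python: a variable counts as a parameter gradient only when stripping the gradient suffix leaves a known parameter name. The suffix value and the parameter names below are illustrative assumptions, not taken from the trainer code.

```python
GRAD_SUFFIX = "@GRAD"  # assumed value of kGradVarSuffix / core.grad_var_suffix()

def is_param_grad(var_name, param_names):
    """Return True when var_name is the gradient of a known parameter."""
    pos = var_name.find(GRAD_SUFFIX)
    return pos != -1 and var_name[:pos] in param_names

params = {"fc_0.w_0", "fc_0.b_0"}                # hypothetical parameter names
print(is_param_grad("fc_0.w_0@GRAD", params))    # True: created per minibatch scope, not copied from the root scope
print(is_param_grad("tmp_1@GRAD", params))       # False: treated as a regular micro-batch variable
```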
@@ -3703,7 +3703,7 @@ class PipelineOptimizer(object):
self._op_role_key = op_maker.kOpRoleAttrName()
self._op_role_var_key = op_maker.kOpRoleVarAttrName()
self._op_device_key = op_maker.kOpDeviceAttrName()
self._param_device_map = dict()
self._param_device_map = None
def _create_vars(self, block, main_program):
# Create vars for block, copied from main_program's global block
@@ -3742,9 +3742,10 @@ class PipelineOptimizer(object):
return 'Param' in op.input_names and 'Grad' in op.input_names and (
"LearningRate" in op.input_names)
def _split_program(self, main_program):
def _split_program(self, main_program, devices):
"""
Split a program into sections according to devices that ops run on.
Ops with the LRSched role are copied to all sections.
Args:
main_program (Program): the main program
@@ -3752,18 +3753,27 @@ class PipelineOptimizer(object):
programs = []
# Map from device to its corresponding section program info
device_program_map = dict()
block = main_program.block(0)
for device in devices:
p = {'program': Program()}
device_program_map[device] = p
block = main_program.block(0)
for op in block.ops:
device = op.attr(self._op_device_key)
if device not in device_program_map:
program = {"program": Program()}
device_program_map[device] = program
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
op_role = op.attr(self._op_role_key)
if int(op_role) & int(self._op_role.LRSched):
# Copy ops with the LRSched role to all sections.
for device in device_program_map.keys():
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
ap_op._set_attr(self._op_device_key, device)
else:
program = device_program_map[device]
op_desc = op.desc
ap_op = program["program"].block(0).desc.append_op()
ap_op.copy_from(op_desc)
for key in sorted(device_program_map.keys()):
program = device_program_map[key]
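A condensed sketch of the splitting rule the rewritten _split_program implements: one sub-program is created per device up front, each op is appended to the sub-program of the device it was assigned to, and ops carrying the LRSched role are duplicated into every sub-program. Plain dicts stand in for fluid operators here, and the LRSched bit value is an assumption.

```python
LRSCHED = 0x0010  # assumed OpRole.LRSched bit value

def split_by_device(ops, devices):
    """Split a flat op list into one section per device."""
    device_programs = {dev: [] for dev in devices}      # one section per device, created up front
    for op in ops:
        if op["role"] & LRSCHED:
            # learning-rate scheduling ops are duplicated into every section
            for dev, prog in device_programs.items():
                prog.append(dict(op, device=dev))
        else:
            device_programs[op["device"]].append(op)
    return [device_programs[dev] for dev in sorted(device_programs)]

ops = [
    {"name": "lr_decay", "device": "gpu:0", "role": LRSCHED},
    {"name": "fc", "device": "gpu:0", "role": 0},
    {"name": "softmax", "device": "gpu:1", "role": 0},
]
sections = split_by_device(ops, ["gpu:0", "gpu:1"])     # lr_decay appears in both sections
```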
@@ -3948,18 +3958,6 @@ class PipelineOptimizer(object):
"""
return name + core.grad_var_suffix()
def _update_param_device_map(self, params_grads, block):
for param_grad in params_grads:
if not param_grad[0].trainable: continue
param_name = param_grad[0].name
ops = block.ops
for op in ops:
input_arg_names = op.input_arg_names
if param_name in input_arg_names:
self._param_device_map[param_name] = op.attr(
self._op_device_key)
break
def _add_opdevice_attr_for_regularization_clip(self, block):
"""
Add op_device attribute for regularization and clip ops.
@@ -4043,6 +4041,8 @@ class PipelineOptimizer(object):
"{} has not been set.".format(op.type))
if not dev_spec in device_specs:
device_specs.append(dev_spec)
sorted_device_specs = sorted(device_specs)
assert sorted_device_specs == device_specs
return device_specs
def _insert_enq_deq_ops_for_boundaries(self, block, origin_block,
@@ -4059,6 +4059,11 @@ class PipelineOptimizer(object):
var_devspec = dict()
for index, op in list(enumerate(origin_block.ops)):
# Skip lr-related ops and vars; we will process them later.
if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
continue
if self._is_update_op(op): continue
cur_device_spec = op.attr(self._op_device_key)
for var_name in op.input_arg_names:
# i.e., lod_tensor_blocking_queue created by DataLoader,
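The two new skips above keep LRSched and parameter-update ops out of the boundary analysis. A minimal sketch of that analysis, using the same simplified op representation as before (all names and role values are illustrative): a variable produced on one device and consumed on another marks the spot where an enqueue/dequeue pair would be inserted.

```python
LRSCHED = 0x0010  # assumed OpRole.LRSched bit value

def find_boundaries(ops, is_update_op):
    """Return (var, producer_device, consumer_device) triples that need a queue."""
    var_device = {}                      # var name -> device that produced it
    boundaries = []
    for op in ops:
        if op["role"] & LRSCHED or is_update_op(op):
            continue                     # mirrors the two new skips above
        for name in op["inputs"]:
            src = var_device.get(name)
            if src is not None and src != op["device"]:
                boundaries.append((name, src, op["device"]))
        for name in op["outputs"]:
            var_device[name] = op["device"]
    return boundaries
```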
@@ -4114,51 +4119,25 @@ class PipelineOptimizer(object):
})
extra_index += 1
def _initialize_gradients(self, startup_block, main_block):
def _clear_gradients(self, main_block):
"""
Initialize gradients before run.
Clear gradients at the beginning of each run of a minibatch.
"""
for param_name in self._param_device_map:
grad_name = self._append_grad_suffix(param_name)
param_var = startup_block.vars[param_name]
grad_var = self._create_var(startup_block, param_var, grad_name)
main_grad_var = self._create_var(main_block, param_var, grad_name)
grad_var.persistable = True
main_grad_var.persistable = True
startup_block.append_op(
param_var = main_block.vars[param_name]
grad_var = main_block.vars[grad_name]
device = self._param_device_map[param_name]
main_block._insert_op(
index=0,
type='fill_constant',
inputs={},
outputs={'Out':[grad_var]},
attrs={
'shape': grad_var.shape,
'dtype': grad_var.dtype,
'value': float(0)
})
def _clear_gradients(self, block):
"""
Clear gradients after update.
"""
for index, op in reversed(list(enumerate(block.ops))):
device = op.attr(self._op_device_key)
if not self._is_update_op(op): continue
assert self._op_role_var_key in op.attr_names
op_role_var = op.all_attrs()[self._op_role_var_key]
assert len(op_role_var) == 2
grad_name = op_role_var[1]
grad_var = block.var(grad_name)
block.append_op(
type='fill_constant',
inputs={},
outputs={'Out': [grad_var]},
attrs={
'shape': grad_var.shape,
'dtype': grad_var.dtype,
'value': float(0),
'force_cpu': False,
self._op_device_key: device,
self._op_role_key: self._op_role.Optimize
})
def _accumulate_gradients(self, block):
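The net effect of the new _clear_gradients on a single section, sketched in plain NumPy under simplified assumptions (no scopes, a single parameter): gradients are zeroed at the top of every minibatch run instead of once in the startup program, so accumulation across micro-batches always starts from zero.

```python
import numpy as np

def run_minibatch(param, micro_batches, grad_fn, lr=0.1):
    grad = np.zeros_like(param)          # the fill_constant ops inserted at index 0
    for mb in micro_batches:             # one forward/backward pass per micro-batch
        grad += grad_fn(param, mb)       # what _accumulate_gradients arranges
    return param - lr * grad             # the optimizer update (Optimize role)
```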
@@ -4338,13 +4317,14 @@ class PipelineOptimizer(object):
startup_program = default_startup_program()
optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
self._update_param_device_map(params_grads, main_block)
self._param_device_map = self._optimizer._param_device_map
# Step1: add default op_device attribute for regularization and clip ops
self._add_opdevice_attr_for_regularization_clip(main_block)
# Step2: add default op_device attribute for ops whose op_device
# attribute has not been set yet.
# attribute has not been set yet. Then check that all ops have the
# op_device attribute.
self._add_default_opdevice_attr(main_block)
device_specs = self._check_validation(main_block)
@@ -4356,9 +4336,9 @@ class PipelineOptimizer(object):
# Step4: clear gradients at the start of each minibatch run and
# accumulate them across micro-batches during backward
self._initialize_gradients(startup_program.global_block(), main_block)
self._accumulate_gradients(main_block)
self._clear_gradients(main_block)
self._accumulate_gradients(main_block)
#self._clear_gradients(main_block)
main_program = main_block.program
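A minimal usage sketch of how a program is typically assembled around this optimizer. fluid.device_guard is what sets the op_device attribute the passes above consult; the PipelineOptimizer constructor arguments differ across Paddle versions, so num_microbatches below is an assumption rather than the signature at this commit.

```python
import paddle.fluid as fluid

# Pin ops to devices; device_guard records the op_device attribute on each op.
with fluid.device_guard("gpu:0"):
    x = fluid.layers.data(name="x", shape=[784], dtype="float32")
    y = fluid.layers.data(name="y", shape=[1], dtype="int64")
    hidden = fluid.layers.fc(x, size=128, act="relu")
with fluid.device_guard("gpu:1"):
    logits = fluid.layers.fc(hidden, size=10, act="softmax")
    loss = fluid.layers.mean(fluid.layers.cross_entropy(logits, y))

opt = fluid.optimizer.SGD(learning_rate=0.01)
opt = fluid.optimizer.PipelineOptimizer(opt, num_microbatches=2)  # num_microbatches is an assumed kwarg
opt.minimize(loss)
```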
@@ -4377,18 +4357,11 @@ class PipelineOptimizer(object):
# Step5: split program into sections and add pairs of
# enqueue and dequeue ops for data var.
if len(place_list) == 0:
program_list = []
ptmp = {
"program": main_program,
"input_set": set(),
"output_set": set()
}
program_list.append(ptmp)
else:
program_list = self._split_program(main_program)
for p in program_list:
self._create_vars(p["program"].block(0), main_program)
if len(place_list) <= 1:
raise ValueError("Run on one device, do not use pipeline.")
program_list = self._split_program(main_program, device_specs)
for p in program_list:
self._create_vars(p["program"].block(0), main_program)
self._insert_enq_deq_for_data_var(main_block, program_list,
startup_program, device_specs)
......
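Conceptually, the enqueue/dequeue pairs inserted between sections behave like a producer/consumer queue: the section that produces a variable pushes it, the next section pops it. Plain Python queues stand in for the real communication ops here; the function names and values are illustrative only.

```python
from queue import Queue

boundary = Queue()                     # one queue per variable crossing a section boundary

def section_gpu0(x):
    h = x * 2                          # stand-in for the gpu:0 sub-program
    boundary.put(h)                    # the inserted "enqueue" side of the pair

def section_gpu1():
    h = boundary.get()                 # the inserted "dequeue" side of the pair
    return h + 1                       # stand-in for the gpu:1 sub-program
```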