Commit 8c895085 authored by sandyhouse

update, test=develop

Parent 920806db
@@ -123,7 +123,8 @@ def _insert_cast_op(block, op, idx, src_dtype, dest_dtype):
                outputs={"Out": out_var},
                attrs={
                    "in_dtype": in_var.dtype,
-                   "out_dtype": out_var.dtype
                    "out_dtype": out_var.dtype,
                    "op_device": op.attr("op_device")
                })
            num_cast_ops += 1
        _rename_arg(op, in_var.name, out_var.name)
@@ -171,8 +172,11 @@ def _insert_cast_post_op(block, op, idx, src_dtype, dest_dtype, target_name,
            type="cast",
            inputs={"X": target_var},
            outputs={"Out": cast_var},
-           attrs={"in_dtype": target_var.dtype,
-                  "out_dtype": cast_var.dtype})
            attrs={
                "in_dtype": target_var.dtype,
                "out_dtype": cast_var.dtype,
                "op_device": op.attr("op_device")
            })
        num_cast_ops += 1
        op_var_rename_map[block.idx][target_var.name] = cast_var.name
......
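Both hunks above thread the `op_device` attribute of the original op into the `cast` ops that AMP inserts, so the pass that later splits the program by device can keep each cast on the same pipeline stage as the op it serves. A minimal standalone sketch of that intent (plain dicts, not Paddle's `Block`/`Operator` API; all names here are illustrative):

```python
# Standalone sketch: an inserted cast op inherits op_device from the op it
# was created for, so a later device-based split places them together.
def make_cast_op(src_op, in_name, out_name, in_dtype, out_dtype):
    return {
        "type": "cast",
        "inputs": {"X": in_name},
        "outputs": {"Out": out_name},
        "attrs": {
            "in_dtype": in_dtype,
            "out_dtype": out_dtype,
            # inherit the device placement of the source op
            "op_device": src_op["attrs"].get("op_device", ""),
        },
    }

matmul = {"type": "matmul", "attrs": {"op_device": "gpu:1"}}
cast = make_cast_op(matmul, "x", "x.cast_fp16", "float32", "float16")
assert cast["attrs"]["op_device"] == "gpu:1"
```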
@@ -19,6 +19,7 @@ import six
import os
import logging
from collections import defaultdict
import time
import paddle
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
@@ -3759,15 +3760,21 @@ class PipelineOptimizer(object):
    def __init__(self, optimizer, num_microbatches=1, start_cpu_core_id=0):
        if framework.in_dygraph_mode():
            raise Exception("In dygraph, don't support PipelineOptimizer.")
-       if not isinstance(optimizer, Optimizer) and not isinstance(
-               optimizer, paddle.optimizer.Optimizer) and not isinstance(
-                   optimizer, paddle.fluid.contrib.mixed_precision.decorator.
-                   OptimizerWithMixedPrecision):
-           raise ValueError("The 'optimizer' parameter for "
-                            "PipelineOptimizer must be an instance of "
-                            "Optimizer, but the given type is {}.".format(
-                                type(optimizer)))
        supported_opt_types = (Optimizer, paddle.fluid.contrib.mixed_precision.
                               decorator.OptimizerWithMixedPrecision)
        if not isinstance(optimizer, supported_opt_types):
            raise ValueError("The 'optimizer' parameter for "
                             "PipelineOptimizer must be an instance of one of "
                             "{}, but the type is {}.".format(
                                 supported_opt_types, type(optimizer)))
        self._optimizer = optimizer
        # Get the original optimizer defined by users, such as SGD
        self._origin_optimizer = self._optimizer
        while hasattr(self._origin_optimizer, "inner_opt"):
            self._origin_optimizer = self._origin_optimizer.inner_opt
        assert num_microbatches >= 1, (
            "num_microbatches must be a positive value.")
        self._num_microbatches = num_microbatches
@@ -3783,50 +3790,141 @@ class PipelineOptimizer(object):
        self._param_device_map = None

    def _create_vars(self, block, ori_block):
-       # Create vars for block, copied from main_program's global block
        # Create vars for block, copied from ori_block
        used_var_set = set()
        for op_idx in range(block.desc.op_size()):
-           op_desc = block.desc.op(op_idx)
-           vars = op_desc.input_arg_names() + op_desc.output_arg_names()
            # Whether to insert allreduce_sum or allreduce_max op?
            # For amp and global gradient clip strategies, we should
            # get the global information, so an allreduce op is needed.
            should_insert = False
            op = block.ops[op_idx]
            # For an op that processes vars on all devices, remove its
            # input vars that are not in this block.
reserved_x = []
if op.type == 'reduce_any' and self._is_optimize_op(op):
should_insert = True
if op.type == 'concat' and self._is_optimize_op(op):
for input_name in op.desc.input("X"):
if block._find_var_recursive(input_name):
reserved_x.append(input_name)
op.desc.set_input('X', reserved_x)
print('reserved_x:', reserved_x)
if op.type == 'update_loss_scaling':
for input_name in op.desc.input("X"):
if block._find_var_recursive(input_name):
reserved_x.append(input_name)
op.desc.set_input('X', reserved_x)
op.desc.set_output('Out', reserved_x)
if op.type == 'sum' and self._is_gradient_clip_op(op):
for input_name in op.desc.input("X"):
if block._find_var_recursive(input_name):
reserved_x.append(input_name)
op.desc.set_input('X', reserved_x)
should_insert = True
vars = op.desc.input_arg_names() + op.desc.output_arg_names()
            for var in vars:
                # a var whose name contains "blocking_queue"
                # only exists in startup program
-               if var in used_var_set or "_blocking_queue" in var:
-                   continue
                if var in used_var_set or "_blocking_queue" in var: continue
                used_var_set.add(var)
                if block._find_var_recursive(str(var)): continue
                source_var = ori_block._var_recursive(str(var))
                if source_var.type == core.VarDesc.VarType.READER:
-                   block.create_var(
                    dest_var = block.create_var(
                        name=var,
                        type=core.VarDesc.VarType.READER,
                        persistable=source_var.persistable)
                else:
-                   block._clone_variable(source_var, False)
                    dest_var = block._clone_variable(source_var, False)
dest_var.stop_gradient = source_var.stop_gradient
if not should_insert: continue
out_name = op.desc.output_arg_names()[0]
out_var = block.var(out_name)
offset = 0
if op.type == "reduce_any":
# cast the bool var to int32 to use allreduce op
temp_var_name = unique_name.generate(out_name + "_cast_int32")
temp_var = block.create_var(
name=temp_var_name, shape=[1], dtype="int32")
block._insert_op(
op_idx + 1 + offset,
type='cast',
inputs={'X': out_var},
outputs={'Out': temp_var},
attrs={
'in_dtype': out_var.dtype,
'out_dtype': temp_var.dtype,
self._op_role_key:
core.op_proto_and_checker_maker.OpRole.Optimize
})
offset += 1
# block._insert_op(
# op_idx + 1 + offset,
# type='c_sync_calc_stream',
# inputs={'X': temp_var if op.type == "reduce_any" else out_var},
# outputs={
# 'Out': temp_var if op.type == "reduce_any" else out_var
# },
# attrs={
# OP_ROLE_KEY:
# core.op_proto_and_checker_maker.OpRole.Optimize,
# })
# offset += 1
block._insert_op(
op_idx + 1 + offset,
type='c_allreduce_max'
if op.type == "reduce_any" else 'c_allreduce_sum',
inputs={'X': temp_var if op.type == "reduce_any" else out_var},
outputs={
'Out': temp_var if op.type == "reduce_any" else out_var
},
attrs={
'ring_id': self.ring_id,
self._op_role_key:
core.op_proto_and_checker_maker.OpRole.Optimize,
'use_calc_stream': True
})
offset += 1
# block._insert_op(
# # op_idx + 1 + extra_index,
# op_idx + 1 + offset,
# type='c_sync_comm_stream',
# inputs={'X': temp_var if op.type == "reduce_any" else out_var},
# outputs={
# 'Out': temp_var if op.type == "reduce_any" else out_var
# },
# attrs={
# 'ring_id': self.ring_id,
# OP_ROLE_KEY:
# core.op_proto_and_checker_maker.OpRole.Optimize,
# })
# offset += 1
if op.type == "reduce_any":
block._insert_op(
op_idx + 1 + offset,
type='cast',
inputs={'X': temp_var},
outputs={'Out': out_var},
attrs={
'in_dtype': temp_var.dtype,
'out_dtype': out_var.dtype,
self._op_role_key:
core.op_proto_and_checker_maker.OpRole.Optimize
})
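The inserted collectives give AMP and global gradient clipping a global view: each pipeline stage only sees its local `reduce_any` (found-inf flag) or gradient-norm `sum`, so a `c_allreduce_max` or `c_allreduce_sum` on `ring_id` combines the per-stage results, and the bool flag is cast to int32 first because the collective kernels work on numeric dtypes. A toy model of the `reduce_any` case (no Paddle; assumes one local flag per stage):

```python
# Toy model: across pipeline stages, reduce_any needs a global answer, so
# each stage's local bool is cast to an integer and combined with an
# allreduce_max, which is exactly a logical OR.
def global_found_inf(local_flags):
    ints = [int(flag) for flag in local_flags]   # bool -> int32 cast
    return bool(max(ints))                       # c_allreduce_max over stages

assert global_found_inf([False, True, False]) is True
assert global_found_inf([False, False]) is False
```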
    def _is_loss_grad_op(self, op):
-       if self._op_role_key not in op.attr_names:
-           return False
-       op_role = int(op.all_attrs()[self._op_role_key])
        assert self._op_role_key in op.attr_names
        op_role = int(op.attr(self._op_role_key))
        return op_role & int(self._op_role.Backward) and op_role & int(
            self._op_role.Loss)
-   def _is_backward_op(self, op):
-       return self._op_role_key in op.attr_names and int(op.all_attrs()[
-           self._op_role_key]) & int(self._op_role.Backward)
-
-   def _is_optimize_op(self, op):
-       return self._op_role_key in op.attr_names and int(op.all_attrs()[
-           self._op_role_key]) & int(self._op_role.Optimize)
-
-   def _is_update_op(self, op):
-       return 'Param' in op.input_names and 'Grad' in op.input_names and (
-           "LearningRate" in op.input_names)
    def _split_program(self, main_program, devices):
        """
        Split a program into sections according to devices that ops run on.
-       The ops of the role LRSched are copied to all sections.
        The op whose op_device attr is "gpu:all" is copied to all sections.

        Args:
            main_program (Program): the main program
@@ -3842,27 +3940,20 @@ class PipelineOptimizer(object):
        block = main_program.block(0)
        for op in block.ops:
            device = op.attr(self._op_device_key)
-           op_role = op.attr(self._op_role_key)
-           if int(op_role) & int(self._op_role.LRSched):
-               # Copy ops of the role LRSched to all sections.
-               for device in device_program_map.keys():
-                   program = device_program_map[device]
-                   op_desc = op.desc
-                   ap_op = program["program"].block(0).desc.append_op()
-                   ap_op.copy_from(op_desc)
-                   # ap_op._set_attr(self._op_device_key, "")
-           elif op.type == "create_py_reader" or op.type == "read" or op.type == "create_double_buffer_reader":
-               # Copy read related ops to all section to make them exit after each epoch.
            # Copy ops whose op_device is set to "gpu:all" to all sections.
            if device == "gpu:all":
                for device in device_program_map.keys():
                    program = device_program_map[device]
                    op_desc = op.desc
                    ap_op = program["program"].block(0).desc.append_op()
                    ap_op.copy_from(op_desc)
                    ap_op._set_attr(self._op_device_key, "")
            else:
                program = device_program_map[device]
                op_desc = op.desc
                ap_op = program["program"].block(0).desc.append_op()
                ap_op.copy_from(op_desc)
                ap_op._set_attr(self._op_device_key, "")

        for key in devices:
            program = device_program_map[key]
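With the hunk above, the splitting rule collapses to a single marker check: any op whose `op_device` is `"gpu:all"` (LRSched ops, readers, and similar) is copied into every per-device program, everything else goes only into its own device's program, and the marker is cleared on the copies. A standalone sketch of that rule (tuples instead of real op descriptors):

```python
# Standalone sketch of the new splitting rule: "gpu:all" ops are copied into
# every per-device program, other ops only into their own device's program.
def split_ops(ops, devices):
    programs = {dev: [] for dev in devices}
    for name, device in ops:
        targets = devices if device == "gpu:all" else [device]
        for dev in targets:
            programs[dev].append(name)
    return programs

progs = split_ops(
    [("read", "gpu:all"), ("fc", "gpu:0"), ("softmax", "gpu:1")],
    ["gpu:0", "gpu:1"])
assert progs["gpu:0"] == ["read", "fc"]
assert progs["gpu:1"] == ["read", "softmax"]
```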
@@ -3921,6 +4012,11 @@ class PipelineOptimizer(object):
            var_name as output.
            var_name (string): Variable name.
        """
# To skip the cast op added by amp which has no op_device set
if '.cast_fp32' in var_name:
var_name = var_name.replace('.cast_fp32', '')
if '.cast_fp16' in var_name:
var_name = var_name.replace('.cast_fp16', '')
        post_op = []
        before = True
        for op in ops:
@@ -3982,9 +4078,10 @@ class PipelineOptimizer(object):
            dtype=ref_var.dtype,
            type=ref_var.type,
            lod_level=ref_var.lod_level,
-           persistable=False,
-           is_data=False,
            persistable=ref_var.persistable,
            is_data=ref_var.is_data,
            need_check_feed=ref_var.desc.need_check_feed())
        new_var.stop_gradient = ref_var.stop_gradient
        return new_var
    def _get_data_var_info(self, block):
@@ -4046,6 +4143,7 @@ class PipelineOptimizer(object):
                    self._op_role_key: self._op_role.Forward,
                    'use_calc_stream': True,
                    'peer': dev_index,
                    'ring_id': self.ring_id,
                })
            # Get the device that the data is on
            assert device in devices
@@ -4070,6 +4168,7 @@ class PipelineOptimizer(object):
                    self._op_role_key: self._op_role.Forward,
                    'peer': first_dev_index,
                    'use_calc_stream': True,
                    'ring_id': self.ring_id,
                })
    def _strip_grad_suffix(self, name):
@@ -4085,79 +4184,178 @@ class PipelineOptimizer(object):
        """
        return name + core.grad_var_suffix()
-   def _add_opdevice_attr_for_regularization_clip(self, block):
-       """
-       Add op_device attribute for regulization and clip ops.
-       """
-       for op in block.ops:
-           # role for regularization and clip ops is optimize
-           if int(op.attr(self._op_role_key)) != int(self._op_role.Optimize):
-               continue
-           if op.has_attr(self._op_device_key) and (
-                   op.attr(self._op_device_key) != ""):
-               continue
-           assert self._op_role_var_key in op.attr_names
-           op_role_var = op.all_attrs()[self._op_role_var_key]
-           assert len(op_role_var) == 2
-           param_name = op_role_var[0]
-           device = self._param_device_map[param_name]
-           op._set_attr(self._op_device_key, device)
-
-   def _add_default_opdevice_attr(self, block):
-       """
-       1. Add default op_device attribute for lr-related ops.
-          The default value is the one that of the first place.
-       2. Add default op_device attribute for sum ops added during
-          backward. For these ops, we set the op_device attribute
-          as the one of its post op, i.e, which op has the output of the
-          sum op as an input.
-       """
-       first_devcie = ""
-
-       # Get the device spec of the first place.
-       # device_spec: 'cpu' for cpu device and 'gpu:id' for gpu device,
-       # e.g. 'gpu:0', 'gpu:1', etc.
-       for op in block.ops:
-           if op.has_attr(self._op_device_key) and (
-                   op.attr(self._op_device_key) != ""):
-               first_device = op.attr(self._op_device_key)
-               break
-       assert first_device
-       first_device_type = first_device.split(":")[0]
-       assert first_device_type == "gpu"
-
-       # set op_device attr for lr-related ops
-       lrsched_role = int(self._op_role.LRSched)
-       for op in block.ops:
-           if not op.has_attr(self._op_device_key) or (
-                   op.attr(self._op_device_key) == ""):
-               if op.type == "sum":
-                   # For sum ops that compute the sum of @RENAMED@ vars
-                   for name in op.desc.input_arg_names():
-                       assert '@RENAME@' in name
-                   assert len(op.desc.output_arg_names()) == 1
-                   out_name = op.desc.output_arg_names()[0]
-                   post_op = self._find_post_op(block.ops, op, out_name)
-                   device = post_op.attr(self._op_device_key)
-                   assert device
-                   op._set_attr(self._op_device_key, device)
-                   continue

    def _is_forward_op(self, op):
        """
        Check whether the op_role attribute of the op is Forward.
        """
        assert self._op_role_key in op.attr_names
        return int(op.attr(self._op_role_key)) == int(self._op_role.Forward)

    def _is_backward_op(self, op):
        """
        Check whether the op_role attribute of the op is Backward.
        """
        assert self._op_role_key in op.attr_names
        return int(op.attr(self._op_role_key)) == int(self._op_role.Backward)

    def _is_loss_op(self, op):
        """
        Check whether the op_role attribute of the op is Loss.
        """
        assert self._op_role_key in op.attr_names
        return int(op.attr(self._op_role_key)) == int(self._op_role.Loss)

    def _is_optimize_op(self, op):
        """
        Check whether the op_role attribute of the op is Optimize.
        """
        assert self._op_role_key in op.attr_names
        return int(op.attr(self._op_role_key)) == int(self._op_role.Optimize)

    def _is_lrsched_op(self, op):
        """
        Check whether the op_role attribute of the op is LRSched.
        """
        assert self._op_role_key in op.attr_names
        return int(op.attr(self._op_role_key)) == int(self._op_role.LRSched)

    def _is_update_op(self, op):
        """
        Check whether the op updates the parameter using the gradient.
        """
        return 'Param' in op.input_names and 'Grad' in op.input_names and (
            "LearningRate" in op.input_names)
def _get_op_device_attr(self, op):
"""
        Get the op_device attribute of an op.
"""
device = op.attr(self._op_device_key) \
if op.has_attr(self._op_device_key) else None
if device:
assert device[0:3] == 'gpu', "Now, only gpu devices are " \
"supported in pipeline parallemism."
return device
-               assert op.attr(self._op_role_key) == lrsched_role, (
-                   "Op whose op_device attr has not been set for pipeline"
-                   " must be of the role LRSched.")
-               op._set_attr(self._op_device_key, first_device)

    def _add_op_device_attr_for_op(self, op, idx, block):
        """
        Add the op_device attribute for ops that do not have it set.
        We use "gpu:all" to represent that the op should be put on all
        sub-programs, such as lr-related ops. Note that "gpu:all" is
        only used by pipeline as an indicator.
        """
lrsched_role = int(self._op_role.LRSched)
if op.attr(self._op_role_key) == lrsched_role:
# For LRSched ops, we should put them on all sub-programs to
            # make sure each sub-program updates the lr correctly
op._set_attr(self._op_device_key, "gpu:all")
elif op.type == "sum" and self._is_backward_op(op):
# For sum ops that compute the sum of @RENAMED@ vars
for name in op.desc.input_arg_names():
assert '@RENAME@' in name, \
"The op must be sum used to accumulate renamed vars."
assert len(op.desc.output_arg_names()) == 1
out_name = op.desc.output_arg_names()[0]
post_op = self._find_post_op(block.ops, op, out_name)
assert post_op.has_attr(
'op_device'), "{} has no op_device attr for var {}".format(
post_op.type, out_name)
device = post_op.attr(self._op_device_key)
assert device, "The post op must have op_device set."
op._set_attr(self._op_device_key, device)
elif (op.type == "cast" or
op.type == "scale") and self._is_backward_op(op):
prev_op = self._find_real_prev_op(block.ops, op,
op.desc.input("X")[0])
op._set_attr('op_device', prev_op.attr('op_device'))
elif self._is_loss_op(op):
# For loss * loss_scaling op added by AMP
offset = 1
while (not block.ops[idx + offset].has_attr(self._op_device_key) or
not block.ops[idx + offset].attr(self._op_device_key)):
offset += 1
# assert block.ops[idx + 1].type == "fill_constant"
# assert block.ops[idx + 2].type == "elementwise_mul_grad"
# assert block.ops[idx + 3].type == "elementwise_add_grad"
# assert block.ops[idx + 4].type == "mean_grad"
# device = block.ops[idx + 4].attr(self._op_device_key)
device = block.ops[idx + offset].attr(self._op_device_key)
assert device, "Please put you program within device_guard scope."
# op._set_attr(self._op_device_key, device)
# block.ops[idx + 1]._set_attr(self._op_device_key, device)
# block.ops[idx + 2]._set_attr(self._op_device_key, device)
# block.ops[idx + 2]._set_attr(self._op_device_key, device)
for i in range(offset):
block.ops[idx + i]._set_attr(self._op_device_key, device)
elif self._is_optimize_op(op) and op.type == "check_finite_and_unscale":
#op._set_attr(self._op_device_key, "gpu:all")
op_role_var = op.attr(self._op_role_var_key)
param_name = op_role_var[0]
device = self._param_device_map[param_name]
op._set_attr(self._op_device_key, device)
elif self._is_optimize_op(op) and op.type == "cast":
# For fp16-->fp32 cast added by AMP
grad_name = op.output('Out')
assert len(grad_name) == 1
param_name = grad_name[0].strip(core.grad_var_suffix())
device = self._param_device_map[param_name]
op._set_attr(self._op_device_key, device)
elif self._is_gradient_clip_op(op) or self._is_regularization_op(op):
# For gradient clip and regularization ops, we set their op_device
            # attribute to the device where their corresponding parameters are.
assert self._op_role_var_key in op.attr_names, "gradient_clip " \
"and regularization ops must have op_role_var attribute."
op_role_var = op.attr(self._op_role_var_key)
assert len(op_role_var) == 2, "op_role_var for gradient_clip " \
"regularization ops must have two elements."
param_name = op_role_var[0]
device = self._param_device_map[param_name]
# For sum op added by global gradient clip, it must be
# put on all devices
if (op.type == 'sum' or op.type == 'sqrt' or
op.type == 'fill_constant' or
op.type == 'elementwise_max' or
op.type == 'elementwise_div'):
device = "gpu:all"
op._set_attr(self._op_device_key, device)
else:
other_known_ops = [
'update_loss_scaling', 'reduce_any', 'concat', 'sum'
]
assert op.type in other_known_ops, "For other ops without " \
"op_device set, they must be one of {}, but it " \
"is {}".format(other_known_ops, op.type)
assert self._is_optimize_op(op)
op._set_attr(self._op_device_key, "gpu:all")
def _add_op_device_attr(self, block):
"""
        Add the op_device attribute for the ops in block that do not
        have that attribute set.
"""
for idx, op in enumerate(list(block.ops)):
if (op.type == "create_py_reader" or op.type == "read" or
op.type == "create_double_buffer_reader"):
                # Copy read related ops to all sections to make them exit
                # after each epoch.
# We use "gpu:all" to represent the op should be put on all
# sub-programs, such as lr-related ops. Note that: "gpu:all"
# is only used by pipeline as an indicator.
op._set_attr(self._op_device_key, "gpu:all")
continue
# op_device attribute has been set
if self._get_op_device_attr(op): continue
self._add_op_device_attr_for_op(op, idx, block)
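`_add_op_device_attr_for_op` replaces the old `_add_opdevice_attr_for_regularization_clip` / `_add_default_opdevice_attr` pair with one per-op decision: LRSched ops get the `"gpu:all"` marker, backward `sum`/`cast`/`scale` ops inherit the device of a neighboring op, optimizer-side AMP and gradient-clip/regularization ops follow the device of their parameter, and the remaining known AMP bookkeeping ops run everywhere. A much-simplified decision table in the same spirit (standalone and illustrative only; it ignores several branches handled above):

```python
# Simplified decision table, not the Paddle implementation: choose a device
# for an op that has no op_device attribute yet.
def decide_device(op_role, op_type, param_device=None, neighbor_device=None):
    if op_role == "LRSched":
        return "gpu:all"              # copied to every pipeline stage
    if op_type == "sum" and op_role == "Backward":
        return neighbor_device        # follow the op that consumes its output
    if op_type in ("cast", "scale") and op_role == "Backward":
        return neighbor_device        # follow the op that produced its input
    if op_role == "Optimize" and param_device is not None:
        return param_device           # follow the parameter being processed
    return "gpu:all"                  # remaining AMP bookkeeping ops run everywhere

assert decide_device("LRSched", "fill_constant") == "gpu:all"
assert decide_device("Backward", "sum", neighbor_device="gpu:2") == "gpu:2"
assert decide_device("Optimize", "clip", param_device="gpu:1") == "gpu:1"
```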
    def _check_validation(self, block):
        """
-       Check whether ops in a block are all validate (i.e., the
-       op_device attribute has been set).
-       Then, return all device specifications in order.
        Check whether ops in a block have the op_device attribute set.
        Then, return all devices in order.
        """
-       device_specs = []
        device_list = []
        for op in block.ops:
-           type = op.type
-           if not op._has_kernel(type):
            if not op._has_kernel(op.type):
                assert op.type == "conditional_block" and (
                    op.attr(self._op_role_key) == int(self._op_role.LRSched)), (
                    "Now, the only supported op without kernel is "
@@ -4165,15 +4363,16 @@ class PipelineOptimizer(object):
            assert op.has_attr(self._op_device_key), (
                "op ({}) has no {} attribute.".format(op.type,
                                                      self._op_device_key))
-           dev_spec = op.attr(self._op_device_key)
-           assert dev_spec, ("op_device attribute for op "
-                             "{} has not been set.".format(op.type))
-           dev_type = dev_spec.split(':')[0]
            device = op.attr(self._op_device_key)
            assert device, ("op_device attribute for op "
                            "{} has not been set.".format(op.type))
            if device == "gpu:all": continue
            dev_type = device.split(':')[0]
            assert dev_type == "gpu", ("Now only gpu devices are supported "
                                       "for pipeline parallelism.")
-           if not dev_spec in device_specs:
-               device_specs.append(dev_spec)
-       return device_specs
            if not device in device_list:
                device_list.append(device)
        return device_list
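`_check_validation` now treats `"gpu:all"` purely as a placement marker: it is skipped when collecting devices, so the returned list contains only concrete `gpu:id` entries, in the order they first appear. A standalone sketch of that collection step:

```python
# Standalone sketch: collect the concrete devices in first-appearance order,
# treating "gpu:all" as a marker rather than a real device.
def collect_devices(op_devices):
    devices = []
    for dev in op_devices:
        assert dev, "every op must have op_device set"
        if dev == "gpu:all":
            continue
        if dev not in devices:
            devices.append(dev)
    return devices

assert collect_devices(["gpu:0", "gpu:all", "gpu:0", "gpu:1"]) == ["gpu:0", "gpu:1"]
```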
    def _insert_sendrecv_ops_for_boundaries(self, block):
        """
@@ -4182,49 +4381,44 @@ class PipelineOptimizer(object):
        """
        extra_index = 0

-       # A map from var to device spec where op takes it as input,
        # A map from var to device where op takes it as input,
        # avoiding multiple send and recv ops.
-       var_devspec = dict()
        var_dev_map = dict()

        for index, op in enumerate(list(block.ops)):
-           # skips lr-related ops and vars, as we will process them later.
-           if int(op.attr(self._op_role_key)) & int(self._op_role.LRSched):
-               continue
-           # skips update ops and vars, as we will process them later.
-           if self._is_update_op(op): continue
-           cur_device_spec = op.attr(self._op_device_key)
            cur_device = op.attr(self._op_device_key)
            if cur_device == "gpu:all": continue
            for var_name in op.input_arg_names:
                # i.e., lod_tensor_blocking_queue created by DataLoader,
                # which only exists in startup program.
-               if not var_name in block.vars: continue
                # if not var_name in block.vars: continue
                var = block.var(var_name)
                # skip data, because we will process it later
                if var.is_data: continue
                prev_op = self._find_real_prev_op(block.ops, op, var_name)
-               if prev_op is None:
-                   continue
-               prev_device_spec = prev_op.attr(self._op_device_key)
-               if prev_device_spec != cur_device_spec:
-                   if var_name not in var_devspec:
-                       var_devspec[var_name] = []
-                   if cur_device_spec in var_devspec[var_name]: continue
-                   var_devspec[var_name].append(cur_device_spec)
                prev_device = prev_op.attr(self._op_device_key) \
                    if prev_op else None
                if not prev_device or prev_device == 'gpu:all': continue
                if prev_device != cur_device:
                    if var_name not in var_dev_map: var_dev_map[var_name] = []
                    if cur_device in var_dev_map[var_name]: continue
                    var_dev_map[var_name].append(cur_device)

                    op_role = op.all_attrs()[self._op_role_key]
                    var = block.vars[var_name]
-                   prev_device_index = int(prev_device_spec.split(':')[1])
-                   cur_device_index = int(cur_device_spec.split(':')[1])
                    prev_device_index = int(prev_device.split(':')[1])
                    cur_device_index = int(cur_device.split(':')[1])
                    block._insert_op(
                        index=index + extra_index,
                        type='send_v2',
                        inputs={'X': var},
                        attrs={
-                           self._op_device_key: prev_device_spec,
                            self._op_device_key: prev_device,
                            self._op_role_key: op_role,
                            'use_calc_stream': True,
                            'peer': cur_device_index,
                            'ring_id': self.ring_id,
                        })
                    extra_index += 1
                    block._insert_op(
@@ -4234,23 +4428,28 @@ class PipelineOptimizer(object):
                        attrs={
                            'out_shape': var.shape,
                            'dtype': var.dtype,
-                           self._op_device_key: cur_device_spec,
                            self._op_device_key: cur_device,
                            self._op_role_key: op_role,
                            'use_calc_stream': True,
                            'peer': prev_device_index,
                            'ring_id': self.ring_id,
                        })
                    extra_index += 1
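For every variable that crosses a stage boundary, a `send_v2`/`recv_v2` pair is inserted on the communication ring chosen in `minimize` (`ring_id`), and `var_dev_map` ensures the variable is sent to a given consumer device at most once even if several ops on that device read it. A toy model of that de-duplication (no Paddle; devices are plain strings):

```python
# Toy model of the de-duplication done by var_dev_map: each variable is sent
# to a given consumer device at most once.
def plan_sendrecv(edges):
    # edges: (var_name, producer_device, consumer_device)
    var_dev_map, plan = {}, []
    for var, src, dst in edges:
        if src == dst or "gpu:all" in (src, dst):
            continue
        sent_to = var_dev_map.setdefault(var, [])
        if dst in sent_to:
            continue
        sent_to.append(dst)
        plan.append(("send_v2/recv_v2", var, src, dst))
    return plan

plan = plan_sendrecv([("h", "gpu:0", "gpu:1"), ("h", "gpu:0", "gpu:1")])
assert len(plan) == 1
```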
-   def _clear_gradients(self, main_block, dev_spec):
    def _clear_gradients(self, main_block, param_names):
        """
        Clear gradients at the beginning of each run of a minibatch.
        """
-       for param_name in self._param_device_map:
-           device = self._param_device_map[param_name]
-           if device != dev_spec: continue
        # for param_name in self._param_device_map:
        print("param_names:", param_names)
        for param_name in param_names:
            # device = self._param_device_map[param_name]
            # if device != dev_spec: continue
            grad_name = self._append_grad_suffix(param_name)
-           if not main_block.has_var(grad_name): continue
-           grad_var = main_block.vars[grad_name]
            # if not main_block.has_var(grad_name): continue
            assert main_block.has_var(grad_name)
            grad_var = main_block.var(grad_name)
            grad_var.persistable = True
            main_block._insert_op(
                index=0,
                type='fill_constant',
@@ -4260,21 +4459,20 @@ class PipelineOptimizer(object):
                    'shape': grad_var.shape,
                    'dtype': grad_var.dtype,
                    'value': float(0),
-                   self._op_device_key: device,
                    # self._op_device_key: device,
                    # a trick to run this op once per mini-batch
                    self._op_role_key: self._op_role.Optimize.LRSched,
                })

-   def _accumulate_gradients(self, block):
-       """
-       Accumulate the gradients generated in microbatch to the one in mini-batch.
-       We also scale the loss corresponding to number of micro-batches as well.
-       """
-       if self._num_microbatches == 1: return
    def _insert_loss_scale(self, block):
        """
        Scale the loss corresponding to the number of micro-batches.
        """
        for index, op in reversed(tuple(enumerate(list(block.ops)))):
            offset = index
-           device = op.attr(self._op_device_key)
-           # Backward pass
            #device = op.attr(self._op_device_key)
            if self._is_loss_grad_op(op):
                loss_grad_var = block.vars[op.output_arg_names[0]]
                scale_factor = self._num_microbatches
@@ -4285,36 +4483,151 @@ class PipelineOptimizer(object):
                    outputs={'Out': loss_grad_var},
                    attrs={
                        'scale': 1.0 / scale_factor,
-                       self._op_device_key: device,
                        #self._op_device_key: device,
                        self._op_role_key: self._op_role.Backward
                    })
                break

    def _rename_gradient_var_name(self, block):
        for index, op in enumerate(block.ops):
            if self._is_backward_op(op) and (
                    self._op_role_var_key in op.attr_names):
-               op_role_var = op.all_attrs()[self._op_role_var_key]
                op_role_var = op.attr(self._op_role_var_key)
                if len(op_role_var) == 0:
                    continue
-               assert len(op_role_var) % 2 == 0
-               offset = index
                for i in range(0, len(op_role_var), 2):
                    grad_name = op_role_var[i + 1]
                    grad_var = block.vars[grad_name]
                    new_grad_var_name = unique_name.generate(grad_name)
                    new_var = self._create_var(block, grad_var,
                                               new_grad_var_name)
-                   new_var.persistable = False
                    self._rename_arg(op, grad_name, new_grad_var_name)
def _accumulate_gradients(self, block):
"""
Accumulate the gradients generated in microbatch to the one in mini-batch.
"""
first_optimize_op_index = None
for index, op in reversed(tuple(enumerate(list(block.ops)))):
# device = op.attr(self._op_device_key)
if not self._is_optimize_op(op) and not first_optimize_op_index:
first_optimize_op_index = index + 1
if block.ops[
first_optimize_op_index].type == 'c_sync_comm_stream':
block.ops[first_optimize_op_index]._set_attr(
self._op_role_key, self._op_role.Backward)
first_optimize_op_index += 1
if self._is_backward_op(op) and (
self._op_role_var_key in op.attr_names):
op_role_var = op.attr(self._op_role_var_key)
if len(op_role_var) == 0:
continue
assert len(op_role_var) % 2 == 0
for i in range(0, len(op_role_var), 2):
offset = 0
param_name = op_role_var[i]
if not block.has_var(param_name): continue
# clear gradient
param_grad_name = self._append_grad_suffix(param_name)
# if not main_block.has_var(grad_name): continue
if not block.has_var(param_grad_name):
self._create_var(block, block.vars[param_name],
param_grad_name)
assert block.has_var(param_grad_name)
param_grad_var = block.var(param_grad_name)
param_grad_var.persistable = True
-                   block._insert_op(
-                       index=offset + 1,
-                       type='sum',
-                       inputs={'X': [grad_var, new_var]},
-                       outputs={'Out': grad_var},
-                       attrs={
-                           self._op_device_key: device,
-                           self._op_role_key: self._op_role.Backward,
-                           self._op_role_var_key: op_role_var
-                       })
-                   offset += 1
                    block._insert_op(
                        index=first_optimize_op_index + offset,
                        type='fill_constant',
                        inputs={},
                        outputs={'Out': [param_grad_var]},
                        attrs={
                            'shape': param_grad_var.shape,
                            'dtype': param_grad_var.dtype,
                            'value': float(0),
                            # self._op_device_key: device,
                            # a trick to run this op once per mini-batch
                            self._op_role_key: self._op_role.Optimize.LRSched,
                        })
                    offset += 1
grad_name = op_role_var[i + 1] # with _0 suffix
grad_var = block.vars[grad_name] # without _0 suffix
real_grad_name = grad_name[0:grad_name.find(
'@GRAD')] + '@GRAD'
real_grad_var = block.vars[
real_grad_name] # without _0 suffix
# new_grad_var_name = unique_name.generate(grad_name)
# new_var = self._create_var(block, grad_var,
# new_grad_var_name)
# new_var.persistable = False
# self._rename_arg(op, grad_name, new_grad_var_name)
if not 'cast_fp16' in grad_name:
block._insert_op(
index=first_optimize_op_index + offset,
type='sum',
inputs={'X': [grad_var, real_grad_var]},
outputs={'Out': real_grad_var},
attrs={
#self._op_device_key: device,
self._op_role_key: self._op_role.Backward,
#self._op_role_var_key: op_role_var
})
offset += 1
else:
grad_name = op_role_var[i + 1] # with _0 suffix
grad_var = block.vars[grad_name] # without _0 suffix
fp32_grad_var_name = param_name + core.grad_var_suffix()
fp32_grad_var = block.vars[fp32_grad_var_name]
fp32_grad_var.persistable = True
cast_grad_var_name = unique_name.generate(
fp32_grad_var_name)
cast_var = self._create_var(block, grad_var,
cast_grad_var_name)
cast_var.persistable = False
real_grad_name = grad_name[0:grad_name.find(
'@GRAD')] + '@GRAD'
real_grad_var = block.vars[
real_grad_name] # without _0 suffix
block._insert_op(
index=first_optimize_op_index + offset,
type='cast',
inputs={'X': fp32_grad_var},
outputs={'Out': cast_var},
attrs={
'in_dtype': fp32_grad_var.dtype,
'out_dtype': cast_var.dtype,
# self._op_device_key: device,
self._op_role_key: self._op_role.Backward,
# self._op_role_var_key: op_role_var
})
offset += 1
block._insert_op(
index=first_optimize_op_index + offset,
type='sum',
inputs={'X': [grad_var, cast_var]},
outputs={'Out': real_grad_var},
attrs={
# self._op_device_key: device,
self._op_role_key: self._op_role.Backward,
# self._op_role_var_key: op_role_var
})
offset += 1
block._insert_op(
index=first_optimize_op_index + offset,
type='cast',
inputs={'X': real_grad_var},
outputs={'Out': fp32_grad_var},
attrs={
'in_dtype': real_grad_var.dtype,
'out_dtype': fp32_grad_var.dtype,
# self._op_device_key: device,
self._op_role_key: self._op_role.Backward,
# self._op_role_var_key: op_role_var
})
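The rewritten `_accumulate_gradients` keeps one persistable mini-batch gradient per parameter: it is zero-filled once per mini-batch with `fill_constant` (using the LRSched role trick noted above), each micro-batch gradient is added into it with `sum`, and fp16 gradients take a cast-to-fp32, accumulate, cast-back detour. A numeric toy of the fp16 path (NumPy only, assuming four micro-batches each contributing a gradient of 0.25):

```python
# NumPy-only toy of the fp16 accumulation path: zero-fill once per mini-batch,
# cast each micro-batch gradient up to fp32, sum, and cast back to fp16.
import numpy as np

def accumulate_fp16(acc_fp32, micro_grad_fp16):
    acc_fp32 = acc_fp32 + micro_grad_fp16.astype(np.float32)   # sum op
    return acc_fp32, acc_fp32.astype(np.float16)               # cast back

acc = np.zeros(4, dtype=np.float32)      # fill_constant(0.0), once per mini-batch
for _ in range(4):                       # four micro-batches
    micro_grad = np.full(4, 0.25, dtype=np.float16)
    acc, acc_fp16 = accumulate_fp16(acc, micro_grad)
assert np.allclose(acc, 1.0)
```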
    def _add_sub_blocks(self, main_block, program_list):
        main_program = main_block.program
@@ -4372,7 +4685,7 @@ class PipelineOptimizer(object):
            block = prog.block(0)
            for op in block.ops:
                if op.type == "recv_v2" or op.type == "create_py_reader" or \
-                   op.type == "read":
                    op.type == "read" or op.type == "update_loss_scaling":
                    continue
                # We have processed lr related vars
                if op.attr(self._op_role_key) == int(
@@ -4412,6 +4725,7 @@ class PipelineOptimizer(object):
                    # microbatch
                    self._op_role_key: self._op_role.LRSched,
                    'peer': read_dev_index,
                    'ring_id': self.ring_id,
                })
            read_block._insert_op(
                index=0,
@@ -4425,9 +4739,18 @@ class PipelineOptimizer(object):
                    # A trick to make the role LRSched to avoid copy every
                    # microbatch
                    self._op_role_key: self._op_role.LRSched,
-                   'peer': write_dev_index
                    'peer': write_dev_index,
                    'ring_id': self.ring_id,
                })
def _is_gradient_clip_op(self, op):
return op.desc.has_attr("op_namescope") \
and op.desc.attr("op_namescope").startswith("/gradient_clip")
def _is_regularization_op(self, op):
return op.desc.has_attr("op_namescope") \
and op.desc.attr("op_namescope").startswith("/regularization")
    def minimize(self,
                 loss,
                 startup_program=None,
@@ -4438,17 +4761,29 @@ class PipelineOptimizer(object):
            startup_program = default_startup_program()
        optimize_ops, params_grads = self._optimizer.minimize(
            loss, startup_program, parameter_list, no_grad_set)
-       self._param_device_map = self._optimizer._param_device_map
        self._param_device_map = self._origin_optimizer._param_device_map

-       # Step1: add default op_device attribute for regulization and clip ops
-       self._add_opdevice_attr_for_regularization_clip(main_block)
-
-       # Step2: add default op_device attribute for ops whose op_device
-       # attribute have not been set yet. Then check all ops have the
-       # op_device attribute.
-       self._add_default_opdevice_attr(main_block)
-       device_specs = self._check_validation(main_block)
        assert main_block.program._pipeline_opt \
            and 'local_rank' in main_block.program._pipeline_opt, \
            'Please use pipeline with fleet.'
        local_rank = main_block.program._pipeline_opt['local_rank']

        self.use_sharding = False
        if 'use_sharding' in main_block.program._pipeline_opt:
            self.use_sharding = main_block.program._pipeline_opt['use_sharding']
        self.ring_id = 0
        if 'ring_id' in main_block.program._pipeline_opt:
            self.ring_id = main_block.program._pipeline_opt['ring_id']

        if main_block.program._pipeline_opt['global_rank'] == 0:
            with open("startup_raw", 'w') as f:
                f.writelines(str(startup_program))
            with open("main_raw", 'w') as f:
                f.writelines(str(main_block.program))

        # Step1: add default op_device attribute for ops.
        self._add_op_device_attr(main_block)
        device_list = self._check_validation(main_block)

        def device_cmp(device1, device2):
            dev1_id = int(device1.split(':')[1])
@@ -4460,66 +4795,62 @@ class PipelineOptimizer(object):
            else:
                return 0

-       sorted_device_spec = sorted(device_specs, key=cmp_to_key(device_cmp))
-       assert sorted_device_spec == device_specs, (
-           "With pipeline "
-           "parallelism, you must use gpu devices one after another "
-           "in the order of their ids.")
        sorted_device_list = sorted(device_list, key=cmp_to_key(device_cmp))
        assert sorted_device_list == device_list, (
            "With pipeline parallelism, you must use gpu devices one after "
            "another in the order of their ids.")

-       # Step3: add send and recv ops between section boundaries
        # Step2: add send and recv ops between section boundaries
        self._insert_sendrecv_ops_for_boundaries(main_block)

-       # Step4: split program into sections and add pairs of
        # Step3: split program into sections and add pairs of
        # send and recv ops for data var.
        main_program = main_block.program
-       program_list = self._split_program(main_program, device_specs)
        program_list = self._split_program(main_program, device_list)
        for p in program_list:
-           self._create_vars(p["program"].block(0),
-                             main_program.global_block())
            self._create_vars(p["program"].block(0), main_block)
        self._insert_sendrecv_for_data_var(main_block, program_list,
-                                          startup_program, device_specs)
                                           startup_program, device_list)

-       # Step5: Special Case: process persistable vars that exist in
        # Step4: Special Case: process persistable vars that exist in
        # multiple sections
        self._process_persistable_vars_in_multi_sections(
            main_program, startup_program, program_list)

-       # Step6: Add sub blocks for section programs
        # Step5: Add sub blocks for section programs
        self._add_sub_blocks(main_block, program_list)

-       assert (main_program._pipeline_opt and
-               isinstance(main_program._pipeline_opt, dict) and
-               'local_rank' in main_program._pipeline_opt), \
-           "You must use pipeline with fleet"
-       local_rank = main_program._pipeline_opt['local_rank'] % len(
-           device_specs)
        local_rank = main_program._pipeline_opt['local_rank'] % len(device_list)

        place_list = []
-       for dev_spec in device_specs:
-           dev_index = dev_spec.split(":")[1]
-           place_list.append(core.CUDAPlace(local_rank))
        for dev in device_list:
            dev_index = int(dev.split(":")[1])
            place_list.append(core.CUDAPlace(dev_index))

-       # Step7: Split startup program
        # Step6: Split startup program
        new_startup_program = self._split_startup_program(startup_program,
                                                          local_rank)

-       # Step8: clear gradients before each mini-batch and
-       # accumulate gradients during backward
-       self._clear_gradients(
-           program_list[local_rank]['program'].global_block(),
-           dev_spec=device_specs[local_rank])
-       self._accumulate_gradients(program_list[local_rank]['program']
-                                  .global_block())
-
        startup_program._pipeline_opt = {
            "startup_program": new_startup_program,
        }

        real_block = program_list[local_rank]['program'].global_block()
        self._insert_loss_scale(real_block)
        if not self.use_sharding:
            # Step7: clear gradients before each mini-batch and
            # accumulate gradients during backward
            param_list = []
            for param, grad in params_grads:
                if real_block.has_var(param): param_list.append(param)
            #self._clear_gradients(real_block, param_list)
            self._rename_gradient_var_name(real_block)
            self._accumulate_gradients(real_block)

        place_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        main_program._pipeline_opt = {
            "trainer": "PipelineTrainer",
            "device_worker": "Section",
            "inner_parallelism": len(device_list),
            "section_program": program_list[local_rank],
            "place": place_list[local_rank],
            "place_id": place_id,
@@ -5487,7 +5818,7 @@ class GradientMergeOptimizer(object):
    def _is_the_backward_op(self, op):
        op_maker = core.op_proto_and_checker_maker
        backward = core.op_proto_and_checker_maker.OpRole.Backward
        if op_maker.kOpRoleVarAttrName() in op.attr_names and \
                int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(backward):
            return True
......