提交 807faec9 编写于 作者: T typhoonzero

add reduce master grad for fp16

上级 bc95eef6
......@@ -46,7 +46,7 @@ def parse_args():
add_arg('class_dim', int, 1000, "Class number.")
add_arg('image_shape', str, "3,224,224", "input image size")
add_arg('model_save_dir', str, "output", "model save directory")
add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
add_arg('pretrained_model', str, None, "Whether to use pretrained model.")
add_arg('checkpoint', str, None, "Whether to resume checkpoint.")
add_arg('lr', float, 0.1, "set learning rate.")
......@@ -57,6 +57,7 @@ def parse_args():
add_arg('model_category', str, "models", "Whether to use models_name or not, valid value:'models','models_name'" )
add_arg('fp16', bool, False, "Enable half precision training with fp16." )
add_arg('scale_loss', float, 1.0, "Scale loss for fp16." )
add_arg('reduce_master_grad', bool, False, "Whether to allreduce fp32 gradients." )
# for distributed
add_arg('update_method', str, "local", "Can be local, pserver, nccl2.")
add_arg('multi_batch_repeat', int, 1, "Batch merge repeats.")
......@@ -66,6 +67,7 @@ def parse_args():
add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.")
add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
add_arg('skip_unbalanced_data', bool, False, "Skip data not if data not balanced on nodes.")
add_arg('enable_sequential_execution', bool, False, "Skip data not if data not balanced on nodes.")
# yapf: enable
args = parser.parse_args()
return args
......@@ -130,7 +132,7 @@ def build_program(is_train, main_prog, startup_prog, args):
if os.getenv("FLAGS_selected_gpus"):
# in multi process mode, "trainer_count" will be total devices
# in the whole cluster, and we need to scale num_of nodes.
end_lr *= device_num_per_worker
end_lr /= device_num_per_worker
total_images = args.total_images / trainer_count
step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
......@@ -158,7 +160,8 @@ def build_program(is_train, main_prog, startup_prog, args):
if args.fp16:
params_grads = optimizer.backward(avg_cost)
master_params_grads = utils.create_master_params_grads(
params_grads, main_prog, startup_prog, args.scale_loss)
params_grads, main_prog, startup_prog, args.scale_loss,
reduce_master_grad = args.reduce_master_grad)
optimizer.apply_gradients(master_params_grads)
utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
else:
......@@ -239,11 +242,15 @@ def train_parallel(args):
append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
startup_exe.run(startup_prog)
if args.checkpoint:
fluid.io.load_persistables(startup_exe, args.checkpoint, main_program=train_prog)
strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.num_threads
build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False
build_strategy.memory_optimize = False
build_strategy.enable_sequential_execution = bool(args.enable_sequential_execution)
if args.reduce_strategy == "reduce":
......@@ -289,6 +296,11 @@ def train_parallel(args):
# scope=fluid.global_scope().new_scope()
# )
print("Run test before head")
test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
test_ret = test_single(startup_exe, test_prog, args, test_pyreader,test_fetch_list)
print("End run test before head")
over_all_start = time.time()
fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"]
......@@ -331,6 +343,12 @@ def train_parallel(args):
# test_ret = test_parallel(test_exe, test_prog, args, test_pyreader,test_fetch_list)
print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
(pass_id, test_ret[0], test_ret[1], test_ret[2]))
model_path = os.path.join(args.model_save_dir + '/' + args.model,
str(pass_id))
print("saving model to ", model_path)
if not os.path.isdir(model_path):
os.makedirs(model_path)
fluid.io.save_persistables(startup_exe, model_path, main_program=train_prog)
startup_exe.close()
print("total train time: ", time.time() - over_all_start)
......
from __future__ import print_function
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
def cast_fp16_to_fp32(i, o, prog):
prog.global_block().append_op(
......@@ -43,8 +44,30 @@ def copy_to_master_param(p, block):
name=v.name + ".master")
return new_p
def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss):
master_params_grads = []
def _update_role_var_grad(prog, params_grads):
BACKWARD = core.op_proto_and_checker_maker.OpRole.Backward
gradname_to_paramname = dict()
for p, g in params_grads:
gradname_to_paramname[g.name] = p.name
for op in prog.global_block().ops:
role = op.attr("op_role")
if role & int(BACKWARD) and op.has_attr("op_role_var"):
# have backward bits then remove all op_role_var
op.desc._remove_attr("op_role_var")
for op in prog.global_block().ops:
if op.type == "allreduce":
allreduce_role_var = []
for input_varname in op.input_arg_names:
if input_varname in gradname_to_paramname:
allreduce_role_var.append(gradname_to_paramname[input_varname])
allreduce_role_var.append(input_varname)
print("updating role var: ", allreduce_role_var)
op._set_attr("op_role_var", allreduce_role_var)
def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss, reduce_master_grad=True):
master_params_grads = [] # master p, g on local device
params_grads_to_apply = [] # master p, g after allreduced, if reduce_master_grad is enabled
tmp_role = main_prog._current_role
OpRole = fluid.core.op_proto_and_checker_maker.OpRole
main_prog._current_role = OpRole.Backward
......@@ -62,12 +85,20 @@ def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss
scaled_g = g
master_params_grads.append([p, scaled_g])
continue
master_grad = fluid.layers.cast(g, "float32")
if scale_loss > 1:
master_grad = master_grad / float(scale_loss)
master_params_grads.append([master_param, master_grad])
master_params_grads.append([p, master_grad])
if reduce_master_grad:
reduced_master_grad = fluid.layers.collective._allreduce(master_grad)
else:
reduced_master_grad = master_grad
params_grads_to_apply.append([master_param, reduced_master_grad])
# update program op role var acording to master grads before allreduce.
_update_role_var_grad(main_prog, master_params_grads)
main_prog._current_role = tmp_role
return master_params_grads
return params_grads_to_apply
def master_param_to_train_param(master_params_grads, params_grads, main_prog):
for idx, m_p_g in enumerate(master_params_grads):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册