未验证 提交 8e79b07d 编写于 作者: W Wu Yi 提交者: GitHub

Merge pull request #1925 from typhoonzero/add_allreduce_master_grad

add reduce master grad for fp16
...@@ -57,6 +57,7 @@ def parse_args(): ...@@ -57,6 +57,7 @@ def parse_args():
add_arg('model_category', str, "models", "Whether to use models_name or not, valid value:'models','models_name'" ) add_arg('model_category', str, "models", "Whether to use models_name or not, valid value:'models','models_name'" )
add_arg('fp16', bool, False, "Enable half precision training with fp16." ) add_arg('fp16', bool, False, "Enable half precision training with fp16." )
add_arg('scale_loss', float, 1.0, "Scale loss for fp16." ) add_arg('scale_loss', float, 1.0, "Scale loss for fp16." )
add_arg('reduce_master_grad', bool, False, "Whether to allreduce fp32 gradients." )
# for distributed # for distributed
add_arg('update_method', str, "local", "Can be local, pserver, nccl2.") add_arg('update_method', str, "local", "Can be local, pserver, nccl2.")
add_arg('multi_batch_repeat', int, 1, "Batch merge repeats.") add_arg('multi_batch_repeat', int, 1, "Batch merge repeats.")
...@@ -66,6 +67,7 @@ def parse_args(): ...@@ -66,6 +67,7 @@ def parse_args():
add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.") add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.")
add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.") add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
add_arg('skip_unbalanced_data', bool, False, "Skip data not if data not balanced on nodes.") add_arg('skip_unbalanced_data', bool, False, "Skip data not if data not balanced on nodes.")
add_arg('enable_sequential_execution', bool, False, "Skip data not if data not balanced on nodes.")
# yapf: enable # yapf: enable
args = parser.parse_args() args = parser.parse_args()
return args return args
...@@ -130,7 +132,7 @@ def build_program(is_train, main_prog, startup_prog, args): ...@@ -130,7 +132,7 @@ def build_program(is_train, main_prog, startup_prog, args):
if os.getenv("FLAGS_selected_gpus"): if os.getenv("FLAGS_selected_gpus"):
# in multi process mode, "trainer_count" will be total devices # in multi process mode, "trainer_count" will be total devices
# in the whole cluster, and we need to scale num_of nodes. # in the whole cluster, and we need to scale num_of nodes.
end_lr *= device_num_per_worker end_lr /= device_num_per_worker
total_images = args.total_images / trainer_count total_images = args.total_images / trainer_count
step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1) step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
...@@ -158,7 +160,8 @@ def build_program(is_train, main_prog, startup_prog, args): ...@@ -158,7 +160,8 @@ def build_program(is_train, main_prog, startup_prog, args):
if args.fp16: if args.fp16:
params_grads = optimizer.backward(avg_cost) params_grads = optimizer.backward(avg_cost)
master_params_grads = utils.create_master_params_grads( master_params_grads = utils.create_master_params_grads(
params_grads, main_prog, startup_prog, args.scale_loss) params_grads, main_prog, startup_prog, args.scale_loss,
reduce_master_grad = args.reduce_master_grad)
optimizer.apply_gradients(master_params_grads) optimizer.apply_gradients(master_params_grads)
utils.master_param_to_train_param(master_params_grads, params_grads, main_prog) utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
else: else:
...@@ -239,11 +242,15 @@ def train_parallel(args): ...@@ -239,11 +242,15 @@ def train_parallel(args):
append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat) append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
startup_exe.run(startup_prog) startup_exe.run(startup_prog)
if args.checkpoint:
fluid.io.load_persistables(startup_exe, args.checkpoint, main_program=train_prog)
strategy = fluid.ExecutionStrategy() strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.num_threads strategy.num_threads = args.num_threads
build_strategy = fluid.BuildStrategy() build_strategy = fluid.BuildStrategy()
build_strategy.enable_inplace = False build_strategy.enable_inplace = False
build_strategy.memory_optimize = False build_strategy.memory_optimize = False
build_strategy.enable_sequential_execution = bool(args.enable_sequential_execution)
if args.reduce_strategy == "reduce": if args.reduce_strategy == "reduce":
...@@ -304,8 +311,8 @@ def train_parallel(args): ...@@ -304,8 +311,8 @@ def train_parallel(args):
if batch_id % 30 == 0: if batch_id % 30 == 0:
fetch_ret = exe.run(fetch_list) fetch_ret = exe.run(fetch_list)
fetched_data = [np.mean(np.array(d)) for d in fetch_ret] fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, acc1: %s, acc5: %s, avg batch time %.4f" % print("Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
(pass_id, batch_id, fetched_data[0], fetched_data[1], (pass_id, args.num_epochs, batch_id, steps_per_pass, fetched_data[0], fetched_data[1],
fetched_data[2], (time.time()-start_time) / batch_id)) fetched_data[2], (time.time()-start_time) / batch_id))
else: else:
fetch_ret = exe.run([]) fetch_ret = exe.run([])
...@@ -321,8 +328,7 @@ def train_parallel(args): ...@@ -321,8 +328,7 @@ def train_parallel(args):
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples)
train_pyreader.reset() train_pyreader.reset()
if pass_id >= args.start_test_pass:
if pass_id > args.start_test_pass:
if args.multi_batch_repeat > 1: if args.multi_batch_repeat > 1:
copyback_repeat_bn_params(train_prog) copyback_repeat_bn_params(train_prog)
test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name] test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
...@@ -331,7 +337,12 @@ def train_parallel(args): ...@@ -331,7 +337,12 @@ def train_parallel(args):
# test_ret = test_parallel(test_exe, test_prog, args, test_pyreader,test_fetch_list) # test_ret = test_parallel(test_exe, test_prog, args, test_pyreader,test_fetch_list)
print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" % print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
(pass_id, test_ret[0], test_ret[1], test_ret[2])) (pass_id, test_ret[0], test_ret[1], test_ret[2]))
model_path = os.path.join(args.model_save_dir + '/' + args.model,
str(pass_id))
print("saving model to ", model_path)
if not os.path.isdir(model_path):
os.makedirs(model_path)
fluid.io.save_persistables(startup_exe, model_path, main_program=train_prog)
startup_exe.close() startup_exe.close()
print("total train time: ", time.time() - over_all_start) print("total train time: ", time.time() - over_all_start)
......
from __future__ import print_function from __future__ import print_function
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.core as core
def cast_fp16_to_fp32(i, o, prog): def cast_fp16_to_fp32(i, o, prog):
prog.global_block().append_op( prog.global_block().append_op(
...@@ -43,8 +44,30 @@ def copy_to_master_param(p, block): ...@@ -43,8 +44,30 @@ def copy_to_master_param(p, block):
name=v.name + ".master") name=v.name + ".master")
return new_p return new_p
def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss):
def _update_role_var_grad(prog, params_grads):
    """Move ``op_role_var`` bookkeeping onto the allreduce ops.

    Works in two passes over the ops of ``prog``'s global block:

    1. every op whose ``op_role`` has the Backward bit set and that carries
       an ``op_role_var`` attribute has that attribute removed;
    2. every op of type ``"allreduce"`` receives a fresh ``op_role_var``: a
       flat list of ``param_name, grad_name`` pairs, one pair for each input
       of the op that is a gradient listed in ``params_grads``.

    Args:
        prog: program whose global-block ops are modified in place.
        params_grads: iterable of ``(param, grad)`` pairs; only the ``name``
            attribute of each element is read.
    """
    backward_bit = int(core.op_proto_and_checker_maker.OpRole.Backward)
    param_of_grad = {grad.name: param.name for param, grad in params_grads}

    # Pass 1: strip op_role_var from anything that has the backward bit set.
    for op in prog.global_block().ops:
        if not (op.attr("op_role") & backward_bit):
            continue
        if op.has_attr("op_role_var"):
            op.desc.remove_attr("op_role_var")

    # Pass 2: attach the (param, grad) name pairs to each allreduce op.
    for op in prog.global_block().ops:
        if op.type != "allreduce":
            continue
        role_vars = []
        for grad_name in op.input_arg_names:
            if grad_name in param_of_grad:
                # Names are recorded pairwise: parameter first, then gradient.
                role_vars.extend((param_of_grad[grad_name], grad_name))
        print("updating role var: ", role_vars)
        op._set_attr("op_role_var", role_vars)
def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss, reduce_master_grad=True):
master_params_grads = [] # master p, g on local device
params_grads_to_apply = [] # master p, g after allreduced, if reduce_master_grad is enabled
tmp_role = main_prog._current_role tmp_role = main_prog._current_role
OpRole = fluid.core.op_proto_and_checker_maker.OpRole OpRole = fluid.core.op_proto_and_checker_maker.OpRole
main_prog._current_role = OpRole.Backward main_prog._current_role = OpRole.Backward
...@@ -62,12 +85,21 @@ def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss ...@@ -62,12 +85,21 @@ def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss
scaled_g = g scaled_g = g
master_params_grads.append([p, scaled_g]) master_params_grads.append([p, scaled_g])
continue continue
master_grad = fluid.layers.cast(g, "float32") master_grad = fluid.layers.cast(g, "float32")
if scale_loss > 1: if scale_loss > 1:
master_grad = master_grad / float(scale_loss) master_grad = master_grad / float(scale_loss)
master_params_grads.append([master_param, master_grad]) master_params_grads.append([p, master_grad])
if reduce_master_grad:
reduced_master_grad = fluid.layers.collective._allreduce(master_grad)
else:
reduced_master_grad = master_grad
params_grads_to_apply.append([master_param, reduced_master_grad])
# update program op role var according to master grads before allreduce.
_update_role_var_grad(main_prog, master_params_grads)
main_prog._current_role = tmp_role main_prog._current_role = tmp_role
return master_params_grads return params_grads_to_apply
def master_param_to_train_param(master_params_grads, params_grads, main_prog): def master_param_to_train_param(master_params_grads, params_grads, main_prog):
for idx, m_p_g in enumerate(master_params_grads): for idx, m_p_g in enumerate(master_params_grads):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册