diff --git a/fluid/PaddleCV/image_classification/dist_train/dist_train.py b/fluid/PaddleCV/image_classification/dist_train/dist_train.py
index 3fbd73e7375b2c22d4c97adcde5a1342756eb165..9d053916e0a2844170d782b65f1217c864ba27d9 100644
--- a/fluid/PaddleCV/image_classification/dist_train/dist_train.py
+++ b/fluid/PaddleCV/image_classification/dist_train/dist_train.py
@@ -46,7 +46,7 @@ def parse_args():
     add_arg('class_dim', int, 1000, "Class number.")
     add_arg('image_shape', str, "3,224,224", "input image size")
     add_arg('model_save_dir', str, "output", "model save directory")
-    add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
+    add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
     add_arg('pretrained_model', str, None, "Whether to use pretrained model.")
     add_arg('checkpoint', str, None, "Whether to resume checkpoint.")
     add_arg('lr', float, 0.1, "set learning rate.")
@@ -57,6 +57,7 @@ def parse_args():
     add_arg('model_category', str, "models", "Whether to use models_name or not, valid value:'models','models_name'" )
     add_arg('fp16', bool, False, "Enable half precision training with fp16." )
     add_arg('scale_loss', float, 1.0, "Scale loss for fp16." )
+    add_arg('reduce_master_grad', bool, False, "Whether to allreduce fp32 gradients." )
     # for distributed
     add_arg('update_method', str, "local", "Can be local, pserver, nccl2.")
     add_arg('multi_batch_repeat', int, 1, "Batch merge repeats.")
@@ -66,6 +67,7 @@ def parse_args():
     add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.")
     add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
     add_arg('skip_unbalanced_data', bool, False, "Skip data not if data not balanced on nodes.")
+    add_arg('enable_sequential_execution', bool, False, "Whether to enable sequential execution of ops.")
     # yapf: enable
     args = parser.parse_args()
     return args
@@ -130,7 +132,7 @@ def build_program(is_train, main_prog, startup_prog, args):
         if os.getenv("FLAGS_selected_gpus"):
             # in multi process mode, "trainer_count" will be total devices
             # in the whole cluster, and we need to scale num_of nodes.
-            end_lr *= device_num_per_worker
+            end_lr /= device_num_per_worker
 
         total_images = args.total_images / trainer_count
         step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
@@ -158,7 +160,8 @@ def build_program(is_train, main_prog, startup_prog, args):
         if args.fp16:
             params_grads = optimizer.backward(avg_cost)
             master_params_grads = utils.create_master_params_grads(
-                params_grads, main_prog, startup_prog, args.scale_loss)
+                params_grads, main_prog, startup_prog, args.scale_loss,
+                reduce_master_grad=args.reduce_master_grad)
             optimizer.apply_gradients(master_params_grads)
             utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
         else:
@@ -239,11 +242,15 @@ def train_parallel(args):
         append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
     startup_exe.run(startup_prog)
 
+    if args.checkpoint:
+        fluid.io.load_persistables(startup_exe, args.checkpoint, main_program=train_prog)
+
     strategy = fluid.ExecutionStrategy()
     strategy.num_threads = args.num_threads
     build_strategy = fluid.BuildStrategy()
     build_strategy.enable_inplace = False
     build_strategy.memory_optimize = False
+    build_strategy.enable_sequential_execution = bool(args.enable_sequential_execution)
 
     if args.reduce_strategy == "reduce":
@@ -304,8 +311,8 @@ def train_parallel(args):
             if batch_id % 30 == 0:
                 fetch_ret = exe.run(fetch_list)
                 fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
-                print("Pass %d, batch %d, loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
-                      (pass_id, batch_id, fetched_data[0], fetched_data[1],
+                print("Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
+                      (pass_id, args.num_epochs, batch_id, steps_per_pass, fetched_data[0], fetched_data[1],
                        fetched_data[2], (time.time()-start_time) / batch_id))
             else:
                 fetch_ret = exe.run([])
@@ -321,8 +328,7 @@ def train_parallel(args):
         print_train_time(start_time, time.time(), num_samples)
         train_pyreader.reset()
-
-        if pass_id > args.start_test_pass:
+        if pass_id >= args.start_test_pass:
             if args.multi_batch_repeat > 1:
                 copyback_repeat_bn_params(train_prog)
             test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
@@ -331,7 +337,12 @@ def train_parallel(args):
             # test_ret = test_parallel(test_exe, test_prog, args, test_pyreader,test_fetch_list)
             print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                   (pass_id, test_ret[0], test_ret[1], test_ret[2]))
-
+        model_path = os.path.join(args.model_save_dir, args.model,
+                                  str(pass_id))
+        print("saving model to ", model_path)
+        if not os.path.isdir(model_path):
+            os.makedirs(model_path)
+        fluid.io.save_persistables(startup_exe, model_path, main_program=train_prog)
     startup_exe.close()
     print("total train time: ", time.time() - over_all_start)
diff --git a/fluid/PaddleCV/image_classification/utils/fp16_utils.py b/fluid/PaddleCV/image_classification/utils/fp16_utils.py
index cf6e081e7f9b433e4097f190b012c533064f5cca..939ac59db2441af7b190604895f8a467d9844294 100644
--- a/fluid/PaddleCV/image_classification/utils/fp16_utils.py
+++ b/fluid/PaddleCV/image_classification/utils/fp16_utils.py
@@ -1,6 +1,7 @@
 from __future__ import print_function
 import paddle
 import paddle.fluid as fluid
+import paddle.fluid.core as core
 
 def cast_fp16_to_fp32(i, o, prog):
     prog.global_block().append_op(
@@ -43,8 +44,30 @@ def copy_to_master_param(p, block):
         name=v.name + ".master")
     return new_p
 
-def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss):
-    master_params_grads = []
+
+def _update_role_var_grad(prog, params_grads):
+    BACKWARD = core.op_proto_and_checker_maker.OpRole.Backward
+    gradname_to_paramname = dict()
+    for p, g in params_grads:
+        gradname_to_paramname[g.name] = p.name
+    for op in prog.global_block().ops:
+        role = op.attr("op_role")
+        if role & int(BACKWARD) and op.has_attr("op_role_var"):
+            # the op has the backward role bit set, so drop its op_role_var
+            op.desc.remove_attr("op_role_var")
+    for op in prog.global_block().ops:
+        if op.type == "allreduce":
+            allreduce_role_var = []
+            for input_varname in op.input_arg_names:
+                if input_varname in gradname_to_paramname:
+                    allreduce_role_var.append(gradname_to_paramname[input_varname])
+                    allreduce_role_var.append(input_varname)
+            print("updating role var: ", allreduce_role_var)
+            op._set_attr("op_role_var", allreduce_role_var)
+
+def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss, reduce_master_grad=True):
+    master_params_grads = []  # master p, g on local device
+    params_grads_to_apply = []  # master p, g after allreduce, if reduce_master_grad is enabled
     tmp_role = main_prog._current_role
     OpRole = fluid.core.op_proto_and_checker_maker.OpRole
     main_prog._current_role = OpRole.Backward
@@ -62,12 +85,21 @@ def create_master_params_grads(params_grads, main_prog, startup_prog, scale_loss
                 scaled_g = g
             master_params_grads.append([p, scaled_g])
             continue
+
         master_grad = fluid.layers.cast(g, "float32")
         if scale_loss > 1:
             master_grad = master_grad / float(scale_loss)
-        master_params_grads.append([master_param, master_grad])
+        master_params_grads.append([p, master_grad])
+        if reduce_master_grad:
+            reduced_master_grad = fluid.layers.collective._allreduce(master_grad)
+        else:
+            reduced_master_grad = master_grad
+        params_grads_to_apply.append([master_param, reduced_master_grad])
+
+    # update the program's op_role_var according to master grads before allreduce.
+    _update_role_var_grad(main_prog, master_params_grads)
     main_prog._current_role = tmp_role
-    return master_params_grads
+    return params_grads_to_apply
 
 def master_param_to_train_param(master_params_grads, params_grads, main_prog):
     for idx, m_p_g in enumerate(master_params_grads):
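
Note (outside the diff): the helper below is a minimal, hypothetical sketch for sanity-checking what _update_role_var_grad leaves behind; it is not part of the patch. It walks the allreduce ops of a built program and prints the (param_name, grad_name) pairs stored in op_role_var, which the patch writes as a flat list. The function name dump_allreduce_role_vars and the main_prog argument are illustrative only.

def dump_allreduce_role_vars(main_prog):
    # op_role_var is stored as a flat list: [param_name, grad_name, param_name, grad_name, ...]
    for op in main_prog.global_block().ops:
        if op.type == "allreduce" and op.has_attr("op_role_var"):
            role_var = op.attr("op_role_var")
            print(op.type, list(zip(role_var[0::2], role_var[1::2])))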