From 5616974e5de13d2ccb6360c298e67d0186f7e849 Mon Sep 17 00:00:00 2001 From: guosheng Date: Tue, 17 Apr 2018 15:53:27 +0800 Subject: [PATCH] Add ParallelExecutor for training, validation and saving in Transformer --- .../transformer/train.py | 103 ++++++--- .../transformer/util.py | 205 ++++++++++++++++++ 2 files changed, 272 insertions(+), 36 deletions(-) create mode 100644 fluid/neural_machine_translation/transformer/util.py diff --git a/fluid/neural_machine_translation/transformer/train.py b/fluid/neural_machine_translation/transformer/train.py index 14c9acdf..dfcfa08e 100644 --- a/fluid/neural_machine_translation/transformer/train.py +++ b/fluid/neural_machine_translation/transformer/train.py @@ -8,6 +8,7 @@ import paddle.fluid as fluid from model import transformer, position_encoding_init from optim import LearningRateScheduler from config import * +from util import save_inference_model def pad_batch_data(insts, @@ -43,8 +44,8 @@ def pad_batch_data(insts, # This is used to avoid attention on paddings and subsequent # words. slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len)) - slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape( - [-1, 1, max_len, max_len]) + slf_attn_bias_data = np.triu(slf_attn_bias_data, + 1).reshape([-1, 1, max_len, max_len]) slf_attn_bias_data = np.tile(slf_attn_bias_data, [1, n_head, 1, 1]) * [-1e9] else: @@ -165,25 +166,6 @@ def main(): ModelHyperParams.trg_vocab_size), batch_size=TrainTaskConfig.batch_size) - def test(exe): - test_total_cost = 0 - test_total_token = 0 - for batch_id, data in enumerate(val_data()): - data_input_dict, util_input_dict = prepare_batch_input( - data, data_input_names, util_input_names, - ModelHyperParams.eos_idx, ModelHyperParams.eos_idx, - ModelHyperParams.n_head, ModelHyperParams.d_model) - test_sum_cost, test_token_num = exe.run( - test_program, - feed=dict(data_input_dict.items() + util_input_dict.items()), - fetch_list=[sum_cost, token_num], - use_program_cache=True) - test_total_cost += test_sum_cost - test_total_token += test_token_num - test_avg_cost = test_total_cost / test_total_token - test_ppl = np.exp([min(test_avg_cost, 100)]) - return test_avg_cost, test_ppl - # Initialize the parameters. 
exe.run(fluid.framework.default_startup_program())
@@ -191,35 +173,81 @@ def main():
         -1] + label_data_names
     util_input_names = encoder_util_input_fields + decoder_util_input_fields
 
+    dev_count = fluid.core.get_cuda_device_count()
+
+    def test(exe):
+        test_total_cost = 0
+        test_total_token = 0
+        test_data = read_multiple(reader=val_data, count=dev_count)
+        for batch_id, data in enumerate(test_data()):
+            for place_id, data_buffer in enumerate(data):
+                data_input_dict, util_input_dict = prepare_batch_input(
+                    data_buffer, data_input_names, util_input_names,
+                    ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
+                    ModelHyperParams.n_head, ModelHyperParams.d_model)
+                local_scope = exe.executor.local_scope(place_id)
+
+                for var_name in data_input_dict:
+                    local_scope.var(var_name).get_tensor().set(
+                        data_input_dict[var_name], fluid.CUDAPlace(place_id))
+
+                for var_name in util_input_dict:
+                    local_scope.var(var_name).get_tensor().set(
+                        util_input_dict[var_name], fluid.CUDAPlace(place_id))
+
+            outs = exe.run(fetch_list=[sum_cost.name, token_num.name])
+            sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
+            test_total_cost += sum_cost_val.sum()
+            test_total_token += token_num_val.sum()
+        test_avg_cost = test_total_cost / test_total_token
+        test_ppl = np.exp([min(test_avg_cost, 100)])
+        return test_avg_cost, test_ppl
+
     train_exe = fluid.ParallelExecutor(
         use_cuda=TrainTaskConfig.use_gpu,
         loss_name=avg_cost.name
         if TrainTaskConfig.use_avg_cost else sum_cost.name)
 
-    train_data = read_multiple(reader=train_data, count=train_exe.device_count)
+    test_exe = fluid.ParallelExecutor(
+        use_cuda=True, main_program=test_program, share_vars_from=train_exe)
+
+    # Initialize the position encoding table in the local scope of each device.
+    for pos_enc_param_name in pos_enc_param_names:
+        tensor = position_encoding_init(ModelHyperParams.max_length + 1,
+                                        ModelHyperParams.d_model)
+        for place_id in xrange(dev_count):
+            local_scope = train_exe.executor.local_scope(place_id)
+            local_scope.var(pos_enc_param_name).get_tensor().set(
+                tensor, fluid.CUDAPlace(place_id))
+
+    train_data = read_multiple(reader=train_data, count=dev_count)
 
     for pass_id in xrange(TrainTaskConfig.pass_num):
         pass_start_time = time.time()
         for batch_id, data in enumerate(train_data()):
-            data_on_devices = []
-
+            # One learning rate value per batch, shared by all devices.
+            lr = lr_scheduler.update_learning_rate()
             for place_id, data_buffer in enumerate(data):
                 data_input_dict, util_input_dict = prepare_batch_input(
                     data_buffer, data_input_names, util_input_names,
                     ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
                     ModelHyperParams.n_head, ModelHyperParams.d_model)
-                data_input_dict.update(util_input_dict)
-                data_input_dict.update({
-                    lr_scheduler.learning_rate.name: lr_scheduler.update_learning_rate()
-                })
+                local_scope = train_exe.executor.local_scope(place_id)
 
-                for pos_enc_param_name in pos_enc_param_names:
-                    tensor = position_encoding_init(ModelHyperParams.max_length + 1, ModelHyperParams.d_model)
-                    data_input_dict[pos_enc_param_name] = tensor
+                local_scope.find_var(
+                    lr_scheduler.learning_rate.name).get_tensor().set(
+                        lr, fluid.CUDAPlace(place_id))
 
-                data_on_devices.append(data_input_dict)
+                for var_name in data_input_dict:
+                    local_scope.var(var_name).get_tensor().set(
+                        data_input_dict[var_name], fluid.CUDAPlace(place_id))
 
-            outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name], feed=data_on_devices)
+                for var_name in util_input_dict:
+                    local_scope.var(var_name).get_tensor().set(
+                        util_input_dict[var_name], fluid.CUDAPlace(place_id))
+
+            outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name])
             sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
             total_sum_cost = sum_cost_val.sum(
             )  # sum the cost from multiple devices
@@ -229,16 +257,19 @@ def main():
                   (pass_id, batch_id, total_sum_cost, total_avg_cost,
                    np.exp([min(total_avg_cost, 100)])))
         # Validate and save the model for inference.
-        val_avg_cost, val_ppl = test(exe)
+        val_avg_cost, val_ppl = test(test_exe)
         pass_end_time = time.time()
         time_consumed = pass_end_time - pass_start_time
         print("epoch: %d, val avg loss: %f, val ppl: %f, "
              "consumed %fs" % (pass_id, val_avg_cost, val_ppl, time_consumed))
-        fluid.io.save_inference_model(
+        save_program = save_inference_model(
             os.path.join(TrainTaskConfig.model_dir,
                          "pass_" + str(pass_id) + ".infer.model"),
             encoder_input_data_names + decoder_input_data_names[:-1],
-            [predict], exe)
+            [predict], train_exe)
+        # Run the returned save program with an executor that shares variables
+        # with train_exe, so parameters are written from the trained scope.
+        save_exe = fluid.ParallelExecutor(
+            use_cuda=True, main_program=save_program, share_vars_from=train_exe)
+        save_exe.run(fetch_list=[])
 
 
 if __name__ == "__main__":
diff --git a/fluid/neural_machine_translation/transformer/util.py b/fluid/neural_machine_translation/transformer/util.py
new file mode 100644
index 00000000..a2c6f08c
--- /dev/null
+++ b/fluid/neural_machine_translation/transformer/util.py
@@ -0,0 +1,205 @@
+import os
+from paddle.fluid.framework import Program, Parameter, default_main_program, Variable
+import paddle.fluid.core as core
+
+
+def is_persistable(var):
+    if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
+            var.desc.type() == core.VarDesc.VarType.FETCH_LIST:
+        return False
+    return var.persistable
+
+
+def _clone_var_in_block_(block, var):
+    assert isinstance(var, Variable)
+    return block.create_var(
+        name=var.name,
+        shape=var.shape,
+        dtype=var.dtype,
+        type=var.type,
+        lod_level=var.lod_level,
+        persistable=True)
+
+
+def save_vars(executor,
+              dirname,
+              main_program=None,
+              vars=None,
+              predicate=None,
+              filename=None):
+    """
+    Build and return a program that saves variables to the directory `dirname`.
+
+    :param executor: the executor whose scope holds the variables to save
+    :param dirname: directory path
+    :param main_program: the program whose variables are saved. If vars is
+        None, all variables in this program that satisfy `predicate` are
+        saved. Defaults to default_main_program().
+    :param predicate: a callable that takes a variable and returns a bool.
+        If it returns True, the corresponding variable will be saved.
+    :param vars: the variables to be saved. If vars is specified, main_program
+        and predicate are ignored.
+    :param filename: the name of a single file that all variables are saved to.
+        If it is None, each variable is saved to a separate file.
+
+    :return: the constructed save program; the caller is responsible for
+        running it with an executor to actually write the files.
+    """
+    if vars is None:
+        if main_program is None:
+            main_program = default_main_program()
+        if not isinstance(main_program, Program):
+            raise TypeError("main_program should be of Program type or None")
+
+        return save_vars(
+            executor,
+            dirname=dirname,
+            vars=filter(predicate, main_program.list_vars()),
+            filename=filename)
+    else:
+        save_program = Program()
+        save_block = save_program.global_block()
+
+        save_var_map = {}
+        for each_var in vars:
+            # NOTE: don't save the variable which type is RAW
+            if each_var.type == core.VarDesc.VarType.RAW:
+                continue
+            new_var = _clone_var_in_block_(save_block, each_var)
+            if filename is None:
+                save_block.append_op(
+                    type='save',
+                    inputs={'X': [new_var]},
+                    outputs={},
+                    attrs={'file_path': os.path.join(dirname, new_var.name)})
+            else:
+                save_var_map[new_var.name] = new_var
+
+        if filename is not None:
+            save_var_list = []
+            for name in sorted(save_var_map.keys()):
+                save_var_list.append(save_var_map[name])
+
+            save_block.append_op(
+                type='save_combine',
+                inputs={'X': save_var_list},
+                outputs={},
+                attrs={'file_path': os.path.join(dirname, filename)})
+        # Return the program instead of running it here, so that it can be
+        # executed by a ParallelExecutor sharing variables with training.
+        return save_program
+
+
+def save_persistables(executor, dirname, main_program=None, filename=None):
+    """
+    Build and return a program that saves all persistable variables of
+    `main_program` to the directory `dirname`.
+    """
+    save_program = save_vars(
+        executor,
+        dirname=dirname,
+        main_program=main_program,
+        vars=None,
+        predicate=is_persistable,
+        filename=filename)
+    return save_program
+
+
+def prepend_feed_ops(inference_program,
+                     feed_target_names,
+                     feed_holder_name='feed'):
+    global_block = inference_program.global_block()
+    feed_var = global_block.create_var(
+        name=feed_holder_name,
+        type=core.VarDesc.VarType.FEED_MINIBATCH,
+        persistable=True)
+
+    for i, name in enumerate(feed_target_names):
+        out = global_block.var(name)
+        global_block.prepend_op(
+            type='feed',
+            inputs={'X': [feed_var]},
+            outputs={'Out': [out]},
+            attrs={'col': i})
+
+
+def append_fetch_ops(inference_program,
+                     fetch_target_names,
+                     fetch_holder_name='fetch'):
+    global_block = inference_program.global_block()
+    fetch_var = global_block.create_var(
+        name=fetch_holder_name,
+        type=core.VarDesc.VarType.FETCH_LIST,
+        persistable=True)
+
+    for i, name in enumerate(fetch_target_names):
+        global_block.append_op(
+            type='fetch',
+            inputs={'X': [name]},
+            outputs={'Out': [fetch_var]},
+            attrs={'col': i})
+
+
+def save_inference_model(dirname,
+                         feeded_var_names,
+                         target_vars,
+                         executor,
+                         main_program=None,
+                         model_filename=None,
+                         params_filename=None):
+    """
+    Prune the main program to build a program especially for inference,
+    serialize it to the directory `dirname`, and build a program that saves
+    its parameters there.
+
+    :param dirname: directory path
+    :param feeded_var_names: names of variables that need to be fed data
+        during inference
+    :param target_vars: variables from which we can get inference results
+    :param executor: the executor whose scope holds the parameters to save
+    :param main_program: the original program, which will be pruned to build
+        the inference model. Defaults to default_main_program().
+    :param model_filename: the name of the file to save the inference program
+        to. If not specified, the default filename `__model__` will be used.
+    :param params_filename: the name of the file to save all parameters to.
+        It is used for the case that all parameters are saved in a single
+        binary file. If not specified, parameters are saved in separate files.
+
+    :return: the program that saves the persistable parameters; run it with
+        an executor to write the parameter files.
+    """
+    if isinstance(feeded_var_names, basestring):
+        feeded_var_names = [feeded_var_names]
+    else:
+        if not (bool(feeded_var_names) and all(
+                isinstance(name, basestring) for name in feeded_var_names)):
+            raise ValueError("'feeded_var_names' should be a list of str.")
+
+    if isinstance(target_vars, Variable):
+        target_vars = [target_vars]
+    else:
+        if not (bool(target_vars) and
+                all(isinstance(var, Variable) for var in target_vars)):
+            raise ValueError("'target_vars' should be a list of Variable.")
+
+    if main_program is None:
+        main_program = default_main_program()
+
+    if not os.path.isdir(dirname):
+        os.makedirs(dirname)
+
+    pruned_program = main_program.prune(targets=target_vars)
+    inference_program = pruned_program.inference_optimize()
+    fetch_var_names = [v.name for v in target_vars]
+
+    prepend_feed_ops(inference_program, feeded_var_names)
+    append_fetch_ops(inference_program, fetch_var_names)
+
+    if model_filename is not None:
+        model_filename = os.path.basename(model_filename)
+    else:
+        model_filename = "__model__"
+    model_filename = os.path.join(dirname, model_filename)
+
+    if params_filename is not None:
+        params_filename = os.path.basename(params_filename)
+
+    with open(model_filename, "wb") as f:
+        f.write(inference_program.desc.serialize_to_string())
+
+    return save_persistables(executor, dirname, inference_program,
+                             params_filename)
-- 
GitLab
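Usage note: a minimal sketch of the new save flow, assuming the names from
train.py (`train_exe`, `predict`, `TrainTaskConfig`, and the input-name lists)
are in scope. Unlike fluid.io.save_inference_model, util.save_inference_model
returns the save program instead of executing it, so the caller chooses which
executor materializes the parameter files:

    import os
    import paddle.fluid as fluid
    from util import save_inference_model

    # Serialize the pruned inference program to disk and get back the
    # program holding the `save` ops for its persistable parameters.
    save_program = save_inference_model(
        os.path.join(TrainTaskConfig.model_dir, "pass_0.infer.model"),
        encoder_input_data_names + decoder_input_data_names[:-1],
        [predict], train_exe)

    # Execute the save ops with an executor sharing variables with
    # train_exe, so parameters are read from the trained scope.
    save_exe = fluid.ParallelExecutor(
        use_cuda=True, main_program=save_program, share_vars_from=train_exe)
    save_exe.run(fetch_list=[])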