import argparse import ast import copy import logging import multiprocessing import os import six import sys import time import numpy as np import paddle.fluid as fluid import reader from config import * from model import transformer, position_encoding_init def parse_args(): parser = argparse.ArgumentParser("Training for Transformer.") parser.add_argument( "--src_vocab_fpath", type=str, required=True, help="The path of vocabulary file of source language.") parser.add_argument( "--trg_vocab_fpath", type=str, required=True, help="The path of vocabulary file of target language.") parser.add_argument( "--train_file_pattern", type=str, required=True, help="The pattern to match training data files.") parser.add_argument( "--val_file_pattern", type=str, help="The pattern to match validation data files.") parser.add_argument( "--use_token_batch", type=ast.literal_eval, default=True, help="The flag indicating whether to " "produce batch data according to token number.") parser.add_argument( "--batch_size", type=int, default=4096, help="The number of sequences contained in a mini-batch, or the maximum " "number of tokens (include paddings) contained in a mini-batch. Note " "that this represents the number on single device and the actual batch " "size for multi-devices will multiply the device number.") parser.add_argument( "--pool_size", type=int, default=200000, help="The buffer size to pool data.") parser.add_argument( "--sort_type", default="pool", choices=("global", "pool", "none"), help="The grain to sort by length: global for all instances; pool for " "instances in pool; none for no sort.") parser.add_argument( "--shuffle", type=ast.literal_eval, default=True, help="The flag indicating whether to shuffle instances in each pass.") parser.add_argument( "--shuffle_batch", type=ast.literal_eval, default=True, help="The flag indicating whether to shuffle the data batches.") parser.add_argument( "--special_token", type=lambda x: x.encode(), default=[b"", b"", b""], nargs=3, help="The , and tokens in the dictionary.") parser.add_argument( "--token_delimiter", type=lambda x: x.encode(), default=b" ", help="The delimiter used to split tokens in source or target sentences. " "For EN-DE BPE data we provided, use spaces as token delimiter. ") parser.add_argument( 'opts', help='See config.py for all options', default=None, nargs=argparse.REMAINDER) parser.add_argument( '--local', type=ast.literal_eval, default=True, help='Whether to run as local mode.') parser.add_argument( '--device', type=str, default='GPU', choices=['CPU', 'GPU'], help="The device type.") parser.add_argument( '--update_method', choices=("pserver", "nccl2"), default="pserver", help='Update method.') parser.add_argument( '--sync', type=ast.literal_eval, default=True, help="sync mode.") parser.add_argument( "--enable_ce", type=ast.literal_eval, default=False, help="The flag indicating whether to run the task " "for continuous evaluation.") parser.add_argument( "--use_mem_opt", type=ast.literal_eval, default=True, help="The flag indicating whether to use memory optimization.") parser.add_argument( "--use_py_reader", type=ast.literal_eval, default=True, help="The flag indicating whether to use py_reader.") parser.add_argument( "--fetch_steps", type=int, default=100, help="The frequency to fetch and print output.") args = parser.parse_args() # Append args related to dict src_dict = reader.DataReader.load_dict(args.src_vocab_fpath) trg_dict = reader.DataReader.load_dict(args.trg_vocab_fpath) dict_args = [ "src_vocab_size", str(len(src_dict)), "trg_vocab_size", str(len(trg_dict)), "bos_idx", str(src_dict[args.special_token[0]]), "eos_idx", str(src_dict[args.special_token[1]]), "unk_idx", str(src_dict[args.special_token[2]]) ] merge_cfg_from_list(args.opts + dict_args, [TrainTaskConfig, ModelHyperParams]) return args def append_nccl2_prepare(startup_prog, trainer_id, worker_endpoints, current_endpoint): assert (trainer_id >= 0 and len(worker_endpoints) > 1 and current_endpoint in worker_endpoints) eps = copy.deepcopy(worker_endpoints) eps.remove(current_endpoint) nccl_id_var = startup_prog.global_block().create_var( name="NCCLID", persistable=True, type=fluid.core.VarDesc.VarType.RAW) startup_prog.global_block().append_op( type="gen_nccl_id", inputs={}, outputs={"NCCLID": nccl_id_var}, attrs={ "endpoint": current_endpoint, "endpoint_list": eps, "trainer_id": trainer_id }) return nccl_id_var def pad_batch_data(insts, pad_idx, n_head, is_target=False, is_label=False, return_attn_bias=True, return_max_len=True, return_num_token=False): """ Pad the instances to the max sequence length in batch, and generate the corresponding position data and attention bias. """ return_list = [] max_len = max(len(inst) for inst in insts) # Any token included in dict can be used to pad, since the paddings' loss # will be masked out by weights and make no effect on parameter gradients. inst_data = np.array( [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) return_list += [inst_data.astype("int64").reshape([-1, 1])] if is_label: # label weight inst_weight = np.array( [[1.] * len(inst) + [0.] * (max_len - len(inst)) for inst in insts]) return_list += [inst_weight.astype("float32").reshape([-1, 1])] else: # position data inst_pos = np.array([ list(range(0, len(inst))) + [0] * (max_len - len(inst)) for inst in insts ]) return_list += [inst_pos.astype("int64").reshape([-1, 1])] if return_attn_bias: if is_target: # This is used to avoid attention on paddings and subsequent # words. slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len)) slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape([-1, 1, max_len, max_len]) slf_attn_bias_data = np.tile(slf_attn_bias_data, [1, n_head, 1, 1]) * [-1e9] else: # This is used to avoid attention on paddings. slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * (max_len - len(inst)) for inst in insts]) slf_attn_bias_data = np.tile( slf_attn_bias_data.reshape([-1, 1, 1, max_len]), [1, n_head, max_len, 1]) return_list += [slf_attn_bias_data.astype("float32")] if return_max_len: return_list += [max_len] if return_num_token: num_token = 0 for inst in insts: num_token += len(inst) return_list += [num_token] return return_list if len(return_list) > 1 else return_list[0] def prepare_batch_input(insts, data_input_names, src_pad_idx, trg_pad_idx, n_head, d_model): """ Put all padded data needed by training into a dict. """ src_word, src_pos, src_slf_attn_bias, src_max_len = pad_batch_data( [inst[0] for inst in insts], src_pad_idx, n_head, is_target=False) src_word = src_word.reshape(-1, src_max_len, 1) src_pos = src_pos.reshape(-1, src_max_len, 1) trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = pad_batch_data( [inst[1] for inst in insts], trg_pad_idx, n_head, is_target=True) trg_word = trg_word.reshape(-1, trg_max_len, 1) trg_pos = trg_pos.reshape(-1, trg_max_len, 1) trg_src_attn_bias = np.tile(src_slf_attn_bias[:, :, ::src_max_len, :], [1, 1, trg_max_len, 1]).astype("float32") lbl_word, lbl_weight, num_token = pad_batch_data( [inst[2] for inst in insts], trg_pad_idx, n_head, is_target=False, is_label=True, return_attn_bias=False, return_max_len=False, return_num_token=True) data_input_dict = dict( zip(data_input_names, [ src_word, src_pos, src_slf_attn_bias, trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight ])) return data_input_dict, np.asarray([num_token], dtype="float32") def prepare_data_generator(args, is_test, count, pyreader, py_reader_provider_wrapper, place=None): """ Data generator wrapper for DataReader. If use py_reader, set the data provider for py_reader """ data_reader = reader.DataReader( fpattern=args.val_file_pattern if is_test else args.train_file_pattern, src_vocab_fpath=args.src_vocab_fpath, trg_vocab_fpath=args.trg_vocab_fpath, token_delimiter=args.token_delimiter, use_token_batch=args.use_token_batch, batch_size=args.batch_size * (1 if args.use_token_batch else count), pool_size=args.pool_size, sort_type=args.sort_type, shuffle=args.shuffle, shuffle_batch=args.shuffle_batch, start_mark=args.special_token[0], end_mark=args.special_token[1], unk_mark=args.special_token[2], # count start and end tokens out max_length=ModelHyperParams.max_length - 2, clip_last_batch=False).batch_generator def stack(data_reader, count, clip_last=True): def __impl__(): res = [] for item in data_reader(): res.append(item) if len(res) == count: yield res res = [] if len(res) == count: yield res elif not clip_last: data = [] for item in res: data += item if len(data) > count: inst_num_per_part = len(data) // count yield [ data[inst_num_per_part * i:inst_num_per_part * (i + 1)] for i in range(count) ] return __impl__ def split(data_reader, count): def __impl__(): for item in data_reader(): inst_num_per_part = len(item) // count for i in range(count): yield item[inst_num_per_part * i:inst_num_per_part * (i + 1 )] return __impl__ if not args.use_token_batch: # to make data on each device have similar token number data_reader = split(data_reader, count) if args.use_py_reader: pyreader.decorate_tensor_provider( py_reader_provider_wrapper(data_reader, place)) data_reader = None else: # Data generator for multi-devices data_reader = stack(data_reader, count) return data_reader def prepare_feed_dict_list(data_generator, init_flag, count): """ Prepare the list of feed dict for multi-devices. """ feed_dict_list = [] if data_generator is not None: # use_py_reader == False data_input_names = encoder_data_input_fields + \ decoder_data_input_fields[:-1] + label_data_input_fields data = next(data_generator) for idx, data_buffer in enumerate(data): data_input_dict, num_token = prepare_batch_input( data_buffer, data_input_names, ModelHyperParams.eos_idx, ModelHyperParams.eos_idx, ModelHyperParams.n_head, ModelHyperParams.d_model) feed_dict_list.append(data_input_dict) if init_flag: for idx in range(count): pos_enc_tables = dict() for pos_enc_param_name in pos_enc_param_names: pos_enc_tables[pos_enc_param_name] = position_encoding_init( ModelHyperParams.max_length + 1, ModelHyperParams.d_model) if len(feed_dict_list) <= idx: feed_dict_list.append(pos_enc_tables) else: feed_dict_list[idx] = dict( list(pos_enc_tables.items()) + list(feed_dict_list[idx] .items())) return feed_dict_list if len(feed_dict_list) == count else None def py_reader_provider_wrapper(data_reader, place): """ Data provider needed by fluid.layers.py_reader. """ def py_reader_provider(): data_input_names = encoder_data_input_fields + \ decoder_data_input_fields[:-1] + label_data_input_fields for batch_id, data in enumerate(data_reader()): data_input_dict, num_token = prepare_batch_input( data, data_input_names, ModelHyperParams.eos_idx, ModelHyperParams.eos_idx, ModelHyperParams.n_head, ModelHyperParams.d_model) yield [data_input_dict[item] for item in data_input_names] return py_reader_provider def test_context(exe, train_exe, dev_count): # Context to do validation. test_prog = fluid.Program() startup_prog = fluid.Program() if args.enable_ce: test_prog.random_seed = 1000 startup_prog.random_seed = 1000 with fluid.program_guard(test_prog, startup_prog): with fluid.unique_name.guard(): sum_cost, avg_cost, predict, token_num, pyreader = transformer( ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size, ModelHyperParams.max_length + 1, ModelHyperParams.n_layer, ModelHyperParams.n_head, ModelHyperParams.d_key, ModelHyperParams.d_value, ModelHyperParams.d_model, ModelHyperParams.d_inner_hid, ModelHyperParams.prepostprocess_dropout, ModelHyperParams.attention_dropout, ModelHyperParams.relu_dropout, ModelHyperParams.preprocess_cmd, ModelHyperParams.postprocess_cmd, ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps, use_py_reader=args.use_py_reader, is_test=True) test_prog = test_prog.clone(for_test=True) test_data = prepare_data_generator( args, is_test=True, count=dev_count, pyreader=pyreader, py_reader_provider_wrapper=py_reader_provider_wrapper) exe.run(startup_prog) # to init pyreader for testing if TrainTaskConfig.ckpt_path: fluid.io.load_persistables( exe, TrainTaskConfig.ckpt_path, main_program=test_prog) exec_strategy = fluid.ExecutionStrategy() exec_strategy.use_experimental_executor = True build_strategy = fluid.BuildStrategy() test_exe = fluid.ParallelExecutor( use_cuda=TrainTaskConfig.use_gpu, main_program=test_prog, build_strategy=build_strategy, exec_strategy=exec_strategy, share_vars_from=train_exe) def test(exe=test_exe, pyreader=pyreader): test_total_cost = 0 test_total_token = 0 if args.use_py_reader: pyreader.start() data_generator = None else: data_generator = test_data() while True: try: feed_dict_list = prepare_feed_dict_list(data_generator, False, dev_count) outs = test_exe.run(fetch_list=[sum_cost.name, token_num.name], feed=feed_dict_list) except (StopIteration, fluid.core.EOFException): # The current pass is over. if args.use_py_reader: pyreader.reset() break sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) test_total_cost += sum_cost_val.sum() test_total_token += token_num_val.sum() test_avg_cost = test_total_cost / test_total_token test_ppl = np.exp([min(test_avg_cost, 100)]) return test_avg_cost, test_ppl return test def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost, token_num, predict, pyreader, nccl2_num_trainers=1, nccl2_trainer_id=0): # Initialize the parameters. if TrainTaskConfig.ckpt_path: exe.run(startup_prog) # to init pyreader for training logging.info("load checkpoint from {}".format( TrainTaskConfig.ckpt_path)) fluid.io.load_persistables( exe, TrainTaskConfig.ckpt_path, main_program=train_prog) else: logging.info("init fluid.framework.default_startup_program") exe.run(startup_prog) logging.info("begin reader") train_data = prepare_data_generator( args, is_test=False, count=dev_count, pyreader=pyreader, py_reader_provider_wrapper=py_reader_provider_wrapper) # For faster executor exec_strategy = fluid.ExecutionStrategy() exec_strategy.use_experimental_executor = True exec_strategy.num_iteration_per_drop_scope = int(args.fetch_steps) build_strategy = fluid.BuildStrategy() # Since the token number differs among devices, customize gradient scale to # use token average cost among multi-devices. and the gradient scale is # `1 / token_number` for average cost. # build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized logging.info("begin executor") train_exe = fluid.ParallelExecutor( use_cuda=TrainTaskConfig.use_gpu, loss_name=avg_cost.name, main_program=train_prog, build_strategy=build_strategy, exec_strategy=exec_strategy, num_trainers=nccl2_num_trainers, trainer_id=nccl2_trainer_id) if args.val_file_pattern is not None: test = test_context(exe, train_exe, dev_count) # the best cross-entropy value with label smoothing loss_normalizer = -((1. - TrainTaskConfig.label_smooth_eps) * np.log( (1. - TrainTaskConfig.label_smooth_eps )) + TrainTaskConfig.label_smooth_eps * np.log(TrainTaskConfig.label_smooth_eps / ( ModelHyperParams.trg_vocab_size - 1) + 1e-20)) step_idx = 0 init_flag = True logging.info("begin train") for pass_id in six.moves.xrange(TrainTaskConfig.pass_num): pass_start_time = time.time() if args.use_py_reader: pyreader.start() data_generator = None else: data_generator = train_data() batch_id = 0 while True: try: feed_dict_list = prepare_feed_dict_list(data_generator, init_flag, dev_count) outs = train_exe.run( fetch_list=[sum_cost.name, token_num.name] if step_idx % args.fetch_steps == 0 else [], feed=feed_dict_list) if step_idx % args.fetch_steps == 0: sum_cost_val, token_num_val = np.array(outs[0]), np.array( outs[1]) # sum the cost from multi-devices total_sum_cost = sum_cost_val.sum() total_token_num = token_num_val.sum() total_avg_cost = total_sum_cost / total_token_num if step_idx == 0: logging.info( "step_idx: %d, epoch: %d, batch: %d, avg loss: %f, " "normalized loss: %f, ppl: %f" % (step_idx, pass_id, batch_id, total_avg_cost, total_avg_cost - loss_normalizer, np.exp([min(total_avg_cost, 100)]))) avg_batch_time = time.time() else: logging.info( "step_idx: %d, epoch: %d, batch: %d, avg loss: %f, " "normalized loss: %f, ppl: %f, speed: %.2f step/s" % (step_idx, pass_id, batch_id, total_avg_cost, total_avg_cost - loss_normalizer, np.exp([min(total_avg_cost, 100)]), args.fetch_steps / (time.time() - avg_batch_time))) avg_batch_time = time.time() if step_idx % TrainTaskConfig.save_freq == 0 and step_idx > 0: fluid.io.save_persistables( exe, os.path.join(TrainTaskConfig.ckpt_dir, "latest.checkpoint"), train_prog) fluid.io.save_params( exe, os.path.join(TrainTaskConfig.model_dir, "iter_" + str(step_idx) + ".infer.model"), train_prog) init_flag = False batch_id += 1 step_idx += 1 except (StopIteration, fluid.core.EOFException): # The current pass is over. if args.use_py_reader: pyreader.reset() break time_consumed = time.time() - pass_start_time # Validate and save the persistable. if args.val_file_pattern is not None: val_avg_cost, val_ppl = test() logging.info( "epoch: %d, val avg loss: %f, val normalized loss: %f, val ppl: %f," " consumed %fs" % (pass_id, val_avg_cost, val_avg_cost - loss_normalizer, val_ppl, time_consumed)) else: logging.info("epoch: %d, consumed %fs" % (pass_id, time_consumed)) if not args.enable_ce: fluid.io.save_persistables( exe, os.path.join(TrainTaskConfig.ckpt_dir, "pass_" + str(pass_id) + ".checkpoint"), train_prog) if args.enable_ce: # For CE print("kpis\ttrain_cost_card%d\t%f" % (dev_count, total_avg_cost)) if args.val_file_pattern is not None: print("kpis\ttest_cost_card%d\t%f" % (dev_count, val_avg_cost)) print("kpis\ttrain_duration_card%d\t%f" % (dev_count, time_consumed)) def train(args): # priority: ENV > args > config is_local = os.getenv("PADDLE_IS_LOCAL", "1") if is_local == '0': args.local = False logging.info(args) if args.device == 'CPU': TrainTaskConfig.use_gpu = False training_role = os.getenv("TRAINING_ROLE", "TRAINER") if training_role == "PSERVER" or (not TrainTaskConfig.use_gpu): place = fluid.CPUPlace() dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count())) else: place = fluid.CUDAPlace(0) dev_count = fluid.core.get_cuda_device_count() exe = fluid.Executor(place) train_prog = fluid.Program() startup_prog = fluid.Program() if args.enable_ce: train_prog.random_seed = 1000 startup_prog.random_seed = 1000 with fluid.program_guard(train_prog, startup_prog): with fluid.unique_name.guard(): sum_cost, avg_cost, predict, token_num, pyreader = transformer( ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size, ModelHyperParams.max_length + 1, ModelHyperParams.n_layer, ModelHyperParams.n_head, ModelHyperParams.d_key, ModelHyperParams.d_value, ModelHyperParams.d_model, ModelHyperParams.d_inner_hid, ModelHyperParams.prepostprocess_dropout, ModelHyperParams.attention_dropout, ModelHyperParams.relu_dropout, ModelHyperParams.preprocess_cmd, ModelHyperParams.postprocess_cmd, ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps, use_py_reader=args.use_py_reader, is_test=False) optimizer = None if args.sync: lr_decay = fluid.layers.learning_rate_scheduler.noam_decay( ModelHyperParams.d_model, TrainTaskConfig.warmup_steps) logging.info("before adam") with fluid.default_main_program()._lr_schedule_guard(): learning_rate = lr_decay * TrainTaskConfig.learning_rate optimizer = fluid.optimizer.Adam( learning_rate=learning_rate, beta1=TrainTaskConfig.beta1, beta2=TrainTaskConfig.beta2, epsilon=TrainTaskConfig.eps) else: optimizer = fluid.optimizer.SGD(0.003) optimizer.minimize(avg_cost) if args.use_mem_opt: fluid.memory_optimize(train_prog) if args.local: logging.info("local start_up:") train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost, token_num, predict, pyreader) else: if args.update_method == "nccl2": trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) port = os.getenv("PADDLE_PORT") worker_ips = os.getenv("PADDLE_TRAINERS") worker_endpoints = [] for ip in worker_ips.split(","): worker_endpoints.append(':'.join([ip, port])) trainers_num = len(worker_endpoints) current_endpoint = os.getenv("POD_IP") + ":" + port if trainer_id == 0: logging.info("train_id == 0, sleep 60s") time.sleep(60) logging.info("trainers_num:{}".format(trainers_num)) logging.info("worker_endpoints:{}".format(worker_endpoints)) logging.info("current_endpoint:{}".format(current_endpoint)) append_nccl2_prepare(startup_prog, trainer_id, worker_endpoints, current_endpoint) train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost, token_num, predict, pyreader, trainers_num, trainer_id) return port = os.getenv("PADDLE_PORT", "6174") pserver_ips = os.getenv("PADDLE_PSERVERS") # ip,ip... eplist = [] for ip in pserver_ips.split(","): eplist.append(':'.join([ip, port])) pserver_endpoints = ",".join(eplist) # ip:port,ip:port... trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0")) current_endpoint = os.getenv("POD_IP") + ":" + port trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) logging.info("pserver_endpoints:{}".format(pserver_endpoints)) logging.info("current_endpoint:{}".format(current_endpoint)) logging.info("trainer_id:{}".format(trainer_id)) logging.info("pserver_ips:{}".format(pserver_ips)) logging.info("port:{}".format(port)) t = fluid.DistributeTranspiler() t.transpile( trainer_id, pservers=pserver_endpoints, trainers=trainers, program=train_prog, startup_program=startup_prog) if training_role == "PSERVER": logging.info("distributed: pserver started") current_endpoint = os.getenv("POD_IP") + ":" + os.getenv( "PADDLE_PORT") if not current_endpoint: logging.critical("need env SERVER_ENDPOINT") exit(1) pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) exe.run(pserver_startup) exe.run(pserver_prog) elif training_role == "TRAINER": logging.info("distributed: trainer started") trainer_prog = t.get_trainer_program() train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost, token_num, predict, pyreader) else: logging.critical( "environment var TRAINER_ROLE should be TRAINER os PSERVER") exit(1) if __name__ == "__main__": LOG_FORMAT = "[%(asctime)s %(levelname)s %(filename)s:%(lineno)d] %(message)s" logging.basicConfig( stream=sys.stdout, level=logging.DEBUG, format=LOG_FORMAT) logging.getLogger().setLevel(logging.INFO) args = parse_args() train(args)