diff --git a/python/paddle/fluid/tests/unittests/dist_transformer.py b/python/paddle/fluid/tests/unittests/dist_transformer.py index 239adcb9d5900d4073a6c07cb189ab7503aea86e..179c2540f812feac32632540e9070b58451c9a90 100644 --- a/python/paddle/fluid/tests/unittests/dist_transformer.py +++ b/python/paddle/fluid/tests/unittests/dist_transformer.py @@ -18,54 +18,129 @@ import numpy as np import argparse import time import math +import os +import sys +import six +import argparse +import ast +import multiprocessing +import time +from functools import partial +from os.path import expanduser +import glob +import random +import tarfile import paddle import paddle.fluid as fluid +import paddle.fluid.layers as layers from paddle.fluid import core -import os -import sys -import six -import transformer_model -import paddle.dataset.wmt16 as wmt16 +from test_dist_base import TestDistRunnerBase, runtime_main +from paddle.compat import long_type + +import hashlib + +from paddle.fluid.transpiler.details import program_to_code + +const_para_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(0.001)) +const_bias_attr = const_para_attr # Fix seed for test fluid.default_startup_program().random_seed = 1 fluid.default_main_program().random_seed = 1 -WMT16_RECORDIO_FILE = "/tmp/wmt16.recordio" +#from transformer_config import ModelHyperParams, TrainTaskConfig, merge_cfg_from_list +class TrainTaskConfig(object): + # only support GPU currently + use_gpu = True + # the epoch number to train. + pass_num = 1 + # the number of sequences contained in a mini-batch. + # deprecated, set batch_size in args. + batch_size = 20 + # the hyper parameters for Adam optimizer. + # This static learning_rate will be multiplied to the LearningRateScheduler + # derived learning rate the to get the final learning rate. + learning_rate = 1 + beta1 = 0.9 + beta2 = 0.98 + eps = 1e-9 + # the parameters for learning rate scheduling. + warmup_steps = 4000 + # the weight used to mix up the ground-truth distribution and the fixed + # uniform distribution in label smoothing when training. + # Set this as zero if label smoothing is not wanted. + label_smooth_eps = 0.1 + # the directory for saving trained models. + model_dir = "trained_models" + # the directory for saving checkpoints. + ckpt_dir = "trained_ckpts" + # the directory for loading checkpoint. + # If provided, continue training from the checkpoint. + ckpt_path = None + # the parameter to initialize the learning rate scheduler. + # It should be provided if use checkpoints, since the checkpoint doesn't + # include the training step counter currently. + start_step = 0 -class ModelHyperParams(object): - # Dictionary size for source and target language. This model directly uses - # paddle.dataset.wmt16 in which , and token has - # alreay been added, but the token is not added. Transformer requires - # sequences in a mini-batch are padded to have the same length. A token is - # added into the original dictionary in paddle.dateset.wmt16. + check_acc = True - # size of source word dictionary. - src_vocab_size = 10000 - # index for token in source language. - src_pad_idx = src_vocab_size + data_path = expanduser("~") + ( + "/.cache/paddle/dataset/test_dist_transformer/") + src_vocab_fpath = data_path + "vocab.bpe.32000" + trg_vocab_fpath = data_path + "vocab.bpe.32000" + train_file_pattern = data_path + "train.tok.clean.bpe.32000.en-de" + val_file_pattern = data_path + "newstest2013.tok.bpe.32000.en-de" + pool_size = 2000 + sort_type = None + local = True + shuffle = False + shuffle_batch = False + special_token = ['', '', ''] + token_delimiter = ' ' + use_token_batch = False - # size of target word dictionay - trg_vocab_size = 10000 - # index for token in target language. - trg_pad_idx = trg_vocab_size - # position value corresponding to the token. - pos_pad_idx = 0 +class InferTaskConfig(object): + use_gpu = True + # the number of examples in one run for sequence generation. + batch_size = 10 + # the parameters for beam search. + beam_size = 5 + max_out_len = 256 + # the number of decoded sentences to output. + n_best = 1 + # the flags indicating whether to output the special tokens. + output_bos = False + output_eos = False + output_unk = True + # the directory for loading the trained model. + model_path = "trained_models/pass_1.infer.model" - # max length of sequences. It should plus 1 to include position - # padding token for position encoding. - max_length = 50 +class ModelHyperParams(object): + # These following five vocabularies related configurations will be set + # automatically according to the passed vocabulary path and special tokens. + # size of source word dictionary. + src_vocab_size = 10000 + # size of target word dictionay + trg_vocab_size = 10000 + # index for token + bos_idx = 0 + # index for token + eos_idx = 1 + # index for token + unk_idx = 2 + # max length of sequences deciding the size of position encoding table. + # Start from 1 and count start and end tokens in. + max_length = 256 # the dimension for word embeddings, which is also the last dimension of # the input and output of multi-head attention, position-wise feed-forward # networks, encoder and decoder. - d_model = 512 # size of the hidden layer in position-wise feed-forward networks. - d_inner_hid = 1024 + d_inner_hid = 2048 # the dimension that keys are projected to for dot-product attention. d_key = 64 # the dimension that values are projected to for dot-product attention. @@ -75,95 +150,1521 @@ class ModelHyperParams(object): # number of sub-layers to be stacked in the encoder and decoder. n_layer = 6 # dropout rate used by all dropout layers. - dropout = 0.1 + dropout = 0.0 # no random + # random seed used in dropout for CE. + dropout_seed = None + # the flag indicating whether to share embedding and softmax weights. + # vocabularies in source and target should be same for weight sharing. + weight_sharing = True -def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): +def merge_cfg_from_list(cfg_list, g_cfgs): + """ + Set the above global configurations using the cfg_list. + """ + assert len(cfg_list) % 2 == 0 + for key, value in zip(cfg_list[0::2], cfg_list[1::2]): + for g_cfg in g_cfgs: + if hasattr(g_cfg, key): + try: + value = eval(value) + except Exception: # for file path + pass + setattr(g_cfg, key, value) + break + + +# The placeholder for batch_size in compile time. Must be -1 currently to be +# consistent with some ops' infer-shape output in compile time, such as the +# sequence_expand op used in beamsearch decoder. +batch_size = -1 +# The placeholder for squence length in compile time. +seq_len = ModelHyperParams.max_length +# Here list the data shapes and data types of all inputs. +# The shapes here act as placeholder and are set to pass the infer-shape in +# compile time. +input_descs = { + # The actual data shape of src_word is: + # [batch_size * max_src_len_in_batch, 1] + "src_word": [(batch_size, seq_len, long_type(1)), "int64", 2], + # The actual data shape of src_pos is: + # [batch_size * max_src_len_in_batch, 1] + "src_pos": [(batch_size, seq_len, long_type(1)), "int64"], + # This input is used to remove attention weights on paddings in the + # encoder. + # The actual data shape of src_slf_attn_bias is: + # [batch_size, n_head, max_src_len_in_batch, max_src_len_in_batch] + "src_slf_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # The actual data shape of trg_word is: + # [batch_size * max_trg_len_in_batch, 1] + "trg_word": [(batch_size, seq_len, long_type(1)), "int64", + 2], # lod_level is only used in fast decoder. + # The actual data shape of trg_pos is: + # [batch_size * max_trg_len_in_batch, 1] + "trg_pos": [(batch_size, seq_len, long_type(1)), "int64"], + # This input is used to remove attention weights on paddings and + # subsequent words in the decoder. + # The actual data shape of trg_slf_attn_bias is: + # [batch_size, n_head, max_trg_len_in_batch, max_trg_len_in_batch] + "trg_slf_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # This input is used to remove attention weights on paddings of the source + # input in the encoder-decoder attention. + # The actual data shape of trg_src_attn_bias is: + # [batch_size, n_head, max_trg_len_in_batch, max_src_len_in_batch] + "trg_src_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # This input is used in independent decoder program for inference. + # The actual data shape of enc_output is: + # [batch_size, max_src_len_in_batch, d_model] + "enc_output": [(batch_size, seq_len, ModelHyperParams.d_model), "float32"], + # The actual data shape of label_word is: + # [batch_size * max_trg_len_in_batch, 1] + "lbl_word": [(batch_size * seq_len, long_type(1)), "int64"], + # This input is used to mask out the loss of paddding tokens. + # The actual data shape of label_weight is: + # [batch_size * max_trg_len_in_batch, 1] + "lbl_weight": [(batch_size * seq_len, long_type(1)), "float32"], + # These inputs are used to change the shape tensor in beam-search decoder. + "trg_slf_attn_pre_softmax_shape_delta": [(long_type(2), ), "int32"], + "trg_slf_attn_post_softmax_shape_delta": [(long_type(4), ), "int32"], + "init_score": [(batch_size, long_type(1)), "float32"], +} + +# Names of word embedding table which might be reused for weight sharing. +word_emb_param_names = ( + "src_word_emb_table", + "trg_word_emb_table", ) +# Names of position encoding table which will be initialized externally. +pos_enc_param_names = ( + "src_pos_enc_table", + "trg_pos_enc_table", ) +# separated inputs for different usages. +encoder_data_input_fields = ( + "src_word", + "src_pos", + "src_slf_attn_bias", ) +decoder_data_input_fields = ( + "trg_word", + "trg_pos", + "trg_slf_attn_bias", + "trg_src_attn_bias", + "enc_output", ) +label_data_input_fields = ( + "lbl_word", + "lbl_weight", ) +# In fast decoder, trg_pos (only containing the current time step) is generated +# by ops and trg_slf_attn_bias is not needed. +fast_decoder_data_input_fields = ( + "trg_word", + "init_score", + "trg_src_attn_bias", ) + +# fast_decoder_util_input_fields = ( +# "trg_slf_attn_pre_softmax_shape_delta", +# "trg_slf_attn_post_softmax_shape_delta", ) + + +#from optim import LearningRateScheduler +class LearningRateScheduler(object): + """ + Wrapper for learning rate scheduling as described in the Transformer paper. + LearningRateScheduler adapts the learning rate externally and the adapted + learning rate will be feeded into the main_program as input data. + """ + + def __init__(self, + d_model, + warmup_steps, + learning_rate=0.001, + current_steps=0, + name="learning_rate"): + self.current_steps = current_steps + self.warmup_steps = warmup_steps + self.d_model = d_model + self.static_lr = learning_rate + self.learning_rate = layers.create_global_var( + name=name, + shape=[1], + value=float(learning_rate), + dtype="float32", + persistable=True) + + def update_learning_rate(self): + self.current_steps += 1 + lr_value = np.power(self.d_model, -0.5) * np.min([ + np.power(self.current_steps, -0.5), + np.power(self.warmup_steps, -1.5) * self.current_steps + ]) * self.static_lr + return np.array([lr_value], dtype="float32") + + +#from transformer_train import train_loop +def pad_batch_data(insts, + pad_idx, + n_head, + is_target=False, + is_label=False, + return_attn_bias=True, + return_max_len=True, + return_num_token=False): """ Pad the instances to the max sequence length in batch, and generate the - corresponding position data and attention bias. Then, convert the numpy - data to tensors and return a dict mapping names to tensors. + corresponding position data and attention bias. """ + return_list = [] + max_len = max(len(inst) for inst in insts) + num_token = reduce(lambda x, y: x + y, + [len(inst) for inst in insts]) if return_num_token else 0 + # Any token included in dict can be used to pad, since the paddings' loss + # will be masked out by weights and make no effect on parameter gradients. + inst_data = np.array( + [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) + return_list += [inst_data.astype("int64").reshape([-1, 1])] + if is_label: # label weight + inst_weight = np.array( + [[1.] * len(inst) + [0.] * (max_len - len(inst)) for inst in insts]) + return_list += [inst_weight.astype("float32").reshape([-1, 1])] + else: # position data + inst_pos = np.array([ + range(1, len(inst) + 1) + [0] * (max_len - len(inst)) + for inst in insts + ]) + return_list += [inst_pos.astype("int64").reshape([-1, 1])] + if return_attn_bias: + if is_target: + # This is used to avoid attention on paddings and subsequent + # words. + slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len)) + slf_attn_bias_data = np.triu(slf_attn_bias_data, + 1).reshape([-1, 1, max_len, max_len]) + slf_attn_bias_data = np.tile(slf_attn_bias_data, + [1, n_head, 1, 1]) * [-1e9] + else: + # This is used to avoid attention on paddings. + slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * + (max_len - len(inst)) + for inst in insts]) + slf_attn_bias_data = np.tile( + slf_attn_bias_data.reshape([-1, 1, 1, max_len]), + [1, n_head, max_len, 1]) + return_list += [slf_attn_bias_data.astype("float32")] + if return_max_len: + return_list += [max_len] + if return_num_token: + return_list += [num_token] + return return_list if len(return_list) > 1 else return_list[0] + + +def prepare_batch_input(insts, data_input_names, src_pad_idx, trg_pad_idx, + n_head, d_model): + """ + Put all padded data needed by training into a dict. + """ + src_word, src_pos, src_slf_attn_bias, src_max_len = pad_batch_data( + [inst[0] for inst in insts], src_pad_idx, n_head, is_target=False) + src_word = src_word.reshape(-1, src_max_len, 1) + src_pos = src_pos.reshape(-1, src_max_len, 1) + trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = pad_batch_data( + [inst[1] for inst in insts], trg_pad_idx, n_head, is_target=True) + trg_word = trg_word.reshape(-1, trg_max_len, 1) + trg_pos = trg_pos.reshape(-1, trg_max_len, 1) - def __pad_batch_data(insts, - pad_idx, - is_target=False, - return_pos=True, - return_attn_bias=True, - return_max_len=True): - """ - Pad the instances to the max sequence length in batch, and generate the - corresponding position data and attention bias. - """ - return_list = [] - max_len = max(len(inst) for inst in insts) - inst_data = np.array( - [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) - return_list += [inst_data.astype("int64").reshape([-1, 1])] - if return_pos: - inst_pos = np.array([[ - pos_i + 1 if w_i != pad_idx else 0 - for pos_i, w_i in enumerate(inst) - ] for inst in inst_data]) - - return_list += [inst_pos.astype("int64").reshape([-1, 1])] - if return_attn_bias: - if is_target: - # This is used to avoid attention on paddings and subsequent - # words. - slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, - max_len)) - slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape( - [-1, 1, max_len, max_len]) - slf_attn_bias_data = np.tile(slf_attn_bias_data, - [1, n_head, 1, 1]) * [-1e9] - else: - # This is used to avoid attention on paddings. - slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * - (max_len - len(inst)) - for inst in insts]) - slf_attn_bias_data = np.tile( - slf_attn_bias_data.reshape([-1, 1, 1, max_len]), - [1, n_head, max_len, 1]) - return_list += [slf_attn_bias_data.astype("float32")] - if return_max_len: - return_list += [max_len] - return return_list if len(return_list) > 1 else return_list[0] - - src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data( - [inst[0] for inst in insts], src_pad_idx, is_target=False) - trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data( - [inst[1] for inst in insts], trg_pad_idx, is_target=True) trg_src_attn_bias = np.tile(src_slf_attn_bias[:, :, ::src_max_len, :], [1, 1, trg_max_len, 1]).astype("float32") - lbl_word = __pad_batch_data([inst[2] for inst in insts], trg_pad_idx, False, - False, False, False) - lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1]) + lbl_word, lbl_weight, num_token = pad_batch_data( + [inst[2] for inst in insts], + trg_pad_idx, + n_head, + is_target=False, + is_label=True, + return_attn_bias=False, + return_max_len=False, + return_num_token=True) + + data_input_dict = dict( + zip(data_input_names, [ + src_word, src_pos, src_slf_attn_bias, trg_word, trg_pos, + trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight + ])) + return data_input_dict, np.asarray([num_token], dtype="float32") + + +def read_multiple(reader, count, clip_last=True): + """ + Stack data from reader for multi-devices. + """ + + def __impl__(): + res = [] + for item in reader(): + res.append(item) + if len(res) == count: + yield res + res = [] + if len(res) == count: + yield res + elif not clip_last: + data = [] + for item in res: + data += item + if len(data) > count: + inst_num_per_part = len(data) // count + yield [ + data[inst_num_per_part * i:inst_num_per_part * (i + 1)] + for i in range(count) + ] + + return __impl__ + + +def split_data(data, num_part): + """ + Split data for each device. + """ + if len(data) == num_part: + return data + data = data[0] + inst_num_per_part = len(data) // num_part return [ - src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, - trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight + data[inst_num_per_part * i:inst_num_per_part * (i + 1)] + for i in range(num_part) ] -def transformer(use_feed): - assert not use_feed, "transfomer doesn't support feed yet" - return transformer_model.transformer( - ModelHyperParams.src_vocab_size + 1, - ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, - ModelHyperParams.n_layer, ModelHyperParams.n_head, - ModelHyperParams.d_key, ModelHyperParams.d_value, - ModelHyperParams.d_model, ModelHyperParams.d_inner_hid, - ModelHyperParams.dropout, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx) +def test_context(train_progm, avg_cost, train_exe, dev_count, data_input_names, + sum_cost, token_num): + # Context to do validation. + test_program = train_progm.clone() + with fluid.program_guard(test_program): + test_program = fluid.io.get_inference_program([avg_cost]) + + val_data = DataReader( + src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, + trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, + fpattern=TrainTaskConfig.val_file_pattern, + token_delimiter=TrainTaskConfig.token_delimiter, + use_token_batch=TrainTaskConfig.use_token_batch, + batch_size=TrainTaskConfig.batch_size * + (1 if TrainTaskConfig.use_token_batch else dev_count), + pool_size=TrainTaskConfig.pool_size, + sort_type=TrainTaskConfig.sort_type, + start_mark=TrainTaskConfig.special_token[0], + end_mark=TrainTaskConfig.special_token[1], + unk_mark=TrainTaskConfig.special_token[2], + # count start and end tokens out + max_length=ModelHyperParams.max_length - 2, + clip_last_batch=False, + shuffle=False, + shuffle_batch=False) + + build_strategy = fluid.BuildStrategy() + + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + + test_exe = fluid.ParallelExecutor( + use_cuda=TrainTaskConfig.use_gpu, + main_program=test_program, + share_vars_from=train_exe, + build_strategy=build_strategy, + exec_strategy=strategy) + + def test(exe=test_exe): + test_total_cost = 0 + test_total_token = 0 + test_data = read_multiple( + reader=val_data.batch_generator, + count=dev_count if TrainTaskConfig.use_token_batch else 1) + for batch_id, data in enumerate(test_data()): + feed_list = [] + for place_id, data_buffer in enumerate( + split_data( + data, num_part=dev_count)): + data_input_dict, _ = prepare_batch_input( + data_buffer, data_input_names, ModelHyperParams.eos_idx, + ModelHyperParams.eos_idx, ModelHyperParams.n_head, + ModelHyperParams.d_model) + feed_list.append(data_input_dict) + + outs = exe.run(feed=feed_list, + fetch_list=[sum_cost.name, token_num.name]) + sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) + test_total_cost += sum_cost_val.sum() + test_total_token += token_num_val.sum() + test_avg_cost = test_total_cost / test_total_token + test_ppl = np.exp([min(test_avg_cost, 100)]) + return test_avg_cost, test_ppl + + return test + + +def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler, + token_num, predict): + # Initialize the parameters. + if TrainTaskConfig.ckpt_path: + lr_scheduler.current_steps = TrainTaskConfig.start_step + else: + exe.run(fluid.framework.default_startup_program()) + + train_data = DataReader( + src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, + trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, + fpattern=TrainTaskConfig.train_file_pattern, + token_delimiter=TrainTaskConfig.token_delimiter, + use_token_batch=TrainTaskConfig.use_token_batch, + batch_size=TrainTaskConfig.batch_size * + (1 if TrainTaskConfig.use_token_batch else dev_count), + pool_size=TrainTaskConfig.pool_size, + sort_type=TrainTaskConfig.sort_type, + shuffle=TrainTaskConfig.shuffle, + shuffle_batch=TrainTaskConfig.shuffle_batch, + start_mark=TrainTaskConfig.special_token[0], + end_mark=TrainTaskConfig.special_token[1], + unk_mark=TrainTaskConfig.special_token[2], + # count start and end tokens out + max_length=ModelHyperParams.max_length - 2, + clip_last_batch=False) + train_data = read_multiple( + reader=train_data.batch_generator, + count=dev_count if TrainTaskConfig.use_token_batch else 1) + + build_strategy = fluid.BuildStrategy() + # Since the token number differs among devices, customize gradient scale to + # use token average cost among multi-devices. and the gradient scale is + # `1 / token_number` for average cost. + build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized + + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + + train_exe = fluid.ParallelExecutor( + use_cuda=TrainTaskConfig.use_gpu, + loss_name=sum_cost.name, + main_program=train_progm, + build_strategy=build_strategy, + exec_strategy=strategy) + + data_input_names = encoder_data_input_fields + decoder_data_input_fields[: + -1] + label_data_input_fields + + if TrainTaskConfig.val_file_pattern is not None: + test = test_context(train_progm, avg_cost, train_exe, dev_count, + data_input_names, sum_cost, token_num) + + # the best cross-entropy value with label smoothing + loss_normalizer = -((1. - TrainTaskConfig.label_smooth_eps) * np.log( + (1. - TrainTaskConfig.label_smooth_eps + )) + TrainTaskConfig.label_smooth_eps * + np.log(TrainTaskConfig.label_smooth_eps / ( + ModelHyperParams.trg_vocab_size - 1) + 1e-20)) + init = False + for pass_id in xrange(TrainTaskConfig.pass_num): + pass_start_time = time.time() + for batch_id, data in enumerate(train_data()): + if batch_id >= 5: + break + + feed_list = [] + total_num_token = 0 + + #if TrainTaskConfig.local: + # lr_rate = lr_scheduler.update_learning_rate() + #for place_id, data_buffer in enumerate( + # split_data( + # data, num_part=dev_count)): + + if TrainTaskConfig.local: + lr_rate = lr_scheduler.update_learning_rate() + + for place_id, data_buffer in enumerate( + split_data( + data, num_part=dev_count)): + data_input_dict, num_token = prepare_batch_input( + data_buffer, data_input_names, ModelHyperParams.eos_idx, + ModelHyperParams.eos_idx, ModelHyperParams.n_head, + ModelHyperParams.d_model) + total_num_token += num_token + feed_kv_pairs = data_input_dict.items() + if TrainTaskConfig.local: + feed_kv_pairs += { + lr_scheduler.learning_rate.name: lr_rate + }.items() + feed_list.append(dict(feed_kv_pairs)) + + if not init: + for pos_enc_param_name in pos_enc_param_names: + pos_enc = position_encoding_init( + ModelHyperParams.max_length + 1, + ModelHyperParams.d_model) + feed_list[place_id][pos_enc_param_name] = pos_enc + + if not TrainTaskConfig.check_acc: + for feed_dict in feed_list: + feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token + else: + b = 100 * TrainTaskConfig.batch_size + a = np.asarray([b], dtype="float32") + for feed_dict in feed_list: + feed_dict[sum_cost.name + "@GRAD"] = 1. / a + + outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name], + feed=feed_list) + + sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) + total_sum_cost = sum_cost_val.sum() + total_token_num = token_num_val.sum() + total_avg_cost = total_sum_cost / total_token_num + + init = True + + # Validate and save the model for inference. + if TrainTaskConfig.val_file_pattern is not None: + val_avg_cost, val_ppl = test() + print("[%f]" % val_avg_cost) + else: + assert (False) + + +#import transformer_reader as reader +class SortType(object): + GLOBAL = 'global' + POOL = 'pool' + NONE = "none" + + +class Converter(object): + def __init__(self, vocab, beg, end, unk, delimiter): + self._vocab = vocab + self._beg = beg + self._end = end + self._unk = unk + self._delimiter = delimiter + + def __call__(self, sentence): + return [self._beg] + [ + self._vocab.get(w, self._unk) + for w in sentence.split(self._delimiter) + ] + [self._end] + + +class ComposedConverter(object): + def __init__(self, converters): + self._converters = converters + + def __call__(self, parallel_sentence): + return [ + self._converters[i](parallel_sentence[i]) + for i in range(len(self._converters)) + ] + + +class SentenceBatchCreator(object): + def __init__(self, batch_size): + self.batch = [] + self._batch_size = batch_size + + def append(self, info): + self.batch.append(info) + if len(self.batch) == self._batch_size: + tmp = self.batch + self.batch = [] + return tmp + + +class TokenBatchCreator(object): + def __init__(self, batch_size): + self.batch = [] + self.max_len = -1 + self._batch_size = batch_size + + def append(self, info): + cur_len = info.max_len + max_len = max(self.max_len, cur_len) + if max_len * (len(self.batch) + 1) > self._batch_size: + result = self.batch + self.batch = [info] + self.max_len = cur_len + return result + else: + self.max_len = max_len + self.batch.append(info) + + +class SampleInfo(object): + def __init__(self, i, max_len, min_len): + self.i = i + self.min_len = min_len + self.max_len = max_len + + +class MinMaxFilter(object): + def __init__(self, max_len, min_len, underlying_creator): + self._min_len = min_len + self._max_len = max_len + self._creator = underlying_creator + + def append(self, info): + if info.max_len > self._max_len or info.min_len < self._min_len: + return + else: + return self._creator.append(info) + + @property + def batch(self): + return self._creator.batch + + +class DataReader(object): + """ + The data reader loads all data from files and produces batches of data + in the way corresponding to settings. + + An example of returning a generator producing data batches whose data + is shuffled in each pass and sorted in each pool: + + ``` + train_data = DataReader( + src_vocab_fpath='data/src_vocab_file', + trg_vocab_fpath='data/trg_vocab_file', + fpattern='data/part-*', + use_token_batch=True, + batch_size=2000, + pool_size=10000, + sort_type=SortType.POOL, + shuffle=True, + shuffle_batch=True, + start_mark='', + end_mark='', + unk_mark='', + clip_last_batch=False).batch_generator + ``` + + :param src_vocab_fpath: The path of vocabulary file of source language. + :type src_vocab_fpath: basestring + :param trg_vocab_fpath: The path of vocabulary file of target language. + :type trg_vocab_fpath: basestring + :param fpattern: The pattern to match data files. + :type fpattern: basestring + :param batch_size: The number of sequences contained in a mini-batch. + or the maximum number of tokens (include paddings) contained in a + mini-batch. + :type batch_size: int + :param pool_size: The size of pool buffer. + :type pool_size: int + :param sort_type: The grain to sort by length: 'global' for all + instances; 'pool' for instances in pool; 'none' for no sort. + :type sort_type: basestring + :param clip_last_batch: Whether to clip the last uncompleted batch. + :type clip_last_batch: bool + :param tar_fname: The data file in tar if fpattern matches a tar file. + :type tar_fname: basestring + :param min_length: The minimum length used to filt sequences. + :type min_length: int + :param max_length: The maximum length used to filt sequences. + :type max_length: int + :param shuffle: Whether to shuffle all instances. + :type shuffle: bool + :param shuffle_batch: Whether to shuffle the generated batches. + :type shuffle_batch: bool + :param use_token_batch: Whether to produce batch data according to + token number. + :type use_token_batch: bool + :param field_delimiter: The delimiter used to split source and target in + each line of data file. + :type field_delimiter: basestring + :param token_delimiter: The delimiter used to split tokens in source or + target sentences. + :type token_delimiter: basestring + :param start_mark: The token representing for the beginning of + sentences in dictionary. + :type start_mark: basestring + :param end_mark: The token representing for the end of sentences + in dictionary. + :type end_mark: basestring + :param unk_mark: The token representing for unknown word in dictionary. + :type unk_mark: basestring + :param seed: The seed for random. + :type seed: int + """ + + def __init__(self, + src_vocab_fpath, + trg_vocab_fpath, + fpattern, + batch_size, + pool_size, + sort_type=SortType.GLOBAL, + clip_last_batch=True, + tar_fname=None, + min_length=0, + max_length=100, + shuffle=True, + shuffle_batch=False, + use_token_batch=False, + field_delimiter="\t", + token_delimiter=" ", + start_mark="", + end_mark="", + unk_mark="", + seed=0): + self._src_vocab = self.load_dict(src_vocab_fpath) + self._only_src = True + if trg_vocab_fpath is not None: + self._trg_vocab = self.load_dict(trg_vocab_fpath) + self._only_src = False + self._pool_size = pool_size + self._batch_size = batch_size + self._use_token_batch = use_token_batch + self._sort_type = sort_type + self._clip_last_batch = clip_last_batch + self._shuffle = shuffle + self._shuffle_batch = shuffle_batch + self._min_length = min_length + self._max_length = max_length + self._field_delimiter = field_delimiter + self._token_delimiter = token_delimiter + self.load_src_trg_ids(end_mark, fpattern, start_mark, tar_fname, + unk_mark) + self._random = random.Random(x=seed) + + def load_src_trg_ids(self, end_mark, fpattern, start_mark, tar_fname, + unk_mark): + converters = [ + Converter( + vocab=self._src_vocab, + beg=self._src_vocab[start_mark], + end=self._src_vocab[end_mark], + unk=self._src_vocab[unk_mark], + delimiter=self._token_delimiter) + ] + if not self._only_src: + converters.append( + Converter( + vocab=self._trg_vocab, + beg=self._trg_vocab[start_mark], + end=self._trg_vocab[end_mark], + unk=self._trg_vocab[unk_mark], + delimiter=self._token_delimiter)) + + converters = ComposedConverter(converters) + + self._src_seq_ids = [] + self._trg_seq_ids = None if self._only_src else [] + self._sample_infos = [] + + for i, line in enumerate(self._load_lines(fpattern, tar_fname)): + src_trg_ids = converters(line) + self._src_seq_ids.append(src_trg_ids[0]) + lens = [len(src_trg_ids[0])] + if not self._only_src: + self._trg_seq_ids.append(src_trg_ids[1]) + lens.append(len(src_trg_ids[1])) + self._sample_infos.append(SampleInfo(i, max(lens), min(lens))) + + def _load_lines(self, fpattern, tar_fname): + fpaths = glob.glob(fpattern) + + if len(fpaths) == 1 and tarfile.is_tarfile(fpaths[0]): + if tar_fname is None: + raise Exception("If tar file provided, please set tar_fname.") + + f = tarfile.open(fpaths[0], "r") + for line in f.extractfile(tar_fname): + fields = line.strip("\n").split(self._field_delimiter) + if (not self._only_src and len(fields) == 2) or ( + self._only_src and len(fields) == 1): + yield fields + else: + for fpath in fpaths: + if not os.path.isfile(fpath): + raise IOError("Invalid file: %s" % fpath) + + with open(fpath, "r") as f: + for line in f: + fields = line.strip("\n").split(self._field_delimiter) + if (not self._only_src and len(fields) == 2) or ( + self._only_src and len(fields) == 1): + yield fields + + @staticmethod + def load_dict(dict_path, reverse=False): + word_dict = {} + with open(dict_path, "r") as fdict: + for idx, line in enumerate(fdict): + if reverse: + word_dict[idx] = line.strip("\n") + else: + word_dict[line.strip("\n")] = idx + return word_dict + + def batch_generator(self): + # global sort or global shuffle + if self._sort_type == SortType.GLOBAL: + infos = sorted( + self._sample_infos, key=lambda x: x.max_len, reverse=True) + else: + if self._shuffle: + infos = self._sample_infos + self._random.shuffle(infos) + else: + infos = self._sample_infos + + if self._sort_type == SortType.POOL: + for i in range(0, len(infos), self._pool_size): + infos[i:i + self._pool_size] = sorted( + infos[i:i + self._pool_size], key=lambda x: x.max_len) + + # concat batch + batches = [] + batch_creator = TokenBatchCreator( + self._batch_size + ) if self._use_token_batch else SentenceBatchCreator(self._batch_size) + batch_creator = MinMaxFilter(self._max_length, self._min_length, + batch_creator) + + for info in infos: + batch = batch_creator.append(info) + if batch is not None: + batches.append(batch) + + if not self._clip_last_batch and len(batch_creator.batch) != 0: + batches.append(batch_creator.batch) + + if self._shuffle_batch: + self._random.shuffle(batches) + + for batch in batches: + batch_ids = [info.i for info in batch] + + if self._only_src: + yield [[self._src_seq_ids[idx]] for idx in batch_ids] + else: + yield [(self._src_seq_ids[idx], self._trg_seq_ids[idx][:-1], + self._trg_seq_ids[idx][1:]) for idx in batch_ids] + + +#from transformer_model import transformer +def position_encoding_init(n_position, d_pos_vec): + """ + Generate the initial values for the sinusoid position encoding table. + """ + position_enc = np.array([[ + pos / np.power(10000, 2 * (j // 2) / d_pos_vec) + for j in range(d_pos_vec) + ] if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)]) + position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i + position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1 + return position_enc.astype("float32") + + +def multi_head_attention(queries, + keys, + values, + attn_bias, + d_key, + d_value, + d_model, + n_head=1, + dropout_rate=0., + cache=None): + """ + Multi-Head Attention. Note that attn_bias is added to the logit before + computing softmax activiation to mask certain selected positions so that + they will not considered in attention weights. + """ + if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3): + raise ValueError( + "Inputs: quries, keys and values should all be 3-D tensors.") + + def __compute_qkv(queries, keys, values, n_head, d_key, d_value): + """ + Add linear projection to queries, keys, and values. + """ + q = layers.fc(input=queries, + size=d_key * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + k = layers.fc(input=keys, + size=d_key * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + v = layers.fc(input=values, + size=d_value * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return q, k, v + + def __split_heads(x, n_head): + """ + Reshape the last dimension of inpunt tensor x so that it becomes two + dimensions and then transpose. Specifically, input a tensor with shape + [bs, max_sequence_length, n_head * hidden_dim] then output a tensor + with shape [bs, n_head, max_sequence_length, hidden_dim]. + """ + if n_head == 1: + return x + + hidden_size = x.shape[-1] + # The value 0 in shape attr means copying the corresponding dimension + # size of the input as the output dimension size. + reshaped = layers.reshape( + x=x, shape=[0, 0, n_head, hidden_size // n_head]) + + # permuate the dimensions into: + # [batch_size, n_head, max_sequence_len, hidden_size_per_head] + return layers.transpose(x=reshaped, perm=[0, 2, 1, 3]) + + def __combine_heads(x): + """ + Transpose and then reshape the last two dimensions of inpunt tensor x + so that it becomes one dimension, which is reverse to __split_heads. + """ + if len(x.shape) == 3: return x + if len(x.shape) != 4: + raise ValueError("Input(x) should be a 4-D Tensor.") + + trans_x = layers.transpose(x, perm=[0, 2, 1, 3]) + # The value 0 in shape attr means copying the corresponding dimension + # size of the input as the output dimension size. + return layers.reshape( + x=trans_x, + shape=map(int, [0, 0, trans_x.shape[2] * trans_x.shape[3]])) + + def scaled_dot_product_attention(q, k, v, attn_bias, d_model, dropout_rate): + """ + Scaled Dot-Product Attention + """ + scaled_q = layers.scale(x=q, scale=d_model**-0.5) + product = layers.matmul(x=scaled_q, y=k, transpose_y=True) + if attn_bias: + product += attn_bias + weights = layers.softmax(product) + if dropout_rate: + weights = layers.dropout( + weights, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) + out = layers.matmul(weights, v) + return out + + q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value) + + if cache is not None: # use cache and concat time steps + k = cache["k"] = layers.concat([cache["k"], k], axis=1) + v = cache["v"] = layers.concat([cache["v"], v], axis=1) + + q = __split_heads(q, n_head) + k = __split_heads(k, n_head) + v = __split_heads(v, n_head) + + ctx_multiheads = scaled_dot_product_attention(q, k, v, attn_bias, d_model, + dropout_rate) + + out = __combine_heads(ctx_multiheads) + + # Project back to the model size. + proj_out = layers.fc(input=out, + size=d_model, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return proj_out + + +def positionwise_feed_forward(x, d_inner_hid, d_hid): + """ + Position-wise Feed-Forward Networks. + This module consists of two linear transformations with a ReLU activation + in between, which is applied to each position separately and identically. + """ + hidden = layers.fc(input=x, + size=d_inner_hid, + num_flatten_dims=2, + act="relu", + param_attr=const_para_attr, + bias_attr=const_bias_attr) + out = layers.fc(input=hidden, + size=d_hid, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return out + + +def pre_post_process_layer(prev_out, out, process_cmd, dropout_rate=0.): + """ + Add residual connection, layer normalization and droput to the out tensor + optionally according to the value of process_cmd. + This will be used before or after multi-head attention and position-wise + feed-forward networks. + """ + for cmd in process_cmd: + if cmd == "a": # add residual connection + out = out + prev_out if prev_out else out + elif cmd == "n": # add layer normalization + out = layers.layer_norm( + out, + begin_norm_axis=len(out.shape) - 1, + param_attr=fluid.initializer.Constant(1.), + bias_attr=fluid.initializer.Constant(0.)) + elif cmd == "d": # add dropout + if dropout_rate: + out = layers.dropout( + out, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) + return out + + +pre_process_layer = partial(pre_post_process_layer, None) +post_process_layer = pre_post_process_layer + + +def prepare_encoder(src_word, + src_pos, + src_vocab_size, + src_emb_dim, + src_max_len, + dropout_rate=0., + word_emb_param_name=None, + pos_enc_param_name=None): + """Add word embeddings and position encodings. + The output tensor has a shape of: + [batch_size, max_src_length_in_batch, d_model]. + This module is used at the bottom of the encoder stacks. + """ + if TrainTaskConfig.check_acc: + src_word_emb = layers.embedding( + src_word, + size=[src_vocab_size, src_emb_dim], + param_attr=fluid.ParamAttr( + name=word_emb_param_name, + initializer=fluid.initializer.ConstantInitializer(0.001))) + else: + src_word_emb = layers.embedding( + src_word, + size=[src_vocab_size, src_emb_dim], + param_attr=fluid.ParamAttr( + name=word_emb_param_name, + initializer=fluid.initializer.Normal(0., src_emb_dim**-0.5))) + + src_word_emb = layers.scale(x=src_word_emb, scale=src_emb_dim**0.5) + src_pos_enc = layers.embedding( + src_pos, + size=[src_max_len, src_emb_dim], + param_attr=fluid.ParamAttr( + name=pos_enc_param_name, + trainable=False, + initializer=fluid.initializer.ConstantInitializer(0.001))) + enc_input = src_word_emb + src_pos_enc + return layers.dropout( + enc_input, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) if dropout_rate else enc_input + + +prepare_encoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[0]) +prepare_decoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[1]) + + +def encoder_layer(enc_input, + attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """The encoder layers that can be stacked to form a deep encoder. + This module consits of a multi-head (self) attention followed by + position-wise feed-forward networks and both the two components companied + with the post_process_layer to add residual connection, layer normalization + and droput. + """ + attn_output = multi_head_attention(enc_input, enc_input, enc_input, + attn_bias, d_key, d_value, d_model, + n_head, dropout_rate) + attn_output = post_process_layer(enc_input, attn_output, "dan", + dropout_rate) + ffd_output = positionwise_feed_forward(attn_output, d_inner_hid, d_model) + return post_process_layer(attn_output, ffd_output, "dan", dropout_rate) + + +def encoder(enc_input, + attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """ + The encoder is composed of a stack of identical layers returned by calling + encoder_layer. + """ + for i in range(n_layer): + enc_output = encoder_layer(enc_input, attn_bias, n_head, d_key, d_value, + d_model, d_inner_hid, dropout_rate) + enc_input = enc_output + return enc_output + + +def decoder_layer(dec_input, + enc_output, + slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0., + cache=None): + """ The layer to be stacked in decoder part. + The structure of this module is similar to that in the encoder part except + a multi-head attention is added to implement encoder-decoder attention. + """ + slf_attn_output = multi_head_attention( + dec_input, + dec_input, + dec_input, + slf_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, + cache, ) + slf_attn_output = post_process_layer( + dec_input, + slf_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + enc_attn_output = multi_head_attention( + slf_attn_output, + enc_output, + enc_output, + dec_enc_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, ) + enc_attn_output = post_process_layer( + slf_attn_output, + enc_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + ffd_output = positionwise_feed_forward( + enc_attn_output, + d_inner_hid, + d_model, ) + dec_output = post_process_layer( + enc_attn_output, + ffd_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + return dec_output -def get_model(): - avg_cost = transformer(use_feed=False) - optimizer = fluid.optimizer.Adam() - optimizer.minimize(avg_cost) - fluid.memory_optimize(fluid.default_main_program()) - return avg_cost +def decoder(dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0., + caches=None): + """ + The decoder is composed of a stack of identical decoder_layer layers. + """ + for i in range(n_layer): + cache = None + if caches is not None: + cache = caches[i] + + dec_output = decoder_layer( + dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + cache=cache) + dec_input = dec_output + return dec_output + + +def make_all_inputs(input_fields): + """ + Define the input data layers for the transformer model. + """ + inputs = [] + for input_field in input_fields: + input_var = layers.data( + name=input_field, + shape=input_descs[input_field][0], + dtype=input_descs[input_field][1], + lod_level=input_descs[input_field][2] + if len(input_descs[input_field]) == 3 else 0, + append_batch_size=False) + inputs.append(input_var) + return inputs + + +def transformer( + src_vocab_size, + trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + label_smooth_eps, ): + if weight_sharing: + assert src_vocab_size == src_vocab_size, ( + "Vocabularies in source and target should be same for weight sharing." + ) + enc_inputs = make_all_inputs(encoder_data_input_fields) + + enc_output = wrap_encoder( + src_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + enc_inputs, ) + + dec_inputs = make_all_inputs(decoder_data_input_fields[:-1]) + + predict = wrap_decoder( + trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs, + enc_output, ) + + # Padding index do not contribute to the total loss. The weights is used to + # cancel padding index in calculating the loss. + label, weights = make_all_inputs(label_data_input_fields) + if label_smooth_eps: + label = layers.label_smooth( + label=layers.one_hot( + input=label, depth=trg_vocab_size), + epsilon=label_smooth_eps) + + cost = layers.softmax_with_cross_entropy( + logits=layers.reshape( + predict, shape=[-1, trg_vocab_size]), + label=label, + soft_label=True if label_smooth_eps else False) + weighted_cost = cost * weights + sum_cost = layers.reduce_sum(weighted_cost) + token_num = layers.reduce_sum(weights) + avg_cost = sum_cost / token_num + avg_cost.stop_gradient = True + return sum_cost, avg_cost, predict, token_num + + +def wrap_encoder(src_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + enc_inputs=None): + """ + The wrapper assembles together all needed layers for the encoder. + """ + if enc_inputs is None: + # This is used to implement independent encoder program in inference. + src_word, src_pos, src_slf_attn_bias = \ + make_all_inputs(encoder_data_input_fields) + else: + src_word, src_pos, src_slf_attn_bias = \ + enc_inputs + enc_input = prepare_encoder( + src_word, + src_pos, + src_vocab_size, + d_model, + max_length, + dropout_rate, + word_emb_param_name=word_emb_param_names[0]) + enc_output = encoder(enc_input, src_slf_attn_bias, n_layer, n_head, d_key, + d_value, d_model, d_inner_hid, dropout_rate) + return enc_output + + +def wrap_decoder(trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs=None, + enc_output=None, + caches=None): + """ + The wrapper assembles together all needed layers for the decoder. + """ + if dec_inputs is None: + # This is used to implement independent decoder program in inference. + trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias, \ + enc_output = make_all_inputs( + decoder_data_input_fields + decoder_util_input_fields) + else: + trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias = dec_inputs + + dec_input = prepare_decoder( + trg_word, + trg_pos, + trg_vocab_size, + d_model, + max_length, + dropout_rate, + word_emb_param_name=word_emb_param_names[0] + if weight_sharing else word_emb_param_names[1]) + dec_output = decoder( + dec_input, + enc_output, + trg_slf_attn_bias, + trg_src_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + caches=caches) + # Return logits for training and probs for inference. + if weight_sharing: + predict = layers.matmul( + x=dec_output, + y=fluid.get_var(word_emb_param_names[0]), + transpose_y=True) + else: + predict = layers.fc(input=dec_output, + size=trg_vocab_size, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + if dec_inputs is None: + predict = layers.softmax(predict) + return predict + + +def fast_decode( + src_vocab_size, + trg_vocab_size, + max_in_len, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + beam_size, + max_out_len, + eos_idx, ): + """ + Use beam search to decode. Caches will be used to store states of history + steps which can make the decoding faster. + """ + enc_output = wrap_encoder(src_vocab_size, max_in_len, n_layer, n_head, + d_key, d_value, d_model, d_inner_hid, + dropout_rate, weight_sharing) + start_tokens, init_scores, trg_src_attn_bias = \ + make_all_inputs(fast_decoder_data_input_fields ) + + def beam_search(): + max_len = layers.fill_constant( + shape=[1], dtype=start_tokens.dtype, value=max_out_len) + step_idx = layers.fill_constant( + shape=[1], dtype=start_tokens.dtype, value=0) + cond = layers.less_than(x=step_idx, y=max_len) + while_op = layers.While(cond) + # array states will be stored for each step. + ids = layers.array_write( + layers.reshape(start_tokens, (-1, 1)), step_idx) + scores = layers.array_write(init_scores, step_idx) + # cell states will be overwrited at each step. + # caches contains states of history steps to reduce redundant + # computation in decoder. + caches = [{ + "k": layers.fill_constant_batch_size_like( + input=start_tokens, + shape=[-1, 0, d_model], + dtype=enc_output.dtype, + value=0), + "v": layers.fill_constant_batch_size_like( + input=start_tokens, + shape=[-1, 0, d_model], + dtype=enc_output.dtype, + value=0) + } for i in range(n_layer)] + with while_op.block(): + pre_ids = layers.array_read(array=ids, i=step_idx) + pre_ids = layers.reshape(pre_ids, (-1, 1, 1)) + pre_scores = layers.array_read(array=scores, i=step_idx) + # sequence_expand can gather sequences according to lod thus can be + # used in beam search to sift states corresponding to selected ids. + pre_src_attn_bias = layers.sequence_expand( + x=trg_src_attn_bias, y=pre_scores) + pre_enc_output = layers.sequence_expand(x=enc_output, y=pre_scores) + pre_caches = [{ + "k": layers.sequence_expand( + x=cache["k"], y=pre_scores), + "v": layers.sequence_expand( + x=cache["v"], y=pre_scores), + } for cache in caches] + pre_pos = layers.elementwise_mul( + x=layers.fill_constant_batch_size_like( + input=pre_enc_output, # cann't use pre_ids here since it has lod + value=1, + shape=[-1, 1, 1], + dtype=pre_ids.dtype), + y=layers.increment( + x=step_idx, value=1.0, in_place=False), + axis=0) + logits = wrap_decoder( + trg_vocab_size, + max_in_len, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs=(pre_ids, pre_pos, None, pre_src_attn_bias), + enc_output=pre_enc_output, + caches=pre_caches) + logits = layers.reshape(logits, (-1, trg_vocab_size)) + + topk_scores, topk_indices = layers.topk( + input=layers.softmax(logits), k=beam_size) + accu_scores = layers.elementwise_add( + x=layers.log(topk_scores), + y=layers.reshape( + pre_scores, shape=[-1]), + axis=0) + # beam_search op uses lod to distinguish branches. + topk_indices = layers.lod_reset(topk_indices, pre_ids) + selected_ids, selected_scores = layers.beam_search( + pre_ids=pre_ids, + pre_scores=pre_scores, + ids=topk_indices, + scores=accu_scores, + beam_size=beam_size, + end_id=eos_idx) + + layers.increment(x=step_idx, value=1.0, in_place=True) + # update states + layers.array_write(selected_ids, i=step_idx, array=ids) + layers.array_write(selected_scores, i=step_idx, array=scores) + layers.assign(pre_src_attn_bias, trg_src_attn_bias) + layers.assign(pre_enc_output, enc_output) + for i in range(n_layer): + layers.assign(pre_caches[i]["k"], caches[i]["k"]) + layers.assign(pre_caches[i]["v"], caches[i]["v"]) + length_cond = layers.less_than(x=step_idx, y=max_len) + finish_cond = layers.logical_not(layers.is_empty(x=selected_ids)) + layers.logical_and(x=length_cond, y=finish_cond, out=cond) + + finished_ids, finished_scores = layers.beam_search_decode( + ids, scores, beam_size=beam_size, end_id=eos_idx) + return finished_ids, finished_scores + + finished_ids, finished_scores = beam_search() + return finished_ids, finished_scores + + +def get_model(is_dist, is_async): + sum_cost, avg_cost, predict, token_num = transformer( + ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size, + ModelHyperParams.max_length + 1, ModelHyperParams.n_layer, + ModelHyperParams.n_head, ModelHyperParams.d_key, + ModelHyperParams.d_value, ModelHyperParams.d_model, + ModelHyperParams.d_inner_hid, ModelHyperParams.dropout, + ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps) + + local_lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model, + TrainTaskConfig.warmup_steps, + TrainTaskConfig.learning_rate) + + if not is_dist: + optimizer = fluid.optimizer.Adam( + learning_rate=local_lr_scheduler.learning_rate, + beta1=TrainTaskConfig.beta1, + beta2=TrainTaskConfig.beta2, + epsilon=TrainTaskConfig.eps) + optimizer.minimize(sum_cost) + elif is_async: + optimizer = fluid.optimizer.SGD(0.003) + optimizer.minimize(sum_cost) + else: + lr_decay = fluid.layers\ + .learning_rate_scheduler\ + .noam_decay(ModelHyperParams.d_model, + TrainTaskConfig.warmup_steps) + + optimizer = fluid.optimizer.Adam( + learning_rate=lr_decay, + beta1=TrainTaskConfig.beta1, + beta2=TrainTaskConfig.beta2, + epsilon=TrainTaskConfig.eps) + optimizer.minimize(sum_cost) + + return sum_cost, avg_cost, predict, token_num, local_lr_scheduler def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): @@ -176,10 +1677,23 @@ def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): return t -class DistTransformer2x2(object): +def update_args(): + src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath) + trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath) + dict_args = [ + "src_vocab_size", str(len(src_dict)), "trg_vocab_size", + str(len(trg_dict)), "bos_idx", + str(src_dict[TrainTaskConfig.special_token[0]]), "eos_idx", + str(src_dict[TrainTaskConfig.special_token[1]]), "unk_idx", + str(src_dict[TrainTaskConfig.special_token[2]]) + ] + merge_cfg_from_list(dict_args, [TrainTaskConfig, ModelHyperParams]) + + +class DistTransformer2x2(TestDistRunnerBase): def run_pserver(self, pserver_endpoints, trainers, current_endpoint, - trainer_id): - get_model() + trainer_id, sync_mode): + get_model(True, not sync_mode) t = get_transpiler(trainer_id, fluid.default_main_program(), pserver_endpoints, trainers) @@ -196,7 +1710,6 @@ class DistTransformer2x2(object): while True: assert retry_times >= 0, "wait ps ready failed" time.sleep(3) - print("waiting ps ready: ", pid) try: # the listen_and_serv_op would touch a file which contains the listen port # on the /tmp directory until it was ready to process all the RPC call. @@ -205,63 +1718,35 @@ class DistTransformer2x2(object): except os.error: retry_times -= 1 - def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True): - avg_cost = get_model() + def run_trainer(self, + place, + endpoints, + trainer_id, + trainers, + is_dist=True, + sync_mode=True): + + sum_cost, avg_cost, predict, token_num, local_lr_scheduler = get_model( + is_dist, not sync_mode) + if is_dist: t = get_transpiler(trainer_id, fluid.default_main_program(), endpoints, trainers) trainer_prog = t.get_trainer_program() + TrainTaskConfig.batch_size = 10 + TrainTaskConfig.train_file_pattern = TrainTaskConfig.data_path + "train.tok.clean.bpe.32000.en-de.train_{}".format( + trainer_id) else: + TrainTaskConfig.batch_size = 20 trainer_prog = fluid.default_main_program() startup_exe = fluid.Executor(place) - startup_exe.run(fluid.default_startup_program()) - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - strategy.allow_op_delay = False - exe = fluid.ParallelExecutor( - True, loss_name=avg_cost.name, exec_strategy=strategy) - - first_loss, = exe.run(fetch_list=[avg_cost.name]) - print(first_loss) - for i in six.moves.xrange(5): - _ = exe.run(fetch_list=[avg_cost.name]) - last_loss, = exe.run(fetch_list=[avg_cost.name]) - print(last_loss) - - -def main(role="pserver", - endpoints="127.0.0.1:9123", - trainer_id=0, - current_endpoint="127.0.0.1:9123", - trainers=1, - is_dist=True): - - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - WMT16_RECORDIO_FILE) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() - - model = DistTransformer2x2() - if role == "pserver": - model.run_pserver(endpoints, trainers, current_endpoint, trainer_id) - else: - p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( - ) else fluid.CPUPlace() - model.run_trainer(p, endpoints, trainer_id, trainers, is_dist) + + TrainTaskConfig.local = not is_dist + + train_loop(startup_exe, trainer_prog, 1, sum_cost, avg_cost, + local_lr_scheduler, token_num, predict) if __name__ == "__main__": @@ -269,18 +1754,6 @@ if __name__ == "__main__": print( "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]" ) - role = sys.argv[1] - endpoints = sys.argv[2] - trainer_id = int(sys.argv[3]) - current_endpoint = sys.argv[4] - trainers = int(sys.argv[5]) - is_dist = True if sys.argv[6] == "TRUE" else False - # FIXME(typhoonzero): refine this test. - is_async = True if sys.argv[7] == "TRUE" else False - main( - role=role, - endpoints=endpoints, - trainer_id=trainer_id, - current_endpoint=current_endpoint, - trainers=trainers, - is_dist=is_dist) + + update_args() + runtime_main(DistTransformer2x2) diff --git a/python/paddle/fluid/tests/unittests/test_dist_transformer.py b/python/paddle/fluid/tests/unittests/test_dist_transformer.py index 62fcf5953f93637a20beed649de21476a8673419..a8e6ce4cfe18384e405f1602429628914d2c2e00 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transformer.py @@ -15,17 +15,55 @@ from __future__ import print_function import unittest +import paddle from test_dist_base import TestDistBase -class TestDistTransformer2x2(TestDistBase): +def download_files(): + url_prefix = 'http://paddle-unittest-data.cdn.bcebos.com/dist_transformer/' + vocab_url = url_prefix + 'vocab.bpe.32000' + vocab_md5 = 'a86d345ca6e27f6591d0dccb1b9be853' + paddle.dataset.common.download(vocab_url, 'test_dist_transformer', + vocab_md5) + + local_train_url = url_prefix + 'train.tok.clean.bpe.32000.en-de' + local_train_md5 = '033eb02b9449e6dd823f050782ac8914' + paddle.dataset.common.download(local_train_url, 'test_dist_transformer', + local_train_md5) + + train0_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_0' + train0_md5 = 'ddce7f602f352a0405267285379a38b1' + paddle.dataset.common.download(train0_url, 'test_dist_transformer', + train0_md5) + + train1_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_1' + train1_md5 = '8757798200180285b1a619cd7f408747' + paddle.dataset.common.download(train1_url, 'test_dist_transformer', + train1_md5) + + test_url = url_prefix + 'newstest2013.tok.bpe.32000.en-de' + test_md5 = '9dd74a266dbdb25314183899f269b4a2' + paddle.dataset.common.download(test_url, 'test_dist_transformer', test_md5) + + +class TestDistTransformer2x2Sync(TestDistBase): def _setup_config(self): self._sync_mode = True def test_transformer(self): - # TODO(paddle-dev): check if the delta is OK. - # Usually start around ~8000 and converge to ~5000 - self.check_with_place("dist_transformer.py", delta=400) + download_files() + #Note: loss on test dataset of the first 5 batch are: + # 10.518872, 10.518871, 10.518868, 10.518862, 10.518855 + self.check_with_place("dist_transformer.py", delta=1e-7) + + +class TestDistTransformer2x2Async(TestDistBase): + def _setup_config(self): + self._sync_mode = False + + def test_transformer(self): + download_files() + self.check_with_place("dist_transformer.py", delta=1.0) if __name__ == "__main__":