提交 4f422f02 编写于 作者: G Guo Sheng 提交者: gongweibao

Support pyreader and data feeding in Transformer (#1239)

上级 d1e78e57
#!/bin/bash
DATA_PATH=$HOME/.cache/paddle/dataset/wmt16
if [ ! -d $DATA_PATH/en_10000.dict ] ; then
if [ ! -e $DATA_PATH/en_10000.dict ] ; then
python -c 'import paddle;paddle.dataset.wmt16.train(10000, 10000, "en")().next()'
tar -zxf $DATA_PATH/wmt16.tar.gz -C $DATA_PATH
fi
......
......@@ -63,7 +63,7 @@ WMT 数据集是机器翻译领域公认的主流数据集;WMT 英德和英法
#### WMT 英德翻译数据
[WMT'16 EN-DE 数据集](http://www.statmt.org/wmt16/translation-task.html)是一个中等规模的数据集。参照论文,英德数据集我们使用 BPE 编码的数据,这能够更好的解决未登录词(out-of-vocabulary,OOV)的问题[4]。用到的 BPE 数据可以参照[这里](https://github.com/google/seq2seq/blob/master/docs/data.md)进行下载(如果希望在自定义数据中使用 BPE 编码,可以参照[这里](https://github.com/rsennrich/subword-nmt)进行预处理),下载后解压,其中 `train.tok.clean.bpe.32000.en``train.tok.clean.bpe.32000.de` 为使用 BPE 的训练数据(平行语料,分别对应了英语和德语,经过了 tokenize 和 BPE 的处理),`newstest2013.tok.bpe.32000.en``newstest2013.tok.bpe.32000.de` 等为测试数据(`newstest2013.tok.en``newstest2013.tok.de` 等则为对应的未使用 BPE 的测试数据),`vocab.bpe.32000` 为相应的词典文件(源语言和目标语言共享该词典文件)。
[WMT'16 EN-DE 数据集](http://www.statmt.org/wmt16/translation-task.html)是一个中等规模的数据集。参照论文,英德数据集我们使用 BPE 编码的数据,这能够更好的解决未登录词(out-of-vocabulary,OOV)的问题[4]。用到的 BPE 数据可以参照[这里](https://github.com/google/seq2seq/blob/master/docs/data.md)进行下载(如果希望在自定义数据中使用 BPE 编码,可以参照[这里](https://github.com/rsennrich/subword-nmt)进行预处理),下载后解压,其中 `train.tok.clean.bpe.32000.en``train.tok.clean.bpe.32000.de` 为使用 BPE 的训练数据(平行语料,分别对应了英语和德语,经过了 tokenize 和 BPE 的处理),`newstest2016.tok.bpe.32000.en``newstest2016.tok.bpe.32000.de` 等为测试数据(`newstest2016.tok.en``newstest2016.tok.de` 等则为对应的未使用 BPE 的测试数据),`vocab.bpe.32000` 为相应的词典文件(源语言和目标语言共享该词典文件)。
由于本示例中的数据读取脚本 `reader.py` 默认使用的样本数据的格式为 `\t` 分隔的的源语言和目标语言句子对(默认句子中的词之间使用空格分隔),因此需要将源语言到目标语言的平行语料库文件合并为一个文件,可以执行以下命令进行合并:
```sh
......@@ -91,7 +91,7 @@ python -u train.py \
--train_file_pattern data/train.tok.clean.bpe.32000.en-de \
--token_delimiter ' ' \
--use_token_batch True \
--batch_size 3200 \
--batch_size 4096 \
--sort_type pool \
--pool_size 200000
```
......@@ -100,7 +100,7 @@ python -u train.py \
python train.py --help
```
更多模型训练相关的参数则在 `config.py` 中的 `ModelHyperParams``TrainTaskConfig` 内定义;`ModelHyperParams` 定义了 embedding 维度等模型超参数,`TrainTaskConfig` 定义了 warmup 步数等训练需要的参数。这些参数默认使用了 Transformer 论文中 base model 的配置,如需调整可以在该脚本中进行修改。另外这些参数同样可在执行训练脚本的命令行中设置,传入的配置会合并并覆盖 `config.py` 中的配置,如可以通过以下命令来训练 Transformer 论文中的 big model :
更多模型训练相关的参数则在 `config.py` 中的 `ModelHyperParams``TrainTaskConfig` 内定义;`ModelHyperParams` 定义了 embedding 维度等模型超参数,`TrainTaskConfig` 定义了 warmup 步数等训练需要的参数。这些参数默认使用了 Transformer 论文中 base model 的配置,如需调整可以在该脚本中进行修改。另外这些参数同样可在执行训练脚本的命令行中设置,传入的配置会合并并覆盖 `config.py` 中的配置,如可以通过以下命令来训练 Transformer 论文中的 big model (如显存不够可适当减小 batch size 的值)
```sh
python -u train.py \
......@@ -117,22 +117,23 @@ python -u train.py \
n_head 16 \
d_model 1024 \
d_inner_hid 4096 \
dropout 0.3
n_head 16 \
prepostprocess_dropout 0.3
```
有关这些参数更详细信息的请参考 `config.py` 中的注释说明。对于英法翻译数据,执行训练和英德翻译训练类似,修改命令中的词典和数据文件为英法数据相应文件的路径,另外要注意的是由于英法翻译数据 token 间不是使用空格进行分隔,需要修改 `token_delimiter` 参数的设置为 `--token_delimiter '\x01'`
训练时默认使用所有 GPU,可以通过 `CUDA_VISIBLE_DEVICES` 环境变量来设置使用的 GPU 数目。也可以只使用 CPU 训练(通过参数 `--divice CPU` 设置),训练速度相对较慢。在训练过程中,每个 epoch 结束后将保存模型到参数 `model_dir` 指定的目录,每个 epoch 内也会每隔1000个 iteration 进行一次保存,每个 iteration 将打印如下的日志到标准输出:
训练时默认使用所有 GPU,可以通过 `CUDA_VISIBLE_DEVICES` 环境变量来设置使用的 GPU 数目。也可以只使用 CPU 训练(通过参数 `--divice CPU` 设置),训练速度相对较慢。在训练过程中,每隔一定 iteration 后(通过参数 `save_freq` 设置,默认为10000)保存模型到参数 `model_dir` 指定的目录,每个 epoch 结束后也会保存 checkpiont 到 `ckpt_dir` 指定的目录,每个 iteration 将打印如下的日志到标准输出:
```txt
epoch: 0, batch: 0, sum loss: 258793.343750, avg loss: 11.069005, ppl: 64151.644531
epoch: 0, batch: 1, sum loss: 256140.718750, avg loss: 11.059616, ppl: 63552.148438
epoch: 0, batch: 2, sum loss: 258931.093750, avg loss: 11.064013, ppl: 63832.167969
epoch: 0, batch: 3, sum loss: 256837.875000, avg loss: 11.058206, ppl: 63462.574219
epoch: 0, batch: 4, sum loss: 256461.000000, avg loss: 11.053401, ppl: 63158.390625
epoch: 0, batch: 5, sum loss: 257064.562500, avg loss: 11.019099, ppl: 61028.683594
epoch: 0, batch: 6, sum loss: 256180.125000, avg loss: 11.008556, ppl: 60388.644531
epoch: 0, batch: 7, sum loss: 256619.671875, avg loss: 11.007106, ppl: 60301.113281
epoch: 0, batch: 8, sum loss: 255716.734375, avg loss: 10.966025, ppl: 57874.105469
epoch: 0, batch: 9, sum loss: 245157.500000, avg loss: 10.966562, ppl: 57905.187500
step_idx: 0, epoch: 0, batch: 0, avg loss: 11.059394, normalized loss: 9.682427, ppl: 63538.027344
step_idx: 1, epoch: 0, batch: 1, avg loss: 11.053112, normalized loss: 9.676146, ppl: 63140.144531
step_idx: 2, epoch: 0, batch: 2, avg loss: 11.054576, normalized loss: 9.677609, ppl: 63232.640625
step_idx: 3, epoch: 0, batch: 3, avg loss: 11.046638, normalized loss: 9.669671, ppl: 62732.664062
step_idx: 4, epoch: 0, batch: 4, avg loss: 11.030095, normalized loss: 9.653129, ppl: 61703.449219
step_idx: 5, epoch: 0, batch: 5, avg loss: 11.047491, normalized loss: 9.670525, ppl: 62786.230469
step_idx: 6, epoch: 0, batch: 6, avg loss: 11.044509, normalized loss: 9.667542, ppl: 62599.273438
step_idx: 7, epoch: 0, batch: 7, avg loss: 11.011090, normalized loss: 9.634124, ppl: 60541.859375
step_idx: 8, epoch: 0, batch: 8, avg loss: 10.985243, normalized loss: 9.608276, ppl: 58997.058594
step_idx: 9, epoch: 0, batch: 9, avg loss: 10.993434, normalized loss: 9.616467, ppl: 59482.292969
```
### 模型预测
......@@ -143,19 +144,19 @@ python -u infer.py \
--src_vocab_fpath data/vocab.bpe.32000 \
--trg_vocab_fpath data/vocab.bpe.32000 \
--special_token '<s>' '<e>' '<unk>' \
--test_file_pattern data/newstest2013.tok.bpe.32000.en-de \
--test_file_pattern data/newstest2016.tok.bpe.32000.en-de \
--use_wordpiece False \
--token_delimiter ' ' \
--batch_size 4 \
model_path trained_models/pass_20.infer.model \
beam_size 5 \
max_out_len 256
--batch_size 32 \
model_path trained_models/iter_199999.infer.model \
beam_size 4 \
max_out_len 255
```
和模型训练时类似,预测时也需要设置数据和 reader 相关的参数,并可以执行 `python infer.py --help` 查看这些参数的说明(部分参数意义和训练时略有不同);同样可以在预测命令中设置模型超参数,但应与模型训练时的设置一致;此外相比于模型训练,预测时还有一些额外的参数,如需要设置 `model_path` 来给出模型所在目录,可以设置 `beam_size``max_out_len` 来指定 Beam Search 算法的搜索宽度和最大深度(翻译长度),这些参数也可以在 `config.py` 中的 `InferTaskConfig` 内查阅注释说明并进行更改设置。
执行以上预测命令会打印翻译结果到标准输出,每行输出是对应行输入的得分最高的翻译。对于使用 BPE 的英德数据,预测出的翻译结果也将是 BPE 表示的数据,要还原成原始的数据(这里指 tokenize 后的数据)才能进行正确的评估,可以使用以下命令来恢复 `predict.txt` 内的翻译结果到 `predict.tok.txt` 中(无需再次 tokenize 处理):
```sh
sed 's/@@ //g' predict.txt > predict.tok.txt
sed -r 's/(@@ )|(@@ ?$)//g' predict.txt > predict.tok.txt
```
对于英法翻译的 wordpiece 数据,执行预测和英德翻译预测类似,修改命令中的词典和数据文件为英法数据相应文件的路径,另外需要注意修改 `token_delimiter` 参数的设置为 `--token_delimiter '\x01'`;同时要修改 `use_wordpiece` 参数的设置为 `--use_wordpiece True`,这会在预测时将翻译得到的 wordpiece 数据还原为原始数据输出。为了使用 tokenize 的数据进行评估,还需要对翻译结果进行 tokenize 的处理,[Moses](https://github.com/moses-smt/mosesdecoder) 提供了一系列机器翻译相关的脚本。执行 `git clone https://github.com/moses-smt/mosesdecoder.git` 克隆 mosesdecoder 仓库后,可以使用其中的 `tokenizer.perl` 脚本对 `predict.txt` 内的翻译结果进行 tokenize 处理并输出到 `predict.tok.txt` 中,如下:
......@@ -163,15 +164,21 @@ sed 's/@@ //g' predict.txt > predict.tok.txt
perl mosesdecoder/scripts/tokenizer/tokenizer.perl -l fr < predict.txt > predict.tok.txt
```
接下来就可以使用参考翻译对翻译结果进行 BLEU 指标的评估了。计算 BLEU 值的脚本也在 Moses 中包含,以英德翻译 `newstest2013.tok.de` 数据为例,执行如下命令:
接下来就可以使用参考翻译对翻译结果进行 BLEU 指标的评估了。计算 BLEU 值的脚本也在 Moses 中包含,以英德翻译 `newstest2016.tok.de` 数据为例,执行如下命令:
```sh
perl mosesdecoder/scripts/generic/multi-bleu.perl data/newstest2013.tok.de < predict.tok.txt
perl mosesdecoder/scripts/generic/multi-bleu.perl data/newstest2016.tok.de < predict.tok.txt
```
可以看到类似如下的结果。
可以看到类似如下的结果(为单机两卡训练 200K 个 iteration 后模型的预测结果)
```
BLEU = 25.08, 58.3/31.5/19.6/12.6 (BP=0.966, ratio=0.967, hyp_len=61321, ref_len=63412)
BLEU = 33.08, 64.2/39.2/26.4/18.5 (BP=0.994, ratio=0.994, hyp_len=61971, ref_len=62362)
```
目前在未使用 model average 的情况下,使用默认配置单机八卡(同论文中 base model 的配置)进行训练,英德翻译在 `newstest2013` 上测试 BLEU 值为25.,在 `newstest2014` 上测试 BLEU 值为26.;英法翻译在 `newstest2014` 上测试 BLEU 值为36.。
目前在未使用 model average 的情况下,英德翻译 base model 八卡训练 100K 个 iteration 后测试 BLEU 值如下:
| 测试集 | newstest2013 | newstest2014 | newstest2015 | newstest2016 |
|-|-|-|-|-|
| BLEU | 25.27 | 26.05 | 28.75 | 33.27 |
英法翻译 base model 八卡训练 100K 个 iteration 后在 `newstest2014` 上测试 BLEU 值为36.。
### 分布式训练
......
......@@ -9,12 +9,12 @@ class TrainTaskConfig(object):
# the hyper parameters for Adam optimizer.
# This static learning_rate will be multiplied to the LearningRateScheduler
# derived learning rate the to get the final learning rate.
learning_rate = 1
learning_rate = 2.0
beta1 = 0.9
beta2 = 0.98
beta2 = 0.997
eps = 1e-9
# the parameters for learning rate scheduling.
warmup_steps = 4000
warmup_steps = 8000
# the weight used to mix up the ground-truth distribution and the fixed
# uniform distribution in label smoothing when training.
# Set this as zero if label smoothing is not wanted.
......@@ -30,6 +30,8 @@ class TrainTaskConfig(object):
# It should be provided if use checkpoints, since the checkpoint doesn't
# include the training step counter currently.
start_step = 0
# the frequency to save trained models.
save_freq = 10000
class InferTaskConfig(object):
......@@ -63,7 +65,6 @@ class ModelHyperParams(object):
# index for <unk> token
unk_idx = 2
# max length of sequences deciding the size of position encoding table.
# Start from 1 and count start and end tokens in.
max_length = 256
# the dimension for word embeddings, which is also the last dimension of
# the input and output of multi-head attention, position-wise feed-forward
......@@ -79,8 +80,14 @@ class ModelHyperParams(object):
n_head = 8
# number of sub-layers to be stacked in the encoder and decoder.
n_layer = 6
# dropout rate used by all dropout layers.
dropout = 0.1
# dropout rates of different modules.
prepostprocess_dropout = 0.1
attention_dropout = 0.1
relu_dropout = 0.1
# to process before each sub-layer
preprocess_cmd = "n" # layer normalization
# to process after each sub-layer
postprocess_cmd = "da" # dropout + residual connection
# random seed used in dropout for CE.
dropout_seed = None
# the flag indicating whether to share embedding and softmax weights.
......
......@@ -156,7 +156,9 @@ def fast_infer(test_data, trg_idx2word, use_wordpiece):
ModelHyperParams.max_length + 1, ModelHyperParams.n_layer,
ModelHyperParams.n_head, ModelHyperParams.d_key,
ModelHyperParams.d_value, ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid, ModelHyperParams.dropout,
ModelHyperParams.d_inner_hid, ModelHyperParams.prepostprocess_dropout,
ModelHyperParams.attention_dropout, ModelHyperParams.relu_dropout,
ModelHyperParams.preprocess_cmd, ModelHyperParams.postprocess_cmd,
ModelHyperParams.weight_sharing, InferTaskConfig.beam_size,
InferTaskConfig.max_out_len, ModelHyperParams.eos_idx)
......
......@@ -11,12 +11,18 @@ def position_encoding_init(n_position, d_pos_vec):
"""
Generate the initial values for the sinusoid position encoding table.
"""
position_enc = np.array([[
pos / np.power(10000, 2. * (j // 2) / d_pos_vec)
for j in range(d_pos_vec)
] if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
channels = d_pos_vec
position = np.arange(n_position)
num_timescales = channels // 2
log_timescale_increment = (np.log(float(1e4) / float(1)) /
(num_timescales - 1))
inv_timescales = np.exp(np.arange(
num_timescales)) * -log_timescale_increment
scaled_time = np.expand_dims(position, 1) * np.expand_dims(inv_timescales,
0)
signal = np.concatenate([np.sin(scaled_time), np.cos(scaled_time)], axis=1)
signal = np.pad(signal, [[0, 0], [0, np.mod(channels, 2)]], 'constant')
position_enc = signal
return position_enc.astype("float32")
......@@ -35,6 +41,9 @@ def multi_head_attention(queries,
computing softmax activiation to mask certain selected positions so that
they will not considered in attention weights.
"""
keys = queries if keys is None else keys
values = keys if values is None else values
if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3):
raise ValueError(
"Inputs: quries, keys and values should all be 3-D tensors.")
......@@ -92,11 +101,11 @@ def multi_head_attention(queries,
return layers.reshape(
x=trans_x, shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]])
def scaled_dot_product_attention(q, k, v, attn_bias, d_model, dropout_rate):
def scaled_dot_product_attention(q, k, v, attn_bias, d_key, dropout_rate):
"""
Scaled Dot-Product Attention
"""
scaled_q = layers.scale(x=q, scale=d_model**-0.5)
scaled_q = layers.scale(x=q, scale=d_key**-0.5)
product = layers.matmul(x=scaled_q, y=k, transpose_y=True)
if attn_bias:
product += attn_bias
......@@ -133,7 +142,7 @@ def multi_head_attention(queries,
return proj_out
def positionwise_feed_forward(x, d_inner_hid, d_hid):
def positionwise_feed_forward(x, d_inner_hid, d_hid, dropout_rate):
"""
Position-wise Feed-Forward Networks.
This module consists of two linear transformations with a ReLU activation
......@@ -143,6 +152,12 @@ def positionwise_feed_forward(x, d_inner_hid, d_hid):
size=d_inner_hid,
num_flatten_dims=2,
act="relu")
if dropout_rate:
hidden = layers.dropout(
hidden,
dropout_prob=dropout_rate,
seed=ModelHyperParams.dropout_seed,
is_test=False)
out = layers.fc(input=hidden, size=d_hid, num_flatten_dims=2)
return out
......@@ -177,14 +192,14 @@ pre_process_layer = partial(pre_post_process_layer, None)
post_process_layer = pre_post_process_layer
def prepare_encoder(src_word,
src_pos,
src_vocab_size,
src_emb_dim,
src_max_len,
dropout_rate=0.,
word_emb_param_name=None,
pos_enc_param_name=None):
def prepare_encoder_decoder(src_word,
src_pos,
src_vocab_size,
src_emb_dim,
src_max_len,
dropout_rate=0.,
word_emb_param_name=None,
pos_enc_param_name=None):
"""Add word embeddings and position encodings.
The output tensor has a shape of:
[batch_size, max_src_length_in_batch, d_model].
......@@ -193,6 +208,7 @@ def prepare_encoder(src_word,
src_word_emb = layers.embedding(
src_word,
size=[src_vocab_size, src_emb_dim],
padding_idx=ModelHyperParams.bos_idx, # set embedding of bos to 0
param_attr=fluid.ParamAttr(
name=word_emb_param_name,
initializer=fluid.initializer.Normal(0., src_emb_dim**-0.5)))
......@@ -212,9 +228,9 @@ def prepare_encoder(src_word,
prepare_encoder = partial(
prepare_encoder, pos_enc_param_name=pos_enc_param_names[0])
prepare_encoder_decoder, pos_enc_param_name=pos_enc_param_names[0])
prepare_decoder = partial(
prepare_encoder, pos_enc_param_name=pos_enc_param_names[1])
prepare_encoder_decoder, pos_enc_param_name=pos_enc_param_names[1])
def encoder_layer(enc_input,
......@@ -224,20 +240,28 @@ def encoder_layer(enc_input,
d_value,
d_model,
d_inner_hid,
dropout_rate=0.):
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd="n",
postprocess_cmd="da"):
"""The encoder layers that can be stacked to form a deep encoder.
This module consits of a multi-head (self) attention followed by
position-wise feed-forward networks and both the two components companied
with the post_process_layer to add residual connection, layer normalization
and droput.
"""
attn_output = multi_head_attention(enc_input, enc_input, enc_input,
attn_bias, d_key, d_value, d_model,
n_head, dropout_rate)
attn_output = post_process_layer(enc_input, attn_output, "dan",
dropout_rate)
ffd_output = positionwise_feed_forward(attn_output, d_inner_hid, d_model)
return post_process_layer(attn_output, ffd_output, "dan", dropout_rate)
attn_output = multi_head_attention(
pre_process_layer(enc_input, preprocess_cmd,
prepostprocess_dropout), None, None, attn_bias, d_key,
d_value, d_model, n_head, attention_dropout)
attn_output = post_process_layer(enc_input, attn_output, postprocess_cmd,
prepostprocess_dropout)
ffd_output = positionwise_feed_forward(
pre_process_layer(attn_output, preprocess_cmd, prepostprocess_dropout),
d_inner_hid, d_model, relu_dropout)
return post_process_layer(attn_output, ffd_output, postprocess_cmd,
prepostprocess_dropout)
def encoder(enc_input,
......@@ -248,15 +272,32 @@ def encoder(enc_input,
d_value,
d_model,
d_inner_hid,
dropout_rate=0.):
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd="n",
postprocess_cmd="da"):
"""
The encoder is composed of a stack of identical layers returned by calling
encoder_layer.
"""
for i in range(n_layer):
enc_output = encoder_layer(enc_input, attn_bias, n_head, d_key, d_value,
d_model, d_inner_hid, dropout_rate)
enc_output = encoder_layer(
enc_input,
attn_bias,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd, )
enc_input = enc_output
enc_output = pre_process_layer(enc_output, preprocess_cmd,
prepostprocess_dropout)
return enc_output
......@@ -269,30 +310,35 @@ def decoder_layer(dec_input,
d_value,
d_model,
d_inner_hid,
dropout_rate=0.,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
cache=None):
""" The layer to be stacked in decoder part.
The structure of this module is similar to that in the encoder part except
a multi-head attention is added to implement encoder-decoder attention.
"""
slf_attn_output = multi_head_attention(
dec_input,
dec_input,
dec_input,
pre_process_layer(dec_input, preprocess_cmd, prepostprocess_dropout),
None,
None,
slf_attn_bias,
d_key,
d_value,
d_model,
n_head,
dropout_rate,
attention_dropout,
cache, )
slf_attn_output = post_process_layer(
dec_input,
slf_attn_output,
"dan", # residual connection + dropout + layer normalization
dropout_rate, )
postprocess_cmd,
prepostprocess_dropout, )
enc_attn_output = multi_head_attention(
slf_attn_output,
pre_process_layer(slf_attn_output, preprocess_cmd,
prepostprocess_dropout),
enc_output,
enc_output,
dec_enc_attn_bias,
......@@ -300,21 +346,23 @@ def decoder_layer(dec_input,
d_value,
d_model,
n_head,
dropout_rate, )
attention_dropout, )
enc_attn_output = post_process_layer(
slf_attn_output,
enc_attn_output,
"dan", # residual connection + dropout + layer normalization
dropout_rate, )
postprocess_cmd,
prepostprocess_dropout, )
ffd_output = positionwise_feed_forward(
enc_attn_output,
pre_process_layer(enc_attn_output, preprocess_cmd,
prepostprocess_dropout),
d_inner_hid,
d_model, )
d_model,
relu_dropout, )
dec_output = post_process_layer(
enc_attn_output,
ffd_output,
"dan", # residual connection + dropout + layer normalization
dropout_rate, )
postprocess_cmd,
prepostprocess_dropout, )
return dec_output
......@@ -328,16 +376,16 @@ def decoder(dec_input,
d_value,
d_model,
d_inner_hid,
dropout_rate=0.,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
caches=None):
"""
The decoder is composed of a stack of identical decoder_layer layers.
"""
for i in range(n_layer):
cache = None
if caches is not None:
cache = caches[i]
dec_output = decoder_layer(
dec_input,
enc_output,
......@@ -348,9 +396,15 @@ def decoder(dec_input,
d_value,
d_model,
d_inner_hid,
dropout_rate,
cache=cache)
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
cache=None if caches is None else caches[i])
dec_input = dec_output
dec_output = pre_process_layer(dec_output, preprocess_cmd,
prepostprocess_dropout)
return dec_output
......@@ -371,24 +425,58 @@ def make_all_inputs(input_fields):
return inputs
def transformer(
src_vocab_size,
trg_vocab_size,
max_length,
n_layer,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
dropout_rate,
weight_sharing,
label_smooth_eps, ):
def make_all_py_reader_inputs(input_fields, is_test=False):
reader = layers.py_reader(
capacity=20,
name="test_reader" if is_test else "train_reader",
shapes=[input_descs[input_field][0] for input_field in input_fields],
dtypes=[input_descs[input_field][1] for input_field in input_fields],
lod_levels=[
input_descs[input_field][2]
if len(input_descs[input_field]) == 3 else 0
for input_field in input_fields
])
return layers.read_file(reader), reader
def transformer(src_vocab_size,
trg_vocab_size,
max_length,
n_layer,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
label_smooth_eps,
use_py_reader=False,
is_test=False):
if weight_sharing:
assert src_vocab_size == src_vocab_size, (
"Vocabularies in source and target should be same for weight sharing."
)
enc_inputs = make_all_inputs(encoder_data_input_fields)
data_input_names = encoder_data_input_fields + \
decoder_data_input_fields[:-1] + label_data_input_fields
if use_py_reader:
all_inputs, reader = make_all_py_reader_inputs(data_input_names,
is_test)
else:
all_inputs = make_all_inputs(data_input_names)
enc_inputs_len = len(encoder_data_input_fields)
dec_inputs_len = len(decoder_data_input_fields[:-1])
enc_inputs = all_inputs[0:enc_inputs_len]
dec_inputs = all_inputs[enc_inputs_len:enc_inputs_len + dec_inputs_len]
label = all_inputs[-2]
weights = all_inputs[-1]
enc_output = wrap_encoder(
src_vocab_size,
......@@ -399,12 +487,14 @@ def transformer(
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
enc_inputs, )
dec_inputs = make_all_inputs(decoder_data_input_fields[:-1])
predict = wrap_decoder(
trg_vocab_size,
max_length,
......@@ -414,14 +504,17 @@ def transformer(
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
dec_inputs,
enc_output, )
# Padding index do not contribute to the total loss. The weights is used to
# cancel padding index in calculating the loss.
label, weights = make_all_inputs(label_data_input_fields)
if label_smooth_eps:
label = layers.label_smooth(
label=layers.one_hot(
......@@ -436,9 +529,9 @@ def transformer(
weighted_cost = cost * weights
sum_cost = layers.reduce_sum(weighted_cost)
token_num = layers.reduce_sum(weights)
token_num.stop_gradient = True
avg_cost = sum_cost / token_num
avg_cost.stop_gradient = True
return sum_cost, avg_cost, predict, token_num
return sum_cost, avg_cost, predict, token_num, reader if use_py_reader else None
def wrap_encoder(src_vocab_size,
......@@ -449,7 +542,11 @@ def wrap_encoder(src_vocab_size,
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
enc_inputs=None):
"""
......@@ -457,21 +554,32 @@ def wrap_encoder(src_vocab_size,
"""
if enc_inputs is None:
# This is used to implement independent encoder program in inference.
src_word, src_pos, src_slf_attn_bias = \
make_all_inputs(encoder_data_input_fields)
src_word, src_pos, src_slf_attn_bias = make_all_inputs(
encoder_data_input_fields)
else:
src_word, src_pos, src_slf_attn_bias = \
enc_inputs
src_word, src_pos, src_slf_attn_bias = enc_inputs
enc_input = prepare_encoder(
src_word,
src_pos,
src_vocab_size,
d_model,
max_length,
dropout_rate,
prepostprocess_dropout,
word_emb_param_name=word_emb_param_names[0])
enc_output = encoder(enc_input, src_slf_attn_bias, n_layer, n_head, d_key,
d_value, d_model, d_inner_hid, dropout_rate)
enc_output = encoder(
enc_input,
src_slf_attn_bias,
n_layer,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd, )
return enc_output
......@@ -483,7 +591,11 @@ def wrap_decoder(trg_vocab_size,
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
dec_inputs=None,
enc_output=None,
......@@ -493,9 +605,8 @@ def wrap_decoder(trg_vocab_size,
"""
if dec_inputs is None:
# This is used to implement independent decoder program in inference.
trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias, \
enc_output = make_all_inputs(
decoder_data_input_fields + decoder_util_input_fields)
trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias, enc_output = \
make_all_inputs(decoder_data_input_fields)
else:
trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias = dec_inputs
......@@ -505,7 +616,7 @@ def wrap_decoder(trg_vocab_size,
trg_vocab_size,
d_model,
max_length,
dropout_rate,
prepostprocess_dropout,
word_emb_param_name=word_emb_param_names[0]
if weight_sharing else word_emb_param_names[1])
dec_output = decoder(
......@@ -519,9 +630,12 @@ def wrap_decoder(trg_vocab_size,
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
caches=caches)
# Return logits for training and probs for inference.
if weight_sharing:
predict = layers.matmul(
x=dec_output,
......@@ -533,6 +647,7 @@ def wrap_decoder(trg_vocab_size,
bias_attr=False,
num_flatten_dims=2)
if dec_inputs is None:
# Return probs for independent decoder program.
predict = layers.softmax(predict)
return predict
......@@ -547,7 +662,11 @@ def fast_decode(
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
beam_size,
max_out_len,
......@@ -556,11 +675,12 @@ def fast_decode(
Use beam search to decode. Caches will be used to store states of history
steps which can make the decoding faster.
"""
enc_output = wrap_encoder(src_vocab_size, max_in_len, n_layer, n_head,
d_key, d_value, d_model, d_inner_hid,
dropout_rate, weight_sharing)
start_tokens, init_scores, trg_src_attn_bias = \
make_all_inputs(fast_decoder_data_input_fields )
enc_output = wrap_encoder(
src_vocab_size, max_in_len, n_layer, n_head, d_key, d_value, d_model,
d_inner_hid, prepostprocess_dropout, attention_dropout, relu_dropout,
preprocess_cmd, postprocess_cmd, weight_sharing)
start_tokens, init_scores, trg_src_attn_bias = make_all_inputs(
fast_decoder_data_input_fields)
def beam_search():
max_len = layers.fill_constant(
......@@ -609,8 +729,7 @@ def fast_decode(
value=1,
shape=[-1, 1, 1],
dtype=pre_ids.dtype),
y=layers.increment(
x=step_idx, value=1.0, in_place=False),
y=step_idx,
axis=0)
logits = wrap_decoder(
trg_vocab_size,
......@@ -621,7 +740,11 @@ def fast_decode(
d_value,
d_model,
d_inner_hid,
dropout_rate,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
preprocess_cmd,
postprocess_cmd,
weight_sharing,
dec_inputs=(pre_ids, pre_pos, None, pre_src_attn_bias),
enc_output=pre_enc_output,
......
import os
import time
import argparse
import ast
import numpy as np
import multiprocessing
import os
import six
import time
import paddle
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
from train import split_data, read_multiple, prepare_batch_input
from model import transformer, position_encoding_init
from optim import LearningRateScheduler
from config import *
import reader
from config import *
from train import pad_batch_data, prepare_data_generator, \
prepare_feed_dict_list, py_reader_provider_wrapper
from model import transformer, position_encoding_init
def parse_args():
parser = argparse.ArgumentParser(
"Profile the training process for Transformer.")
parser = argparse.ArgumentParser("Training for Transformer.")
parser.add_argument(
"--src_vocab_fpath",
type=str,
......@@ -43,38 +42,70 @@ def parse_args():
parser.add_argument(
"--batch_size",
type=int,
default=2048,
default=4096,
help="The number of sequences contained in a mini-batch, or the maximum "
"number of tokens (include paddings) contained in a mini-batch. Note "
"that this represents the number on single device and the actual batch "
"size for multi-devices will multiply the device number.")
parser.add_argument(
"--num_iters",
type=int,
default=10,
help="The maximum number of iterations profiling over.")
parser.add_argument(
"--pool_size",
type=int,
default=10000,
default=200000,
help="The buffer size to pool data.")
parser.add_argument(
"--sort_type",
default="pool",
choices=("global", "pool", "none"),
help="The grain to sort by length: global for all instances; pool for "
"instances in pool; none for no sort.")
parser.add_argument(
"--shuffle",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to shuffle instances in each pass.")
parser.add_argument(
"--shuffle_batch",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to shuffle the data batches.")
parser.add_argument(
"--special_token",
type=str,
default=["<s>", "<e>", "<unk>"],
nargs=3,
help="The <bos>, <eos> and <unk> tokens in the dictionary.")
parser.add_argument(
"--token_delimiter",
type=lambda x: str(x.encode().decode("unicode-escape")),
default=" ",
help="The delimiter used to split tokens in source or target sentences. "
"For EN-DE BPE data we provided, use spaces as token delimiter. "
"For EN-FR wordpiece data we provided, use '\x01' as token delimiter.")
parser.add_argument(
"--use_mem_opt",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use memory optimization.")
parser.add_argument(
"--use_py_reader",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use py_reader.")
parser.add_argument(
"--iter_num",
type=int,
default=20,
help="The iteration number to run in profiling.")
parser.add_argument(
"--use_parallel_exe",
type=bool,
default=False,
help="The flag indicating whether to use ParallelExecutor.")
parser.add_argument(
'opts',
help='See config.py for all options',
default=None,
nargs=argparse.REMAINDER)
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help="The device type.")
args = parser.parse_args()
# Append args related to dict
......@@ -91,153 +122,147 @@ def parse_args():
return args
def train_loop(exe, train_progm, init, num_iters, train_data, dev_count,
sum_cost, avg_cost, lr_scheduler, token_num, predict):
data_input_names = encoder_data_input_fields + decoder_data_input_fields[:
-1] + label_data_input_fields
start_time = time.time()
exec_time = 0.0
for batch_id, data in enumerate(train_data()):
if batch_id >= num_iters:
break
feed_list = []
total_num_token = 0
for place_id, data_buffer in enumerate(
split_data(
data, num_part=dev_count)):
data_input_dict, num_token = prepare_batch_input(
data_buffer, data_input_names, ModelHyperParams.eos_idx,
ModelHyperParams.eos_idx, ModelHyperParams.n_head,
ModelHyperParams.d_model)
total_num_token += num_token
feed_kv_pairs = data_input_dict.items()
lr_rate = lr_scheduler.update_learning_rate()
feed_kv_pairs += {lr_scheduler.learning_rate.name: lr_rate}.items()
feed_list.append(dict(feed_kv_pairs))
if not init:
for pos_enc_param_name in pos_enc_param_names:
pos_enc = position_encoding_init(
ModelHyperParams.max_length + 1,
ModelHyperParams.d_model)
feed_list[place_id][pos_enc_param_name] = pos_enc
for feed_dict in feed_list:
feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token
exe_start_time = time.time()
if dev_count > 1:
# prallel executor
outs = exe.run(fetch_list=[sum_cost.name, token_num.name],
feed=feed_list)
else:
# executor
outs = exe.run(fetch_list=[sum_cost, token_num], feed=feed_list[0])
exec_time += time.time() - exe_start_time
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
total_sum_cost = sum_cost_val.sum() # sum the cost from multi-devices
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print("batch: %d, sum loss: %f, avg loss: %f, ppl: %f" %
(batch_id, total_sum_cost, total_avg_cost,
np.exp([min(total_avg_cost, 100)])))
init = True
return time.time() - start_time, exec_time
def profile(args):
print args
if args.device == 'CPU':
TrainTaskConfig.use_gpu = False
if not TrainTaskConfig.use_gpu:
place = fluid.CPUPlace()
dev_count = multiprocessing.cpu_count()
else:
def main(args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
sum_cost, avg_cost, predict, token_num, pyreader = transformer(
ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size,
ModelHyperParams.max_length + 1,
ModelHyperParams.n_layer,
ModelHyperParams.n_head,
ModelHyperParams.d_key,
ModelHyperParams.d_value,
ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid,
ModelHyperParams.prepostprocess_dropout,
ModelHyperParams.attention_dropout,
ModelHyperParams.relu_dropout,
ModelHyperParams.preprocess_cmd,
ModelHyperParams.postprocess_cmd,
ModelHyperParams.weight_sharing,
TrainTaskConfig.label_smooth_eps,
use_py_reader=args.use_py_reader,
is_test=False)
lr_decay = fluid.layers.learning_rate_scheduler.noam_decay(
ModelHyperParams.d_model, TrainTaskConfig.warmup_steps)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_decay * TrainTaskConfig.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
optimizer.minimize(avg_cost)
if args.use_mem_opt:
fluid.memory_optimize(train_prog)
if TrainTaskConfig.use_gpu:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
else:
place = fluid.CPUPlace()
dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exe = fluid.Executor(place)
sum_cost, avg_cost, predict, token_num = transformer(
ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size,
ModelHyperParams.max_length + 1, ModelHyperParams.n_layer,
ModelHyperParams.n_head, ModelHyperParams.d_key,
ModelHyperParams.d_value, ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid, ModelHyperParams.dropout,
ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps)
lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model,
TrainTaskConfig.warmup_steps,
TrainTaskConfig.learning_rate)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_scheduler.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
optimizer.minimize(sum_cost)
# Initialize the parameters.
if TrainTaskConfig.ckpt_path:
fluid.io.load_persistables(exe, TrainTaskConfig.ckpt_path)
lr_scheduler.current_steps = TrainTaskConfig.start_step
else:
exe.run(fluid.framework.default_startup_program())
# Disable all sorts for they will be done in the 1st batch.
train_data = reader.DataReader(
src_vocab_fpath=args.src_vocab_fpath,
trg_vocab_fpath=args.trg_vocab_fpath,
fpattern=args.train_file_pattern,
use_token_batch=args.use_token_batch,
batch_size=args.batch_size * (1 if args.use_token_batch else dev_count),
pool_size=args.pool_size,
sort_type='none',
shuffle=False,
shuffle_batch=False,
start_mark=args.special_token[0],
end_mark=args.special_token[1],
unk_mark=args.special_token[2],
# count start and end tokens out
max_length=ModelHyperParams.max_length - 2,
clip_last_batch=False)
train_data = read_multiple(
reader=train_data.batch_generator,
count=dev_count if args.use_token_batch else 1)
if dev_count > 1:
build_strategy = fluid.BuildStrategy()
build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=sum_cost.name,
main_program=fluid.default_main_program(),
build_strategy=build_strategy)
print("Warming up ...")
train_loop(exe if dev_count == 1 else train_exe,
fluid.default_main_program(), False, 3, train_data, dev_count,
sum_cost, avg_cost, lr_scheduler, token_num, predict)
print("\nProfiling ...")
if dev_count == 1:
with profiler.profiler('All', 'total', '/tmp/profile_file'):
total_time, exec_time = train_loop(
exe,
fluid.default_main_program(), True, args.num_iters, train_data,
dev_count, sum_cost, avg_cost, lr_scheduler, token_num, predict)
exe.run(startup_prog)
exec_strategy = fluid.ExecutionStrategy()
# For faster executor
exec_strategy.use_experimental_executor = True
exec_strategy.num_iteration_per_drop_scope = 5
build_strategy = fluid.BuildStrategy()
# Since the token number differs among devices, customize gradient scale to
# use token average cost among multi-devices. and the gradient scale is
# `1 / token_number` for average cost.
build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=avg_cost.name,
main_program=train_prog,
build_strategy=build_strategy,
exec_strategy=exec_strategy)
# the best cross-entropy value with label smoothing
loss_normalizer = -((1. - TrainTaskConfig.label_smooth_eps) * np.log(
(1. - TrainTaskConfig.label_smooth_eps
)) + TrainTaskConfig.label_smooth_eps *
np.log(TrainTaskConfig.label_smooth_eps / (
ModelHyperParams.trg_vocab_size - 1) + 1e-20))
train_data = prepare_data_generator(
args, is_test=False, count=dev_count, pyreader=pyreader)
if args.use_py_reader:
pyreader.start()
data_generator = None
else:
total_time, exec_time = train_loop(
train_exe,
fluid.default_main_program(), True, args.num_iters, train_data,
dev_count, sum_cost, avg_cost, lr_scheduler, token_num, predict)
print("Elapsed time: total %f s, in executor %f s" %
(total_time, exec_time))
data_generator = train_data()
def run(iter_num):
reader_time = []
run_time = []
for step_idx in six.moves.xrange(iter_num):
try:
start_time = time.time()
feed_dict_list = prepare_feed_dict_list(data_generator,
init_flag, dev_count)
end_time = time.time()
reader_time.append(end_time - start_time)
start_time = time.time()
if args.use_parallel_exe:
outs = train_exe.run(
fetch_list=[sum_cost.name, token_num.name],
feed=feed_dict_list)
else:
outs = exe.run(program=train_prog,
fetch_list=[sum_cost.name, token_num.name],
feed=feed_dict_list[0]
if feed_dict_list is not None else None)
end_time = time.time()
run_time.append(end_time - start_time)
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[
1])
# sum the cost from multi-devices
total_sum_cost = sum_cost_val.sum()
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print("step_idx: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
except (StopIteration, fluid.core.EOFException):
# The current pass is over.
if args.use_py_reader:
pyreader.reset()
pyreader.start()
break
return reader_time, run_time
# start-up
init_flag = True
run(1)
init_flag = False
# profiling
start = time.time()
# currently only support profiling on one device
with profiler.profiler('All', 'total', '/tmp/profile_file'):
reader_time, run_time = run(args.iter_num)
end = time.time()
total_time = end - start
print("Total time: {0}, reader time: {1} s, run time: {2} s".format(
total_time, np.sum(reader_time), np.sum(run_time)))
if __name__ == "__main__":
args = parse_args()
profile(args)
main(args)
......@@ -12,15 +12,16 @@ class SortType(object):
class Converter(object):
def __init__(self, vocab, beg, end, unk, delimiter):
def __init__(self, vocab, beg, end, unk, delimiter, add_beg):
self._vocab = vocab
self._beg = beg
self._end = end
self._unk = unk
self._delimiter = delimiter
self._add_beg = add_beg
def __call__(self, sentence):
return [self._beg] + [
return ([self._beg] if self._add_beg else []) + [
self._vocab.get(w, self._unk)
for w in sentence.split(self._delimiter)
] + [self._end]
......@@ -215,7 +216,8 @@ class DataReader(object):
beg=self._src_vocab[start_mark],
end=self._src_vocab[end_mark],
unk=self._src_vocab[unk_mark],
delimiter=self._token_delimiter)
delimiter=self._token_delimiter,
add_beg=False)
]
if not self._only_src:
converters.append(
......@@ -224,7 +226,8 @@ class DataReader(object):
beg=self._trg_vocab[start_mark],
end=self._trg_vocab[end_mark],
unk=self._trg_vocab[unk_mark],
delimiter=self._token_delimiter))
delimiter=self._token_delimiter,
add_beg=True))
converters = ComposedConverter(converters)
......@@ -280,8 +283,7 @@ class DataReader(object):
def batch_generator(self):
# global sort or global shuffle
if self._sort_type == SortType.GLOBAL:
infos = sorted(
self._sample_infos, key=lambda x: x.max_len, reverse=True)
infos = sorted(self._sample_infos, key=lambda x: x.max_len)
else:
if self._shuffle:
infos = self._sample_infos
......
......@@ -11,7 +11,6 @@ import paddle.fluid as fluid
import reader
from config import *
from model import transformer, position_encoding_init
from optim import LearningRateScheduler
def parse_args():
......@@ -44,7 +43,7 @@ def parse_args():
parser.add_argument(
"--batch_size",
type=int,
default=2048,
default=4096,
help="The number of sequences contained in a mini-batch, or the maximum "
"number of tokens (include paddings) contained in a mini-batch. Note "
"that this represents the number on single device and the actual batch "
......@@ -52,7 +51,7 @@ def parse_args():
parser.add_argument(
"--pool_size",
type=int,
default=10000,
default=200000,
help="The buffer size to pool data.")
parser.add_argument(
"--sort_type",
......@@ -104,9 +103,19 @@ def parse_args():
parser.add_argument(
"--enable_ce",
type=ast.literal_eval,
default=True,
default=False,
help="The flag indicating whether to run the task "
"for continuous evaluation.")
parser.add_argument(
"--use_mem_opt",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use memory optimization.")
parser.add_argument(
"--use_py_reader",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use py_reader.")
args = parser.parse_args()
# Append args related to dict
......@@ -148,7 +157,7 @@ def pad_batch_data(insts,
return_list += [inst_weight.astype("float32").reshape([-1, 1])]
else: # position data
inst_pos = np.array([
list(range(1, len(inst) + 1)) + [0] * (max_len - len(inst))
list(range(0, len(inst))) + [0] * (max_len - len(inst))
for inst in insts
])
return_list += [inst_pos.astype("int64").reshape([-1, 1])]
......@@ -212,95 +221,183 @@ def prepare_batch_input(insts, data_input_names, src_pad_idx, trg_pad_idx,
src_word, src_pos, src_slf_attn_bias, trg_word, trg_pos,
trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight
]))
return data_input_dict, np.asarray([num_token], dtype="float32")
def read_multiple(reader, count, clip_last=True):
def prepare_data_generator(args, is_test, count, pyreader):
"""
Stack data from reader for multi-devices.
Data generator wrapper for DataReader. If use py_reader, set the data
provider for py_reader
"""
def __impl__():
res = []
for item in reader():
res.append(item)
if len(res) == count:
yield res
res = []
if len(res) == count:
yield res
elif not clip_last:
data = []
for item in res:
data += item
if len(data) > count:
inst_num_per_part = len(data) // count
yield [
data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
for i in range(count)
]
return __impl__
def split_data(data, num_part):
"""
Split data for each device.
"""
if len(data) == num_part:
return data
data = data[0]
inst_num_per_part = len(data) // num_part
return [
data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
for i in range(num_part)
]
def test_context(test_program, avg_cost, train_exe, dev_count, data_input_names,
sum_cost, token_num):
val_data = reader.DataReader(
data_reader = reader.DataReader(
fpattern=args.val_file_pattern if is_test else args.train_file_pattern,
src_vocab_fpath=args.src_vocab_fpath,
trg_vocab_fpath=args.trg_vocab_fpath,
fpattern=args.val_file_pattern,
token_delimiter=args.token_delimiter,
use_token_batch=args.use_token_batch,
batch_size=args.batch_size * (1 if args.use_token_batch else dev_count),
batch_size=args.batch_size * (1 if args.use_token_batch else count),
pool_size=args.pool_size,
sort_type=args.sort_type,
shuffle=args.shuffle,
shuffle_batch=args.shuffle_batch,
start_mark=args.special_token[0],
end_mark=args.special_token[1],
unk_mark=args.special_token[2],
# count start and end tokens out
max_length=ModelHyperParams.max_length - 2,
clip_last_batch=False,
shuffle=False,
shuffle_batch=False)
clip_last_batch=False).batch_generator
def stack(data_reader, count, clip_last=True):
def __impl__():
res = []
for item in data_reader():
res.append(item)
if len(res) == count:
yield res
res = []
if len(res) == count:
yield res
elif not clip_last:
data = []
for item in res:
data += item
if len(data) > count:
inst_num_per_part = len(data) // count
yield [
data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
for i in range(count)
]
return __impl__
def split(data_reader, count):
def __impl__():
for item in data_reader():
inst_num_per_part = len(item) // count
for i in range(count):
yield item[inst_num_per_part * i:inst_num_per_part * (i + 1
)]
return __impl__
if not args.use_token_batch:
# to make data on each device have similar token number
data_reader = split(data_reader, count)
if args.use_py_reader:
pyreader.decorate_tensor_provider(
py_reader_provider_wrapper(data_reader))
data_reader = None
else: # Data generator for multi-devices
data_reader = stack(data_reader, count)
return data_reader
def prepare_feed_dict_list(data_generator, init_flag, count):
"""
Prepare the list of feed dict for multi-devices.
"""
feed_dict_list = []
if data_generator is not None: # use_py_reader == False
data_input_names = encoder_data_input_fields + \
decoder_data_input_fields[:-1] + label_data_input_fields
data = next(data_generator)
for idx, data_buffer in enumerate(data):
data_input_dict, num_token = prepare_batch_input(
data_buffer, data_input_names, ModelHyperParams.eos_idx,
ModelHyperParams.eos_idx, ModelHyperParams.n_head,
ModelHyperParams.d_model)
feed_dict_list.append(data_input_dict)
if init_flag:
for idx in range(count):
pos_enc_tables = dict()
for pos_enc_param_name in pos_enc_param_names:
pos_enc_tables[pos_enc_param_name] = position_encoding_init(
ModelHyperParams.max_length + 1, ModelHyperParams.d_model)
if len(feed_dict_list) <= idx:
feed_dict_list.append(pos_enc_tables)
else:
feed_dict_list[idx] = dict(
list(pos_enc_tables.items()) + list(feed_dict_list[idx]
.items()))
return feed_dict_list if len(feed_dict_list) == count else None
def py_reader_provider_wrapper(data_reader):
"""
Data provider needed by fluid.layers.py_reader.
"""
def py_reader_provider():
data_input_names = encoder_data_input_fields + \
decoder_data_input_fields[:-1] + label_data_input_fields
for batch_id, data in enumerate(data_reader()):
data_input_dict, num_token = prepare_batch_input(
data, data_input_names, ModelHyperParams.eos_idx,
ModelHyperParams.eos_idx, ModelHyperParams.n_head,
ModelHyperParams.d_model)
total_dict = dict(data_input_dict.items())
yield [total_dict[item] for item in data_input_names]
return py_reader_provider
def test_context(exe, train_exe, dev_count):
# Context to do validation.
startup_prog = fluid.Program()
test_prog = fluid.Program()
with fluid.program_guard(test_prog, startup_prog):
with fluid.unique_name.guard():
sum_cost, avg_cost, predict, token_num, pyreader = transformer(
ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size,
ModelHyperParams.max_length + 1,
ModelHyperParams.n_layer,
ModelHyperParams.n_head,
ModelHyperParams.d_key,
ModelHyperParams.d_value,
ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid,
ModelHyperParams.prepostprocess_dropout,
ModelHyperParams.attention_dropout,
ModelHyperParams.relu_dropout,
ModelHyperParams.preprocess_cmd,
ModelHyperParams.postprocess_cmd,
ModelHyperParams.weight_sharing,
TrainTaskConfig.label_smooth_eps,
use_py_reader=args.use_py_reader,
is_test=True)
test_data = prepare_data_generator(
args, is_test=True, count=dev_count, pyreader=pyreader)
exe.run(startup_prog)
test_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
main_program=test_program,
main_program=test_prog,
share_vars_from=train_exe)
def test(exe=test_exe):
def test(exe=test_exe, pyreader=pyreader):
test_total_cost = 0
test_total_token = 0
test_data = read_multiple(
reader=val_data.batch_generator,
count=dev_count if args.use_token_batch else 1)
for batch_id, data in enumerate(test_data()):
feed_list = []
for place_id, data_buffer in enumerate(
split_data(
data, num_part=dev_count)):
data_input_dict, _ = prepare_batch_input(
data_buffer, data_input_names, ModelHyperParams.eos_idx,
ModelHyperParams.eos_idx, ModelHyperParams.n_head,
ModelHyperParams.d_model)
feed_list.append(data_input_dict)
outs = exe.run(feed=feed_list,
fetch_list=[sum_cost.name, token_num.name])
if args.use_py_reader:
pyreader.start()
data_generator = None
else:
data_generator = test_data()
while True:
try:
feed_dict_list = prepare_feed_dict_list(data_generator, False,
dev_count)
outs = test_exe.run(fetch_list=[sum_cost.name, token_num.name],
feed=feed_dict_list)
except (StopIteration, fluid.core.EOFException):
# The current pass is over.
if args.use_py_reader:
pyreader.reset()
break
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
test_total_cost += sum_cost_val.sum()
test_total_token += token_num_val.sum()
......@@ -311,37 +408,22 @@ def test_context(test_program, avg_cost, train_exe, dev_count, data_input_names,
return test
def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler,
token_num, predict, test_program):
def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
token_num, predict, pyreader):
# Initialize the parameters.
if TrainTaskConfig.ckpt_path:
fluid.io.load_persistables(exe, TrainTaskConfig.ckpt_path)
lr_scheduler.current_steps = TrainTaskConfig.start_step
else:
print("init fluid.framework.default_startup_program")
exe.run(fluid.framework.default_startup_program())
exe.run(startup_prog)
train_data = reader.DataReader(
src_vocab_fpath=args.src_vocab_fpath,
trg_vocab_fpath=args.trg_vocab_fpath,
fpattern=args.train_file_pattern,
token_delimiter=args.token_delimiter,
use_token_batch=args.use_token_batch,
batch_size=args.batch_size * (1 if args.use_token_batch else dev_count),
pool_size=args.pool_size,
sort_type=args.sort_type,
shuffle=args.shuffle,
shuffle_batch=args.shuffle_batch,
start_mark=args.special_token[0],
end_mark=args.special_token[1],
unk_mark=args.special_token[2],
# count start and end tokens out
max_length=ModelHyperParams.max_length - 2,
clip_last_batch=False)
train_data = read_multiple(
reader=train_data.batch_generator,
count=dev_count if args.use_token_batch else 1)
train_data = prepare_data_generator(
args, is_test=False, count=dev_count, pyreader=pyreader)
# For faster executor
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_experimental_executor = True
# exec_strategy.num_iteration_per_drop_scope = 5
build_strategy = fluid.BuildStrategy()
# Since the token number differs among devices, customize gradient scale to
# use token average cost among multi-devices. and the gradient scale is
......@@ -349,16 +431,13 @@ def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler,
build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=sum_cost.name,
main_program=train_progm,
build_strategy=build_strategy)
data_input_names = encoder_data_input_fields + decoder_data_input_fields[:
-1] + label_data_input_fields
loss_name=avg_cost.name,
main_program=train_prog,
build_strategy=build_strategy,
exec_strategy=exec_strategy)
if args.val_file_pattern is not None:
test = test_context(test_program, avg_cost, train_exe, dev_count,
data_input_names, sum_cost, token_num)
test = test_context(exe, train_exe, dev_count)
# the best cross-entropy value with label smoothing
loss_normalizer = -((1. - TrainTaskConfig.label_smooth_eps) * np.log(
......@@ -368,61 +447,60 @@ def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler,
ModelHyperParams.trg_vocab_size - 1) + 1e-20))
step_idx = 0
inst_num = 0
init = False
init_flag = True
for pass_id in six.moves.xrange(TrainTaskConfig.pass_num):
pass_start_time = time.time()
for batch_id, data in enumerate(train_data()):
feed_list = []
total_num_token = 0
if args.local:
lr_rate = lr_scheduler.update_learning_rate()
for place_id, data_buffer in enumerate(
split_data(
data, num_part=dev_count)):
data_input_dict, num_token = prepare_batch_input(
data_buffer, data_input_names, ModelHyperParams.eos_idx,
ModelHyperParams.eos_idx, ModelHyperParams.n_head,
ModelHyperParams.d_model)
total_num_token += num_token
inst_num += len(data_buffer)
feed_kv_pairs = list(data_input_dict.items())
if args.local:
feed_kv_pairs += list({
lr_scheduler.learning_rate.name: lr_rate
}.items())
feed_list.append(dict(feed_kv_pairs))
if not init:
for pos_enc_param_name in pos_enc_param_names:
pos_enc = position_encoding_init(
ModelHyperParams.max_length + 1,
ModelHyperParams.d_model)
feed_list[place_id][pos_enc_param_name] = pos_enc
for feed_dict in feed_list:
feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token
outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name],
feed=feed_list)
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
total_sum_cost = sum_cost_val.sum(
) # sum the cost from multi-devices
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print(
"step_idx: %d, total samples: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, inst_num, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
if batch_id > 0 and batch_id % 1000 == 0:
fluid.io.save_persistables(
exe,
os.path.join(TrainTaskConfig.ckpt_dir, "latest.checkpoint"))
step_idx += 1
init = True
if args.use_py_reader:
pyreader.start()
data_generator = None
else:
data_generator = train_data()
batch_id = 0
while True:
try:
feed_dict_list = prepare_feed_dict_list(data_generator,
init_flag, dev_count)
outs = train_exe.run(
fetch_list=[sum_cost.name, token_num.name],
feed=feed_dict_list)
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[
1])
# sum the cost from multi-devices
total_sum_cost = sum_cost_val.sum()
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print("step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
if step_idx % int(TrainTaskConfig.
save_freq) == TrainTaskConfig.save_freq - 1:
fluid.io.save_persistables(
exe,
os.path.join(TrainTaskConfig.ckpt_dir,
"latest.checkpoint"), train_prog)
fluid.io.save_params(
exe,
os.path.join(TrainTaskConfig.model_dir,
"iter_" + str(step_idx) + ".infer.model"),
train_prog)
init_flag = False
batch_id += 1
step_idx += 1
except (StopIteration, fluid.core.EOFException):
# The current pass is over.
if args.use_py_reader:
pyreader.reset()
break
time_consumed = time.time() - pass_start_time
# Validate and save the model for inference.
# Validate and save the persistable.
if args.val_file_pattern is not None:
val_avg_cost, val_ppl = test()
print(
......@@ -435,14 +513,12 @@ def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler,
fluid.io.save_persistables(
exe,
os.path.join(TrainTaskConfig.ckpt_dir,
"pass_" + str(pass_id) + ".checkpoint"))
fluid.io.save_inference_model(
os.path.join(TrainTaskConfig.model_dir,
"pass_" + str(pass_id) + ".infer.model"),
data_input_names[:-2], [predict], exe)
"pass_" + str(pass_id) + ".checkpoint"), train_prog)
if args.enable_ce: # For CE
print("kpis\ttrain_cost_card%d\t%f" % (dev_count, total_avg_cost))
print("kpis\ttest_cost_card%d\t%f" % (dev_count, val_avg_cost))
if args.val_file_pattern is not None:
print("kpis\ttest_cost_card%d\t%f" % (dev_count, val_avg_cost))
print("kpis\ttrain_duration_card%d\t%f" % (dev_count, time_consumed))
......@@ -467,50 +543,54 @@ def train(args):
exe = fluid.Executor(place)
if args.enable_ce:
fluid.default_startup_program().random_seed = 1000
train_prog = fluid.Program()
startup_prog = fluid.Program()
sum_cost, avg_cost, predict, token_num = transformer(
ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size,
ModelHyperParams.max_length + 1, ModelHyperParams.n_layer,
ModelHyperParams.n_head, ModelHyperParams.d_key,
ModelHyperParams.d_value, ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid, ModelHyperParams.dropout,
ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps)
lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model,
TrainTaskConfig.warmup_steps,
TrainTaskConfig.learning_rate)
test_program = fluid.default_main_program().clone(for_test=True)
if args.enable_ce:
train_prog.random_seed = 1000
startup_prog.random_seed = 1000
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
sum_cost, avg_cost, predict, token_num, pyreader = transformer(
ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size,
ModelHyperParams.max_length + 1,
ModelHyperParams.n_layer,
ModelHyperParams.n_head,
ModelHyperParams.d_key,
ModelHyperParams.d_value,
ModelHyperParams.d_model,
ModelHyperParams.d_inner_hid,
ModelHyperParams.prepostprocess_dropout,
ModelHyperParams.attention_dropout,
ModelHyperParams.relu_dropout,
ModelHyperParams.preprocess_cmd,
ModelHyperParams.postprocess_cmd,
ModelHyperParams.weight_sharing,
TrainTaskConfig.label_smooth_eps,
use_py_reader=args.use_py_reader,
is_test=False)
if args.local:
optimizer = fluid.optimizer.Adam(
learning_rate=lr_scheduler.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
optimizer.minimize(sum_cost)
elif args.sync == False:
optimizer = fluid.optimizer.SGD(0.003)
optimizer.minimize(sum_cost)
else:
lr_decay = fluid.layers\
.learning_rate_scheduler\
.noam_decay(ModelHyperParams.d_model,
TrainTaskConfig.warmup_steps)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_decay,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
optimizer.minimize(sum_cost)
if args.local:
lr_decay = fluid.layers.learning_rate_scheduler.noam_decay(
ModelHyperParams.d_model, TrainTaskConfig.warmup_steps)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_decay * TrainTaskConfig.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
elif args.sync == False:
optimizer = fluid.optimizer.SGD(0.003)
optimizer.minimize(avg_cost)
if args.use_mem_opt:
fluid.memory_optimize(train_prog)
if args.local:
print("local start_up:")
train_loop(exe,
fluid.default_main_program(), dev_count, sum_cost, avg_cost,
lr_scheduler, token_num, predict, test_program)
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
token_num, predict, pyreader)
else:
port = os.getenv("PADDLE_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVERS") # ip,ip...
......@@ -522,7 +602,12 @@ def train(args):
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
t = fluid.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
t.transpile(
trainer_id,
pservers=pserver_endpoints,
trainers=trainers,
program=train_prog,
startup_program=startup_prog)
if training_role == "PSERVER":
current_endpoint = os.getenv("POD_IP") + ":" + os.getenv(
......@@ -542,12 +627,11 @@ def train(args):
exe.run(pserver_startup)
exe.run(pserver_prog)
elif training_role == "TRAINER":
trainer_prog = t.get_trainer_program()
with open('trainer_prog.desc', 'w') as f:
f.write(str(trainer_prog))
train_loop(exe, trainer_prog, dev_count, sum_cost, avg_cost,
lr_scheduler, token_num, predict, test_program)
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
avg_cost, token_num, predict, pyreader)
else:
print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册