Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
68b60040
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 1 年 前同步成功
通知
2299
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
68b60040
编写于
9月 01, 2018
作者:
Y
yi.wu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix ut
上级
ec773f90
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
1716 addition
and
211 deletion
+1716
-211
python/paddle/fluid/tests/unittests/dist_transformer.py
python/paddle/fluid/tests/unittests/dist_transformer.py
+1649
-207
python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
+11
-0
python/paddle/fluid/tests/unittests/test_dist_transformer.py
python/paddle/fluid/tests/unittests/test_dist_transformer.py
+44
-4
python/paddle/fluid/tests/unittests/test_dist_word2vec.py
python/paddle/fluid/tests/unittests/test_dist_word2vec.py
+11
-0
python/paddle/fluid/transpiler/distribute_transpiler.py
python/paddle/fluid/transpiler/distribute_transpiler.py
+1
-0
未找到文件。
python/paddle/fluid/tests/unittests/dist_transformer.py
浏览文件 @
68b60040
...
...
@@ -18,54 +18,129 @@ import numpy as np
import
argparse
import
time
import
math
import
os
import
sys
import
six
import
argparse
import
ast
import
multiprocessing
import
time
from
functools
import
partial
from
os.path
import
expanduser
import
glob
import
random
import
tarfile
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid.layers
as
layers
from
paddle.fluid
import
core
import
os
import
sys
import
six
import
transformer_model
import
paddle.dataset.wmt16
as
wmt16
from
test_dist_base
import
TestDistRunnerBase
,
runtime_main
from
paddle.compat
import
long_type
import
hashlib
from
paddle.fluid.transpiler.details
import
program_to_code
const_para_attr
=
fluid
.
ParamAttr
(
initializer
=
fluid
.
initializer
.
Constant
(
0.001
))
const_bias_attr
=
const_para_attr
# Fix seed for test
fluid
.
default_startup_program
().
random_seed
=
1
fluid
.
default_main_program
().
random_seed
=
1
WMT16_RECORDIO_FILE
=
"/tmp/wmt16.recordio"
#from transformer_config import ModelHyperParams, TrainTaskConfig, merge_cfg_from_list
class
TrainTaskConfig
(
object
):
# only support GPU currently
use_gpu
=
True
# the epoch number to train.
pass_num
=
1
# the number of sequences contained in a mini-batch.
# deprecated, set batch_size in args.
batch_size
=
20
# the hyper parameters for Adam optimizer.
# This static learning_rate will be multiplied to the LearningRateScheduler
# derived learning rate the to get the final learning rate.
learning_rate
=
1
beta1
=
0.9
beta2
=
0.98
eps
=
1e-9
# the parameters for learning rate scheduling.
warmup_steps
=
4000
# the weight used to mix up the ground-truth distribution and the fixed
# uniform distribution in label smoothing when training.
# Set this as zero if label smoothing is not wanted.
label_smooth_eps
=
0.1
# the directory for saving trained models.
model_dir
=
"trained_models"
# the directory for saving checkpoints.
ckpt_dir
=
"trained_ckpts"
# the directory for loading checkpoint.
# If provided, continue training from the checkpoint.
ckpt_path
=
None
# the parameter to initialize the learning rate scheduler.
# It should be provided if use checkpoints, since the checkpoint doesn't
# include the training step counter currently.
start_step
=
0
class
ModelHyperParams
(
object
):
# Dictionary size for source and target language. This model directly uses
# paddle.dataset.wmt16 in which <bos>, <eos> and <unk> token has
# alreay been added, but the <pad> token is not added. Transformer requires
# sequences in a mini-batch are padded to have the same length. A <pad> token is
# added into the original dictionary in paddle.dateset.wmt16.
check_acc
=
True
# size of source word dictionary.
src_vocab_size
=
10000
# index for <pad> token in source language.
src_pad_idx
=
src_vocab_size
data_path
=
expanduser
(
"~"
)
+
(
"/.cache/paddle/dataset/test_dist_transformer/"
)
src_vocab_fpath
=
data_path
+
"vocab.bpe.32000"
trg_vocab_fpath
=
data_path
+
"vocab.bpe.32000"
train_file_pattern
=
data_path
+
"train.tok.clean.bpe.32000.en-de"
val_file_pattern
=
data_path
+
"newstest2013.tok.bpe.32000.en-de"
pool_size
=
2000
sort_type
=
None
local
=
True
shuffle
=
False
shuffle_batch
=
False
special_token
=
[
'<s>'
,
'<e>'
,
'<unk>'
]
token_delimiter
=
' '
use_token_batch
=
False
# size of target word dictionay
trg_vocab_size
=
10000
# index for <pad> token in target language.
trg_pad_idx
=
trg_vocab_size
# position value corresponding to the <pad> token.
pos_pad_idx
=
0
class
InferTaskConfig
(
object
):
use_gpu
=
True
# the number of examples in one run for sequence generation.
batch_size
=
10
# the parameters for beam search.
beam_size
=
5
max_out_len
=
256
# the number of decoded sentences to output.
n_best
=
1
# the flags indicating whether to output the special tokens.
output_bos
=
False
output_eos
=
False
output_unk
=
True
# the directory for loading the trained model.
model_path
=
"trained_models/pass_1.infer.model"
# max length of sequences. It should plus 1 to include position
# padding token for position encoding.
max_length
=
50
class
ModelHyperParams
(
object
):
# These following five vocabularies related configurations will be set
# automatically according to the passed vocabulary path and special tokens.
# size of source word dictionary.
src_vocab_size
=
10000
# size of target word dictionay
trg_vocab_size
=
10000
# index for <bos> token
bos_idx
=
0
# index for <eos> token
eos_idx
=
1
# index for <unk> token
unk_idx
=
2
# max length of sequences deciding the size of position encoding table.
# Start from 1 and count start and end tokens in.
max_length
=
256
# the dimension for word embeddings, which is also the last dimension of
# the input and output of multi-head attention, position-wise feed-forward
# networks, encoder and decoder.
d_model
=
512
# size of the hidden layer in position-wise feed-forward networks.
d_inner_hid
=
1024
d_inner_hid
=
2048
# the dimension that keys are projected to for dot-product attention.
d_key
=
64
# the dimension that values are projected to for dot-product attention.
...
...
@@ -75,46 +150,195 @@ class ModelHyperParams(object):
# number of sub-layers to be stacked in the encoder and decoder.
n_layer
=
6
# dropout rate used by all dropout layers.
dropout
=
0.1
dropout
=
0.0
# no random
# random seed used in dropout for CE.
dropout_seed
=
None
# the flag indicating whether to share embedding and softmax weights.
# vocabularies in source and target should be same for weight sharing.
weight_sharing
=
True
def
prepare_batch_input
(
insts
,
src_pad_idx
,
trg_pad_idx
,
n_head
):
def
merge_cfg_from_list
(
cfg_list
,
g_cfgs
):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias. Then, convert the numpy
data to tensors and return a dict mapping names to tensors.
Set the above global configurations using the cfg_list.
"""
assert
len
(
cfg_list
)
%
2
==
0
for
key
,
value
in
zip
(
cfg_list
[
0
::
2
],
cfg_list
[
1
::
2
]):
for
g_cfg
in
g_cfgs
:
if
hasattr
(
g_cfg
,
key
):
try
:
value
=
eval
(
value
)
except
Exception
:
# for file path
pass
setattr
(
g_cfg
,
key
,
value
)
break
# The placeholder for batch_size in compile time. Must be -1 currently to be
# consistent with some ops' infer-shape output in compile time, such as the
# sequence_expand op used in beamsearch decoder.
batch_size
=
-
1
# The placeholder for squence length in compile time.
seq_len
=
ModelHyperParams
.
max_length
# Here list the data shapes and data types of all inputs.
# The shapes here act as placeholder and are set to pass the infer-shape in
# compile time.
input_descs
=
{
# The actual data shape of src_word is:
# [batch_size * max_src_len_in_batch, 1]
"src_word"
:
[(
batch_size
,
seq_len
,
long_type
(
1
)),
"int64"
,
2
],
# The actual data shape of src_pos is:
# [batch_size * max_src_len_in_batch, 1]
"src_pos"
:
[(
batch_size
,
seq_len
,
long_type
(
1
)),
"int64"
],
# This input is used to remove attention weights on paddings in the
# encoder.
# The actual data shape of src_slf_attn_bias is:
# [batch_size, n_head, max_src_len_in_batch, max_src_len_in_batch]
"src_slf_attn_bias"
:
[(
batch_size
,
ModelHyperParams
.
n_head
,
seq_len
,
seq_len
),
"float32"
],
# The actual data shape of trg_word is:
# [batch_size * max_trg_len_in_batch, 1]
"trg_word"
:
[(
batch_size
,
seq_len
,
long_type
(
1
)),
"int64"
,
2
],
# lod_level is only used in fast decoder.
# The actual data shape of trg_pos is:
# [batch_size * max_trg_len_in_batch, 1]
"trg_pos"
:
[(
batch_size
,
seq_len
,
long_type
(
1
)),
"int64"
],
# This input is used to remove attention weights on paddings and
# subsequent words in the decoder.
# The actual data shape of trg_slf_attn_bias is:
# [batch_size, n_head, max_trg_len_in_batch, max_trg_len_in_batch]
"trg_slf_attn_bias"
:
[(
batch_size
,
ModelHyperParams
.
n_head
,
seq_len
,
seq_len
),
"float32"
],
# This input is used to remove attention weights on paddings of the source
# input in the encoder-decoder attention.
# The actual data shape of trg_src_attn_bias is:
# [batch_size, n_head, max_trg_len_in_batch, max_src_len_in_batch]
"trg_src_attn_bias"
:
[(
batch_size
,
ModelHyperParams
.
n_head
,
seq_len
,
seq_len
),
"float32"
],
# This input is used in independent decoder program for inference.
# The actual data shape of enc_output is:
# [batch_size, max_src_len_in_batch, d_model]
"enc_output"
:
[(
batch_size
,
seq_len
,
ModelHyperParams
.
d_model
),
"float32"
],
# The actual data shape of label_word is:
# [batch_size * max_trg_len_in_batch, 1]
"lbl_word"
:
[(
batch_size
*
seq_len
,
long_type
(
1
)),
"int64"
],
# This input is used to mask out the loss of paddding tokens.
# The actual data shape of label_weight is:
# [batch_size * max_trg_len_in_batch, 1]
"lbl_weight"
:
[(
batch_size
*
seq_len
,
long_type
(
1
)),
"float32"
],
# These inputs are used to change the shape tensor in beam-search decoder.
"trg_slf_attn_pre_softmax_shape_delta"
:
[(
long_type
(
2
),
),
"int32"
],
"trg_slf_attn_post_softmax_shape_delta"
:
[(
long_type
(
4
),
),
"int32"
],
"init_score"
:
[(
batch_size
,
long_type
(
1
)),
"float32"
],
}
# Names of word embedding table which might be reused for weight sharing.
word_emb_param_names
=
(
"src_word_emb_table"
,
"trg_word_emb_table"
,
)
# Names of position encoding table which will be initialized externally.
pos_enc_param_names
=
(
"src_pos_enc_table"
,
"trg_pos_enc_table"
,
)
# separated inputs for different usages.
encoder_data_input_fields
=
(
"src_word"
,
"src_pos"
,
"src_slf_attn_bias"
,
)
decoder_data_input_fields
=
(
"trg_word"
,
"trg_pos"
,
"trg_slf_attn_bias"
,
"trg_src_attn_bias"
,
"enc_output"
,
)
label_data_input_fields
=
(
"lbl_word"
,
"lbl_weight"
,
)
# In fast decoder, trg_pos (only containing the current time step) is generated
# by ops and trg_slf_attn_bias is not needed.
fast_decoder_data_input_fields
=
(
"trg_word"
,
"init_score"
,
"trg_src_attn_bias"
,
)
# fast_decoder_util_input_fields = (
# "trg_slf_attn_pre_softmax_shape_delta",
# "trg_slf_attn_post_softmax_shape_delta", )
#from optim import LearningRateScheduler
class
LearningRateScheduler
(
object
):
"""
Wrapper for learning rate scheduling as described in the Transformer paper.
LearningRateScheduler adapts the learning rate externally and the adapted
learning rate will be feeded into the main_program as input data.
"""
def
__init__
(
self
,
d_model
,
warmup_steps
,
learning_rate
=
0.001
,
current_steps
=
0
,
name
=
"learning_rate"
):
self
.
current_steps
=
current_steps
self
.
warmup_steps
=
warmup_steps
self
.
d_model
=
d_model
self
.
static_lr
=
learning_rate
self
.
learning_rate
=
layers
.
create_global_var
(
name
=
name
,
shape
=
[
1
],
value
=
float
(
learning_rate
),
dtype
=
"float32"
,
persistable
=
True
)
def
__pad_batch_data
(
insts
,
def
update_learning_rate
(
self
):
self
.
current_steps
+=
1
lr_value
=
np
.
power
(
self
.
d_model
,
-
0.5
)
*
np
.
min
([
np
.
power
(
self
.
current_steps
,
-
0.5
),
np
.
power
(
self
.
warmup_steps
,
-
1.5
)
*
self
.
current_steps
])
*
self
.
static_lr
return
np
.
array
([
lr_value
],
dtype
=
"float32"
)
#from transformer_train import train_loop
def
pad_batch_data
(
insts
,
pad_idx
,
n_head
,
is_target
=
False
,
return_pos
=
Tru
e
,
is_label
=
Fals
e
,
return_attn_bias
=
True
,
return_max_len
=
True
):
return_max_len
=
True
,
return_num_token
=
False
):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
return_list
=
[]
max_len
=
max
(
len
(
inst
)
for
inst
in
insts
)
num_token
=
reduce
(
lambda
x
,
y
:
x
+
y
,
[
len
(
inst
)
for
inst
in
insts
])
if
return_num_token
else
0
# Any token included in dict can be used to pad, since the paddings' loss
# will be masked out by weights and make no effect on parameter gradients.
inst_data
=
np
.
array
(
[
inst
+
[
pad_idx
]
*
(
max_len
-
len
(
inst
))
for
inst
in
insts
])
return_list
+=
[
inst_data
.
astype
(
"int64"
).
reshape
([
-
1
,
1
])]
if
return_pos
:
inst_pos
=
np
.
array
([[
pos_i
+
1
if
w_i
!=
pad_idx
else
0
for
pos_i
,
w_i
in
enumerate
(
inst
)
]
for
inst
in
inst_data
])
if
is_label
:
# label weight
inst_weight
=
np
.
array
(
[[
1.
]
*
len
(
inst
)
+
[
0.
]
*
(
max_len
-
len
(
inst
))
for
inst
in
insts
])
return_list
+=
[
inst_weight
.
astype
(
"float32"
).
reshape
([
-
1
,
1
])]
else
:
# position data
inst_pos
=
np
.
array
([
range
(
1
,
len
(
inst
)
+
1
)
+
[
0
]
*
(
max_len
-
len
(
inst
))
for
inst
in
insts
])
return_list
+=
[
inst_pos
.
astype
(
"int64"
).
reshape
([
-
1
,
1
])]
if
return_attn_bias
:
if
is_target
:
# This is used to avoid attention on paddings and subsequent
# words.
slf_attn_bias_data
=
np
.
ones
((
inst_data
.
shape
[
0
],
max_len
,
max_len
))
slf_attn_bias_data
=
np
.
triu
(
slf_attn_bias_data
,
1
).
reshape
(
[
-
1
,
1
,
max_len
,
max_len
])
slf_attn_bias_data
=
np
.
ones
((
inst_data
.
shape
[
0
],
max_len
,
max_len
))
slf_attn_bias_data
=
np
.
triu
(
slf_attn_bias_data
,
1
).
reshape
([
-
1
,
1
,
max_len
,
max_len
])
slf_attn_bias_data
=
np
.
tile
(
slf_attn_bias_data
,
[
1
,
n_head
,
1
,
1
])
*
[
-
1e9
]
else
:
...
...
@@ -128,157 +352,1375 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
return_list
+=
[
slf_attn_bias_data
.
astype
(
"float32"
)]
if
return_max_len
:
return_list
+=
[
max_len
]
if
return_num_token
:
return_list
+=
[
num_token
]
return
return_list
if
len
(
return_list
)
>
1
else
return_list
[
0
]
src_word
,
src_pos
,
src_slf_attn_bias
,
src_max_len
=
__pad_batch_data
(
[
inst
[
0
]
for
inst
in
insts
],
src_pad_idx
,
is_target
=
False
)
trg_word
,
trg_pos
,
trg_slf_attn_bias
,
trg_max_len
=
__pad_batch_data
(
[
inst
[
1
]
for
inst
in
insts
],
trg_pad_idx
,
is_target
=
True
)
def
prepare_batch_input
(
insts
,
data_input_names
,
src_pad_idx
,
trg_pad_idx
,
n_head
,
d_model
):
"""
Put all padded data needed by training into a dict.
"""
src_word
,
src_pos
,
src_slf_attn_bias
,
src_max_len
=
pad_batch_data
(
[
inst
[
0
]
for
inst
in
insts
],
src_pad_idx
,
n_head
,
is_target
=
False
)
src_word
=
src_word
.
reshape
(
-
1
,
src_max_len
,
1
)
src_pos
=
src_pos
.
reshape
(
-
1
,
src_max_len
,
1
)
trg_word
,
trg_pos
,
trg_slf_attn_bias
,
trg_max_len
=
pad_batch_data
(
[
inst
[
1
]
for
inst
in
insts
],
trg_pad_idx
,
n_head
,
is_target
=
True
)
trg_word
=
trg_word
.
reshape
(
-
1
,
trg_max_len
,
1
)
trg_pos
=
trg_pos
.
reshape
(
-
1
,
trg_max_len
,
1
)
trg_src_attn_bias
=
np
.
tile
(
src_slf_attn_bias
[:,
:,
::
src_max_len
,
:],
[
1
,
1
,
trg_max_len
,
1
]).
astype
(
"float32"
)
lbl_word
=
__pad_batch_data
([
inst
[
2
]
for
inst
in
insts
],
trg_pad_idx
,
False
,
False
,
False
,
False
)
lbl_weight
=
(
lbl_word
!=
trg_pad_idx
).
astype
(
"float32"
).
reshape
([
-
1
,
1
])
return
[
src_word
,
src_pos
,
trg_word
,
trg_pos
,
src_slf_attn_bias
,
lbl_word
,
lbl_weight
,
num_token
=
pad_batch_data
(
[
inst
[
2
]
for
inst
in
insts
],
trg_pad_idx
,
n_head
,
is_target
=
False
,
is_label
=
True
,
return_attn_bias
=
False
,
return_max_len
=
False
,
return_num_token
=
True
)
data_input_dict
=
dict
(
zip
(
data_input_names
,
[
src_word
,
src_pos
,
src_slf_attn_bias
,
trg_word
,
trg_pos
,
trg_slf_attn_bias
,
trg_src_attn_bias
,
lbl_word
,
lbl_weight
]))
return
data_input_dict
,
np
.
asarray
([
num_token
],
dtype
=
"float32"
)
def
read_multiple
(
reader
,
count
,
clip_last
=
True
):
"""
Stack data from reader for multi-devices.
"""
def
__impl__
():
res
=
[]
for
item
in
reader
():
res
.
append
(
item
)
if
len
(
res
)
==
count
:
yield
res
res
=
[]
if
len
(
res
)
==
count
:
yield
res
elif
not
clip_last
:
data
=
[]
for
item
in
res
:
data
+=
item
if
len
(
data
)
>
count
:
inst_num_per_part
=
len
(
data
)
//
count
yield
[
data
[
inst_num_per_part
*
i
:
inst_num_per_part
*
(
i
+
1
)]
for
i
in
range
(
count
)
]
return
__impl__
def
split_data
(
data
,
num_part
):
"""
Split data for each device.
"""
if
len
(
data
)
==
num_part
:
return
data
data
=
data
[
0
]
inst_num_per_part
=
len
(
data
)
//
num_part
return
[
data
[
inst_num_per_part
*
i
:
inst_num_per_part
*
(
i
+
1
)]
for
i
in
range
(
num_part
)
]
def
test_context
(
train_progm
,
avg_cost
,
train_exe
,
dev_count
,
data_input_names
,
sum_cost
,
token_num
):
# Context to do validation.
test_program
=
train_progm
.
clone
()
with
fluid
.
program_guard
(
test_program
):
test_program
=
fluid
.
io
.
get_inference_program
([
avg_cost
])
val_data
=
DataReader
(
src_vocab_fpath
=
TrainTaskConfig
.
src_vocab_fpath
,
trg_vocab_fpath
=
TrainTaskConfig
.
trg_vocab_fpath
,
fpattern
=
TrainTaskConfig
.
val_file_pattern
,
token_delimiter
=
TrainTaskConfig
.
token_delimiter
,
use_token_batch
=
TrainTaskConfig
.
use_token_batch
,
batch_size
=
TrainTaskConfig
.
batch_size
*
(
1
if
TrainTaskConfig
.
use_token_batch
else
dev_count
),
pool_size
=
TrainTaskConfig
.
pool_size
,
sort_type
=
TrainTaskConfig
.
sort_type
,
start_mark
=
TrainTaskConfig
.
special_token
[
0
],
end_mark
=
TrainTaskConfig
.
special_token
[
1
],
unk_mark
=
TrainTaskConfig
.
special_token
[
2
],
# count start and end tokens out
max_length
=
ModelHyperParams
.
max_length
-
2
,
clip_last_batch
=
False
,
shuffle
=
False
,
shuffle_batch
=
False
)
build_strategy
=
fluid
.
BuildStrategy
()
strategy
=
fluid
.
ExecutionStrategy
()
strategy
.
num_threads
=
1
test_exe
=
fluid
.
ParallelExecutor
(
use_cuda
=
TrainTaskConfig
.
use_gpu
,
main_program
=
test_program
,
share_vars_from
=
train_exe
,
build_strategy
=
build_strategy
,
exec_strategy
=
strategy
)
def
test
(
exe
=
test_exe
):
test_total_cost
=
0
test_total_token
=
0
test_data
=
read_multiple
(
reader
=
val_data
.
batch_generator
,
count
=
dev_count
if
TrainTaskConfig
.
use_token_batch
else
1
)
for
batch_id
,
data
in
enumerate
(
test_data
()):
feed_list
=
[]
for
place_id
,
data_buffer
in
enumerate
(
split_data
(
data
,
num_part
=
dev_count
)):
data_input_dict
,
_
=
prepare_batch_input
(
data_buffer
,
data_input_names
,
ModelHyperParams
.
eos_idx
,
ModelHyperParams
.
eos_idx
,
ModelHyperParams
.
n_head
,
ModelHyperParams
.
d_model
)
feed_list
.
append
(
data_input_dict
)
outs
=
exe
.
run
(
feed
=
feed_list
,
fetch_list
=
[
sum_cost
.
name
,
token_num
.
name
])
sum_cost_val
,
token_num_val
=
np
.
array
(
outs
[
0
]),
np
.
array
(
outs
[
1
])
test_total_cost
+=
sum_cost_val
.
sum
()
test_total_token
+=
token_num_val
.
sum
()
test_avg_cost
=
test_total_cost
/
test_total_token
test_ppl
=
np
.
exp
([
min
(
test_avg_cost
,
100
)])
return
test_avg_cost
,
test_ppl
return
test
def
train_loop
(
exe
,
train_progm
,
dev_count
,
sum_cost
,
avg_cost
,
lr_scheduler
,
token_num
,
predict
):
# Initialize the parameters.
if
TrainTaskConfig
.
ckpt_path
:
lr_scheduler
.
current_steps
=
TrainTaskConfig
.
start_step
else
:
exe
.
run
(
fluid
.
framework
.
default_startup_program
())
train_data
=
DataReader
(
src_vocab_fpath
=
TrainTaskConfig
.
src_vocab_fpath
,
trg_vocab_fpath
=
TrainTaskConfig
.
trg_vocab_fpath
,
fpattern
=
TrainTaskConfig
.
train_file_pattern
,
token_delimiter
=
TrainTaskConfig
.
token_delimiter
,
use_token_batch
=
TrainTaskConfig
.
use_token_batch
,
batch_size
=
TrainTaskConfig
.
batch_size
*
(
1
if
TrainTaskConfig
.
use_token_batch
else
dev_count
),
pool_size
=
TrainTaskConfig
.
pool_size
,
sort_type
=
TrainTaskConfig
.
sort_type
,
shuffle
=
TrainTaskConfig
.
shuffle
,
shuffle_batch
=
TrainTaskConfig
.
shuffle_batch
,
start_mark
=
TrainTaskConfig
.
special_token
[
0
],
end_mark
=
TrainTaskConfig
.
special_token
[
1
],
unk_mark
=
TrainTaskConfig
.
special_token
[
2
],
# count start and end tokens out
max_length
=
ModelHyperParams
.
max_length
-
2
,
clip_last_batch
=
False
)
train_data
=
read_multiple
(
reader
=
train_data
.
batch_generator
,
count
=
dev_count
if
TrainTaskConfig
.
use_token_batch
else
1
)
build_strategy
=
fluid
.
BuildStrategy
()
# Since the token number differs among devices, customize gradient scale to
# use token average cost among multi-devices. and the gradient scale is
# `1 / token_number` for average cost.
build_strategy
.
gradient_scale_strategy
=
fluid
.
BuildStrategy
.
GradientScaleStrategy
.
Customized
strategy
=
fluid
.
ExecutionStrategy
()
strategy
.
num_threads
=
1
train_exe
=
fluid
.
ParallelExecutor
(
use_cuda
=
TrainTaskConfig
.
use_gpu
,
loss_name
=
sum_cost
.
name
,
main_program
=
train_progm
,
build_strategy
=
build_strategy
,
exec_strategy
=
strategy
)
data_input_names
=
encoder_data_input_fields
+
decoder_data_input_fields
[:
-
1
]
+
label_data_input_fields
if
TrainTaskConfig
.
val_file_pattern
is
not
None
:
test
=
test_context
(
train_progm
,
avg_cost
,
train_exe
,
dev_count
,
data_input_names
,
sum_cost
,
token_num
)
# the best cross-entropy value with label smoothing
loss_normalizer
=
-
((
1.
-
TrainTaskConfig
.
label_smooth_eps
)
*
np
.
log
(
(
1.
-
TrainTaskConfig
.
label_smooth_eps
))
+
TrainTaskConfig
.
label_smooth_eps
*
np
.
log
(
TrainTaskConfig
.
label_smooth_eps
/
(
ModelHyperParams
.
trg_vocab_size
-
1
)
+
1e-20
))
init
=
False
for
pass_id
in
xrange
(
TrainTaskConfig
.
pass_num
):
pass_start_time
=
time
.
time
()
for
batch_id
,
data
in
enumerate
(
train_data
()):
if
batch_id
>=
5
:
break
feed_list
=
[]
total_num_token
=
0
#if TrainTaskConfig.local:
# lr_rate = lr_scheduler.update_learning_rate()
#for place_id, data_buffer in enumerate(
# split_data(
# data, num_part=dev_count)):
if
TrainTaskConfig
.
local
:
lr_rate
=
lr_scheduler
.
update_learning_rate
()
for
place_id
,
data_buffer
in
enumerate
(
split_data
(
data
,
num_part
=
dev_count
)):
data_input_dict
,
num_token
=
prepare_batch_input
(
data_buffer
,
data_input_names
,
ModelHyperParams
.
eos_idx
,
ModelHyperParams
.
eos_idx
,
ModelHyperParams
.
n_head
,
ModelHyperParams
.
d_model
)
total_num_token
+=
num_token
feed_kv_pairs
=
data_input_dict
.
items
()
if
TrainTaskConfig
.
local
:
feed_kv_pairs
+=
{
lr_scheduler
.
learning_rate
.
name
:
lr_rate
}.
items
()
feed_list
.
append
(
dict
(
feed_kv_pairs
))
if
not
init
:
for
pos_enc_param_name
in
pos_enc_param_names
:
pos_enc
=
position_encoding_init
(
ModelHyperParams
.
max_length
+
1
,
ModelHyperParams
.
d_model
)
feed_list
[
place_id
][
pos_enc_param_name
]
=
pos_enc
if
not
TrainTaskConfig
.
check_acc
:
for
feed_dict
in
feed_list
:
feed_dict
[
sum_cost
.
name
+
"@GRAD"
]
=
1.
/
total_num_token
else
:
b
=
100
*
TrainTaskConfig
.
batch_size
a
=
np
.
asarray
([
b
],
dtype
=
"float32"
)
for
feed_dict
in
feed_list
:
feed_dict
[
sum_cost
.
name
+
"@GRAD"
]
=
1.
/
a
outs
=
train_exe
.
run
(
fetch_list
=
[
sum_cost
.
name
,
token_num
.
name
],
feed
=
feed_list
)
sum_cost_val
,
token_num_val
=
np
.
array
(
outs
[
0
]),
np
.
array
(
outs
[
1
])
total_sum_cost
=
sum_cost_val
.
sum
()
total_token_num
=
token_num_val
.
sum
()
total_avg_cost
=
total_sum_cost
/
total_token_num
init
=
True
# Validate and save the model for inference.
if
TrainTaskConfig
.
val_file_pattern
is
not
None
:
val_avg_cost
,
val_ppl
=
test
()
print
(
"[%f]"
%
val_avg_cost
)
else
:
assert
(
False
)
#import transformer_reader as reader
class
SortType
(
object
):
GLOBAL
=
'global'
POOL
=
'pool'
NONE
=
"none"
class
Converter
(
object
):
def
__init__
(
self
,
vocab
,
beg
,
end
,
unk
,
delimiter
):
self
.
_vocab
=
vocab
self
.
_beg
=
beg
self
.
_end
=
end
self
.
_unk
=
unk
self
.
_delimiter
=
delimiter
def
__call__
(
self
,
sentence
):
return
[
self
.
_beg
]
+
[
self
.
_vocab
.
get
(
w
,
self
.
_unk
)
for
w
in
sentence
.
split
(
self
.
_delimiter
)
]
+
[
self
.
_end
]
class
ComposedConverter
(
object
):
def
__init__
(
self
,
converters
):
self
.
_converters
=
converters
def
__call__
(
self
,
parallel_sentence
):
return
[
self
.
_converters
[
i
](
parallel_sentence
[
i
])
for
i
in
range
(
len
(
self
.
_converters
))
]
class
SentenceBatchCreator
(
object
):
def
__init__
(
self
,
batch_size
):
self
.
batch
=
[]
self
.
_batch_size
=
batch_size
def
append
(
self
,
info
):
self
.
batch
.
append
(
info
)
if
len
(
self
.
batch
)
==
self
.
_batch_size
:
tmp
=
self
.
batch
self
.
batch
=
[]
return
tmp
class
TokenBatchCreator
(
object
):
def
__init__
(
self
,
batch_size
):
self
.
batch
=
[]
self
.
max_len
=
-
1
self
.
_batch_size
=
batch_size
def
append
(
self
,
info
):
cur_len
=
info
.
max_len
max_len
=
max
(
self
.
max_len
,
cur_len
)
if
max_len
*
(
len
(
self
.
batch
)
+
1
)
>
self
.
_batch_size
:
result
=
self
.
batch
self
.
batch
=
[
info
]
self
.
max_len
=
cur_len
return
result
else
:
self
.
max_len
=
max_len
self
.
batch
.
append
(
info
)
class
SampleInfo
(
object
):
def
__init__
(
self
,
i
,
max_len
,
min_len
):
self
.
i
=
i
self
.
min_len
=
min_len
self
.
max_len
=
max_len
class
MinMaxFilter
(
object
):
def
__init__
(
self
,
max_len
,
min_len
,
underlying_creator
):
self
.
_min_len
=
min_len
self
.
_max_len
=
max_len
self
.
_creator
=
underlying_creator
def
append
(
self
,
info
):
if
info
.
max_len
>
self
.
_max_len
or
info
.
min_len
<
self
.
_min_len
:
return
else
:
return
self
.
_creator
.
append
(
info
)
@
property
def
batch
(
self
):
return
self
.
_creator
.
batch
class
DataReader
(
object
):
"""
The data reader loads all data from files and produces batches of data
in the way corresponding to settings.
An example of returning a generator producing data batches whose data
is shuffled in each pass and sorted in each pool:
```
train_data = DataReader(
src_vocab_fpath='data/src_vocab_file',
trg_vocab_fpath='data/trg_vocab_file',
fpattern='data/part-*',
use_token_batch=True,
batch_size=2000,
pool_size=10000,
sort_type=SortType.POOL,
shuffle=True,
shuffle_batch=True,
start_mark='<s>',
end_mark='<e>',
unk_mark='<unk>',
clip_last_batch=False).batch_generator
```
:param src_vocab_fpath: The path of vocabulary file of source language.
:type src_vocab_fpath: basestring
:param trg_vocab_fpath: The path of vocabulary file of target language.
:type trg_vocab_fpath: basestring
:param fpattern: The pattern to match data files.
:type fpattern: basestring
:param batch_size: The number of sequences contained in a mini-batch.
or the maximum number of tokens (include paddings) contained in a
mini-batch.
:type batch_size: int
:param pool_size: The size of pool buffer.
:type pool_size: int
:param sort_type: The grain to sort by length: 'global' for all
instances; 'pool' for instances in pool; 'none' for no sort.
:type sort_type: basestring
:param clip_last_batch: Whether to clip the last uncompleted batch.
:type clip_last_batch: bool
:param tar_fname: The data file in tar if fpattern matches a tar file.
:type tar_fname: basestring
:param min_length: The minimum length used to filt sequences.
:type min_length: int
:param max_length: The maximum length used to filt sequences.
:type max_length: int
:param shuffle: Whether to shuffle all instances.
:type shuffle: bool
:param shuffle_batch: Whether to shuffle the generated batches.
:type shuffle_batch: bool
:param use_token_batch: Whether to produce batch data according to
token number.
:type use_token_batch: bool
:param field_delimiter: The delimiter used to split source and target in
each line of data file.
:type field_delimiter: basestring
:param token_delimiter: The delimiter used to split tokens in source or
target sentences.
:type token_delimiter: basestring
:param start_mark: The token representing for the beginning of
sentences in dictionary.
:type start_mark: basestring
:param end_mark: The token representing for the end of sentences
in dictionary.
:type end_mark: basestring
:param unk_mark: The token representing for unknown word in dictionary.
:type unk_mark: basestring
:param seed: The seed for random.
:type seed: int
"""
def
__init__
(
self
,
src_vocab_fpath
,
trg_vocab_fpath
,
fpattern
,
batch_size
,
pool_size
,
sort_type
=
SortType
.
GLOBAL
,
clip_last_batch
=
True
,
tar_fname
=
None
,
min_length
=
0
,
max_length
=
100
,
shuffle
=
True
,
shuffle_batch
=
False
,
use_token_batch
=
False
,
field_delimiter
=
"
\t
"
,
token_delimiter
=
" "
,
start_mark
=
"<s>"
,
end_mark
=
"<e>"
,
unk_mark
=
"<unk>"
,
seed
=
0
):
self
.
_src_vocab
=
self
.
load_dict
(
src_vocab_fpath
)
self
.
_only_src
=
True
if
trg_vocab_fpath
is
not
None
:
self
.
_trg_vocab
=
self
.
load_dict
(
trg_vocab_fpath
)
self
.
_only_src
=
False
self
.
_pool_size
=
pool_size
self
.
_batch_size
=
batch_size
self
.
_use_token_batch
=
use_token_batch
self
.
_sort_type
=
sort_type
self
.
_clip_last_batch
=
clip_last_batch
self
.
_shuffle
=
shuffle
self
.
_shuffle_batch
=
shuffle_batch
self
.
_min_length
=
min_length
self
.
_max_length
=
max_length
self
.
_field_delimiter
=
field_delimiter
self
.
_token_delimiter
=
token_delimiter
self
.
load_src_trg_ids
(
end_mark
,
fpattern
,
start_mark
,
tar_fname
,
unk_mark
)
self
.
_random
=
random
.
Random
(
x
=
seed
)
def
load_src_trg_ids
(
self
,
end_mark
,
fpattern
,
start_mark
,
tar_fname
,
unk_mark
):
converters
=
[
Converter
(
vocab
=
self
.
_src_vocab
,
beg
=
self
.
_src_vocab
[
start_mark
],
end
=
self
.
_src_vocab
[
end_mark
],
unk
=
self
.
_src_vocab
[
unk_mark
],
delimiter
=
self
.
_token_delimiter
)
]
if
not
self
.
_only_src
:
converters
.
append
(
Converter
(
vocab
=
self
.
_trg_vocab
,
beg
=
self
.
_trg_vocab
[
start_mark
],
end
=
self
.
_trg_vocab
[
end_mark
],
unk
=
self
.
_trg_vocab
[
unk_mark
],
delimiter
=
self
.
_token_delimiter
))
converters
=
ComposedConverter
(
converters
)
self
.
_src_seq_ids
=
[]
self
.
_trg_seq_ids
=
None
if
self
.
_only_src
else
[]
self
.
_sample_infos
=
[]
def
transformer
(
use_feed
):
assert
not
use_feed
,
"transfomer doesn't support feed yet"
return
transformer_model
.
transformer
(
ModelHyperParams
.
src_vocab_size
+
1
,
ModelHyperParams
.
trg_vocab_size
+
1
,
ModelHyperParams
.
max_length
+
1
,
ModelHyperParams
.
n_layer
,
ModelHyperParams
.
n_head
,
ModelHyperParams
.
d_key
,
ModelHyperParams
.
d_value
,
ModelHyperParams
.
d_model
,
ModelHyperParams
.
d_inner_hid
,
ModelHyperParams
.
dropout
,
ModelHyperParams
.
src_pad_idx
,
ModelHyperParams
.
trg_pad_idx
,
ModelHyperParams
.
pos_pad_idx
)
def
get_model
():
avg_cost
=
transformer
(
use_feed
=
False
)
optimizer
=
fluid
.
optimizer
.
Adam
()
optimizer
.
minimize
(
avg_cost
)
fluid
.
memory_optimize
(
fluid
.
default_main_program
())
return
avg_cost
def
get_transpiler
(
trainer_id
,
main_program
,
pserver_endpoints
,
trainers
):
t
=
fluid
.
DistributeTranspiler
()
t
.
transpile
(
trainer_id
=
trainer_id
,
program
=
main_program
,
pservers
=
pserver_endpoints
,
trainers
=
trainers
)
return
t
class
DistTransformer2x2
(
object
):
def
run_pserver
(
self
,
pserver_endpoints
,
trainers
,
current_endpoint
,
trainer_id
):
get_model
()
t
=
get_transpiler
(
trainer_id
,
fluid
.
default_main_program
(),
pserver_endpoints
,
trainers
)
pserver_prog
=
t
.
get_pserver_program
(
current_endpoint
)
startup_prog
=
t
.
get_startup_program
(
current_endpoint
,
pserver_prog
)
for
i
,
line
in
enumerate
(
self
.
_load_lines
(
fpattern
,
tar_fname
)):
src_trg_ids
=
converters
(
line
)
self
.
_src_seq_ids
.
append
(
src_trg_ids
[
0
])
lens
=
[
len
(
src_trg_ids
[
0
])]
if
not
self
.
_only_src
:
self
.
_trg_seq_ids
.
append
(
src_trg_ids
[
1
])
lens
.
append
(
len
(
src_trg_ids
[
1
]))
self
.
_sample_infos
.
append
(
SampleInfo
(
i
,
max
(
lens
),
min
(
lens
)))
def
_load_lines
(
self
,
fpattern
,
tar_fname
):
fpaths
=
glob
.
glob
(
fpattern
)
if
len
(
fpaths
)
==
1
and
tarfile
.
is_tarfile
(
fpaths
[
0
]):
if
tar_fname
is
None
:
raise
Exception
(
"If tar file provided, please set tar_fname."
)
f
=
tarfile
.
open
(
fpaths
[
0
],
"r"
)
for
line
in
f
.
extractfile
(
tar_fname
):
fields
=
line
.
strip
(
"
\n
"
).
split
(
self
.
_field_delimiter
)
if
(
not
self
.
_only_src
and
len
(
fields
)
==
2
)
or
(
self
.
_only_src
and
len
(
fields
)
==
1
):
yield
fields
else
:
for
fpath
in
fpaths
:
if
not
os
.
path
.
isfile
(
fpath
):
raise
IOError
(
"Invalid file: %s"
%
fpath
)
with
open
(
fpath
,
"r"
)
as
f
:
for
line
in
f
:
fields
=
line
.
strip
(
"
\n
"
).
split
(
self
.
_field_delimiter
)
if
(
not
self
.
_only_src
and
len
(
fields
)
==
2
)
or
(
self
.
_only_src
and
len
(
fields
)
==
1
):
yield
fields
@
staticmethod
def
load_dict
(
dict_path
,
reverse
=
False
):
word_dict
=
{}
with
open
(
dict_path
,
"r"
)
as
fdict
:
for
idx
,
line
in
enumerate
(
fdict
):
if
reverse
:
word_dict
[
idx
]
=
line
.
strip
(
"
\n
"
)
else
:
word_dict
[
line
.
strip
(
"
\n
"
)]
=
idx
return
word_dict
def
batch_generator
(
self
):
# global sort or global shuffle
if
self
.
_sort_type
==
SortType
.
GLOBAL
:
infos
=
sorted
(
self
.
_sample_infos
,
key
=
lambda
x
:
x
.
max_len
,
reverse
=
True
)
else
:
if
self
.
_shuffle
:
infos
=
self
.
_sample_infos
self
.
_random
.
shuffle
(
infos
)
else
:
infos
=
self
.
_sample_infos
if
self
.
_sort_type
==
SortType
.
POOL
:
for
i
in
range
(
0
,
len
(
infos
),
self
.
_pool_size
):
infos
[
i
:
i
+
self
.
_pool_size
]
=
sorted
(
infos
[
i
:
i
+
self
.
_pool_size
],
key
=
lambda
x
:
x
.
max_len
)
# concat batch
batches
=
[]
batch_creator
=
TokenBatchCreator
(
self
.
_batch_size
)
if
self
.
_use_token_batch
else
SentenceBatchCreator
(
self
.
_batch_size
)
batch_creator
=
MinMaxFilter
(
self
.
_max_length
,
self
.
_min_length
,
batch_creator
)
for
info
in
infos
:
batch
=
batch_creator
.
append
(
info
)
if
batch
is
not
None
:
batches
.
append
(
batch
)
if
not
self
.
_clip_last_batch
and
len
(
batch_creator
.
batch
)
!=
0
:
batches
.
append
(
batch_creator
.
batch
)
if
self
.
_shuffle_batch
:
self
.
_random
.
shuffle
(
batches
)
for
batch
in
batches
:
batch_ids
=
[
info
.
i
for
info
in
batch
]
if
self
.
_only_src
:
yield
[[
self
.
_src_seq_ids
[
idx
]]
for
idx
in
batch_ids
]
else
:
yield
[(
self
.
_src_seq_ids
[
idx
],
self
.
_trg_seq_ids
[
idx
][:
-
1
],
self
.
_trg_seq_ids
[
idx
][
1
:])
for
idx
in
batch_ids
]
#from transformer_model import transformer
def
position_encoding_init
(
n_position
,
d_pos_vec
):
"""
Generate the initial values for the sinusoid position encoding table.
"""
position_enc
=
np
.
array
([[
pos
/
np
.
power
(
10000
,
2
*
(
j
//
2
)
/
d_pos_vec
)
for
j
in
range
(
d_pos_vec
)
]
if
pos
!=
0
else
np
.
zeros
(
d_pos_vec
)
for
pos
in
range
(
n_position
)])
position_enc
[
1
:,
0
::
2
]
=
np
.
sin
(
position_enc
[
1
:,
0
::
2
])
# dim 2i
position_enc
[
1
:,
1
::
2
]
=
np
.
cos
(
position_enc
[
1
:,
1
::
2
])
# dim 2i+1
return
position_enc
.
astype
(
"float32"
)
def
multi_head_attention
(
queries
,
keys
,
values
,
attn_bias
,
d_key
,
d_value
,
d_model
,
n_head
=
1
,
dropout_rate
=
0.
,
cache
=
None
):
"""
Multi-Head Attention. Note that attn_bias is added to the logit before
computing softmax activiation to mask certain selected positions so that
they will not considered in attention weights.
"""
if
not
(
len
(
queries
.
shape
)
==
len
(
keys
.
shape
)
==
len
(
values
.
shape
)
==
3
):
raise
ValueError
(
"Inputs: quries, keys and values should all be 3-D tensors."
)
def
__compute_qkv
(
queries
,
keys
,
values
,
n_head
,
d_key
,
d_value
):
"""
Add linear projection to queries, keys, and values.
"""
q
=
layers
.
fc
(
input
=
queries
,
size
=
d_key
*
n_head
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
k
=
layers
.
fc
(
input
=
keys
,
size
=
d_key
*
n_head
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
v
=
layers
.
fc
(
input
=
values
,
size
=
d_value
*
n_head
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
return
q
,
k
,
v
def
__split_heads
(
x
,
n_head
):
"""
Reshape the last dimension of inpunt tensor x so that it becomes two
dimensions and then transpose. Specifically, input a tensor with shape
[bs, max_sequence_length, n_head * hidden_dim] then output a tensor
with shape [bs, n_head, max_sequence_length, hidden_dim].
"""
if
n_head
==
1
:
return
x
hidden_size
=
x
.
shape
[
-
1
]
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped
=
layers
.
reshape
(
x
=
x
,
shape
=
[
0
,
0
,
n_head
,
hidden_size
//
n_head
])
# permuate the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
return
layers
.
transpose
(
x
=
reshaped
,
perm
=
[
0
,
2
,
1
,
3
])
def
__combine_heads
(
x
):
"""
Transpose and then reshape the last two dimensions of inpunt tensor x
so that it becomes one dimension, which is reverse to __split_heads.
"""
if
len
(
x
.
shape
)
==
3
:
return
x
if
len
(
x
.
shape
)
!=
4
:
raise
ValueError
(
"Input(x) should be a 4-D Tensor."
)
trans_x
=
layers
.
transpose
(
x
,
perm
=
[
0
,
2
,
1
,
3
])
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
return
layers
.
reshape
(
x
=
trans_x
,
shape
=
map
(
int
,
[
0
,
0
,
trans_x
.
shape
[
2
]
*
trans_x
.
shape
[
3
]]))
def
scaled_dot_product_attention
(
q
,
k
,
v
,
attn_bias
,
d_model
,
dropout_rate
):
"""
Scaled Dot-Product Attention
"""
scaled_q
=
layers
.
scale
(
x
=
q
,
scale
=
d_model
**-
0.5
)
product
=
layers
.
matmul
(
x
=
scaled_q
,
y
=
k
,
transpose_y
=
True
)
if
attn_bias
:
product
+=
attn_bias
weights
=
layers
.
softmax
(
product
)
if
dropout_rate
:
weights
=
layers
.
dropout
(
weights
,
dropout_prob
=
dropout_rate
,
seed
=
ModelHyperParams
.
dropout_seed
,
is_test
=
False
)
out
=
layers
.
matmul
(
weights
,
v
)
return
out
q
,
k
,
v
=
__compute_qkv
(
queries
,
keys
,
values
,
n_head
,
d_key
,
d_value
)
if
cache
is
not
None
:
# use cache and concat time steps
k
=
cache
[
"k"
]
=
layers
.
concat
([
cache
[
"k"
],
k
],
axis
=
1
)
v
=
cache
[
"v"
]
=
layers
.
concat
([
cache
[
"v"
],
v
],
axis
=
1
)
q
=
__split_heads
(
q
,
n_head
)
k
=
__split_heads
(
k
,
n_head
)
v
=
__split_heads
(
v
,
n_head
)
ctx_multiheads
=
scaled_dot_product_attention
(
q
,
k
,
v
,
attn_bias
,
d_model
,
dropout_rate
)
out
=
__combine_heads
(
ctx_multiheads
)
# Project back to the model size.
proj_out
=
layers
.
fc
(
input
=
out
,
size
=
d_model
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
return
proj_out
def
positionwise_feed_forward
(
x
,
d_inner_hid
,
d_hid
):
"""
Position-wise Feed-Forward Networks.
This module consists of two linear transformations with a ReLU activation
in between, which is applied to each position separately and identically.
"""
hidden
=
layers
.
fc
(
input
=
x
,
size
=
d_inner_hid
,
num_flatten_dims
=
2
,
act
=
"relu"
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
out
=
layers
.
fc
(
input
=
hidden
,
size
=
d_hid
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
return
out
def
pre_post_process_layer
(
prev_out
,
out
,
process_cmd
,
dropout_rate
=
0.
):
"""
Add residual connection, layer normalization and droput to the out tensor
optionally according to the value of process_cmd.
This will be used before or after multi-head attention and position-wise
feed-forward networks.
"""
for
cmd
in
process_cmd
:
if
cmd
==
"a"
:
# add residual connection
out
=
out
+
prev_out
if
prev_out
else
out
elif
cmd
==
"n"
:
# add layer normalization
out
=
layers
.
layer_norm
(
out
,
begin_norm_axis
=
len
(
out
.
shape
)
-
1
,
param_attr
=
fluid
.
initializer
.
Constant
(
1.
),
bias_attr
=
fluid
.
initializer
.
Constant
(
0.
))
elif
cmd
==
"d"
:
# add dropout
if
dropout_rate
:
out
=
layers
.
dropout
(
out
,
dropout_prob
=
dropout_rate
,
seed
=
ModelHyperParams
.
dropout_seed
,
is_test
=
False
)
return
out
pre_process_layer
=
partial
(
pre_post_process_layer
,
None
)
post_process_layer
=
pre_post_process_layer
def
prepare_encoder
(
src_word
,
src_pos
,
src_vocab_size
,
src_emb_dim
,
src_max_len
,
dropout_rate
=
0.
,
word_emb_param_name
=
None
,
pos_enc_param_name
=
None
):
"""Add word embeddings and position encodings.
The output tensor has a shape of:
[batch_size, max_src_length_in_batch, d_model].
This module is used at the bottom of the encoder stacks.
"""
if
TrainTaskConfig
.
check_acc
:
src_word_emb
=
layers
.
embedding
(
src_word
,
size
=
[
src_vocab_size
,
src_emb_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
word_emb_param_name
,
initializer
=
fluid
.
initializer
.
ConstantInitializer
(
0.001
)))
else
:
src_word_emb
=
layers
.
embedding
(
src_word
,
size
=
[
src_vocab_size
,
src_emb_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
word_emb_param_name
,
initializer
=
fluid
.
initializer
.
Normal
(
0.
,
src_emb_dim
**-
0.5
)))
src_word_emb
=
layers
.
scale
(
x
=
src_word_emb
,
scale
=
src_emb_dim
**
0.5
)
src_pos_enc
=
layers
.
embedding
(
src_pos
,
size
=
[
src_max_len
,
src_emb_dim
],
param_attr
=
fluid
.
ParamAttr
(
name
=
pos_enc_param_name
,
trainable
=
False
,
initializer
=
fluid
.
initializer
.
ConstantInitializer
(
0.001
)))
enc_input
=
src_word_emb
+
src_pos_enc
return
layers
.
dropout
(
enc_input
,
dropout_prob
=
dropout_rate
,
seed
=
ModelHyperParams
.
dropout_seed
,
is_test
=
False
)
if
dropout_rate
else
enc_input
prepare_encoder
=
partial
(
prepare_encoder
,
pos_enc_param_name
=
pos_enc_param_names
[
0
])
prepare_decoder
=
partial
(
prepare_encoder
,
pos_enc_param_name
=
pos_enc_param_names
[
1
])
def
encoder_layer
(
enc_input
,
attn_bias
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
=
0.
):
"""The encoder layers that can be stacked to form a deep encoder.
This module consits of a multi-head (self) attention followed by
position-wise feed-forward networks and both the two components companied
with the post_process_layer to add residual connection, layer normalization
and droput.
"""
attn_output
=
multi_head_attention
(
enc_input
,
enc_input
,
enc_input
,
attn_bias
,
d_key
,
d_value
,
d_model
,
n_head
,
dropout_rate
)
attn_output
=
post_process_layer
(
enc_input
,
attn_output
,
"dan"
,
dropout_rate
)
ffd_output
=
positionwise_feed_forward
(
attn_output
,
d_inner_hid
,
d_model
)
return
post_process_layer
(
attn_output
,
ffd_output
,
"dan"
,
dropout_rate
)
def
encoder
(
enc_input
,
attn_bias
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
=
0.
):
"""
The encoder is composed of a stack of identical layers returned by calling
encoder_layer.
"""
for
i
in
range
(
n_layer
):
enc_output
=
encoder_layer
(
enc_input
,
attn_bias
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
)
enc_input
=
enc_output
return
enc_output
def
decoder_layer
(
dec_input
,
enc_output
,
slf_attn_bias
,
dec_enc_attn_bias
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
=
0.
,
cache
=
None
):
""" The layer to be stacked in decoder part.
The structure of this module is similar to that in the encoder part except
a multi-head attention is added to implement encoder-decoder attention.
"""
slf_attn_output
=
multi_head_attention
(
dec_input
,
dec_input
,
dec_input
,
slf_attn_bias
,
d_key
,
d_value
,
d_model
,
n_head
,
dropout_rate
,
cache
,
)
slf_attn_output
=
post_process_layer
(
dec_input
,
slf_attn_output
,
"dan"
,
# residual connection + dropout + layer normalization
dropout_rate
,
)
enc_attn_output
=
multi_head_attention
(
slf_attn_output
,
enc_output
,
enc_output
,
dec_enc_attn_bias
,
d_key
,
d_value
,
d_model
,
n_head
,
dropout_rate
,
)
enc_attn_output
=
post_process_layer
(
slf_attn_output
,
enc_attn_output
,
"dan"
,
# residual connection + dropout + layer normalization
dropout_rate
,
)
ffd_output
=
positionwise_feed_forward
(
enc_attn_output
,
d_inner_hid
,
d_model
,
)
dec_output
=
post_process_layer
(
enc_attn_output
,
ffd_output
,
"dan"
,
# residual connection + dropout + layer normalization
dropout_rate
,
)
return
dec_output
def
decoder
(
dec_input
,
enc_output
,
dec_slf_attn_bias
,
dec_enc_attn_bias
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
=
0.
,
caches
=
None
):
"""
The decoder is composed of a stack of identical decoder_layer layers.
"""
for
i
in
range
(
n_layer
):
cache
=
None
if
caches
is
not
None
:
cache
=
caches
[
i
]
dec_output
=
decoder_layer
(
dec_input
,
enc_output
,
dec_slf_attn_bias
,
dec_enc_attn_bias
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
cache
=
cache
)
dec_input
=
dec_output
return
dec_output
def
make_all_inputs
(
input_fields
):
"""
Define the input data layers for the transformer model.
"""
inputs
=
[]
for
input_field
in
input_fields
:
input_var
=
layers
.
data
(
name
=
input_field
,
shape
=
input_descs
[
input_field
][
0
],
dtype
=
input_descs
[
input_field
][
1
],
lod_level
=
input_descs
[
input_field
][
2
]
if
len
(
input_descs
[
input_field
])
==
3
else
0
,
append_batch_size
=
False
)
inputs
.
append
(
input_var
)
return
inputs
def
transformer
(
src_vocab_size
,
trg_vocab_size
,
max_length
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
label_smooth_eps
,
):
if
weight_sharing
:
assert
src_vocab_size
==
src_vocab_size
,
(
"Vocabularies in source and target should be same for weight sharing."
)
enc_inputs
=
make_all_inputs
(
encoder_data_input_fields
)
enc_output
=
wrap_encoder
(
src_vocab_size
,
max_length
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
enc_inputs
,
)
dec_inputs
=
make_all_inputs
(
decoder_data_input_fields
[:
-
1
])
predict
=
wrap_decoder
(
trg_vocab_size
,
max_length
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
dec_inputs
,
enc_output
,
)
# Padding index do not contribute to the total loss. The weights is used to
# cancel padding index in calculating the loss.
label
,
weights
=
make_all_inputs
(
label_data_input_fields
)
if
label_smooth_eps
:
label
=
layers
.
label_smooth
(
label
=
layers
.
one_hot
(
input
=
label
,
depth
=
trg_vocab_size
),
epsilon
=
label_smooth_eps
)
cost
=
layers
.
softmax_with_cross_entropy
(
logits
=
layers
.
reshape
(
predict
,
shape
=
[
-
1
,
trg_vocab_size
]),
label
=
label
,
soft_label
=
True
if
label_smooth_eps
else
False
)
weighted_cost
=
cost
*
weights
sum_cost
=
layers
.
reduce_sum
(
weighted_cost
)
token_num
=
layers
.
reduce_sum
(
weights
)
avg_cost
=
sum_cost
/
token_num
avg_cost
.
stop_gradient
=
True
return
sum_cost
,
avg_cost
,
predict
,
token_num
def
wrap_encoder
(
src_vocab_size
,
max_length
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
enc_inputs
=
None
):
"""
The wrapper assembles together all needed layers for the encoder.
"""
if
enc_inputs
is
None
:
# This is used to implement independent encoder program in inference.
src_word
,
src_pos
,
src_slf_attn_bias
=
\
make_all_inputs
(
encoder_data_input_fields
)
else
:
src_word
,
src_pos
,
src_slf_attn_bias
=
\
enc_inputs
enc_input
=
prepare_encoder
(
src_word
,
src_pos
,
src_vocab_size
,
d_model
,
max_length
,
dropout_rate
,
word_emb_param_name
=
word_emb_param_names
[
0
])
enc_output
=
encoder
(
enc_input
,
src_slf_attn_bias
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
)
return
enc_output
def
wrap_decoder
(
trg_vocab_size
,
max_length
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
dec_inputs
=
None
,
enc_output
=
None
,
caches
=
None
):
"""
The wrapper assembles together all needed layers for the decoder.
"""
if
dec_inputs
is
None
:
# This is used to implement independent decoder program in inference.
trg_word
,
trg_pos
,
trg_slf_attn_bias
,
trg_src_attn_bias
,
\
enc_output
=
make_all_inputs
(
decoder_data_input_fields
+
decoder_util_input_fields
)
else
:
trg_word
,
trg_pos
,
trg_slf_attn_bias
,
trg_src_attn_bias
=
dec_inputs
dec_input
=
prepare_decoder
(
trg_word
,
trg_pos
,
trg_vocab_size
,
d_model
,
max_length
,
dropout_rate
,
word_emb_param_name
=
word_emb_param_names
[
0
]
if
weight_sharing
else
word_emb_param_names
[
1
])
dec_output
=
decoder
(
dec_input
,
enc_output
,
trg_slf_attn_bias
,
trg_src_attn_bias
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
caches
=
caches
)
# Return logits for training and probs for inference.
if
weight_sharing
:
predict
=
layers
.
matmul
(
x
=
dec_output
,
y
=
fluid
.
get_var
(
word_emb_param_names
[
0
]),
transpose_y
=
True
)
else
:
predict
=
layers
.
fc
(
input
=
dec_output
,
size
=
trg_vocab_size
,
num_flatten_dims
=
2
,
param_attr
=
const_para_attr
,
bias_attr
=
const_bias_attr
)
if
dec_inputs
is
None
:
predict
=
layers
.
softmax
(
predict
)
return
predict
def
fast_decode
(
src_vocab_size
,
trg_vocab_size
,
max_in_len
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
beam_size
,
max_out_len
,
eos_idx
,
):
"""
Use beam search to decode. Caches will be used to store states of history
steps which can make the decoding faster.
"""
enc_output
=
wrap_encoder
(
src_vocab_size
,
max_in_len
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
)
start_tokens
,
init_scores
,
trg_src_attn_bias
=
\
make_all_inputs
(
fast_decoder_data_input_fields
)
def
beam_search
():
max_len
=
layers
.
fill_constant
(
shape
=
[
1
],
dtype
=
start_tokens
.
dtype
,
value
=
max_out_len
)
step_idx
=
layers
.
fill_constant
(
shape
=
[
1
],
dtype
=
start_tokens
.
dtype
,
value
=
0
)
cond
=
layers
.
less_than
(
x
=
step_idx
,
y
=
max_len
)
while_op
=
layers
.
While
(
cond
)
# array states will be stored for each step.
ids
=
layers
.
array_write
(
layers
.
reshape
(
start_tokens
,
(
-
1
,
1
)),
step_idx
)
scores
=
layers
.
array_write
(
init_scores
,
step_idx
)
# cell states will be overwrited at each step.
# caches contains states of history steps to reduce redundant
# computation in decoder.
caches
=
[{
"k"
:
layers
.
fill_constant_batch_size_like
(
input
=
start_tokens
,
shape
=
[
-
1
,
0
,
d_model
],
dtype
=
enc_output
.
dtype
,
value
=
0
),
"v"
:
layers
.
fill_constant_batch_size_like
(
input
=
start_tokens
,
shape
=
[
-
1
,
0
,
d_model
],
dtype
=
enc_output
.
dtype
,
value
=
0
)
}
for
i
in
range
(
n_layer
)]
with
while_op
.
block
():
pre_ids
=
layers
.
array_read
(
array
=
ids
,
i
=
step_idx
)
pre_ids
=
layers
.
reshape
(
pre_ids
,
(
-
1
,
1
,
1
))
pre_scores
=
layers
.
array_read
(
array
=
scores
,
i
=
step_idx
)
# sequence_expand can gather sequences according to lod thus can be
# used in beam search to sift states corresponding to selected ids.
pre_src_attn_bias
=
layers
.
sequence_expand
(
x
=
trg_src_attn_bias
,
y
=
pre_scores
)
pre_enc_output
=
layers
.
sequence_expand
(
x
=
enc_output
,
y
=
pre_scores
)
pre_caches
=
[{
"k"
:
layers
.
sequence_expand
(
x
=
cache
[
"k"
],
y
=
pre_scores
),
"v"
:
layers
.
sequence_expand
(
x
=
cache
[
"v"
],
y
=
pre_scores
),
}
for
cache
in
caches
]
pre_pos
=
layers
.
elementwise_mul
(
x
=
layers
.
fill_constant_batch_size_like
(
input
=
pre_enc_output
,
# cann't use pre_ids here since it has lod
value
=
1
,
shape
=
[
-
1
,
1
,
1
],
dtype
=
pre_ids
.
dtype
),
y
=
layers
.
increment
(
x
=
step_idx
,
value
=
1.0
,
in_place
=
False
),
axis
=
0
)
logits
=
wrap_decoder
(
trg_vocab_size
,
max_in_len
,
n_layer
,
n_head
,
d_key
,
d_value
,
d_model
,
d_inner_hid
,
dropout_rate
,
weight_sharing
,
dec_inputs
=
(
pre_ids
,
pre_pos
,
None
,
pre_src_attn_bias
),
enc_output
=
pre_enc_output
,
caches
=
pre_caches
)
logits
=
layers
.
reshape
(
logits
,
(
-
1
,
trg_vocab_size
))
topk_scores
,
topk_indices
=
layers
.
topk
(
input
=
layers
.
softmax
(
logits
),
k
=
beam_size
)
accu_scores
=
layers
.
elementwise_add
(
x
=
layers
.
log
(
topk_scores
),
y
=
layers
.
reshape
(
pre_scores
,
shape
=
[
-
1
]),
axis
=
0
)
# beam_search op uses lod to distinguish branches.
topk_indices
=
layers
.
lod_reset
(
topk_indices
,
pre_ids
)
selected_ids
,
selected_scores
=
layers
.
beam_search
(
pre_ids
=
pre_ids
,
pre_scores
=
pre_scores
,
ids
=
topk_indices
,
scores
=
accu_scores
,
beam_size
=
beam_size
,
end_id
=
eos_idx
)
layers
.
increment
(
x
=
step_idx
,
value
=
1.0
,
in_place
=
True
)
# update states
layers
.
array_write
(
selected_ids
,
i
=
step_idx
,
array
=
ids
)
layers
.
array_write
(
selected_scores
,
i
=
step_idx
,
array
=
scores
)
layers
.
assign
(
pre_src_attn_bias
,
trg_src_attn_bias
)
layers
.
assign
(
pre_enc_output
,
enc_output
)
for
i
in
range
(
n_layer
):
layers
.
assign
(
pre_caches
[
i
][
"k"
],
caches
[
i
][
"k"
])
layers
.
assign
(
pre_caches
[
i
][
"v"
],
caches
[
i
][
"v"
])
length_cond
=
layers
.
less_than
(
x
=
step_idx
,
y
=
max_len
)
finish_cond
=
layers
.
logical_not
(
layers
.
is_empty
(
x
=
selected_ids
))
layers
.
logical_and
(
x
=
length_cond
,
y
=
finish_cond
,
out
=
cond
)
finished_ids
,
finished_scores
=
layers
.
beam_search_decode
(
ids
,
scores
,
beam_size
=
beam_size
,
end_id
=
eos_idx
)
return
finished_ids
,
finished_scores
finished_ids
,
finished_scores
=
beam_search
()
return
finished_ids
,
finished_scores
def
get_model
(
is_dist
,
is_async
):
sum_cost
,
avg_cost
,
predict
,
token_num
=
transformer
(
ModelHyperParams
.
src_vocab_size
,
ModelHyperParams
.
trg_vocab_size
,
ModelHyperParams
.
max_length
+
1
,
ModelHyperParams
.
n_layer
,
ModelHyperParams
.
n_head
,
ModelHyperParams
.
d_key
,
ModelHyperParams
.
d_value
,
ModelHyperParams
.
d_model
,
ModelHyperParams
.
d_inner_hid
,
ModelHyperParams
.
dropout
,
ModelHyperParams
.
weight_sharing
,
TrainTaskConfig
.
label_smooth_eps
)
local_lr_scheduler
=
LearningRateScheduler
(
ModelHyperParams
.
d_model
,
TrainTaskConfig
.
warmup_steps
,
TrainTaskConfig
.
learning_rate
)
if
not
is_dist
:
optimizer
=
fluid
.
optimizer
.
Adam
(
learning_rate
=
local_lr_scheduler
.
learning_rate
,
beta1
=
TrainTaskConfig
.
beta1
,
beta2
=
TrainTaskConfig
.
beta2
,
epsilon
=
TrainTaskConfig
.
eps
)
optimizer
.
minimize
(
sum_cost
)
elif
is_async
:
optimizer
=
fluid
.
optimizer
.
SGD
(
0.003
)
optimizer
.
minimize
(
sum_cost
)
else
:
lr_decay
=
fluid
.
layers
\
.
learning_rate_scheduler
\
.
noam_decay
(
ModelHyperParams
.
d_model
,
TrainTaskConfig
.
warmup_steps
)
optimizer
=
fluid
.
optimizer
.
Adam
(
learning_rate
=
lr_decay
,
beta1
=
TrainTaskConfig
.
beta1
,
beta2
=
TrainTaskConfig
.
beta2
,
epsilon
=
TrainTaskConfig
.
eps
)
optimizer
.
minimize
(
sum_cost
)
return
sum_cost
,
avg_cost
,
predict
,
token_num
,
local_lr_scheduler
def
update_args
():
src_dict
=
DataReader
.
load_dict
(
TrainTaskConfig
.
src_vocab_fpath
)
trg_dict
=
DataReader
.
load_dict
(
TrainTaskConfig
.
trg_vocab_fpath
)
dict_args
=
[
"src_vocab_size"
,
str
(
len
(
src_dict
)),
"trg_vocab_size"
,
str
(
len
(
trg_dict
)),
"bos_idx"
,
str
(
src_dict
[
TrainTaskConfig
.
special_token
[
0
]]),
"eos_idx"
,
str
(
src_dict
[
TrainTaskConfig
.
special_token
[
1
]]),
"unk_idx"
,
str
(
src_dict
[
TrainTaskConfig
.
special_token
[
2
]])
]
merge_cfg_from_list
(
dict_args
,
[
TrainTaskConfig
,
ModelHyperParams
])
class
DistTransformer2x2
(
TestDistRunnerBase
):
def
run_pserver
(
self
,
args
):
get_model
(
True
,
not
args
.
sync_mode
)
t
=
self
.
get_transpiler
(
args
.
trainer_id
,
fluid
.
default_main_program
(),
args
.
endpoints
,
args
.
trainers
,
args
.
sync_mode
)
pserver_prog
=
t
.
get_pserver_program
(
args
.
current_endpoint
)
startup_prog
=
t
.
get_startup_program
(
args
.
current_endpoint
,
pserver_prog
)
place
=
fluid
.
CPUPlace
()
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
startup_prog
)
exe
.
run
(
pserver_prog
)
def
_wait_ps_ready
(
self
,
pid
):
retry_times
=
20
while
True
:
assert
retry_times
>=
0
,
"wait ps ready failed"
time
.
sleep
(
3
)
print
(
"waiting ps ready: "
,
pid
)
try
:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os
.
stat
(
"/tmp/paddle.%d.port"
%
pid
)
return
except
os
.
error
:
retry_times
-=
1
def
run_trainer
(
self
,
place
,
endpoints
,
trainer_id
,
trainers
,
is_dist
=
True
):
avg_cost
=
get_model
()
if
is_dist
:
t
=
get_transpiler
(
trainer_id
,
fluid
.
default_main_program
(),
endpoints
,
trainers
)
def
run_trainer
(
self
,
place
,
args
):
sum_cost
,
avg_cost
,
predict
,
token_num
,
local_lr_scheduler
=
get_model
(
args
.
is_dist
,
not
args
.
sync_mode
)
if
args
.
is_dist
:
t
=
self
.
get_transpiler
(
args
.
trainer_id
,
fluid
.
default_main_program
(),
args
.
endpoints
,
args
.
trainers
,
args
.
sync_mode
)
trainer_prog
=
t
.
get_trainer_program
()
TrainTaskConfig
.
batch_size
=
10
TrainTaskConfig
.
train_file_pattern
=
TrainTaskConfig
.
data_path
+
"train.tok.clean.bpe.32000.en-de.train_{}"
.
format
(
args
.
trainer_id
)
else
:
TrainTaskConfig
.
batch_size
=
20
trainer_prog
=
fluid
.
default_main_program
()
startup_exe
=
fluid
.
Executor
(
place
)
startup_exe
.
run
(
fluid
.
default_startup_program
())
strategy
=
fluid
.
ExecutionStrategy
()
strategy
.
num_threads
=
1
strategy
.
allow_op_delay
=
False
exe
=
fluid
.
ParallelExecutor
(
True
,
loss_name
=
avg_cost
.
name
,
exec_strategy
=
strategy
)
first_loss
,
=
exe
.
run
(
fetch_list
=
[
avg_cost
.
name
])
print
(
first_loss
)
for
i
in
six
.
moves
.
xrange
(
5
):
_
=
exe
.
run
(
fetch_list
=
[
avg_cost
.
name
])
last_loss
,
=
exe
.
run
(
fetch_list
=
[
avg_cost
.
name
])
print
(
last_loss
)
def
main
(
role
=
"pserver"
,
endpoints
=
"127.0.0.1:9123"
,
trainer_id
=
0
,
current_endpoint
=
"127.0.0.1:9123"
,
trainers
=
1
,
is_dist
=
True
):
reader
=
paddle
.
batch
(
wmt16
.
train
(
ModelHyperParams
.
src_vocab_size
,
ModelHyperParams
.
trg_vocab_size
),
batch_size
=
transformer_model
.
batch_size
)
with
fluid
.
recordio_writer
.
create_recordio_writer
(
WMT16_RECORDIO_FILE
)
as
writer
:
for
batch
in
reader
():
for
tensor
in
prepare_batch_input
(
batch
,
ModelHyperParams
.
src_pad_idx
,
ModelHyperParams
.
trg_pad_idx
,
ModelHyperParams
.
n_head
):
t
=
fluid
.
LoDTensor
()
t
.
set
(
tensor
,
fluid
.
CPUPlace
())
writer
.
append_tensor
(
t
)
writer
.
complete_append_tensor
()
model
=
DistTransformer2x2
()
if
role
==
"pserver"
:
model
.
run_pserver
(
endpoints
,
trainers
,
current_endpoint
,
trainer_id
)
else
:
p
=
fluid
.
CUDAPlace
(
0
)
if
core
.
is_compiled_with_cuda
(
)
else
fluid
.
CPUPlace
()
model
.
run_trainer
(
p
,
endpoints
,
trainer_id
,
trainers
,
is_dist
)
TrainTaskConfig
.
local
=
not
args
.
is_dist
train_loop
(
startup_exe
,
trainer_prog
,
1
,
sum_cost
,
avg_cost
,
local_lr_scheduler
,
token_num
,
predict
)
if
__name__
==
"__main__"
:
if
len
(
sys
.
argv
)
!=
7
:
print
(
"Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
)
role
=
sys
.
argv
[
1
]
endpoints
=
sys
.
argv
[
2
]
trainer_id
=
int
(
sys
.
argv
[
3
])
current_endpoint
=
sys
.
argv
[
4
]
trainers
=
int
(
sys
.
argv
[
5
])
is_dist
=
True
if
sys
.
argv
[
6
]
==
"TRUE"
else
False
main
(
role
=
role
,
endpoints
=
endpoints
,
trainer_id
=
trainer_id
,
current_endpoint
=
current_endpoint
,
trainers
=
trainers
,
is_dist
=
is_dist
)
update_args
()
runtime_main
(
DistTransformer2x2
)
python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
浏览文件 @
68b60040
...
...
@@ -18,9 +18,20 @@ from test_dist_base import TestDistBase
class
TestDistSeResneXt2x2
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
True
def
test_se_resnext
(
self
):
self
.
check_with_place
(
"dist_se_resnext.py"
,
delta
=
1e-7
)
class
TestDistSeResneXt2x2Async
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
False
def
test_se_resnext
(
self
):
self
.
check_with_place
(
"dist_se_resnext.py"
,
delta
=
100
)
if
__name__
==
"__main__"
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/test_dist_transformer.py
浏览文件 @
68b60040
...
...
@@ -18,11 +18,51 @@ import unittest
from
test_dist_base
import
TestDistBase
class
TestDistTransformer2x2
(
TestDistBase
):
def
download_files
():
url_prefix
=
'http://paddle-unittest-data.cdn.bcebos.com/dist_transformer/'
vocab_url
=
url_prefix
+
'vocab.bpe.32000'
vocab_md5
=
'a86d345ca6e27f6591d0dccb1b9be853'
paddle
.
dataset
.
common
.
download
(
vocab_url
,
'test_dist_transformer'
,
vocab_md5
)
local_train_url
=
url_prefix
+
'train.tok.clean.bpe.32000.en-de'
local_train_md5
=
'033eb02b9449e6dd823f050782ac8914'
paddle
.
dataset
.
common
.
download
(
local_train_url
,
'test_dist_transformer'
,
local_train_md5
)
train0_url
=
url_prefix
+
'train.tok.clean.bpe.32000.en-de.train_0'
train0_md5
=
'ddce7f602f352a0405267285379a38b1'
paddle
.
dataset
.
common
.
download
(
train0_url
,
'test_dist_transformer'
,
train0_md5
)
train1_url
=
url_prefix
+
'train.tok.clean.bpe.32000.en-de.train_1'
train1_md5
=
'8757798200180285b1a619cd7f408747'
paddle
.
dataset
.
common
.
download
(
train1_url
,
'test_dist_transformer'
,
train1_md5
)
test_url
=
url_prefix
+
'newstest2013.tok.bpe.32000.en-de'
test_md5
=
'9dd74a266dbdb25314183899f269b4a2'
paddle
.
dataset
.
common
.
download
(
test_url
,
'test_dist_transformer'
,
test_md5
)
class
TestDistTransformer2x2Sync
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
True
def
test_transformer
(
self
):
download_files
()
#Note: loss on test dataset of the first 5 batch are:
# 10.518872, 10.518871, 10.518868, 10.518862, 10.518855
self
.
check_with_place
(
"dist_transformer.py"
,
delta
=
1e-7
)
class
TestDistTransformer2x2Async
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
False
def
test_transformer
(
self
):
# TODO(paddle-dev): check if the delta is OK.
# Usually start around ~8000 and converge to ~5000
self
.
check_with_place
(
"dist_transformer.py"
,
delta
=
400
)
download_files
()
self
.
check_with_place
(
"dist_transformer.py"
,
delta
=
1.0
)
if
__name__
==
"__main__"
:
...
...
python/paddle/fluid/tests/unittests/test_dist_word2vec.py
浏览文件 @
68b60040
...
...
@@ -18,9 +18,20 @@ from test_dist_base import TestDistBase
class
TestDistSeResneXt2x2
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
True
def
test_se_resnext
(
self
):
self
.
check_with_place
(
"dist_word2vec.py"
,
delta
=
1e-4
)
class
TestDistSeResneXt2x2Async
(
TestDistBase
):
def
_setup_config
(
self
):
self
.
_sync_mode
=
False
def
test_se_resnext
(
self
):
self
.
check_with_place
(
"dist_word2vec.py"
,
delta
=
1
)
if
__name__
==
"__main__"
:
unittest
.
main
()
python/paddle/fluid/transpiler/distribute_transpiler.py
浏览文件 @
68b60040
...
...
@@ -34,6 +34,7 @@ import math
import
sys
import
numpy
as
np
import
collections
import
random
from
.ps_dispatcher
import
RoundRobin
,
HashName
,
PSDispatcher
from
..
import
core
,
framework
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录