提交 662b6c93 编写于 作者: X Xin Pan

Add ParallelExecutor.

上级 caecc97a
......@@ -4,7 +4,7 @@ class TrainTaskConfig(object):
pass_num = 2
# number of sequences contained in a mini-batch.
batch_size = 32
batch_size = 64
# the hyper params for Adam optimizer.
learning_rate = 0.001
......
......@@ -387,60 +387,34 @@ def transformer(
src_pad_idx,
trg_pad_idx,
pos_pad_idx, ):
# The shapes here act as placeholder.
# The shapes set here is to pass the infer-shape in compile time. The actual
# shape of src_word in run time is:
# [batch_size * max_src_length_in_a_batch, 1].
src_word = layers.data(
name=input_data_names[0],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of src_pos in runtime is:
# [batch_size * max_src_length_in_a_batch, 1].
src_pos = layers.data(
name=input_data_names[1],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of trg_word is in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
trg_word = layers.data(
name=input_data_names[2],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of trg_pos in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
trg_pos = layers.data(
name=input_data_names[3],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
# The actual shape of src_slf_attn_bias in runtime is:
# [batch_size, n_head, max_src_length_in_a_batch, max_src_length_in_a_batch].
# This input is used to remove attention weights on paddings.
src_slf_attn_bias = layers.data(
name=input_data_names[4],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
# The actual shape of trg_slf_attn_bias in runtime is:
# [batch_size, n_head, max_trg_length_in_batch, max_trg_length_in_batch].
# This is used to remove attention weights on paddings and subsequent words.
trg_slf_attn_bias = layers.data(
name=input_data_names[5],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
# The actual shape of trg_src_attn_bias in runtime is:
# [batch_size, n_head, max_trg_length_in_batch, max_src_length_in_batch].
# This is used to remove attention weights on paddings.
trg_src_attn_bias = layers.data(
name=input_data_names[6],
shape=[batch_size, n_head, max_length, max_length],
dtype="float32",
append_batch_size=False)
file_obj = fluid.layers.open_recordio_file(
filename='./wmt16.recordio',
shapes=[
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
[batch_size, n_head, max_length, max_length],
[batch_size, n_head, max_length, max_length],
[batch_size, n_head, max_length, max_length],
[batch_size * max_length, 1],
[batch_size * max_length, 1],
],
dtypes=[
'int64',
'int64',
'int64',
'int64',
'float32',
'float32',
'float32',
'int64',
'float32',
],
lod_levels=[0] * 9)
src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = fluid.layers.read_file(
file_obj)
enc_input = prepare_encoder(
src_word,
......@@ -492,22 +466,6 @@ def transformer(
num_flatten_dims=2),
shape=[-1, trg_vocab_size],
act="softmax")
# The actual shape of gold in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
gold = layers.data(
name=input_data_names[7],
shape=[batch_size * max_length, 1],
dtype="int64",
append_batch_size=False)
cost = layers.cross_entropy(input=predict, label=gold)
# The actual shape of weights in runtime is:
# [batch_size * max_trg_length_in_a_batch, 1].
# Padding index do not contribute to the total loss. This Weight is used to
# cancel padding index in calculating the loss.
weights = layers.data(
name=input_data_names[8],
shape=[batch_size * max_length, 1],
dtype="float32",
append_batch_size=False)
weighted_cost = cost * weights
return layers.reduce_sum(weighted_cost)
......@@ -2,9 +2,10 @@ import numpy as np
import sys
import time
import paddle.v2 as paddle
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.dataset.wmt16 as wmt16
from model import transformer, position_encoding_init
from optim import LearningRateScheduler
......@@ -12,8 +13,7 @@ from config import TrainTaskConfig, ModelHyperParams, \
pos_enc_param_names, input_data_names
def prepare_batch_input(insts, input_data_names, src_pad_idx, trg_pad_idx,
max_length, n_head, place):
def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias. Then, convert the numpy
......@@ -28,9 +28,9 @@ def prepare_batch_input(insts, input_data_names, src_pad_idx, trg_pad_idx,
return_attn_bias=True,
return_max_len=True):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
return_list = []
max_len = max(len(inst) for inst in insts)
inst_data = np.array(
......@@ -66,13 +66,6 @@ def prepare_batch_input(insts, input_data_names, src_pad_idx, trg_pad_idx,
return_list += [max_len]
return return_list if len(return_list) > 1 else return_list[0]
def data_to_tensor(data_list, name_list, input_dict, place):
assert len(data_list) == len(name_list)
for i in range(len(name_list)):
tensor = fluid.LoDTensor()
tensor.set(data_list[i], place)
input_dict[name_list[i]] = tensor
src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data(
[inst[0] for inst in insts], src_pad_idx, is_target=False)
trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data(
......@@ -83,18 +76,13 @@ def prepare_batch_input(insts, input_data_names, src_pad_idx, trg_pad_idx,
False, False, False)
lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1])
data_to_tensor([
return [
src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias,
trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight
], input_data_names, input_dict, place)
return input_dict
]
def main():
place = fluid.CUDAPlace(0) if TrainTaskConfig.use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place)
cost = transformer(
ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
......@@ -104,11 +92,8 @@ def main():
ModelHyperParams.dropout, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx)
lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model,
TrainTaskConfig.warmup_steps, place,
TrainTaskConfig.learning_rate)
optimizer = fluid.optimizer.Adam(
learning_rate=lr_scheduler.learning_rate,
learning_rate=TrainTaskConfig.learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
......@@ -121,26 +106,27 @@ def main():
buf_size=100000),
batch_size=TrainTaskConfig.batch_size)
# Initialize the parameters.
exe.run(fluid.framework.default_startup_program())
for pos_enc_param_name in pos_enc_param_names:
pos_enc_param = fluid.global_scope().find_var(
pos_enc_param_name).get_tensor()
pos_enc_param.set(
position_encoding_init(ModelHyperParams.max_length + 1,
ModelHyperParams.d_model), place)
def fn(pass_id, batch_id, data):
reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size),
batch_size=TrainTaskConfig.batch_size)
with fluid.recordio_writer.create_recordio_writer(
"./wmt16.recordio") as writer:
for batch in reader():
for tensor in prepare_batch_input(
batch, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head):
t = fluid.LoDTensor()
t.set(tensor, fluid.CPUPlace())
writer.append_tensor(t)
writer.complete_append_tensor()
exe = fluid.ParallelExecutor(loss_name=cost.name, use_cuda=True)
def fn(pass_id, batch_id):
t1 = time.time()
data_input = prepare_batch_input(
data, input_data_names, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.max_length,
ModelHyperParams.n_head, place)
lr_scheduler.update_learning_rate(data_input)
outs = exe.run(fluid.framework.default_main_program(),
feed=data_input,
fetch_list=[cost],
use_program_cache=True)
outs = exe.run([cost.name])
cost_val = np.array(outs[0])
print("pass_id = " + str(pass_id) + " batch = " + str(batch_id) +
" cost = " + str(cost_val))
......@@ -151,16 +137,13 @@ def main():
total_time = 0.0
count = 0
for pass_id in xrange(TrainTaskConfig.pass_num):
for batch_id, data in enumerate(train_data()):
# The current program desc is coupled with batch_size, thus all
# mini-batches must have the same number of instances currently.
if len(data) != TrainTaskConfig.batch_size:
continue
if pass_id == 0 and batch_id >= 10 and batch_id < 12:
for batch_id in xrange(10000):
if batch_id == 1:
with profiler.profiler('All', 'total', '/tmp/transformer'):
duration = fn(pass_id, batch_id, data)
duration = fn(pass_id, batch_id)
duration = fn(pass_id, batch_id)
else:
duration = fn(pass_id, batch_id, data)
duration = fn(pass_id, batch_id)
count += 1
total_time += duration
print("avg: " + str(total_time / count) + " cur: " + str(duration))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册