提交 5616974e 编写于 作者: G guosheng

Add ParallelExecutor for training, validation and saving in Transformer

上级 544ff21c
......@@ -8,6 +8,7 @@ import paddle.fluid as fluid
from model import transformer, position_encoding_init
from optim import LearningRateScheduler
from config import *
from util import save_inference_model
def pad_batch_data(insts,
......@@ -43,8 +44,8 @@ def pad_batch_data(insts,
# This is used to avoid attention on paddings and subsequent
# words.
slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len))
slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape(
[-1, 1, max_len, max_len])
slf_attn_bias_data = np.triu(slf_attn_bias_data,
1).reshape([-1, 1, max_len, max_len])
slf_attn_bias_data = np.tile(slf_attn_bias_data,
[1, n_head, 1, 1]) * [-1e9]
else:
......@@ -165,61 +166,88 @@ def main():
ModelHyperParams.trg_vocab_size),
batch_size=TrainTaskConfig.batch_size)
# Initialize the parameters.
exe.run(fluid.framework.default_startup_program())
data_input_names = encoder_data_input_fields + decoder_data_input_fields[:
-1] + label_data_names
util_input_names = encoder_util_input_fields + decoder_util_input_fields
def test(exe):
test_total_cost = 0
test_total_token = 0
for batch_id, data in enumerate(val_data()):
test_data = read_multiple(reader=val_data, count=dev_count)
for batch_id, data in enumerate(test_data()):
for place_id, data_buffer in enumerate(data):
data_input_dict, util_input_dict = prepare_batch_input(
data, data_input_names, util_input_names,
data_buffer, data_input_names, util_input_names,
ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
ModelHyperParams.n_head, ModelHyperParams.d_model)
test_sum_cost, test_token_num = exe.run(
test_program,
feed=dict(data_input_dict.items() + util_input_dict.items()),
fetch_list=[sum_cost, token_num],
use_program_cache=True)
test_total_cost += test_sum_cost
test_total_token += test_token_num
local_scope = exe.executor.local_scope(place_id)
for var_name in data_input_dict:
local_scope.var(var_name).get_tensor().set(
data_input_dict[var_name], fluid.CUDAPlace(place_id))
for var_name in util_input_dict:
local_scope.var(var_name).get_tensor().set(
util_input_dict[var_name], fluid.CUDAPlace(place_id))
outs = exe.run(fetch_list=[sum_cost.name, token_num.name])
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
test_total_cost += sum_cost_val.sum()
test_total_token += token_num_val.sum()
test_avg_cost = test_total_cost / test_total_token
test_ppl = np.exp([min(test_avg_cost, 100)])
return test_avg_cost, test_ppl
# Initialize the parameters.
exe.run(fluid.framework.default_startup_program())
data_input_names = encoder_data_input_fields + decoder_data_input_fields[:
-1] + label_data_names
util_input_names = encoder_util_input_fields + decoder_util_input_fields
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=avg_cost.name
if TrainTaskConfig.use_avg_cost else sum_cost.name)
train_data = read_multiple(reader=train_data, count=train_exe.device_count)
test_exe = fluid.ParallelExecutor(
use_cuda=True, main_program=test_program, share_vars_from=train_exe)
'''with open('./main_program.txt', 'w') as f_main:
print >> f_main, fluid.default_main_program()
with open('./startup_program.txt', 'w') as f_main:
print >> f_main, fluid.default_startup_program()
exit(0)'''
dev_count = fluid.core.get_cuda_device_count()
for pos_enc_param_name in pos_enc_param_names:
tensor = position_encoding_init(ModelHyperParams.max_length + 1,
ModelHyperParams.d_model)
for place_id in xrange(dev_count):
local_scope = train_exe.executor.local_scope(place_id)
local_scope.var(pos_enc_param_name).get_tensor().set(
tensor, fluid.CUDAPlace(place_id))
train_data = read_multiple(reader=train_data, count=dev_count)
for pass_id in xrange(TrainTaskConfig.pass_num):
pass_start_time = time.time()
for batch_id, data in enumerate(train_data()):
data_on_devices = []
lr = lr_scheduler.update_learning_rate(),
for place_id, data_buffer in enumerate(data):
data_input_dict, util_input_dict = prepare_batch_input(
data_buffer, data_input_names, util_input_names,
ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
ModelHyperParams.n_head, ModelHyperParams.d_model)
data_input_dict.update(util_input_dict)
data_input_dict.update({
lr_scheduler.learning_rate.name: lr_scheduler.update_learning_rate()
})
local_scope = train_exe.executor.local_scope(place_id)
for pos_enc_param_name in pos_enc_param_names:
tensor = position_encoding_init(ModelHyperParams.max_length + 1, ModelHyperParams.d_model)
data_input_dict[pos_enc_param_name] = tensor
local_scope.find_var(
lr_scheduler.learning_rate.name).get_tensor().set(
lr, fluid.CUDAPlace(place_id))
data_on_devices.append(data_input_dict)
for var_name in data_input_dict:
local_scope.var(var_name).get_tensor().set(
data_input_dict[var_name], fluid.CUDAPlace(place_id))
outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name], feed=data_on_devices)
for var_name in util_input_dict:
local_scope.var(var_name).get_tensor().set(
util_input_dict[var_name], fluid.CUDAPlace(place_id))
outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name])
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
total_sum_cost = sum_cost_val.sum(
) # sum the cost from multi devices
......@@ -229,16 +257,19 @@ def main():
(pass_id, batch_id, total_sum_cost, total_avg_cost,
np.exp([min(total_avg_cost, 100)])))
# Validate and save the model for inference.
val_avg_cost, val_ppl = test(exe)
val_avg_cost, val_ppl = test(test_exe)
pass_end_time = time.time()
time_consumed = pass_end_time - pass_start_time
print("epoch: %d, val avg loss: %f, val ppl: %f, "
"consumed %fs" % (pass_id, val_avg_cost, val_ppl, time_consumed))
fluid.io.save_inference_model(
save_program = save_inference_model(
os.path.join(TrainTaskConfig.model_dir,
"pass_" + str(pass_id) + ".infer.model"),
encoder_input_data_names + decoder_input_data_names[:-1],
[predict], exe)
[predict], train_exe)
save_exe = fluid.ParallelExecutor(
use_cuda=True, main_program=save_program, share_vars_from=train_exe)
save_exe.run(fetch_list=[])
if __name__ == "__main__":
......
import os
from paddle.fluid.framework import Program, Parameter, default_main_program, Variable
import paddle.fluid.core as core
def is_persistable(var):
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST:
return False
return var.persistable
def _clone_var_in_block_(block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=True)
def save_vars(executor,
dirname,
main_program=None,
vars=None,
predicate=None,
filename=None):
"""
Save variables to directory by executor.
:param executor: executor that save variable
:param dirname: directory path
:param main_program: program. If vars is None, then filter all variables in this
program which fit `predicate`. Default default_main_program.
:param predicate: The Predicate describes a callable that returns a variable
as a bool. If it returns true, the corresponding input variable will be saved.
:param vars: variables need to be saved. If vars is specified, program & predicate
will be ignored
:param filename: The name of a single file that all vars are saved to.
If it is None, save variables to separate files.
:return: None
"""
if vars is None:
if main_program is None:
main_program = default_main_program()
if not isinstance(main_program, Program):
raise TypeError("program should be as Program type or None")
return save_vars(
executor,
dirname=dirname,
vars=filter(predicate, main_program.list_vars()),
filename=filename)
else:
save_program = Program()
save_block = save_program.global_block()
save_var_map = {}
for each_var in vars:
# NOTE: don't save the variable which type is RAW
if each_var.type == core.VarDesc.VarType.RAW:
continue
new_var = _clone_var_in_block_(save_block, each_var)
if filename is None:
save_block.append_op(
type='save',
inputs={'X': [new_var]},
outputs={},
attrs={'file_path': os.path.join(dirname, new_var.name)})
else:
save_var_map[new_var.name] = new_var
if filename is not None:
save_var_list = []
for name in sorted(save_var_map.keys()):
save_var_list.append(save_var_map[name])
save_block.append_op(
type='save_combine',
inputs={'X': save_var_list},
outputs={},
attrs={'file_path': os.path.join(dirname, filename)})
#print save_program
return save_program #executor.run(fetch_list=[])
def save_persistables(executor, dirname, main_program=None, filename=None):
"""
Save all persistables to directory with executor.
"""
save_program = save_vars(
executor,
dirname=dirname,
main_program=main_program,
vars=None,
predicate=is_persistable,
filename=filename)
#print save_program
return save_program
def prepend_feed_ops(inference_program,
feed_target_names,
feed_holder_name='feed'):
global_block = inference_program.global_block()
feed_var = global_block.create_var(
name=feed_holder_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)
for i, name in enumerate(feed_target_names):
out = global_block.var(name)
global_block.prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})
def append_fetch_ops(inference_program,
fetch_target_names,
fetch_holder_name='fetch'):
global_block = inference_program.global_block()
fetch_var = global_block.create_var(
name=fetch_holder_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)
for i, name in enumerate(fetch_target_names):
global_block.append_op(
type='fetch',
inputs={'X': [name]},
outputs={'Out': [fetch_var]},
attrs={'col': i})
def save_inference_model(dirname,
feeded_var_names,
target_vars,
executor,
main_program=None,
model_filename=None,
params_filename=None):
"""
Build a model especially for inference,
and save it to directory by the executor.
:param dirname: directory path
:param feeded_var_names: Names of variables that need to be feeded data during inference
:param target_vars: Variables from which we can get inference results.
:param executor: executor that save inference model
:param main_program: original program, which will be pruned to build the inference model.
Default default_main_program().
:param model_filename: The name of file to save inference program.
If not specified, default filename `__model__` will be used.
:param params_filename: The name of file to save parameters.
It is used for the case that all parameters are saved in a single binary file.
If not specified, parameters are considered saved in separate files.
:return: None
"""
if isinstance(feeded_var_names, basestring):
feeded_var_names = [feeded_var_names]
else:
if not (bool(feeded_var_names) and all(
isinstance(name, basestring) for name in feeded_var_names)):
raise ValueError("'feed_var_names' should be a list of str.")
if isinstance(target_vars, Variable):
target_vars = [target_vars]
else:
if not (bool(target_vars) and
all(isinstance(var, Variable) for var in target_vars)):
raise ValueError("'target_vars' should be a list of Variable.")
if main_program is None:
main_program = default_main_program()
if not os.path.isdir(dirname):
os.makedirs(dirname)
pruned_program = main_program.prune(targets=target_vars)
inference_program = pruned_program.inference_optimize()
fetch_var_names = [v.name for v in target_vars]
prepend_feed_ops(inference_program, feeded_var_names)
append_fetch_ops(inference_program, fetch_var_names)
if model_filename is not None:
model_filename = os.path.basename(model_filename)
else:
model_filename = "__model__"
model_filename = os.path.join(dirname, model_filename)
if params_filename is not None:
params_filename = os.path.basename(params_filename)
with open(model_filename, "wb") as f:
f.write(inference_program.desc.serialize_to_string())
return save_persistables(executor, dirname, inference_program,
params_filename)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册