Commit 87c5cf65 authored by Qiao Longfei

distributed training for transformer

Parent 8761ab3d
@@ -3,6 +3,7 @@ import time
import argparse
import ast
import numpy as np
import multiprocessing
import paddle
import paddle.fluid as fluid
@@ -80,6 +81,18 @@ def parse_args():
        help='See config.py for all options',
        default=None,
        nargs=argparse.REMAINDER)
    parser.add_argument(
        '--local',
        type=ast.literal_eval,
        default=True,
        help='Whether to run as local mode.')
    parser.add_argument(
        '--device',
        type=str,
        default='GPU',
        choices=['CPU', 'GPU'],
        help="The device type.")
    args = parser.parse_args()
    # Append args related to dict
    src_dict = reader.DataReader.load_dict(args.src_vocab_fpath)
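A note on the new `--local` flag: `type=ast.literal_eval` is used so that the literal strings `True`/`False` become real booleans, which a plain `bool()` conversion would not do. A minimal sketch of the difference:

```python
import argparse
import ast

parser = argparse.ArgumentParser()
parser.add_argument('--local', type=ast.literal_eval, default=True)

print(parser.parse_args(['--local', 'False']).local)  # False (a real bool)
print(bool('False'))  # True -- the pitfall literal_eval avoids
```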
@@ -205,50 +218,26 @@ def prepare_batch_input(insts, data_input_names, util_input_names, src_pad_idx,
        [num_token], dtype="float32")


def train(args):
    # priority: ENV > args > config
    is_local = os.getenv("PADDLE_IS_LOCAL", "1")
    if is_local == '0':
        args.local = False
    print args

    if args.device == 'CPU':
        TrainTaskConfig.use_gpu = False

    training_role = os.getenv("TRAINING_ROLE", "TRAINER")

    if training_role == "PSERVER" or (not TrainTaskConfig.use_gpu):
        place = fluid.CPUPlace()
        dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
    else:
        place = fluid.CUDAPlace(0)
        dev_count = fluid.core.get_cuda_device_count()

    exe = fluid.Executor(place)

    sum_cost, avg_cost, predict, token_num = transformer(
        ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size,
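The selection above follows the stated priority ENV > args > config: `PADDLE_IS_LOCAL` overrides `--local`, and a `PSERVER` role or `--device CPU` forces CPU places with `CPU_NUM` workers. A standalone sketch of that resolution logic (the `resolve_mode` helper and the CUDA-count stand-in are illustrative, not part of the commit):

```python
import multiprocessing
import os


def resolve_mode(args_local=True, args_device='GPU'):
    # PADDLE_IS_LOCAL=0 overrides the --local flag.
    local = False if os.getenv("PADDLE_IS_LOCAL", "1") == "0" else args_local
    use_gpu = (args_device == 'GPU')
    role = os.getenv("TRAINING_ROLE", "TRAINER")
    if role == "PSERVER" or not use_gpu:
        # Parameter servers and CPU trainers run on CPU places.
        dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
    else:
        dev_count = 1  # stand-in for fluid.core.get_cuda_device_count()
    return local, use_gpu, role, dev_count


print(resolve_mode())
```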
@@ -257,167 +246,267 @@ def train(args):
        ModelHyperParams.d_value, ModelHyperParams.d_model,
        ModelHyperParams.d_inner_hid, ModelHyperParams.dropout,
        ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps)

    if args.local:
        lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model,
                                             TrainTaskConfig.warmup_steps,
                                             TrainTaskConfig.learning_rate)

        optimizer = fluid.optimizer.Adam(
            learning_rate=lr_scheduler.learning_rate,
            beta1=TrainTaskConfig.beta1,
            beta2=TrainTaskConfig.beta2,
            epsilon=TrainTaskConfig.eps)
        optimizer.minimize(sum_cost)
    else:
        lr_decay = fluid.layers\
            .learning_rate_scheduler\
            .noam_decay(ModelHyperParams.d_model,
                        TrainTaskConfig.warmup_steps)

        optimizer = fluid.optimizer.Adam(
            learning_rate=lr_decay,
            beta1=TrainTaskConfig.beta1,
            beta2=TrainTaskConfig.beta2,
            epsilon=TrainTaskConfig.eps)
        optimizer.minimize(sum_cost)
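Both branches follow the Noam schedule from the Transformer paper: warm up linearly for `warmup_steps`, then decay with the inverse square root of the step (the two code paths differ only in how the base learning rate is folded in). A plain-Python restatement for reference:

```python
def noam_lr(step, d_model, warmup_steps, base_lr=1.0):
    # lr = base_lr * d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)
    return base_lr * d_model ** -0.5 * min(step ** -0.5,
                                           step * warmup_steps ** -1.5)


for step in (1, 1000, 4000, 16000):
    print(step, noam_lr(step, d_model=512, warmup_steps=4000))
```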
    def train_loop(exe, train_progm):
        def read_multiple(reader,
                          count=dev_count if args.use_token_batch else 1,
                          clip_last=True):
            """
            Stack data from reader for multi-devices.
            """

            def __impl__():
                res = []
                for item in reader():
                    res.append(item)
                    if len(res) == count:
                        yield res
                        res = []
                if len(res) == count:
                    yield res
                elif not clip_last:
                    data = []
                    for item in res:
                        data += item
                    if len(data) > count:
                        inst_num_per_part = len(data) // count
                        yield [
                            data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
                            for i in range(count)
                        ]

            return __impl__

        def split_data(data, num_part=dev_count):
            """
            Split data for each device.
            """
            if len(data) == num_part:
                return data
            data = data[0]
            inst_num_per_part = len(data) // num_part
            return [
                data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
                for i in range(num_part)
            ]

        # Initialize the parameters.
        if TrainTaskConfig.ckpt_path:
            fluid.io.load_persistables(exe, TrainTaskConfig.ckpt_path)
            # lr_scheduler.current_steps = TrainTaskConfig.start_step
        else:
            print "init fluid.framework.default_startup_program"
            exe.run(fluid.framework.default_startup_program())
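A toy illustration of what the two helpers above do, assuming `dev_count == 2` and the module-level signatures these helpers had before this commit (`read_multiple(reader, count, clip_last=True)`, `split_data(data, num_part)`); the behavior is the same:

```python
def toy_reader():
    for batch in ([1, 2], [3, 4], [5, 6], [7, 8]):
        yield batch


# use_token_batch=True: stack `count` whole batches, one per device.
for group in read_multiple(reader=toy_reader, count=2)():
    print(group)                 # [[1, 2], [3, 4]], then [[5, 6], [7, 8]]
    print(split_data(group, 2))  # unchanged: already one batch per device

# use_token_batch=False: a single large batch is sliced across devices.
print(split_data([[1, 2, 3, 4]], 2))  # [[1, 2], [3, 4]]
```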
        train_data = reader.DataReader(
            src_vocab_fpath=args.src_vocab_fpath,
            trg_vocab_fpath=args.trg_vocab_fpath,
            fpattern=args.train_file_pattern,
            use_token_batch=args.use_token_batch,
            batch_size=args.batch_size *
            (1 if args.use_token_batch else dev_count),
            pool_size=args.pool_size,
            sort_type=args.sort_type,
            shuffle=args.shuffle,
            shuffle_batch=args.shuffle_batch,
            start_mark=args.special_token[0],
            end_mark=args.special_token[1],
            unk_mark=args.special_token[2],
            clip_last_batch=False)

        train_data = read_multiple(reader=train_data.batch_generator)

        build_strategy = fluid.BuildStrategy()
        # Since the token number differs among devices, customize the gradient
        # scale so the cost is averaged over tokens across all devices; the
        # gradient scale is `1 / token_number` for the average cost.
        build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
        train_exe = fluid.ParallelExecutor(
            use_cuda=TrainTaskConfig.use_gpu,
            loss_name=sum_cost.name,
            main_program=train_progm,
            build_strategy=build_strategy)
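With `gradient_scale_strategy` set to `Customized`, the ParallelExecutor does not apply its default per-device scaling to the loss gradient; instead the training loop below feeds the starting gradient of `sum_cost` explicitly, so the objective effectively becomes the cost averaged over all tokens on all devices. The relevant lines from the loop below, for reference:

```python
# One feed dict per device; feeding the same 1/N starting gradient on each
# device makes the aggregated update the gradient of sum_cost / total tokens.
for feed_dict in feed_list:
    feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token
```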
        def test_context():
            # Context to do validation.
            test_program = train_progm.clone()
            with fluid.program_guard(test_program):
                test_program = fluid.io.get_inference_program([avg_cost])

            val_data = reader.DataReader(
                src_vocab_fpath=args.src_vocab_fpath,
                trg_vocab_fpath=args.trg_vocab_fpath,
                fpattern=args.val_file_pattern,
                use_token_batch=args.use_token_batch,
                batch_size=args.batch_size *
                (1 if args.use_token_batch else dev_count),
                pool_size=args.pool_size,
                sort_type=args.sort_type,
                start_mark=args.special_token[0],
                end_mark=args.special_token[1],
                unk_mark=args.special_token[2],
                clip_last_batch=False,
                shuffle=False,
                shuffle_batch=False)

            test_exe = fluid.ParallelExecutor(
                use_cuda=TrainTaskConfig.use_gpu,
                main_program=test_program,
                share_vars_from=train_exe)

            def test(exe=test_exe):
                test_total_cost = 0
                test_total_token = 0
                test_data = read_multiple(reader=val_data.batch_generator)
                for batch_id, data in enumerate(test_data()):
                    feed_list = []
                    for place_id, data_buffer in enumerate(split_data(data)):
                        data_input_dict, util_input_dict, _ = prepare_batch_input(
                            data_buffer, data_input_names, util_input_names,
                            ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
                            ModelHyperParams.n_head, ModelHyperParams.d_model)
                        feed_list.append(
                            dict(data_input_dict.items() +
                                 util_input_dict.items()))

                    outs = exe.run(feed=feed_list,
                                   fetch_list=[sum_cost.name, token_num.name])
                    sum_cost_val, token_num_val = np.array(outs[0]), np.array(
                        outs[1])
                    test_total_cost += sum_cost_val.sum()
                    test_total_token += token_num_val.sum()

                test_avg_cost = test_total_cost / test_total_token
                test_ppl = np.exp([min(test_avg_cost, 100)])
                return test_avg_cost, test_ppl

            return test

        if args.val_file_pattern is not None:
            test = test_context()
        data_input_names = encoder_data_input_fields + decoder_data_input_fields[:-1] + label_data_input_fields
        util_input_names = encoder_util_input_fields + decoder_util_input_fields

        init = False
        for pass_id in xrange(TrainTaskConfig.pass_num):
            pass_start_time = time.time()
            for batch_id, data in enumerate(train_data()):
                feed_list = []
                total_num_token = 0
                for place_id, data_buffer in enumerate(split_data(data)):
                    data_input_dict, util_input_dict, num_token = prepare_batch_input(
                        data_buffer, data_input_names, util_input_names,
                        ModelHyperParams.eos_idx, ModelHyperParams.eos_idx,
                        ModelHyperParams.n_head, ModelHyperParams.d_model)
                    total_num_token += num_token
                    feed_kv_pairs = data_input_dict.items() + util_input_dict.items()
                    if args.local:
                        lr_rate = lr_scheduler.update_learning_rate()
                        feed_kv_pairs += {
                            lr_scheduler.learning_rate.name: lr_rate
                        }.items()
                    feed_list.append(dict(feed_kv_pairs))

                    if not init:  # init the position encoding table
                        for pos_enc_param_name in pos_enc_param_names:
                            pos_enc = position_encoding_init(
                                ModelHyperParams.max_length + 1,
                                ModelHyperParams.d_model)
                            feed_list[place_id][pos_enc_param_name] = pos_enc

                for feed_dict in feed_list:
                    feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token

                outs = train_exe.run(
                    fetch_list=[sum_cost.name, token_num.name], feed=feed_list)
                train_exe.bcast_params()
                sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1])
                total_sum_cost = sum_cost_val.sum()  # sum the cost from multi-devices
                total_token_num = token_num_val.sum()
                total_avg_cost = total_sum_cost / total_token_num
                print(
                    "epoch: %d, batch: %d, sum loss: %f, avg loss: %f, ppl: %f"
                    % (pass_id, batch_id, total_sum_cost, total_avg_cost,
                       np.exp([min(total_avg_cost, 100)])))
                init = True
            # Validate and save the model for inference.
            print("epoch: %d, " % pass_id + (
                "val avg loss: %f, val ppl: %f, " % test()
                if args.val_file_pattern is not None else "") + "consumed %fs" %
                  (time.time() - pass_start_time))
            fluid.io.save_persistables(
                exe,
                os.path.join(TrainTaskConfig.ckpt_dir,
                             "pass_" + str(pass_id) + ".checkpoint"))
            fluid.io.save_inference_model(
                os.path.join(TrainTaskConfig.model_dir,
                             "pass_" + str(pass_id) + ".infer.model"),
                data_input_names[:-2] + util_input_names, [predict], exe)
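The fetched `sum_cost` and `token_num` values gather results from every device (hence the `.sum()` calls), so they are summed before dividing; perplexity is `exp` of the token-averaged cost, clamped to avoid overflow. A plain-numpy restatement with hypothetical per-device numbers:

```python
import numpy as np

sum_cost_val = np.array([210.5, 198.0])  # hypothetical per-device summed costs
token_num_val = np.array([96., 104.])    # hypothetical per-device token counts

total_avg_cost = sum_cost_val.sum() / token_num_val.sum()
ppl = np.exp(min(total_avg_cost, 100))   # clamp so exp() cannot overflow
print(total_avg_cost, ppl)               # ~2.04, ~7.7
```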
    if args.local:
        print("local start_up:")
        train_loop(exe, fluid.default_main_program())
    else:
        port = os.getenv("PADDLE_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVERS")  # ip,ip...
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
        trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
        current_endpoint = os.getenv("POD_IP") + ":" + port
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
        t = fluid.DistributeTranspiler()
        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)

        if training_role == "PSERVER":
            current_endpoint = os.getenv("POD_IP") + ":" + os.getenv(
                "PADDLE_PORT")
            if not current_endpoint:
                print("need env POD_IP and PADDLE_PORT")
                exit(1)
            pserver_prog = t.get_pserver_program(current_endpoint)
            pserver_startup = t.get_startup_program(current_endpoint,
                                                    pserver_prog)

            print "pserver begin run"
            with open('pserver_startup.desc', 'w') as f:
                f.write(str(pserver_startup))
            with open('pserver_prog.desc', 'w') as f:
                f.write(str(pserver_prog))
            exe.run(pserver_startup)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            trainer_prog = t.get_trainer_program()
            with open('trainer_prog.desc', 'w') as f:
                f.write(str(trainer_prog))
            train_loop(exe, trainer_prog)
        else:
            print("environment var TRAINING_ROLE should be TRAINER or PSERVER")
if __name__ == "__main__":
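The non-local path is driven entirely by the environment variables read above (`PADDLE_IS_LOCAL`, `PADDLE_PSERVERS`, `PADDLE_PORT`, `PADDLE_TRAINERS_NUM`, `POD_IP`, `PADDLE_TRAINER_ID`, `TRAINING_ROLE`). A hypothetical single-machine smoke test launching one pserver and one trainer; the script name `train.py` and the subprocess wrapper are illustrative only, and the required data/vocabulary arguments are omitted:

```python
import os
import subprocess

common = dict(os.environ,
              PADDLE_IS_LOCAL="0",          # leave local mode
              PADDLE_PORT="6174",
              PADDLE_PSERVERS="127.0.0.1",  # comma-separated pserver IPs
              PADDLE_TRAINERS_NUM="1",
              POD_IP="127.0.0.1")

# One parameter server...
subprocess.Popen(["python", "train.py"],
                 env=dict(common, TRAINING_ROLE="PSERVER"))
# ...and one trainer, pinned to CPU for the smoke test.
subprocess.Popen(["python", "train.py", "--device", "CPU"],
                 env=dict(common, TRAINING_ROLE="TRAINER",
                          PADDLE_TRAINER_ID="0", CPU_NUM="1"))
```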