Commit fc212827 authored by tangwei12

Rewrite with ParallelExecutor and py_reader

Parent 19e0d603
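The commit replaces feed-based fluid.Executor training with an in-graph py_reader pipeline driven by fluid.ParallelExecutor. As a minimal sketch of that pattern (illustrative only: build_network and train_reader are placeholders, not names from this commit; the fluid calls themselves all appear in the diff below):

    import paddle.fluid as fluid

    # Declare an in-graph reader whose shapes/dtypes mirror the two int64 inputs.
    py_reader = fluid.layers.py_reader(
        capacity=64,
        shapes=[(-1, 1), (-1, 1)],
        lod_levels=[1, 1],
        dtypes=['int64', 'int64'],
        name='example_reader',
        use_double_buffer=True)
    input_word, predict_word = fluid.layers.read_file(py_reader)
    loss = build_network(input_word, predict_word)  # placeholder for the model graph

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())
    py_reader.decorate_paddle_reader(train_reader)  # train_reader: an assumed paddle.batch reader

    train_exe = fluid.ParallelExecutor(use_cuda=False, loss_name=loss.name)
    py_reader.start()
    try:
        while True:  # consume batches until the decorated reader is exhausted
            loss_val, = train_exe.run(fetch_list=[loss.name])
    except fluid.core.EOFException:
        py_reader.reset()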
@@ -83,25 +83,44 @@ def skip_gram_word2vec(dict_size,
         return cost
 
-    input_word = fluid.layers.data(name="input_word", shape=[1], dtype='int64')
-    predict_word = fluid.layers.data(name='predict_word', shape=[1], dtype='int64')
+    data_shapes = []
+    data_lod_levels = []
+    data_types = []
+
+    # input_word
+    data_shapes.append((-1, 1))
+    data_lod_levels.append(1)
+    data_types.append('int64')
+    # predict_word
+    data_shapes.append((-1, 1))
+    data_lod_levels.append(1)
+    data_types.append('int64')
+
+    py_reader = fluid.layers.py_reader(capacity=64,
+                                       shapes=data_shapes,
+                                       lod_levels=data_lod_levels,
+                                       dtypes=data_types,
+                                       name='py_reader',
+                                       use_double_buffer=True)
+    word_and_label = fluid.layers.read_file(py_reader)
     cost = None
-    data_list = [input_word, predict_word]
     emb = fluid.layers.embedding(
-        input=input_word,
+        input=word_and_label[0],
         is_sparse=is_sparse,
         size=[dict_size, embedding_size],
         param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
             scale=1 / math.sqrt(dict_size))))
 
     if with_nce:
-        cost = nce_layer(emb, predict_word, embedding_size, dict_size, 5,
+        cost = nce_layer(emb, word_and_label[1], embedding_size, dict_size, 5,
                          "uniform", word_frequencys, None)
     if with_hsigmoid:
-        cost = hsigmoid_layer(emb, predict_word, dict_size, max_code_length,
-                              data_list)
+        cost = hsigmoid_layer(emb, word_and_label[1], dict_size, max_code_length,
+                              None)
 
     avg_cost = fluid.layers.reduce_mean(cost)
 
-    return avg_cost, data_list
+    return avg_cost, py_reader
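With this hunk the model function hands back the py_reader instead of a feed-variable list, so callers wire in data by decorating it with an ordinary Python batch reader rather than building a fluid.DataFeeder. A hedged usage sketch (argument order follows the call in train() further down; some_sample_generator is an assumed generator, not part of this commit):

    import paddle

    avg_cost, py_reader = skip_gram_word2vec(
        dict_size, word_frequencys, embedding_size, max_code_length,
        with_hs, with_nce, is_sparse=False)
    # some_sample_generator yields (input_word, predict_word) pairs -- assumed.
    py_reader.decorate_paddle_reader(
        paddle.batch(some_sample_generator, batch_size=100))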
@@ -5,6 +5,8 @@ import logging
 import os
 import time
 import numpy as np
+# disable gpu training for this example
+os.environ["CUDA_VISIBLE_DEVICES"] = ""
@@ -118,42 +120,81 @@ def parse_args():
     return parser.parse_args()
 
 
-def train_loop(args, train_program, reader, data_list, loss, trainer_num,
-               trainer_id):
+def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
     train_reader = paddle.batch(
         paddle.reader.shuffle(
             reader.train((args.with_hs or (not args.with_nce))),
             buf_size=args.batch_size * 100),
         batch_size=args.batch_size)
 
+    py_reader.decorate_paddle_reader(train_reader)
+
     place = fluid.CPUPlace()
-    feeder = fluid.DataFeeder(feed_list=data_list, place=place)
-    data_name_list = [var.name for var in data_list]
+    data_name_list = None
+
     exe = fluid.Executor(place)
     exe.run(fluid.default_startup_program())
-    start = time.clock()
 
+    exec_strategy = fluid.ExecutionStrategy()
+    # default to a single thread when NUM_THREADS is not set
+    exec_strategy.num_threads = int(os.getenv("NUM_THREADS", "1"))
+
+    build_strategy = fluid.BuildStrategy()
+    build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
+
+    train_exe = fluid.ParallelExecutor(
+        use_cuda=False,
+        loss_name=loss.name,
+        main_program=train_program,
+        build_strategy=build_strategy,
+        exec_strategy=exec_strategy)
+
+    profile_state = "CPU"
+    profiler_step = 0
+    profiler_step_start = 20
+    profiler_step_end = 30
+
     for pass_id in range(args.num_passes):
-        for batch_id, data in enumerate(train_reader()):
-            loss_val = exe.run(train_program,
-                               feed=feeder.feed(data),
-                               fetch_list=[loss])
-            if batch_id % 10 == 0:
-                logger.info("TRAIN --> pass: {} batch: {} loss: {}".format(
-                    pass_id, batch_id, loss_val[0] / args.batch_size))
-            if batch_id % 1000 == 0 and batch_id != 0:
-                elapsed = (time.clock() - start)
-                logger.info("Time used: {}".format(elapsed))
-            if batch_id % 1000 == 0 and batch_id != 0:
-                model_dir = args.model_output_dir + '/batch-' + str(batch_id)
-                if args.trainer_id == 0:
-                    fluid.io.save_inference_model(model_dir, data_name_list,
-                                                  [loss], exe)
-        model_dir = args.model_output_dir + '/pass-' + str(pass_id)
-        if args.trainer_id == 0:
-            fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
+        epoch_start = time.time()
+        py_reader.start()
+        batch_id = 0
+
+        try:
+            while True:
+                if profiler_step == profiler_step_start:
+                    fluid.profiler.start_profiler(profile_state)
+
+                loss_val = np.mean(train_exe.run(fetch_list=[loss.name]))
+
+                if profiler_step == profiler_step_end:
+                    fluid.profiler.stop_profiler('total', 'trainer_profile.log')
+                profiler_step += 1
+
+                if batch_id % 10 == 0:
+                    logger.info("TRAIN --> pass: {} batch: {} loss: {}".format(
+                        pass_id, batch_id, loss_val / args.batch_size))
+                if batch_id % 1000 == 0 and batch_id != 0:
+                    logger.info("Time used: {}".format(time.time() - epoch_start))
+                    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
+                    if trainer_id == 0:
+                        fluid.io.save_inference_model(model_dir, data_name_list,
+                                                      [loss], exe)
+                batch_id += 1
+        except fluid.core.EOFException:
+            py_reader.reset()
+            epoch_end = time.time()
+            print("Epoch: {0}, Train total elapsed: {1}".format(
+                pass_id, epoch_end - epoch_start))
+
+            model_dir = args.model_output_dir + '/pass-' + str(pass_id)
+            if trainer_id == 0:
+                fluid.io.save_inference_model(model_dir, data_name_list,
+                                              [loss], exe)
 
 
 def train():
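The epoch loop in the hunk above interleaves profiling, logging, and checkpointing; the py_reader lifecycle it relies on reduces to the following (a distilled sketch, assuming loss, train_exe, and py_reader are built as in the hunk):

    for pass_id in range(args.num_passes):
        py_reader.start()            # launch the feeding thread for this pass
        try:
            while True:              # run until the decorated reader is drained
                loss_val, = train_exe.run(fetch_list=[loss.name])
        except fluid.core.EOFException:
            py_reader.reset()        # required before the next start()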
@@ -167,7 +208,7 @@ def train():
     logger.info("dict_size: {}".format(word2vec_reader.dict_size))
 
-    loss, data_list = skip_gram_word2vec(
+    loss, py_reader = skip_gram_word2vec(
         word2vec_reader.dict_size, word2vec_reader.word_frequencys,
         args.embedding_size, args.max_code_length,
         args.with_hs, args.with_nce, is_sparse=args.is_sparse)
@@ -178,7 +219,7 @@ def train():
     if args.is_local:
         logger.info("run local training")
         main_program = fluid.default_main_program()
-        train_loop(args, main_program, word2vec_reader, data_list, loss, 1, -1)
+        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
     else:
         logger.info("run dist training")
         t = fluid.DistributeTranspiler()
@@ -195,8 +236,7 @@ def train():
     elif args.role == "trainer":
         logger.info("run trainer")
         train_prog = t.get_trainer_program()
-        train_loop(args, train_prog, word2vec_reader, data_list, loss,
-                   args.trainers, args.trainer_id + 1)
+        train_loop(args, train_prog, word2vec_reader, py_reader, loss, args.trainer_id)
 
 if __name__ == '__main__':
......