fleet training in transpiler mode on MPI cannot exit normally (paddle 1.4.0, 1.6.0, 1.6.1), core dump on exit
Created by: ustcxiexk
Training a model on MPI using fleet in transpiler mode, running 5 epochs: at the end of each run the model is saved successfully, but the job never exits normally, then core dumps and the MPI task is reported as failed. Tried paddle 1.4.0, 1.6.0 and 1.6.1; all behave the same way.

MPI training log: http://10.76.118.34:8910/fileview.html?type=logsdir&path=/&instance=0.app-user-20191201165831-6668--xiexiaokang_hourly_deepwalk_ins_graph_cf_test_3_paddlecloud

Error output:

```
[INFO] 2019-12-01 17:18:30,278 [ train.py: 125]: step 1197: loss 0.54263 speed: 0.19838 s/step
I1201 17:18:30.531389  5196 communicator.cc:173] communicator stopped, send thread exit
I1201 17:18:30.532204  5197 communicator.cc:212] communicator stopped, recv thread exit
*** Error in `python2': double free or corruption (!prev): 0x000000000078a5c0 ***
======= Backtrace: =========
/opt/compiler/gcc-4.8.2/lib/libc.so.6(+0x7354f)[0x7f87f2ff954f]
7fffd15ff000-7fffd1600000 r-xp 00000000 00:00 0    [vdso]
ffffffffff600000-ffffffffff601000 r-xp 00000000 00:00 0    [vsyscall]
job.sh: line 85:  5115 Aborted    (core dumped) python2 -u train.py --num_sample_workers $num_sampl
```
Relevant parts of the code:
```python
def train(args):
    import logging
    log.setLevel(logging.DEBUG)
    log.info("start")

    worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
    num_devices = int(os.getenv("CPU_NUM", 10))

    model = DeepwalkModel(args.num_nodes, args.hidden_size, args.neg_num,
                          args.is_sparse, args.is_distributed, 1.)
    pyreader = model.pyreader
    loss = model.forward()

    # init fleet
    init_role()

    train_steps = math.ceil(args.num_nodes * args.epoch / args.batch_size /
                            num_devices / worker_num)
    log.info("Train step: %s" % train_steps)

    real_batch_size = args.batch_size * args.walk_len * args.win_size
    if args.optimizer == "sgd":
        args.lr *= real_batch_size
    optimization(args.lr, loss, train_steps, args.optimizer)

    # init and run server or worker
    if fleet.is_server():
        fleet.init_server(args.warm_start_from_dir)
        fleet.run_server()

    if fleet.is_worker():
        log.info("start init worker done")
        fleet.init_worker()
        # just the worker, load the sample
        log.info("init worker done")
        exe = F.Executor(F.CPUPlace())
        exe.run(fleet.startup_program)
        log.info("Startup done")

        if args.walkpath_files is None or args.walkpath_files == "None":
            graph = build_graph(args.num_nodes, args.edge_path)
        else:
            graph = build_fake_graph(args.num_nodes)
        log.info("Build graph done.")

        # bind gen
        gen_func = build_gen_func(args, graph, real_batch_size)

        cur_time = time.time()
        for idx, _ in enumerate(gen_func()):
            log.info("iter %s: %s s" % (idx, time.time() - cur_time))
            cur_time = time.time()
            if idx == 100:
                break

        pyreader.decorate_tensor_provider(gen_func)
        pyreader.start()

        compiled_prog = build_complied_prog(fleet.main_program, loss)
        train_prog(exe, compiled_prog, loss, pyreader, args, train_steps)
        fleet.stop_worker()
```
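`init_role()` and `optimization()` are not shown above; they do the usual transpiler-mode fleet setup. A minimal sketch of what they amount to (the role maker and the `DistributeTranspilerConfig` flags here are illustrative, the real helpers in train.py may differ slightly):

```python
# Sketch only: the transpiler-mode fleet setup assumed by train() above.
# PaddleCloudRoleMaker reads the PADDLE_* environment variables injected
# by the paddlecloud/MPI launcher; the config flags are illustrative.
import paddle.fluid as F
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig


def init_role():
    fleet.init(role_maker.PaddleCloudRoleMaker())


def optimization(base_lr, loss, train_steps, optimizer="sgd"):
    if optimizer == "sgd":
        opt = F.optimizer.SGD(learning_rate=base_lr)
    else:
        opt = F.optimizer.Adam(learning_rate=base_lr)
    # async training via the distribute transpiler
    config = DistributeTranspilerConfig()
    config.sync_mode = False
    # fleet wraps the optimizer so minimize() builds the transpiled
    # trainer / pserver programs (fleet.main_program, fleet.startup_program)
    opt = fleet.distributed_optimizer(opt, config)
    opt.minimize(loss)
```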
```python
def train_prog(exe, program, loss, node2vec_pyreader, args, train_steps):
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    step = 0
    try:
        while step <= train_steps:
            try:
                begin_time = time.time()
                loss_val, = exe.run(program, fetch_list=[loss])
                log.info("step %s: loss %.5f speed: %.5f s/step" %
                         (step, np.mean(loss_val), time.time() - begin_time))
                step += 1
            except F.core.EOFException:
                node2vec_pyreader.reset()

            if step % args.steps_per_save == 0 or step == train_steps:
                save_path = args.save_path
                if trainer_id == 0:
                    model_path = os.path.join(save_path, "test_3")
                    if not os.path.exists(save_path):
                        os.makedirs(save_path)
                    fleet.save_persistables(exe, model_path)
                    # try:
                    #     # upload model when using paddlecloud
                    #     import paddlecloud.upload_utils as upload_utils
                    #     import shutil
                    #     filenames = os.listdir(save_path)
                    #     for filename in filenames:
                    #         local_file_path = os.path.join(save_path, filename)
                    #         remote_file_path = os.path.join("${OUTPUT_PATH}", save_path, "%s" % trainer_id)
                    #         upload_utils.upload_to_hdfs(
                    #             local_file_path=local_file_path,
                    #             remote_file_path=remote_file_path)
                    #         shutil.rmtree(local_file_path)
                    # except Exception as e:
                    #     log.exception(e)
    except:
        return
```
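For completeness, the entry point of train.py is just argparse plus `train(args)`; a rough sketch is below (the flag list is trimmed and illustrative). Judging from the log, the double free fires after `fleet.stop_worker()` has already stopped the communicator send/recv threads, i.e. while the process is exiting.

```python
# Sketch of the script entry point (flag list trimmed / illustrative):
# nothing runs after train(args), so the crash occurs during process
# teardown, right after the communicator threads report exit.
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="deepwalk")
    parser.add_argument("--num_sample_workers", type=int, default=1)
    # ... remaining flags (num_nodes, epoch, batch_size, save_path, ...) omitted ...
    args = parser.parse_args()
    train(args)
```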