Unverified commit 05916d00, authored by Guo Sheng, committed by GitHub

Merge pull request #1307 from gongweibao/cloudtest2

Add nccl2 support
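For reference, a minimal sketch of the per-node setup the nccl2 path assumes (the environment variable names come from the code below; the addresses and the port value are only illustrative):

import os
os.environ["PADDLE_TRAINER_ID"] = "0"                        # this node's rank
os.environ["PADDLE_PORT"] = "6174"                           # port shared by all workers
os.environ["PADDLE_TRAINERS"] = "192.168.0.1,192.168.0.2"    # comma-separated worker IPs
os.environ["POD_IP"] = "192.168.0.1"                         # this node's IP
# Then launch on every node with: python train.py --update_method nccl2 ...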
import argparse
import ast
import copy
import logging
import multiprocessing
import os
import six
import sys
import time
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.transpiler.details import program_to_code
import reader
from config import *
@@ -97,6 +101,11 @@ def parse_args():
default='GPU',
choices=['CPU', 'GPU'],
help="The device type.")
parser.add_argument(
'--update_method',
choices=("pserver", "nccl2"),
default="pserver",
help='Update method.')
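    # The "nccl2" update method switches distributed training from parameter
    # servers to NCCL2 all-reduce between the trainer processes.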
parser.add_argument(
'--sync', type=ast.literal_eval, default=True, help="sync mode.")
parser.add_argument(
@@ -115,6 +124,11 @@ def parse_args():
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use py_reader.")
parser.add_argument(
"--fetch_steps",
type=int,
default=100,
help="The frequency to fetch and print output.")
args = parser.parse_args()
# Append args related to dict
@@ -131,6 +145,25 @@ def parse_args():
return args
def append_nccl2_prepare(trainer_id, worker_endpoints, current_endpoint):
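    # The gen_nccl_id op below creates the NCCLID variable: trainer 0
    # generates the unique NCCL id and sends it to the other endpoints,
    # while the remaining trainers listen on their own endpoint to receive
    # it, so every trainer can join the same NCCL communicator.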
assert (trainer_id >= 0 and len(worker_endpoints) > 1 and
current_endpoint in worker_endpoints)
eps = copy.deepcopy(worker_endpoints)
eps.remove(current_endpoint)
nccl_id_var = fluid.default_startup_program().global_block().create_var(
name="NCCLID", persistable=True, type=fluid.core.VarDesc.VarType.RAW)
fluid.default_startup_program().global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": eps,
"trainer_id": trainer_id
})
return nccl_id_var
def pad_batch_data(insts,
pad_idx,
n_head,
@@ -410,15 +443,25 @@ def test_context(exe, train_exe, dev_count):
return test
def train_loop(exe,
train_prog,
startup_prog,
dev_count,
sum_cost,
avg_cost,
token_num,
predict,
pyreader,
nccl2_num_trainers=1,
nccl2_trainer_id=0):
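    # nccl2_num_trainers / nccl2_trainer_id keep their single-process
    # defaults in local and pserver mode; in nccl2 mode they are forwarded
    # to ParallelExecutor so gradients are aggregated across nodes.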
# Initialize the parameters.
if TrainTaskConfig.ckpt_path:
fluid.io.load_persistables(exe, TrainTaskConfig.ckpt_path)
else:
print("init fluid.framework.default_startup_program")
logging.info("init fluid.framework.default_startup_program")
exe.run(startup_prog)
logging.info("begin reader")
train_data = prepare_data_generator(
args, is_test=False, count=dev_count, pyreader=pyreader)
@@ -431,12 +474,16 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
    # Use the token-averaged cost across devices; the gradient scale is
    # `1 / token_number` for the average cost.
# build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
logging.info("begin executor")
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=avg_cost.name,
main_program=train_prog,
build_strategy=build_strategy,
exec_strategy=exec_strategy,
num_trainers=nccl2_num_trainers,
trainer_id=nccl2_trainer_id)
if args.val_file_pattern is not None:
test = test_context(exe, train_exe, dev_count)
@@ -450,6 +497,8 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
step_idx = 0
init_flag = True
logging.info("begin train")
for pass_id in six.moves.xrange(TrainTaskConfig.pass_num):
pass_start_time = time.time()
@@ -464,25 +513,38 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
try:
feed_dict_list = prepare_feed_dict_list(data_generator,
init_flag, dev_count)
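                # Fetch sum_cost and token_num only every fetch_steps steps;
                # an empty fetch_list on the other steps avoids the extra
                # fetch and merge work.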
outs = train_exe.run(
fetch_list=[sum_cost.name, token_num.name]
if step_idx % args.fetch_steps == 0 else [],
feed=feed_dict_list)
if step_idx % args.fetch_steps == 0:
sum_cost_val, token_num_val = np.array(outs[0]), np.array(
outs[1])
                    # Sum the cost across devices.
total_sum_cost = sum_cost_val.sum()
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print("step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
if step_idx == 0:
logging.info(
"step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
avg_batch_time = time.time()
else:
logging.info(
"step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f, speed: %.2f step/s" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)]),
args.fetch_steps / (time.time() - avg_batch_time)))
avg_batch_time = time.time()
if step_idx % TrainTaskConfig.save_freq == 0 and step_idx > 0:
fluid.io.save_persistables(
exe,
os.path.join(TrainTaskConfig.ckpt_dir,
@@ -492,6 +554,7 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
os.path.join(TrainTaskConfig.model_dir,
"iter_" + str(step_idx) + ".infer.model"),
train_prog)
init_flag = False
batch_id += 1
step_idx += 1
@@ -505,13 +568,13 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
# Validate and save the persistable.
if args.val_file_pattern is not None:
val_avg_cost, val_ppl = test()
logging.info(
"epoch: %d, val avg loss: %f, val normalized loss: %f, val ppl: %f,"
" consumed %fs" % (pass_id, val_avg_cost,
val_avg_cost - loss_normalizer, val_ppl,
time_consumed))
else:
print("epoch: %d, consumed %fs" % (pass_id, time_consumed))
logging.info("epoch: %d, consumed %fs" % (pass_id, time_consumed))
if not args.enable_ce:
fluid.io.save_persistables(
exe,
@@ -531,7 +594,7 @@ def train(args):
is_local = os.getenv("PADDLE_IS_LOCAL", "1")
if is_local == '0':
args.local = False
logging.info(args)
if args.device == 'CPU':
TrainTaskConfig.use_gpu = False
@@ -576,15 +639,21 @@ def train(args):
use_py_reader=args.use_py_reader,
is_test=False)
if args.local:
optimizer = None
if args.sync:
lr_decay = fluid.layers.learning_rate_scheduler.noam_decay(
ModelHyperParams.d_model, TrainTaskConfig.warmup_steps)
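            # Noam schedule: the learning rate increases linearly during
            # warmup_steps and then decays with the inverse square root of
            # the step, scaled by d_model ** -0.5.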
print("before adam")
with fluid.default_main_program()._lr_schedule_guard():
learning_rate = lr_decay * TrainTaskConfig.learning_rate
optimizer = fluid.optimizer.Adam(
learning_rate=learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
else:
optimizer = fluid.optimizer.SGD(0.003)
optimizer.minimize(avg_cost)
@@ -596,6 +665,27 @@ def train(args):
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
token_num, predict, pyreader)
else:
if args.update_method == "nccl2":
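            # In nccl2 mode the cluster topology comes from environment
            # variables: PADDLE_TRAINER_ID (this trainer's rank), PADDLE_PORT,
            # PADDLE_TRAINERS (comma-separated worker IPs) and POD_IP
            # (this node's IP).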
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
port = os.getenv("PADDLE_PORT")
worker_ips = os.getenv("PADDLE_TRAINERS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
trainers_num = len(worker_endpoints)
current_endpoint = os.getenv("POD_IP") + ":" + port
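            # Trainer 0 waits before running the startup program, presumably
            # so the other trainers are already listening when the NCCL id
            # is generated and sent.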
if trainer_id == 0:
logging.info("train_id == 0, sleep 60s")
time.sleep(60)
print("trainers_num:", trainers_num)
print("worker_endpoints:", worker_endpoints)
print("current_endpoint:", current_endpoint)
append_nccl2_prepare(trainer_id, worker_endpoints, current_endpoint)
            train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
                       avg_cost, token_num, predict, pyreader, trainers_num,
                       trainer_id)
return
port = os.getenv("PADDLE_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVERS") # ip,ip...
eplist = []
@@ -605,6 +695,13 @@ def train(args):
trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
print("pserver_endpoints", pserver_endpoints)
print("current_endpoint", current_endpoint)
print("trainer_id", trainer_id)
print("pserver_ips", pserver_ips)
print("port", port)
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id,
@@ -614,6 +711,7 @@ def train(args):
startup_program=startup_prog)
if training_role == "PSERVER":
logging.info("distributed: pserver started")
current_endpoint = os.getenv("POD_IP") + ":" + os.getenv(
"PADDLE_PORT")
if not current_endpoint:
@@ -623,23 +721,37 @@ def train(args):
pserver_startup = t.get_startup_program(current_endpoint,
pserver_prog)
print("psserver begin run")
with open('pserver_startup.desc', 'w') as f:
f.write(str(pserver_startup))
with open('pserver_prog.desc', 'w') as f:
f.write(str(pserver_prog))
print("pserver start:")
program_to_code(pserver_startup)
print("pserver train:")
program_to_code(pserver_prog)
#sys.exit(0)
exe.run(pserver_startup)
exe.run(pserver_prog)
elif training_role == "TRAINER":
logging.info("distributed: trainer started")
trainer_prog = t.get_trainer_program()
with open('trainer_prog.desc', 'w') as f:
f.write(str(trainer_prog))
'''
print("trainer start:")
program_to_code(pserver_startup)
print("trainer train:")
program_to_code(trainer_prog)
sys.exit(0)
'''
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
avg_cost, token_num, predict, pyreader)
else:
print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
logging.critical(
"environment var TRAINER_ROLE should be TRAINER os PSERVER")
exit(1)
if __name__ == "__main__":
LOG_FORMAT = "[%(asctime)s %(levelname)s %(filename)s:%(lineno)d] %(message)s"
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG, format=LOG_FORMAT)
args = parse_args()
train(args)