Unverified commit 3a2a7116, authored by danleifeng, committed by GitHub

【paddle.fleet】simplify fleetrun log infos (#26888)

* print detailed and clear log infos; test=develop
Parent e35ad3ee
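
The gist of the change, per the diff below: every per-process environment dump moves to DEBUG level, and only the first local process prints a one-time, pretty-printed table of the distributed environment at INFO level. A minimal sketch of that pattern (illustrative only; it uses Python's standard logging module, and the names `log_proc_env`/`nprocs` are hypothetical, not part of the fleetrun code):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("fleetrun.example")


def log_proc_env(idx, nprocs, proc_env):
    # Verbose per-process dump stays at DEBUG so the console is not flooded.
    logger.debug("proc %d env: %s", idx, proc_env)
    # Only the first local process summarizes the distributed env at INFO.
    if idx == 0:
        logger.info("Local start %d processes. First process distributed "
                    "environment info (Only For Debug): %s", nprocs, proc_env)
```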
@@ -200,11 +200,11 @@ def launch_collective(args):
         start_port = os.environ.get('FLAGS_START_PORT')
     if cloud_utils.use_paddlecloud() and trainers_num != 1:
         cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port)
-        logger.info("get cluster from cloud:{}".format(cluster))
+        logger.debug("get cluster from cloud:{}".format(cluster))
     else:
         # trainers_num = 1 or not use paddlecloud ips="a,b"
         cluster, pod = get_cluster_from_args(args, gpus)
-        logger.info("get cluster from args:{}".format(cluster))
+        logger.debug("get cluster from args:{}".format(cluster))
     procs = start_local_trainers(
         cluster,
@@ -217,7 +217,8 @@ def launch_collective(args):
         alive = watch_local_trainers(procs, cluster.trainers_nranks())
         if not alive:
-            logger.info("Local procs complete, POD info:{}".format(pod))
+            logger.info("Local processes completed.")
+            logger.debug("POD info:{}".format(pod))
             break
         time.sleep(3)
@@ -313,18 +314,26 @@ def launch_ps(args):
     cmds = []
     log_fns = []
     for idx, cur_server in enumerate(pod.servers):
-        current_env.update({
+        proc_env = {
             "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
             "PADDLE_PORT": cur_server.endpoint.split(":")[1],
             "TRAINING_ROLE": "PSERVER",
             "PADDLE_TRAINERS_NUM": str(worker_num),
             "POD_IP": cur_server.endpoint.split(":")[0]
-        })
+        }
+        current_env.update(proc_env)
         cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
         cmds.append(cmd)
+        if idx == 0:
+            logger.info(
+                "Local server start {} processes. First process distributed "
+                "environment info (Only For Debug): {}".format(
+                    len(pod.servers),
+                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
             fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
@@ -338,21 +347,32 @@ def launch_ps(args):
         tp.rank = cur_server.rank
         tp.local_rank = idx
         tp.log_fn = fn
-        tp.log_offset = 0 if fn else None
+        tp.log_offset = fn.tell() if fn else None
         tp.cmd = cmd
         procs.append(tp)
     for idx, cur_worker in enumerate(pod.workers):
-        current_env.update({
+        proc_env = {
             "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
             "PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
             "PADDLE_TRAINERS_NUM": str(worker_num),
             "TRAINING_ROLE": "TRAINER",
             "PADDLE_TRAINER_ID": str(cur_worker.rank)
-        })
+        }
+        current_env.update(proc_env)
         cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
         cmds.append(cmd)
+        if idx == 0:
+            logger.info(
+                "Local worker start {} processes. First process distributed "
+                "environment info (Only For Debug): {}".format(
+                    len(pod.workers),
+                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
             fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
@@ -366,11 +386,14 @@ def launch_ps(args):
         tp.rank = cur_worker.rank
         tp.local_rank = idx
         tp.log_fn = fn
-        tp.log_offset = 0 if fn else None
+        tp.log_offset = fn.tell() if fn else None
         tp.cmd = cmd
         procs.append(tp)
+    logger.info(
+        "Please check servers and workers logs in {}/workerlog.* and {}/serverlog.*".
+        format(args.log_dir, args.log_dir))
     # only wait worker to finish here
     for i, proc in enumerate(procs):
         if i < len(pod.servers):
@@ -403,16 +426,16 @@ def launch():
     cuda_device_num = fluid.core.get_cuda_device_count()
     if len(has_ps_args) > 0 or cuda_device_num == 0:
         logger.info(
-            "Run parameter-sever cpu mode. pserver args:{}, cuda count:{}".
+            "Run parameter-sever cpu mode. pserver arguments:{}, cuda count:{}".
             format(has_ps_args, cuda_device_num))
         launch_ps(args)
     elif len(has_collective_args) > 0:
-        logger.info("Run collective gpu mode. gpu args:{}, cuda count:{}".
+        logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}".
                     format(has_collective_args, cuda_device_num))
         launch_collective(args)
     else:
         logger.warning(
-            "Not found distinct args. Default use gpu collective mode")
+            "Not found distinct arguments. Default use gpu collective mode")
         launch_collective(args)
......
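
For reference, the mode selection in the launch() hunk above reduces to a small decision: parameter-server CPU mode when pserver-style arguments are present or no CUDA device is visible, otherwise GPU collective mode, which is also the default when no distinct arguments are found. A condensed, assumed illustration (not the upstream function; the name `choose_launch_mode` is hypothetical):

```python
def choose_launch_mode(has_ps_args, has_collective_args, cuda_device_num):
    # has_ps_args / has_collective_args are lists of matched CLI flags,
    # mirroring how launch() collects them in the diff above.
    if len(has_ps_args) > 0 or cuda_device_num == 0:
        return "parameter-server (cpu)"
    elif len(has_collective_args) > 0:
        return "collective (gpu)"
    else:
        return "collective (gpu)"  # default when no distinct arguments are found


print(choose_launch_mode(["--servers"], [], 0))  # -> parameter-server (cpu)
```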
@@ -253,7 +253,8 @@ def terminate_local_procs(procs):
     for p in procs:
         if p.proc.poll() is None:
             p.proc.terminate()
-            p.log_fn.close()
+            if p.log_fn:
+                p.log_fn.close()
             logger.debug("terminate process id:{}".format(p.proc.pid))
     #wait all process terminiated
@@ -338,6 +339,45 @@ def get_ports(num, offset):
     return ports
+def pretty_print_envs(envs, header=None):
+    spacing = 2
+    max_k = 40
+    max_v = 45
+    for k, v in envs.items():
+        max_k = max(max_k, len(k))
+    h_format = "{{:^{}s}}{}{{:<{}s}}\n".format(max_k, " " * spacing, max_v)
+    l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v)
+    length = max_k + max_v + spacing
+    border = "".join(["="] * length)
+    line = "".join(["-"] * length)
+    draws = ""
+    draws += border + "\n"
+    if header:
+        draws += h_format.format(header[0], header[1])
+    else:
+        draws += h_format.format("fleetrun Distributed Envs", "Value")
+    draws += line + "\n"
+    for k, v in envs.items():
+        if isinstance(v, str) and len(v) >= max_v:
+            str_v = "... " + v[-41:]
+        else:
+            str_v = v
+        draws += l_format.format(k, " " * spacing, str(str_v))
+    draws += border
+    _str = "\n{}\n".format(draws)
+    return _str
 class TrainerProc(object):
     def __init__(self):
         self.proc = None
@@ -373,11 +413,19 @@ def start_local_trainers(cluster,
         current_env.update(proc_env)
         logger.debug("trainer proc env:{}".format(current_env))
         cmd = [sys.executable, "-u", training_script] + training_script_args
-        logger.info("start trainer proc:{} env:{}".format(cmd, proc_env))
+        logger.debug("start trainer proc{} env:{}".format(cmd, current_env))
+        if idx == 0:
+            logger.info("Local start {} processes. First process distributed "
+                        "environment info (Only For Debug): {}".format(
+                            len(pod.trainers),
+                            pretty_print_envs(proc_env, ("Distributed Envs",
+                                                         "Value"))))
+            logger.info(
+                "More details for debug about commands and environments are written in {}/run.sh".
+                format(log_dir))
         fn = None
         if log_dir is not None:
......
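
The switch from `tp.log_offset = 0` to `tp.log_offset = fn.tell()` matters because a watcher that later tails the per-process log can seek to the recorded offset and report only what the child wrote since the last poll. A self-contained sketch of that idea (an assumption about how the offset is consumed, not the launcher's actual watch loop; the name `read_new_log_output` is hypothetical):

```python
def read_new_log_output(log_path, offset):
    """Return text appended to log_path since `offset`, plus the new offset."""
    with open(log_path, "r") as f:
        f.seek(offset)
        new_text = f.read()
        return new_text, f.tell()


# Usage: the offset starts at fn.tell() right after the log file is opened,
# then advances each time the watcher polls for new output.
```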