From 3a2a7116816f9f2747d0c2a8194d01aa8047894c Mon Sep 17 00:00:00 2001
From: danleifeng <52735331+danleifeng@users.noreply.github.com>
Date: Thu, 3 Sep 2020 23:50:13 +0800
Subject: [PATCH] 【paddle.fleet】simplify fleetrun log infos (#26888)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* print detailed and clear log infos; test=develop
---
 python/paddle/distributed/fleet/launch.py     | 47 ++++++++++++----
 .../paddle/distributed/fleet/launch_utils.py  | 56 +++++++++++++++++--
 2 files changed, 87 insertions(+), 16 deletions(-)

diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index 29a1bda92f1..7778acaf83b 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -200,11 +200,11 @@ def launch_collective(args):
     start_port = os.environ.get('FLAGS_START_PORT')
     if cloud_utils.use_paddlecloud() and trainers_num != 1:
         cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port)
-        logger.info("get cluster from cloud:{}".format(cluster))
+        logger.debug("get cluster from cloud:{}".format(cluster))
     else:
         # trainers_num = 1 or not use paddlecloud ips="a,b"
         cluster, pod = get_cluster_from_args(args, gpus)
-        logger.info("get cluster from args:{}".format(cluster))
+        logger.debug("get cluster from args:{}".format(cluster))
 
     procs = start_local_trainers(
         cluster,
@@ -217,7 +217,8 @@ def launch_collective(args):
         alive = watch_local_trainers(procs, cluster.trainers_nranks())
 
         if not alive:
-            logger.info("Local procs complete, POD info:{}".format(pod))
+            logger.info("Local processes completed.")
+            logger.debug("POD info:{}".format(pod))
             break
 
         time.sleep(3)
@@ -313,18 +314,26 @@ def launch_ps(args):
     cmds = []
     log_fns = []
     for idx, cur_server in enumerate(pod.servers):
-        current_env.update({
+        proc_env = {
             "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
             "PADDLE_PORT": cur_server.endpoint.split(":")[1],
             "TRAINING_ROLE": "PSERVER",
             "PADDLE_TRAINERS_NUM": str(worker_num),
             "POD_IP": cur_server.endpoint.split(":")[0]
-        })
+        }
+        current_env.update(proc_env)
 
         cmd = [sys.executable, "-u", args.training_script
                ] + args.training_script_args
         cmds.append(cmd)
 
+        if idx == 0:
+            logger.info(
+                "Local server start {} processes. First process distributed "
+                "environment info (Only For Debug): {}".format(
+                    len(pod.servers),
+                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
+
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
             fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
@@ -338,21 +347,32 @@ def launch_ps(args):
         tp.rank = cur_server.rank
         tp.local_rank = idx
         tp.log_fn = fn
-        tp.log_offset = 0 if fn else None
+        tp.log_offset = fn.tell() if fn else None
         tp.cmd = cmd
 
         procs.append(tp)
 
     for idx, cur_worker in enumerate(pod.workers):
-        current_env.update({
+        proc_env = {
             "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
+            "PADDLE_TRAINER_ENDPOINTS": worker_endpoints,
             "PADDLE_TRAINERS_NUM": str(worker_num),
             "TRAINING_ROLE": "TRAINER",
             "PADDLE_TRAINER_ID": str(cur_worker.rank)
-        })
+        }
+        current_env.update(proc_env)
+
         cmd = [sys.executable, "-u", args.training_script
                ] + args.training_script_args
         cmds.append(cmd)
+
+        if idx == 0:
+            logger.info(
+                "Local worker start {} processes. First process distributed "
+                "environment info (Only For Debug): {}".format(
+                    len(pod.workers),
+                    pretty_print_envs(proc_env, ("Distributed Envs", "Value"))))
+
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
             fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
@@ -366,11 +386,14 @@ def launch_ps(args):
         tp.rank = cur_worker.rank
         tp.local_rank = idx
         tp.log_fn = fn
-        tp.log_offset = 0 if fn else None
+        tp.log_offset = fn.tell() if fn else None
         tp.cmd = cmd
 
         procs.append(tp)
 
+    logger.info(
+        "Please check servers and workers logs in {}/workerlog.* and {}/serverlog.*".
+        format(args.log_dir, args.log_dir))
     # only wait worker to finish here
     for i, proc in enumerate(procs):
         if i < len(pod.servers):
@@ -403,16 +426,16 @@ def launch():
     cuda_device_num = fluid.core.get_cuda_device_count()
     if len(has_ps_args) > 0 or cuda_device_num == 0:
         logger.info(
-            "Run parameter-sever cpu mode. pserver args:{}, cuda count:{}".
+            "Run parameter-server cpu mode. pserver arguments:{}, cuda count:{}".
            format(has_ps_args, cuda_device_num))
         launch_ps(args)
     elif len(has_collective_args) > 0:
-        logger.info("Run collective gpu mode. gpu args:{}, cuda count:{}".
+        logger.info("Run collective gpu mode. gpu arguments:{}, cuda count:{}".
                     format(has_collective_args, cuda_device_num))
         launch_collective(args)
     else:
         logger.warning(
-            "Not found distinct args. Default use gpu collective mode")
+            "No distinct arguments found. Default use gpu collective mode")
         launch_collective(args)
 
 
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index 350d8ae2b44..3da5aed8201 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -253,7 +253,8 @@ def terminate_local_procs(procs):
     for p in procs:
         if p.proc.poll() is None:
             p.proc.terminate()
-            p.log_fn.close()
+            if p.log_fn:
+                p.log_fn.close()
             logger.debug("terminate process id:{}".format(p.proc.pid))
 
     #wait all process terminiated
@@ -338,6 +339,45 @@ def get_ports(num, offset):
     return ports
 
 
+def pretty_print_envs(envs, header=None):
+    spacing = 2
+    max_k = 40
+    max_v = 45
+
+    for k, v in envs.items():
+        max_k = max(max_k, len(k))
+
+    h_format = "{{:^{}s}}{}{{:<{}s}}\n".format(max_k, " " * spacing, max_v)
+    l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v)
+    length = max_k + max_v + spacing
+
+    border = "".join(["="] * length)
+    line = "".join(["-"] * length)
+
+    draws = ""
+    draws += border + "\n"
+
+    if header:
+        draws += h_format.format(header[0], header[1])
+    else:
+        draws += h_format.format("fleetrun Distributed Envs", "Value")
+
+    draws += line + "\n"
+
+    for k, v in envs.items():
+        if isinstance(v, str) and len(v) >= max_v:
+            str_v = "... " + v[-41:]
+        else:
+            str_v = v
+
+        draws += l_format.format(k, " " * spacing, str(str_v))
+
+    draws += border
+
+    _str = "\n{}\n".format(draws)
+    return _str
+
+
 class TrainerProc(object):
     def __init__(self):
         self.proc = None
@@ -373,11 +413,19 @@ def start_local_trainers(cluster,
 
         current_env.update(proc_env)
 
-        logger.debug("trainer proc env:{}".format(current_env))
-
         cmd = [sys.executable, "-u", training_script] + training_script_args
 
-        logger.info("start trainer proc:{} env:{}".format(cmd, proc_env))
+        logger.debug("start trainer proc:{} env:{}".format(cmd, current_env))
+
+        if idx == 0:
+            logger.info("Local start {} processes. First process distributed "
+                        "environment info (Only For Debug): {}".format(
+                            len(pod.trainers),
+                            pretty_print_envs(proc_env, ("Distributed Envs",
+                                                         "Value"))))
+            logger.info(
+                "More details for debug about commands and environments are written in {}/run.sh".
+                format(log_dir))
 
         fn = None
         if log_dir is not None:
-- 
GitLab
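Reviewer note: to preview the table rendered by the new pretty_print_envs
helper without launching a job, the minimal sketch below calls it directly.
It is a sketch only, assuming a PaddlePaddle build with this patch applied
(so the helper is importable from paddle.distributed.fleet.launch_utils);
the endpoint values are made up for illustration.

    # Sketch: feed pretty_print_envs a sample of the per-process env that
    # launch_ps builds for a server. Values here are illustrative only.
    from paddle.distributed.fleet.launch_utils import pretty_print_envs

    proc_env = {
        "PADDLE_PSERVERS_IP_PORT_LIST": "127.0.0.1:6170,127.0.0.1:6171",
        "PADDLE_PORT": "6170",
        "TRAINING_ROLE": "PSERVER",
        "PADDLE_TRAINERS_NUM": "2",
        "POD_IP": "127.0.0.1",
    }

    # Prints a bordered two-column table: keys left-aligned in a column at
    # least 40 characters wide; string values of 45+ characters are shortened
    # to "... " plus their last 41 characters so rows stay aligned.
    print(pretty_print_envs(proc_env, ("Distributed Envs", "Value")))

This also explains the idx == 0 guard in the hunks above: every local
server, worker, or trainer shares the same environment shape, so printing
the table once for the first process keeps the launcher output readable.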