diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index a35ab93eb85860544cbc68bd8f2f9390ef7771fc..73e91abbd4a93b230d029d776c7d80bdadeafd66 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -51,7 +51,7 @@ logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 log_handler = logging.StreamHandler()
 log_format = logging.Formatter(
-    '%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s: %(message)s')
+    '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
 log_handler.setFormatter(log_format)
 logger.addHandler(log_handler)
 
@@ -71,7 +71,7 @@ def _parse_args():
     parser = ArgumentParser(
         description='''start paddle training using multi-process mode.
 NOTE: your train program ***must*** run as distributed nccl2 mode,
-see: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
+see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
 And your train program must read environment variables below in order
 to let different process init properly:
 FLAGS_selected_gpus
@@ -147,9 +147,6 @@ def terminate_procs(procs):
 def start_procs(args):
     """
     """
-    procs = []
-    log_fns = []
-
     default_env = os.environ.copy()
 
     current_node_ip = args.node_ip
@@ -213,48 +210,49 @@ paddlecloud environment.".format(args.cluster_node_ips, node_ips))
     current_env.pop("https_proxy", None)
 
     procs = []
+    log_fns = []
     cmds = []
+    ranks = []
     for i in range(0, selected_gpus_num):
+        rank = (node_id * selected_gpus_num + i)
         current_env.update({
             "FLAGS_selected_gpus": "%s" % selected_gpus[i],
-            "PADDLE_TRAINER_ID": "%d" % (node_id * selected_gpus_num + i),
+            "PADDLE_TRAINER_ID": "%d" % rank,
             "PADDLE_CURRENT_ENDPOINT":
             "%s:%d" % (current_node_ip, args.started_port + i),
             "PADDLE_TRAINERS_NUM": "%d" % nranks,
             "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
         })
 
-        if num_nodes > 1:
-            current_env.update({"FLAGS_sync_nccl_allreduce": "0"})
-
         cmd = [sys.executable, "-u", args.training_script
                ] + args.training_script_args
-
         cmds.append(cmd)
 
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
             fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
             log_fns.append(fn)
-
             proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
         else:
             proc = subprocess.Popen(cmd, env=current_env)
 
         procs.append(proc)
+        ranks.append(rank)
 
     try:
         alive = True
         error = False
+        error_rank = []
         # wait all process finish or one error
         while alive and not error:
             alive = False
-            for p in procs:
+            for rank, p in zip(ranks, procs):
                 ret = p.poll()
                 if ret is None:
                     alive = True
                 elif ret != 0:
                     error = True
+                    error_rank.append(rank)
             time.sleep(1)
 
         if error:
@@ -266,11 +264,15 @@ paddlecloud environment.".format(args.cluster_node_ips, node_ips))
         terminate_procs(procs)
         raise
     except SystemExit:
-        logger.error("One trainer process abort, exit")
+        logger.error(
+            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
+            format(nranks, error_rank))
         terminate_procs(procs)
         raise
     except:
-        logger.error("Trainer process abort, exit")
+        logger.error(
+            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
+            format(nranks, error_rank))
         terminate_procs(procs)
         raise
     finally:
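
For context on the new wait loop, here is a minimal standalone sketch (not part of the patch) of the rank-tracking behaviour the change introduces: each Popen handle is paired with its trainer rank so that, when a process exits with a non-zero code, the launcher can name the failing rank in the abort message. The helper name wait_for_procs and the dummy trainer commands are hypothetical, for illustration only.

# Illustrative sketch (not part of the patch): pairing each Popen handle with its
# trainer rank, mirroring the ranks/error_rank bookkeeping added above.
# `wait_for_procs` and the dummy commands are hypothetical names for this example.
import subprocess
import sys
import time


def wait_for_procs(procs, ranks):
    """Poll trainer processes until all finish or one fails; return failing ranks."""
    error_rank = []
    alive = True
    while alive and not error_rank:
        alive = False
        for rank, p in zip(ranks, procs):
            ret = p.poll()
            if ret is None:        # still running
                alive = True
            elif ret != 0:         # exited abnormally -> remember its rank
                error_rank.append(rank)
        time.sleep(1)
    return error_rank


if __name__ == "__main__":
    # Two dummy "trainers": rank 0 exits cleanly, rank 1 fails.
    cmds = [[sys.executable, "-c", "import sys; sys.exit(0)"],
            [sys.executable, "-c", "import sys; sys.exit(1)"]]
    procs = [subprocess.Popen(cmd) for cmd in cmds]
    ranks = list(range(len(procs)))
    failed = wait_for_procs(procs, ranks)
    if failed:
        print("ABORT!!! Out of all {} trainers, the trainer process with rank={} "
              "was aborted. Please check its log.".format(len(procs), failed))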