diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index 8f9d080b6bb936f50bc1dacba425a738007c3ac9..a35ab93eb85860544cbc68bd8f2f9390ef7771fc 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -36,16 +36,25 @@ launch a process on each of the given gpu card.
 
 """
 from __future__ import print_function
+import logging
 import sys
 from sys import version
 import subprocess
 import os
-import warnings
+import time
 import six
 import copy
 from argparse import ArgumentParser, REMAINDER
 import paddle.fluid as fluid
 
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+log_handler = logging.StreamHandler()
+log_format = logging.Formatter(
+    '%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s: %(message)s')
+log_handler.setFormatter(log_format)
+logger.addHandler(log_handler)
+
 
 def _print_arguments(args):
     print("----------- Configuration Arguments -----------")
@@ -129,6 +138,12 @@ POD_IP (current node ip address, not needed for local training)
     return parser.parse_args()
 
 
+def terminate_procs(procs):
+    for p in procs:
+        if p.poll() is None:
+            p.terminate()
+
+
 def start_procs(args):
     """
     """
@@ -154,14 +169,14 @@ def start_procs(args):
         node_id = int(node_id)
 
         if args.node_ip != "127.0.0.1" and current_node_ip != args.node_ip:
-            warnings.warn(
+            logger.warning(
                 "Please NOTE: When using paddlecloud, current_node_ip is \
 automatically got from POD_IP. Your input node_ip: {} doesn't equals to \
 current_node_ip: {} from paddlecloud environment."
                 .format(args.node_ip, current_node_ip))
         if args.cluster_node_ips != "127.0.0.1" and args.cluster_node_ips != ",".join(
                 node_ips):
-            warnings.warn(
+            logger.warning(
                 "Please NOTE: When using paddlecloud, cluster_node_ips is \
 automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
 Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
@@ -228,16 +243,39 @@ paddlecloud environment.".format(args.cluster_node_ips, node_ips))
 
         procs.append(proc)
 
-    for i in range(0, len(procs)):
-        proc = procs[i]
-
-        proc.wait()
-        if len(log_fns) > 0:
-            log_fns[i].close()
-
-        if proc.returncode != 0:
-            raise subprocess.CalledProcessError(
-                returncode=procs[i].returncode, cmd=cmds[i])
+    try:
+        alive = True
+        error = False
+        # wait all process finish or one error
+        while alive and not error:
+            alive = False
+            for p in procs:
+                ret = p.poll()
+                if ret is None:
+                    alive = True
+                elif ret != 0:
+                    error = True
+            time.sleep(1)
+
+        if error:
+            terminate_procs(procs)
+            exit(1)
+
+    except KeyboardInterrupt:
+        logger.warning("KeyboardInterrupt, exit")
+        terminate_procs(procs)
+        raise
+    except SystemExit:
+        logger.error("One trainer process abort, exit")
+        terminate_procs(procs)
+        raise
+    except:
+        logger.error("Trainer process abort, exit")
+        terminate_procs(procs)
+        raise
+    finally:
+        for fn in log_fns:
+            fn.close()
 
 
 def launch():
diff --git a/python/paddle/fluid/tests/unittests/multi_process.py b/python/paddle/fluid/tests/unittests/multi_process.py
index f5870edf96cf43d17460b24ce1390f06b2017b24..a67634adfcc0c27d5c9b470c81b880af9130462f 100644
--- a/python/paddle/fluid/tests/unittests/multi_process.py
+++ b/python/paddle/fluid/tests/unittests/multi_process.py
@@ -13,6 +13,8 @@
 # limitations under the License.
 
 import os
+import sys
+import time
 
 
 def train():
@@ -31,5 +33,39 @@ def train():
         f.write(name)
 
 
+def train_abort():
+    selected_gpus = os.getenv("FLAGS_selected_gpus")
+    trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
+    worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
+    current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
+    worker_endpoints = worker_endpoints_env
+    trainers_num = len(worker_endpoints.split(','))
+
+    if trainer_id == 0:
+        try:
+            # train abort
+            exit(1)
+        except SystemExit:
+            name = "abort>>> selected_gpus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
+                .format(selected_gpus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
+            print(name)
+            with open("multi_process.check_{}.log".format(trainer_id),
+                      "w") as f:
+                f.write(name)
+            raise
+    else:
+        # sleep 30s to make sure paddle.distributed.launch will terminate this process
+        time.sleep(30)
+        name = "selected_gpus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
+            .format(selected_gpus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
+
+        print(name)
+        with open("multi_process.check_{}.log".format(trainer_id), "w") as f:
+            f.write(name)
+
+
 if __name__ == '__main__':
-    train()
+    if len(sys.argv) == 2 and sys.argv[1] == "abort":
+        train_abort()
+    else:
+        train()
diff --git a/python/paddle/fluid/tests/unittests/test_launch.sh b/python/paddle/fluid/tests/unittests/test_launch.sh
index 87dc9bad96f4c192cdffaaccd48101e15c787015..1419ba7335b247031edfc1c67eaf490db646a57b 100644
--- a/python/paddle/fluid/tests/unittests/test_launch.sh
+++ b/python/paddle/fluid/tests/unittests/test_launch.sh
@@ -1,5 +1,5 @@
 #!/bin/bash
-set -ex
+set -e
 
 # use default values
 python -m paddle.distributed.launch multi_process.py
@@ -33,3 +33,33 @@ else
     echo "not find trainer 1"
     exit -1
 fi
+
+# test async poll process
+if [ -f $file_0 ]; then
+    rm $file_0
+fi
+if [ -f $file_1 ]; then
+    rm $file_1
+fi
+
+echo ""
+echo "paddle.distributed.launch async poll process test"
+if ! python -m paddle.distributed.launch ${distributed_args} multi_process.py abort; then
+    echo "train abort as planned"
+fi
+
+abort_str1="abort>>> selected_gpus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0"
+
+if grep -q "$abort_str1" "$file_0"; then
+    echo "trainer 0 abort as planned"
+else
+    echo "trainer 0 not abort as planned"
+    exit -1
+fi
+
+if [ ! -f $file_1 ]; then
+    echo "trainer 1 terminate as planned"
+else
+    echo "trainer 1 not terminate as planned"
+    exit -1
+fi
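Note (not part of the patch): a minimal standalone sketch of the poll-and-terminate pattern that the new start_procs() loop implements, written against plain subprocess. The sleep/false child commands are placeholder stand-ins for the trainer processes, not Paddle code.

import subprocess
import time


def terminate_procs(procs):
    # terminate any child that is still running
    for p in procs:
        if p.poll() is None:
            p.terminate()


# placeholder children: one long-running, one that exits with a failure code
procs = [subprocess.Popen(["sleep", "5"]), subprocess.Popen(["false"])]

alive, error = True, False
while alive and not error:
    alive = False
    for p in procs:
        ret = p.poll()      # non-blocking, unlike wait()
        if ret is None:
            alive = True    # at least one child is still running
        elif ret != 0:
            error = True    # a child failed
    time.sleep(1)

if error:
    terminate_procs(procs)  # kill the survivors instead of waiting on them

Compared with the old sequential proc.wait() loop, polling lets the launcher notice a failed trainer immediately and tear down the remaining trainers rather than blocking on the first child in the list.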