From 3dd2e3801a9585902e992e921930cf09d467ca85 Mon Sep 17 00:00:00 2001 From: danleifeng <52735331+danleifeng@users.noreply.github.com> Date: Wed, 5 Aug 2020 10:49:43 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90paddle.fleet=E3=80=91add=20fleetrun=20?= =?UTF-8?q?command=20for=20distributed=20running=20(#25806)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add fleetrun command for distributed running; test=develop --- paddle/scripts/paddle_build.sh | 1 + python/paddle/fleet/cloud_utils.py | 85 ++++ python/paddle/fleet/launch.py | 319 +++++++++++++ python/paddle/fleet/launch_utils.py | 421 ++++++++++++++++++ .../fluid/tests/unittests/CMakeLists.txt | 2 + .../tests/unittests/test_fleet_launch.sh | 101 +++++ python/setup.py.in | 5 + 7 files changed, 934 insertions(+) create mode 100644 python/paddle/fleet/cloud_utils.py create mode 100644 python/paddle/fleet/launch.py create mode 100644 python/paddle/fleet/launch_utils.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch.sh diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index 0b6b006bbb2..8684851283f 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -958,6 +958,7 @@ function parallel_test() { ut_total_startTime_s=`date +%s` mkdir -p ${PADDLE_ROOT}/build cd ${PADDLE_ROOT}/build + pip install ${PADDLE_ROOT}/build/python/dist/*whl if [ "$WITH_GPU" == "ON" ];then parallel_test_base_gpu else diff --git a/python/paddle/fleet/cloud_utils.py b/python/paddle/fleet/cloud_utils.py new file mode 100644 index 00000000000..72c306fe3b9 --- /dev/null +++ b/python/paddle/fleet/cloud_utils.py @@ -0,0 +1,85 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import paddle +from paddle.fleet.launch_utils import get_cluster, logger + + +def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170): + """ + args_node_ips, args_node_ip:string + """ + #you can automatically get ip info while using paddlecloud multi nodes mode. 
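+    # The variables read below (PADDLE_TRAINERS, POD_IP, PADDLE_TRAINER_ID,
+    # and optionally PADDLE_PORT / TRAINER_PORTS_NUM) are expected to be
+    # exported by the paddlecloud environment.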
+ node_ips = os.getenv("PADDLE_TRAINERS") + assert node_ips is not None, "PADDLE_TRAINERS should not be None" + + node_ip = os.getenv("POD_IP") + assert node_ip is not None, "POD_IP should not be None" + + node_rank = os.getenv("PADDLE_TRAINER_ID") + assert node_rank is not None, "PADDLE_TRAINER_ID should not be None" + + node_ips = node_ips.split(",") + num_nodes = len(node_ips) + node_rank = int(node_rank) + + if args_node_ips != "127.0.0.1" and args_node_ips != ",".join(node_ips): + logger.warning( + "Please NOTE: When using paddlecloud, cluster_node_ips is \ +automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\ +Your input cluster_node_ips: {} doesn't equals to IPs: {} from \ +paddlecloud environment.".format(args_node_ips, node_ips)) + + started_port = args_port + print("num_nodes:", num_nodes) + if num_nodes > 1: + try: + paddle_port = int(os.getenv("PADDLE_PORT", "")) + paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", "")) + + if paddle_port_num >= len( + selected_gpus) and paddle_port != args_port: + logger.warning("Use Cloud specified port:{}.".format( + paddle_port)) + started_port = paddle_port + + except Exception as e: + print(e) + pass + + if started_port is None: + started_port = 6170 + + logger.debug("parsed from args:node_ips:{} \ + node_ip:{} node_rank:{} started_port:{}" + .format(node_ips, node_ip, node_rank, started_port)) + + ports = [x for x in range(started_port, started_port + len(selected_gpus))] + cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus) + return cluster, cluster.pods[node_rank] + + +def use_paddlecloud(): + node_ips = os.getenv("PADDLE_TRAINERS") + node_ip = os.getenv("POD_IP") + node_rank = os.getenv("PADDLE_TRAINER_ID") + if node_ips is None or node_ip is None or node_rank is None: + return False + else: + return True + + +def get_trainers_num(): + return int(os.getenv("PADDLE_TRAINERS_NUM", "1")) diff --git a/python/paddle/fleet/launch.py b/python/paddle/fleet/launch.py new file mode 100644 index 00000000000..de5e0b66b3e --- /dev/null +++ b/python/paddle/fleet/launch.py @@ -0,0 +1,319 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +paddle.distributed.launch is a module that spawns multiple distributed +process on each training node for gpu training and cpu training. +Usage: + In both of single node training or multiple node training, this module +launch a process on each of the given gpu card or cpu machine. + GPU training: + 1. for single node training with all visible gpu cards: + fleetrun your_training_py (arg1 arg2 and all others) + 2. for single node training with [0,4) cards + fleetrun --gpus="0,1,2,3" your_training_py (arg1 arg2 and all others) + 3. 
for multiple node training such as two node:192.168.0.16, 192.168.0.17 + on 192.168.0.16: + fleetrun --ips="192.168.0.16,192.168.0.17" --node_ip=192.168.0.16 \ + your_training_py (arg1 arg2 and all others) + on 192.168.0.17: + fleetrun --ips="192.168.0.16,192.168.0.17" \ + --node_ip=192.168.0.17 \ + your_training_py (arg1 arg2 and all others) + CPU training: + 1. for single node training with multi servers and workers: + fleetrun --server_num=1 --worker_num=4 your_training_py (arg1 arg2 and all others) + 2. for multiple node training such as two node:192.168.0.16, 192.168.0.17 \ + with 2 servers and 4 workers. + on 192.168.0.16: + fleetrun --servers="192.168.0.16:6170,192.168.0.17:6171" \ + --workers="192.168.0.16:6172,192.168.0.17:6173,192.168.0.16:6174,192.168.0.17:6175" \ + your_training_py (arg1 arg2 and all others) + on 192.168.0.17: + fleetrun --servers="192.168.0.16:6170,192.168.0.17:6171" \ + --workers="192.168.0.16:6172,192.168.0.17:6173,192.168.0.16:6174,192.168.0.17:6175" \ + your_training_py (arg1 arg2 and all others) +""" + +from __future__ import print_function +import sys +from sys import version +import subprocess +import os +import time +import six +import copy +from argparse import ArgumentParser, REMAINDER +import paddle +import paddle.fluid as fluid + +from paddle.fleet.launch_utils import * +import paddle.fleet.cloud_utils as cloud_utils + + +def _print_arguments(args): + print("----------- Configuration Arguments -----------") + for arg, value in sorted(six.iteritems(vars(args))): + print("%s: %s" % (arg, value)) + print("------------------------------------------------") + + +def _parse_args(): + """ + Helper function parsing the command line options + @retval ArgumentParser + """ + parser = ArgumentParser( + description='''start paddle training using multi-process mode. +see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2- +''') + + #Optional arguments for the launch helper + parser.add_argument( + "--ips", + type=str, + default="127.0.0.1", + help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") + parser.add_argument( + "--gpus", + type=str, + default=None, + help="It's for gpu training and the training process will run on the gpus," + "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." + ) + + parser.add_argument( + "--servers", type=str, default="", help="User defined servers ip:port") + parser.add_argument( + "--workers", type=str, default="", help="User defined workers ip:port") + parser.add_argument( + "--worker_num", type=int, default=2, help="number of workers") + + parser.add_argument( + "--server_num", type=int, default=2, help="number of servers") + + parser.add_argument( + "--log_dir", + type=str, + help="The path for each process's log.If it's not set, the log will printed to default pipe." 
+ ) + #positional + parser.add_argument( + "training_script", + type=str, + help="The full path to the single GPU training " + "program/script to be launched in parallel, " + "followed by all the arguments for the " + "training script") + + #rest from the training program + parser.add_argument('training_script_args', nargs=REMAINDER) + return parser.parse_args() + + +def get_cluster_from_args(args, gpus): + node_ips = [x.strip() for x in args.ips.split(',')] + if len(node_ips) == 1: + node_ip = node_ips[0] + else: + _, node_ip = get_host_name_ip() + + # node_ip = args.node_ip + assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips:{%s}" \ + % (node_ip, node_ips) + node_rank = node_ips.index(node_ip) + + logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format( + node_ips, node_ip, node_rank)) + + free_ports = None + if not cloud_utils.use_paddlecloud() and len( + node_ips) <= 1 and os.environ.get('FLAGS_START_PORT') is None: + free_ports = find_free_ports(len(gpus)) + if free_ports is not None: + free_ports = list(free_ports) + else: + start_port = 6070 + if os.environ.get('FLAGS_START_PORT') is not None: + start_port = os.environ.get('FLAGS_START_PORT') + + free_ports = [x for x in range(start_port, start_port + len(gpus))] + + return get_cluster(node_ips, node_ip, free_ports, gpus) + + +def get_gpus(gpus): + if gpus is None: + gpus_num = fluid.core.get_cuda_device_count() + gpus = [str(x) for x in range(0, gpus_num)] + else: + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_visible_devices is None or cuda_visible_devices == "": + gpus = [x.strip() for x in gpus.split(',')] + else: + # change gpus into relative values + # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7; + # therefore gpus=0,1,2,3 + cuda_visible_devices_list = cuda_visible_devices.split(',') + for x in gpus.split(','): + assert x in cuda_visible_devices_list, "Can't find "\ + "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ + % (x, cuda_visible_devices) + gpus = [ + cuda_visible_devices_list.index(x.strip()) + for x in gpus.split(',') + ] + + return gpus + + +def launch_collective(args): + # parse arguments, used for cloud-single-machine and local + gpus = get_gpus(args.gpus) + trainers_num = cloud_utils.get_trainers_num() + logger.debug("parsed from args trainerss_num:{} gpus:{}".format( + trainers_num, gpus)) + + cluster = None + pod = None + + if cloud_utils.use_paddlecloud() and trainers_num != 1: + cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus) + logger.info("get cluster from cloud:{}".format(cluster)) + else: + # trainers_num = 1 or not use paddlecloud ips="a,b" + cluster, pod = get_cluster_from_args(args, gpus) + logger.info("get cluster from args:{}".format(cluster)) + + procs = start_local_trainers( + cluster, + pod, + training_script=args.training_script, + training_script_args=args.training_script_args, + log_dir=args.log_dir) + + while True: + alive = watch_local_trainers(procs, cluster.trainers_nranks()) + + if not alive: + logger.info("Local procs complete, POD info:{}".format(pod)) + break + + time.sleep(3) + + +def launch_ps(args): + worker_num = args.worker_num + server_num = args.server_num + start_port = 6170 + if os.environ.get('FLAGS_START_PORT') is not None: + start_port = os.environ.get('FLAGS_START_PORT') + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + procs = [] + cmds = [] + log_fns = [] + ports = range(start_port, start_port + 
server_num, 1) + default_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) + user_endpoints = "" + if args.servers == "": + user_endpoints = default_endpoints + else: + user_endpoints = args.servers + user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")] + user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")] + for i in range(server_num): + current_env.update({ + "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, + "PADDLE_PORT": user_endpoints_port[i], + "TRAINING_ROLE": "PSERVER", + "PADDLE_TRAINERS_NUM": str(worker_num), + "POD_IP": user_endpoints_ips[i] + }) + + cmd = [sys.executable, "-u", args.training_script + ] + args.training_script_args + cmds.append(cmd) + if args.log_dir is not None: + os.system("mkdir -p {}".format(args.log_dir)) + fn = open("%s/serverlog.%d" % (args.log_dir, i), "w") + log_fns.append(fn) + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + procs.append(proc) + + for i in range(worker_num): + current_env.update({ + "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, + "PADDLE_TRAINERS_NUM": str(worker_num), + "TRAINING_ROLE": "TRAINER", + "PADDLE_TRAINER_ID": str(i) + }) + cmd = [sys.executable, "-u", args.training_script + ] + args.training_script_args + cmds.append(cmd) + if args.log_dir is not None: + os.system("mkdir -p {}".format(args.log_dir)) + fn = open("%s/workerlog.%d" % (args.log_dir, i), "w") + log_fns.append(fn) + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + procs.append(proc) + + # only wait worker to finish here + for i, proc in enumerate(procs): + if i < server_num: + continue + procs[i].wait() + if len(log_fns) > 0: + log_fns[i].close() + + print("all workers exit, going to finish parameter server", file=sys.stderr) + for i in range(server_num): + if len(log_fns) > 0: + log_fns[i].close() + procs[i].terminate() + print("all parameter server are killed", file=sys.stderr) + + +def launch(): + args = _parse_args() + logger = get_logger() + _print_arguments(args) + ps_args = ['--worker_num', '--server_num', '--servers', '--workers'] + collective_args = ['--ips', '--gpus'] + has_ps_args = [ + ps_arg for ps_arg in ps_args if ps_arg in " ".join(sys.argv[1:-1]) + ] + has_collective_args = [ + co_arg for co_arg in collective_args + if co_arg in " ".join(sys.argv[1:-1]) + ] + if len(has_ps_args) > 0 or fluid.core.get_cuda_device_count() == 0: + logger.info("Run cpu parameter-sever mode.") + launch_ps(args) + elif len(has_collective_args) > 0: + logger.info("Run gpu collective mode.") + launch_collective(args) + else: + logger.warning( + "Not found distinct args. Default use gpu collective mode") + launch_collective(args) + + +if __name__ == "__main__": + launch() diff --git a/python/paddle/fleet/launch_utils.py b/python/paddle/fleet/launch_utils.py new file mode 100644 index 00000000000..040e7254f8c --- /dev/null +++ b/python/paddle/fleet/launch_utils.py @@ -0,0 +1,421 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import logging +import socket +import time +import os +import signal +import copy +import sys +import subprocess +from contextlib import closing +import socket + +logger = logging.getLogger("root") +logger.propagate = False + + +class Cluster(object): + def __init__(self, hdfs): + self.job_server = None + self.pods = [] + self.hdfs = None + self.job_stage_flag = None + + def __str__(self): + return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format( + self.job_server, [str(pod) for pod in self.pods], + self.job_stage_flag, self.hdfs) + + def __eq__(self, cluster): + if len(self.pods) != len(cluster.pods): + return False + + for a, b in zip(self.pods, cluster.pods): + if a != b: + return False + + if self.job_stage_flag != cluster.job_stage_flag: + return False + + return True + + def __ne__(self, cluster): + return not self.__eq__(cluster) + + def update_pods(cluster): + self.pods = copy.copy(cluster.pods) + + def trainers_nranks(self): + return len(self.trainers_endpoints()) + + def pods_nranks(self): + return len(self.pods) + + def trainers_endpoints(self): + r = [] + for pod in self.pods: + for t in pod.trainers: + r.append(t.endpoint) + return r + + def pods_endpoints(self): + r = [] + for pod in self.pods: + ep = "{}:{}".format(pod.addr, pod.port) + assert pod.port != None and pod.addr != None, "{} not a valid endpoint".format( + ep) + r.append(ep) + + return r + + def get_pod_by_id(self, pod_id): + for pod in self.pods: + if str(pod_id) == str(pod.id): + return pod + + return None + + +class JobServer(object): + def __init__(self): + self.endpoint = None + + def __str__(self): + return "{}".format(self.endpoint) + + def __eq__(self, j): + return self.endpint == j.endpoint + + def __ne__(self, j): + return not self == j + + +class Trainer(object): + def __init__(self): + self.gpus = [] + self.endpoint = None + self.rank = None + + def __str__(self): + return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint, + self.rank) + + def __eq__(self, t): + if len(self.gpus) != len(t.gpus): + return False + + if self.endpoint != t.endpoint or \ + self.rank != t.rank: + return False + + for a, b in zip(self.gpus, t.gpus): + if a != b: + return False + + return True + + def __ne__(self, t): + return not self == t + + def rank(self): + return self.rank + + +class Pod(object): + def __init__(self): + self.rank = None + self.id = None + self.addr = None + self.port = None + self.trainers = [] + self.gpus = [] + + def __str__(self): + return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format( + self.rank, self.id, self.addr, self.port, self.gpus, + [str(t) for t in self.trainers]) + + def __eq__(self, pod): + if self.rank != pod.rank or \ + self.id != pod.id or \ + self.addr != pod.addr or \ + self.port != pod.port: + logger.debug("pod {} != pod".format(self, pod)) + return False + + if len(self.trainers) != len(pod.trainers): + logger.debug("trainers {} != {}".format(self.trainers, + pod.trainers)) + return False + + for i in range(len(self.trainers)): + if self.trainers[i] != pod.trainers[i]: + logger.debug("trainer {} != 
{}".format(self.trainers[i], + pod.trainers[i])) + return False + + return True + + def __ne__(self, pod): + return not self == pod + + def parse_response(self, res_pods): + pass + + def rank(self): + return self.rank + + def get_visible_gpus(self): + r = "" + for g in self.gpus: + r += "{},".format(g) + + assert r != "", "this pod {} can't see any gpus".format(self) + + r = r[:-1] + return r + + +def get_logger(log_level=20, name="root"): + logger = logging.getLogger(name) + logger.setLevel(log_level) + + log_handler = logging.StreamHandler() + log_format = logging.Formatter( + '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s') + log_handler.setFormatter(log_format) + logger.addHandler(log_handler) + + return logger + + +def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus): + assert type(paddle_ports) is list, "paddle_ports must be list" + cluster = Cluster(hdfs=None) + trainer_rank = 0 + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + for i in range(len(selected_gpus)): + trainer = Trainer() + trainer.gpus.append(selected_gpus[i]) + trainer.endpoint = "%s:%d" % (ip, paddle_ports[i]) + trainer.rank = trainer_rank + trainer_rank += 1 + + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def terminate_local_procs(procs): + for p in procs: + if p.proc.poll() is None: + p.proc.terminate() + p.log_fn.close() + logger.debug("terminate process id:{}".format(p.proc.pid)) + + #wait all process terminiated + time.sleep(3) + for step in range(0, 50): + alive = False + for p in procs: + if p.proc.poll() is None: # not termniate + os.kill(p.proc.pid, signal.SIGKILL) + alive = True + + if not alive: + logger.info("terminate all the procs") + return + + time.sleep(3) + + logger.fatal("can't kill all process and exit") + exit(1) + + +def get_host_name_ip(): + try: + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + return host_name, host_ip + except: + return None + + +def add_arguments(argname, type, default, help, argparser, **kwargs): + """Add argparse's argument. + Usage: + .. code-block:: python + parser = argparse.ArgumentParser() + add_argument("name", str, "Jonh", "User name.", parser) + args = parser.parse_args() + """ + type = distutils.util.strtobool if type == bool else type + argparser.add_argument( + "--" + argname, + default=default, + type=type, + help=help + ' Default: %(default)s.', + **kwargs) + + +def find_free_ports(num): + def __free_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + port_set = set() + step = 0 + while True: + port = __free_port() + if port not in port_set: + port_set.add(port) + + if len(port_set) >= num: + return port_set + + step += 1 + if step > 100: + print( + "can't find avilable port and use the specified static port now!" + ) + return None + + return None + + +class TrainerProc(object): + def __init__(self): + self.proc = None + self.log_fn = None + self.log_offset = None + self.rank = None + self.local_rank = None + self.cmd = None + + +def start_local_trainers(cluster, + pod, + training_script, + training_script_args, + log_dir=None): + current_env = copy.copy(os.environ.copy()) + #paddle broadcast ncclUniqueId use socket, and + #proxy maybe make trainers unreachable, so delete them. + #if we set them to "", grpc will log error message "bad uri" + #so just delete them. 
+ current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + procs = [] + for idx, t in enumerate(pod.trainers): + proc_env = { + "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]), + "PADDLE_TRAINER_ID": "%d" % t.rank, + "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, + "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) + } + + current_env.update(proc_env) + + logger.debug("trainer proc env:{}".format(current_env)) + + cmd = [sys.executable, "-u", training_script] + training_script_args + + logger.info("start trainer proc:{} env:{}".format(cmd, proc_env)) + + fn = None + if log_dir is not None: + os.system("mkdir -p {}".format(log_dir)) + fn = open("%s/workerlog.%d" % (log_dir, idx), "a") + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = t.rank + tp.local_rank = idx + tp.log_fn = fn + tp.log_offset = fn.tell() if fn else None + tp.cmd = cmd + + procs.append(tp) + + return procs + + +def pull_worker_log(tp): + if tp.log_fn: + with open(tp.log_fn.name, 'r') as fin: + fin.seek(tp.log_offset, 0) + for line in fin: + try: + sys.stdout.write(line) + except UnicodeEncodeError: + sys.stdout.write( + 'UnicodeEncodeError occurs at this line. ' + 'Please refer to the original log file "%s"\n' % + tp.log_fn.name) + tp.log_offset = fin.tell() + + +def watch_local_trainers(procs, nranks): + try: + error = False + error_rank = [] + # wait all process finish or one error + alive = False + for p in procs: + if p.log_fn and p.local_rank == 0: + pull_worker_log(p) + + ret = p.proc.poll() + if ret is None: + alive = True + elif ret != 0: + error = True + error_rank.append(p.rank) + + if error: + terminate_local_procs(procs) + exit(1) + + except KeyboardInterrupt: + logger.warning("KeyboardInterrupt, exit") + terminate_local_procs(procs) + raise + except SystemExit: + logger.error( + "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". + format(nranks, error_rank)) + terminate_local_procs(procs) + raise + except: + logger.error( + "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". 
+ format(nranks, error_rank)) + terminate_local_procs(procs) + raise + + return alive diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 971a94f549f..7696839843b 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -27,6 +27,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) list(APPEND MIXED_DIST_TEST_OPS test_fleet_checkpoint) list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer) @@ -399,6 +400,7 @@ if(WITH_DISTRIBUTE) py_test_modules(test_fleet_checkpoint MODULES test_fleet_checkpoint) endif() bash_test_modules(test_launch_ps MODULES test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch MODULES test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) set(dist_ut_port 20001) foreach(TEST_OP ${DIST_TEST_OPS}) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh new file mode 100644 index 00000000000..577f9f6504f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh @@ -0,0 +1,101 @@ +#!/bin/bash +set -e + + +function test_launch_ps(){ + fleetrun --server_num=2 --worker_num=2 fleet_ps_training.py 2> ut.elog + + if grep -q "server are killed" ut.elog; then + echo "test pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi +} + +if [[ ${WITH_GPU} == "OFF" ]]; then + test_launch_ps + exit 0 +fi + +test_launch_ps +# use default values +fleetrun multi_process.py + +# use paddlecloud +echo "begin test use paddlecloud" +cluster_node_ips="127.0.0.1,127.0.0.2" +export PADDLE_TRAINERS_NUM=2 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 +export PADDLE_TRAINER_ID=0 + +export PADDLE_PORT=35019 +export TRAINER_PORTS_NUM=2 + +distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog" +CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py + +str1="selected_gpus:0 worker_endpoints:127.0.0.1:35019,127.0.0.1:35020,127.0.0.2:35019,127.0.0.2:35020 trainers_num:4 current_endpoint:127.0.0.1:35019 trainer_id:0" +str2="selected_gpus:1 worker_endpoints:127.0.0.1:35019,127.0.0.1:35020,127.0.0.2:35019,127.0.0.2:35020 trainers_num:4 current_endpoint:127.0.0.1:35020 trainer_id:1" +file_0="multi_process.check_0.log" +file_1="multi_process.check_1.log" + +echo "paddlecloud params test" +if grep -q "$str1" "$file_0"; then + echo "find trainer 0" +else + echo "not find trainer 0" + exit -1 +fi + +if grep -q "$str2" "$file_1"; then + echo "find trainer 1" +else + echo "not find trainer 1" + exit -1 +fi + +# test async poll process +if [ -f $file_0 ]; then + rm $file_0 +fi +if [ -f $file_1 ]; then + rm $file_1 +fi + + +unset PADDLE_PORT +unset TRAINER_PORTS_NUM + +echo "" +echo "paddle.distributed.launch async poll process test" +if ! 
CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py abort; then + echo "train abort as planned" +fi + +abort_str1="abort>>> selected_gpus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0" + +if grep -q "$abort_str1" "$file_0"; then + echo "trainer 0 abort as planned" +else + echo "trainer 0 not abort as planned" + exit -1 +fi + +if [ ! -f $file_1 ]; then + echo "trainer 1 terminate as planned" +else + echo "trainer 1 not terminate as planned" + exit -1 +fi + +#test for random ports +file_0_0="test_launch_filelock_0_0.log" +file_1_0="test_launch_filelock_1_0.log" +rm -rf $file_0_0 $file_0_1 + +distributed_args="--gpus=0,1 --log_dir=testlog" +export PADDLE_LAUNCH_LOG="test_launch_filelock_0" +CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} find_ports.py +str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071" diff --git a/python/setup.py.in b/python/setup.py.in index 67923c282a3..df200da2cfc 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -475,6 +475,11 @@ with redirect_stdout(): cmdclass={ 'install_headers': InstallHeaders, 'install': InstallCommand, + }, + entry_points={ + 'console_scripts': [ + 'fleetrun = paddle.fleet.launch:launch' + ] } ) -- GitLab
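
For reference, a minimal sketch of how the fleetrun entry point added by this
patch is meant to be invoked, based on the usage text in
python/paddle/fleet/launch.py above; the training script name (train.py) and
its arguments are placeholders.

    # single-node GPU collective training on cards 0-3
    fleetrun --gpus="0,1,2,3" train.py

    # single-node CPU parameter-server training with 2 servers and 4 workers
    fleetrun --server_num=2 --worker_num=4 train.py

When --gpus is omitted, all visible GPU cards are used; when neither
parameter-server nor collective flags are given, launch.py falls back to
collective mode.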