From 1358397e97dc130914625b1ab640c425831814a4 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Thu, 26 Nov 2020 10:35:26 +0800 Subject: [PATCH] Clean up the redundant files and unify the launch interface. (#28928) --- python/paddle/distributed/cloud_utils.py | 25 +- .../paddle/distributed/fleet/cloud_utils.py | 13 +- python/paddle/distributed/fleet/launch.py | 39 ++- .../paddle/distributed/fleet/launch_utils.py | 75 +++++- python/paddle/distributed/launch.py | 245 +----------------- python/paddle/distributed/launch_ps.py | 165 ------------ python/paddle/distributed/spawn.py | 11 +- python/paddle/distributed/utils.py | 67 +++++ python/paddle/fluid/dygraph/parallel.py | 10 +- .../fluid/tests/unittests/CMakeLists.txt | 23 +- .../fluid/tests/unittests/detected_gpu.py | 26 ++ .../fluid/tests/unittests/nproc_process.py | 38 +++ .../tests/unittests/test_fleet_launch.sh | 132 ---------- .../unittests/test_fleet_launch_async.sh | 54 ++++ .../unittests/test_fleet_launch_cloud.sh | 59 +++++ .../unittests/test_fleet_launch_nproc.sh | 116 +++++++++ .../tests/unittests/test_fleet_launch_ps.sh | 62 +++++ .../unittests/test_fleet_run_random_port.sh | 27 ++ .../fluid/tests/unittests/test_fleetrun.sh | 20 ++ .../fluid/tests/unittests/test_launch.sh | 85 ------ .../tests/unittests/test_launch_coverage.py | 120 +++++++++ .../fluid/tests/unittests/test_launch_ps.sh | 12 - 22 files changed, 745 insertions(+), 679 deletions(-) delete mode 100644 python/paddle/distributed/launch_ps.py create mode 100644 python/paddle/fluid/tests/unittests/detected_gpu.py create mode 100644 python/paddle/fluid/tests/unittests/nproc_process.py delete mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch_async.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch_cloud.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_launch_ps.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_run_random_port.sh create mode 100644 python/paddle/fluid/tests/unittests/test_fleetrun.sh delete mode 100644 python/paddle/fluid/tests/unittests/test_launch.sh create mode 100644 python/paddle/fluid/tests/unittests/test_launch_coverage.py delete mode 100644 python/paddle/fluid/tests/unittests/test_launch_ps.sh diff --git a/python/paddle/distributed/cloud_utils.py b/python/paddle/distributed/cloud_utils.py index 5b7268e4b64..ae603a0e60b 100644 --- a/python/paddle/distributed/cloud_utils.py +++ b/python/paddle/distributed/cloud_utils.py @@ -14,7 +14,7 @@ import os import paddle -from paddle.distributed.utils import get_cluster, logger +from paddle.distributed.utils import get_cluster, logger, get_gpus, get_cluster_from_args def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus): @@ -94,5 +94,26 @@ paddlecloud environment.".format(args_node_ips, node_ips)) return cluster, cluster.pods[node_rank] -def get_trainers_num(): +def _get_trainers_num(): return int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + + +def get_cluster_and_pod(args): + # parse arguments, used for cloud-single-machine and local + selected_gpus = get_gpus(args.selected_gpus) + trainers_num = _get_trainers_num() + logger.debug("parsed from args trainerss_num:{} selected_gpus:{}".format( + trainers_num, selected_gpus)) + + cluster = None + pod = None + + if args.use_paddlecloud and trainers_num != 1: + cluster, pod = get_cloud_cluster(args.cluster_node_ips, args.node_ip, + args.started_port, selected_gpus) + logger.info("get cluster from cloud:{}".format(cluster)) + else: + cluster, pod = get_cluster_from_args(args, selected_gpus) + logger.info("get cluster from args:{}".format(cluster)) + + return cluster, pod diff --git a/python/paddle/distributed/fleet/cloud_utils.py b/python/paddle/distributed/fleet/cloud_utils.py index a1203bed85c..e05196f6314 100644 --- a/python/paddle/distributed/fleet/cloud_utils.py +++ b/python/paddle/distributed/fleet/cloud_utils.py @@ -17,9 +17,12 @@ import paddle from paddle.distributed.fleet.launch_utils import get_cluster, logger -def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170): +def get_cloud_cluster(args_node_ips, + device_mode, + devices_per_proc, + args_port=6170): """ - args_node_ips:string, selected_gpus:list, args_port: int + args_node_ips:string, device_mode:DeviceMode(IntEnum), device_per_proc:list, args_port: int """ #you can automatically get ip info while using paddlecloud multi nodes mode. node_ips = os.getenv("PADDLE_TRAINERS") @@ -55,7 +58,7 @@ paddlecloud environment.".format(args_node_ips, node_ips)) paddle_port = int(os.getenv("PADDLE_PORT", "")) if paddle_ports_num >= len( - selected_gpus) and paddle_port != args_port: + devices_per_proc) and paddle_port != args_port: logger.warning("Use Cloud specified port:{}.".format( paddle_port)) started_port = paddle_port @@ -67,7 +70,7 @@ paddlecloud environment.".format(args_node_ips, node_ips)) if started_port is None: started_port = 6170 ports = [ - x for x in range(started_port, started_port + len(selected_gpus)) + x for x in range(started_port, started_port + len(devices_per_proc)) ] trainer_endpoints = [] for ip in node_ips: @@ -85,7 +88,7 @@ paddlecloud environment.".format(args_node_ips, node_ips)) .format(node_ips, node_ip, node_rank, trainer_endpoints)) cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints, - selected_gpus) + device_mode, devices_per_proc) return cluster, cluster.pods[node_rank] diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index c48ce1a0f33..fbace6ba1f3 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -68,7 +68,9 @@ import copy from argparse import ArgumentParser, REMAINDER import paddle import paddle.fluid as fluid +from paddle.distributed.fleet import launch_utils +# TODO(danleifeng): Don't import * from a module from paddle.distributed.fleet.launch_utils import * import paddle.distributed.fleet.cloud_utils as cloud_utils @@ -98,12 +100,21 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra help="The path for each process's log.If it's not set, the log will printed to default pipe." ) + base_group.add_argument( + "--nproc_per_node", + type=int, + default=None, + help="The number of processes to launch on a node." + "In gpu training, it should be less or equal to the gpus number of you system(or you set by --gpus). And so each process can" + " bound to one or average number of gpus.") + base_group.add_argument( "--gpus", type=str, default=None, - help="It's for gpu training and the training process will run on the gpus," - "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." + help="It's for gpu training." + "For example:" + "--gpus=\"0,1,2,3\" will launch four training processes each bound to one gpu." ) base_group.add_argument( @@ -146,14 +157,13 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra return parser.parse_args() -def get_cluster_from_args(args, gpus): +def get_cluster_from_args(args, device_mode, devices_per_proc): node_ips = [x.strip() for x in args.ips.split(',')] if len(node_ips) == 1: node_ip = node_ips[0] else: _, node_ip = get_host_name_ip() - # node_ip = args.node_ip assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \ % (node_ip, node_ips) node_rank = node_ips.index(node_ip) @@ -164,7 +174,7 @@ def get_cluster_from_args(args, gpus): free_ports = None if not cloud_utils.use_paddlecloud() and len( node_ips) <= 1 and os.environ.get('FLAGS_START_PORT') is None: - free_ports = find_free_ports(len(gpus)) + free_ports = find_free_ports(len(devices_per_proc)) if free_ports is not None: free_ports = list(free_ports) else: @@ -172,20 +182,23 @@ def get_cluster_from_args(args, gpus): if os.environ.get('FLAGS_START_PORT') is not None: start_port = int(os.environ.get('FLAGS_START_PORT')) - free_ports = [x for x in range(start_port, start_port + len(gpus))] + free_ports = [ + x for x in range(start_port, start_port + len(devices_per_proc)) + ] trainer_endpoints = [] for ip in node_ips: trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) - return get_cluster(node_ips, node_ip, trainer_endpoints, gpus) + return get_cluster(node_ips, node_ip, trainer_endpoints, device_mode, + devices_per_proc) def launch_collective(args): # parse arguments, used for cloud-single-machine and local - gpus = get_gpus(args.gpus) + (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) trainers_num = cloud_utils.get_trainers_num() - logger.debug("parsed from args trainerss_num:{} gpus:{}".format( - trainers_num, gpus)) + logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format( + trainers_num, device_mode, devices_per_proc)) cluster = None pod = None @@ -194,11 +207,13 @@ def launch_collective(args): if os.environ.get('FLAGS_START_PORT') is not None: start_port = os.environ.get('FLAGS_START_PORT') if cloud_utils.use_paddlecloud() and trainers_num != 1: - cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port) + cluster, pod = cloud_utils.get_cloud_cluster( + args.ips, device_mode, devices_per_proc, start_port) logger.debug("get cluster from cloud:{}".format(cluster)) else: # trainers_num = 1 or not use paddlecloud ips="a,b" - cluster, pod = get_cluster_from_args(args, gpus) + cluster, pod = get_cluster_from_args(args, device_mode, + devices_per_proc) logger.debug("get cluster from args:{}".format(cluster)) global_envs = copy.copy(os.environ.copy()) diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index 2ae5747af9e..526d586f1c3 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -26,6 +26,8 @@ import shutil from contextlib import closing import socket import warnings +import six +from enum import IntEnum import paddle import paddle.fluid as fluid @@ -33,7 +35,7 @@ logger = logging.getLogger("root") logger.propagate = False -class DistributeMode: +class DistributeMode(IntEnum): """ There are various mode for fleetrun, each of them is designed for different model. """ @@ -42,6 +44,16 @@ class DistributeMode: PS_HETER = 2 +class DeviceMode(IntEnum): + """ + Training devices type + """ + CPU = 0 + GPU = 1 + KUNLUN = 2 + UNKNOWN = 3 + + class Cluster(object): def __init__(self, hdfs): self.job_server = None @@ -243,7 +255,8 @@ def get_logger(log_level=20, name="root"): return logger -def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): +def get_cluster(node_ips, node_ip, trainer_endpoints, device_mode, + devices_per_proc): assert type(trainer_endpoints) is list, "trainer_endpoints must be list" cluster = Cluster(hdfs=None) trainer_rank = 0 @@ -252,13 +265,17 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): pod.rank = node_rank pod.addr = ip cur_node_endpoints = trainer_endpoints[node_rank] - # when use paddlecloud, endpoints may > selected_gpus(user_defined) + # when use paddlecloud, endpoints may > devices_per_proc(user_defined) assert len(cur_node_endpoints) >= len( - selected_gpus + devices_per_proc ), "current trainer_endpoints size should be greater equal than selected_gpus size." - for i in range(len(selected_gpus)): + for i in range(len(devices_per_proc)): trainer = Trainer() - trainer.gpus.append(selected_gpus[i]) + if device_mode == DeviceMode.GPU: + if isinstance(devices_per_proc[i], (list, tuple)): + trainer.gpus.extend(devices_per_proc[i]) + else: + trainer.gpus.append(devices_per_proc[i]) trainer.endpoint = "%s" % (cur_node_endpoints[i]) trainer.rank = trainer_rank trainer_rank += 1 @@ -432,13 +449,16 @@ def start_local_trainers(cluster, procs = [] for idx, t in enumerate(pod.trainers): proc_env = { - "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]), "PADDLE_TRAINER_ID": "%d" % t.rank, "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) } + if len(t.gpus) > 0: + proc_env["FLAGS_selected_gpus"] = "%s" % ",".join( + [str(g) for g in t.gpus]) + current_env.update(proc_env) cmd = [sys.executable, "-u", training_script] + training_script_args @@ -565,6 +585,47 @@ def get_gpus(gpus): return res_gpus +def get_device_mode(): + #TODO(gongwb):Add XPU supported + if not fluid.core.is_compiled_with_cuda( + ) or fluid.core.get_cuda_device_count() <= 0: + print("launch train in CPU mode") + return DeviceMode.CPU + + print("launch train in GPU mode") + return DeviceMode.GPU + + +def get_device_proc_info(args): + # device_mode + device_mode = get_device_mode() + + # devices + devices_per_proc = [] + if device_mode == DeviceMode.GPU: + gpus = get_gpus(args.gpus) + if args.nproc_per_node is not None: + assert (len(gpus) % int(args.nproc_per_node)) ==0, \ + "gpus' number:{} mod args.nproc_per_node:{} must == 0".format(len(gpus), arg.nproc_per_node) + + n = int(len(gpus) / int(args.nproc_per_node)) + devices_per_proc = [ + gpus[i:i + n] for i in six.moves.range(0, len(gpus), n) + ] + else: + devices_per_proc = gpus + elif device_mode == DeviceMode.CPU: + if args.nproc_per_node is None: + devices_per_proc = [0] + else: + devices_per_proc = [x for x in range(0, args.nproc_per_node)] + else: + assert False, "Can't support device_mode:{}, support only cpu and gpu now.".format( + device_mode) + + return (device_mode, devices_per_proc) + + def direct_start(args): # run ps-cpu mode on paddlecloud, using given envs cmd = [sys.executable, "-u", args.training_script] + \ diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py index 060e742ad6c..df3a3407bf5 100644 --- a/python/paddle/distributed/launch.py +++ b/python/paddle/distributed/launch.py @@ -1,249 +1,16 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -r""" -paddle.distributed.launch is a module that spawns multiple distributed -process on each training node for gpu training. -Usage: - In both of single node training or multiple node training, this module -launch a process on each of the given gpu card. - 1. for single node training with all visible gpu cards: - python -m paddle.distributed.launch \ - your_training_py (arg1 arg2 and all others) - - 2. for single node training with [0,4) cards - python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \ - your_training_py (arg1 arg2 and all others) - 3. for multiple node training such as two node:192.168.0.16, 192.168.0.17 - on 192.168.0.16: - python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \ - --node_ip=192.168.0.16 \ - your_training_py (arg1 arg2 and all others) - on 192.168.0.17: - python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \ - --node_ip=192.168.0.17 \ - your_training_py (arg1 arg2 and all others) -""" -from __future__ import print_function -import sys -from sys import version -import subprocess -import os -import time -import six -import copy -from argparse import ArgumentParser, REMAINDER - -from paddle.distributed.utils import * -from paddle.distributed import cloud_utils - - -def _print_arguments(args): - print("----------- Configuration Arguments -----------") - for arg, value in sorted(six.iteritems(vars(args))): - print("%s: %s" % (arg, value)) - print("------------------------------------------------") - - -def _parse_args(): - """ - Helper function parsing the command line options - @retval ArgumentParser - """ - parser = ArgumentParser( - description='''start paddle training using multi-process mode. -NOTE: your train program ***must*** run as distributed nccl2 mode, -see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2- -And your train program must read environment variables below in order to let different -process init properly: -FLAGS_selected_gpus -PADDLE_TRAINER_ID -PADDLE_CURRENT_ENDPOINT -PADDLE_TRAINERS_NUM -PADDLE_TRAINER_ENDPOINTS -POD_IP (current node ip address, not needed for local training) -''') - - #Optional arguments for the launch helper - parser.add_argument( - "--cluster_node_ips", - type=str, - default="127.0.0.1", - help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") - parser.add_argument( - "--node_ip", - type=str, - default="127.0.0.1", - help="The current node ip. ") - parser.add_argument( - "--use_paddlecloud", - action='store_true', - help="wheter to use paddlecloud platform to run your multi-process job. If false, no need to set this argument." - ) - parser.add_argument( - "--started_port", - type=int, - default=None, - help="The trainer's started port on a single node") - - parser.add_argument( - "--print_config", - type=bool, - default=True, - help="Print the config or not") - - parser.add_argument( - "--selected_gpus", - type=str, - default=None, - help="It's for gpu training and the training process will run on the selected_gpus," - "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." - ) - - parser.add_argument( - "--log_level", - type=int, - default=20, # logging.INFO, details are here:https://docs.python.org/3/library/logging.html#levels - help="Logging level, default is logging.INFO") - - parser.add_argument( - "--log_dir", - type=str, - help="The path for each process's log.If it's not set, the log will printed to default pipe." - ) - - #positional - parser.add_argument( - "training_script", - type=str, - help="The full path to the single GPU training " - "program/script to be launched in parallel, " - "followed by all the arguments for the " - "training script") - - #rest from the training program - parser.add_argument('training_script_args', nargs=REMAINDER) - return parser.parse_args() - - -def get_cluster_from_args(args, selected_gpus): - node_ips = [x.strip() for x in args.cluster_node_ips.split(',')] - node_ip = args.node_ip - node_rank = node_ips.index(node_ip) - - logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format( - node_ips, node_ip, node_rank)) - - free_ports = None - if not args.use_paddlecloud and len( - node_ips) <= 1 and args.started_port is None: - free_ports = find_free_ports(len(selected_gpus)) - if free_ports is not None: - free_ports = list(free_ports) - else: - started_port = 6070 - if args.started_port is not None: - started_port = args.started_port - - free_ports = [ - x for x in range(started_port, started_port + len(selected_gpus)) - ] - - trainer_endpoints = [] - for ip in node_ips: - trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) - return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus) - - -def get_gpus(selected_gpus): - if selected_gpus is None: - from paddle.fluid import core - gpus_num = core.get_cuda_device_count() - gpus = [str(x) for x in range(0, gpus_num)] - else: - cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") - if cuda_visible_devices is None or cuda_visible_devices == "": - gpus = [x.strip() for x in selected_gpus.split(',')] - else: - # change selected_gpus into relative values - # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7; - # therefore selected_gpus=0,1,2,3 - cuda_visible_devices_list = cuda_visible_devices.split(',') - for x in selected_gpus.split(','): - assert x in cuda_visible_devices_list, "Can't find "\ - "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ - % (x, cuda_visible_devices) - gpus = [ - cuda_visible_devices_list.index(x.strip()) - for x in selected_gpus.split(',') - ] - logger.info("Change selected_gpus into reletive values. --ips:{} " - "will change into relative_ips:{} according to your " - "CUDA_VISIBLE_DEVICES:{}".format( - selected_gpus, gpus, cuda_visible_devices_list)) - - return gpus - - -def get_cluster_and_pod(args): - # parse arguments, used for cloud-single-machine and local - selected_gpus = get_gpus(args.selected_gpus) - trainers_num = cloud_utils.get_trainers_num() - logger.debug("parsed from args trainerss_num:{} selected_gpus:{}".format( - trainers_num, selected_gpus)) - - cluster = None - pod = None - - if args.use_paddlecloud and trainers_num != 1: - cluster, pod = cloud_utils.get_cloud_cluster( - args.cluster_node_ips, args.node_ip, args.started_port, - selected_gpus) - logger.info("get cluster from cloud:{}".format(cluster)) - else: - cluster, pod = get_cluster_from_args(args, selected_gpus) - logger.info("get cluster from args:{}".format(cluster)) - - return cluster, pod - - -def launch(args): - cluster, pod = get_cluster_and_pod(args) - - procs = start_local_trainers( - cluster, - pod, - training_script=args.training_script, - training_script_args=args.training_script_args, - log_dir=args.log_dir) - - while True: - alive = watch_local_trainers(procs, cluster.trainers_nranks()) - - if not alive: - logger.info("Local procs complete, POD info:{}".format(pod)) - break - - time.sleep(3) - - -if __name__ == "__main__": - args = _parse_args() - - logger = get_logger(args.log_level) - - if args.print_config: - _print_arguments(args) - - launch(args) +from paddle.distributed.fleet import launch +launch.launch() diff --git a/python/paddle/distributed/launch_ps.py b/python/paddle/distributed/launch_ps.py deleted file mode 100644 index 49b6dccc98e..00000000000 --- a/python/paddle/distributed/launch_ps.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from __future__ import unicode_literals -import subprocess -import sys -import os -import copy -from argparse import ArgumentParser, REMAINDER - - -def parse_args(): - # Optional arguments for the launch helper - parser = ArgumentParser(description="Distributed training") - parser.add_argument( - "--cluster_node_ips", - type=str, - default="127.0.0.1", - help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") - - parser.add_argument( - "--node_ip", - type=str, - default="127.0.0.1", - help="The current node ip. ") - - parser.add_argument( - "--start_port", - type=int, - default=6170, - help="The trainer's start port on a single node") - - parser.add_argument( - "--print_config", - type=bool, - default=True, - help="Print the config or not") - - parser.add_argument( - "--endpoints", type=str, default="", help="User defined endpoints") - - parser.add_argument( - "--worker_num", type=int, default=2, help="number of workers") - - parser.add_argument( - "--server_num", type=int, default=2, help="number of servers") - - parser.add_argument( - "--log_dir", - default="logs", - type=str, - help="The path for each process's log.If it's not set, the log will printed to default pipe." - ) - - # positional - parser.add_argument( - "training_script", - type=str, - help="The full path to the single GPU training " - "program/script to be launched in parallel, " - "followed by all the arguments for the " - "training script") - - # rest from the training program - parser.add_argument('training_script_args', nargs=REMAINDER) - return parser.parse_args() - - -def start_procs(args): - worker_num = args.worker_num - server_num = args.server_num - start_port = args.start_port - default_env = os.environ.copy() - current_env = copy.copy(default_env) - current_env.pop("http_proxy", None) - current_env.pop("https_proxy", None) - procs = [] - cmds = [] - log_fns = [] - ports = range(start_port, start_port + server_num, 1) - default_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - user_endpoints = "" - if args.endpoints == "": - user_endpoints = default_endpoints - else: - user_endpoints = args.endpoints - user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")] - user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")] - for i in range(server_num): - current_env.update({ - "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, - "PADDLE_PORT": user_endpoints_port[i], - "TRAINING_ROLE": "PSERVER", - "PADDLE_TRAINERS_NUM": str(worker_num), - "POD_IP": user_endpoints_ips[i] - }) - - cmd = [sys.executable, "-u", args.training_script - ] + args.training_script_args - cmds.append(cmd) - if args.log_dir is not None: - os.system("mkdir -p {}".format(args.log_dir)) - fn = open("%s/serverlog.%d" % (args.log_dir, i), "w") - log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) - else: - proc = subprocess.Popen(cmd, env=current_env) - procs.append(proc) - - for i in range(worker_num): - current_env.update({ - "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, - "PADDLE_TRAINERS_NUM": str(worker_num), - "TRAINING_ROLE": "TRAINER", - "PADDLE_TRAINER_ID": str(i) - }) - cmd = [sys.executable, "-u", args.training_script - ] + args.training_script_args - cmds.append(cmd) - if args.log_dir is not None: - os.system("mkdir -p {}".format(args.log_dir)) - fn = open("%s/workerlog.%d" % (args.log_dir, i), "w") - log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) - else: - proc = subprocess.Popen(cmd, env=current_env) - procs.append(proc) - - # only wait worker to finish here - for i, proc in enumerate(procs): - if i < server_num: - continue - procs[i].wait() - if len(log_fns) > 0: - log_fns[i].close() - - print("all workers exit, going to finish parameter server", file=sys.stderr) - for i in range(server_num): - if len(log_fns) > 0: - log_fns[i].close() - procs[i].terminate() - print("all parameter server are killed", file=sys.stderr) - - -def launch(): - args = parse_args() - if args.print_config: - start_procs(args) - - -# server num, worker num -if __name__ == "__main__": - launch() diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index 2d1ff128d81..433662e8ebc 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -21,8 +21,8 @@ import six import sys import warnings -from paddle.distributed.launch import get_cluster_and_pod, _print_arguments -from paddle.distributed.utils import _prepare_trainer_env +from paddle.distributed.utils import _print_arguments, _prepare_trainer_env +from paddle.distributed.cloud_utils import get_cluster_and_pod from paddle.device import get_device # deprecated module import @@ -30,10 +30,6 @@ from paddle.fluid import core from paddle.fluid.framework import _cpu_num -# NOTE(chenweihang): The existence of this class leads to -# the maintenance of two arguments. When the launch.py arguments -# is updated, the arguments here also need to be updated, -# but I have not thought of a better way here class ParallelEnvArgs(object): def __init__(self): # Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17.. @@ -136,7 +132,6 @@ def _get_subprocess_env_list(nprocs, options): args.use_paddlecloud = options.get('use_paddlecloud', False) args.print_config = options.get('print_config', False) - # reuse code of launch.py cluster, pod = get_cluster_and_pod(args) # prepare subprocess env list @@ -151,7 +146,7 @@ def _get_subprocess_env_list(nprocs, options): def _remove_risky_env(): - # remove useless env vars, same as launch.py + # remove useless env vars # no copy, each process will hold env vars itself os.environ.pop("http_proxy", None) os.environ.pop("https_proxy", None) diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py index be144a55b86..54efce052ea 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils.py @@ -20,6 +20,7 @@ import os import signal import copy import sys +import six import subprocess from contextlib import closing import socket @@ -28,6 +29,72 @@ logger = logging.getLogger("root") logger.propagate = False +def get_cluster_from_args(args, selected_gpus): + node_ips = [x.strip() for x in args.cluster_node_ips.split(',')] + node_ip = args.node_ip + node_rank = node_ips.index(node_ip) + + logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format( + node_ips, node_ip, node_rank)) + + free_ports = None + if not args.use_paddlecloud and len( + node_ips) <= 1 and args.started_port is None: + free_ports = find_free_ports(len(selected_gpus)) + if free_ports is not None: + free_ports = list(free_ports) + else: + started_port = 6070 + if args.started_port is not None: + started_port = args.started_port + + free_ports = [ + x for x in range(started_port, started_port + len(selected_gpus)) + ] + + trainer_endpoints = [] + for ip in node_ips: + trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports]) + return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus) + + +def get_gpus(selected_gpus): + if selected_gpus is None: + from paddle.fluid import core + gpus_num = core.get_cuda_device_count() + gpus = [str(x) for x in range(0, gpus_num)] + else: + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_visible_devices is None or cuda_visible_devices == "": + gpus = [x.strip() for x in selected_gpus.split(',')] + else: + # change selected_gpus into relative values + # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7; + # therefore selected_gpus=0,1,2,3 + cuda_visible_devices_list = cuda_visible_devices.split(',') + for x in selected_gpus.split(','): + assert x in cuda_visible_devices_list, "Can't find "\ + "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ + % (x, cuda_visible_devices) + gpus = [ + cuda_visible_devices_list.index(x.strip()) + for x in selected_gpus.split(',') + ] + logger.info("Change selected_gpus into reletive values. --ips:{} " + "will change into relative_ips:{} according to your " + "CUDA_VISIBLE_DEVICES:{}".format( + selected_gpus, gpus, cuda_visible_devices_list)) + + return gpus + + +def _print_arguments(args): + print("----------- Configuration Arguments -----------") + for arg, value in sorted(six.iteritems(vars(args))): + print("%s: %s" % (arg, value)) + print("------------------------------------------------") + + class Hdfs(object): def __init__(self): self.hdfs_ugi = None diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index cbe78c4d208..83b6cf34134 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -69,7 +69,7 @@ class ParallelEnv(object): This class is used to obtain the environment variables required for the parallel execution of ``paddle.nn.Layer`` in dynamic mode. - The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch`` + The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch`` or ``paddle.distributed.spawn`` . Examples: @@ -104,7 +104,11 @@ class ParallelEnv(object): def __init__(self): self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0")) self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) - self._device_id = int(os.getenv("FLAGS_selected_gpus", "0")) + + # imperative only support one gpu + selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",") + self._device_id = int(selected_gpus[0]) + self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",") self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "") @@ -347,7 +351,7 @@ class DataParallel(layers.Layer): 2. start by ``paddle.distributed.launch`` module, for example: - ``python -m paddle.distributed.launch --selected_gpus=0,1 demo.py`` . + ``python -m paddle.distributed.launch --gpus=0,1 demo.py`` . And the content of `demo.py` is the code of examples. diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 094cfdd4a99..2bb3b45bc41 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -26,14 +26,18 @@ list(APPEND MIXED_DIST_TEST_OPS test_dgc_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_simple_dist_transpiler) list(APPEND MIXED_DIST_TEST_OPS test_recv_save_op) list(APPEND MIXED_DIST_TEST_OPS test_transpiler_ops) -list(APPEND MIXED_DIST_TEST_OPS test_launch) list(APPEND MIXED_DIST_TEST_OPS test_c_comm_init_op) -list(APPEND MIXED_DIST_TEST_OPS test_launch_ps) list(APPEND MIXED_DIST_TEST_OPS test_communicator_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) -list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ps) +list(APPEND MIXED_DIST_TEST_OPS test_launch_coverage) +list(APPEND MIXED_DIST_TEST_OPS test_fleetrun) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_run_random_port) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_async) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) list(APPEND MIXED_DIST_TEST_OPS test_fleet_checkpoint) list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer) @@ -494,14 +498,17 @@ if(WITH_DISTRIBUTE) endif() if(NOT APPLE) if(WITH_GPU) - # NOTE. test_launch only work in gpu collective mode - bash_test_modules(test_launch START_BASH test_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_c_comm_init_op START_BASH test_c_comm_init_op.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) py_test_modules(test_fleet_checkpoint MODULES test_fleet_checkpoint) + py_test_modules(test_launch_coverage MODULES test_launch_coverage) endif() - bash_test_modules(test_launch_ps START_BASH test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) - bash_test_modules(test_fleet_launch START_BASH test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch_ps START_BASH test_fleet_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleetrun START_BASH test_fleetrun.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_run_random_port START_BASH test_fleet_run_random_port.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch_async START_BASH test_fleet_launch_async.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch_cloud START_BASH test_fleet_launch_cloud.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_fleet_launch_nproc START_BASH test_fleet_launch_nproc.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) # port range (20000, 23000) is reserved for dist-ops set(dist_ut_port 20001) @@ -624,9 +631,7 @@ if (WITH_DISTRIBUTE AND NOT APPLE) if(WITH_GPU) set_tests_properties(test_c_comm_init_op PROPERTIES TIMEOUT 120) set_tests_properties(test_fleet_checkpoint PROPERTIES TIMEOUT 120) - set_tests_properties(test_launch PROPERTIES TIMEOUT 120) endif() - set_tests_properties(test_fleet_launch PROPERTIES TIMEOUT 120) endif() # setting timeout value as 15S diff --git a/python/paddle/fluid/tests/unittests/detected_gpu.py b/python/paddle/fluid/tests/unittests/detected_gpu.py new file mode 100644 index 00000000000..8abd44aff71 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/detected_gpu.py @@ -0,0 +1,26 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import sys +import paddle.fluid as fluid + +print("compile with cuda:", fluid.core.is_compiled_with_cuda()) +print("get_cuda_device_count:", fluid.core.get_cuda_device_count()) + +if fluid.core.is_compiled_with_cuda() and fluid.core.get_cuda_device_count( +) > 0: + sys.exit(0) +else: + sys.exit(1) diff --git a/python/paddle/fluid/tests/unittests/nproc_process.py b/python/paddle/fluid/tests/unittests/nproc_process.py new file mode 100644 index 00000000000..c0e60eec458 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/nproc_process.py @@ -0,0 +1,38 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time + + +def train(prefix): + selected_gpus = os.getenv("FLAGS_selected_gpus") + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS") + current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + worker_endpoints = worker_endpoints_env + trainers_num = len(worker_endpoints.split(',')) + + name = "selected_gpus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\ + .format(selected_gpus, worker_endpoints, trainers_num, current_endpoint,trainer_id) + + print(name) + with open("{}.check_{}.log".format(prefix, trainer_id), "w") as f: + f.write(name) + + +if __name__ == '__main__': + prefix = sys.argv[1] + train(prefix) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch.sh deleted file mode 100644 index 4cd8dc3d945..00000000000 --- a/python/paddle/fluid/tests/unittests/test_fleet_launch.sh +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/bash -set -e - - -function test_launch_ps(){ - fleetrun --server_num=2 --worker_num=2 fleet_ps_training.py 2> ut.elog - if grep -q "server are killed" ut.elog; then - echo "test pserver launch succeed" - else - echo "test pserver launch failed" - exit -1 - fi - - fleetrun --servers="127.0.0.1:6780,127.0.0.1:6781" --workers="127.0.0.1:6782,127.0.0.1:6783" fleet_ps_training.py 2> ut.elog - if grep -q "server are killed" ut.elog; then - echo "test pserver launch succeed" - else - echo "test pserver launch failed" - exit -1 - fi - - fleetrun --servers="127.0.0.1:6780,127.0.0.1:6781" --workers="127.0.0.1,127.0.0.1" fleet_ps_training.py 2> ut.elog - if grep -q "server are killed" ut.elog; then - echo "test pserver launch succeed" - else - echo "test pserver launch failed" - exit -1 - fi -} - -function test_launch_ps_heter(){ - fleetrun --server_num=2 --worker_num=2 --heter_worker_num=2 fleet_ps_training.py 2> ut.elog - if grep -q "server are killed" ut.elog; then - echo "test heter pserver launch succeed" - else - echo "test pserver launch failed" - exit -1 - fi -} - -if [[ ${WITH_GPU} == "OFF" ]]; then - echo "in cpu test mode" - test_launch_ps - exit 0 -fi - -echo "No.1 unittest" -test_launch_ps -test_launch_ps_heter -# use default values -echo "No.2 unittest" -fleetrun multi_process.py fleetrun - -# use paddlecloud -echo "begin test use paddlecloud" -cluster_node_ips="127.0.0.1,127.0.0.2" -export PADDLE_TRAINERS_NUM=2 -export POD_IP=127.0.0.1 -export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 -export PADDLE_TRAINER_ID=0 - -export PADDLE_PORT=35789 -export TRAINER_PORTS_NUM=2 - -echo "No.3 unittest" -distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog" -CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun - -str1="selected_gpus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0" -str2="selected_gpus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1" -file_0="multi_process_fleetrun.check_0.log" -file_1="multi_process_fleetrun.check_1.log" - -echo "paddlecloud params test" -if grep -q "$str1" "$file_0"; then - echo "find trainer 0" -else - echo "not find trainer 0" - exit -1 -fi - -if grep -q "$str2" "$file_1"; then - echo "find trainer 1" -else - echo "not find trainer 1" - exit -1 -fi - -# test async poll process -if [ -f $file_0 ]; then - rm $file_0 -fi -if [ -f $file_1 ]; then - rm $file_1 -fi - -# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud -unset PADDLE_PORT -export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 - -echo "No.4 unittest" -echo "paddle.distributed.launch async poll process test" -if ! CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} multi_process.py fleetrun abort; then - echo "train abort as planned" -fi - -abort_str1="abort>>> selected_gpus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0" - -if grep -q "$abort_str1" "$file_0"; then - echo "trainer 0 abort as planned" -else - echo "trainer 0 not abort as planned" - exit -1 -fi - -if [ ! -f $file_1 ]; then - echo "trainer 1 terminate as planned" -else - echo "trainer 1 not terminate as planned" - exit -1 -fi - -#test for random ports -file_0_0="test_launch_filelock_0_0.log" -file_1_0="test_launch_filelock_1_0.log" -rm -rf $file_0_0 $file_0_1 - -distributed_args="--gpus=0,1 --log_dir=testlog" -export PADDLE_LAUNCH_LOG="test_launch_filelock_0" -echo "No.5 unittest" -CUDA_VISIBLE_DEVICES=0,1 fleetrun ${distributed_args} find_ports.py -str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071" diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_async.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_async.sh new file mode 100644 index 00000000000..2c0fc0b0629 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_async.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud +unset PADDLE_PORT +export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 +export cluster_node_ips="127.0.0.1,127.0.0.2" +export PADDLE_TRAINERS_NUM=2 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 +export PADDLE_TRAINER_ID=0 + +export TRAINER_PORTS_NUM=2 + +file_0="multi_process_fleetrun.check_0.log" +file_1="multi_process_fleetrun.check_1.log" + +distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog" + +echo "paddle.distributed.fleet.launch async poll process test" +if ! CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process.py fleetrun abort; then + echo "train abort as planned" +fi + +abort_str1="abort>>> selected_gpus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0" + +if grep -q "$abort_str1" "$file_0"; then + echo "trainer 0 abort as planned" +else + echo "trainer 0 not abort as planned" + exit -1 +fi + +if [ ! -f $file_1 ]; then + echo "trainer 1 terminate as planned" +else + echo "trainer 1 not terminate as planned" + exit -1 +fi diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_cloud.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_cloud.sh new file mode 100644 index 00000000000..68334208c39 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_cloud.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# use paddlecloud +echo "begin test use paddlecloud" +cluster_node_ips="127.0.0.1,127.0.0.2" +export PADDLE_TRAINERS_NUM=2 +export POD_IP=127.0.0.1 +export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 +export PADDLE_TRAINER_ID=0 + +export PADDLE_PORT=35789 +export TRAINER_PORTS_NUM=2 + +distributed_args="--ips=${cluster_node_ips} --gpus=0,1 --log_dir=testlog" +CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process.py fleetrun + +str1="selected_gpus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0" +str2="selected_gpus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1" +file_0="multi_process_fleetrun.check_0.log" +file_1="multi_process_fleetrun.check_1.log" + +echo "paddlecloud params test" +if grep -q "$str1" "$file_0"; then + echo "find trainer 0" +else + echo "not find trainer 0" + exit -1 +fi + +if grep -q "$str2" "$file_1"; then + echo "find trainer 1" +else + echo "not find trainer 1" + exit -1 +fi + +# test async poll process +if [ -f $file_0 ]; then + rm $file_0 +fi +if [ -f $file_1 ]; then + rm $file_1 +fi diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh new file mode 100644 index 00000000000..14679c49eae --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_nproc.sh @@ -0,0 +1,116 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e +export FLAGS_START_PORT=35789 + +#local_ip=`ip route get 1 | awk '{print $NF;exit}'` +file_0="fleet_nproc_0.check_0.log" + +function test_nproc_0(){ + gpus=$1 + rm -f ${file_0} + distributed_args="--log_dir=testlog --nproc_per_node=1" + # nproc_per_node=1, each with 2 gpus + python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_0 + + str0="selected_gpus:${gpus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0" + if grep -q "$str0" "$file_0"; then + echo "find trainer 0" + else + echo "not find trainer 0" + exit -1 + fi +} + +# unittest1:gpu +if python detected_gpu.py ; then + echo "begin ut 1:" + export CUDA_VISIBLE_DEVICES=0,1 + test_nproc_0 "0,1" +fi + +# unittest2:cpu +if ! python detected_gpu.py ; then + echo "begin ut 2:" + export CUDA_VISIBLE_DEVICES="" + test_nproc_0 "" +fi + + +function test_nproc_1_gpu(){ + file_0="fleet_nproc_1.check_0.log" + file_1="fleet_nproc_1.check_1.log" + rm -f ${file_0} ${file_1} + + distributed_args="--log_dir=testlog --nproc_per_node=2" + python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1 + + str0="selected_gpus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" + if grep -q "$str0" "$file_0"; then + echo "find trainer 0" + else + echo "not find trainer 0" + exit -1 + fi + + str1="selected_gpus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35790 trainer_id:1" + if grep -q "$str1" "$file_1"; then + echo "find trainer 1" + else + echo "not find trainer 1" + exit -1 + fi +} + +# unittest3: nproc_per_node=2, each with 1 gpus +if python detected_gpu.py ; then + echo "begin ut 3:" + export CUDA_VISIBLE_DEVICES=0,1 + test_nproc_1_gpu +fi + +function test_nproc_1_cpu(){ + file_0="fleet_nproc_1.check_0.log" + file_1="fleet_nproc_1.check_1.log" + rm -f ${file_0} ${file_1} + + distributed_args="--log_dir=testlog --nproc_per_node=2" + python -m paddle.distributed.launch ${distributed_args} nproc_process.py fleet_nproc_1 + + str0="selected_gpus: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0" + if grep -q "$str0" "$file_0"; then + echo "find trainer 0" + else + echo "not find trainer 0" + exit -1 + fi + + str1="selected_gpus: worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35790 trainer_id:1" + if grep -q "$str1" "$file_1"; then + echo "find trainer 1" + else + echo "not find trainer 1" + exit -1 + fi +} + +# unittest4: nproc_per_node=2, cpu +if ! python detected_gpu.py ; then + echo "begin ut 4:" + export CUDA_VISIBLE_DEVICES="" + test_nproc_1_cpu +fi diff --git a/python/paddle/fluid/tests/unittests/test_fleet_launch_ps.sh b/python/paddle/fluid/tests/unittests/test_fleet_launch_ps.sh new file mode 100644 index 00000000000..892a2420377 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_launch_ps.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +function test_launch_ps(){ + python -m paddle.distributed.fleet.launch --server_num=2 --worker_num=2 fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi + + python -m paddle.distributed.fleet.launch --servers="127.0.0.1:6780,127.0.0.1:6781" --workers="127.0.0.1:6782,127.0.0.1:6783" fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi + + python -m paddle.distributed.fleet.launch --servers="127.0.0.1:6780,127.0.0.1:6781" --workers="127.0.0.1,127.0.0.1" fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi +} + +function test_launch_ps_heter(){ + python -m paddle.distributed.fleet.launch --server_num=2 --worker_num=2 --heter_worker_num=2 fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test heter pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi +} + +if [[ ${WITH_GPU} == "OFF" ]]; then + echo "in cpu test mode" + test_launch_ps + exit 0 +fi + +test_launch_ps +test_launch_ps_heter diff --git a/python/paddle/fluid/tests/unittests/test_fleet_run_random_port.sh b/python/paddle/fluid/tests/unittests/test_fleet_run_random_port.sh new file mode 100644 index 00000000000..9ca48f2ab5b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_run_random_port.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +#test for random ports +file_0_0="test_launch_filelock_0_0.log" +file_1_0="test_launch_filelock_1_0.log" +rm -rf $file_0_0 $file_0_1 + +distributed_args="--gpus=0,1 --log_dir=testlog" +export PADDLE_LAUNCH_LOG="test_launch_filelock_0" +CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} find_ports.py +str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071" diff --git a/python/paddle/fluid/tests/unittests/test_fleetrun.sh b/python/paddle/fluid/tests/unittests/test_fleetrun.sh new file mode 100644 index 00000000000..710859727d2 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleetrun.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# use default values +fleetrun multi_process.py fleetrun diff --git a/python/paddle/fluid/tests/unittests/test_launch.sh b/python/paddle/fluid/tests/unittests/test_launch.sh deleted file mode 100644 index 958d7824662..00000000000 --- a/python/paddle/fluid/tests/unittests/test_launch.sh +++ /dev/null @@ -1,85 +0,0 @@ -#!/bin/bash -set -e -# use default values -# FIXME: random fails on Unknown command lines -c (or -m). -launch_py=${PADDLE_BINARY_DIR}/python/paddle/distributed/launch.py -python ${launch_py} multi_process.py launch - -# use paddlecloud -echo "begin test use paddlecloud" -cluster_node_ips="10.0.0.1" -node_ip="10.0.0.1" -export PADDLE_TRAINERS_NUM=2 -export POD_IP=127.0.0.1 -export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 -export PADDLE_TRAINER_ID=0 - -export PADDLE_PORT=35019 -export TRAINER_PORTS_NUM=2 - -distributed_args="--use_paddlecloud --cluster_node_ips=${cluster_node_ips} --node_ip=${node_ip} --selected_gpus=0,1 --log_dir=testlog" -CUDA_VISIBLE_DEVICES=0,1 python ${launch_py} ${distributed_args} multi_process.py launch - -str1="selected_gpus:0 worker_endpoints:127.0.0.1:35019,127.0.0.1:35020,127.0.0.2:35019,127.0.0.2:35020 trainers_num:4 current_endpoint:127.0.0.1:35019 trainer_id:0" -str2="selected_gpus:1 worker_endpoints:127.0.0.1:35019,127.0.0.1:35020,127.0.0.2:35019,127.0.0.2:35020 trainers_num:4 current_endpoint:127.0.0.1:35020 trainer_id:1" -file_0="multi_process_launch.check_0.log" -file_1="multi_process_launch.check_1.log" - -echo "paddlecloud params test" -if grep -q "$str1" "$file_0"; then - echo "find trainer 0" -else - echo "not find trainer 0" - exit -1 -fi - -if grep -q "$str2" "$file_1"; then - echo "find trainer 1" -else - echo "not find trainer 1" - exit -1 -fi - -# test async poll process -if [ -f $file_0 ]; then - rm $file_0 -fi -if [ -f $file_1 ]; then - rm $file_1 -fi - -# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud -unset PADDLE_PORT -export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 - -echo "" -echo "paddle.distributed.launch async poll process test" -if ! CUDA_VISIBLE_DEVICES=0,1 python ${launch_py} ${distributed_args} multi_process.py launch abort; then - echo "train abort as planned" -fi - -abort_str1="abort>>> selected_gpus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0" - -if grep -q "$abort_str1" "$file_0"; then - echo "trainer 0 abort as planned" -else - echo "trainer 0 not abort as planned" - exit -1 -fi - -if [ ! -f $file_1 ]; then - echo "trainer 1 terminate as planned" -else - echo "trainer 1 not terminate as planned" - exit -1 -fi - -#test for random ports -file_0_0="test_launch_filelock_0_0.log" -file_1_0="test_launch_filelock_1_0.log" -rm -rf $file_0_0 $file_0_1 - -distributed_args="--selected_gpus=0,1 --log_dir=testlog" -export PADDLE_LAUNCH_LOG="test_launch_filelock_0" -CUDA_VISIBLE_DEVICES=0,1 python ${launch_py} ${distributed_args} find_ports.py -str_0="worker_endpoints:127.0.0.1:6070,127.0.0.1:6071" diff --git a/python/paddle/fluid/tests/unittests/test_launch_coverage.py b/python/paddle/fluid/tests/unittests/test_launch_coverage.py new file mode 100644 index 00000000000..43613928585 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_launch_coverage.py @@ -0,0 +1,120 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import sys +import subprocess +import os +import time +import six +import copy +import unittest +import paddle.fluid as fluid + +from argparse import ArgumentParser, REMAINDER +from paddle.distributed.utils import _print_arguments, get_gpus, get_cluster_from_args + + +def _parse_args(): + parser = ArgumentParser( + description='''start paddle training using multi-process mode. +NOTE: your train program ***must*** run as distributed nccl2 mode, +see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2- +And your train program must read environment variables below in order to let different +process init properly: +FLAGS_selected_gpus +PADDLE_TRAINER_ID +PADDLE_CURRENT_ENDPOINT +PADDLE_TRAINERS_NUM +PADDLE_TRAINER_ENDPOINTS +POD_IP (current node ip address, not needed for local training) +''') + + #Optional arguments for the launch helper + parser.add_argument( + "--cluster_node_ips", + type=str, + default="127.0.0.1", + help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..") + parser.add_argument( + "--node_ip", + type=str, + default="127.0.0.1", + help="The current node ip. ") + parser.add_argument( + "--use_paddlecloud", + action='store_true', + help="wheter to use paddlecloud platform to run your multi-process job. If false, no need to set this argument." + ) + parser.add_argument( + "--started_port", + type=int, + default=None, + help="The trainer's started port on a single node") + + parser.add_argument( + "--print_config", + type=bool, + default=True, + help="Print the config or not") + + parser.add_argument( + "--selected_gpus", + type=str, + default=None, + help="It's for gpu training and the training process will run on the selected_gpus," + "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." + ) + + parser.add_argument( + "--log_level", + type=int, + default=20, # logging.INFO, details are here:https://docs.python.org/3/library/logging.html#levels + help="Logging level, default is logging.INFO") + + parser.add_argument( + "--log_dir", + type=str, + help="The path for each process's log.If it's not set, the log will printed to default pipe." + ) + + #positional + parser.add_argument( + "training_script", + type=str, + help="The full path to the single GPU training " + "program/script to be launched in parallel, " + "followed by all the arguments for the " + "training script") + + #rest from the training program + parser.add_argument('training_script_args', nargs=REMAINDER) + return parser.parse_args() + + +class TestCoverage(unittest.TestCase): + def test_gpus(self): + args = _parse_args() + + if args.print_config: + _print_arguments(args) + + gpus = get_gpus(None) + + args.use_paddlecloud = True + cluster, pod = get_cluster_from_args(args, "0") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_launch_ps.sh b/python/paddle/fluid/tests/unittests/test_launch_ps.sh deleted file mode 100644 index 78452b5fe37..00000000000 --- a/python/paddle/fluid/tests/unittests/test_launch_ps.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -set -e -# use default values -launch_py=${PADDLE_BINARY_DIR}/python/paddle/distributed/launch_ps.py -python ${launch_py} fleet_ps_training.py 2> ut.elog - -if grep -q "server are killed" ut.elog; then - echo "succeed" -else - echo "failed" - exit -1 -fi -- GitLab