From 39204d56e62e46023ce014dcfde4d2455a96f287 Mon Sep 17 00:00:00 2001
From: Chen Weihang
Date: Thu, 7 Jan 2021 20:25:02 -0600
Subject: [PATCH] [Cherry-pick] Simplify the options of spawn based on fleetrun (#30144) (#30197)

* Simplify the options of spawn based on fleetrun (#30144)

* Simplify the options of spawn based on fleetrun

* polish details

* polish doc details

* cleanup enum test=develop (#29294)

Co-authored-by: gongweibao
---
 .../paddle/distributed/fleet/cloud_utils.py   |   2 +-
 .../paddle/distributed/fleet/launch_utils.py  |   5 +-
 python/paddle/distributed/spawn.py            | 102 +++++++++++-------
 .../test_spawn_and_init_parallel_env.py       |  27 +++--
 4 files changed, 87 insertions(+), 49 deletions(-)

diff --git a/python/paddle/distributed/fleet/cloud_utils.py b/python/paddle/distributed/fleet/cloud_utils.py
index e05196f631..f5a24cf48c 100644
--- a/python/paddle/distributed/fleet/cloud_utils.py
+++ b/python/paddle/distributed/fleet/cloud_utils.py
@@ -22,7 +22,7 @@ def get_cloud_cluster(args_node_ips,
                       devices_per_proc,
                       args_port=6170):
     """
-    args_node_ips:string, device_mode:DeviceMode(IntEnum), device_per_proc:list, args_port: int
+    args_node_ips:string, device_mode:DeviceMode(Int), device_per_proc:list, args_port: int
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index 5d56f77ca1..625e8a476b 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -27,7 +27,6 @@ from contextlib import closing
 import socket
 import warnings
 import six
-from enum import IntEnum
 
 import paddle
 import paddle.fluid as fluid
@@ -35,7 +34,7 @@ logger = logging.getLogger("root")
 logger.propagate = False
 
 
-class DistributeMode(IntEnum):
+class DistributeMode():
     """
     There are various mode for fleetrun, each of them is designed for different model.
     """
@@ -44,7 +43,7 @@ class DistributeMode(IntEnum):
     PS_HETER = 2
 
 
-class DeviceMode(IntEnum):
+class DeviceMode():
     """
     Training devices type
     """
diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py
index 86ec18061c..911fed416c 100644
--- a/python/paddle/distributed/spawn.py
+++ b/python/paddle/distributed/spawn.py
@@ -21,8 +21,9 @@ import six
 import sys
 import warnings
 
-from paddle.distributed.utils import _print_arguments, _prepare_trainer_env
+from paddle.distributed.utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
 from paddle.distributed.cloud_utils import get_cluster_and_pod
+from paddle.distributed.fleet.cloud_utils import use_paddlecloud
 from paddle.device import get_device
 
 # deprecated module import
@@ -65,15 +66,33 @@ def _py_supported_check():
 
 
 def _options_valid_check(options):
-    supported_options = [
-        'start_method', 'cluster_node_ips', 'node_ip', 'started_port',
-        'selected_gpus', 'print_config', 'use_paddlecloud'
-    ]
+    # `print_config` is kept as a debug option, not shown to users
+    supported_options = ['start_method', 'ips', 'gpus', 'print_config']
+    deprecated_options = [
+        'selected_gpus', 'started_port', 'cluster_node_ips', 'node_ip',
+        'use_paddlecloud'
+    ]
     for key in options:
         if key not in supported_options:
-            raise ValueError(
-                "The config option (%s) of `paddle.distributed.spawn` is not supported."
-                % key)
+            if key in deprecated_options:
+                warnings.warn(
+                    "The config option (%s) of `paddle.distributed.spawn` is deprecated. "
" + "Please use the latest config options stated in the `spawn` API documentation." + % key, DeprecationWarning) + else: + raise ValueError( + "The config option (%s) of `paddle.distributed.spawn` is not supported." + % key) + + +def _get_node_ip(ips): + node_ip = None + node_ips = [x.strip() for x in ips.split(',')] + if len(node_ips) == 1: + node_ip = node_ips[0] + else: + _, node_ip = get_host_name_ip() + return node_ip def _get_subprocess_env_list(nprocs, options): @@ -83,18 +102,14 @@ def _get_subprocess_env_list(nprocs, options): # get args from kwargs args = ParallelEnvArgs() - # set default `node_ip` and `cluster_node_ips` - args.cluster_node_ips = options.get('cluster_node_ips', None) - args.node_ip = options.get('node_ip', None) - if args.cluster_node_ips is not None and args.node_ip is None: - raise ValueError("please input current node ip, " - "cannot only give `cluster_node_ips`.") - default_node_ip = "127.0.0.1" - if args.node_ip is None: - args.node_ip = default_node_ip + # deal with `ips` + args.cluster_node_ips = options.get('ips', None) if args.cluster_node_ips is None: - args.cluster_node_ips = default_node_ip + args.cluster_node_ips = options.get('cluster_node_ips', None) + if args.cluster_node_ips is None: + args.cluster_node_ips = "127.0.0.1" + # deal with `gpus` # set default selected gpus # e.g. if the nprocs is 4, the selected gpus is "0,1,2,3" # NOTE(chenweihang): [ why not use FLAGS_selected_gpus directly? ] @@ -102,7 +117,9 @@ def _get_subprocess_env_list(nprocs, options): # if we set FLAGS_selected_gpus to be `0,1,2,3`, it may cause error # when using `ParallelEnv` # NOTE(chenweihang): use absolute gpu card id - args.selected_gpus = options.get('selected_gpus', None) + args.selected_gpus = options.get('gpus', None) + if args.selected_gpus is None: + args.selected_gpus = options.get('selected_gpus', None) env_devices = os.getenv("CUDA_VISIBLE_DEVICES", None) if env_devices is None or env_devices == "": env_devices_list = [ @@ -121,24 +138,39 @@ def _get_subprocess_env_list(nprocs, options): args.selected_gpus = ",".join( [str(env_devices_list[x]) for x in range(0, nprocs)]) else: - for card_id in args.selected_gpus.split(','): + selected_gpu_list = args.selected_gpus.split(',') + if len(selected_gpu_list) != nprocs: + raise ValueError( + "The number of selected gpus(%s) is not equal to " + "the number of spawn processes(%d), please ensure that the " + "correct `nprocs` and `gpus` arguments are passed." % + (len(selected_gpu_list), nprocs)) + for card_id in selected_gpu_list: if card_id not in env_devices_list: raise ValueError("The selected gpu card %s cannot found in " "CUDA_VISIBLE_DEVICES (%s)." 
% (card_id, ",".join(env_devices_list))) - # set other arguments + # set other inner args + args.node_ip = options.get('node_ip', None) + if args.node_ip is None: + args.node_ip = _get_node_ip(args.cluster_node_ips) + args.started_port = options.get('started_port', None) - args.use_paddlecloud = options.get('use_paddlecloud', False) - args.print_config = options.get('print_config', False) + args.use_paddlecloud = options.get('use_paddlecloud', None) + if args.use_paddlecloud is None: + args.use_paddlecloud = use_paddlecloud() + + # get cluster and pod config cluster, pod = get_cluster_and_pod(args) # prepare subprocess env list for trainer in pod.trainers: processes_env_list.append(_prepare_trainer_env(cluster, trainer)) - # print config + # [Debug] print config + args.print_config = options.get('print_config', False) if args.print_config: _print_arguments(args) @@ -245,6 +277,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): """ Start multiple processes with ``spawn`` method for parallel training. + .. note:: + ``spawn`` now only supports GPU collective mode. + Args: func (function): The target function is called by spawned process. This function need to be able to pickled, so it must be defined @@ -269,17 +304,10 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): Because the CUDA runtime does not support the ``fork`` start method, when use CUDA in subprocesses, we should start process by ``spawn`` or ``forkserver`` method. Default: "spawn" ; - (2) cluster_node_ips (string): Paddle cluster nodes ips, such as - "192.168.0.16,192.168.0.17". Default: "127.0.0.1"; - (3) node_ip (string): The current node ip, such as "192.168.0.16". - Default: "127.0.0.1"; - (4) started_port (int): The trainer's started port on a single node, - such as 6170. Default: None; - (5) selected_gpus (string): The training process will run on the - selected_gpus, such as "0,1,2,3". Default: None; - (6) print_config (bool): Print current parallel training config. Default: False; - (7) use_paddlecloud (bool): Whether to use paddlecloud platform to run your - multi-process job. Default: False. + (2) gpus (string): The training process will run on the + selected gpus, such as "0,1,2,3". Default: None; + (3) ips (string): Paddle cluster nodes ips, such as + "192.168.0.16,192.168.0.17". Default: "127.0.0.1" . Returns: ``MultiprocessContext`` object, it hold the spawned processes. @@ -351,16 +379,16 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): if __name__ == '__main__': dist.spawn(train, args=(True,), nprocs=2) - # Usage 4: pass function, arguments, nprocs and selected_gpus. + # Usage 4: pass function, arguments, nprocs and gpus. # If your training method need some arguments, and # only use part of visible devices for parallel training, # but you can't set your machine's environment variable # CUDA_VISIBLE_DEVICES, such as it is None or all cards - # {0,1,2,3,4,5,6,7}, you can pass `selected_gpus` to + # {0,1,2,3,4,5,6,7}, you can pass `gpus` to # select the GPU cards you want to use. For example, # this case will use cards {4,5} if your machine hold 8 cards. if __name__ == '__main__': - dist.spawn(train, args=(True,), nprocs=2, selected_gpus='4,5') + dist.spawn(train, args=(True,), nprocs=2, gpus='4,5') """ # NOTE(chenweihang): [ why only supports python3.4+ ? 
     # Python supported setting the child process startup method
diff --git a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py
index b6336379ba..53efa186d1 100644
--- a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py
+++ b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py
@@ -20,7 +20,7 @@ import unittest
 
 import paddle
 import paddle.distributed as dist
-from paddle.distributed.spawn import _get_subprocess_env_list
+from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_check
 from paddle.fluid import core
 from paddle.fluid.dygraph import parallel_helper
 
@@ -55,12 +55,6 @@ class TestInitParallelEnv(unittest.TestCase):
 @unittest.skipIf(not core.is_compiled_with_cuda(),
                  "core is not compiled with CUDA")
 class TestSpawnAssistMethod(unittest.TestCase):
-    def test_only_cluster_node_ips_error(self):
-        with self.assertRaises(ValueError):
-            options = dict()
-            options['cluster_node_ips'] = "127.0.0.1,127.0.0.2"
-            _get_subprocess_env_list(nprocs=1, options=options)
-
     def test_nprocs_greater_than_device_num_error(self):
         with self.assertRaises(RuntimeError):
             _get_subprocess_env_list(nprocs=100, options=dict())
@@ -72,10 +66,27 @@ class TestSpawnAssistMethod(unittest.TestCase):
             _get_subprocess_env_list(nprocs=2, options=options)
 
     def test_get_correct_env(self):
-        env_dict = _get_subprocess_env_list(nprocs=1, options=dict())[0]
+        options = dict()
+        options['print_config'] = True
+        env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
         self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
         self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
 
+    def test_nprocs_not_equal_to_selected_gpus(self):
+        with self.assertRaises(ValueError):
+            options = dict()
+            options['selected_gpus'] = "100,101,102"
+            _get_subprocess_env_list(nprocs=2, options=options)
+
+    def test_options_valid_check(self):
+        options = dict()
+        options['selected_gpus'] = "100,101,102"
+        _options_valid_check(options)
+
+        with self.assertRaises(ValueError):
+            options['error'] = "error"
+            _options_valid_check(options)
+
 
 if __name__ == "__main__":
     unittest.main()
-- 
GitLab
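
For reference, below is a minimal sketch of a training script written against the simplified interface this patch introduces. The `train` function, the model, and the random inputs are illustrative assumptions rather than part of the patch; only the `spawn(..., nprocs=..., gpus=...)` call and the deprecated `selected_gpus` spelling come from the diff above.

    import paddle
    import paddle.distributed as dist


    def train(print_result=False):
        # each spawned process sets up its own parallel environment
        dist.init_parallel_env()

        # a tiny model wrapped for data-parallel training
        layer = paddle.nn.Linear(10, 1)
        dp_layer = paddle.DataParallel(layer)

        # one forward/backward step on random inputs
        inputs = paddle.randn([4, 10])
        loss = paddle.mean(dp_layer(inputs))
        loss.backward()
        if print_result:
            print("loss:", loss.numpy())


    if __name__ == '__main__':
        # deprecated spelling, still accepted but now emits DeprecationWarning:
        #   dist.spawn(train, args=(True,), nprocs=2, selected_gpus='4,5')
        # simplified spelling (assumes GPU cards 4 and 5 are visible):
        dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')

A multi-node run would additionally pass the new `ips` option, e.g. `ips='192.168.0.16,192.168.0.17'`, in place of the deprecated `cluster_node_ips`/`node_ip` pair.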