未验证 提交 8020e34e 编写于 作者: C Chen Weihang 提交者: GitHub

Simplify the options of spawn based on fleetrun (#30144)

* Simplify the options of spawn based on fleetrun

* polish details

* polish doc details
上级 4763e6bc
......@@ -21,8 +21,9 @@ import six
import sys
import warnings
from paddle.distributed.utils import _print_arguments, _prepare_trainer_env
from paddle.distributed.utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
from paddle.distributed.cloud_utils import get_cluster_and_pod
from paddle.distributed.fleet.cloud_utils import use_paddlecloud
from paddle.device import get_device
# deprecated module import
......@@ -65,17 +66,35 @@ def _py_supported_check():
def _options_valid_check(options):
supported_options = [
'start_method', 'cluster_node_ips', 'node_ip', 'started_port',
'selected_gpus', 'print_config', 'use_paddlecloud'
# `print_config` keeped as a debug options, not show to users
supported_options = ['start_method', 'ips', 'gpus', 'print_config']
deprecated_options = [
'selected_gpus', 'started_port', 'cluster_node_ips', 'node_ip',
'use_paddlecloud'
]
for key in options:
if key not in supported_options:
if key in deprecated_options:
warnings.warn(
"The config option (%s) of `paddle.distributed.spawn` is deprecated. "
"Please use the latest config options stated in the `spawn` API documentation."
% key, DeprecationWarning)
else:
raise ValueError(
"The config option (%s) of `paddle.distributed.spawn` is not supported."
% key)
def _get_node_ip(ips):
node_ip = None
node_ips = [x.strip() for x in ips.split(',')]
if len(node_ips) == 1:
node_ip = node_ips[0]
else:
_, node_ip = get_host_name_ip()
return node_ip
def _get_subprocess_env_list(nprocs, options):
# contruct processes env list
processes_env_list = []
......@@ -83,18 +102,14 @@ def _get_subprocess_env_list(nprocs, options):
# get args from kwargs
args = ParallelEnvArgs()
# set default `node_ip` and `cluster_node_ips`
# deal with `ips`
args.cluster_node_ips = options.get('ips', None)
if args.cluster_node_ips is None:
args.cluster_node_ips = options.get('cluster_node_ips', None)
args.node_ip = options.get('node_ip', None)
if args.cluster_node_ips is not None and args.node_ip is None:
raise ValueError("please input current node ip, "
"cannot only give `cluster_node_ips`.")
default_node_ip = "127.0.0.1"
if args.node_ip is None:
args.node_ip = default_node_ip
if args.cluster_node_ips is None:
args.cluster_node_ips = default_node_ip
args.cluster_node_ips = "127.0.0.1"
# deal with `gpus`
# set default selected gpus
# e.g. if the nprocs is 4, the selected gpus is "0,1,2,3"
# NOTE(chenweihang): [ why not use FLAGS_selected_gpus directly? ]
......@@ -102,6 +117,8 @@ def _get_subprocess_env_list(nprocs, options):
# if we set FLAGS_selected_gpus to be `0,1,2,3`, it may cause error
# when using `ParallelEnv`
# NOTE(chenweihang): use absolute gpu card id
args.selected_gpus = options.get('gpus', None)
if args.selected_gpus is None:
args.selected_gpus = options.get('selected_gpus', None)
env_devices = os.getenv("CUDA_VISIBLE_DEVICES", None)
if env_devices is None or env_devices == "":
......@@ -121,24 +138,39 @@ def _get_subprocess_env_list(nprocs, options):
args.selected_gpus = ",".join(
[str(env_devices_list[x]) for x in range(0, nprocs)])
else:
for card_id in args.selected_gpus.split(','):
selected_gpu_list = args.selected_gpus.split(',')
if len(selected_gpu_list) != nprocs:
raise ValueError(
"The number of selected gpus(%s) is not equal to "
"the number of spawn processes(%d), please ensure that the "
"correct `nprocs` and `gpus` arguments are passed." %
(len(selected_gpu_list), nprocs))
for card_id in selected_gpu_list:
if card_id not in env_devices_list:
raise ValueError("The selected gpu card %s cannot found in "
"CUDA_VISIBLE_DEVICES (%s)." %
(card_id, ",".join(env_devices_list)))
# set other arguments
# set other inner args
args.node_ip = options.get('node_ip', None)
if args.node_ip is None:
args.node_ip = _get_node_ip(args.cluster_node_ips)
args.started_port = options.get('started_port', None)
args.use_paddlecloud = options.get('use_paddlecloud', False)
args.print_config = options.get('print_config', False)
args.use_paddlecloud = options.get('use_paddlecloud', None)
if args.use_paddlecloud is None:
args.use_paddlecloud = use_paddlecloud()
# get cluster and pod config
cluster, pod = get_cluster_and_pod(args)
# prepare subprocess env list
for trainer in pod.trainers:
processes_env_list.append(_prepare_trainer_env(cluster, trainer))
# print config
# [Debug] print config
args.print_config = options.get('print_config', False)
if args.print_config:
_print_arguments(args)
......@@ -245,6 +277,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
"""
Start multiple processes with ``spawn`` method for parallel training.
.. note::
``spawn`` now only supports GPU collective mode.
Args:
func (function): The target function is called by spawned process.
This function need to be able to pickled, so it must be defined
......@@ -269,17 +304,10 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
Because the CUDA runtime does not support the ``fork`` start method,
when use CUDA in subprocesses, we should start process by ``spawn``
or ``forkserver`` method. Default: "spawn" ;
(2) cluster_node_ips (string): Paddle cluster nodes ips, such as
"192.168.0.16,192.168.0.17". Default: "127.0.0.1";
(3) node_ip (string): The current node ip, such as "192.168.0.16".
Default: "127.0.0.1";
(4) started_port (int): The trainer's started port on a single node,
such as 6170. Default: None;
(5) selected_gpus (string): The training process will run on the
selected_gpus, such as "0,1,2,3". Default: None;
(6) print_config (bool): Print current parallel training config. Default: False;
(7) use_paddlecloud (bool): Whether to use paddlecloud platform to run your
multi-process job. Default: False.
(2) gpus (string): The training process will run on the
selected gpus, such as "0,1,2,3". Default: None;
(3) ips (string): Paddle cluster nodes ips, such as
"192.168.0.16,192.168.0.17". Default: "127.0.0.1" .
Returns:
``MultiprocessContext`` object, it hold the spawned processes.
......@@ -351,16 +379,16 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
if __name__ == '__main__':
dist.spawn(train, args=(True,), nprocs=2)
# Usage 4: pass function, arguments, nprocs and selected_gpus.
# Usage 4: pass function, arguments, nprocs and gpus.
# If your training method need some arguments, and
# only use part of visible devices for parallel training,
# but you can't set your machine's environment variable
# CUDA_VISIBLE_DEVICES, such as it is None or all cards
# {0,1,2,3,4,5,6,7}, you can pass `selected_gpus` to
# {0,1,2,3,4,5,6,7}, you can pass `gpus` to
# select the GPU cards you want to use. For example,
# this case will use cards {4,5} if your machine hold 8 cards.
if __name__ == '__main__':
dist.spawn(train, args=(True,), nprocs=2, selected_gpus='4,5')
dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')
"""
# NOTE(chenweihang): [ why only supports python3.4+ ? ]
# Python supported setting the child process startup method
......
......@@ -20,7 +20,7 @@ import unittest
import paddle
import paddle.distributed as dist
from paddle.distributed.spawn import _get_subprocess_env_list
from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_check
from paddle.fluid import core
from paddle.fluid.dygraph import parallel_helper
......@@ -55,12 +55,6 @@ class TestInitParallelEnv(unittest.TestCase):
@unittest.skipIf(not core.is_compiled_with_cuda(),
"core is not compiled with CUDA")
class TestSpawnAssistMethod(unittest.TestCase):
def test_only_cluster_node_ips_error(self):
with self.assertRaises(ValueError):
options = dict()
options['cluster_node_ips'] = "127.0.0.1,127.0.0.2"
_get_subprocess_env_list(nprocs=1, options=options)
def test_nprocs_greater_than_device_num_error(self):
with self.assertRaises(RuntimeError):
_get_subprocess_env_list(nprocs=100, options=dict())
......@@ -72,10 +66,27 @@ class TestSpawnAssistMethod(unittest.TestCase):
_get_subprocess_env_list(nprocs=2, options=options)
def test_get_correct_env(self):
env_dict = _get_subprocess_env_list(nprocs=1, options=dict())[0]
options = dict()
options['print_config'] = True
env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
def test_nprocs_not_equal_to_selected_gpus(self):
with self.assertRaises(ValueError):
options = dict()
options['selected_gpus'] = "100,101,102"
_get_subprocess_env_list(nprocs=2, options=options)
def test_options_valid_check(self):
options = dict()
options['selected_gpus'] = "100,101,102"
_options_valid_check(options)
with self.assertRaises(ValueError):
options['error'] = "error"
_options_valid_check(options)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册