Unverified commit 39204d56, authored by Chen Weihang, committed by GitHub

[Cherry-pick] Simplify the options of spawn based on fleetrun (#30144) (#30197)

* Simplify the options of spawn based on fleetrun (#30144)

* Simplify the options of spawn based on fleetrun

* polish details

* polish doc details

* cleanup enum test=develop (#29294)
Co-authored-by: gongweibao <weibao.gong@gmail.com>
Parent bfb6f613
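The net effect of this change, visible in the updated `spawn` docstring further down, is that the user-facing options shrink to `start_method`, `ips` and `gpus` (with `print_config` kept only for debugging), while `selected_gpus`, `started_port`, `cluster_node_ips`, `node_ip` and `use_paddlecloud` are accepted only as deprecated aliases. A minimal usage sketch of the simplified interface; the `train` function and its `print_result` flag are illustrative placeholders, not part of this commit:

import paddle.distributed as dist


def train(print_result=False):
    # placeholder training step: any picklable, module-level function works here
    dist.init_parallel_env()
    if print_result:
        print("trainer rank:", dist.get_rank())


if __name__ == '__main__':
    # run 2 processes on GPU cards 4 and 5 of the local node;
    # `gpus` replaces the deprecated `selected_gpus` option
    dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')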
@@ -22,7 +22,7 @@ def get_cloud_cluster(args_node_ips,
                       devices_per_proc,
                       args_port=6170):
     """
-    args_node_ips:string, device_mode:DeviceMode(IntEnum), device_per_proc:list, args_port: int
+    args_node_ips:string, device_mode:DeviceMode(Int), device_per_proc:list, args_port: int
     """
     #you can automatically get ip info while using paddlecloud multi nodes mode.
     node_ips = os.getenv("PADDLE_TRAINERS")
...
@@ -27,7 +27,6 @@ from contextlib import closing
 import socket
 import warnings
 import six
-from enum import IntEnum
 
 import paddle
 import paddle.fluid as fluid
@@ -35,7 +34,7 @@ logger = logging.getLogger("root")
 logger.propagate = False
 
-class DistributeMode(IntEnum):
+class DistributeMode():
     """
     There are various mode for fleetrun, each of them is designed for different model.
     """
@@ -44,7 +43,7 @@ class DistributeMode(IntEnum):
     PS_HETER = 2
 
-class DeviceMode(IntEnum):
+class DeviceMode():
     """
     Training devices type
     """
...
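A side effect of dropping the `IntEnum` base is that members such as `DistributeMode.PS_HETER` become plain integer class attributes rather than enum members. A small sketch of the resulting behavior; only `PS_HETER = 2` is taken from the diff above, the other member names and values are assumptions:

class DistributeMode():
    # plain-class constants after the change; COLLECTIVE and PS are assumed here
    COLLECTIVE = 0
    PS = 1
    PS_HETER = 2


# a bare int now: no Enum wrapper, no `.value` access needed
assert DistributeMode.PS_HETER == 2
assert isinstance(DistributeMode.PS_HETER, int)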
@@ -21,8 +21,9 @@ import six
 import sys
 import warnings
 
-from paddle.distributed.utils import _print_arguments, _prepare_trainer_env
+from paddle.distributed.utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
 from paddle.distributed.cloud_utils import get_cluster_and_pod
+from paddle.distributed.fleet.cloud_utils import use_paddlecloud
 from paddle.device import get_device
 
 # deprecated module import
@@ -65,15 +66,33 @@ def _py_supported_check():
 
 def _options_valid_check(options):
-    supported_options = [
-        'start_method', 'cluster_node_ips', 'node_ip', 'started_port',
-        'selected_gpus', 'print_config', 'use_paddlecloud'
+    # `print_config` keeped as a debug options, not show to users
+    supported_options = ['start_method', 'ips', 'gpus', 'print_config']
+    deprecated_options = [
+        'selected_gpus', 'started_port', 'cluster_node_ips', 'node_ip',
+        'use_paddlecloud'
     ]
     for key in options:
         if key not in supported_options:
-            raise ValueError(
-                "The config option (%s) of `paddle.distributed.spawn` is not supported."
-                % key)
+            if key in deprecated_options:
+                warnings.warn(
+                    "The config option (%s) of `paddle.distributed.spawn` is deprecated. "
+                    "Please use the latest config options stated in the `spawn` API documentation."
+                    % key, DeprecationWarning)
+            else:
+                raise ValueError(
+                    "The config option (%s) of `paddle.distributed.spawn` is not supported."
+                    % key)
+
+
+def _get_node_ip(ips):
+    node_ip = None
+    node_ips = [x.strip() for x in ips.split(',')]
+    if len(node_ips) == 1:
+        node_ip = node_ips[0]
+    else:
+        _, node_ip = get_host_name_ip()
+    return node_ip
 
 
 def _get_subprocess_env_list(nprocs, options):
@@ -83,18 +102,14 @@ def _get_subprocess_env_list(nprocs, options):
     # get args from kwargs
     args = ParallelEnvArgs()
 
-    # set default `node_ip` and `cluster_node_ips`
-    args.cluster_node_ips = options.get('cluster_node_ips', None)
-    args.node_ip = options.get('node_ip', None)
-    if args.cluster_node_ips is not None and args.node_ip is None:
-        raise ValueError("please input current node ip, "
-                         "cannot only give `cluster_node_ips`.")
-    default_node_ip = "127.0.0.1"
-    if args.node_ip is None:
-        args.node_ip = default_node_ip
+    # deal with `ips`
+    args.cluster_node_ips = options.get('ips', None)
     if args.cluster_node_ips is None:
-        args.cluster_node_ips = default_node_ip
+        args.cluster_node_ips = options.get('cluster_node_ips', None)
+        if args.cluster_node_ips is None:
+            args.cluster_node_ips = "127.0.0.1"
 
+    # deal with `gpus`
     # set default selected gpus
     # e.g. if the nprocs is 4, the selected gpus is "0,1,2,3"
     # NOTE(chenweihang): [ why not use FLAGS_selected_gpus directly? ]
@@ -102,7 +117,9 @@ def _get_subprocess_env_list(nprocs, options):
     # if we set FLAGS_selected_gpus to be `0,1,2,3`, it may cause error
     # when using `ParallelEnv`
     # NOTE(chenweihang): use absolute gpu card id
-    args.selected_gpus = options.get('selected_gpus', None)
+    args.selected_gpus = options.get('gpus', None)
+    if args.selected_gpus is None:
+        args.selected_gpus = options.get('selected_gpus', None)
     env_devices = os.getenv("CUDA_VISIBLE_DEVICES", None)
     if env_devices is None or env_devices == "":
         env_devices_list = [
@@ -121,24 +138,39 @@ def _get_subprocess_env_list(nprocs, options):
         args.selected_gpus = ",".join(
             [str(env_devices_list[x]) for x in range(0, nprocs)])
     else:
-        for card_id in args.selected_gpus.split(','):
+        selected_gpu_list = args.selected_gpus.split(',')
+        if len(selected_gpu_list) != nprocs:
+            raise ValueError(
+                "The number of selected gpus(%s) is not equal to "
+                "the number of spawn processes(%d), please ensure that the "
+                "correct `nprocs` and `gpus` arguments are passed." %
+                (len(selected_gpu_list), nprocs))
+        for card_id in selected_gpu_list:
             if card_id not in env_devices_list:
                 raise ValueError("The selected gpu card %s cannot found in "
                                  "CUDA_VISIBLE_DEVICES (%s)." %
                                  (card_id, ",".join(env_devices_list)))
 
-    # set other arguments
+    # set other inner args
+    args.node_ip = options.get('node_ip', None)
+    if args.node_ip is None:
+        args.node_ip = _get_node_ip(args.cluster_node_ips)
+
     args.started_port = options.get('started_port', None)
-    args.use_paddlecloud = options.get('use_paddlecloud', False)
-    args.print_config = options.get('print_config', False)
+
+    args.use_paddlecloud = options.get('use_paddlecloud', None)
+    if args.use_paddlecloud is None:
+        args.use_paddlecloud = use_paddlecloud()
 
+    # get cluster and pod config
     cluster, pod = get_cluster_and_pod(args)
 
     # prepare subprocess env list
     for trainer in pod.trainers:
         processes_env_list.append(_prepare_trainer_env(cluster, trainer))
 
-    # print config
+    # [Debug] print config
+    args.print_config = options.get('print_config', False)
     if args.print_config:
         _print_arguments(args)
@@ -245,6 +277,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
     """
     Start multiple processes with ``spawn`` method for parallel training.
 
+    .. note::
+        ``spawn`` now only supports GPU collective mode.
+
     Args:
         func (function): The target function is called by spawned process.
             This function need to be able to pickled, so it must be defined
@@ -269,17 +304,10 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
             Because the CUDA runtime does not support the ``fork`` start method,
             when use CUDA in subprocesses, we should start process by ``spawn``
             or ``forkserver`` method. Default: "spawn" ;
-            (2) cluster_node_ips (string): Paddle cluster nodes ips, such as
-            "192.168.0.16,192.168.0.17". Default: "127.0.0.1";
-            (3) node_ip (string): The current node ip, such as "192.168.0.16".
-            Default: "127.0.0.1";
-            (4) started_port (int): The trainer's started port on a single node,
-            such as 6170. Default: None;
-            (5) selected_gpus (string): The training process will run on the
-            selected_gpus, such as "0,1,2,3". Default: None;
-            (6) print_config (bool): Print current parallel training config. Default: False;
-            (7) use_paddlecloud (bool): Whether to use paddlecloud platform to run your
-            multi-process job. Default: False.
+            (2) gpus (string): The training process will run on the
+            selected gpus, such as "0,1,2,3". Default: None;
+            (3) ips (string): Paddle cluster nodes ips, such as
+            "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .
 
     Returns:
         ``MultiprocessContext`` object, it hold the spawned processes.
@@ -351,16 +379,16 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
             if __name__ == '__main__':
                 dist.spawn(train, args=(True,), nprocs=2)
 
-            # Usage 4: pass function, arguments, nprocs and selected_gpus.
+            # Usage 4: pass function, arguments, nprocs and gpus.
             # If your training method need some arguments, and
             # only use part of visible devices for parallel training,
             # but you can't set your machine's environment variable
             # CUDA_VISIBLE_DEVICES, such as it is None or all cards
-            # {0,1,2,3,4,5,6,7}, you can pass `selected_gpus` to
+            # {0,1,2,3,4,5,6,7}, you can pass `gpus` to
            # select the GPU cards you want to use. For example,
             # this case will use cards {4,5} if your machine hold 8 cards.
             if __name__ == '__main__':
-                dist.spawn(train, args=(True,), nprocs=2, selected_gpus='4,5')
+                dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')
     """
 
     # NOTE(chenweihang): [ why only supports python3.4+ ? ]
     # Python supported setting the child process startup method
...
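The option handling added above follows one precedence rule throughout: the new key wins, the matching deprecated key is honored as a fallback, and a built-in default applies last. A standalone sketch of that rule, not the Paddle code itself, using only the option names that appear in the diff:

def resolve_spawn_options(options):
    # new `gpus` key first, deprecated `selected_gpus` as fallback
    gpus = options.get('gpus', None)
    if gpus is None:
        gpus = options.get('selected_gpus', None)

    # new `ips` key first, then deprecated `cluster_node_ips`, then localhost
    ips = options.get('ips', None)
    if ips is None:
        ips = options.get('cluster_node_ips', None)
        if ips is None:
            ips = "127.0.0.1"
    return gpus, ips


print(resolve_spawn_options({'gpus': '4,5'}))           # ('4,5', '127.0.0.1')
print(resolve_spawn_options({'selected_gpus': '0,1'}))  # ('0,1', '127.0.0.1')
print(resolve_spawn_options({'ips': '192.168.0.16,192.168.0.17'}))  # (None, '192.168.0.16,192.168.0.17')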
@@ -20,7 +20,7 @@ import unittest
 import paddle
 import paddle.distributed as dist
-from paddle.distributed.spawn import _get_subprocess_env_list
+from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_check
 from paddle.fluid import core
 from paddle.fluid.dygraph import parallel_helper
@@ -55,12 +55,6 @@ class TestInitParallelEnv(unittest.TestCase):
 @unittest.skipIf(not core.is_compiled_with_cuda(),
                  "core is not compiled with CUDA")
 class TestSpawnAssistMethod(unittest.TestCase):
-    def test_only_cluster_node_ips_error(self):
-        with self.assertRaises(ValueError):
-            options = dict()
-            options['cluster_node_ips'] = "127.0.0.1,127.0.0.2"
-            _get_subprocess_env_list(nprocs=1, options=options)
-
     def test_nprocs_greater_than_device_num_error(self):
         with self.assertRaises(RuntimeError):
             _get_subprocess_env_list(nprocs=100, options=dict())
@@ -72,10 +66,27 @@ class TestSpawnAssistMethod(unittest.TestCase):
             _get_subprocess_env_list(nprocs=2, options=options)
 
     def test_get_correct_env(self):
-        env_dict = _get_subprocess_env_list(nprocs=1, options=dict())[0]
+        options = dict()
+        options['print_config'] = True
+        env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
         self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
         self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
 
+    def test_nprocs_not_equal_to_selected_gpus(self):
+        with self.assertRaises(ValueError):
+            options = dict()
+            options['selected_gpus'] = "100,101,102"
+            _get_subprocess_env_list(nprocs=2, options=options)
+
+    def test_options_valid_check(self):
+        options = dict()
+        options['selected_gpus'] = "100,101,102"
+        _options_valid_check(options)
+        with self.assertRaises(ValueError):
+            options['error'] = "error"
+            _options_valid_check(options)
+
 
 if __name__ == "__main__":
     unittest.main()
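The new `test_options_valid_check` case relies on deprecated keys being downgraded from hard errors to warnings. A hedged illustration of that behavior outside the test suite, assuming the same import path used in the test above:

import warnings

from paddle.distributed.spawn import _options_valid_check

# deprecated option: accepted, but a DeprecationWarning is emitted
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    _options_valid_check({'selected_gpus': '0,1'})
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)

# unknown option: still rejected with ValueError
try:
    _options_valid_check({'error': 'error'})
except ValueError as err:
    print(err)  # The config option (error) of `paddle.distributed.spawn` is not supported.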