From bb16c2515db2f12378cb4c133676e2af8f4bf08b Mon Sep 17 00:00:00 2001
From: Chen Weihang
Date: Tue, 24 Nov 2020 21:21:38 +0800
Subject: [PATCH] Polish parallel api impl & doc details (#28980)

* polish parallel api impl & doc details

* add unittest for coverage

* remove spawn test in py2.7

* add parallel api into white list
---
 python/paddle/distributed/parallel.py        | 66 ++++++++++++-------
 python/paddle/distributed/spawn.py           | 33 +++++++---
 python/paddle/fluid/dygraph/parallel.py      | 51 +++++++-------
 .../test_spawn_and_init_parallel_env.py      |  2 +-
 tools/wlist.json                             |  2 +
 5 files changed, 91 insertions(+), 63 deletions(-)

diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py
index 9b6691dac7..2f951d6aa9 100644
--- a/python/paddle/distributed/parallel.py
+++ b/python/paddle/distributed/parallel.py
@@ -32,6 +32,17 @@ __all__ = ["init_parallel_env"]
 
 ParallelStrategy = core.ParallelStrategy
 
+# NOTE(chenweihang): Maintain a global parallel env to avoid
+# initializing ParallelEnv every time and improve performance
+_global_parallel_env = None
+
+
+def _get_global_parallel_env():
+    global _global_parallel_env
+    if _global_parallel_env is None:
+        _global_parallel_env = ParallelEnv()
+    return _global_parallel_env
+
 
 def _start_kv_server(port, http_server_d):
     from paddle.distributed.fleet.utils.http_server import KVServer
@@ -48,8 +59,7 @@ def init_parallel_env():
     Initialize parallel training environment in dynamic graph mode.
 
     .. note::
-        Now only supports initializing the GPU parallel training
-        environment and using NCCL for communication.
+        Now initialize both `NCCL` and `GLOO` contexts for communication.
 
     Returns:
         None
@@ -72,13 +82,10 @@ def init_parallel_env():
                     return self._linear2(self._linear1(x))
 
             def train():
-                # 1. enable dynamic mode
-                paddle.disable_static()
-
-                # 2. initialize parallel environment
+                # 1. initialize parallel environment
                 dist.init_parallel_env()
 
-                # 3. create data parallel layer & optimizer
+                # 2. create data parallel layer & optimizer
                 layer = LinearNet()
                 dp_layer = paddle.DataParallel(layer)
 
@@ -86,7 +93,7 @@ def init_parallel_env():
                 adam = opt.Adam(
                     learning_rate=0.001, parameters=dp_layer.parameters())
 
-                # 4. run layer
+                # 3. run layer
                 inputs = paddle.randn([10, 10], 'float32')
                 outputs = dp_layer(inputs)
                 labels = paddle.randn([10, 1], 'float32')
@@ -101,6 +108,18 @@ def init_parallel_env():
             dist.spawn(train)
 
     """
+    # 0. get env & check world size
+    global _global_parallel_env
+    # when call init_parallel_env, need update `_global_parallel_env`
+    _global_parallel_env = ParallelEnv()
+    parallel_env = _global_parallel_env
+    # if not parallel, `init_parallel_env` do nothing
+    if parallel_env.world_size < 2:
+        warnings.warn(
+            "Currently not a parallel execution environment, `paddle.distributed.init_parallel_env` will not do anything."
+        )
+        return
+
     # 1. gpu check
     if not core.is_compiled_with_cuda():
         raise NotImplementedError(
@@ -122,17 +141,14 @@ def init_parallel_env():
     _check_var_exists("PADDLE_TRAINERS_NUM")
     _check_var_exists("PADDLE_TRAINER_ENDPOINTS")
 
-    if ParallelEnv().world_size < 2:
-        return
-
     # 3: init gloo context (step 1: httpsever start)
-    ep_rank_0 = ParallelEnv().trainer_endpoints[0].split(":")
-    ep_rank = ParallelEnv().trainer_endpoints[ParallelEnv().rank].split(":")
+    ep_rank_0 = parallel_env.trainer_endpoints[0].split(":")
+    ep_rank = parallel_env.trainer_endpoints[parallel_env.rank].split(":")
     manager = Manager()
     # glboal dict to store status
     http_server_d = manager.dict()
     http_server_d["running"] = False
-    if ParallelEnv().rank == 0:
+    if parallel_env.rank == 0:
         http_server = Process(
             target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d))
         http_server.daemon = True
@@ -143,10 +159,10 @@ def init_parallel_env():
     strategy = ParallelStrategy()
     if parallel_helper._is_parallel_ctx_initialized():
         warnings.warn("The parallel environment has been initialized.")
-    strategy.nranks = ParallelEnv().world_size
-    strategy.local_rank = ParallelEnv().rank
-    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
-    strategy.current_endpoint = ParallelEnv().current_endpoint
+    strategy.nranks = parallel_env.world_size
+    strategy.local_rank = parallel_env.rank
+    strategy.trainer_endpoints = parallel_env.trainer_endpoints
+    strategy.current_endpoint = parallel_env.current_endpoint
 
     # NOTE(chenweihang): [ why config global place here? ]
     # the dygraph mode will be set to default mode,
@@ -154,7 +170,7 @@ def init_parallel_env():
     # directly, if they want to switch default place,
     # they need to call a function to change default place,
     # here just set correctly place to users
-    place = core.CUDAPlace(ParallelEnv().device_id)
+    place = core.CUDAPlace(parallel_env.device_id)
     _set_expected_place(place)
 
     # init nccl context
@@ -165,11 +181,11 @@ def init_parallel_env():
     # dividing init_gloo into two part beacause nccl and gloo
     # are separately looking for free ports which sometimes
     # leads to port-conflict.
- wait_server_ready([ParallelEnv().trainer_endpoints[0]]) + wait_server_ready([parallel_env.trainer_endpoints[0]]) gloo_strategy = core.GlooParallelStrategy() - gloo_strategy.rank = ParallelEnv().rank - gloo_strategy.rank_num = ParallelEnv().world_size + gloo_strategy.rank = parallel_env.rank + gloo_strategy.rank_num = parallel_env.world_size gloo_strategy.ip_address = ep_rank_0[0] gloo_strategy.ip_port = int(ep_rank_0[1]) default_init_timeout_seconds = 3600 @@ -178,7 +194,7 @@ def init_parallel_env(): gloo_strategy.run_seconds = default_run_timeout_seconds gloo = core.GlooParallelContext(gloo_strategy) gloo.init() - if ParallelEnv().rank == 0: + if parallel_env.rank == 0: http_server_d["running"] = False http_server.join() @@ -203,7 +219,7 @@ def get_rank(): print("The rank is %d" % dist.get_rank()) # The rank is 0 """ - return ParallelEnv().rank + return _get_global_parallel_env().rank def get_world_size(): @@ -226,4 +242,4 @@ def get_world_size(): print("The world_size is %d" % dist.get_world_size()) # The world_size is 4 """ - return ParallelEnv().world_size + return _get_global_parallel_env().world_size diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index fda898799f..2d1ff128d8 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -68,6 +68,18 @@ def _py_supported_check(): "`paddle.distributed.launch` instead.") +def _options_valid_check(options): + supported_options = [ + 'start_method', 'cluster_node_ips', 'node_ip', 'started_port', + 'selected_gpus', 'print_config', 'use_paddlecloud' + ] + for key in options: + if key not in supported_options: + raise ValueError( + "The config option (%s) of `paddle.distributed.spawn` is not supported." + % key) + + def _get_subprocess_env_list(nprocs, options): # contruct processes env list processes_env_list = [] @@ -290,14 +302,11 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): def forward(self, x): return self._linear2(self._linear1(x)) - def train(print_result=False): - # 1. enable dynamic mode - paddle.disable_static() - - # 2. initialize parallel environment + def train(print_result=False): + # 1. initialize parallel environment dist.init_parallel_env() - # 3. create data parallel layer & optimizer + # 2. create data parallel layer & optimizer layer = LinearNet() dp_layer = paddle.DataParallel(layer) @@ -305,7 +314,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): adam = opt.Adam( learning_rate=0.001, parameters=dp_layer.parameters()) - # 4. run layer + # 3. run layer inputs = paddle.randn([10, 10], 'float32') outputs = dp_layer(inputs) labels = paddle.randn([10, 1], 'float32') @@ -344,13 +353,13 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): # Usage 4: pass function, arguments, nprocs and selected_gpus. # If your training method need some arguments, and # only use part of visible devices for parallel training, - # but you can't set your machine's environment varibale + # but you can't set your machine's environment variable # CUDA_VISIBLE_DEVICES, such as it is None or all cards - # {0,1,2,3,4,5,6,7}, you can pass `selelcted_gpus` to + # {0,1,2,3,4,5,6,7}, you can pass `selected_gpus` to # select the GPU cards you want to use. For example, # this case will use cards {4,5} if your machine hold 8 cards. 
if __name__ == '__main__': - dist.spawn(train, args=(True,), nprocs=2, selelcted_gpus='4,5') + dist.spawn(train, args=(True,), nprocs=2, selected_gpus='4,5') """ # NOTE(chenweihang): [ why only supports python3.4+ ? ] # Python supported setting the child process startup method @@ -359,6 +368,10 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): # cannot support CUDA runtime multi-process _py_supported_check() + # Give an error hint when the users enter a configuration option + # that does not exist + _options_valid_check(options) + # get default nprocs if nprocs == -1: device = get_device() diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 28670aa1b0..cbe78c4d20 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -377,13 +377,10 @@ class DataParallel(layers.Layer): return self._linear2(self._linear1(x)) def train(): - # 1. enable dynamic mode - paddle.disable_static() - - # 2. initialize parallel environment + # 1. initialize parallel environment dist.init_parallel_env() - # 3. create data parallel layer & optimizer + # 2. create data parallel layer & optimizer layer = LinearNet() dp_layer = paddle.DataParallel(layer) @@ -391,7 +388,7 @@ class DataParallel(layers.Layer): adam = opt.Adam( learning_rate=0.001, parameters=dp_layer.parameters()) - # 4. run layer + # 3. run layer inputs = paddle.randn([10, 10], 'float32') outputs = dp_layer(inputs) labels = paddle.randn([10, 1], 'float32') @@ -450,28 +447,28 @@ class DataParallel(layers.Layer): include_sublayers=True, structured_name_prefix=""): ''' - Get all parameters of self._layers and its sub-layers. And set all the parameters into a dict + Get all parameters and persistable buffers of current layer and its sub-layers. And set them into a dict Parameters: - destination(dict, optional) : If provide, all the parameters will set to this dict . Default: None - include_sublayers(bool, optional) : If true, also include the parameters from sublayers. Default: True - structured_name_prefix(str, optional): If not empty str, all the key in state dict will start - with structured_name_prefix + destination(dict, optional) : If provide, all the parameters and persistable buffers will be set to this dict . Default: None + include_sublayers(bool, optional) : If true, also include the parameters and persistable buffers from sublayers. Default: True Retruns: - dict: a dict contains all the parameters of self._layers + dict: a dict contains all the parameters and persistable buffers. Examples: .. code-block:: python - import paddle.fluid as fluid - with fluid.dygraph.guard(): - strategy=fluid.dygraph.prepare_context() - emb = fluid.dygraph.Embedding([10, 10]) - emb = fluid.dygraph.DataParallel(emb, strategy) + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + + emb = fluid.dygraph.Embedding([10, 10]) + emb = fluid.dygraph.DataParallel(emb) - state_dict = emb.state_dict() - fluid.save_dygraph( state_dict, "paddle_dy") + state_dict = emb.state_dict() + paddle.save(state_dict, "paddle_dy.pdparams") ''' @@ -486,12 +483,12 @@ class DataParallel(layers.Layer): include_sublayers=True, use_structured_name=True): ''' - Set parameters of self._layers from state_dict. All the parameters of self._layers will be reset by the tensor in the state_dict + Set parameters and persistable buffers from state_dict. 
All the parameters and buffers will be reset by the tensor in the state_dict Parameters: - state_dict(dict) : Dict contains all the parameters - include_sublayers(bool, optional) : If true, also include the parameters from sublayers. Default: True - use_structured_name(bool, optional) : If true, use structured name as key, otherwise, use parameter name as key. + state_dict(dict) : Dict contains all the parameters and persistable buffers. + include_sublayers(bool, optional) : If true, also include the parameters and peresistable buffers from sublayers. Default: True + use_structured_name(bool, optional) : If true, use structured name as key, otherwise, use parameter or buffer name as key. Default: True Returns: None @@ -499,18 +496,18 @@ class DataParallel(layers.Layer): Examples: .. code-block:: python - import paddle + import paddle + import paddle.distributed as dist - paddle.disable_static() + dist.init_parallel_env() emb = paddle.nn.Embedding(10, 10) - emb = fluid.dygraph.DataParallel(emb, strategy) + emb = fluid.dygraph.DataParallel(emb) state_dict = emb.state_dict() paddle.save(state_dict, "paddle_dy.pdparams") para_state_dict = paddle.load("paddle_dy.pdparams") - emb.set_state_dict(para_state_dict) ''' diff --git a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py index 171d3788d8..b6336379ba 100644 --- a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py +++ b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py @@ -37,7 +37,7 @@ class TestInitParallelEnv(unittest.TestCase): os.environ['FLAGS_selected_gpus'] = '0' os.environ['PADDLE_TRAINER_ID'] = '0' os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170' - os.environ['PADDLE_TRAINERS_NUM'] = '1' + os.environ['PADDLE_TRAINERS_NUM'] = '2' with self.assertRaises(ValueError): dist.init_parallel_env() diff --git a/tools/wlist.json b/tools/wlist.json index 648cbf6c3b..a51ac905e6 100644 --- a/tools/wlist.json +++ b/tools/wlist.json @@ -379,6 +379,8 @@ "While.block", "DGCMomentumOptimizer", "ParallelEnv", + "spawn", + "init_parallel_env", "DataParallel", "DataParallel.scale_loss", "DataParallel.apply_collective_grads", -- GitLab
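Note on the central refactor in parallel.py: the patch caches one ParallelEnv per process in a module-level `_global_parallel_env`, so that `get_rank()` and `get_world_size()` no longer construct a fresh environment object (and re-read the launcher's environment variables) on every call. The snippet below is a minimal standalone sketch of that lazy-singleton pattern, not Paddle's actual implementation; the `ParallelEnv` stand-in is illustrative and only reads the `PADDLE_TRAINER_ID` and `PADDLE_TRAINERS_NUM` variables that already appear in this patch.

.. code-block:: python

    import os


    class ParallelEnv(object):
        # Illustrative stand-in: reads the env vars the distributed launcher sets.
        def __init__(self):
            self.rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
            self.world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))


    # Cache one ParallelEnv per process instead of building it on every query.
    _global_parallel_env = None


    def _get_global_parallel_env():
        global _global_parallel_env
        if _global_parallel_env is None:
            _global_parallel_env = ParallelEnv()
        return _global_parallel_env


    def get_rank():
        return _get_global_parallel_env().rank


    def get_world_size():
        return _get_global_parallel_env().world_size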