From 8df4622981339a61f9ecf4e09463a23205c75550 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Sat, 2 Apr 2022 11:12:58 +0800 Subject: [PATCH] wrapper the usage of distributed functions (#39720) --- .../distributed/collective/ProcessGroup.h | 13 +- python/paddle/distributed/collective.py | 367 ++++++++---------- python/paddle/distributed/parallel.py | 99 ++++- python/paddle/fluid/dygraph/parallel.py | 7 +- .../fluid/tests/unittests/CMakeLists.txt | 3 + .../tests/unittests/init_process_group.py | 14 +- .../tests/unittests/process_group_nccl.py | 157 ++++++-- .../tests/unittests/test_eager_dist_api.py | 33 ++ .../tests/unittests/test_fleet_base_single.py | 2 +- ...t_parallel_dygraph_dataparallel_cpuonly.py | 3 + 10 files changed, 436 insertions(+), 262 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_eager_dist_api.py diff --git a/paddle/fluid/distributed/collective/ProcessGroup.h b/paddle/fluid/distributed/collective/ProcessGroup.h index 36a00a7d31..c2ad1aa2c9 100644 --- a/paddle/fluid/distributed/collective/ProcessGroup.h +++ b/paddle/fluid/distributed/collective/ProcessGroup.h @@ -158,16 +158,17 @@ class ProcessGroupMapFromGid { } void insert(int gid, ProcessGroup* pg) { - PADDLE_ENFORCE_EQ(has(gid), false, - platform::errors::PreconditionNotMet( - "The process group with id %d doesnot exist.", gid)); + // PADDLE_ENFORCE_EQ(has(gid), false, + // platform::errors::PreconditionNotMet( + // "The process group with id %d does exist.", gid)); map_[gid] = pg; } ProcessGroup* get(int gid) { - PADDLE_ENFORCE_EQ(has(gid), false, - platform::errors::PreconditionNotMet( - "The process group with id %d doesnot exist.", gid)); + // PADDLE_ENFORCE_EQ(has(gid), true, + // platform::errors::PreconditionNotMet( + // "The process group with id %d doesnot exist.", + // gid)); return map_.find(gid)->second; } diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 6dbd7d228e..ecd31386a2 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -16,7 +16,9 @@ import numpy as np import os from datetime import timedelta from ..fluid.layer_helper import LayerHelper +import paddle.fluid.framework as framework from ..fluid.framework import Variable +from ..fluid.framework import in_dygraph_mode from ..fluid.framework import OpProtoHolder from ..fluid.framework import _non_static_mode from ..fluid.framework import convert_np_dtype_to_dtype_ @@ -174,10 +176,6 @@ def _new_ring_id(): return len(_get_group_map()) + max(_get_global_env().nrings, 9) -def _new_group_name_id(): - return len(_get_group_map_by_name()) + max(_get_global_env().nrings, 9) - - def get_group(id=0): """ @@ -202,194 +200,24 @@ def get_group(id=0): return gm[id] if id in gm else None -def _new_process_group_impl(backend, store, rank, world_size, group_name, - pg_options): - if backend == "gloo": - gloo_store = core.GlooStore(store) - +def _new_process_group_impl(backend, + store, + rank, + world_size, + group_name, + pg_options, + group_id=0): pg = None if backend == "gloo": - pg = core.ProcessGroupGloo(gloo_store, rank, world_size) + pg = core.ProcessGroupGloo(store, rank, world_size, group_id) elif backend == "nccl": - pg = core.ProcessGroupNCCL(store, rank, world_size) + pg = core.ProcessGroupNCCL(store, rank, world_size, group_id) elif backend == "hccl": - pg = core.ProcessGroupHCCL(store, rank, world_size) + pg = core.ProcessGroupHCCL(store, rank, world_size, group_id) return pg -def _init_parallel_env(rank=None, - world_size=None, - 
backend="nccl", - timeout=timedelta(0), - pg_options=None): - """ - - Initializes the default distributed environment. - - Args: - rank (int, optional): the rank of the current process or device from 0 to world_size (exclusive). - If you launch your training with paddle.distributed.run or - paddle.distributed.launch module, None can be given. Default: None. - world_size (int, optional): total number of processes or devices. - If you launch your training with paddle.distributed.run or - paddle.distributed.launch module, None can be given. Default: None. - backend (str, optional): the name of the backend used to initialize - the distributed environment. The value can be one of 'nccl' for - GPU, 'gloo' for CPU or 'hccl' for NPU. Default: 'nccl'. - timeout (datetime.timedelta, optional): timeout used for operations of - the group. Default: datetime.timedelta(0) which means no timeout. - pg_options (dict, optional): options for the group. Default: None. - - Returns: - Group: a group. - - Examples: - - .. code-block:: python - - # filename: train.py - import paddle - paddle.distributed.init_parallel_env(0, 1) - - # how to start - # python paddle.distributed.run --gpus="0,1" train.py - - """ - - global _group_map_by_name - global _default_group_name - assert _default_group_name not in _group_map_by_name, ( - "The default distributed environment has been initialized.") - - assert backend in _valid_backend_list, ( - "Backend must be one of {}, but the given one is: {}".format( - _valid_backend_list, backend)) - _default_backend = backend - - assert isinstance(timeout, timedelta), ( - "timeout must be of the type datetime.timedelta.") - - if rank is None or world_size is None: - assert rank is None and world_size is None, ( - "rank and world_size should be unset at the same time.") - trainer_id = os.getenv("PADDLE_TRAINER_ID", None) - trainer_num = os.getenv("PADDLE_TRAINERS_NUM", None) - if trainer_id is None or trainer_num is None: - warnings.warn("If rank and world_size are both None, please start " - "your training with paddle.distributed.run or " - "paddle.distributed.launch module. Otherwise, " - "init_parallel_env will do nothing.") - return None - rank = int(trainer_id) - world_size = int(trainer_num) - - assert rank >= 0 and world_size > rank and world_size > 1, ( - "rank must be non-negative and world_size must be the " - "maximum rank plus one. Moreover, at least two processes are " - "required to create a process group.") - - master_addr = os.getenv("MASTER_ADDR", None) - master_port = os.getenv("MASTER_PORT", None) - if not master_addr or not master_port: - endpoints = os.getenv("PADDLE_MASTER", None) - if endpoints is None: - endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", None) - if not endpoints: - raise ValueError( - "The environment variable 'MASTER_ADDR' and 'MASTER_PORT' " - "must be specified, for example 'export MASTER_ADDR=127.0.0.1' " - "and 'export MASTER_ADDR=54612'. 
Or you can start your training" - "with paddle.distributed.run or " - "paddle.distributed.luanch module.") - if ',' in endpoints: - endpoints = endpoints.split(',')[0] - master_addr, master_port = endpoints.split(":") - - master_port = int(master_port) - - is_master = rank == 0 - global _default_store - _default_store = core.TCPStore(master_addr, master_port, is_master, - world_size, timeout) - - pg = _new_process_group_impl(backend, _default_store, rank, world_size, - _default_group_name, pg_options) - ranks = list(range(world_size)) - group = Group( - rank, world_size, id=0, ranks=ranks, pg=pg, name=_default_group_name) - - paddle.fluid.dygraph.parallel_helper._set_parallel_ctx(True) - _group_map_by_name[_default_group_name] = group - return group - - -def _new_group(ranks=None, - backend=None, - group_name=None, - timeout=timedelta(0), - pg_options=None): - """ - Create a new process group. - - Args: - ranks (list, optional): list of ranks for the new group. If None is given, - all processes is used. Default: None. - backend (str, optional): the name of the backend used to initialize - the distributed environment. Default: the one for init_parallel_env. - timeout (datetime.timedelta, optional): timeout used for operations of - the group. Default: datetime.timedelta(0). - pg_options (dict, optional): options for the group. Default: None. - - Examples: - - .. code-block:: python - - import paddle - paddle.distributed.init_parallel_env(0, 1) - paddle.distributed.new_group([0, 1]) - - # how to start - # python paddle.distributed.run --gpus="0,1" train.py - - """ - global _default_group_name - if group_name is None: - group_name = _default_group_name + str(_new_group_name_id()) - if group_name == _default_group_name: - raise ValueError("group_name must be specified and it cannot be '{}' " - "which is used for the default process group created " - "by init_parallel_env.".format(_default_group_name)) - global_group = _get_default_group() - global_rank = global_group.rank - global_ranks = global_group.ranks - if ranks is None: - ranks = global_ranks - assert len(ranks) <= len(global_ranks), ( - "Size of new group must be less than or " - "equal to that of the default global group.") - size = len(ranks) - assert size > 1, "A group must have at least two memebers." 
- ranks = sorted(ranks) - if global_rank in ranks: - rank = ranks.index(global_rank) - pg = _new_process_group_impl(backend, _default_store, rank, size, - group_name, pg_options) - else: - rank = -1 - pg = None - group = Group( - rank, - size, - id=_new_group_name_id(), - ranks=ranks, - pg=pg, - name=group_name) - _group_map_by_name[group_name] = group - - return group - - def barrier(group=None): """ @@ -414,6 +242,12 @@ def barrier(group=None): if group is not None and not group.is_member(): return + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + task = group.process_group.barrier() + task.wait() + return + ring_id = 0 if group is None else group.id temp = fill_constant([1], dtype="int32", value="1") @@ -455,6 +289,40 @@ def new_group(ranks=None, backend=None): paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False) """ + global _group_map + if framework._in_eager_mode_: + global _default_group_name + gid = _new_ring_id() + group_name = _default_group_name + str(gid) + global_group = _get_default_group() + global_rank = global_group.rank + global_ranks = global_group.ranks + if ranks is None: + ranks = global_ranks + assert len(ranks) <= len(global_ranks), ( + "Size of new group must be less than or " + "equal to that of the default global group.") + size = len(ranks) + assert size > 1, "A group must have at least two memebers." + ranks = sorted(ranks) + if global_rank in ranks: + rank = ranks.index(global_rank) + pg = _new_process_group_impl( + backend, + _default_store, + rank, + size, + group_name, + pg_options=None, + group_id=gid) + else: + rank = -1 + pg = None + group = Group(rank, size, id=gid, ranks=ranks, pg=pg, name=group_name) + _group_map_by_name[group_name] = group + _group_map[gid] = group + + return group if not backend: backend = 'nccl' @@ -465,7 +333,6 @@ def new_group(ranks=None, backend=None): ring_id = _new_ring_id() - global _group_map if global_rank not in ranks: gp = Group(-1, -1, ring_id, ranks) _group_map[ring_id] = gp @@ -628,7 +495,18 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): if not isinstance(src, int): raise ValueError("src should be int.") - ring_id = 0 if group is None else group.id + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + gsrc = group.get_group_rank(src) + assert gsrc >= 0, ("src rank out of group, need global rank") + task = group.process_group.broadcast(tensor, gsrc) + if use_calc_stream: + task.wait() + return None + else: + return task + + ring_id = ring_id = 0 if group is None else group.id gsrc = src if group is None else group.get_group_rank(src) assert gsrc >= 0, ("src rank out of group, need global rank") @@ -701,6 +579,23 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): if group is not None and not group.is_member(): return + if framework._in_eager_mode_ and in_dygraph_mode(): + if op == ReduceOp.SUM: + op_type = core.ReduceOp.SUM + elif op == ReduceOp.MAX: + op_type = core.ReduceOp.MAX + elif op == ReduceOp.MIN: + op_type = core.ReduceOp.MIN + else: + raise ValueError("Unknown reduce_op type for allreduce.") + group = _get_default_group() if group is None else group + task = group.process_group.allreduce(tensor, op_type) + if use_calc_stream: + task.wait() + return None + else: + return task + ring_id = 0 if group is None else group.id if _non_static_mode(): if op == ReduceOp.SUM: @@ -721,9 +616,6 @@ def all_reduce(tensor, op=ReduceOp.SUM, 
group=None, use_calc_stream=True): check_variable_and_dtype( tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'], 'all_reduce') - if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]: - raise ValueError("The op for all_reduce must be one of educeOp.PROD, " - "ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.") if op == ReduceOp.SUM: op_type = 'c_allreduce_sum' elif op == ReduceOp.MAX: @@ -789,8 +681,24 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): if group is not None and not group.is_member(): return - if not isinstance(dst, int): - raise ValueError("dst should be int.") + if framework._in_eager_mode_ and in_dygraph_mode(): + if op == ReduceOp.SUM: + op_type = core.ReduceOp.SUM + elif op == ReduceOp.MAX: + op_type = core.ReduceOp.MAX + elif op == ReduceOp.MIN: + op_type = core.ReduceOp.MIN + else: + raise ValueError("Unknown reduce_op type for reduce.") + group = _get_default_group() if group is None else group + gdst = group.get_group_rank(dst) + assert gdst >= 0, ("dst rank out of group, need global rank") + task = group.process_group.reduce(tensor, gdst, op_type) + if use_calc_stream: + task.wait() + return None + else: + return task ring_id = 0 if group is None else group.id gdst = dst if group is None else group.get_group_rank(dst) @@ -820,9 +728,6 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): check_variable_and_dtype( tensor, 'tensor', ['float16', 'float32', 'float64', 'int32', 'int64'], 'all_reduce') - if not op in [ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN, ReduceOp.PROD]: - raise ValueError("The op for reduce must be one of educeOp.PROD, " - "ReduceOp.SUM, ReduceOp.MAX, ReduceOp.MIN.") if op == ReduceOp.SUM: op_type = 'c_reduce_sum' @@ -897,6 +802,15 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): if group is not None and not group.is_member(): return + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + out = paddle.concat(tensor_list) + task = group.process_group.all_gather(tensor, out) + task.wait() + tensor_list.clear() + tensor_list.extend(paddle.split(out, group.nranks, 0)) + return + ring_id = 0 if group is None else group.id nranks = _get_global_group().nranks if group is None else group.nranks @@ -985,18 +899,32 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): if not isinstance(src, int): raise ValueError("src should be int.") - ring_id = 0 if group is None else group.id - gsrc = src if group is None else group.get_group_rank(src) + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + gsrc = group.get_group_rank(src) + rank = group.rank + nranks = group.nranks + else: + ring_id = 0 if group is None else group.id + gsrc = src if group is None else group.get_group_rank(src) + rank = _get_global_group().rank if group is None else group.rank + nranks = _get_global_group().nranks if group is None else group.nranks assert gsrc >= 0, ("src rank out of group, need global rank") - rank = _get_global_group().rank if group is None else group.rank - nranks = _get_global_group().nranks if group is None else group.nranks if rank != gsrc: tensor_list = [] for _ in range(nranks): tensor_list.append(tensor) temp = paddle.concat(tensor_list, axis=0) - if _non_static_mode(): + if framework._in_eager_mode_ and in_dygraph_mode(): + task = group.process_group.scatter(temp, tensor, gsrc) + if use_calc_stream: + task.wait() + 
return None + else: + return task + + if in_dygraph_mode(): return _C_ops.c_scatter(temp, tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id, 'nranks', nranks, 'root', gsrc) @@ -1070,11 +998,12 @@ def _c_concat(tensor, group=None): """ if group is not None and not group.is_member(): return - ring_id = 0 if group is None else group.id + group = _get_default_group() if group is None else group + ring_id = group.id global_rank = _get_global_env().rank - rank = global_rank if group is None else group.get_group_rank(global_rank) - nranks = _get_global_env().world_size if group is None else group.nranks + rank = group.rank + nranks = group.nranks if _non_static_mode(): return _C_ops.c_concat(tensor, 'ring_id', ring_id, 'use_calc_stream', @@ -1765,9 +1694,21 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True): if group is not None and not group.is_member(): return - ring_id = 0 if group is None else group.id + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + else: + ring_id = 0 if group is None else group.id + temp = paddle.concat(in_tensor_list, axis=0) nranks = len(in_tensor_list) + if framework._in_eager_mode_ and in_dygraph_mode(): + out = paddle.concat(out_tensor_list, axis=0) + task = group.process_group.alltoall(temp, out) + task.wait() + out_tensor_list.clear() + out_tensor_list.extend(paddle.split(out, nranks, 0)) + return + if _non_static_mode(): out = _C_ops.alltoall(temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id) @@ -1834,6 +1775,16 @@ def send(tensor, dst=0, group=None, use_calc_stream=True): """ if group is not None and not group.is_member(): return + + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + task = group.process_group.send(tensor, dst) + if use_calc_stream: + task.wait() + return None + else: + return task + ring_id = 0 if group is None else group.id if _non_static_mode(): @@ -1887,6 +1838,16 @@ def recv(tensor, src=0, group=None, use_calc_stream=True): """ if group is not None and not group.is_member(): return + + if framework._in_eager_mode_ and in_dygraph_mode(): + group = _get_default_group() if group is None else group + task = group.process_group.recv(tensor, src) + if use_calc_stream: + task.wait() + return None + else: + return task + ring_id = 0 if group is None else group.id if _non_static_mode(): diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 16ed528b64..71ac15bd4b 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -24,11 +24,21 @@ from paddle import compat as cpt # deprecated module import from paddle.fluid import core +import paddle.fluid.framework as framework from paddle.fluid.framework import _set_expected_place from paddle.fluid.dygraph import parallel_helper from paddle.distributed.fleet.launch_utils import check_backend from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.distributed.fleet.base.private_helper_function import wait_server_ready # noqa: F401 +import paddle.distributed.collective as collective +from paddle.distributed.collective import _group_map_by_name +from paddle.distributed.collective import _group_map +from paddle.distributed.collective import _default_group_name +from paddle.distributed.collective import _valid_backend_list +from paddle.distributed.collective import _default_backend +from paddle.distributed.collective import _default_store +from 
paddle.distributed.collective import _new_process_group_impl +from paddle.distributed.collective import Group __all__ = [] @@ -159,18 +169,88 @@ def init_parallel_env(): if not is_cpu_only and core.is_compiled_with_cuda(): _check_var_exists("FLAGS_selected_gpus") + backend = "nccl" if backend == "auto" else backend elif not is_cpu_only and core.is_compiled_with_xpu(): _check_var_exists('FLAGS_selected_xpus') + backend = "bkcl" if backend == "auto" else backend elif not is_cpu_only and core.is_compiled_with_npu(): _check_var_exists('FLAGS_selected_npus') + backend = "hccl" if backend == "auto" else backend elif not is_cpu_only and core.is_compiled_with_mlu(): _check_var_exists('FLAGS_selected_mlus') + backend = "cncl" if backend == "auto" else backend _check_var_exists("PADDLE_TRAINER_ID") _check_var_exists("PADDLE_CURRENT_ENDPOINT") _check_var_exists("PADDLE_TRAINERS_NUM") _check_var_exists("PADDLE_TRAINER_ENDPOINTS") + # NOTE(chenweihang): [ why config global place here? ] + # the dygraph mode will be set to default mode, + # users will not call `dygraph.guard` or `enable_dygraph` + # directly, if they want to switch default place, + # they need to call a function to change default place, + # here just set correctly place to users + if is_cpu_only: + place = core.CPUPlace() + elif core.is_compiled_with_cuda(): + place = core.CUDAPlace(parallel_env.device_id) + elif core.is_compiled_with_xpu(): + place = core.XPUPlace(parallel_env.device_id) + elif core.is_compiled_with_npu(): + place = core.NPUPlace(parallel_env.device_id) + elif core.is_compiled_with_mlu(): + place = core.MLUPlace(parallel_env.device_id) + + _set_expected_place(place) + + group = None + if backend in _valid_backend_list and framework._in_eager_mode_: + if _default_group_name in collective._group_map_by_name: + return collective._group_map_by_name[_default_group_name] + _default_backend = backend + rank = int(os.getenv("PADDLE_TRAINER_ID")) + world_size = int(os.getenv("PADDLE_TRAINERS_NUM")) + assert rank >= 0 and world_size > rank and world_size > 1, ( + "rank must be non-negative and world_size must be the " + "maximum rank plus one. Moreover, at least two processes are " + "required to create a process group.") + master_addr = os.getenv("MASTER_ADDR", None) + master_port = os.getenv("MASTER_PORT", None) + if not master_addr or not master_port: + endpoints = os.getenv("PADDLE_MASTER", None) + if endpoints is None: + endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS").split(',')[0] + assert endpoints, ( + "The environment variable 'MASTER_ADDR' and 'MASTER_PORT' " + "must be specified, for example 'export MASTER_ADDR=127.0.0.1' " + "and 'export MASTER_ADDR=54612'. 
Or you can start your training" + "with paddle.distributed.run module.") + master_addr, master_port = endpoints.split(":") + master_port = int(master_port) + is_master = rank == 0 + _default_store = core.TCPStore(master_addr, master_port, is_master, + world_size) + pg = _new_process_group_impl( + backend, + _default_store, + rank, + world_size, + _default_group_name, + pg_options=None) + ranks = list(range(world_size)) + group = Group( + rank, + world_size, + id=0, + ranks=ranks, + pg=pg, + name=_default_group_name) + collective._group_map_by_name[_default_group_name] = group + _group_map[0] = group + parallel_helper._set_parallel_ctx(True) + return group + node_num = set([i.split(":")[0] for i in parallel_env.trainer_endpoints]) # 3: init gloo context (step 1: httpsever start) init_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0")) @@ -202,24 +282,6 @@ def init_parallel_env(): strategy.current_endpoint = parallel_env.current_endpoint strategy.nrings = parallel_env.nrings - # NOTE(chenweihang): [ why config global place here? ] - # the dygraph mode will be set to default mode, - # users will not call `dygraph.guard` or `enable_dygraph` - # directly, if they want to switch default place, - # they need to call a function to change default place, - # here just set correctly place to users - if is_cpu_only: - place = core.CPUPlace() - elif core.is_compiled_with_cuda(): - place = core.CUDAPlace(parallel_env.device_id) - elif core.is_compiled_with_xpu(): - place = core.XPUPlace(parallel_env.device_id) - elif core.is_compiled_with_npu(): - place = core.NPUPlace(parallel_env.device_id) - elif core.is_compiled_with_mlu(): - place = core.MLUPlace(parallel_env.device_id) - - _set_expected_place(place) # init nccl or hccl or bkcl or heter context if is_cpu_only: parallel_helper._set_parallel_ctx( @@ -274,6 +336,7 @@ def init_parallel_env(): if parallel_env.rank == 0: http_server_d["running"] = False http_server.join() + return group def get_rank(): diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 64388aadb2..cac67a02dd 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -360,9 +360,10 @@ def sync_params_buffers(model, is_model_parallel=False): model_vars = [] for _, param in model._obtain_parameters_buffers().items(): - if not isinstance(param, core.VarBase): - raise TypeError("The data type of '%s' must be Varbase" % - param.name) + if not isinstance(param, (core.VarBase, core.eager.Tensor)): + raise TypeError( + "The data type of '%s' must be Varbase or eager.Tensor" % + param.name) # is_distributed param not need to sync when in mp mode if isinstance(param, ParamBase): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index c816a8c4c2..272ca80674 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -60,6 +60,7 @@ list(APPEND DIST_TEST_OPS test_auto_parallel_data_unshard) list(APPEND DIST_TEST_OPS test_auto_parallel_save_load) list(APPEND DIST_TEST_OPS test_auto_parallel_autoconvert) list(APPEND DIST_TEST_OPS test_collective_process_group) +list(APPEND DIST_TEST_OPS test_eager_dist_api) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. 
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -311,6 +312,7 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM)) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_save_load) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_autoconvert) LIST(REMOVE_ITEM TEST_OPS test_collective_process_group) + LIST(REMOVE_ITEM TEST_OPS test_eager_dist_api) elseif(WITH_GPU) if (${CUDNN_VERSION} VERSION_LESS 7100) LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op) @@ -1147,6 +1149,7 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL) set_tests_properties(test_auto_parallel_save_load PROPERTIES TIMEOUT 120) set_tests_properties(test_auto_parallel_autoconvert PROPERTIES TIMEOUT 120) set_tests_properties(test_collective_process_group PROPERTIES TIMEOUT 120) + set_tests_properties(test_eager_dist_api PROPERTIES TIMEOUT 300) if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212) set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 120) diff --git a/python/paddle/fluid/tests/unittests/init_process_group.py b/python/paddle/fluid/tests/unittests/init_process_group.py index 90926b1a02..c9c957572c 100644 --- a/python/paddle/fluid/tests/unittests/init_process_group.py +++ b/python/paddle/fluid/tests/unittests/init_process_group.py @@ -37,11 +37,15 @@ class TestProcessGroupFp32(unittest.TestCase): pass def test_init_process_group(self): - paddle.distributed.collective._init_parallel_env() - paddle.distributed.collective._new_group() - with self.assertRaises(ValueError): - paddle.distributed.collective._new_group( - backend="gloo", group_name="_default_pg") + with _test_eager_guard(): + paddle.distributed.init_parallel_env() + paddle.distributed.new_group() + group = paddle.distributed.new_group([-1, -2]) + assert group.process_group == None + + group = paddle.distributed.collective.Group(-1, 2, 0, [-1, -2]) + ret = paddle.distributed.barrier(group) + assert ret == None print("test ok\n") diff --git a/python/paddle/fluid/tests/unittests/process_group_nccl.py b/python/paddle/fluid/tests/unittests/process_group_nccl.py index b1da0777fe..7ae38b3bbc 100644 --- a/python/paddle/fluid/tests/unittests/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/process_group_nccl.py @@ -26,16 +26,16 @@ from datetime import timedelta import paddle.fluid.core as core from paddle.fluid.framework import _test_eager_guard from paddle.fluid.dygraph.parallel import ParallelEnv +import paddle.distributed as dist def init_process_group(strategy=None): nranks = ParallelEnv().nranks rank = ParallelEnv().local_rank is_master = True if rank == 0 else False - store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks) - pg_group = core.ProcessGroupNCCL(store, rank, nranks) + pg_group = dist.init_parallel_env() - return pg_group + return pg_group.process_group class TestProcessGroupFp32(unittest.TestCase): @@ -68,12 +68,10 @@ class TestProcessGroupFp32(unittest.TestCase): sum_result = tensor_x + tensor_y if pg.rank() == 0: - task = pg.allreduce(tensor_x) - task.wait() + task = dist.all_reduce(tensor_x) assert np.array_equal(tensor_x, sum_result) else: - task = pg.allreduce(tensor_y) - task.wait() + task = dist.all_reduce(tensor_y) assert np.array_equal(tensor_y, sum_result) print("test allreduce sum api ok") @@ -89,16 +87,41 @@ class TestProcessGroupFp32(unittest.TestCase): max_result = paddle.maximum(tensor_x, tensor_y) if pg.rank() == 0: - task = pg.allreduce(tensor_x, core.ReduceOp.MAX) + task = dist.all_reduce( + tensor_x, dist.ReduceOp.MAX, use_calc_stream=False) task.wait() assert np.array_equal(tensor_x, max_result) else: - 
task = pg.allreduce(tensor_y, core.ReduceOp.MAX) + task = dist.all_reduce( + tensor_y, dist.ReduceOp.MAX, use_calc_stream=False) task.wait() assert np.array_equal(tensor_y, max_result) print("test allreduce max api ok") + # test allreduce min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + min_result = paddle.minimum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.all_reduce( + tensor_x, dist.ReduceOp.MIN, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_x, min_result) + else: + task = dist.all_reduce( + tensor_y, dist.ReduceOp.MIN, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_y, min_result) + + print("test allreduce min api ok") + # test broadcast # rank 0 x = np.random.random(self.shape).astype(self.dtype) @@ -109,16 +132,14 @@ class TestProcessGroupFp32(unittest.TestCase): broadcast_result = paddle.assign(tensor_x) if pg.rank() == 0: - task = pg.broadcast(tensor_x, 0) + task = dist.broadcast(tensor_x, 0, use_calc_stream=False) task.synchronize() paddle.device.cuda.synchronize() assert task.is_completed() assert np.array_equal(broadcast_result, tensor_x) else: - task = pg.broadcast(tensor_y, 0) - task.synchronize() + task = dist.broadcast(tensor_y, 0) paddle.device.cuda.synchronize() - assert task.is_completed() assert np.array_equal(broadcast_result, tensor_y) print("test broadcast api ok") @@ -126,8 +147,7 @@ class TestProcessGroupFp32(unittest.TestCase): # test barrier # rank 0 if pg.rank() == 0: - task = pg.barrier() - task.wait() + dist.barrier() # rank 1 else: task = pg.barrier() @@ -151,9 +171,13 @@ class TestProcessGroupFp32(unittest.TestCase): paddle.device.cuda.synchronize() # rank 1 else: - task = pg.all_gather(tensor_y, tensor_out) - task.wait() + tensor_out_list = [ + paddle.empty_like(tensor_x), paddle.empty_like(tensor_x) + ] + task = dist.all_gather( + tensor_out_list, tensor_y, use_calc_stream=False) paddle.device.cuda.synchronize() + tensor_out = paddle.concat(tensor_out_list) out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]) @@ -178,12 +202,14 @@ class TestProcessGroupFp32(unittest.TestCase): if pg.rank() == 0: task = pg.alltoall(tensor_x, tensor_out1) task.wait() - paddle.device.cuda.synchronize() # rank 1 else: - task = pg.alltoall(tensor_y, tensor_out2) - task.wait() + in_1, in_2 = paddle.split(tensor_y, 2) + out_1, out_2 = paddle.split(tensor_out2, 2) + out_tensor_list = [out_1, out_2] + task = dist.alltoall([in_1, in_2], out_tensor_list) paddle.device.cuda.synchronize() + tensor_out2 = paddle.concat(out_tensor_list) out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]]) out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) @@ -201,18 +227,61 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_y = paddle.to_tensor(y) sum_result = tensor_x + tensor_y if pg.rank() == 0: - task = pg.reduce(tensor_x, 0) - task.wait() + task = dist.reduce(tensor_x, 0, use_calc_stream=True) paddle.device.cuda.synchronize() # rank 1 else: - task = pg.reduce(tensor_y, 0) + task = dist.reduce(tensor_y, 0, use_calc_stream=False) task.wait() paddle.device.cuda.synchronize() if pg.rank() == 0: assert np.array_equal(tensor_x, sum_result) print("test reduce sum api ok\n") + # test reduce max + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = 
paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + max_result = paddle.maximum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.reduce( + tensor_x, 0, dist.ReduceOp.MAX, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_x, max_result) + else: + task = dist.reduce( + tensor_y, 0, dist.ReduceOp.MAX, use_calc_stream=False) + task.wait() + + print("test reduce max api ok") + + # test reduce min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + min_result = paddle.minimum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.reduce( + tensor_x, 0, dist.ReduceOp.MIN, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_x, min_result) + else: + task = dist.reduce( + tensor_y, 0, dist.ReduceOp.MIN, use_calc_stream=False) + task.wait() + + print("test reduce min api ok") + # test Scatter # rank 0 in_shape = list(self.shape) @@ -222,12 +291,14 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_x = paddle.to_tensor(x) tensor_y = paddle.to_tensor(y) if pg.rank() == 0: - task = pg.scatter(tensor_x, tensor_y, 0) - task.wait() + in_1, in_2 = paddle.split(tensor_x, 2) + task = dist.scatter( + tensor_y, [in_1, in_2], 0, use_calc_stream=True) + #task.wait() paddle.device.cuda.synchronize() # rank 1 else: - task = pg.scatter(tensor_x, tensor_y, 0) + task = dist.scatter(tensor_y, [], 0, use_calc_stream=False) task.wait() paddle.device.cuda.synchronize() out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) @@ -239,6 +310,40 @@ class TestProcessGroupFp32(unittest.TestCase): assert np.array_equal(tensor_y, out2) print("test scatter api ok\n") + # test send min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + if pg.rank() == 0: + task = dist.send(tensor_x, 1, use_calc_stream=False) + task.wait() + else: + task = dist.recv(tensor_y, 0, use_calc_stream=False) + task.wait() + assert np.array_equal(tensor_y, tensor_x) + + print("test send api ok") + + # test send min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + if pg.rank() == 0: + task = dist.send(tensor_x, 1, use_calc_stream=True) + else: + task = dist.recv(tensor_y, 0, use_calc_stream=True) + assert np.array_equal(tensor_y, tensor_x) + + print("test send api ok") + class TestProcessGroupFp16(TestProcessGroupFp32): def setUp(self): diff --git a/python/paddle/fluid/tests/unittests/test_eager_dist_api.py b/python/paddle/fluid/tests/unittests/test_eager_dist_api.py new file mode 100644 index 0000000000..e00f90f4b0 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_eager_dist_api.py @@ -0,0 +1,33 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +from test_parallel_dygraph_dataparallel import TestMultipleGpus + + +class TestProcessGroup(TestMultipleGpus): + def test_process_group_nccl(self): + self.run_mnist_2gpu('process_group_nccl.py') + + def test_process_group_gloo(self): + self.run_mnist_2gpu('process_group_gloo.py') + + def test_init_process_group(self): + self.run_mnist_2gpu('init_process_group.py') + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_single.py b/python/paddle/fluid/tests/unittests/test_fleet_base_single.py index 589d6adb0f..ff54035045 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base_single.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base_single.py @@ -46,7 +46,7 @@ class TestFleetDygraphSingle(unittest.TestCase): def test_dygraph_single(self): paddle.disable_static() - fleet.init(is_collective=True) + paddle.distributed.init_parallel_env() layer = LinearNet() loss_fn = nn.MSELoss() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py index 587824a1dc..6c5a2375f6 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py @@ -70,6 +70,9 @@ def start_local_trainers(cluster, "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), + "MASTER_ADDR": "127.0.0.1", + "MASTER_PORT": "6170", + "NCCL_DEBUG": "INFO", "PADDLE_DISTRI_BACKEND": "gloo", # make init_parallel_env get 'gloo' argument. } -- GitLab
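
For reference, a minimal usage sketch (not part of the patch) showing how the wrapped eager-mode collective API introduced here is meant to be called, modeled on the updated process_group_nccl.py test. It assumes two GPUs, eager mode, and a launch via `python -m paddle.distributed.launch --gpus="0,1" demo.py`; the file name demo.py and the tensor shapes are only illustrative.

# demo.py -- illustrative only; exercises the wrapped collective APIs from
# this patch (init_parallel_env, all_reduce, broadcast, barrier, send/recv)
# instead of calling core.ProcessGroupNCCL directly.
import paddle
import paddle.distributed as dist


def main():
    # Builds the default process group from the PADDLE_* / MASTER_* env vars
    # set by the launcher and returns the default Group in eager mode.
    dist.init_parallel_env()
    rank = dist.get_rank()

    # Synchronous all_reduce: use_calc_stream=True (the default) waits
    # internally and returns None.
    x = paddle.ones([2, 2], dtype="float32") * (rank + 1)
    dist.all_reduce(x)  # x now holds the elementwise sum across ranks

    # Asynchronous variant: returns a task that must be waited on.
    y = paddle.ones([2, 2], dtype="float32") * (rank + 1)
    task = dist.all_reduce(y, dist.ReduceOp.MAX, use_calc_stream=False)
    task.wait()

    # Broadcast from rank 0, then a barrier on the default group.
    dist.broadcast(x, src=0)
    dist.barrier()

    # Point-to-point: rank 0 sends, rank 1 receives.
    if rank == 0:
        dist.send(x, dst=1)
    else:
        dist.recv(x, src=0)

    print("rank {} done".format(rank))


if __name__ == "__main__":
    main()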