From 99504cbb95d3c9ae3abbd95e8da1702b206b77bf Mon Sep 17 00:00:00 2001 From: LiYuRio <63526175+LiYuRio@users.noreply.github.com> Date: Fri, 4 Nov 2022 21:05:12 +0800 Subject: [PATCH] move broadcast, reduce, send, recv, reduce_scatter, scatter, alltoall (#47255) --- .../collective/ProcessGroupGloo.cc | 24 + .../distributed/collective/ProcessGroupGloo.h | 18 + python/paddle/distributed/__init__.py | 39 +- python/paddle/distributed/collective.py | 1260 +---------------- .../distributed/communication/__init__.py | 30 + .../distributed/communication/all_reduce.py | 4 +- .../distributed/communication/all_to_all.py | 161 +++ .../communication/batch_isend_irecv.py | 177 +++ .../distributed/communication/broadcast.py | 85 ++ .../paddle/distributed/communication/group.py | 133 ++ .../paddle/distributed/communication/recv.py | 111 ++ .../distributed/communication/reduce.py | 119 +- .../communication/reduce_scatter.py | 122 ++ .../distributed/communication/scatter.py | 94 ++ .../paddle/distributed/communication/send.py | 110 ++ .../communication/stream/__init__.py | 3 +- .../communication/stream/all_reduce.py | 16 +- .../communication/stream/all_to_all.py | 359 +++++ .../communication/stream/alltoall.py | 177 --- .../communication/stream/alltoall_single.py | 144 -- .../communication/stream/broadcast.py | 81 +- .../distributed/communication/stream/recv.py | 72 +- .../communication/stream/reduce.py | 74 +- .../communication/stream/reduce_scatter.py | 19 +- .../communication/stream/scatter.py | 139 +- .../distributed/communication/stream/send.py | 71 +- .../distributed/fleet/layers/mpu/mp_ops.py | 6 +- .../sharding_optimizer_stage2.py | 6 +- .../group_sharded_optimizer_stage2.py | 10 +- .../sharding/group_sharded_stage2.py | 13 +- .../sharding/group_sharded_stage3.py | 9 +- .../meta_parallel/sharding/sharding_stage2.py | 11 +- .../meta_parallel/sharding/sharding_stage3.py | 15 +- .../collective/collective_reduce_scatter.py | 5 +- .../distributed/models/moe/grad_clip.py | 6 +- 35 files changed, 1960 insertions(+), 1763 deletions(-) create mode 100644 python/paddle/distributed/communication/all_to_all.py create mode 100644 python/paddle/distributed/communication/batch_isend_irecv.py create mode 100644 python/paddle/distributed/communication/broadcast.py create mode 100644 python/paddle/distributed/communication/recv.py create mode 100644 python/paddle/distributed/communication/reduce_scatter.py create mode 100644 python/paddle/distributed/communication/scatter.py create mode 100644 python/paddle/distributed/communication/send.py create mode 100644 python/paddle/distributed/communication/stream/all_to_all.py delete mode 100644 python/paddle/distributed/communication/stream/alltoall.py delete mode 100644 python/paddle/distributed/communication/stream/alltoall_single.py diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc index 07065ac908..09af416ec5 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc @@ -233,6 +233,14 @@ std::shared_ptr ProcessGroupGloo::Broadcast( std::vector& inputs, std::vector& outputs, const BroadcastOptions& opts) { + return Broadcast(inputs, outputs, opts, true); +} + +std::shared_ptr ProcessGroupGloo::Broadcast( + std::vector& inputs, + std::vector& outputs, + const BroadcastOptions& opts, + bool sync_op) { auto root = opts.source_rank; std::unique_ptr task; auto tag = next_tag(); @@ -442,6 +450,14 @@ std::shared_ptr ProcessGroupGloo::Reduce( std::vector& inputs, std::vector& outputs, const ReduceOptions& opts) { + return Reduce(inputs, outputs, opts, true); +} + +std::shared_ptr ProcessGroupGloo::Reduce( + std::vector& inputs, + std::vector& outputs, + const ReduceOptions& opts, + bool sync_op) { std::shared_ptr task; auto tag = next_tag(); auto context = get_context(); @@ -497,6 +513,14 @@ std::shared_ptr ProcessGroupGloo::Scatter( std::vector& in_tensors, std::vector& out_tensors, const ScatterOptions& opts) { + return Scatter(in_tensors, out_tensors, opts, true); +} + +std::shared_ptr ProcessGroupGloo::Scatter( + std::vector& in_tensors, + std::vector& out_tensors, + const ScatterOptions& opts, + bool sync_op) { std::shared_ptr task; auto tag = next_tag(); auto context = get_context(); diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h index d7412a1975..c8959e399a 100644 --- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h +++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h @@ -113,6 +113,12 @@ class ProcessGroupGloo : public ProcessGroup { std::vector& outputs, const BroadcastOptions& = BroadcastOptions()) override; + std::shared_ptr Broadcast( + std::vector& inputs, + std::vector& outputs, + const BroadcastOptions& opts, + bool sync_op) override; + std::shared_ptr AllReduce( std::vector& inputs, std::vector& outputs, @@ -131,11 +137,23 @@ class ProcessGroupGloo : public ProcessGroup { std::vector& in_tensors, std::vector& out_tensors) override; + std::shared_ptr Reduce( + std::vector& in_tensors, + std::vector& out_tensors, + const ReduceOptions& opts, + bool sync_op) override; + std::shared_ptr Reduce( std::vector& in_tensors, std::vector& out_tensors, const ReduceOptions& opts) override; + std::shared_ptr Scatter( + std::vector& in_tensors, + std::vector& out_tensors, + const ScatterOptions&, + bool sync_op) override; + std::shared_ptr Scatter( std::vector& in_tensors, std::vector& out_tensors, diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index 2bce21be7c..3612d00904 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -27,31 +27,33 @@ from paddle.distributed.fleet.dataset import InMemoryDataset # noqa: F401 from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401 from paddle.distributed.fleet.base.topology import ParallelMode # noqa: F401 -from .collective import broadcast # noqa: F401 -from .collective import all_reduce # noqa: F401 -from .collective import reduce # noqa: F401 from .collective import all_gather # noqa: F401 from .collective import all_gather_object # noqa: F401 -from .collective import scatter # noqa: F401 from .collective import barrier # noqa: F401 -from .collective import ReduceOp # noqa: F401 from .collective import split # noqa: F401 from .collective import new_group # noqa: F401 -from .collective import alltoall # noqa: F401 -from .collective import recv # noqa: F401 -from .collective import get_group # noqa: F401 -from .collective import send # noqa: F401 from .collective import wait # noqa: F401 -from .collective import is_initialized # noqa: F401 -from .collective import destroy_process_group # noqa: F401 -from .collective import alltoall_single # noqa: F401 -from .collective import isend # noqa: F401 -from .collective import irecv # noqa: F401 -from .collective import batch_isend_irecv # noqa: F401 -from .collective import P2POp # noqa: F401 -from .collective import reduce_scatter # noqa: F401 -from .communication import stream # noqa: F401 +from .communication import ( + stream, + ReduceOp, + all_reduce, + alltoall, + alltoall_single, + broadcast, + reduce, + send, + scatter, + isend, + recv, + irecv, + batch_isend_irecv, + P2POp, + reduce_scatter, + is_initialized, + destroy_process_group, + get_group, +) # noqa: F401 from .auto_parallel import shard_op # noqa: F401 from .auto_parallel import shard_tensor # noqa: F401 @@ -109,5 +111,4 @@ __all__ = [ # noqa "irecv", "reduce_scatter", "rpc", - "stream", ] diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 6825dae045..4923a531f9 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -17,7 +17,6 @@ import os import pickle import io import datetime -import time from ..fluid.layer_helper import LayerHelper from ..fluid.framework import in_dygraph_mode from ..fluid.framework import _non_static_mode @@ -26,7 +25,6 @@ from ..fluid.layers.tensor import fill_constant import paddle import paddle.fluid.core as core from paddle import _legacy_C_ops -import contextlib from .fleet.layers.mpu.mp_ops import split # noqa: F401 from .fleet.layers.mpu.mp_ops import _c_identity # noqa: F401 from .fleet.layers.mpu.mp_ops import _c_concat # noqa: F401 @@ -39,9 +37,7 @@ from .fleet.layers.mpu.mp_ops import _c_softmax_with_cross_entropy # noqa: F401 from .fleet.layers.mpu.mp_ops import _linear # noqa: F401 from .fleet.layers.mpu.mp_ops import _parallel_linear # noqa: F401 from .fleet.layers.mpu.mp_ops import _parallel_embedding # noqa: F401 -from .communication.group import Group, _add_new_group -from .communication.all_reduce import all_reduce # noqa: F401 -from .communication.reduce import _get_reduce_op, ReduceOp +from .communication.group import Group, _add_new_group, is_initialized __all__ = [] @@ -144,30 +140,6 @@ def _new_ring_id(): return len(_get_group_map()) + max(_get_global_env().nrings, 9) -def get_group(id=0): - """ - - Get group instance by group id. - - Args: - id (int): the group id. Default value is 0. - - Returns: - Group: the group instance. - - Examples: - .. code-block:: python - - ... - gid = paddle.distributed.new_group([2,4,6]) - paddle.distributed.get_group(gid.id) - - """ - - gm = _get_group_map() - return gm[id] if id in gm else None - - def _new_process_group_impl( backend, store, @@ -298,45 +270,6 @@ def _set_custom_gid(gid): _custom_gid = gid -def _barrier_by_tcp_store(group_name, store, timeout): - global_rank = paddle.distributed.get_rank() - global_world_size = paddle.distributed.get_world_size() - - if global_world_size < 2: - return - - barrier_prefix = "Barrier/" + group_name + "/" - is_master = global_rank == 0 - - def _check_keys_ready(wait_keys): - start_time = time.time() - while len(wait_keys) > 0: - time.sleep(0.1) - elapse_time = time.time() - start_time - if datetime.timedelta(seconds=elapse_time) > timeout: - raise RuntimeError( - "Timeout while initializing process group {}." - "Keys {} are not ready sinck rank {} is waiting them." - "Two reason may cause this error:\n 1. The create process group api should be called by all ranks.\n" - " 2. Try to increase the waiting time.\n".format( - group_name, wait_keys, global_rank - ) - ) - wait_keys = list( - filter(lambda key: int(store.get(key)) != 1, wait_keys) - ) - - # all the workers set their exiting key and exit - # the master will wait for all workers' exiting key, ensure to exit in the end - if is_master: - wait_keys = [ - barrier_prefix + str(rank) for rank in range(1, global_world_size) - ] - _check_keys_ready(wait_keys) - else: - store.add(barrier_prefix + str(global_rank), 1) - - def new_group(ranks=None, backend=None, timeout=_default_timeout): """ @@ -479,77 +412,6 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): return gp -def is_initialized(): - """ - - Check whether the distributed environment has been initialized - - Returns: - `True` if distributed environment has been initialized, otherwise `False`. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - - print(paddle.distributed.is_initialized()) - # False - - paddle.distributed.init_parallel_env() - print(paddle.distributed.is_initialized()) - # True - - """ - global _group_map_by_name - return _default_group_name in _group_map_by_name - - -def destroy_process_group(group=None): - """ - Destroy a given group for communication - - Args: - group (ProcessGroup, optional): The group to be destroyed. All of process groups, including - the default group, will be destroyed and the distributed - environment will be deinitialized. - - Returns : None - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - group = dist.new_group([0, 1]) - - dist.destroy_process_group(group) - print(dist.is_initialized()) - # True - dist.destroy_process_group() - print(dist.is_initialized()) - # False - - """ - global _group_map - global _group_map_by_name - - pg = _get_default_group() if group is None else group - assert _group_map.get(pg.id, None) is not None, "Invalid group." - - if group is None: - _group_map.clear() - _group_map_by_name.clear() - _group_map_backend.clear() - else: - del _group_map[pg.id] - del _group_map_by_name[pg.name] - del _group_map_backend[pg] - - def wait(tensor, group=None, use_calc_stream=True): """ @@ -620,256 +482,6 @@ def _sync_comm_stream(tensor, ring_id=0): ) -def broadcast(tensor, src, group=None, sync_op=True): - """ - - Broadcast a tensor from the source to all others. - As shown below, one process is started with a GPU and GPU0 owns data 0. Through broadcast operator, - data 0 will be sent to all GPUs from GPU0. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/broadcast.png - :width: 800 - :alt: broadcast - :align: center - - Args: - tensor (Tensor): The Tensor to send if current rank is the source, or the Tensor to receive otherwise. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - src (int): The source rank. - group (Group, optional): The group instance return by new_group or None for global default group. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) - else: - data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) - dist.broadcast(data, src=1) - print(data) - # [[1, 2, 3], [1, 2, 3]] (2 GPUs) - """ - - if group is not None and not group.is_member(): - return - - if not isinstance(src, int): - raise ValueError("src should be int.") - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - gsrc = group.get_group_rank(src) - assert gsrc >= 0, "src rank out of group, need global rank" - task = group.process_group.broadcast(tensor, gsrc) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - ring_id = ring_id = 0 if group is None else group.id - gsrc = src if group is None else group.get_group_rank(src) - assert gsrc >= 0, "src rank out of group, need global rank" - - if _non_static_mode(): - return _legacy_C_ops.c_broadcast( - tensor, - tensor, - 'root', - gsrc, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - ) - - op_type = 'c_broadcast' - check_variable_and_dtype( - tensor, - 'tensor', - [ - 'float16', - 'float32', - 'float64', - 'int32', - 'int64', - 'int8', - 'uint8', - 'bool', - ], - 'broadcast', - ) - - helper = LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [tensor]}, - outputs={'Out': [tensor]}, - attrs={ - 'root': gsrc, - 'use_calc_stream': use_calc_stream, - 'ring_id': ring_id, - }, - ) - - -def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True): - """ - - Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented - by its group rank. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator, - the GPU0 will owns the sum of all data from all GPUs. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png - :width: 800 - :alt: reduce - :align: center - - Args: - tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - dst (int): The destination rank id. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. - group (Group, optional): The group instance return by new_group or None for global default group. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) - else: - data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) - dist.reduce(data, dst=0) - print(data) - # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0) - # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1) - """ - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - op_type = _get_reduce_op(op, "reduce") - group = _get_default_group() if group is None else group - gdst = group.get_group_rank(dst) - assert gdst >= 0, "dst rank out of group, need global rank" - task = group.process_group.reduce(tensor, gdst, op_type) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - gdst = dst if group is None else group.get_group_rank(dst) - assert gdst >= 0, "dst rank out of group, need global rank" - - if _non_static_mode(): - if op == ReduceOp.SUM: - return _legacy_C_ops.c_reduce_sum( - tensor, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'root_id', - gdst, - ) - elif op == ReduceOp.MAX: - return _legacy_C_ops.c_reduce_max( - tensor, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'root_id', - gdst, - ) - elif op == ReduceOp.MIN: - return _legacy_C_ops.c_reduce_min( - tensor, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'root_id', - gdst, - ) - elif op == ReduceOp.PROD: - return _legacy_C_ops.c_reduce_prod( - tensor, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'root_id', - gdst, - ) - else: - raise ValueError("Unknown parameter: {}.".format(op)) - - op_type = 'c_reduce' - check_variable_and_dtype( - tensor, - 'tensor', - [ - 'float16', - 'float32', - 'float64', - 'int32', - 'int64', - 'int8', - 'uint8', - 'bool', - ], - 'reduce', - ) - - if op == ReduceOp.SUM: - op_type = 'c_reduce_sum' - elif op == ReduceOp.MAX: - op_type = 'c_reduce_max' - elif op == ReduceOp.MIN: - op_type = 'c_reduce_min' - elif op == ReduceOp.PROD: - op_type = 'c_reduce_prod' - - helper = LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [tensor]}, - outputs={'Out': [tensor]}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream, - 'root_id': gdst, - }, - ) - - def all_gather(tensor_list, tensor, group=None, sync_op=True): """ @@ -1090,873 +702,3 @@ def all_gather_object(object_list, obj, group=None): object_list.append( _convert_tensor_to_object(tensor, list_len_of_tensor[i]) ) - - -def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True): - """ - - Scatter a tensor to all participators. As shown below, one process is started with a GPU and the source of the scatter - is GPU0. Through scatter operator, the data in GPU0 will be sent to all GPUs averagely. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/scatter.png - :width: 800 - :alt: scatter - :align: center - - Args: - tensor (Tensor): The output Tensor. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. Default value is None. - src (int): The source rank id. Default value is 0. - group (Group, optional): The group instance return by new_group or None for global default group. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data1 = paddle.to_tensor([7, 8, 9]) - data2 = paddle.to_tensor([10, 11, 12]) - dist.scatter(data1, src=1) - else: - data1 = paddle.to_tensor([1, 2, 3]) - data2 = paddle.to_tensor([4, 5, 6]) - dist.scatter(data1, tensor_list=[data1, data2], src=1) - print(data1, data2) - # [1, 2, 3] [10, 11, 12] (2 GPUs, out for rank 0) - # [4, 5, 6] [4, 5, 6] (2 GPUs, out for rank 1) - """ - if group is not None and not group.is_member(): - return - - if not isinstance(src, int): - raise ValueError("src should be int.") - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - gsrc = group.get_group_rank(src) - rank = group.rank - nranks = group.nranks - else: - ring_id = 0 if group is None else group.id - gsrc = src if group is None else group.get_group_rank(src) - rank = _get_global_group().rank if group is None else group.rank - nranks = _get_global_group().nranks if group is None else group.nranks - assert gsrc >= 0, "src rank out of group, need global rank" - - if rank != gsrc: - tensor_list = [] - for _ in range(nranks): - tensor_list.append(tensor) - temp = paddle.concat(tensor_list, axis=0) - if in_dygraph_mode(): - task = group.process_group.scatter(temp, tensor, gsrc) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - if _non_static_mode(): - return _legacy_C_ops.c_scatter( - temp, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'nranks', - nranks, - 'root', - gsrc, - ) - op_type = 'c_scatter' - check_variable_and_dtype( - tensor, - 'tensor', - [ - 'float16', - 'float32', - 'float64', - 'int32', - 'int64', - 'int8', - 'uint8', - 'bool', - ], - 'scatter', - ) - helper = LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [temp]}, - outputs={'Out': [tensor]}, - attrs={ - 'ring_id': ring_id, - 'root': gsrc, - 'use_calc_stream': use_calc_stream, - 'nranks': nranks, - }, - ) - - -def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True): - """ - Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list. - As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1. - Through alltoall operator, the 0_0 in GPU0 will be sent to GPU0 and 0_1 to GPU1, 1_0 in GPU1 sent to GPU0 and 1_1 to GPU1. - Finally the out_tensor_list in GPU0 includes 0_0 and 1_0, and GPU1 includes 0_1 and 1_1. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/alltoall.png - :width: 800 - :alt: alltoall - :align: center - - Args: - in_tensor_list (list): A list of input Tensors. Every element in the list must be a Tensor whose data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - out_tensor_list (list): A list of output Tensors. The data type of its elements should be the same as the - data type of the input Tensors. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - out_tensor_list = [] - if dist.get_rank() == 0: - data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]]) - data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]]) - else: - data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]]) - data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]]) - dist.alltoall([data1, data2], out_tensor_list) - print(out_tensor_list) - # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) - # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) - """ - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - else: - ring_id = 0 if group is None else group.id - - temp = paddle.concat(in_tensor_list, axis=0) - nranks = len(in_tensor_list) - if in_dygraph_mode(): - if len(out_tensor_list) == 0: - tensor_shape = list(in_tensor_list[0].shape) - tensor_shape[0] *= nranks - out = paddle.empty(tensor_shape, in_tensor_list[0].dtype) - else: - out = paddle.concat(out_tensor_list, axis=0) - task = group.process_group.alltoall(temp, out) - task.wait() - out_tensor_list.clear() - out_tensor_list.extend(paddle.split(out, nranks, 0)) - return - - use_calc_stream = sync_op - if _non_static_mode(): - out = _legacy_C_ops.alltoall( - temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id - ) - else: - op_type = 'alltoall' - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference( - dtype=in_tensor_list[0].dtype - ) - - if not isinstance(in_tensor_list, list): - raise ValueError( - "The type of 'in_tensor_list' for all_to_all " "should be list." - ) - for elem in in_tensor_list: - check_variable_and_dtype( - elem, - 'in_tensor_list', - ['float16', 'float32', 'float64', 'int32', 'int64'], - 'all_to_all', - ) - if not isinstance(out_tensor_list, list): - raise ValueError( - "The type of 'out_tensor_list' for all_to_all " - "should be list." - ) - if len(out_tensor_list) != 0: - raise ValueError( - "The 'out_tensor_list' for all_to_all " "must be an empty list." - ) - helper.append_op( - type=op_type, - inputs={'X': [temp]}, - outputs={'Out': [out]}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream, - }, - ) - out_tensor_list.extend(paddle.split(out, nranks, 0)) - - -def alltoall_single( - in_tensor, - out_tensor, - in_split_sizes=None, - out_split_sizes=None, - group=None, - sync_op=True, -): - """ - Scatter a single input tensor to all participators and gather the received tensors in out_tensor. - - Note: - ``alltoall_single`` is only supported in eager mode. - - Args: - in_tensor (Tensor): Input tensor. The data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - out_tensor (Tensor): Output Tensor. The data type should be the same as the data type of the input Tensor. - in_split_sizes (list[int], optional): Split sizes of ``in_tensor`` for dim[0]. If not given, dim[0] of ``in_tensor`` - must be divisible by group size and ``in_tensor`` will be scattered averagely to all participators. Default: None. - out_split_sizes (list[int], optional): Split sizes of ``out_tensor`` for dim[0]. If not given, dim[0] of ``out_tensor`` - must be divisible by group size and ``out_tensor`` will be gathered averagely from all participators. Default: None. - group (Group, optional): The group instance return by ``new_group`` or None for global default group. Default: None. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None, if ``sync_op`` is set to ``True``; ``Task`` of ``group``, if ``sync_op`` is set to ``False``. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - rank = dist.get_rank() - size = dist.get_world_size() - - # case 1 (2 GPUs) - data = paddle.arange(2, dtype='int64') + rank * 2 - # data for rank 0: [0, 1] - # data for rank 1: [2, 3] - output = paddle.empty([2], dtype='int64') - dist.alltoall_single(data, output) - print(output) - # output for rank 0: [0, 2] - # output for rank 1: [1, 3] - - # case 2 (2 GPUs) - in_split_sizes = [i + 1 for i in range(size)] - # in_split_sizes for rank 0: [1, 2] - # in_split_sizes for rank 1: [1, 2] - out_split_sizes = [rank + 1 for i in range(size)] - # out_split_sizes for rank 0: [1, 1] - # out_split_sizes for rank 1: [2, 2] - data = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank - # data for rank 0: [[0., 0.], [0., 0.], [0., 0.]] - # data for rank 1: [[1., 1.], [1., 1.], [1., 1.]] - output = paddle.empty([(rank + 1) * size, size], dtype='float32') - group = dist.new_group([0, 1]) - task = dist.alltoall_single(data, - output, - in_split_sizes, - out_split_sizes, - sync_op=False, - group=group) - task.wait() - print(output) - # output for rank 0: [[0., 0.], [1., 1.]] - # output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] - - """ - if group is not None and not group.is_member(): - return - - assert in_dygraph_mode(), "Only suppport alltoall_single in eager mode." - # _check_single_tensor - - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - - in_split_sizes = [] if in_split_sizes is None else in_split_sizes - out_split_sizes = [] if out_split_sizes is None else out_split_sizes - - task = group.process_group.alltoall_single( - in_tensor, out_tensor, in_split_sizes, out_split_sizes - ) - if sync_op: - task.wait() - return - else: - return task - - -def _get_group_rank(global_rank, group=None): - return global_rank if group is None else group.get_group_rank(global_rank) - - -def send(tensor, dst=0, group=None, sync_op=True): - """ - Send a tensor to the receiver. - - Args: - tensor (Tensor): The Tensor to send. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - dst (int): The destination rank id. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([7, 8, 9]) - dist.send(data, dst=1) - else: - data = paddle.to_tensor([1, 2, 3]) - dist.recv(data, src=0) - print(data) - # [7, 8, 9] (2 GPUs) - """ - if group is not None and not group.is_member(): - return - dst = _get_group_rank(dst, group) - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - task = group.process_group.send(tensor, dst) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - - if _non_static_mode(): - return _legacy_C_ops.send_v2( - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - dst, - ) - op_type = 'send_v2' - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - 'send', - ) - - helper = LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [tensor]}, - attrs={ - 'ring_id': ring_id, - 'peer': dst, - 'use_calc_stream': use_calc_stream, - }, - ) - - -def recv(tensor, src=0, group=None, sync_op=True): - """ - Receive a tensor to the sender. - - Args: - tensor (Tensor): The Tensor to receive. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - src (int): The source rank id. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - None. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([7, 8, 9]) - dist.send(data, dst=1) - else: - data = paddle.to_tensor([1, 2, 3]) - dist.recv(data, src=0) - print(data) - # [7, 8, 9] (2 GPUs) - """ - if group is not None and not group.is_member(): - return - - src = _get_group_rank(src, group) - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - task = group.process_group.recv(tensor, src) - if sync_op: - task.wait() - return None - else: - return task - - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - - if _non_static_mode(): - return _legacy_C_ops.recv_v2( - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - src, - 'dtype', - tensor.dtype, - 'out_shape', - tensor.shape, - ) - op_type = 'recv_v2' - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - 'recv', - ) - helper = LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - outputs={'Out': [tensor]}, - attrs={ - 'ring_id': ring_id, - 'peer': src, - 'out_shape': tensor.shape, - 'dtype': tensor.dtype, - 'use_calc_stream': use_calc_stream, - }, - ) - - -def _check_single_tensor(tensor, tensor_name): - if not isinstance(tensor, (core.eager.Tensor, paddle.Tensor)): - raise RuntimeError( - "Invalid function argument. Expected parameter {}" - "to be of type paddle.Tensor, but it's {}".format( - tensor_name, type(tensor) - ) - ) - - -def _check_tensor_list(tensor_list, tensor_name): - if not isinstance(tensor_list, list) or not all( - isinstance(t, (core.eager.Tensor, paddle.Tensor)) for t in tensor_list - ): - raise RuntimeError( - "Invalid function argument. Expected parameter {}" - "to be of type paddle.Tensor".format(tensor_name) - ) - - -def isend(tensor, dst, group=None): - """ - Sends a tensor asynchronously - - Args: - tensor (Tensor): The Tensor to send. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - dst (int): The destination rank. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - - Returns: - A distributed task object. - - Warning: - This API only supports the dygraph mode. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([7, 8, 9]) - task = dist.isend(data, dst=1) - else: - data = paddle.to_tensor([1, 2, 3]) - task = dist.irecv(data, src=0) - task.wait() - print(data) - # [7, 8, 9] (2 GPUs) - - """ - _check_single_tensor(tensor, "tensor") - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - group_dst_rank = group.get_group_rank(dst) - assert group_dst_rank >= 0, "dst rank out of group, need global rank" - return group.process_group.send(tensor, group_dst_rank) - else: - raise RuntimeError("Only support eager dygraph mode.") - - -def irecv(tensor, src=None, group=None): - """ - Receive a tensor to the sender. - - Args: - tensor (Tensor): The Tensor to receive. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - src (int): The source rank id. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - - Returns: - A distributed task object. - - Warning: - This API only supports the dygraph mode. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data = paddle.to_tensor([7, 8, 9]) - task = dist.isend(data, dst=1) - else: - data = paddle.to_tensor([1, 2, 3]) - task = dist.irecv(data, src=0) - task.wait() - print(data) - # [7, 8, 9] (2 GPUs) - """ - _check_single_tensor(tensor, "tensor") - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - group_src_rank = group.get_group_rank(src) - assert group_src_rank >= 0, "src rank out of group, need global rank" - return group.process_group.recv(tensor, group_src_rank) - else: - raise RuntimeError("Only support eager dygraph mode.") - - -class P2POp(object): - """ - A class that makes point-to-point operations for "batch_isend_irecv". - - This class creates the type of P2P operation, communication buffer, peer rank, - Group. Instances of this class will be passed to - ``paddle.distributed.batch_isend_irecv`` for point-to-point communication. - - Args: - op (callable): A function to send data to or receive data from a peer process. - The type of ``op`` is either ``paddle.distributed.isend`` or ``paddle.distributed.irecv``. - tensor (Tensor): Tensor to send or receive. - peer (int): The destination or source rank. - group (Group, optional): The group instance return by new_group or None for global - default group. Default: None. - - """ - - def __init__(self, op, tensor, peer, group=None): - if op not in [isend, irecv]: - raise RuntimeError( - "Invalid ``op`` function. Expected ``op`` " - "to be of type ``paddle.distributed.isend`` or " - "``paddle.distributed.irecv``." - ) - _check_single_tensor(tensor, "tensor") - - self.op = op - self.tensor = tensor - self.peer = peer - self.group = _get_default_group() if group is None else group - - -@contextlib.contextmanager -def _with_batch_p2p_guard(backend): - if backend == "nccl": - core.ProcessGroupNCCL.group_start() - try: - yield - finally: - if backend == "nccl": - core.ProcessGroupNCCL.group_end() - - -def _check_p2p_op_list(p2p_op_list): - """ - Helper to check that the ``p2p_op_list`` is a list of P2POp instances and - all ops use the same backend. - """ - if not isinstance(p2p_op_list, list) or not all( - isinstance(p2p_op, P2POp) for p2p_op in p2p_op_list - ): - raise RuntimeError( - "Invalid ``p2p_op_list``. Each op is expected to " - "to be of type ``paddle.distributed.P2POp``." - ) - - backend = _group_map_backend[p2p_op_list[0].group] - if not all( - backend == _group_map_backend[p2p_op.group] for p2p_op in p2p_op_list - ): - raise RuntimeError("All groups need to use the same backend.") - - -def batch_isend_irecv(p2p_op_list): - """ - Send or Receive a batch of tensors asynchronously and return a list of requests. - - Process each of the point-to-point operations in ``p2p_op_list`` and return the - corresponding tasks. NCCL are currently supported. - - Args: - p2p_op_list (List[P2POp]): A list of point-to-point operations(type of each operator is - ``paddle.distributed.P2POp``). The order of the isend/irecv in the list - matters and it needs to match with corresponding isend/irecv on the - remote end. - - Returns: - A list of distributed tasks returned by calling the corresponding - op in the op_list. - - Warning: - This API only supports the dygraph mode. - - Examples: - .. code-block:: python - - # required: distributed - - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - rank = dist.get_rank() - world_size = dist.get_world_size() - - send_t = paddle.arange(2) + rank - # paddle.tensor([0, 1]) # Rank-0 - # paddle.tensor([1, 2]) # Rank-1 - - recv_t = paddle.empty(shape=[2], dtype=send_t.dtype) - - send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size) - recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size) - - tasks = dist.batch_isend_irecv([send_op, recv_op]) - - for task in tasks: - task.wait() - - print(recv_t) - # paddle.tensor([1, 2]) # Rank-0 - # paddle.tensor([0, 1]) # Rank-1 - """ - _check_p2p_op_list(p2p_op_list) - group = p2p_op_list[0].group - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - tasks = [] - with _with_batch_p2p_guard(backend): - for p2p_op in p2p_op_list: - op = p2p_op.op - tensor = p2p_op.tensor - peer = p2p_op.peer - comm_group = p2p_op.group - task = op(tensor, peer, comm_group) - if task is not None: - tasks.append(task) - return tasks - else: - raise RuntimeError("Don't support static graph mode currently.") - - -def reduce_scatter( - tensor, tensor_list, op=ReduceOp.SUM, group=None, sync_op=True -): - """ - Reduces, then scatters a list of tensors to all processes in a group - - Args: - tensor (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - tensor_list (list[Tensor]): List of tensors to reduce and scatter. Every element in the list must be a Tensor whose data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM. - group (Group, optional): The group instance return by new_group or None for global - default group. Default: None. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - Async task handle, if sync_op is set to False. - None, if sync_op or if not part of the group. - - Warning: - This API only supports the dygraph mode. - - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - if dist.get_rank() == 0: - data1 = paddle.to_tensor([0, 1]) - data2 = paddle.to_tensor([2, 3]) - else: - data1 = paddle.to_tensor([4, 5]) - data2 = paddle.to_tensor([6, 7]) - dist.reduce_scatter(data1, [data1, data2]) - print(data1) - # [4, 6] (2 GPUs, out for rank 0) - # [8, 10] (2 GPUs, out for rank 1) - - """ - _check_single_tensor(tensor, "tensor") - _check_tensor_list(tensor_list, "tensor_list") - - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - op_type = _get_reduce_op(op, "reduce_scatter") - group = _get_default_group() if group is None else group - backend = _group_map_backend[group] - assert backend != 'gloo', "backend gloo is not supported yet" - - temp = paddle.concat(tensor_list, axis=0) - task = group.process_group._reduce_scatter_base(tensor, temp, op_type) - if sync_op: - task.wait() - return None - else: - return task - else: - raise RuntimeError("Don't support static graph mode currently.") - - -def _reduce_scatter_base( - output, input, op=ReduceOp.SUM, group=None, sync_op=True -): - """ - Reduces, then scatters a flattened tensor to all processes in a group. - - Args: - output (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - input (Tensor): Input tensor that is of size output tensor size times world size. Its data type - should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM. - group (ProcessGroup, optional): The process group to work on. If None, - the default process group will be used. - sync_op (bool, optional): Whether this op is a sync op. The default value is True. - - Returns: - Async task handle, if sync_op is set to False. - None, if sync_op or if not part of the group. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - rank = dist.get_rank() - data = paddle.arange(4) + rank - # [0, 1, 2, 3] (2 GPUs, for rank 0) - # [1, 2, 3, 4] (2 GPUs, for rank 1) - output = paddle.empty(shape=[2], dtype=data.dtype) - dist.collective._reduce_scatter_base(output, data) - print(output) - # [1, 3] (2 GPUs, out for rank 0) - # [5, 7] (2 GPUs, out for rank 1) - - """ - _check_single_tensor(output, "output") - _check_single_tensor(input, "input") - - if group is not None and not group.is_member(): - return - - if in_dygraph_mode(): - op_type = _get_reduce_op(op, "_reduce_scatter_base") - group = _get_default_group() if group is None else group - task = group.process_group._reduce_scatter_base(output, input, op_type) - if sync_op: - task.wait() - return None - else: - return task - else: - raise RuntimeError("Don't support static graph mode currently.") diff --git a/python/paddle/distributed/communication/__init__.py b/python/paddle/distributed/communication/__init__.py index 97043fd7ba..bdd0f99371 100644 --- a/python/paddle/distributed/communication/__init__.py +++ b/python/paddle/distributed/communication/__init__.py @@ -11,3 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from .all_reduce import all_reduce +from .broadcast import broadcast +from .reduce import reduce, ReduceOp +from .send import send, isend +from .recv import recv, irecv +from .scatter import scatter +from .batch_isend_irecv import batch_isend_irecv, P2POp +from .reduce_scatter import reduce_scatter +from .all_to_all import alltoall, alltoall_single +from .group import is_initialized, destroy_process_group, get_group + +__all__ = [ + "ReduceOp", + "all_reduce", + "alltoall", + "alltoall_single", + "broadcast", + "reduce", + "send", + "scatter", + "isend", + "recv", + "irecv", + "batch_isend_irecv", + "P2POp", + "reduce_scatter", + "is_initialized", + "destroy_process_group", + "get_group", +] diff --git a/python/paddle/distributed/communication/all_reduce.py b/python/paddle/distributed/communication/all_reduce.py index 7a09d779e8..50d8fac160 100644 --- a/python/paddle/distributed/communication/all_reduce.py +++ b/python/paddle/distributed/communication/all_reduce.py @@ -14,7 +14,7 @@ import paddle import paddle.fluid.framework as framework -from paddle.distributed.communication import stream as stream +import paddle.distributed.communication.stream as stream from paddle.distributed.communication.reduce import ReduceOp @@ -63,6 +63,8 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): ) # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return use_calc_stream = sync_op ring_id = 0 if group is None else group.id if op == ReduceOp.SUM: diff --git a/python/paddle/distributed/communication/all_to_all.py b/python/paddle/distributed/communication/all_to_all.py new file mode 100644 index 0000000000..4fbd3d11fc --- /dev/null +++ b/python/paddle/distributed/communication/all_to_all.py @@ -0,0 +1,161 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream + + +def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True): + """ + Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list. + As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1. + Through alltoall operator, the 0_0 in GPU0 will be sent to GPU0 and 0_1 to GPU1, 1_0 in GPU1 sent to GPU0 and 1_1 to GPU1. + Finally the out_tensor_list in GPU0 includes 0_0 and 1_0, and GPU1 includes 0_1 and 1_1. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/alltoall.png + :width: 800 + :alt: alltoall + :align: center + + Args: + in_tensor_list (List[Tensor]): List of tensors to scatter one per rank. The data type of each tensor + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + out_tensor_list (List[Tensor]): List of tensors to be gathered one per rank. The data type of each tensor should be the same as the input tensors. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + out_tensor_list = [] + if dist.get_rank() == 0: + data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]]) + data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]]) + else: + data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]]) + data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]]) + dist.alltoall([data1, data2], out_tensor_list) + print(out_tensor_list) + # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) + # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) + """ + if not framework._in_legacy_dygraph(): + return stream.alltoall( + out_tensor_list, in_tensor_list, group, sync_op, False + ) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + ring_id = 0 if group is None else group.id + temp = paddle.concat(in_tensor_list, axis=0) + nranks = len(in_tensor_list) + use_calc_stream = sync_op + out = paddle._legacy_C_ops.alltoall( + temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id + ) + out_tensor_list.extend(paddle.split(out, nranks, 0)) + + +def alltoall_single( + in_tensor, + out_tensor, + in_split_sizes=None, + out_split_sizes=None, + group=None, + sync_op=True, +): + """ + Scatter a single input tensor to all participators and gather the received tensors in out_tensor. + + Note: + ``alltoall_single`` is only supported in eager mode. + + Args: + in_tensor (Tensor): Input tensor. The data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + out_tensor (Tensor): Output Tensor. The data type should be the same as the data type of the input Tensor. + in_split_sizes (list[int], optional): Split sizes of ``in_tensor`` for dim[0]. If not given, dim[0] of ``in_tensor`` + must be divisible by group size and ``in_tensor`` will be scattered averagely to all participators. Default: None. + out_split_sizes (list[int], optional): Split sizes of ``out_tensor`` for dim[0]. If not given, dim[0] of ``out_tensor`` + must be divisible by group size and ``out_tensor`` will be gathered averagely from all participators. Default: None. + group (Group, optional): The group instance return by ``new_group`` or None for global default group. Default: None. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + rank = dist.get_rank() + size = dist.get_world_size() + + # case 1 (2 GPUs) + data = paddle.arange(2, dtype='int64') + rank * 2 + # data for rank 0: [0, 1] + # data for rank 1: [2, 3] + output = paddle.empty([2], dtype='int64') + dist.alltoall_single(data, output) + print(output) + # output for rank 0: [0, 2] + # output for rank 1: [1, 3] + + # case 2 (2 GPUs) + in_split_sizes = [i + 1 for i in range(size)] + # in_split_sizes for rank 0: [1, 2] + # in_split_sizes for rank 1: [1, 2] + out_split_sizes = [rank + 1 for i in range(size)] + # out_split_sizes for rank 0: [1, 1] + # out_split_sizes for rank 1: [2, 2] + data = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank + # data for rank 0: [[0., 0.], [0., 0.], [0., 0.]] + # data for rank 1: [[1., 1.], [1., 1.], [1., 1.]] + output = paddle.empty([(rank + 1) * size, size], dtype='float32') + group = dist.new_group([0, 1]) + task = dist.alltoall_single(data, + output, + in_split_sizes, + out_split_sizes, + sync_op=False, + group=group) + task.wait() + print(output) + # output for rank 0: [[0., 0.], [1., 1.]] + # output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] + + """ + if not framework._in_legacy_dygraph(): + return stream.alltoall_single( + out_tensor, + in_tensor, + out_split_sizes, + in_split_sizes, + group, + sync_op, + False, + ) diff --git a/python/paddle/distributed/communication/batch_isend_irecv.py b/python/paddle/distributed/communication/batch_isend_irecv.py new file mode 100644 index 0000000000..073ccb0b41 --- /dev/null +++ b/python/paddle/distributed/communication/batch_isend_irecv.py @@ -0,0 +1,177 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import paddle.distributed as dist +import paddle.fluid.core as core +import paddle.fluid.framework as framework +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, +) + + +class P2POp(object): + """ + A class that makes point-to-point operations for "batch_isend_irecv". + + This class creates the type of P2P operation, communication buffer, peer rank, + Group. Instances of this class will be passed to + ``paddle.distributed.batch_isend_irecv`` for point-to-point communication. + + Args: + op (callable): A function to send data to or receive data from a peer process. + The type of ``op`` is either ``paddle.distributed.isend`` or ``paddle.distributed.irecv``. + tensor (Tensor): Tensor to send or receive. + peer (int): The destination or source rank. + group (Group, optional): The group instance return by new_group or None for global + default group. Default: None. + + Examples: + .. code-block:: python + + # required: distributed + + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + rank = dist.get_rank() + world_size = dist.get_world_size() + + send_t = paddle.arange(2) + rank + # paddle.tensor([0, 1]) # Rank-0 + # paddle.tensor([1, 2]) # Rank-1 + + recv_t = paddle.empty(shape=[2], dtype=send_t.dtype) + + send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size) + recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size) + + """ + + def __init__(self, op, tensor, peer, group=None): + if op not in [dist.isend, dist.irecv]: + raise RuntimeError( + "Invalid ``op`` function. Expected ``op`` " + "to be of type ``paddle.distributed.isend`` or " + "``paddle.distributed.irecv``." + ) + + self.op = op + self.tensor = tensor + self.peer = peer + self.group = _get_global_group() if group is None else group + + +@contextlib.contextmanager +def _with_batch_p2p_guard(backend): + if backend == "NCCL": + core.ProcessGroupNCCL.group_start() + try: + yield + finally: + if backend == "NCCL": + core.ProcessGroupNCCL.group_end() + + +def _check_p2p_op_list(p2p_op_list): + """ + Helper to check that the ``p2p_op_list`` is a list of P2POp instances and + all ops use the same backend. + """ + if not isinstance(p2p_op_list, list) or not all( + isinstance(p2p_op, P2POp) for p2p_op in p2p_op_list + ): + raise RuntimeError( + "Invalid ``p2p_op_list``. Each op is expected to " + "to be of type ``paddle.distributed.P2POp``." + ) + + backend = p2p_op_list[0].group.backend + if not all(backend == p2p_op.group.backend for p2p_op in p2p_op_list): + raise RuntimeError("All groups need to use the same backend.") + + +def batch_isend_irecv(p2p_op_list): + """ + Send or Receive a batch of tensors asynchronously and return a list of requests. + + Process each of the point-to-point operations in ``p2p_op_list`` and return the + corresponding tasks. NCCL are currently supported. + + Args: + p2p_op_list (List[P2POp]): A list of point-to-point operations(type of each operator is + ``paddle.distributed.P2POp``). The order of the isend/irecv in the list + matters and it needs to match with corresponding isend/irecv on the + remote end. + + Returns: + A list of distributed tasks returned by calling the corresponding + op in the op_list. + + Warning: + This API only supports the dygraph mode. + + Examples: + .. code-block:: python + + # required: distributed + + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + rank = dist.get_rank() + world_size = dist.get_world_size() + + send_t = paddle.arange(2) + rank + # paddle.tensor([0, 1]) # Rank-0 + # paddle.tensor([1, 2]) # Rank-1 + + recv_t = paddle.empty(shape=[2], dtype=send_t.dtype) + + send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size) + recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size) + + tasks = dist.batch_isend_irecv([send_op, recv_op]) + + for task in tasks: + task.wait() + + print(recv_t) + # paddle.tensor([1, 2]) # Rank-0 + # paddle.tensor([0, 1]) # Rank-1 + """ + _check_p2p_op_list(p2p_op_list) + group = p2p_op_list[0].group + if _warn_cur_rank_not_in_group(group): + return + + if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + backend = group.backend + tasks = [] + with _with_batch_p2p_guard(backend): + for p2p_op in p2p_op_list: + op = p2p_op.op + tensor = p2p_op.tensor + peer = p2p_op.peer + comm_group = p2p_op.group + task = op(tensor, peer, comm_group) + if task is not None: + tasks.append(task) + return tasks + else: + raise RuntimeError("Don't support static graph mode currently.") diff --git a/python/paddle/distributed/communication/broadcast.py b/python/paddle/distributed/communication/broadcast.py new file mode 100644 index 0000000000..cf8e605eca --- /dev/null +++ b/python/paddle/distributed/communication/broadcast.py @@ -0,0 +1,85 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream + + +def broadcast(tensor, src, group=None, sync_op=True): + """ + + Broadcast a tensor from the source to all others. + As shown below, one process is started with a GPU and GPU0 owns data 0. Through broadcast operator, + data 0 will be sent to all GPUs from GPU0. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/broadcast.png + :width: 800 + :alt: broadcast + :align: center + + Args: + tensor (Tensor): The tensor to send if current rank is the source, or the tensor to receive otherwise. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + src (int): The source rank in global view. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) + else: + data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) + dist.broadcast(data, src=1) + print(data) + # [[1, 2, 3], [1, 2, 3]] (2 GPUs) + """ + if not framework._in_legacy_dygraph(): + return stream.broadcast( + tensor, + src, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + use_calc_stream = sync_op + ring_id = 0 if group is None else group.id + + gsrc = src if group is None else group.get_group_rank(src) + assert gsrc >= 0, "src rank out of group, need global rank" + + return paddle._legacy_C_ops.c_broadcast( + tensor, + tensor, + 'root', + gsrc, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + ) diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py index 60f1264fe8..8a2c9304aa 100644 --- a/python/paddle/distributed/communication/group.py +++ b/python/paddle/distributed/communication/group.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings +import paddle.distributed as dist + class Group: """ @@ -50,6 +53,10 @@ class Group: def world_size(self): return self._world_size + @property + def backend(self): + return self._pg.name() + @property def id(self): return self._id @@ -94,3 +101,129 @@ def _add_new_group(group): "The group with id {} already exist.".format(group.id) ) _GroupManager.group_map_by_id[group.id] = group + + +def _is_global_group(group): + return group.id == _GroupManager.global_group_id + + +def _warn_cur_rank_not_in_group(group): + global_rank = dist.get_rank() + if group and not group.is_member(): + warnings.warn( + "Current global rank {} is not in group {}".format( + global_rank, group.name + ) + ) + return True + return False + + +def _get_or_throw_group_rank(global_rank, group): + group_rank = group.get_group_rank(global_rank) + assert ( + group_rank >= 0 + ), "The input rank {} can not be found inside the group {}".format( + global_rank, group.name + ) + return group_rank + + +def is_initialized(): + """ + + Check whether the distributed environment has been initialized + + Returns: + `True` if distributed environment has been initialized, otherwise `False`. + + Warning: + This API only supports the dygraph mode. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + + print(paddle.distributed.is_initialized()) + # False + + paddle.distributed.init_parallel_env() + print(paddle.distributed.is_initialized()) + # True + + """ + return _GroupManager.global_group_id in _GroupManager.group_map_by_id + + +def destroy_process_group(group=None): + """ + Destroy a given group for communication + + Args: + group (Group, optional): The group to be destroyed. All of process groups, including + the default group, will be destroyed and the distributed + environment will be deinitialized. + + Returns : None + + Warning: + This API only supports the dygraph mode. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + group = dist.new_group([0, 1]) + + dist.destroy_process_group(group) + print(dist.is_initialized()) + # True + dist.destroy_process_group() + print(dist.is_initialized()) + # False + + """ + group = _get_global_group() if group is None else group + assert ( + group.id in _GroupManager.group_map_by_id + ), "Destroy group with id {} is invalid.".format(group.id) + if _is_global_group(group): + _GroupManager.group_map_by_id.clear() + else: + del _GroupManager.group_map_by_id[group.id] + + +def get_group(id=0): + """ + + Get group instance by group id. + + Args: + id (int): the group id. Default value is 0. + + Returns: + Group: the group instance. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + gid = paddle.distributed.new_group([2,4,6]) + paddle.distributed.get_group(gid.id) + + """ + + if id in _GroupManager.group_map_by_id: + return _GroupManager.group_map_by_id[id] + warnings.warn("Group {} is not initialized.".format(id)) + return None diff --git a/python/paddle/distributed/communication/recv.py b/python/paddle/distributed/communication/recv.py new file mode 100644 index 0000000000..93196ce5f2 --- /dev/null +++ b/python/paddle/distributed/communication/recv.py @@ -0,0 +1,111 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream + + +def recv(tensor, src=0, group=None, sync_op=True): + """ + Receive a tensor to the sender. + + Args: + tensor (Tensor): The tensor to receive. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + src (int): The source rank id. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([7, 8, 9]) + dist.send(data, dst=1) + else: + data = paddle.to_tensor([1, 2, 3]) + dist.recv(data, src=0) + print(data) + # [7, 8, 9] (2 GPUs) + """ + if not framework._in_legacy_dygraph(): + return stream.recv( + tensor, src=src, group=group, sync_op=sync_op, use_calc_stream=False + ) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + use_calc_stream = sync_op + gsrc = src if group is None else group.get_group_rank(src) + ring_id = 0 if group is None else group.id + return paddle._legacy_C_ops.recv_v2( + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'peer', + src, + 'dtype', + tensor.dtype, + 'out_shape', + tensor.shape, + ) + + +def irecv(tensor, src=None, group=None): + """ + Receive a tensor to the sender. + + Args: + tensor (Tensor): The Tensor to receive. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + src (int): The source rank id. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + + Returns: + Return a task object. + + Warning: + This API only supports the dygraph mode. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([7, 8, 9]) + task = dist.isend(data, dst=1) + else: + data = paddle.to_tensor([1, 2, 3]) + task = dist.irecv(data, src=0) + task.wait() + print(data) + # [7, 8, 9] (2 GPUs) + """ + return recv(tensor, src, group, sync_op=False) diff --git a/python/paddle/distributed/communication/reduce.py b/python/paddle/distributed/communication/reduce.py index 8628e83b62..696daf23e1 100644 --- a/python/paddle/distributed/communication/reduce.py +++ b/python/paddle/distributed/communication/reduce.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import paddle import paddle.fluid.framework as framework import paddle.fluid.core as core +import paddle.distributed.communication.stream as stream class ReduceOp: @@ -66,12 +68,121 @@ def _get_reduce_op(reduce_op, func_name): return core.ReduceOp.PRODUCT else: if reduce_op == ReduceOp.SUM: - return 'c_allreduce_sum' + return 'c_{}_sum'.format(func_name) elif reduce_op == ReduceOp.MAX: - return 'c_allreduce_max' + return 'c_{}_max'.format(func_name) elif reduce_op == ReduceOp.MIN: - return 'c_allreduce_min' + return 'c_{}_min'.format(func_name) elif reduce_op == ReduceOp.PROD: - return 'c_allreduce_prod' + return 'c_{}_prod'.format(func_name) + else: + return 'c_{}'.format(func_name) raise ValueError("Unknown reduce_op type for {}.".format(func_name)) + + +def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True): + """ + + Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented + by its group rank. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator, + the GPU0 will owns the sum of all data from all GPUs. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png + :width: 800 + :alt: reduce + :align: center + + Args: + tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + dst (int): The destination rank id. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]]) + else: + data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]]) + dist.reduce(data, dst=0) + print(data) + # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0) + # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1) + """ + + if not framework._in_legacy_dygraph(): + return stream.reduce( + tensor, + dst=dst, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + use_calc_stream = sync_op + ring_id = 0 if group is None else group.id + gdst = dst if group is None else group.get_group_rank(dst) + assert gdst >= 0, "dst rank out of group, need global rank" + + if op == ReduceOp.SUM: + return paddle._legacy_C_ops.c_reduce_sum( + tensor, + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'root_id', + gdst, + ) + elif op == ReduceOp.MAX: + return paddle._legacy_C_ops.c_reduce_max( + tensor, + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'root_id', + gdst, + ) + elif op == ReduceOp.MIN: + return paddle._legacy_C_ops.c_reduce_min( + tensor, + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'root_id', + gdst, + ) + elif op == ReduceOp.PROD: + return paddle._legacy_C_ops.c_reduce_prod( + tensor, + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'root_id', + gdst, + ) + else: + raise ValueError("Unknown parameter: {}.".format(op)) diff --git a/python/paddle/distributed/communication/reduce_scatter.py b/python/paddle/distributed/communication/reduce_scatter.py new file mode 100644 index 0000000000..0b01b05dc4 --- /dev/null +++ b/python/paddle/distributed/communication/reduce_scatter.py @@ -0,0 +1,122 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream +from paddle.distributed.communication.reduce import ReduceOp +from paddle.distributed.communication.stream.reduce_scatter import ( + _reduce_scatter_base as _reduce_scatter_base_stream, +) + + +def reduce_scatter( + tensor, tensor_list, op=ReduceOp.SUM, group=None, sync_op=True +): + """ + Reduces, then scatters a list of tensors to all processes in a group + + Args: + tensor (Tensor): The output tensor on each rank. The result will overwrite this tenor after communication. Support + float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type. + tensor_list (List[Tensor]]): List of tensors to reduce and scatter. Every element in the list must be a Tensor whose data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. + group (Group, optional): Communicate in which group. If none is given, use the global group as default. + sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. + + Returns: + Return a task object. + + Warning: + This API only supports the dygraph mode. + + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data1 = paddle.to_tensor([0, 1]) + data2 = paddle.to_tensor([2, 3]) + else: + data1 = paddle.to_tensor([4, 5]) + data2 = paddle.to_tensor([6, 7]) + dist.reduce_scatter(data1, [data1, data2]) + print(data1) + # [4, 6] (2 GPUs, out for rank 0) + # [8, 10] (2 GPUs, out for rank 1) + + """ + if not framework._in_legacy_dygraph(): + return stream.reduce_scatter( + tensor, + tensor_list, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) + + +def _reduce_scatter_base( + output, input, op=ReduceOp.SUM, group=None, sync_op=True +): + """ + Reduces, then scatters a flattened tensor to all processes in a group. + + Args: + output (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + input (Tensor): Input tensor that is of size output tensor size times world size. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM. + group (ProcessGroup, optional): The process group to work on. If None, + the default process group will be used. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Async task handle, if sync_op is set to False. + None, if sync_op or if not part of the group. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + rank = dist.get_rank() + data = paddle.arange(4) + rank + # [0, 1, 2, 3] (2 GPUs, for rank 0) + # [1, 2, 3, 4] (2 GPUs, for rank 1) + output = paddle.empty(shape=[2], dtype=data.dtype) + dist.collective._reduce_scatter_base(output, data) + print(output) + # [1, 3] (2 GPUs, out for rank 0) + # [5, 7] (2 GPUs, out for rank 1) + + """ + if not framework._in_legacy_dygraph(): + return _reduce_scatter_base_stream( + output, + input, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) diff --git a/python/paddle/distributed/communication/scatter.py b/python/paddle/distributed/communication/scatter.py new file mode 100644 index 0000000000..da7809df9c --- /dev/null +++ b/python/paddle/distributed/communication/scatter.py @@ -0,0 +1,94 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream +from paddle.distributed.communication.group import _get_global_group + + +def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True): + """ + + Scatter a tensor to all participators. As shown below, one process is started with a GPU and the source of the scatter + is GPU0. Through scatter operator, the data in GPU0 will be sent to all GPUs averagely. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/scatter.png + :width: 800 + :alt: scatter + :align: center + + Args: + tensor (Tensor): The output Tensor. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. Default value is None. + src (int): The source rank id. Default value is 0. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + None. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data1 = paddle.to_tensor([7, 8, 9]) + data2 = paddle.to_tensor([10, 11, 12]) + dist.scatter(data1, src=1) + else: + data1 = paddle.to_tensor([1, 2, 3]) + data2 = paddle.to_tensor([4, 5, 6]) + dist.scatter(data1, tensor_list=[data1, data2], src=1) + print(data1, data2) + # [1, 2, 3] [10, 11, 12] (2 GPUs, out for rank 0) + # [4, 5, 6] [4, 5, 6] (2 GPUs, out for rank 1) + """ + if not framework._in_legacy_dygraph(): + return stream.scatter(tensor, tensor_list, src, group, sync_op) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + ring_id = 0 if group is None else group.id + gsrc = src if group is None else group.get_group_rank(src) + rank = _get_global_group().rank if group is None else group.rank + nranks = _get_global_group().nranks if group is None else group.nranks + assert gsrc >= 0, "src rank out of group, need global rank" + + if rank != gsrc: + tensor_list = [] + for _ in range(nranks): + tensor_list.append(tensor) + temp = paddle.concat(tensor_list, axis=0) + + use_calc_stream = sync_op + return framework._legacy_C_ops.c_scatter( + temp, + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'nranks', + nranks, + 'root', + gsrc, + ) diff --git a/python/paddle/distributed/communication/send.py b/python/paddle/distributed/communication/send.py new file mode 100644 index 0000000000..fa3eb3dff3 --- /dev/null +++ b/python/paddle/distributed/communication/send.py @@ -0,0 +1,110 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed.communication.stream as stream + + +def send(tensor, dst=0, group=None, sync_op=True): + """ + Send a tensor to the receiver. + + Args: + tensor (Tensor): The Tensor to send. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + dst (int): The destination rank id. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([7, 8, 9]) + dist.send(data, dst=1) + else: + data = paddle.to_tensor([1, 2, 3]) + dist.recv(data, src=0) + print(data) + # [7, 8, 9] (2 GPUs) + """ + if not framework._in_legacy_dygraph(): + return stream.send( + tensor, dst=dst, group=group, sync_op=sync_op, use_calc_stream=False + ) + + # code below will be removed after we remove the old dygraph + if group is not None and not group.is_member(): + return + use_calc_stream = sync_op + gdst = dst if group is None else group.get_group_rank(dst) + assert gdst >= 0, "dst rank out of group, need global rank" + ring_id = 0 if group is None else group.id + + return paddle._legacy_C_ops.send_v2( + tensor, + 'use_calc_stream', + use_calc_stream, + 'ring_id', + ring_id, + 'peer', + gdst, + ) + + +def isend(tensor, dst, group=None): + """ + Send tensor asynchronously + + Args: + tensor (Tensor): The Tensor to send. Its data type + should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. + dst (int): The destination rank. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + + Returns: + Return a task object. + + Warning: + This API only supports the dygraph mode. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + if dist.get_rank() == 0: + data = paddle.to_tensor([7, 8, 9]) + task = dist.isend(data, dst=1) + else: + data = paddle.to_tensor([1, 2, 3]) + task = dist.irecv(data, src=0) + task.wait() + print(data) + # [7, 8, 9] (2 GPUs) + + """ + return send(tensor, dst, group, sync_op=False) diff --git a/python/paddle/distributed/communication/stream/__init__.py b/python/paddle/distributed/communication/stream/__init__.py index 63ec858557..423b655c62 100644 --- a/python/paddle/distributed/communication/stream/__init__.py +++ b/python/paddle/distributed/communication/stream/__init__.py @@ -14,8 +14,7 @@ from .all_gather import all_gather from .all_reduce import all_reduce -from .alltoall import alltoall -from .alltoall_single import alltoall_single +from .all_to_all import alltoall, alltoall_single from .broadcast import broadcast from .reduce import reduce from .reduce_scatter import reduce_scatter diff --git a/python/paddle/distributed/communication/stream/all_reduce.py b/python/paddle/distributed/communication/stream/all_reduce.py index 97eda9d0f9..79a359ab5f 100644 --- a/python/paddle/distributed/communication/stream/all_reduce.py +++ b/python/paddle/distributed/communication/stream/all_reduce.py @@ -16,11 +16,14 @@ import paddle.fluid.framework as framework import paddle.fluid.data_feeder as data_feeder import paddle.fluid.layer_helper as layer_helper from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp -from paddle.distributed.communication.group import _get_global_group +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, +) def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream): - op_type = _get_reduce_op(op, "all_reduce") + op_type = _get_reduce_op(op, "allreduce") group = _get_global_group() if group is None else group if use_calc_stream: @@ -50,7 +53,7 @@ def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream): 'all_reduce', ) - op_type = _get_reduce_op(op, "all_reduce") + op_type = _get_reduce_op(op, "allreduce") ring_id = 0 if group is None else group.id if not isinstance(ring_id, int): @@ -107,10 +110,8 @@ def all_reduce( out = data.numpy() # [[5, 7, 9], [5, 7, 9]] """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -122,6 +123,7 @@ def all_reduce( tensor, op, group, sync_op, use_calc_stream ) else: + assert group is None, "Group can not be used in static mode for now." return _all_reduce_in_static_mode( tensor, op, group, sync_op, use_calc_stream ) diff --git a/python/paddle/distributed/communication/stream/all_to_all.py b/python/paddle/distributed/communication/stream/all_to_all.py new file mode 100644 index 0000000000..663079fc0a --- /dev/null +++ b/python/paddle/distributed/communication/stream/all_to_all.py @@ -0,0 +1,359 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid.framework as framework +import paddle.distributed as dist +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, +) + + +def _check_tensor_shape(tensor, shape, nranks=1): + if tensor.shape != shape: + raise RuntimeError('The tensor for alltoall is not correctly-sized.') + + +def _check_tensor_list_shape(tensor_list, shape, nranks=1): + if len(tensor_list) != nranks: + raise RuntimeError( + 'The tensor_list for alltoall is not correctly-sized.' + ) + for tensor in tensor_list: + if tensor.shape != shape: + raise RuntimeError( + 'The tensor_list for alltoall is not correctly-sized.' + ) + + +def _all_to_all_tensor_in_dygraph( + out_tensor, in_tensor, group, sync_op, use_calc_stream +): + + _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks) + + if use_calc_stream: + return group.process_group.alltoall_tensor_on_calc_stream( + in_tensor, out_tensor + ) + + task = group.process_group.alltoall_tensor(in_tensor, out_tensor, sync_op) + if sync_op: + task.wait() + + return task + + +def _all_to_all_in_dygraph( + out_tensor_list, in_tensor_list, group, sync_op, use_calc_stream +): + if len(in_tensor_list) == 0: + raise RuntimeError("The input tensor_list should not be empty.") + + if len(out_tensor_list) == 0: + out_tensor_list += [ + paddle.empty_like(tensor) for tensor in in_tensor_list + ] + else: + _check_tensor_list_shape( + out_tensor_list, in_tensor_list[0].shape, group.nranks + ) + + if use_calc_stream: + return group.process_group.alltoall_on_calc_stream( + in_tensor_list, out_tensor_list + ) + + task = group.process_group.alltoall( + in_tensor_list, out_tensor_list, sync_op + ) + if sync_op: + task.wait() + + return task + + +def _all_to_all_in_static_mode( + out_tensor_or_tensor_list, + in_tensor_or_tensor_list, + group, + sync_op, + use_calc_stream, +): + op_type = 'alltoall' + ring_id = 0 if group is None else group.id + nranks = dist.get_world_size() + helper = layer_helper.LayerHelper(op_type, **locals()) + + in_tensor = in_tensor_or_tensor_list + if isinstance(in_tensor_or_tensor_list, list): + if len(in_tensor_or_tensor_list) == 0: + raise RuntimeError("The input tensor_list should not be empty.") + in_tensor = paddle.concat(in_tensor_or_tensor_list, axis=0) + out_tensor = out_tensor_or_tensor_list + if isinstance(out_tensor_or_tensor_list, list): + if len(out_tensor_or_tensor_list) != 0: + raise ValueError( + "The 'out_tensor_list' for all_to_all " "must be an empty list." + ) + out_tensor = helper.create_variable_for_type_inference( + dtype=in_tensor.dtype + ) + + data_feeder.check_variable_and_dtype( + in_tensor, + 'in_tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + 'all_to_all', + ) + helper.append_op( + type=op_type, + inputs={'X': [in_tensor]}, + outputs={'Out': [out_tensor]}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': sync_op, + }, + ) + # NOTE(liyurui): If the argument `out_tensor_or_tensor_list` is a tensor_list, + # we need to split the result. So we should wait the result of all_to_all + # before split if the communication is not on calc stream. + if isinstance(out_tensor_or_tensor_list, list): + if not sync_op: + dist.wait(out_tensor, use_calc_stream=False) + out_tensor_or_tensor_list.extend(paddle.split(out_tensor, nranks, 0)) + + return None + + +def alltoall( + out_tensor_or_tensor_list, + in_tensor_or_tensor_list, + group=None, + sync_op=True, + use_calc_stream=False, +): + """ + + Scatter a tensor (or a tensor list) across devices and gather outputs to another tensor (or a tensor list, respectively). + + Args: + out_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The output. If it is a tensor, it should be correctly-sized. + If it is a list, it should be empty or contain correctly-sized tensors. Its data type should be the same as the input. + in_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter (must be specified on the source rank). + If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors. Support + float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type. + group (Group, optional): Communicate in which group. If none is given, use the global group as default. + sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. + use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This + option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning. + + Returns: + Return a task object. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + out_tensor_list = [] + if dist.get_rank() == 0: + data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]]) + data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]]) + else: + data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]]) + data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]]) + task = dist.stream.alltoall(out_tensor_list, [data1, data2], sync_op=False) + task.wait() + print(out_tensor_list) + # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) + # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) + """ + if _warn_cur_rank_not_in_group(group): + return + + if not sync_op and use_calc_stream: + raise RuntimeError( + "use_calc_stream can only be true in sync op behavior." + ) + + if out_tensor_or_tensor_list is None: + raise RuntimeError("The output should be specified.") + if in_tensor_or_tensor_list is None: + raise RuntimeError("The input should be specified.") + + if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + out_is_tensor = paddle.is_tensor(out_tensor_or_tensor_list) + in_is_tensor = paddle.is_tensor(in_tensor_or_tensor_list) + if out_is_tensor and in_is_tensor: + return _all_to_all_tensor_in_dygraph( + out_tensor_or_tensor_list, + in_tensor_or_tensor_list, + group, + sync_op, + use_calc_stream, + ) + elif not out_is_tensor and not in_is_tensor: + return _all_to_all_in_dygraph( + out_tensor_or_tensor_list, + in_tensor_or_tensor_list, + group, + sync_op, + use_calc_stream, + ) + else: + raise RuntimeError( + "The output and input should be both tensor or tensor list." + ) + else: + assert group is None, "Group can not be used in static mode for now." + return _all_to_all_in_static_mode( + out_tensor_or_tensor_list, + in_tensor_or_tensor_list, + group, + sync_op, + use_calc_stream, + ) + + +def _alltoall_single_in_dygraph( + out_tensor, + in_tensor, + out_split_sizes, + in_split_sizes, + group, + sync_op, + use_calc_stream, +): + if out_split_sizes is None: + out_split_sizes = [] + if in_split_sizes is None: + in_split_sizes = [] + + if use_calc_stream: + return group.process_group.alltoall_single_on_calc_stream( + in_tensor, out_tensor, in_split_sizes, out_split_sizes + ) + + task = group.process_group.alltoall_single( + in_tensor, out_tensor, in_split_sizes, out_split_sizes, sync_op + ) + if sync_op: + task.wait() + + return task + + +def alltoall_single( + out_tensor, + in_tensor, + out_split_sizes=None, + in_split_sizes=None, + group=None, + sync_op=True, + use_calc_stream=False, +): + """ + + Split and Scatter the splitted input tensor to the out tensor across devices. + + Args: + out_tensor(Tensor): The output tensor. Its data type should be the same as the input. + in_tensor (Tensor): The input tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. + out_split_sizes (List[int], optional): Split sizes of out_tensor for dim[0]. If not given, dim[0] of out_tensor must be divisible + by group size and out_tensor will be gathered averagely from all participators. If none is given, use a empty list as default. + in_split_sizes (List[int], optional): Split sizes of in_tensor for dim[0]. If not given, dim[0] of in_tensor must be divisible + by group size and in_tensor will be scattered averagely to all participators. If none is given, use a empty list as default. + group (Group, optional): Communicate in which group. If none is given, use the global group as default. + sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. + use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This + option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning. + + Returns: + Return a task object. + + Warning: + This API only supports the dygraph mode now. + + Examples: + .. code-block:: python + + # required: distributed + import paddle + import paddle.distributed as dist + + dist.init_parallel_env() + local_rank = dist.get_rank() + + # case 1 + output = paddle.empty([2], dtype="int64") + if local_rank == 0: + data = paddle.to_tensor([0, 1]) + else: + data = paddle.to_tensor([2, 3]) + task = dist.stream.alltoall_single(output, data, sync_op=False) + task.wait() + out = output.numpy() + # [0, 2] (2 GPUs, out for rank 0) + # [1, 3] (2 GPUs, out for rank 1) + + # case 2 + size = dist.get_world_size() + output = paddle.empty([(local_rank + 1) * size, size], dtype='float32') + if local_rank == 0: + data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]]) + else: + data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]]) + out_split_sizes = [local_rank + 1 for i in range(size)] + in_split_sizes = [i + 1 for i in range(size)] + task = dist.stream.alltoall_single(output, + data, + out_split_sizes, + in_split_sizes, + sync_op=False) + task.wait() + out = output.numpy() + # [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0) + # [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1) + """ + if _warn_cur_rank_not_in_group(group): + return + + if not sync_op and use_calc_stream: + raise RuntimeError( + "use_calc_stream can only be true in sync op behavior." + ) + + if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + return _alltoall_single_in_dygraph( + out_tensor, + in_tensor, + out_split_sizes, + in_split_sizes, + group, + sync_op, + use_calc_stream, + ) + + raise RuntimeError( + "paddle.distributed.stream.alltoall_single is only supported in dygraph mode now." + ) diff --git a/python/paddle/distributed/communication/stream/alltoall.py b/python/paddle/distributed/communication/stream/alltoall.py deleted file mode 100644 index 5faf106263..0000000000 --- a/python/paddle/distributed/communication/stream/alltoall.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle -import paddle.fluid.framework as framework -from paddle.distributed import collective - - -def _check_tensor_shape(tensor, shape, nranks=1): - if tensor.shape != shape: - raise RuntimeError('The tensor for alltoall is not correctly-sized.') - - -def _check_tensor_list_shape(tensor_list, shape, nranks=1): - if len(tensor_list) != nranks: - raise RuntimeError( - 'The tensor_list for alltoall is not correctly-sized.' - ) - for tensor in tensor_list: - if tensor.shape != shape: - raise RuntimeError( - 'The tensor_list for alltoall is not correctly-sized.' - ) - - -def _alltoall_tensor_in_dygraph( - out_tensor, in_tensor, group, sync_op, use_calc_stream -): - group = collective._get_default_group() if group is None else group - - _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks) - - if use_calc_stream: - return group.process_group.alltoall_tensor_on_calc_stream( - in_tensor, out_tensor - ) - - task = group.process_group.alltoall_tensor(in_tensor, out_tensor, sync_op) - if sync_op: - task.wait() - - return task - - -def _alltoall_in_dygraph( - out_tensor_list, in_tensor_list, group, sync_op, use_calc_stream -): - group = collective._get_default_group() if group is None else group - - if len(in_tensor_list) == 0: - raise RuntimeError("The input tensor_list should not be empty.") - - if len(out_tensor_list) == 0: - out_tensor_list += [ - paddle.empty_like(tensor) for tensor in in_tensor_list - ] - else: - _check_tensor_list_shape( - out_tensor_list, in_tensor_list[0].shape, group.nranks - ) - - if use_calc_stream: - return group.process_group.alltoall_on_calc_stream( - in_tensor_list, out_tensor_list - ) - - task = group.process_group.alltoall( - in_tensor_list, out_tensor_list, sync_op - ) - if sync_op: - task.wait() - - return task - - -def alltoall( - out_tensor_or_tensor_list, - in_tensor_or_tensor_list, - group=None, - sync_op=True, - use_calc_stream=False, -): - """ - - Scatter a tensor (or a tensor list) across devices and gather outputs to another tensor (or a tensor list, respectively). - - Args: - out_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The output. If it is a tensor, it should be correctly-sized. - If it is a list, it should be empty or contain correctly-sized tensors. Its data type should be the same as the input. - in_tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter (must be specified on the source rank). - If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors. Support - float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type. - group (Group, optional): Communicate in which group. If none is given, use the global group as default. - sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. - use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This - option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning. - - Returns: - Return a task object. - - Warning: - This API only supports the dygraph mode now. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - out_tensor_list = [] - if dist.get_rank() == 0: - data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]]) - data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]]) - else: - data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]]) - data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]]) - task = dist.stream.alltoall(out_tensor_list, [data1, data2], sync_op=False) - task.wait() - print(out_tensor_list) - # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) - # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) - """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) - - if not sync_op and use_calc_stream: - raise RuntimeError( - "use_calc_stream can only be true in sync op behavior." - ) - - if out_tensor_or_tensor_list is None: - raise RuntimeError("The output should be specified.") - if in_tensor_or_tensor_list is None: - raise RuntimeError("The input should be specified.") - - if framework.in_dygraph_mode(): - out_is_tensor = paddle.is_tensor(out_tensor_or_tensor_list) - in_is_tensor = paddle.is_tensor(in_tensor_or_tensor_list) - if out_is_tensor and in_is_tensor: - return _alltoall_tensor_in_dygraph( - out_tensor_or_tensor_list, - in_tensor_or_tensor_list, - group, - sync_op, - use_calc_stream, - ) - elif not out_is_tensor and not in_is_tensor: - return _alltoall_in_dygraph( - out_tensor_or_tensor_list, - in_tensor_or_tensor_list, - group, - sync_op, - use_calc_stream, - ) - else: - raise RuntimeError( - "The output and input should be both tensor or tensor list." - ) - - raise RuntimeError( - "paddle.distributed.stream.alltoall is only supported in dygraph mode now." - ) diff --git a/python/paddle/distributed/communication/stream/alltoall_single.py b/python/paddle/distributed/communication/stream/alltoall_single.py deleted file mode 100644 index 75eb289032..0000000000 --- a/python/paddle/distributed/communication/stream/alltoall_single.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle.fluid.framework as framework -from paddle.distributed import collective - - -def _alltoall_single_in_dygraph( - out_tensor, - in_tensor, - out_split_sizes, - in_split_sizes, - group, - sync_op, - use_calc_stream, -): - group = collective._get_default_group() if group is None else group - - if out_split_sizes is None: - out_split_sizes = [] - if in_split_sizes is None: - in_split_sizes = [] - - if use_calc_stream: - return group.process_group.alltoall_single_on_calc_stream( - in_tensor, out_tensor, in_split_sizes, out_split_sizes - ) - - task = group.process_group.alltoall_single( - in_tensor, out_tensor, in_split_sizes, out_split_sizes, sync_op - ) - if sync_op: - task.wait() - - return task - - -def alltoall_single( - out_tensor, - in_tensor, - out_split_sizes=None, - in_split_sizes=None, - group=None, - sync_op=True, - use_calc_stream=False, -): - """ - - Split and Scatter the splitted input tensor to the out tensor across devices. - - Args: - out_tensor(Tensor): The output tensor. Its data type should be the same as the input. - in_tensor (Tensor): The input tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. - out_split_sizes (List[int], optional): Split sizes of out_tensor for dim[0]. If not given, dim[0] of out_tensor must be divisible - by group size and out_tensor will be gathered averagely from all participators. If none is given, use a empty list as default. - in_split_sizes (List[int], optional): Split sizes of in_tensor for dim[0]. If not given, dim[0] of in_tensor must be divisible - by group size and in_tensor will be scattered averagely to all participators. If none is given, use a empty list as default. - group (Group, optional): Communicate in which group. If none is given, use the global group as default. - sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. - use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This - option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning. - - Returns: - Return a task object. - - Warning: - This API only supports the dygraph mode now. - - Examples: - .. code-block:: python - - # required: distributed - import paddle - import paddle.distributed as dist - - dist.init_parallel_env() - local_rank = dist.get_rank() - - # case 1 - output = paddle.empty([2], dtype="int64") - if local_rank == 0: - data = paddle.to_tensor([0, 1]) - else: - data = paddle.to_tensor([2, 3]) - task = dist.stream.alltoall_single(output, data, sync_op=False) - task.wait() - out = output.numpy() - # [0, 2] (2 GPUs, out for rank 0) - # [1, 3] (2 GPUs, out for rank 1) - - # case 2 - size = dist.get_world_size() - output = paddle.empty([(local_rank + 1) * size, size], dtype='float32') - if local_rank == 0: - data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]]) - else: - data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]]) - out_split_sizes = [local_rank + 1 for i in range(size)] - in_split_sizes = [i + 1 for i in range(size)] - task = dist.stream.alltoall_single(output, - data, - out_split_sizes, - in_split_sizes, - sync_op=False) - task.wait() - out = output.numpy() - # [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0) - # [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1) - """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) - - if not sync_op and use_calc_stream: - raise RuntimeError( - "use_calc_stream can only be true in sync op behavior." - ) - - if framework.in_dygraph_mode(): - return _alltoall_single_in_dygraph( - out_tensor, - in_tensor, - out_split_sizes, - in_split_sizes, - group, - sync_op, - use_calc_stream, - ) - - raise RuntimeError( - "paddle.distributed.stream.alltoall_single is only supported in dygraph mode now." - ) diff --git a/python/paddle/distributed/communication/stream/broadcast.py b/python/paddle/distributed/communication/stream/broadcast.py index 3672b02811..e4e58963f3 100644 --- a/python/paddle/distributed/communication/stream/broadcast.py +++ b/python/paddle/distributed/communication/stream/broadcast.py @@ -13,29 +13,74 @@ # limitations under the License. import paddle.fluid.framework as framework -from paddle.distributed import collective - - -def _broadcast_in_dygraph(tensor, src, group, sync_op, use_calc_stream): - group = collective._get_default_group() if group is None else group +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, + _get_or_throw_group_rank, +) + + +def _broadcast_in_dygraph( + tensor, src_rank_in_group, group, sync_op, use_calc_stream +): if use_calc_stream: - return group.process_group.broadcast_on_calc_stream(tensor, src) + return group.process_group.broadcast_on_calc_stream( + tensor, src_rank_in_group + ) - task = group.process_group.broadcast(tensor, src, sync_op) + task = group.process_group.broadcast(tensor, src_rank_in_group, sync_op) if sync_op: task.wait() return task -def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): +def _broadcast_in_static_mode( + tensor, src_rank_in_group, group, sync_op, use_calc_stream +): + data_feeder.check_variable_and_dtype( + tensor, + 'tensor', + [ + 'float16', + 'float32', + 'float64', + 'int32', + 'int64', + 'int8', + 'uint8', + 'bool', + ], + 'broadcast', + ) + + op_type = 'c_broadcast' + helper = layer_helper.LayerHelper(op_type, **locals()) + ring_id = 0 if group is None else group.id + + helper.append_op( + type=op_type, + inputs={'X': [tensor]}, + outputs={'Out': [tensor]}, + attrs={ + 'root': src_rank_in_group, + 'use_calc_stream': sync_op, + 'ring_id': ring_id, + }, + ) + return None + + +def broadcast(tensor, src, group=None, sync_op=True, use_calc_stream=False): """ Broadcast a tensor to all devices. Args: tensor (Tensor): The tensor to broadcast. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type. - src (int, optional): Rank of the source device. If none is given, use `0` as default. + src (int, optional): Rank of the source device. group (Group, optional): Communicate in which group. If none is given, use the global group as default. sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This @@ -65,10 +110,8 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): out = data.numpy() # [[1, 2, 3], [1, 2, 3]] (2 GPUs) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -76,10 +119,14 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ) if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + src_rank_in_group = _get_or_throw_group_rank(src, group) + return _broadcast_in_dygraph( + tensor, src_rank_in_group, group, sync_op, use_calc_stream + ) + else: + assert group is None, "Group can not be used in static mode for now." + return _broadcast_in_static_mode( tensor, src, group, sync_op, use_calc_stream ) - - raise RuntimeError( - "paddle.distributed.stream.broadcast is only supported in dygraph mode now." - ) diff --git a/python/paddle/distributed/communication/stream/recv.py b/python/paddle/distributed/communication/stream/recv.py index 2658379a41..757b0f0c28 100644 --- a/python/paddle/distributed/communication/stream/recv.py +++ b/python/paddle/distributed/communication/stream/recv.py @@ -13,21 +13,56 @@ # limitations under the License. import paddle.fluid.framework as framework -from paddle.distributed import collective - - -def _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream): - group = collective._get_default_group() if group is None else group +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, + _get_or_throw_group_rank, +) + + +def _recv_in_dygraph( + tensor, src_rank_in_group, group, sync_op, use_calc_stream +): if use_calc_stream: - return group.process_group.recv_on_calc_stream(tensor, src) + return group.process_group.recv_on_calc_stream( + tensor, src_rank_in_group + ) - task = group.process_group.recv(tensor, src, sync_op) + task = group.process_group.recv(tensor, src_rank_in_group, sync_op) if sync_op: task.wait() return task +def _recv_in_static_mode( + tensor, src_rank_in_group, group, sync_op, use_calc_stream +): + op_type = 'recv_v2' + data_feeder.check_variable_and_dtype( + tensor, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + 'recv', + ) + ring_id = 0 if group is None else group.id + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + outputs={'Out': [tensor]}, + attrs={ + 'ring_id': ring_id, + 'peer': src_rank_in_group, + 'out_shape': tensor.shape, + 'dtype': tensor.dtype, + 'use_calc_stream': sync_op, + }, + ) + return None + + def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): """ @@ -44,9 +79,6 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): Returns: Return a task object. - Warning: - This API only supports the dygraph mode now. - Examples: .. code-block:: python @@ -66,10 +98,8 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): out = data.numpy() # [[4, 5, 6], [4, 5, 6]] (2 GPUs) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -77,8 +107,14 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ) if framework.in_dygraph_mode(): - return _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream) + group = _get_global_group() if group is None else group + src_rank_in_group = _get_or_throw_group_rank(src, group) - raise RuntimeError( - "paddle.distributed.stream.recv is only supported in dygraph mode now." - ) + return _recv_in_dygraph( + tensor, src_rank_in_group, group, sync_op, use_calc_stream + ) + else: + assert group is None, "Group can not be used in static mode for now." + return _recv_in_static_mode( + tensor, src, group, sync_op, use_calc_stream + ) diff --git a/python/paddle/distributed/communication/stream/reduce.py b/python/paddle/distributed/communication/stream/reduce.py index d7f0fc6b2b..eb8b3af806 100644 --- a/python/paddle/distributed/communication/stream/reduce.py +++ b/python/paddle/distributed/communication/stream/reduce.py @@ -13,23 +13,70 @@ # limitations under the License. import paddle.fluid.framework as framework -from paddle.distributed.communication.group import _get_global_group +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, + _get_or_throw_group_rank, +) from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp -def _reduce_in_dygraph(tensor, dst, op, group, sync_op, use_calc_stream): +def _reduce_in_dygraph( + tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream +): op_type = _get_reduce_op(op, "reduce") - group = _get_global_group() if group is None else group if use_calc_stream: - return group.process_group.reduce_on_calc_stream(tensor, dst, op_type) + return group.process_group.reduce_on_calc_stream( + tensor, dst_rank_in_group, op_type + ) - task = group.process_group.reduce(tensor, dst, op_type, sync_op) + task = group.process_group.reduce( + tensor, dst_rank_in_group, op_type, sync_op + ) if sync_op: task.wait() return task +def _reduce_in_static_mode( + tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream +): + data_feeder.check_variable_and_dtype( + tensor, + 'tensor', + [ + 'float16', + 'float32', + 'float64', + 'int32', + 'int64', + 'int8', + 'uint8', + 'bool', + ], + 'reduce', + ) + + op_type = _get_reduce_op(op, "reduce") + ring_id = 0 if group is None else group.id + + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [tensor]}, + outputs={'Out': [tensor]}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': sync_op, + 'root_id': dst_rank_in_group, + }, + ) + return None + + def reduce( tensor, dst=0, @@ -77,10 +124,8 @@ def reduce( # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0) # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -88,10 +133,13 @@ def reduce( ) if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + dst_rank_in_group = _get_or_throw_group_rank(dst, group) return _reduce_in_dygraph( + tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream + ) + else: + assert group is None, "Group can not be used in static mode for now." + return _reduce_in_static_mode( tensor, dst, op, group, sync_op, use_calc_stream ) - - raise RuntimeError( - "paddle.distributed.stream.reduce is only supported in dygraph mode now." - ) diff --git a/python/paddle/distributed/communication/stream/reduce_scatter.py b/python/paddle/distributed/communication/stream/reduce_scatter.py index ef375fa5be..3e46b51dde 100644 --- a/python/paddle/distributed/communication/stream/reduce_scatter.py +++ b/python/paddle/distributed/communication/stream/reduce_scatter.py @@ -14,7 +14,10 @@ import paddle import paddle.fluid.framework as framework -from paddle.distributed.communication.group import _get_global_group +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, +) from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp @@ -104,7 +107,7 @@ def reduce_scatter( Args: tensor (Tensor): The output tensor on each rank. The result will overwrite this tenor after communication. Support float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type. - tensor_list (List[Tensor]]): The input to scatter. + tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter. If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors. op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. group (Group, optional): Communicate in which group. If none is given, use the global group as default. @@ -137,10 +140,8 @@ def reduce_scatter( # [4, 6] (2 GPUs, out for rank 0) # [8, 10] (2 GPUs, out for rank 1) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -220,10 +221,8 @@ def _reduce_scatter_base( # [1, 2, 3] (2 GPUs, out for rank 0) # [4, 5, 6] (2 GPUs, out for rank 1) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( diff --git a/python/paddle/distributed/communication/stream/scatter.py b/python/paddle/distributed/communication/stream/scatter.py index 199885c1fd..75a8ab3909 100644 --- a/python/paddle/distributed/communication/stream/scatter.py +++ b/python/paddle/distributed/communication/stream/scatter.py @@ -12,10 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings import paddle import paddle.distributed as dist import paddle.fluid.framework as framework -from paddle.distributed import collective +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, + _get_or_throw_group_rank, +) def _check_tensor_shape(tensor, shape, nranks=1): @@ -38,26 +45,19 @@ def _check_tensor_list_shape(tensor_list, shape, nranks=1): def _scatter_tensor_in_dygraph( - out_tensor, in_tensor, src, group, sync_op, use_calc_stream + out_tensor, in_tensor, src_rank_in_group, group, sync_op, use_calc_stream ): - group = collective._get_default_group() if group is None else group - - src_rank = group.get_group_rank(src) - if src_rank == -1: - raise RuntimeError("Src rank out of group.") - nranks = group.nranks - rank = dist.get_rank() - if rank == src_rank: + if group.rank == src_rank_in_group: _check_tensor_shape(out_tensor, in_tensor.shape, nranks) if use_calc_stream: return group.process_group.scatter_tensor_on_calc_stream( - in_tensor, out_tensor, src + in_tensor, out_tensor, src_rank_in_group ) task = group.process_group.scatter_tensor( - in_tensor, out_tensor, src, sync_op + in_tensor, out_tensor, src_rank_in_group, sync_op ) if sync_op: task.wait() @@ -66,17 +66,10 @@ def _scatter_tensor_in_dygraph( def _scatter_in_dygraph( - tensor, tensor_list, src, group, sync_op, use_calc_stream + tensor, tensor_list, src_rank_in_group, group, sync_op, use_calc_stream ): - group = collective._get_default_group() if group is None else group - - src_rank = group.get_group_rank(src) - if src_rank == -1: - raise RuntimeError("Src rank out of group.") - nranks = group.nranks - rank = dist.get_rank() - if rank == src_rank: + if group.rank == src_rank_in_group: if len(tensor_list) == 0: raise RuntimeError( "The tensor_list should not be empty on src rank." @@ -87,16 +80,76 @@ def _scatter_in_dygraph( if use_calc_stream: return group.process_group.scatter_on_calc_stream( - tensor_list, tensor, src + tensor_list, tensor, src_rank_in_group ) - task = group.process_group.scatter(tensor_list, tensor, src, sync_op) + task = group.process_group.scatter( + tensor_list, tensor, src_rank_in_group, sync_op + ) if sync_op: task.wait() return task +def _scatter_in_static_mode( + tensor, + tensor_or_tensor_list, + src_rank_in_group, + group, + sync_op, + use_calc_stream, +): + nranks = dist.get_world_size() if group is None else group.nranks + rank = dist.get_rank() + + input_tensor = tensor_or_tensor_list + if isinstance(tensor_or_tensor_list, list): + tensor_list = tensor_or_tensor_list + if rank == src_rank_in_group: + if len(tensor_list) == 0: + raise RuntimeError( + "The tensor_list should not be empty on src rank." + ) + else: + tensor_list = [tensor for _ in range(nranks)] + input_tensor = paddle.concat(tensor_list, axis=0) + + ring_id = 0 if group is None else group.id + + data_feeder.check_variable_and_dtype( + tensor, + 'tensor', + [ + 'float16', + 'float32', + 'float64', + 'int32', + 'int64', + 'int8', + 'uint8', + 'bool', + ], + 'scatter', + ) + + op_type = 'c_scatter' + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [input_tensor]}, + outputs={'Out': [tensor]}, + attrs={ + 'ring_id': ring_id, + 'root': src_rank_in_group, + 'use_calc_stream': sync_op, + 'nranks': nranks, + }, + ) + + return None + + def scatter( tensor, tensor_or_tensor_list=None, @@ -146,25 +199,34 @@ def scatter( # [1, 2, 3] (2 GPUs, out for rank 0) # [4, 5, 6] (2 GPUs, out for rank 1) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( "use_calc_stream can only be true in sync op behavior." ) - if tensor_or_tensor_list is None: - raise RuntimeError("The input should be specified.") + # NOTE(liyurui): Only the source rank needs to specific the tensor_or_tensor_list argument. + # Other ranks which pass this argument in will be ignored with a warning. + # If a tensor_list passed in, we need to concat it to a tensor before invoke C++ API. + # If a tensor passed in, concat is not needed. + # The passed in type for non-src rank is meaningless, for it will be ignored. + if src != dist.get_rank(): + if tensor_or_tensor_list is not None: + warnings.warn( + "Specific `tensor_or_tensor_list` is meaningless for rank which is not src." + ) + tensor_or_tensor_list = [] if framework.in_dygraph_mode(): + group = _get_global_group() if group is None else group + src_rank_in_group = _get_or_throw_group_rank(src, group) if paddle.is_tensor(tensor_or_tensor_list): return _scatter_tensor_in_dygraph( tensor, tensor_or_tensor_list, - src, + src_rank_in_group, group, sync_op, use_calc_stream, @@ -173,12 +235,19 @@ def scatter( return _scatter_in_dygraph( tensor, tensor_or_tensor_list, - src, + src_rank_in_group, group, sync_op, use_calc_stream, ) - - raise RuntimeError( - "paddle.distributed.stream.scatter is only supported in dygraph mode now." - ) + else: + assert group is None, "Group can not be used in static mode for now." + + return _scatter_in_static_mode( + tensor, + tensor_or_tensor_list, + src, + group, + sync_op, + use_calc_stream, + ) diff --git a/python/paddle/distributed/communication/stream/send.py b/python/paddle/distributed/communication/stream/send.py index 206a8cfaa6..f2d135abed 100644 --- a/python/paddle/distributed/communication/stream/send.py +++ b/python/paddle/distributed/communication/stream/send.py @@ -13,21 +13,55 @@ # limitations under the License. import paddle.fluid.framework as framework -from paddle.distributed import collective - - -def _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream): - group = collective._get_default_group() if group is None else group +import paddle.fluid.data_feeder as data_feeder +import paddle.fluid.layer_helper as layer_helper +from paddle.distributed.communication.group import ( + _get_global_group, + _warn_cur_rank_not_in_group, + _get_or_throw_group_rank, +) + + +def _send_in_dygraph( + tensor, dst_rank_in_group, group, sync_op, use_calc_stream +): if use_calc_stream: - return group.process_group.send_on_calc_stream(tensor, dst) + return group.process_group.send_on_calc_stream( + tensor, dst_rank_in_group + ) - task = group.process_group.send(tensor, dst, sync_op) + task = group.process_group.send(tensor, dst_rank_in_group, sync_op) if sync_op: task.wait() return task +def _send_in_static_mode( + tensor, dst_rank_in_group, group, sync_op, use_calc_stream +): + op_type = 'send_v2' + data_feeder.check_variable_and_dtype( + tensor, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + 'send', + ) + + ring_id = 0 if group is None else group.id + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [tensor]}, + attrs={ + 'ring_id': ring_id, + 'peer': dst_rank_in_group, + 'use_calc_stream': sync_op, + }, + ) + return None + + def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): """ @@ -44,9 +78,6 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): Returns: Return a task object. - Warning: - This API only supports the dygraph mode now. - Examples: .. code-block:: python @@ -66,10 +97,8 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): out = data.numpy() # [[4, 5, 6], [4, 5, 6]] (2 GPUs) """ - if group is not None and not group.is_member(): - raise RuntimeError( - "The group should not be None and all ranks which invoke this operation should be the member of this group." - ) + if _warn_cur_rank_not_in_group(group): + return if not sync_op and use_calc_stream: raise RuntimeError( @@ -77,8 +106,14 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): ) if framework.in_dygraph_mode(): - return _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream) + group = _get_global_group() if group is None else group + dst_rank_in_group = _get_or_throw_group_rank(dst, group) - raise RuntimeError( - "paddle.distributed.stream.send is only supported in dygraph mode now." - ) + return _send_in_dygraph( + tensor, dst_rank_in_group, group, sync_op, use_calc_stream + ) + else: + assert group is None, "Group can not be used in static mode for now." + return _send_in_static_mode( + tensor, dst, group, sync_op, use_calc_stream + ) diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py index 71db8d9dbb..f76b1d5232 100644 --- a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py +++ b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py @@ -22,7 +22,7 @@ from paddle.fluid.layer_helper import LayerHelper from paddle.fluid.data_feeder import check_variable_and_dtype from paddle.fluid.dygraph import layers from paddle.distributed import collective -from ....communication.reduce import ReduceOp +from ....communication.reduce import ReduceOp, _get_reduce_op from paddle.fluid.data_feeder import check_dtype import paddle.fluid.dygraph_utils as dygraph_utils @@ -61,7 +61,7 @@ def _c_identity(tensor, group=None): @staticmethod def backward(ctx, dy): - op_type = collective._get_reduce_op(ReduceOp.SUM, "_c_identity") + op_type = _get_reduce_op(ReduceOp.SUM, "_c_identity") group.process_group.allreduce_on_calc_stream(dy, op_type) return dy @@ -254,7 +254,7 @@ def _mp_allreduce( ctx.ring_id = group.id if use_calc_stream: - op_type = collective._get_reduce_op(op, "_mp_allreduce") + op_type = _get_reduce_op(op, "_mp_allreduce") group.process_group.allreduce_on_calc_stream( tensor, op_type ) diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py index ed14352095..c2bacc6a66 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py @@ -27,13 +27,13 @@ import numpy as np from collections import OrderedDict import paddle +import paddle.distributed as dist from paddle.fluid import core from paddle.optimizer import Optimizer from paddle.fluid.clip import ClipGradByGlobalNorm from paddle.distributed.collective import ( _get_global_group, new_group, - broadcast, wait, ) @@ -169,7 +169,7 @@ class ShardingOptimizerStage2(Optimizer): """ for p in self._local_params: - broadcast( + dist.broadcast( p, src=self._global_root_rank, group=self.group, sync_op=True ) @@ -456,7 +456,7 @@ class ShardingOptimizerStage2(Optimizer): # Exchange all the shards with the other ranks for dtype_per_rank in self.param_storages.values(): for dst_rank, internal_storage in dtype_per_rank.items(): - broadcast( + dist.broadcast( tensor=internal_storage.buffer, src=self.group.ranks[dst_rank], group=self.group, diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py index 0414798e68..6f98f5be22 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py @@ -28,6 +28,7 @@ import warnings from collections import OrderedDict import paddle +import paddle.distributed as dist from paddle.fluid import core from paddle.optimizer import Optimizer from paddle.fluid.clip import ClipGradByGlobalNorm @@ -38,7 +39,6 @@ HybridParallelClipGrad = ( ) from paddle.distributed.collective import ( _get_global_group, - broadcast, new_group, ) @@ -206,12 +206,12 @@ class GroupShardedOptimizerStage2(Optimizer): """ for p in self._local_params: - broadcast( + dist.broadcast( p, src=self._global_root_rank, group=self._group, sync_op=True ) if self._dp_group: - broadcast( + dist.broadcast( p, src=self._dp_group.ranks[0], group=self._dp_group, @@ -562,7 +562,7 @@ class GroupShardedOptimizerStage2(Optimizer): else: for dtype_per_rank in self.param_storages.values(): for dst_rank, internal_storage in dtype_per_rank.items(): - broadcast( + dist.broadcast( tensor=internal_storage.buffer, src=self._group.ranks[dst_rank], group=self._group, @@ -590,7 +590,7 @@ class GroupShardedOptimizerStage2(Optimizer): if x.trainable: group = self._broadcast_groups[group_idx] group_idx = (group_idx + 1) % self._number_of_broadcast_groups - task = broadcast( + task = dist.broadcast( tensor=x, src=group.ranks[self._param2rank[x.name]], group=group, diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py index 044111cc34..b28ba66b67 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py @@ -27,6 +27,7 @@ from functools import reduce from types import MethodType import paddle +import paddle.distributed as dist from paddle import nn from paddle.distributed import collective from paddle.distributed.utils.log_utils import get_logger @@ -324,12 +325,12 @@ class GroupShardedStage2(nn.Layer): """ for buffer in self._layer.buffers(include_sublayers=True): - collective.broadcast( + dist.broadcast( buffer, self._global_root_rank, self._group, sync_op=True ) if self._dp_group and self._dp_group.nranks > 1: - collective.broadcast( + dist.broadcast( buffer, self._dp_group.ranks[0], self._dp_group, @@ -402,7 +403,7 @@ class GroupShardedStage2(nn.Layer): # Synchronize the reduce parameter gradient asynchronize self._sharding_optimizers[0]._update_task( - collective.reduce( + dist.reduce( tensor=param.grad, dst=self._group.ranks[dst_rank], group=self._group, @@ -415,7 +416,7 @@ class GroupShardedStage2(nn.Layer): not self._reduce_overlap ), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.' # TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2. - collective.all_reduce( + dist.all_reduce( tensor=param.grad, group=self._dp_group, sync_op=True, @@ -469,7 +470,7 @@ class GroupShardedStage2(nn.Layer): grad_storage.sent = True # Synchronize the reduce parameter gradient asynchronize self._sharding_optimizers[0]._update_task( - collective.reduce( + dist.reduce( tensor=grad_storage.buffer, dst=self._group.ranks[grad_storage.destination], group=self._group, @@ -482,7 +483,7 @@ class GroupShardedStage2(nn.Layer): not self._reduce_overlap ), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.' # TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2. - collective.all_reduce( + dist.all_reduce( tensor=grad_storage.buffer, group=self._dp_group, sync_op=True, diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py index e80ea0531f..7504955e8d 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py @@ -18,6 +18,7 @@ from types import MethodType from collections import OrderedDict import paddle +import paddle.distributed as dist from paddle import nn from paddle.autograd import PyLayer import paddle.fluid.core as core @@ -196,7 +197,7 @@ class GroupShardedStage3(nn.Layer): """ for p in self._layer.parameters(): - collective.broadcast( + dist.broadcast( p, src=self._global_root_rank, group=self._group, sync_op=True ) @@ -493,7 +494,7 @@ class GroupShardedStage3(nn.Layer): """ for buffer in self._layer.buffers(include_sublayers=True): - collective.broadcast( + dist.broadcast( buffer, self._global_root_rank, self._group, sync_op=True ) @@ -536,7 +537,7 @@ class GroupShardedStage3(nn.Layer): # 2.Handle unslice param for grad_storage in self._grad_storages.values(): grad_storage.buffer.scale_(scale=self._world_size_scaling) - collective.all_reduce(tensor=grad_storage.buffer, group=self._group) + dist.all_reduce(tensor=grad_storage.buffer, group=self._group) if self._offload: for param in list(self._unslice_params): param._clear_data() @@ -600,7 +601,7 @@ class GroupShardedStage3(nn.Layer): if param.name in self._task_flow.full_grad.keys(): full_grad = self._task_flow.full_grad[param.name] # Only support sync allreduce current rank's layer now - collective.all_reduce(tensor=full_grad, group=self._group) + dist.all_reduce(tensor=full_grad, group=self._group) start, end = self._param2buffer[param.name][self._rank] if param.bw_storage is None: diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py index 63ce53f01c..a6ac8edc06 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py @@ -30,8 +30,9 @@ from collections import deque from types import MethodType import paddle +import paddle.distributed as dist from paddle import nn -from paddle.distributed import collective as dist +from paddle.distributed import collective as collective from paddle.distributed.collective import _get_global_group from ...utils.internal_storage import GradStorage @@ -92,7 +93,7 @@ class ShardingStage2(nn.Layer): # Communication related attributes self._group = ( - dist.new_group(_get_global_group().ranks) + collective.new_group(_get_global_group().ranks) if group is None else group ) @@ -317,7 +318,7 @@ class ShardingStage2(nn.Layer): buffer, self._global_root_rank, self._group, sync_op=True ) # Multi stream operation will be supported later - dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) + collective.wait(tensor=buffer, group=self._group, use_calc_stream=True) def __getattr__(self, name): """Forward missing attributes to wrapped layer.""" @@ -381,7 +382,7 @@ class ShardingStage2(nn.Layer): ) # Multi stream operation will be supported later - dist.wait( + collective.wait( tensor=param.grad, group=self._group, use_calc_stream=True, @@ -447,7 +448,7 @@ class ShardingStage2(nn.Layer): ) # Multi stream operation will be supported later - dist.wait( + collective.wait( tensor=grad_storage.buffer, group=self._group, use_calc_stream=True, diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py index deae0cddd2..c9a14fd17b 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py @@ -18,12 +18,13 @@ from types import MethodType from collections import OrderedDict import paddle +import paddle.distributed as dist from paddle import nn from paddle.autograd import PyLayer import paddle.fluid.core as core from paddle.fluid.framework import ParamBase from paddle.fluid.clip import ClipGradByGlobalNorm -from paddle.distributed import collective as dist +from paddle.distributed import collective from paddle.distributed.collective import _get_global_group from .sharding_utils import Type, ShardingClipGrad, device_guard @@ -101,7 +102,7 @@ class ShardingStage3(nn.Layer): # Communication group establishment self._group = ( - dist.new_group(_get_global_group().ranks) + collective.new_group(_get_global_group().ranks) if group is None else group ) @@ -183,7 +184,7 @@ class ShardingStage3(nn.Layer): ) # Multi stream operation will be supported later - dist.wait(tensor=p, group=self._group, use_calc_stream=True) + collective.wait(tensor=p, group=self._group, use_calc_stream=True) def _clear_gradients(self): assert len(self._trainable_params.keys()) > 0 @@ -484,7 +485,7 @@ class ShardingStage3(nn.Layer): buffer, self._global_root_rank, self._group, sync_op=True ) # Multi stream operation will be supported later - dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) + collective.wait(tensor=buffer, group=self._group, use_calc_stream=True) def __getattr__(self, name): """Forward missing attributes to wrapped layer.""" @@ -528,7 +529,7 @@ class ShardingStage3(nn.Layer): dist.all_reduce( tensor=grad_storage.buffer, group=self._group, sync_op=True ) - dist.wait( + collective.wait( tensor=grad_storage.buffer, group=self._group, use_calc_stream=True, @@ -600,7 +601,7 @@ class ShardingStage3(nn.Layer): dist.all_reduce( tensor=full_grad, group=self._group, sync_op=True ) - dist.wait( + collective.wait( tensor=full_grad, group=self._group, use_calc_stream=True ) @@ -945,7 +946,7 @@ def _allgather_buffer( # Allgather current layer in the 1st step synchronously if sync_wait: with paddle.amp.auto_cast(enable=False): - dist.wait( + collective.wait( tensor=full_param, group=group, use_calc_stream=use_calc_stream, diff --git a/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py b/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py index a8a7c67d96..4d66f19356 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py @@ -17,6 +17,7 @@ import unittest import paddle import numpy as np import paddle.distributed as dist +from paddle.distributed.communication.reduce_scatter import _reduce_scatter_base class TestCollectiveReduceScatter(unittest.TestCase): @@ -75,9 +76,7 @@ class TestCollectiveReduceScatter(unittest.TestCase): # [1, 2, 3, 4] # Rank-1 output = paddle.empty(shape=[2], dtype=input.dtype) - task = paddle.distributed.collective._reduce_scatter_base( - output, input, sync_op=False - ) + task = _reduce_scatter_base(output, input, sync_op=False) task.wait() diff --git a/python/paddle/incubate/distributed/models/moe/grad_clip.py b/python/paddle/incubate/distributed/models/moe/grad_clip.py index 27a0548e47..9a3a0dfc0f 100644 --- a/python/paddle/incubate/distributed/models/moe/grad_clip.py +++ b/python/paddle/incubate/distributed/models/moe/grad_clip.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import paddle.distributed as dist from paddle.fluid.clip import ClipGradBase, _squared_l2_norm from paddle.fluid.dygraph import base as imperative_base from paddle.fluid import core, layers -from paddle.distributed import collective class ClipGradForMOEByGlobalNorm(ClipGradBase): @@ -185,9 +185,9 @@ class ClipGradForMOEByGlobalNorm(ClipGradBase): moe_params_grads, sum_dtype ) if global_norm_var_moe is not None: - collective.all_reduce( + dist.all_reduce( global_norm_var_moe, - op=collective.ReduceOp.SUM, + op=dist.ReduceOp.SUM, group=self.moe_group, ) -- GitLab