diff --git a/imperative/python/megengine/distributed/functional.py b/imperative/python/megengine/distributed/functional.py
index d1c67271a3f41abc7b6a7086a7a3c6f00155461a..0864efeee89e9851abc13ef646f05a919e14e409 100644
--- a/imperative/python/megengine/distributed/functional.py
+++ b/imperative/python/megengine/distributed/functional.py
@@ -389,31 +389,52 @@ def reduce_scatter_sum(
 def all_reduce_sum(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    r"""Reduce tensors across the specified group by sum.
+    r"""Reduce tensors across the specified group by applying the sum operation element-wise.
+
+    Note:
+        The ``inp`` tensor must have identical shape in all processes across the group.
 
     Args:
-        inp: Input tensor.
-        group: The process group to work on.
-            The default group is WORLD which means all processes available.
-            You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
-        device: The specific device to execute this operator.
-            None default device means the device of inp will be used.
-            Specify "gpu0:1" to execute this operator on diffrent cuda stream,
-            1 is stream id, and default stream id is 0.
+        inp (Tensor): tensor to be reduced.
+
+    Keyword args:
+        group (Group or sequence of ints): the process group to work on. Default: ``WORLD``.
+            The ``WORLD`` group selects all available processes.
+            Passing a list of process ranks creates a new group containing those processes, e.g. ``[1, 3, 5]``.
+        device (:attr:`.Tensor.device`): the specific device to execute this operator. Default: ``None``.
+            ``None`` means the device of ``inp`` will be used.
+            For ``GPU`` devices, a different CUDA stream can be selected by appending
+            the stream id after a colon in the device name, e.g. ``gpu0:1``;
+            ``:0`` denotes the default stream, which is used when no stream id is given.
 
     Returns:
-        Result tensor.
+        A tensor holding the element-wise sum of ``inp`` across all processes in the group.
+
+        The output tensor has the same shape as ``inp`` and is bitwise identical
+        in all processes across the group.
+
 
     Examples:
 
-        .. code-block::
+        >>> # We execute all_reduce_sum on rank 0 and rank 1
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=xpux:0) # Rank 0
+        Tensor([3. 4.], device=xpux:0) # Rank 1
+        >>> F.distributed.all_reduce_sum(input, group=[0, 1])
+        Tensor([4. 6.], device=xpux:0) # Rank 0
+        Tensor([4. 6.], device=xpux:0) # Rank 1
+
+        >>> # We execute all_reduce_sum on gpu0 with cuda stream 1
+        >>> megengine.set_default_device("gpu0")
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=gpu0:0) # Rank 0
+        Tensor([3. 4.], device=gpu0:0) # Rank 1
+        >>> F.distributed.all_reduce_sum(input, device="gpu0:1")
+        Tensor([4. 6.], device=gpu0:0) # Rank 0
+        Tensor([4. 6.], device=gpu0:0) # Rank 1
 
-            input = Tensor(rank)
-            # Rank 0 # input: Tensor(0)
-            # Rank 1 # input: Tensor(1)
-            output = all_reduce_sum(input)
-            # Rank 0 # output: Tensor(1)
-            # Rank 1 # output: Tensor(1)
     """
     mode = CollectiveComm.Mode.ALL_REDUCE_SUM
     return collective_comm(inp, mode, group, device)
@@ -422,32 +443,53 @@ def all_reduce_sum(
 def all_reduce_max(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    r"""Reduce tensors across the specified group by max.
+    r"""Reduce tensors across the specified group by applying the max operation element-wise.
+
+    Note:
+        The ``inp`` tensor must have identical shape in all processes across the group.
 
     Args:
-        inp: Input tensor.
-        group: The process group to work on.
-            The default group is WORLD which means all processes available.
-            You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
-        device: The specific device to execute this operator.
-            None default device means the device of inp will be used.
-            Specify "gpu0:1" to execute this operator on diffrent cuda stream,
-            1 is stream id, and default stream id is 0.
+        inp (Tensor): tensor to be reduced.
+
+    Keyword args:
+        group (Group or sequence of ints): the process group to work on. Default: ``WORLD``.
+            The ``WORLD`` group selects all available processes.
+            Passing a list of process ranks creates a new group containing those processes, e.g. ``[1, 3, 5]``.
+        device (:attr:`.Tensor.device`): the specific device to execute this operator. Default: ``None``.
+            ``None`` means the device of ``inp`` will be used.
+            For ``GPU`` devices, a different CUDA stream can be selected by appending
+            the stream id after a colon in the device name, e.g. ``gpu0:1``;
+            ``:0`` denotes the default stream, which is used when no stream id is given.
 
     Returns:
-        Result tensor.
+        A tensor holding the element-wise maximum of ``inp`` across all processes in the group.
+
+        The output tensor has the same shape as ``inp`` and is bitwise identical
+        in all processes across the group.
 
     Examples:
 
-        .. code-block::
+        >>> # We execute all_reduce_max on rank 0 and rank 1
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=xpux:0) # Rank 0
+        Tensor([3. 4.], device=xpux:0) # Rank 1
+        >>> F.distributed.all_reduce_max(input, group=[0, 1])
+        Tensor([3. 4.], device=xpux:0) # Rank 0
+        Tensor([3. 4.], device=xpux:0) # Rank 1
+
+        >>> # We execute all_reduce_max on gpu0 with cuda stream 1
+        >>> megengine.set_default_device("gpu0")
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=gpu0:0) # Rank 0
+        Tensor([3. 4.], device=gpu0:0) # Rank 1
+        >>> F.distributed.all_reduce_max(input, device="gpu0:1")
+        Tensor([3. 4.], device=gpu0:0) # Rank 0
+        Tensor([3. 4.], device=gpu0:0) # Rank 1
 
-            input = Tensor(rank)
-            # Rank 0 # input: Tensor(0)
-            # Rank 1 # input: Tensor(1)
-            output = all_reduce_max(input)
-            # Rank 0 # output: Tensor(1)
-            # Rank 1 # output: Tensor(1)
     """
+
     mode = CollectiveComm.Mode.ALL_REDUCE_MAX
     return collective_comm(inp, mode, group, device)
 
@@ -455,31 +497,51 @@ def all_reduce_max(
 def all_reduce_min(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    r"""Reduce tensors across the specified group by min.
+    r"""Reduce tensors across the specified group by applying the min operation element-wise.
+
+    Note:
+        The ``inp`` tensor must have identical shape in all processes across the group.
 
     Args:
-        inp: Input tensor.
-        group: The process group to work on.
-            The default group is WORLD which means all processes available.
-            You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
-        device: The specific device to execute this operator.
-            None default device means the device of inp will be used.
-            Specify "gpu0:1" to execute this operator on diffrent cuda stream,
-            1 is stream id, and default stream id is 0.
+        inp (Tensor): tensor to be reduced.
+
+    Keyword args:
+        group (Group or sequence of ints): the process group to work on. Default: ``WORLD``.
+            The ``WORLD`` group selects all available processes.
+            Passing a list of process ranks creates a new group containing those processes, e.g. ``[1, 3, 5]``.
+        device (:attr:`.Tensor.device`): the specific device to execute this operator. Default: ``None``.
+            ``None`` means the device of ``inp`` will be used.
+            For ``GPU`` devices, a different CUDA stream can be selected by appending
+            the stream id after a colon in the device name, e.g. ``gpu0:1``;
+            ``:0`` denotes the default stream, which is used when no stream id is given.
 
     Returns:
-        Result tensor.
+        A tensor holding the element-wise minimum of ``inp`` across all processes in the group.
+
+        The output tensor has the same shape as ``inp`` and is bitwise identical
+        in all processes across the group.
 
     Examples:
 
-        .. code-block::
+        >>> # We execute all_reduce_min on rank 0 and rank 1
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=xpux:0) # Rank 0
+        Tensor([3. 4.], device=xpux:0) # Rank 1
+        >>> F.distributed.all_reduce_min(input, group=[0, 1])
+        Tensor([1. 2.], device=xpux:0) # Rank 0
+        Tensor([1. 2.], device=xpux:0) # Rank 1
+
+        >>> # We execute all_reduce_min on gpu0 with cuda stream 1
+        >>> megengine.set_default_device("gpu0")
+        >>> input = F.arange(2) + 1 + 2 * rank
+        >>> input
+        Tensor([1. 2.], device=gpu0:0) # Rank 0
+        Tensor([3. 4.], device=gpu0:0) # Rank 1
+        >>> F.distributed.all_reduce_min(input, device="gpu0:1")
+        Tensor([1. 2.], device=gpu0:0) # Rank 0
+        Tensor([1. 2.], device=gpu0:0) # Rank 1
 
-            input = Tensor(rank)
-            # Rank 0 # input: Tensor(0)
-            # Rank 1 # input: Tensor(1)
-            output = all_reduce_min(input)
-            # Rank 0 # output: Tensor(0)
-            # Rank 1 # output: Tensor(0)
     """
     mode = CollectiveComm.Mode.ALL_REDUCE_MIN
    return collective_comm(inp, mode, group, device)
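
Note: the docstring examples above assume an already running distributed context that defines ``rank`` and imports ``F``. The following minimal sketch (not part of the diff) shows one way such an example could be launched end to end; it assumes a machine with two visible GPUs, uses ``megengine.distributed.launcher`` to spawn one worker process per device, and the ``worker`` function name is arbitrary.

import megengine.distributed as dist
import megengine.functional as F


@dist.launcher
def worker():
    # One process per device; dist.get_rank() supplies the `rank` used in the docstrings.
    rank = dist.get_rank()
    # Same input as in the docstring examples: rank 0 -> [1, 2], rank 1 -> [3, 4]
    inp = F.arange(2) + 1 + 2 * rank
    out = F.distributed.all_reduce_sum(inp)  # every rank receives [4, 6]
    print(f"rank {rank}: {out.numpy()}")


worker()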