diff --git a/imperative/python/megengine/distributed/functional.py b/imperative/python/megengine/distributed/functional.py
index 6f2613e7183963cc8ffe30c533b132c7a4bddac5..8a738718c35469c0e74fba4e669651c543822aeb 100644
--- a/imperative/python/megengine/distributed/functional.py
+++ b/imperative/python/megengine/distributed/functional.py
@@ -158,12 +158,40 @@ class _ReduceSum(Function):
 def reduce_sum(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create reduce_sum operator for collective communication.
+    r"""
+    Reduce tensor data across the specified group by sum.
+    Only the root process will receive the final result.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Reduced tensor in the root process, None in other processes.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([rank])
+            # Rank 0 # input: Tensor([0])
+            # Rank 1 # input: Tensor([1])
+            output = reduce_sum(input)
+            # Rank 0 # output: Tensor([1])
+            # Rank 1 # output: None
+
+            input = Tensor([rank])
+            group = Group([1, 0]) # first rank is root
+            output = reduce_sum(input, group)
+            # Rank 0 # output: None
+            # Rank 1 # output: Tensor([1])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     op = _ReduceSum(group, device)
     (out,) = apply(op, inp)
@@ -194,12 +222,39 @@ class _Broadcast(Function):
 def broadcast(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create broadcast operator for collective communication.
+    r"""
+    Broadcast tensor data from the root process to all others.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Result tensor.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([rank])
+            # Rank 0 # input: Tensor([0])
+            # Rank 1 # input: Tensor([1])
+            output = broadcast(input)
+            # Rank 0 # output: Tensor([0])
+            # Rank 1 # output: Tensor([0])
+
+            input = Tensor([rank])
+            group = Group([1, 0]) # first rank is root
+            output = broadcast(input, group)
+            # Rank 0 # output: Tensor([1])
+            # Rank 1 # output: Tensor([1])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     shape, dtype = _bcast_shape_dtype(group, inp)
     if group.rank != 0:
@@ -223,12 +278,39 @@ def _bcast_param(
 def all_gather(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create all_gather operator for collective communication.
+    r"""
+    Gather tensors across the specified group and concatenate them along the first dimension.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Result tensor.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([rank])
+            # Rank 0 # input: Tensor([0])
+            # Rank 1 # input: Tensor([1])
+            output = all_gather(input)
+            # Rank 0 # output: Tensor([0 1])
+            # Rank 1 # output: Tensor([0 1])
+
+            input = Tensor([rank])
+            group = Group([1, 0])
+            output = all_gather(input, group)
+            # Rank 0 # output: Tensor([1 0])
+            # Rank 1 # output: Tensor([1 0])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     mode = CollectiveComm.Mode.ALL_GATHER
     return collective_comm(inp, mode, group, device)
@@ -237,12 +319,39 @@ def all_gather(
 def reduce_scatter_sum(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create reduce_scatter_sum operator for collective communication.
+    r"""
+    Reduce tensors across the specified group by sum and split the result along the first dimension.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        The split part of the reduced tensor.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([0, 1])
+            # Rank 0 # input: Tensor([0 1])
+            # Rank 1 # input: Tensor([0 1])
+            output = reduce_scatter_sum(input)
+            # Rank 0 # output: Tensor([0])
+            # Rank 1 # output: Tensor([2])
+
+            input = Tensor([0, 1])
+            group = Group([1, 0])
+            output = reduce_scatter_sum(input, group)
+            # Rank 0 # output: Tensor([2])
+            # Rank 1 # output: Tensor([0])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     mode = CollectiveComm.Mode.REDUCE_SCATTER_SUM
     return collective_comm(inp, mode, group, device)
@@ -252,50 +361,31 @@ def all_reduce_sum(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
     r"""
-    Create all_reduce_sum operator for collective communication.
-
-    This operator sums the tensor data by coordinates across the specified group and returns a tensor with the shape of the input tensor.
+    Reduce tensors across the specified group by sum.

     Args:
-        inp: The tensor data to apply this operator on.
-        group: The communication node list instance of :class:'Group' to apply this operator across. The default group is WORLD which means all processes available.
-            Specify a list of process ranks to apply this operator on specific processes, e.g. [1, 3, 5].
-        device: The specific device type of :class:'str' to execute this operator. The default device is None which mean the device of inp will be used.
-            Specify "cpu" or "gpu" to execute this operator on specific devices.
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.

     Returns:
-        opt: The reduce sum tensor of the input tensor data across the specified group.
+        Result tensor.

     Examples:

         .. code-block::

-            import megengine as mge
-            import megengine.distributed as dist
-            import numpy as np
-            from warnings import warn
-
-
-            def func(sum_value):
-                # get the rank of this process, the ranks shold be 0, 1, 2, 3 for a 4 gpu task
-                rank = dist.get_rank()
-                data = mge.tensor(rank)
-                # the result should be n * (n - 1) / 2 for all processes
-                result = mge.functional.distributed.all_reduce_sum(data).item()
-                assert result == sum_value
-
-
-            def main():
-                p_num = mge.device.get_device_count("gpu")
-                if p_num < 2:
-                    warn('This opr only works on group with more than one gpu')
-                    return
-                method = dist.launcher(func)
-                method(p_num * (p_num - 1) // 2)
-
-
-            if __name__ == '__main__':
-                main()
+            input = Tensor(rank)
+            # Rank 0 # input: Tensor(0)
+            # Rank 1 # input: Tensor(1)
+            output = all_reduce_sum(input)
+            # Rank 0 # output: Tensor(1)
+            # Rank 1 # output: Tensor(1)
     """
     mode = CollectiveComm.Mode.ALL_REDUCE_SUM
@@ -305,13 +395,33 @@ def all_reduce_max(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create all_reduce_max operator for collective communication.
+    r"""
+    Reduce tensors across the specified group by max.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Result tensor.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor(rank)
+            # Rank 0 # input: Tensor(0)
+            # Rank 1 # input: Tensor(1)
+            output = all_reduce_max(input)
+            # Rank 0 # output: Tensor(1)
+            # Rank 1 # output: Tensor(1)
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
-    :returns: reduced tensor.
     """
     mode = CollectiveComm.Mode.ALL_REDUCE_MAX
     return collective_comm(inp, mode, group, device)
@@ -321,50 +431,31 @@ def all_reduce_min(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
     r"""
-    Create all_reduce_min operator for collective communication.
-
-    This operator calculates the minimum value of the tensor data by coordinates across the specified group and returns a tensor with the shape of the input tensor.
+    Reduce tensors across the specified group by min.

     Args:
-        inp: The tensor data to apply this operator on.
-        group: The communication node list instance of :class:'Group' to apply this operator across. The default group is WORLD which means all processes available.
-            Specify a list of process ranks to apply this operator on specific processes, e.g. [1, 3, 5].
-        device: The specific device type of :class:'str' to execute this operator. The default device is None which mean the device of inp will be used.
-            Specify "cpu" or "gpu" to execute this operator on specific devices.
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.

     Returns:
-        opt: The reduce min tensor of the input tensor data across the specified group.
+        Result tensor.

     Examples:

         .. code-block::

-            import megengine as mge
-            import megengine.distributed as dist
-            import numpy as np
-            from warnings import warn
-
-
-            def func(min_value):
-                # get the rank of this process, the ranks shold be 0, 1, 2, 3 for a 4 gpu task
-                rank = dist.get_rank()
-                data = mge.Tensor(rank)
-                # the result should be 0 for all processes
-                result = mge.functional.distributed.all_reduce_min(data).item()
-                assert result == min_value
-
-
-            def main():
-                p_num = dist.helper.get_device_count("gpu")
-                if p_num < 2:
-                    warn('This opr only works on group with more than one gpu')
-                    return
-                method = dist.launcher(func)
-                method(0)
-
-
-            if __name__ == '__main__':
-                main()
+            input = Tensor(rank)
+            # Rank 0 # input: Tensor(0)
+            # Rank 1 # input: Tensor(1)
+            output = all_reduce_min(input)
+            # Rank 0 # output: Tensor(0)
+            # Rank 1 # output: Tensor(0)
     """
     mode = CollectiveComm.Mode.ALL_REDUCE_MIN
@@ -391,12 +482,40 @@ class _Gather(Function):
 def gather(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create gather operator for collective communication.
+    r"""
+    Gather tensors across the specified group.
+    Only the root process will receive the final result.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Result tensor in the root process, None in other processes.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([rank])
+            # Rank 0 # input: Tensor([0])
+            # Rank 1 # input: Tensor([1])
+            output = gather(input)
+            # Rank 0 # output: Tensor([0 1])
+            # Rank 1 # output: None
+
+            input = Tensor([rank])
+            group = Group([1, 0]) # first rank is root
+            output = gather(input, group)
+            # Rank 0 # output: None
+            # Rank 1 # output: Tensor([1 0])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     op = _Gather(group, device)
@@ -428,12 +547,39 @@ class _Scatter(Function):
 def scatter(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create scatter operator for collective communication.
+    r"""
+    Split the tensor in the root process along the first dimension and scatter each part to the corresponding process.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        The split tensor received by this process.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([0, 1]) + rank * 2
+            # Rank 0 # input: Tensor([0 1])
+            # Rank 1 # input: Tensor([2 3])
+            output = scatter(input)
+            # Rank 0 # output: Tensor([0])
+            # Rank 1 # output: Tensor([1])
+
+            input = Tensor([0, 1]) + rank * 2
+            group = Group([1, 0]) # first rank is root
+            output = scatter(input, group)
+            # Rank 0 # output: Tensor([3])
+            # Rank 1 # output: Tensor([2])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     shape, dtype = _bcast_shape_dtype(group, inp)
     if group.rank != 0:
@@ -450,12 +596,39 @@ def scatter(
 def all_to_all(
     inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
 ) -> Tensor:
-    """
-    Create all_to_all operator for collective communication.
+    r"""
+    Each process scatters its input tensor to all processes and returns the gathered tensor.
+
+    Args:
+        inp: Input tensor.
+        group: The process group to work on.
+            The default group is WORLD, which means all available processes.
+            You can also pass a list of process ranks to create a new group, e.g. [1, 3, 5].
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+
+    Returns:
+        Result tensor.
+
+    Examples:
+
+        .. code-block::
+
+            input = Tensor([0, 1]) + rank * 2
+            # Rank 0 # input: Tensor([0 1])
+            # Rank 1 # input: Tensor([2 3])
+            output = all_to_all(input)
+            # Rank 0 # output: Tensor([0 2])
+            # Rank 1 # output: Tensor([1 3])
+
+            input = Tensor([0, 1]) + rank * 2
+            group = Group([1, 0])
+            output = all_to_all(input, group)
+            # Rank 0 # output: Tensor([0 3])
+            # Rank 1 # output: Tensor([2 1])
-    :param inp: input tensor.
-    :param group: communication group.
-    :param device: execution device.
     """
     mode = CollectiveComm.Mode.ALL_TO_ALL
     return collective_comm(inp, mode, group, device)
@@ -506,11 +679,28 @@ class _RemoteRecv(Function):
 def remote_send(inp: Tensor, dest_rank: int):
-    """
-    Send a Tensor to a remote process.
+    r"""
+    Send a tensor to another process.
+
+    Args:
+        inp: Tensor to send.
+        dest_rank: Rank of the destination process.
+
+    Returns:
+        None.
+
+    Examples:
+
+        .. code-block::
+
+            if rank == 0:
+                data = mge.tensor(1)
+                # Tensor(1)
+                F.distributed.remote_send(data, 1) # returns None
+            else:
+                data = F.distributed.remote_recv(0)
+                # Tensor(1)
-    :param inp: tensor to send.
-    :param dest_rank: destination process rank.
     """
     group = _SendRecvGroup(get_rank(), dest_rank)
     _bcast_shape_dtype(group, inp)
@@ -528,12 +718,32 @@ def remote_send(inp: Tensor, dest_rank: int):
 def remote_recv(src_rank: int, device: Optional[str] = None, inp=None) -> Tensor:
-    """
-    Receive a Tensor from a remote process.
+    r"""
+    Receive a tensor from another process.
+
+    Args:
+        src_rank: Rank of the source process.
+        device: The specific device to execute this operator.
+            The default value None means that the device of inp will be used.
+            Specify "gpu0:1" to execute this operator on a different CUDA stream,
+            where 1 is the stream id and the default stream id is 0.
+        inp: Dummy input to determine the received tensor type.
+
+    Returns:
+        Received tensor.
+
+    Examples:
+
+        .. code-block::
+
+            if rank == 0:
+                data = mge.tensor(1)
+                # Tensor(1)
+                F.distributed.remote_send(data, 1) # returns None
+            else:
+                data = F.distributed.remote_recv(0)
+                # Tensor(1)
-    :param src_rank: source process rank.
-    :param device: the device to place the received tensor.
-    :param inp: dummy input to determine recved tensor type
     """
     group = _SendRecvGroup(src_rank, get_rank())
     shape, dtype = _bcast_shape_dtype(group, None)
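
Note: the snippets in the new docstrings assume they run inside a worker spawned by dist.launcher, where rank is the process rank. For reference, a minimal, self-contained launch sketch in the style of the examples removed above (the names worker and n_gpus are illustrative, and it assumes a machine with at least two GPUs):

    import megengine as mge
    import megengine.distributed as dist


    def worker(expected_sum):
        # each process contributes its own rank; all_reduce_sum gives every process the same total
        rank = dist.get_rank()
        data = mge.tensor(rank)
        result = mge.functional.distributed.all_reduce_sum(data).item()
        assert result == expected_sum


    def main():
        n_gpus = mge.device.get_device_count("gpu")
        if n_gpus < 2:
            print("this sketch needs at least two GPUs")
            return
        # dist.launcher spawns one worker process per GPU and sets up the WORLD group
        dist.launcher(worker)(n_gpus * (n_gpus - 1) // 2)  # sum of ranks 0..n-1


    if __name__ == "__main__":
        main()

Run as a plain script; each spawned process computes the same reduced value, matching the all_reduce_sum example in the docstring above.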