Commit 92079e10 authored by Megvii Engine Team

docs(mge/distributed): update distributed collective communication Google style docs

GitOrigin-RevId: 871c4052b0c58f4bb1a9008ea5af638ceb1a7b6f
Parent f9719828
@@ -158,12 +158,40 @@ class _ReduceSum(Function):
def reduce_sum(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create reduce_sum operator for collective communication.
r"""
Reduce tensor data across the specified group by sum.
Only root process will receive the final result.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Reduced tensor if in root process, None in other processes.
Examples:
.. code-block::
input = Tensor([rank])
# Rank 0 # input: Tensor([0])
# Rank 1 # input: Tensor([1])
output = reduce_sum(input)
# Rank 0 # output: Tensor([1])
# Rank 1 # output: None
input = Tensor([rank])
group = Group([1, 0]) # first rank is root
output = reduce_sum(input, group)
# Rank 0 # output: None
# Rank 1 # output: Tensor([1])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
op = _ReduceSum(group, device)
(out,) = apply(op, inp)
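For a standalone run, the following is a minimal sketch in the launcher style of the examples this file carried before this change; it assumes dist.launcher spawns one process per visible GPU and that the function is exported as megengine.functional.distributed.reduce_sum:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")  # launcher uses every visible GPU
    out = F.distributed.reduce_sum(mge.tensor([rank]))
    if rank == 0:
        assert out.item() == n * (n - 1) // 2  # sum of all ranks
    else:
        assert out is None  # only the root receives the result

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```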
@@ -194,12 +222,39 @@ class _Broadcast(Function):
def broadcast(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create broadcast operator for collective communication.
r"""
Broadcast tensor data from root process to others.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Result tensor.
Examples:
.. code-block::
input = Tensor([rank])
# Rank 0 # input: Tensor([0])
# Rank 1 # input: Tensor([1])
output = broadcast(input)
# Rank 0 # output: Tensor([0])
# Rank 1 # output: Tensor([0])
input = Tensor([rank])
group = Group([1, 0]) # first rank is root
output = broadcast(input, group)
# Rank 0 # output: Tensor([1])
# Rank 1 # output: Tensor([1])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
shape, dtype = _bcast_shape_dtype(group, inp)
if group.rank != 0:
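A runnable broadcast sketch under the same launcher assumptions as the reduce_sum sketch above:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    out = F.distributed.broadcast(mge.tensor([rank]))
    # every process ends up with the root's value, i.e. rank 0's [0]
    assert out.item() == 0

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```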
@@ -223,12 +278,39 @@ def _bcast_param(
def all_gather(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create all_gather operator for collective communication.
r"""
Gather tensors across the specified group and concat them at first dimension.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Result tensor.
Examples:
.. code-block::
input = Tensor([rank])
# Rank 0 # input: Tensor([0])
# Rank 1 # input: Tensor([1])
output = all_gather(input)
# Rank 0 # output: Tensor([0 1])
# Rank 1 # output: Tensor([0 1])
input = Tensor([rank])
group = Group([1, 0])
output = all_gather(input, group)
# Rank 0 # output: Tensor([1 0])
# Rank 1 # output: Tensor([1 0])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
mode = CollectiveComm.Mode.ALL_GATHER
return collective_comm(inp, mode, group, device)
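An all_gather sketch under the same assumptions; the .numpy() conversion for host-side checking is an assumption of this sketch:

```python
import numpy as np
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")
    out = F.distributed.all_gather(mge.tensor([rank]))
    # every process holds the concatenation of all ranks' tensors
    np.testing.assert_array_equal(out.numpy(), np.arange(n))

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```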
@@ -237,12 +319,39 @@ def all_gather(
def reduce_scatter_sum(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create reduce_scatter_sum operator for collective communication.
r"""
Reduce tensors across the specified group by sum and split them at first dimension.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Split tensor.
Examples:
.. code-block::
input = Tensor([0 1])
# Rank 0 # input: Tensor([0 1])
# Rank 1 # input: Tensor([0 1])
output = reduce_scatter_sum(input)
# Rank 0 # output: Tensor([0])
# Rank 1 # output: Tensor([2])
input = Tensor([0 1])
group = Group([1, 0])
output = reduce_scatter_sum(input, group)
# Rank 0 # output: Tensor([2])
# Rank 1 # output: Tensor([0])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
mode = CollectiveComm.Mode.REDUCE_SCATTER_SUM
return collective_comm(inp, mode, group, device)
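A reduce_scatter_sum sketch under the same assumptions; every process feeds an identical tensor whose length equals the world size:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")
    inp = mge.tensor(list(range(n)))  # identical on every process
    out = F.distributed.reduce_scatter_sum(inp)
    # the element-wise sum is [0*n, 1*n, ...]; rank i keeps slice i
    assert out.item() == rank * n

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```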
@@ -252,50 +361,31 @@ def all_reduce_sum(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
r"""
Create all_reduce_sum operator for collective communication.
This operator sums the tensor data by coordinates across the specified group and returns a tensor with the shape of the input tensor.
Reduce tensors across the specified group by sum.
Args:
inp: The tensor data to apply this operator on.
group: The communication node list instance of :class:'Group' to apply this operator across. The default group is WORLD which means all processes available.
Specify a list of process ranks to apply this operator on specific processes, e.g. [1, 3, 5].
device: The specific device type of :class:'str' to execute this operator. The default device is None which mean the device of inp will be used.
Specify "cpu" or "gpu" to execute this operator on specific devices.
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
opt: The reduce sum tensor of the input tensor data across the specified group.
Result tensor.
Examples:
.. code-block::
import megengine as mge
import megengine.distributed as dist
import numpy as np
from warnings import warn
def func(sum_value):
# get the rank of this process, the ranks shold be 0, 1, 2, 3 for a 4 gpu task
rank = dist.get_rank()
data = mge.tensor(rank)
# the result should be n * (n - 1) / 2 for all processes
result = mge.functional.distributed.all_reduce_sum(data).item()
assert result == sum_value
def main():
p_num = mge.device.get_device_count("gpu")
if p_num < 2:
warn('This opr only works on group with more than one gpu')
return
method = dist.launcher(func)
method(p_num * (p_num - 1) // 2)
if __name__ == '__main__':
main()
input = Tensor(rank)
# Rank 0 # input: Tensor(0)
# Rank 1 # input: Tensor(1)
output = all_reduce_sum(input)
# Rank 0 # output: Tensor(1)
# Rank 1 # output: Tensor(1)
"""
mode = CollectiveComm.Mode.ALL_REDUCE_SUM
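The launcher-based example formerly embedded in this docstring still works as a standalone script; a lightly cleaned sketch:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker(expected):
    rank = dist.get_rank()
    out = F.distributed.all_reduce_sum(mge.tensor(rank))
    # unlike reduce_sum, every process receives the full sum
    assert out.item() == expected

if __name__ == "__main__":
    n = mge.device.get_device_count("gpu")
    assert n >= 2, "needs at least two GPUs"
    dist.launcher(worker)(n * (n - 1) // 2)  # sum of ranks 0..n-1
```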
@@ -305,13 +395,33 @@ def all_reduce_sum(
def all_reduce_max(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create all_reduce_max operator for collective communication.
r"""
Reduce tensors across the specified group by max.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Result tensor.
Examples:
.. code-block::
input = Tensor(rank)
# Rank 0 # input: Tensor(0)
# Rank 1 # input: Tensor(1)
output = all_reduce_max(input)
# Rank 0 # output: Tensor(1)
# Rank 1 # output: Tensor(1)
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
:returns: reduced tensor.
"""
mode = CollectiveComm.Mode.ALL_REDUCE_MAX
return collective_comm(inp, mode, group, device)
@@ -321,50 +431,31 @@ def all_reduce_min(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
r"""
Create all_reduce_min operator for collective communication.
This operator calculates the minimum value of the tensor data by coordinates across the specified group and returns a tensor with the shape of the input tensor.
Reduce tensors across the specified group by min.
Args:
inp: The tensor data to apply this operator on.
group: The communication node list instance of :class:'Group' to apply this operator across. The default group is WORLD which means all processes available.
Specify a list of process ranks to apply this operator on specific processes, e.g. [1, 3, 5].
device: The specific device type of :class:'str' to execute this operator. The default device is None which mean the device of inp will be used.
Specify "cpu" or "gpu" to execute this operator on specific devices.
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
opt: The reduce min tensor of the input tensor data across the specified group.
Result tensor.
Examples:
.. code-block::
import megengine as mge
import megengine.distributed as dist
import numpy as np
from warnings import warn
def func(min_value):
# get the rank of this process, the ranks shold be 0, 1, 2, 3 for a 4 gpu task
rank = dist.get_rank()
data = mge.Tensor(rank)
# the result should be 0 for all processes
result = mge.functional.distributed.all_reduce_min(data).item()
assert result == min_value
def main():
p_num = dist.helper.get_device_count("gpu")
if p_num < 2:
warn('This opr only works on group with more than one gpu')
return
method = dist.launcher(func)
method(0)
if __name__ == '__main__':
main()
input = Tensor(rank)
# Rank 0 # input: Tensor(0)
# Rank 1 # input: Tensor(1)
output = all_reduce_min(input)
# Rank 0 # output: Tensor(0)
# Rank 1 # output: Tensor(0)
"""
mode = CollectiveComm.Mode.ALL_REDUCE_MIN
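One sketch covers both all_reduce_max and all_reduce_min, since the ranks are exactly the integers 0..n-1; same launcher assumptions as above:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker(n):
    data = mge.tensor(dist.get_rank())
    assert F.distributed.all_reduce_max(data).item() == n - 1  # largest rank
    assert F.distributed.all_reduce_min(data).item() == 0      # smallest rank

if __name__ == "__main__":
    n = mge.device.get_device_count("gpu")
    assert n >= 2, "needs at least two GPUs"
    dist.launcher(worker)(n)
```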
@@ -391,12 +482,40 @@ class _Gather(Function):
def gather(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create gather operator for collective communication.
r"""
Gather tensors across the specified group.
Only root process will receive the final result.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Result tensor if in root process, None if in other process
Examples:
.. code-block::
input = Tensor([rank])
# Rank 0 # input: Tensor([0])
# Rank 1 # input: Tensor([1])
output = gather(input)
# Rank 0 # output: Tensor([0 1])
# Rank 1 # output: None
input = Tensor([rank])
group = Group([1, 0]) # first rank is root
output = gather(input, group)
# Rank 0 # output: None
# Rank 1 # output: Tensor([1 0])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
op = _Gather(group, device)
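A gather sketch under the same assumptions; only the root may inspect the result:

```python
import numpy as np
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")
    out = F.distributed.gather(mge.tensor([rank]))
    if rank == 0:
        # the root receives all ranks' tensors concatenated
        np.testing.assert_array_equal(out.numpy(), np.arange(n))
    else:
        assert out is None

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```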
@@ -428,12 +547,39 @@ class _Scatter(Function):
def scatter(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create scatter operator for collective communication.
r"""
Split tensor in root process at first dimension.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Split tensor.
Examples:
.. code-block::
input = Tensor([0 1]) + rank*2
# Rank 0 # input: Tensor([0 1])
# Rank 1 # input: Tensor([2 3])
output = scatter(input)
# Rank 0 # output: Tensor([0])
# Rank 1 # output: Tensor([1])
input = Tensor([0 1]) + rank*2
group = Group([1, 0]) # first rank is root
output = scatter(input, group)
# Rank 0 # output: Tensor([3])
# Rank 1 # output: Tensor([2])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
shape, dtype = _bcast_shape_dtype(group, inp)
if group.rank != 0:
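A scatter sketch under the same launcher assumptions; it additionally assumes, as the docstring example does, that non-root processes still pass a tensor of the same shape even though only the root's data is scattered:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")
    # identical input everywhere; only the root's data is actually used
    out = F.distributed.scatter(mge.tensor(list(range(n))))
    assert out.item() == rank  # rank i receives the i-th slice

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```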
@@ -450,12 +596,39 @@ def scatter(
def all_to_all(
inp: Tensor, group: Optional[Group] = WORLD, device: Optional[str] = None,
) -> Tensor:
"""
Create all_to_all operator for collective communication.
r"""
Each process scatter input tensor to all processes and return gathered tensor.
Args:
inp: Input tensor.
group: The process group to work on.
The default group is WORLD which means all processes available.
You can use a list of process ranks to create new group to work on it, e.g. [1, 3, 5].
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
Returns:
Result tensor.
Examples:
.. code-block::
input = Tensor([0 1]) + rank*2
# Rank 0 # input: Tensor([0 1])
# Rank 1 # input: Tensor([2 3])
output = all_to_all(input)
# Rank 0 # output: Tensor([0 2])
# Rank 1 # output: Tensor([1 3])
input = Tensor([0 1]) + rank*2
group = Group([1, 0])
output = all_to_all(input, group)
# Rank 0 # output: Tensor([0 3])
# Rank 1 # output: Tensor([2 1])
:param inp: input tensor.
:param group: communication group.
:param device: execution device.
"""
mode = CollectiveComm.Mode.ALL_TO_ALL
return collective_comm(inp, mode, group, device)
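An all_to_all sketch under the same assumptions, generalizing the two-rank example above to n processes:

```python
import numpy as np
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    n = mge.device.get_device_count("gpu")
    inp = mge.tensor(list(range(n))) + rank * n  # rank r holds [r*n .. r*n + n-1]
    out = F.distributed.all_to_all(inp)
    # rank r gathers slice r from every process: [r, n + r, 2n + r, ...]
    np.testing.assert_array_equal(out.numpy(), np.arange(n) * n + rank)

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```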
@@ -506,11 +679,28 @@ class _RemoteRecv(Function):
def remote_send(inp: Tensor, dest_rank: int):
"""
Send a Tensor to a remote process.
r"""
Send tensor to another process.
Args:
inp: Tensor to send.
dest_rank: Rank of destination process.
Returns:
None.
Examples:
.. code-block::
if rank == 0:
data = mge.tensor(1)
# Tensor(1)
F.distributed.remote_send(data, 1) # return None
else:
data = F.distributed.remote_recv(0)
# Tensor(1)
:param inp: tensor to send.
:param dest_rank: destination process rank.
"""
group = _SendRecvGroup(get_rank(), dest_rank)
_bcast_shape_dtype(group, inp)
@@ -528,12 +718,32 @@ def remote_send(inp: Tensor, dest_rank: int):
def remote_recv(src_rank: int, device: Optional[str] = None, inp=None) -> Tensor:
"""
Receive a Tensor from a remote process.
r"""
Receive a tensor from another process.
Args:
src_rank: Rank of source process.
device: The specific device to execute this operator.
None default device means the device of inp will be used.
Specify "gpu0:1" to execute this operator on diffrent cuda stream,
1 is stream id, and default stream id is 0.
inp: Dummy input to determine received tensor type.
Returns:
Received tensor.
Examples:
.. code-block::
if rank == 0:
data = mge.tensor(1)
# Tensor(1)
F.distributed.remote_send(data, 1) # return None
else:
data = F.distributed.remote_recv(0)
# Tensor(1)
:param src_rank: source process rank.
:param device: the device to place the received tensor.
:param inp: dummy input to determine recved tensor type
"""
group = _SendRecvGroup(src_rank, get_rank())
shape, dtype = _bcast_shape_dtype(group, None)
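A paired send/receive sketch under the same launcher assumptions; with more than two GPUs, the extra ranks simply idle:

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F

def worker():
    rank = dist.get_rank()
    if rank == 0:
        F.distributed.remote_send(mge.tensor(42.0), dest_rank=1)  # returns None
    elif rank == 1:
        data = F.distributed.remote_recv(src_rank=0)
        assert data.item() == 42.0
    # ranks beyond 1, if any, do nothing

if __name__ == "__main__":
    assert mge.device.get_device_count("gpu") >= 2, "needs at least two GPUs"
    dist.launcher(worker)()
```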