Unverified commit 8fe86ebb, authored by 张春乔, committed by GitHub

[xdoctest] reformat example code with google style in No. 203 - 211 (#56473)

* 203

* 204

* 205

* 206

* 207

* 208

* 209

* 210

* 211

* Update all_to_all.py

* Apply suggestions from code review
Parent 992fff6a
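
The pattern applied to every example below: the old `# required: distributed` marker becomes the xdoctest directive `# doctest: +REQUIRES(env: DISTRIBUTED)`, each statement gets a `>>> ` prompt with `... ` continuations, and results are shown after an explicit `print` call, either as plain output lines when they are identical on every rank or as comments when they differ per rank. A minimal sketch of the target style on a hypothetical docstring (not part of this change; the printed `2` assumes a 2-GPU launch):

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
            >>> import paddle.distributed as dist
            >>> dist.init_parallel_env()
            >>> print(dist.get_world_size())
            2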
@@ -93,21 +93,22 @@ def all_reduce(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
data = None
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.all_reduce(data, sync_op=False)
task.wait()
out = data
# [[5, 7, 9], [5, 7, 9]]
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> data = None
>>> if local_rank == 0:
... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
>>> else:
... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
>>> task = dist.stream.all_reduce(data, sync_op=False)
>>> task.wait()
>>> out = data
>>> print(out)
[[5, 7, 9], [5, 7, 9]]
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -154,23 +154,23 @@ def alltoall(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
out_tensor_list = []
if dist.get_rank() == 0:
data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
else:
data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
task = dist.stream.alltoall(out_tensor_list, [data1, data2], sync_op=False)
task.wait()
print(out_tensor_list)
# [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
# [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> out_tensor_list = []
>>> if dist.get_rank() == 0:
... data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
... data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
>>> else:
... data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
... data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
>>> task = dist.stream.alltoall(out_tensor_list, [data1, data2], sync_op=False)
>>> task.wait()
>>> print(out_tensor_list)
>>> # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
>>> # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
@@ -289,43 +289,45 @@ def alltoall_single(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
# case 1
output = paddle.empty([2], dtype="int64")
if local_rank == 0:
data = paddle.to_tensor([0, 1])
else:
data = paddle.to_tensor([2, 3])
task = dist.stream.alltoall_single(output, data, sync_op=False)
task.wait()
out = output.numpy()
# [0, 2] (2 GPUs, out for rank 0)
# [1, 3] (2 GPUs, out for rank 1)
# case 2
size = dist.get_world_size()
output = paddle.empty([(local_rank + 1) * size, size], dtype='float32')
if local_rank == 0:
data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]])
else:
data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]])
out_split_sizes = [local_rank + 1 for i in range(size)]
in_split_sizes = [i + 1 for i in range(size)]
task = dist.stream.alltoall_single(output,
data,
out_split_sizes,
in_split_sizes,
sync_op=False)
task.wait()
out = output.numpy()
# [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0)
# [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> # case 1
>>> output = paddle.empty([2], dtype="int64")
>>> if local_rank == 0:
... data = paddle.to_tensor([0, 1])
>>> else:
... data = paddle.to_tensor([2, 3])
>>> task = dist.stream.alltoall_single(output, data, sync_op=False)
>>> task.wait()
>>> out = output.numpy()
>>> print(out)
>>> # [0, 2] (2 GPUs, out for rank 0)
>>> # [1, 3] (2 GPUs, out for rank 1)
>>> # case 2
>>> size = dist.get_world_size()
>>> output = paddle.empty([(local_rank + 1) * size, size], dtype='float32')
>>> if local_rank == 0:
... data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]])
>>> else:
... data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]])
>>> out_split_sizes = [local_rank + 1 for i in range(size)]
>>> in_split_sizes = [i + 1 for i in range(size)]
>>> task = dist.stream.alltoall_single(output,
... data,
... out_split_sizes,
... in_split_sizes,
... sync_op=False)
>>> task.wait()
>>> out = output.numpy()
>>> print(out)
>>> # [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0)
>>> # [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -94,20 +94,21 @@ def broadcast(tensor, src, group=None, sync_op=True, use_calc_stream=False):
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.broadcast(data, src=1, sync_op=False)
task.wait()
out = data.numpy()
# [[1, 2, 3], [1, 2, 3]] (2 GPUs)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> if local_rank == 0:
... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
>>> else:
... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
>>> task = dist.stream.broadcast(data, src=1, sync_op=False)
>>> task.wait()
>>> out = data.numpy()
>>> print(out)
>>> # [[1, 2, 3], [1, 2, 3]] (2 GPUs)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -80,21 +80,21 @@ def gather(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
gather_list = []
if dist.get_rank() == 0:
data = paddle.to_tensor([1, 2, 3])
dist.stream.gather(data, gather_list, dst=0)
else:
data = paddle.to_tensor([4, 5, 6])
dist.stream.gather(data1, gather_list, dst=0)
print(gather_list)
# [[1, 2, 3], [4, 5, 6]] (2 GPUs, out for rank 0)
# [] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> gather_list = []
>>> if dist.get_rank() == 0:
... data = paddle.to_tensor([1, 2, 3])
... dist.stream.gather(data, gather_list, dst=0)
>>> else:
... data = paddle.to_tensor([4, 5, 6])
... dist.stream.gather(data, gather_list, dst=0)
>>> print(gather_list)
>>> # [[1, 2, 3], [4, 5, 6]] (2 GPUs, out for rank 0)
>>> # [] (2 GPUs, out for rank 1)
"""
assert (
......
@@ -81,21 +81,22 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
task = dist.stream.send(data, dst=1, sync_op=False)
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.recv(data, src=0, sync_op=False)
task.wait()
out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> if local_rank == 0:
... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
... task = dist.stream.send(data, dst=1, sync_op=False)
>>> else:
... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
... task = dist.stream.recv(data, src=0, sync_op=False)
>>> task.wait()
>>> out = data.numpy()
>>> print(out)
>>> # [[4, 5, 6], [4, 5, 6]] (2 GPUs)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -107,21 +107,22 @@ def reduce(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.reduce(data, dst=0, sync_op=False)
task.wait()
out = data.numpy()
# [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
# [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> if local_rank == 0:
... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
>>> else:
... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
>>> task = dist.stream.reduce(data, dst=0, sync_op=False)
>>> task.wait()
>>> out = data.numpy()
>>> print(out)
>>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
>>> # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -134,21 +134,22 @@ def reduce_scatter(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([0, 1])
data2 = paddle.to_tensor([2, 3])
else:
data1 = paddle.to_tensor([4, 5])
data2 = paddle.to_tensor([6, 7])
dist.stream.reduce_scatter(data1, [data1, data2])
out = data1.numpy()
# [4, 6] (2 GPUs, out for rank 0)
# [8, 10] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> if dist.get_rank() == 0:
... data1 = paddle.to_tensor([0, 1])
... data2 = paddle.to_tensor([2, 3])
>>> else:
... data1 = paddle.to_tensor([4, 5])
... data2 = paddle.to_tensor([6, 7])
>>> dist.stream.reduce_scatter(data1, [data1, data2])
>>> out = data1.numpy()
>>> print(out)
>>> # [4, 6] (2 GPUs, out for rank 0)
>>> # [8, 10] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
@@ -218,22 +219,23 @@ def _reduce_scatter_base(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([7, 8, 9])
data2 = paddle.to_tensor([10, 11, 12])
dist.stream.scatter(data1, src=1)
else:
data1 = paddle.to_tensor([1, 2, 3])
data2 = paddle.to_tensor([4, 5, 6])
dist.stream.scatter(data1, [data1, data2], src=1)
out = data1.numpy()
# [1, 2, 3] (2 GPUs, out for rank 0)
# [4, 5, 6] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> if dist.get_rank() == 0:
... data1 = paddle.to_tensor([7, 8, 9])
... data2 = paddle.to_tensor([10, 11, 12])
... dist.stream.scatter(data1, src=1)
>>> else:
... data1 = paddle.to_tensor([1, 2, 3])
... data2 = paddle.to_tensor([4, 5, 6])
... dist.stream.scatter(data1, [data1, data2], src=1)
>>> out = data1.numpy()
>>> print(out)
>>> # [1, 2, 3] (2 GPUs, out for rank 0)
>>> # [4, 5, 6] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -164,22 +164,23 @@ def scatter(
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([7, 8, 9])
data2 = paddle.to_tensor([10, 11, 12])
dist.stream.scatter(data1, src=1)
else:
data1 = paddle.to_tensor([1, 2, 3])
data2 = paddle.to_tensor([4, 5, 6])
dist.stream.scatter(data1, [data1, data2], src=1)
out = data1.numpy()
# [1, 2, 3] (2 GPUs, out for rank 0)
# [4, 5, 6] (2 GPUs, out for rank 1)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> if dist.get_rank() == 0:
... data1 = paddle.to_tensor([7, 8, 9])
... data2 = paddle.to_tensor([10, 11, 12])
... dist.stream.scatter(data1, src=1)
>>> else:
... data1 = paddle.to_tensor([1, 2, 3])
... data2 = paddle.to_tensor([4, 5, 6])
... dist.stream.scatter(data1, [data1, data2], src=1)
>>> out = data1.numpy()
>>> print(out)
>>> # [1, 2, 3] (2 GPUs, out for rank 0)
>>> # [4, 5, 6] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
......
@@ -80,21 +80,22 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
if local_rank == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
task = dist.stream.send(data, dst=1, sync_op=False)
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
task = dist.stream.recv(data, src=0, sync_op=False)
task.wait()
out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs)
>>> # doctest: +REQUIRES(env: DISTRIBUTED)
>>> import paddle
>>> import paddle.distributed as dist
>>> dist.init_parallel_env()
>>> local_rank = dist.get_rank()
>>> if local_rank == 0:
... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
... task = dist.stream.send(data, dst=1, sync_op=False)
>>> else:
... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
... task = dist.stream.recv(data, src=0, sync_op=False)
>>> task.wait()
>>> out = data.numpy()
>>> print(out)
>>> # [[4, 5, 6], [4, 5, 6]] (2 GPUs)
"""
if _warn_cur_rank_not_in_group(group):
return
......
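
These docstring examples only run inside a multi-process environment. A minimal, self-contained way to try one of them locally is sketched below; the script name `demo.py` and the `run` helper are illustrative, and a machine with 2 GPUs is assumed:

    # demo.py -- launch the all_reduce example on 2 local GPUs via paddle.distributed.spawn
    import paddle
    import paddle.distributed as dist

    def run():
        dist.init_parallel_env()
        if dist.get_rank() == 0:
            data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
        else:
            data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
        # asynchronous collective: wait on the returned task before reading the result
        task = dist.stream.all_reduce(data, sync_op=False)
        task.wait()
        print(dist.get_rank(), data.numpy())  # every rank prints [[5, 7, 9], [5, 7, 9]]

    if __name__ == "__main__":
        dist.spawn(run, nprocs=2)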