diff --git a/python/paddle/distributed/communication/all_gather.py b/python/paddle/distributed/communication/all_gather.py
index 90ff8ca14f92f51d4522bc702aff2cddd7e1bf10..1cabac86f4bad913450f34c5eb237fe6c9fbcb64 100644
--- a/python/paddle/distributed/communication/all_gather.py
+++ b/python/paddle/distributed/communication/all_gather.py
@@ -63,7 +63,7 @@ def all_gather(tensor_list, tensor, group=None, sync_op=True):
             ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
             >>> dist.all_gather(tensor_list, data)
             >>> print(tensor_list)
-            [[[4, 5, 6], [4, 5, 6]], [[1, 2, 3], [1, 2, 3]]] (2 GPUs)
+            >>> # [[[4, 5, 6], [4, 5, 6]], [[1, 2, 3], [1, 2, 3]]] (2 GPUs)
 
     """
     return stream.all_gather(tensor_list, tensor, group, sync_op)
@@ -99,7 +99,7 @@ def all_gather_object(object_list, obj, group=None):
             ...     obj = {"bar": [4, 5, 6]}
             >>> dist.all_gather_object(object_list, obj)
             >>> print(object_list)
-            [{'foo': [1, 2, 3]}, {'bar': [4, 5, 6]}] (2 GPUs)
+            >>> # [{'foo': [1, 2, 3]}, {'bar': [4, 5, 6]}] (2 GPUs)
     """
     assert (
         framework.in_dynamic_mode()
diff --git a/python/paddle/distributed/communication/all_reduce.py b/python/paddle/distributed/communication/all_reduce.py
index fdd88d083565457eb6193cf12dd8ab8ed8178388..1ed26315a5d289a9017da8d524342fda0d639bff 100644
--- a/python/paddle/distributed/communication/all_reduce.py
+++ b/python/paddle/distributed/communication/all_reduce.py
@@ -42,18 +42,18 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
     Examples:
         .. code-block:: python
 
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            if dist.get_rank() == 0:
-                data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
-            else:
-                data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
-            dist.all_reduce(data)
-            print(data)
-            # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> if dist.get_rank() == 0:
+            ...     data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
+            >>> else:
+            ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
+            >>> dist.all_reduce(data)
+            >>> print(data)
+            >>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
     """
     return stream.all_reduce(
         tensor, op=op, group=group, sync_op=sync_op, use_calc_stream=False
diff --git a/python/paddle/distributed/communication/all_to_all.py b/python/paddle/distributed/communication/all_to_all.py
index 8c8c16bcbadb8f2741bd80baf980c7aeb28d4856..36567225e9f9c31e5f406f01fa80969195c6d2e6 100644
--- a/python/paddle/distributed/communication/all_to_all.py
+++ b/python/paddle/distributed/communication/all_to_all.py
@@ -40,22 +40,22 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True):
     Examples:
         .. code-block:: python
 
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            out_tensor_list = []
-            if dist.get_rank() == 0:
-                data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
-                data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
-            else:
-                data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
-                data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
-            dist.alltoall([data1, data2], out_tensor_list)
-            print(out_tensor_list)
-            # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
-            # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> out_tensor_list = []
+            >>> if dist.get_rank() == 0:
+            ...     data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
+            ...     data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
+            >>> else:
+            ...     data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
+            ...     data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
+            >>> dist.alltoall([data1, data2], out_tensor_list)
+            >>> print(out_tensor_list)
+            >>> # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
+            >>> # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
     """
     return stream.alltoall(
         out_tensor_list, in_tensor_list, group, sync_op, False
@@ -92,46 +92,46 @@ def alltoall_single(
     Examples:
         .. code-block:: python
 
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            rank = dist.get_rank()
-            size = dist.get_world_size()
-
-            # case 1 (2 GPUs)
-            data = paddle.arange(2, dtype='int64') + rank * 2
-            # data for rank 0: [0, 1]
-            # data for rank 1: [2, 3]
-            output = paddle.empty([2], dtype='int64')
-            dist.alltoall_single(data, output)
-            print(output)
-            # output for rank 0: [0, 2]
-            # output for rank 1: [1, 3]
-
-            # case 2 (2 GPUs)
-            in_split_sizes = [i + 1 for i in range(size)]
-            # in_split_sizes for rank 0: [1, 2]
-            # in_split_sizes for rank 1: [1, 2]
-            out_split_sizes = [rank + 1 for i in range(size)]
-            # out_split_sizes for rank 0: [1, 1]
-            # out_split_sizes for rank 1: [2, 2]
-            data = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank
-            # data for rank 0: [[0., 0.], [0., 0.], [0., 0.]]
-            # data for rank 1: [[1., 1.], [1., 1.], [1., 1.]]
-            output = paddle.empty([(rank + 1) * size, size], dtype='float32')
-            group = dist.new_group([0, 1])
-            task = dist.alltoall_single(data,
-                                        output,
-                                        in_split_sizes,
-                                        out_split_sizes,
-                                        sync_op=False,
-                                        group=group)
-            task.wait()
-            print(output)
-            # output for rank 0: [[0., 0.], [1., 1.]]
-            # output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]]
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> rank = dist.get_rank()
+            >>> size = dist.get_world_size()
+
+            >>> # case 1 (2 GPUs)
+            >>> data = paddle.arange(2, dtype='int64') + rank * 2
+            >>> # data for rank 0: [0, 1]
+            >>> # data for rank 1: [2, 3]
+            >>> output = paddle.empty([2], dtype='int64')
+            >>> dist.alltoall_single(data, output)
+            >>> print(output)
+            >>> # output for rank 0: [0, 2]
+            >>> # output for rank 1: [1, 3]
+
+            >>> # case 2 (2 GPUs)
+            >>> in_split_sizes = [i + 1 for i in range(size)]
+            >>> # in_split_sizes for rank 0: [1, 2]
+            >>> # in_split_sizes for rank 1: [1, 2]
+            >>> out_split_sizes = [rank + 1 for i in range(size)]
+            >>> # out_split_sizes for rank 0: [1, 1]
+            >>> # out_split_sizes for rank 1: [2, 2]
+            >>> data = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank
+            >>> # data for rank 0: [[0., 0.], [0., 0.], [0., 0.]]
+            >>> # data for rank 1: [[1., 1.], [1., 1.], [1., 1.]]
+            >>> output = paddle.empty([(rank + 1) * size, size], dtype='float32')
+            >>> group = dist.new_group([0, 1])
+            >>> task = dist.alltoall_single(data,
+            ...                             output,
+            ...                             in_split_sizes,
+            ...                             out_split_sizes,
+            ...                             sync_op=False,
+            ...                             group=group)
+            >>> task.wait()
+            >>> print(output)
+            >>> # output for rank 0: [[0., 0.], [1., 1.]]
+            >>> # output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]]
 
     """
     return stream.alltoall_single(
diff --git a/python/paddle/distributed/communication/batch_isend_irecv.py b/python/paddle/distributed/communication/batch_isend_irecv.py
index 17d374fe99f9374434d045fb3d164492285c8a2c..d21664e9364af1f63cbdc98b283c03291f98e00a 100644
--- a/python/paddle/distributed/communication/batch_isend_irecv.py
+++ b/python/paddle/distributed/communication/batch_isend_irecv.py
@@ -41,23 +41,23 @@ class P2POp:
     Examples:
         .. code-block:: python
 
-            # required: distributed
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
 
-            import paddle
-            import paddle.distributed as dist
+            >>> import paddle
+            >>> import paddle.distributed as dist
 
-            dist.init_parallel_env()
-            rank = dist.get_rank()
-            world_size = dist.get_world_size()
+            >>> dist.init_parallel_env()
+            >>> rank = dist.get_rank()
+            >>> world_size = dist.get_world_size()
 
-            send_t = paddle.arange(2) + rank
-            # paddle.tensor([0, 1]) # Rank-0
-            # paddle.tensor([1, 2]) # Rank-1
+            >>> send_t = paddle.arange(2) + rank
+            >>> # paddle.tensor([0, 1]) # Rank-0
+            >>> # paddle.tensor([1, 2]) # Rank-1
 
-            recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
+            >>> recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
 
-            send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
-            recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
+            >>> send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
+            >>> recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
 
     """
 
@@ -127,32 +127,32 @@ def batch_isend_irecv(p2p_op_list):
     Examples:
         .. code-block:: python
 
-            # required: distributed
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
 
-            import paddle
-            import paddle.distributed as dist
+            >>> import paddle
+            >>> import paddle.distributed as dist
 
-            dist.init_parallel_env()
-            rank = dist.get_rank()
-            world_size = dist.get_world_size()
+            >>> dist.init_parallel_env()
+            >>> rank = dist.get_rank()
+            >>> world_size = dist.get_world_size()
 
-            send_t = paddle.arange(2) + rank
-            # paddle.tensor([0, 1]) # Rank-0
-            # paddle.tensor([1, 2]) # Rank-1
+            >>> send_t = paddle.arange(2) + rank
+            >>> # paddle.tensor([0, 1]) # Rank-0
+            >>> # paddle.tensor([1, 2]) # Rank-1
 
-            recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
+            >>> recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
 
-            send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
-            recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
+            >>> send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
+            >>> recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
 
-            tasks = dist.batch_isend_irecv([send_op, recv_op])
+            >>> tasks = dist.batch_isend_irecv([send_op, recv_op])
 
-            for task in tasks:
-                task.wait()
+            >>> for task in tasks:
+            ...     task.wait()
 
-            print(recv_t)
-            # paddle.tensor([1, 2]) # Rank-0
-            # paddle.tensor([0, 1]) # Rank-1
+            >>> print(recv_t)
+            >>> # paddle.tensor([1, 2]) # Rank-0
+            >>> # paddle.tensor([0, 1]) # Rank-1
     """
     _check_p2p_op_list(p2p_op_list)
     group = p2p_op_list[0].group
diff --git a/python/paddle/distributed/communication/broadcast.py b/python/paddle/distributed/communication/broadcast.py
index dc2b69194272375d945fcd5077e30e846bdab5b0..208158cd2091827505046c768ba76434a95ef7f8 100644
--- a/python/paddle/distributed/communication/broadcast.py
+++ b/python/paddle/distributed/communication/broadcast.py
@@ -48,18 +48,18 @@ def broadcast(tensor, src, group=None, sync_op=True):
     Examples:
         .. code-block:: python
 
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            if dist.get_rank() == 0:
-                data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
-            else:
-                data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
-            dist.broadcast(data, src=1)
-            print(data)
-            # [[1, 2, 3], [1, 2, 3]] (2 GPUs)
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> if dist.get_rank() == 0:
+            ...     data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
+            >>> else:
+            ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
+            >>> dist.broadcast(data, src=1)
+            >>> print(data)
+            >>> # [[1, 2, 3], [1, 2, 3]] (2 GPUs)
     """
     return stream.broadcast(
         tensor,
@@ -89,17 +89,17 @@ def broadcast_object_list(object_list, src, group=None):
     Examples:
        .. code-block:: python
 
-            # required: distributed
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            if dist.get_rank() == 0:
-                object_list = [{"foo": [1, 2, 3]}]
-            else:
-                object_list = [{"bar": [4, 5, 6]}]
-            dist.broadcast_object_list(object_list, src=1)
-            print(object_list)
-            # [{"bar": [4, 5, 6]}] (2 GPUs)
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> if dist.get_rank() == 0:
+            ...     object_list = [{"foo": [1, 2, 3]}]
+            >>> else:
+            ...     object_list = [{"bar": [4, 5, 6]}]
+            >>> dist.broadcast_object_list(object_list, src=1)
+            >>> print(object_list)
+            >>> # [{"bar": [4, 5, 6]}] (2 GPUs)
     """
     assert (
         framework.in_dynamic_mode()
diff --git a/python/paddle/distributed/communication/gather.py b/python/paddle/distributed/communication/gather.py
index 34d44a7e2b4e2e6e1b81d21072934f565463f71e..de0bbcca03458071ada258b008ad2f5af72f5c07 100644
--- a/python/paddle/distributed/communication/gather.py
+++ b/python/paddle/distributed/communication/gather.py
@@ -38,21 +38,21 @@ def gather(tensor, gather_list=None, dst=0, group=None, sync_op=True):
     Examples:
         .. code-block:: python
 
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            gather_list = []
-            if dist.get_rank() == 0:
-                data = paddle.to_tensor([1, 2, 3])
-                dist.gather(data, gather_list, dst=0)
-            else:
-                data = paddle.to_tensor([4, 5, 6])
-                dist.gather(data1, gather_list, dst=0)
-            print(gather_list)
-            # [[1, 2, 3], [4, 5, 6]] (2 GPUs, out for rank 0)
-            # [] (2 GPUs, out for rank 1)
+            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
+            >>> import paddle
+            >>> import paddle.distributed as dist
+
+            >>> dist.init_parallel_env()
+            >>> gather_list = []
+            >>> if dist.get_rank() == 0:
+            ...     data = paddle.to_tensor([1, 2, 3])
+            ...     dist.gather(data, gather_list, dst=0)
+            >>> else:
+            ...     data = paddle.to_tensor([4, 5, 6])
+            ...     dist.gather(data, gather_list, dst=0)
+            >>> print(gather_list)
+            >>> # [[1, 2, 3], [4, 5, 6]] (2 GPUs, out for rank 0)
+            >>> # [] (2 GPUs, out for rank 1)
     """
     assert (
         framework.in_dynamic_mode()
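
Note: the converted examples above are guarded by `# doctest: +REQUIRES(env: DISTRIBUTED)`, so they are skipped unless the doctest run provides a distributed environment. For a manual sanity check, the `all_reduce` example can also be exercised as a standalone script under Paddle's multi-process launcher. A minimal sketch is below; the file name `all_reduce_demo.py` is only illustrative, and it assumes 2 visible GPUs.

# all_reduce_demo.py (hypothetical name); launch with something like:
#   python -m paddle.distributed.launch --gpus=0,1 all_reduce_demo.py
# Mirrors the converted all_reduce docstring example above.
import paddle
import paddle.distributed as dist


def main():
    dist.init_parallel_env()  # set up the default process group
    if dist.get_rank() == 0:
        data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
    else:
        data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
    dist.all_reduce(data)  # default op is ReduceOp.SUM across all ranks
    print(data)  # with 2 GPUs, every rank prints [[5, 7, 9], [5, 7, 9]]


if __name__ == "__main__":
    main()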