未验证 提交 99504cbb 编写于 作者: L LiYuRio 提交者: GitHub

move broadcast, reduce, send, recv, reduce_scatter, scatter, alltoall (#47255)

上级 ef67c8a8
...@@ -233,6 +233,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast( ...@@ -233,6 +233,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
std::vector<phi::DenseTensor>& inputs, std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs, std::vector<phi::DenseTensor>& outputs,
const BroadcastOptions& opts) { const BroadcastOptions& opts) {
return Broadcast(inputs, outputs, opts, true);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Broadcast(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const BroadcastOptions& opts,
bool sync_op) {
auto root = opts.source_rank; auto root = opts.source_rank;
std::unique_ptr<BroadcastGlooTask> task; std::unique_ptr<BroadcastGlooTask> task;
auto tag = next_tag(); auto tag = next_tag();
...@@ -442,6 +450,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce( ...@@ -442,6 +450,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
std::vector<phi::DenseTensor>& inputs, std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs, std::vector<phi::DenseTensor>& outputs,
const ReduceOptions& opts) { const ReduceOptions& opts) {
return Reduce(inputs, outputs, opts, true);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Reduce(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const ReduceOptions& opts,
bool sync_op) {
std::shared_ptr<ReduceGlooTask> task; std::shared_ptr<ReduceGlooTask> task;
auto tag = next_tag(); auto tag = next_tag();
auto context = get_context(); auto context = get_context();
...@@ -497,6 +513,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter( ...@@ -497,6 +513,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
std::vector<phi::DenseTensor>& in_tensors, std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors, std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions& opts) { const ScatterOptions& opts) {
return Scatter(in_tensors, out_tensors, opts, true);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions& opts,
bool sync_op) {
std::shared_ptr<ScatterGlooTask> task; std::shared_ptr<ScatterGlooTask> task;
auto tag = next_tag(); auto tag = next_tag();
auto context = get_context(); auto context = get_context();
......
...@@ -113,6 +113,12 @@ class ProcessGroupGloo : public ProcessGroup { ...@@ -113,6 +113,12 @@ class ProcessGroupGloo : public ProcessGroup {
std::vector<phi::DenseTensor>& outputs, std::vector<phi::DenseTensor>& outputs,
const BroadcastOptions& = BroadcastOptions()) override; const BroadcastOptions& = BroadcastOptions()) override;
std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
const BroadcastOptions& opts,
bool sync_op) override;
std::shared_ptr<ProcessGroup::Task> AllReduce( std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& inputs, std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs, std::vector<phi::DenseTensor>& outputs,
...@@ -131,11 +137,23 @@ class ProcessGroupGloo : public ProcessGroup { ...@@ -131,11 +137,23 @@ class ProcessGroupGloo : public ProcessGroup {
std::vector<phi::DenseTensor>& in_tensors, std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) override; std::vector<phi::DenseTensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts,
bool sync_op) override;
std::shared_ptr<ProcessGroup::Task> Reduce( std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& in_tensors, std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors, std::vector<phi::DenseTensor>& out_tensors,
const ReduceOptions& opts) override; const ReduceOptions& opts) override;
std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
const ScatterOptions&,
bool sync_op) override;
std::shared_ptr<ProcessGroup::Task> Scatter( std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>& in_tensors, std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors, std::vector<phi::DenseTensor>& out_tensors,
......
...@@ -27,31 +27,33 @@ from paddle.distributed.fleet.dataset import InMemoryDataset # noqa: F401 ...@@ -27,31 +27,33 @@ from paddle.distributed.fleet.dataset import InMemoryDataset # noqa: F401
from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401 from paddle.distributed.fleet.dataset import QueueDataset # noqa: F401
from paddle.distributed.fleet.base.topology import ParallelMode # noqa: F401 from paddle.distributed.fleet.base.topology import ParallelMode # noqa: F401
from .collective import broadcast # noqa: F401
from .collective import all_reduce # noqa: F401
from .collective import reduce # noqa: F401
from .collective import all_gather # noqa: F401 from .collective import all_gather # noqa: F401
from .collective import all_gather_object # noqa: F401 from .collective import all_gather_object # noqa: F401
from .collective import scatter # noqa: F401
from .collective import barrier # noqa: F401 from .collective import barrier # noqa: F401
from .collective import ReduceOp # noqa: F401
from .collective import split # noqa: F401 from .collective import split # noqa: F401
from .collective import new_group # noqa: F401 from .collective import new_group # noqa: F401
from .collective import alltoall # noqa: F401
from .collective import recv # noqa: F401
from .collective import get_group # noqa: F401
from .collective import send # noqa: F401
from .collective import wait # noqa: F401 from .collective import wait # noqa: F401
from .collective import is_initialized # noqa: F401
from .collective import destroy_process_group # noqa: F401
from .collective import alltoall_single # noqa: F401
from .collective import isend # noqa: F401
from .collective import irecv # noqa: F401
from .collective import batch_isend_irecv # noqa: F401
from .collective import P2POp # noqa: F401
from .collective import reduce_scatter # noqa: F401
from .communication import stream # noqa: F401 from .communication import (
stream,
ReduceOp,
all_reduce,
alltoall,
alltoall_single,
broadcast,
reduce,
send,
scatter,
isend,
recv,
irecv,
batch_isend_irecv,
P2POp,
reduce_scatter,
is_initialized,
destroy_process_group,
get_group,
) # noqa: F401
from .auto_parallel import shard_op # noqa: F401 from .auto_parallel import shard_op # noqa: F401
from .auto_parallel import shard_tensor # noqa: F401 from .auto_parallel import shard_tensor # noqa: F401
...@@ -109,5 +111,4 @@ __all__ = [ # noqa ...@@ -109,5 +111,4 @@ __all__ = [ # noqa
"irecv", "irecv",
"reduce_scatter", "reduce_scatter",
"rpc", "rpc",
"stream",
] ]
...@@ -11,3 +11,33 @@ ...@@ -11,3 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from .all_reduce import all_reduce
from .broadcast import broadcast
from .reduce import reduce, ReduceOp
from .send import send, isend
from .recv import recv, irecv
from .scatter import scatter
from .batch_isend_irecv import batch_isend_irecv, P2POp
from .reduce_scatter import reduce_scatter
from .all_to_all import alltoall, alltoall_single
from .group import is_initialized, destroy_process_group, get_group
__all__ = [
"ReduceOp",
"all_reduce",
"alltoall",
"alltoall_single",
"broadcast",
"reduce",
"send",
"scatter",
"isend",
"recv",
"irecv",
"batch_isend_irecv",
"P2POp",
"reduce_scatter",
"is_initialized",
"destroy_process_group",
"get_group",
]
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
import paddle import paddle
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed.communication import stream as stream import paddle.distributed.communication.stream as stream
from paddle.distributed.communication.reduce import ReduceOp from paddle.distributed.communication.reduce import ReduceOp
...@@ -63,6 +63,8 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): ...@@ -63,6 +63,8 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
) )
# code below will be removed after we remove the old dygraph # code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
use_calc_stream = sync_op use_calc_stream = sync_op
ring_id = 0 if group is None else group.id ring_id = 0 if group is None else group.id
if op == ReduceOp.SUM: if op == ReduceOp.SUM:
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True):
"""
Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list.
As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1.
Through alltoall operator, the 0_0 in GPU0 will be sent to GPU0 and 0_1 to GPU1, 1_0 in GPU1 sent to GPU0 and 1_1 to GPU1.
Finally the out_tensor_list in GPU0 includes 0_0 and 1_0, and GPU1 includes 0_1 and 1_1.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/alltoall.png
:width: 800
:alt: alltoall
:align: center
Args:
in_tensor_list (List[Tensor]): List of tensors to scatter one per rank. The data type of each tensor
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
out_tensor_list (List[Tensor]): List of tensors to be gathered one per rank. The data type of each tensor should be the same as the input tensors.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
out_tensor_list = []
if dist.get_rank() == 0:
data1 = paddle.to_tensor([[1, 2, 3], [4, 5, 6]])
data2 = paddle.to_tensor([[7, 8, 9], [10, 11, 12]])
else:
data1 = paddle.to_tensor([[13, 14, 15], [16, 17, 18]])
data2 = paddle.to_tensor([[19, 20, 21], [22, 23, 24]])
dist.alltoall([data1, data2], out_tensor_list)
print(out_tensor_list)
# [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
# [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
"""
if not framework._in_legacy_dygraph():
return stream.alltoall(
out_tensor_list, in_tensor_list, group, sync_op, False
)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
temp = paddle.concat(in_tensor_list, axis=0)
nranks = len(in_tensor_list)
use_calc_stream = sync_op
out = paddle._legacy_C_ops.alltoall(
temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id
)
out_tensor_list.extend(paddle.split(out, nranks, 0))
def alltoall_single(
in_tensor,
out_tensor,
in_split_sizes=None,
out_split_sizes=None,
group=None,
sync_op=True,
):
"""
Scatter a single input tensor to all participators and gather the received tensors in out_tensor.
Note:
``alltoall_single`` is only supported in eager mode.
Args:
in_tensor (Tensor): Input tensor. The data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
out_tensor (Tensor): Output Tensor. The data type should be the same as the data type of the input Tensor.
in_split_sizes (list[int], optional): Split sizes of ``in_tensor`` for dim[0]. If not given, dim[0] of ``in_tensor``
must be divisible by group size and ``in_tensor`` will be scattered averagely to all participators. Default: None.
out_split_sizes (list[int], optional): Split sizes of ``out_tensor`` for dim[0]. If not given, dim[0] of ``out_tensor``
must be divisible by group size and ``out_tensor`` will be gathered averagely from all participators. Default: None.
group (Group, optional): The group instance return by ``new_group`` or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
rank = dist.get_rank()
size = dist.get_world_size()
# case 1 (2 GPUs)
data = paddle.arange(2, dtype='int64') + rank * 2
# data for rank 0: [0, 1]
# data for rank 1: [2, 3]
output = paddle.empty([2], dtype='int64')
dist.alltoall_single(data, output)
print(output)
# output for rank 0: [0, 2]
# output for rank 1: [1, 3]
# case 2 (2 GPUs)
in_split_sizes = [i + 1 for i in range(size)]
# in_split_sizes for rank 0: [1, 2]
# in_split_sizes for rank 1: [1, 2]
out_split_sizes = [rank + 1 for i in range(size)]
# out_split_sizes for rank 0: [1, 1]
# out_split_sizes for rank 1: [2, 2]
data = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank
# data for rank 0: [[0., 0.], [0., 0.], [0., 0.]]
# data for rank 1: [[1., 1.], [1., 1.], [1., 1.]]
output = paddle.empty([(rank + 1) * size, size], dtype='float32')
group = dist.new_group([0, 1])
task = dist.alltoall_single(data,
output,
in_split_sizes,
out_split_sizes,
sync_op=False,
group=group)
task.wait()
print(output)
# output for rank 0: [[0., 0.], [1., 1.]]
# output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]]
"""
if not framework._in_legacy_dygraph():
return stream.alltoall_single(
out_tensor,
in_tensor,
out_split_sizes,
in_split_sizes,
group,
sync_op,
False,
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import paddle.distributed as dist
import paddle.fluid.core as core
import paddle.fluid.framework as framework
from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
)
class P2POp(object):
"""
A class that makes point-to-point operations for "batch_isend_irecv".
This class creates the type of P2P operation, communication buffer, peer rank,
Group. Instances of this class will be passed to
``paddle.distributed.batch_isend_irecv`` for point-to-point communication.
Args:
op (callable): A function to send data to or receive data from a peer process.
The type of ``op`` is either ``paddle.distributed.isend`` or ``paddle.distributed.irecv``.
tensor (Tensor): Tensor to send or receive.
peer (int): The destination or source rank.
group (Group, optional): The group instance return by new_group or None for global
default group. Default: None.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
rank = dist.get_rank()
world_size = dist.get_world_size()
send_t = paddle.arange(2) + rank
# paddle.tensor([0, 1]) # Rank-0
# paddle.tensor([1, 2]) # Rank-1
recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
"""
def __init__(self, op, tensor, peer, group=None):
if op not in [dist.isend, dist.irecv]:
raise RuntimeError(
"Invalid ``op`` function. Expected ``op`` "
"to be of type ``paddle.distributed.isend`` or "
"``paddle.distributed.irecv``."
)
self.op = op
self.tensor = tensor
self.peer = peer
self.group = _get_global_group() if group is None else group
@contextlib.contextmanager
def _with_batch_p2p_guard(backend):
if backend == "NCCL":
core.ProcessGroupNCCL.group_start()
try:
yield
finally:
if backend == "NCCL":
core.ProcessGroupNCCL.group_end()
def _check_p2p_op_list(p2p_op_list):
"""
Helper to check that the ``p2p_op_list`` is a list of P2POp instances and
all ops use the same backend.
"""
if not isinstance(p2p_op_list, list) or not all(
isinstance(p2p_op, P2POp) for p2p_op in p2p_op_list
):
raise RuntimeError(
"Invalid ``p2p_op_list``. Each op is expected to "
"to be of type ``paddle.distributed.P2POp``."
)
backend = p2p_op_list[0].group.backend
if not all(backend == p2p_op.group.backend for p2p_op in p2p_op_list):
raise RuntimeError("All groups need to use the same backend.")
def batch_isend_irecv(p2p_op_list):
"""
Send or Receive a batch of tensors asynchronously and return a list of requests.
Process each of the point-to-point operations in ``p2p_op_list`` and return the
corresponding tasks. NCCL are currently supported.
Args:
p2p_op_list (List[P2POp]): A list of point-to-point operations(type of each operator is
``paddle.distributed.P2POp``). The order of the isend/irecv in the list
matters and it needs to match with corresponding isend/irecv on the
remote end.
Returns:
A list of distributed tasks returned by calling the corresponding
op in the op_list.
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
rank = dist.get_rank()
world_size = dist.get_world_size()
send_t = paddle.arange(2) + rank
# paddle.tensor([0, 1]) # Rank-0
# paddle.tensor([1, 2]) # Rank-1
recv_t = paddle.empty(shape=[2], dtype=send_t.dtype)
send_op = dist.P2POp(dist.isend, send_t, (rank + 1) % world_size)
recv_op = dist.P2POp(dist.irecv, recv_t, (rank - 1 + world_size) % world_size)
tasks = dist.batch_isend_irecv([send_op, recv_op])
for task in tasks:
task.wait()
print(recv_t)
# paddle.tensor([1, 2]) # Rank-0
# paddle.tensor([0, 1]) # Rank-1
"""
_check_p2p_op_list(p2p_op_list)
group = p2p_op_list[0].group
if _warn_cur_rank_not_in_group(group):
return
if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
backend = group.backend
tasks = []
with _with_batch_p2p_guard(backend):
for p2p_op in p2p_op_list:
op = p2p_op.op
tensor = p2p_op.tensor
peer = p2p_op.peer
comm_group = p2p_op.group
task = op(tensor, peer, comm_group)
if task is not None:
tasks.append(task)
return tasks
else:
raise RuntimeError("Don't support static graph mode currently.")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
def broadcast(tensor, src, group=None, sync_op=True):
"""
Broadcast a tensor from the source to all others.
As shown below, one process is started with a GPU and GPU0 owns data 0. Through broadcast operator,
data 0 will be sent to all GPUs from GPU0.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/broadcast.png
:width: 800
:alt: broadcast
:align: center
Args:
tensor (Tensor): The tensor to send if current rank is the source, or the tensor to receive otherwise. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank in global view.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
dist.broadcast(data, src=1)
print(data)
# [[1, 2, 3], [1, 2, 3]] (2 GPUs)
"""
if not framework._in_legacy_dygraph():
return stream.broadcast(
tensor,
src,
group=group,
sync_op=sync_op,
use_calc_stream=False,
)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
use_calc_stream = sync_op
ring_id = 0 if group is None else group.id
gsrc = src if group is None else group.get_group_rank(src)
assert gsrc >= 0, "src rank out of group, need global rank"
return paddle._legacy_C_ops.c_broadcast(
tensor,
tensor,
'root',
gsrc,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
)
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import warnings
import paddle.distributed as dist
class Group: class Group:
""" """
...@@ -50,6 +53,10 @@ class Group: ...@@ -50,6 +53,10 @@ class Group:
def world_size(self): def world_size(self):
return self._world_size return self._world_size
@property
def backend(self):
return self._pg.name()
@property @property
def id(self): def id(self):
return self._id return self._id
...@@ -94,3 +101,129 @@ def _add_new_group(group): ...@@ -94,3 +101,129 @@ def _add_new_group(group):
"The group with id {} already exist.".format(group.id) "The group with id {} already exist.".format(group.id)
) )
_GroupManager.group_map_by_id[group.id] = group _GroupManager.group_map_by_id[group.id] = group
def _is_global_group(group):
return group.id == _GroupManager.global_group_id
def _warn_cur_rank_not_in_group(group):
global_rank = dist.get_rank()
if group and not group.is_member():
warnings.warn(
"Current global rank {} is not in group {}".format(
global_rank, group.name
)
)
return True
return False
def _get_or_throw_group_rank(global_rank, group):
group_rank = group.get_group_rank(global_rank)
assert (
group_rank >= 0
), "The input rank {} can not be found inside the group {}".format(
global_rank, group.name
)
return group_rank
def is_initialized():
"""
Check whether the distributed environment has been initialized
Returns:
`True` if distributed environment has been initialized, otherwise `False`.
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
print(paddle.distributed.is_initialized())
# False
paddle.distributed.init_parallel_env()
print(paddle.distributed.is_initialized())
# True
"""
return _GroupManager.global_group_id in _GroupManager.group_map_by_id
def destroy_process_group(group=None):
"""
Destroy a given group for communication
Args:
group (Group, optional): The group to be destroyed. All of process groups, including
the default group, will be destroyed and the distributed
environment will be deinitialized.
Returns : None
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
group = dist.new_group([0, 1])
dist.destroy_process_group(group)
print(dist.is_initialized())
# True
dist.destroy_process_group()
print(dist.is_initialized())
# False
"""
group = _get_global_group() if group is None else group
assert (
group.id in _GroupManager.group_map_by_id
), "Destroy group with id {} is invalid.".format(group.id)
if _is_global_group(group):
_GroupManager.group_map_by_id.clear()
else:
del _GroupManager.group_map_by_id[group.id]
def get_group(id=0):
"""
Get group instance by group id.
Args:
id (int): the group id. Default value is 0.
Returns:
Group: the group instance.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
gid = paddle.distributed.new_group([2,4,6])
paddle.distributed.get_group(gid.id)
"""
if id in _GroupManager.group_map_by_id:
return _GroupManager.group_map_by_id[id]
warnings.warn("Group {} is not initialized.".format(id))
return None
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
def recv(tensor, src=0, group=None, sync_op=True):
"""
Receive a tensor to the sender.
Args:
tensor (Tensor): The tensor to receive. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([7, 8, 9])
dist.send(data, dst=1)
else:
data = paddle.to_tensor([1, 2, 3])
dist.recv(data, src=0)
print(data)
# [7, 8, 9] (2 GPUs)
"""
if not framework._in_legacy_dygraph():
return stream.recv(
tensor, src=src, group=group, sync_op=sync_op, use_calc_stream=False
)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
use_calc_stream = sync_op
gsrc = src if group is None else group.get_group_rank(src)
ring_id = 0 if group is None else group.id
return paddle._legacy_C_ops.recv_v2(
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'peer',
src,
'dtype',
tensor.dtype,
'out_shape',
tensor.shape,
)
def irecv(tensor, src=None, group=None):
"""
Receive a tensor to the sender.
Args:
tensor (Tensor): The Tensor to receive. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
src (int): The source rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([7, 8, 9])
task = dist.isend(data, dst=1)
else:
data = paddle.to_tensor([1, 2, 3])
task = dist.irecv(data, src=0)
task.wait()
print(data)
# [7, 8, 9] (2 GPUs)
"""
return recv(tensor, src, group, sync_op=False)
...@@ -12,8 +12,10 @@ ...@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import paddle
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
import paddle.fluid.core as core import paddle.fluid.core as core
import paddle.distributed.communication.stream as stream
class ReduceOp: class ReduceOp:
...@@ -66,12 +68,121 @@ def _get_reduce_op(reduce_op, func_name): ...@@ -66,12 +68,121 @@ def _get_reduce_op(reduce_op, func_name):
return core.ReduceOp.PRODUCT return core.ReduceOp.PRODUCT
else: else:
if reduce_op == ReduceOp.SUM: if reduce_op == ReduceOp.SUM:
return 'c_allreduce_sum' return 'c_{}_sum'.format(func_name)
elif reduce_op == ReduceOp.MAX: elif reduce_op == ReduceOp.MAX:
return 'c_allreduce_max' return 'c_{}_max'.format(func_name)
elif reduce_op == ReduceOp.MIN: elif reduce_op == ReduceOp.MIN:
return 'c_allreduce_min' return 'c_{}_min'.format(func_name)
elif reduce_op == ReduceOp.PROD: elif reduce_op == ReduceOp.PROD:
return 'c_allreduce_prod' return 'c_{}_prod'.format(func_name)
else:
return 'c_{}'.format(func_name)
raise ValueError("Unknown reduce_op type for {}.".format(func_name)) raise ValueError("Unknown reduce_op type for {}.".format(func_name))
def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
"""
Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented
by its group rank. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator,
the GPU0 will owns the sum of all data from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png
:width: 800
:alt: reduce
:align: center
Args:
tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank id.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
else:
data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
dist.reduce(data, dst=0)
print(data)
# [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
# [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
"""
if not framework._in_legacy_dygraph():
return stream.reduce(
tensor,
dst=dst,
op=op,
group=group,
sync_op=sync_op,
use_calc_stream=False,
)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
use_calc_stream = sync_op
ring_id = 0 if group is None else group.id
gdst = dst if group is None else group.get_group_rank(dst)
assert gdst >= 0, "dst rank out of group, need global rank"
if op == ReduceOp.SUM:
return paddle._legacy_C_ops.c_reduce_sum(
tensor,
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'root_id',
gdst,
)
elif op == ReduceOp.MAX:
return paddle._legacy_C_ops.c_reduce_max(
tensor,
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'root_id',
gdst,
)
elif op == ReduceOp.MIN:
return paddle._legacy_C_ops.c_reduce_min(
tensor,
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'root_id',
gdst,
)
elif op == ReduceOp.PROD:
return paddle._legacy_C_ops.c_reduce_prod(
tensor,
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'root_id',
gdst,
)
else:
raise ValueError("Unknown parameter: {}.".format(op))
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
from paddle.distributed.communication.reduce import ReduceOp
from paddle.distributed.communication.stream.reduce_scatter import (
_reduce_scatter_base as _reduce_scatter_base_stream,
)
def reduce_scatter(
tensor, tensor_list, op=ReduceOp.SUM, group=None, sync_op=True
):
"""
Reduces, then scatters a list of tensors to all processes in a group
Args:
tensor (Tensor): The output tensor on each rank. The result will overwrite this tenor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
tensor_list (List[Tensor]]): List of tensors to reduce and scatter. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([0, 1])
data2 = paddle.to_tensor([2, 3])
else:
data1 = paddle.to_tensor([4, 5])
data2 = paddle.to_tensor([6, 7])
dist.reduce_scatter(data1, [data1, data2])
print(data1)
# [4, 6] (2 GPUs, out for rank 0)
# [8, 10] (2 GPUs, out for rank 1)
"""
if not framework._in_legacy_dygraph():
return stream.reduce_scatter(
tensor,
tensor_list,
op=op,
group=group,
sync_op=sync_op,
use_calc_stream=False,
)
def _reduce_scatter_base(
output, input, op=ReduceOp.SUM, group=None, sync_op=True
):
"""
Reduces, then scatters a flattened tensor to all processes in a group.
Args:
output (Tensor): Output tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
input (Tensor): Input tensor that is of size output tensor size times world size. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Async task handle, if sync_op is set to False.
None, if sync_op or if not part of the group.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
rank = dist.get_rank()
data = paddle.arange(4) + rank
# [0, 1, 2, 3] (2 GPUs, for rank 0)
# [1, 2, 3, 4] (2 GPUs, for rank 1)
output = paddle.empty(shape=[2], dtype=data.dtype)
dist.collective._reduce_scatter_base(output, data)
print(output)
# [1, 3] (2 GPUs, out for rank 0)
# [5, 7] (2 GPUs, out for rank 1)
"""
if not framework._in_legacy_dygraph():
return _reduce_scatter_base_stream(
output,
input,
op=op,
group=group,
sync_op=sync_op,
use_calc_stream=False,
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
from paddle.distributed.communication.group import _get_global_group
def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True):
"""
Scatter a tensor to all participators. As shown below, one process is started with a GPU and the source of the scatter
is GPU0. Through scatter operator, the data in GPU0 will be sent to all GPUs averagely.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/scatter.png
:width: 800
:alt: scatter
:align: center
Args:
tensor (Tensor): The output Tensor. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16. Default value is None.
src (int): The source rank id. Default value is 0.
group (Group, optional): The group instance return by new_group or None for global default group.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
None.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data1 = paddle.to_tensor([7, 8, 9])
data2 = paddle.to_tensor([10, 11, 12])
dist.scatter(data1, src=1)
else:
data1 = paddle.to_tensor([1, 2, 3])
data2 = paddle.to_tensor([4, 5, 6])
dist.scatter(data1, tensor_list=[data1, data2], src=1)
print(data1, data2)
# [1, 2, 3] [10, 11, 12] (2 GPUs, out for rank 0)
# [4, 5, 6] [4, 5, 6] (2 GPUs, out for rank 1)
"""
if not framework._in_legacy_dygraph():
return stream.scatter(tensor, tensor_list, src, group, sync_op)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
gsrc = src if group is None else group.get_group_rank(src)
rank = _get_global_group().rank if group is None else group.rank
nranks = _get_global_group().nranks if group is None else group.nranks
assert gsrc >= 0, "src rank out of group, need global rank"
if rank != gsrc:
tensor_list = []
for _ in range(nranks):
tensor_list.append(tensor)
temp = paddle.concat(tensor_list, axis=0)
use_calc_stream = sync_op
return framework._legacy_C_ops.c_scatter(
temp,
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'nranks',
nranks,
'root',
gsrc,
)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid.framework as framework
import paddle.distributed.communication.stream as stream
def send(tensor, dst=0, group=None, sync_op=True):
"""
Send a tensor to the receiver.
Args:
tensor (Tensor): The Tensor to send. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank id.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
sync_op (bool, optional): Whether this op is a sync op. The default value is True.
Returns:
Return a task object.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([7, 8, 9])
dist.send(data, dst=1)
else:
data = paddle.to_tensor([1, 2, 3])
dist.recv(data, src=0)
print(data)
# [7, 8, 9] (2 GPUs)
"""
if not framework._in_legacy_dygraph():
return stream.send(
tensor, dst=dst, group=group, sync_op=sync_op, use_calc_stream=False
)
# code below will be removed after we remove the old dygraph
if group is not None and not group.is_member():
return
use_calc_stream = sync_op
gdst = dst if group is None else group.get_group_rank(dst)
assert gdst >= 0, "dst rank out of group, need global rank"
ring_id = 0 if group is None else group.id
return paddle._legacy_C_ops.send_v2(
tensor,
'use_calc_stream',
use_calc_stream,
'ring_id',
ring_id,
'peer',
gdst,
)
def isend(tensor, dst, group=None):
"""
Send tensor asynchronously
Args:
tensor (Tensor): The Tensor to send. Its data type
should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
dst (int): The destination rank.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
if dist.get_rank() == 0:
data = paddle.to_tensor([7, 8, 9])
task = dist.isend(data, dst=1)
else:
data = paddle.to_tensor([1, 2, 3])
task = dist.irecv(data, src=0)
task.wait()
print(data)
# [7, 8, 9] (2 GPUs)
"""
return send(tensor, dst, group, sync_op=False)
...@@ -14,8 +14,7 @@ ...@@ -14,8 +14,7 @@
from .all_gather import all_gather from .all_gather import all_gather
from .all_reduce import all_reduce from .all_reduce import all_reduce
from .alltoall import alltoall from .all_to_all import alltoall, alltoall_single
from .alltoall_single import alltoall_single
from .broadcast import broadcast from .broadcast import broadcast
from .reduce import reduce from .reduce import reduce
from .reduce_scatter import reduce_scatter from .reduce_scatter import reduce_scatter
......
...@@ -16,11 +16,14 @@ import paddle.fluid.framework as framework ...@@ -16,11 +16,14 @@ import paddle.fluid.framework as framework
import paddle.fluid.data_feeder as data_feeder import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
from paddle.distributed.communication.group import _get_global_group from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
)
def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream): def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
op_type = _get_reduce_op(op, "all_reduce") op_type = _get_reduce_op(op, "allreduce")
group = _get_global_group() if group is None else group group = _get_global_group() if group is None else group
if use_calc_stream: if use_calc_stream:
...@@ -50,7 +53,7 @@ def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream): ...@@ -50,7 +53,7 @@ def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream):
'all_reduce', 'all_reduce',
) )
op_type = _get_reduce_op(op, "all_reduce") op_type = _get_reduce_op(op, "allreduce")
ring_id = 0 if group is None else group.id ring_id = 0 if group is None else group.id
if not isinstance(ring_id, int): if not isinstance(ring_id, int):
...@@ -107,10 +110,8 @@ def all_reduce( ...@@ -107,10 +110,8 @@ def all_reduce(
out = data.numpy() out = data.numpy()
# [[5, 7, 9], [5, 7, 9]] # [[5, 7, 9], [5, 7, 9]]
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -122,6 +123,7 @@ def all_reduce( ...@@ -122,6 +123,7 @@ def all_reduce(
tensor, op, group, sync_op, use_calc_stream tensor, op, group, sync_op, use_calc_stream
) )
else: else:
assert group is None, "Group can not be used in static mode for now."
return _all_reduce_in_static_mode( return _all_reduce_in_static_mode(
tensor, op, group, sync_op, use_calc_stream tensor, op, group, sync_op, use_calc_stream
) )
...@@ -14,7 +14,13 @@ ...@@ -14,7 +14,13 @@
import paddle import paddle
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed import collective import paddle.distributed as dist
import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
)
def _check_tensor_shape(tensor, shape, nranks=1): def _check_tensor_shape(tensor, shape, nranks=1):
...@@ -34,10 +40,9 @@ def _check_tensor_list_shape(tensor_list, shape, nranks=1): ...@@ -34,10 +40,9 @@ def _check_tensor_list_shape(tensor_list, shape, nranks=1):
) )
def _alltoall_tensor_in_dygraph( def _all_to_all_tensor_in_dygraph(
out_tensor, in_tensor, group, sync_op, use_calc_stream out_tensor, in_tensor, group, sync_op, use_calc_stream
): ):
group = collective._get_default_group() if group is None else group
_check_tensor_shape(out_tensor, in_tensor.shape, group.nranks) _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
...@@ -53,11 +58,9 @@ def _alltoall_tensor_in_dygraph( ...@@ -53,11 +58,9 @@ def _alltoall_tensor_in_dygraph(
return task return task
def _alltoall_in_dygraph( def _all_to_all_in_dygraph(
out_tensor_list, in_tensor_list, group, sync_op, use_calc_stream out_tensor_list, in_tensor_list, group, sync_op, use_calc_stream
): ):
group = collective._get_default_group() if group is None else group
if len(in_tensor_list) == 0: if len(in_tensor_list) == 0:
raise RuntimeError("The input tensor_list should not be empty.") raise RuntimeError("The input tensor_list should not be empty.")
...@@ -84,6 +87,59 @@ def _alltoall_in_dygraph( ...@@ -84,6 +87,59 @@ def _alltoall_in_dygraph(
return task return task
def _all_to_all_in_static_mode(
out_tensor_or_tensor_list,
in_tensor_or_tensor_list,
group,
sync_op,
use_calc_stream,
):
op_type = 'alltoall'
ring_id = 0 if group is None else group.id
nranks = dist.get_world_size()
helper = layer_helper.LayerHelper(op_type, **locals())
in_tensor = in_tensor_or_tensor_list
if isinstance(in_tensor_or_tensor_list, list):
if len(in_tensor_or_tensor_list) == 0:
raise RuntimeError("The input tensor_list should not be empty.")
in_tensor = paddle.concat(in_tensor_or_tensor_list, axis=0)
out_tensor = out_tensor_or_tensor_list
if isinstance(out_tensor_or_tensor_list, list):
if len(out_tensor_or_tensor_list) != 0:
raise ValueError(
"The 'out_tensor_list' for all_to_all " "must be an empty list."
)
out_tensor = helper.create_variable_for_type_inference(
dtype=in_tensor.dtype
)
data_feeder.check_variable_and_dtype(
in_tensor,
'in_tensor',
['float16', 'float32', 'float64', 'int32', 'int64'],
'all_to_all',
)
helper.append_op(
type=op_type,
inputs={'X': [in_tensor]},
outputs={'Out': [out_tensor]},
attrs={
'ring_id': ring_id,
'use_calc_stream': sync_op,
},
)
# NOTE(liyurui): If the argument `out_tensor_or_tensor_list` is a tensor_list,
# we need to split the result. So we should wait the result of all_to_all
# before split if the communication is not on calc stream.
if isinstance(out_tensor_or_tensor_list, list):
if not sync_op:
dist.wait(out_tensor, use_calc_stream=False)
out_tensor_or_tensor_list.extend(paddle.split(out_tensor, nranks, 0))
return None
def alltoall( def alltoall(
out_tensor_or_tensor_list, out_tensor_or_tensor_list,
in_tensor_or_tensor_list, in_tensor_or_tensor_list,
...@@ -109,9 +165,6 @@ def alltoall( ...@@ -109,9 +165,6 @@ def alltoall(
Returns: Returns:
Return a task object. Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -133,10 +186,8 @@ def alltoall( ...@@ -133,10 +186,8 @@ def alltoall(
# [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0)
# [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -149,10 +200,11 @@ def alltoall( ...@@ -149,10 +200,11 @@ def alltoall(
raise RuntimeError("The input should be specified.") raise RuntimeError("The input should be specified.")
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
out_is_tensor = paddle.is_tensor(out_tensor_or_tensor_list) out_is_tensor = paddle.is_tensor(out_tensor_or_tensor_list)
in_is_tensor = paddle.is_tensor(in_tensor_or_tensor_list) in_is_tensor = paddle.is_tensor(in_tensor_or_tensor_list)
if out_is_tensor and in_is_tensor: if out_is_tensor and in_is_tensor:
return _alltoall_tensor_in_dygraph( return _all_to_all_tensor_in_dygraph(
out_tensor_or_tensor_list, out_tensor_or_tensor_list,
in_tensor_or_tensor_list, in_tensor_or_tensor_list,
group, group,
...@@ -160,7 +212,7 @@ def alltoall( ...@@ -160,7 +212,7 @@ def alltoall(
use_calc_stream, use_calc_stream,
) )
elif not out_is_tensor and not in_is_tensor: elif not out_is_tensor and not in_is_tensor:
return _alltoall_in_dygraph( return _all_to_all_in_dygraph(
out_tensor_or_tensor_list, out_tensor_or_tensor_list,
in_tensor_or_tensor_list, in_tensor_or_tensor_list,
group, group,
...@@ -171,7 +223,137 @@ def alltoall( ...@@ -171,7 +223,137 @@ def alltoall(
raise RuntimeError( raise RuntimeError(
"The output and input should be both tensor or tensor list." "The output and input should be both tensor or tensor list."
) )
else:
assert group is None, "Group can not be used in static mode for now."
return _all_to_all_in_static_mode(
out_tensor_or_tensor_list,
in_tensor_or_tensor_list,
group,
sync_op,
use_calc_stream,
)
def _alltoall_single_in_dygraph(
out_tensor,
in_tensor,
out_split_sizes,
in_split_sizes,
group,
sync_op,
use_calc_stream,
):
if out_split_sizes is None:
out_split_sizes = []
if in_split_sizes is None:
in_split_sizes = []
if use_calc_stream:
return group.process_group.alltoall_single_on_calc_stream(
in_tensor, out_tensor, in_split_sizes, out_split_sizes
)
task = group.process_group.alltoall_single(
in_tensor, out_tensor, in_split_sizes, out_split_sizes, sync_op
)
if sync_op:
task.wait()
return task
def alltoall_single(
out_tensor,
in_tensor,
out_split_sizes=None,
in_split_sizes=None,
group=None,
sync_op=True,
use_calc_stream=False,
):
"""
Split and Scatter the splitted input tensor to the out tensor across devices.
Args:
out_tensor(Tensor): The output tensor. Its data type should be the same as the input.
in_tensor (Tensor): The input tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
out_split_sizes (List[int], optional): Split sizes of out_tensor for dim[0]. If not given, dim[0] of out_tensor must be divisible
by group size and out_tensor will be gathered averagely from all participators. If none is given, use a empty list as default.
in_split_sizes (List[int], optional): Split sizes of in_tensor for dim[0]. If not given, dim[0] of in_tensor must be divisible
by group size and in_tensor will be scattered averagely to all participators. If none is given, use a empty list as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
# case 1
output = paddle.empty([2], dtype="int64")
if local_rank == 0:
data = paddle.to_tensor([0, 1])
else:
data = paddle.to_tensor([2, 3])
task = dist.stream.alltoall_single(output, data, sync_op=False)
task.wait()
out = output.numpy()
# [0, 2] (2 GPUs, out for rank 0)
# [1, 3] (2 GPUs, out for rank 1)
# case 2
size = dist.get_world_size()
output = paddle.empty([(local_rank + 1) * size, size], dtype='float32')
if local_rank == 0:
data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]])
else:
data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]])
out_split_sizes = [local_rank + 1 for i in range(size)]
in_split_sizes = [i + 1 for i in range(size)]
task = dist.stream.alltoall_single(output,
data,
out_split_sizes,
in_split_sizes,
sync_op=False)
task.wait()
out = output.numpy()
# [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0)
# [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1)
"""
if _warn_cur_rank_not_in_group(group):
return
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior."
)
if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
return _alltoall_single_in_dygraph(
out_tensor,
in_tensor,
out_split_sizes,
in_split_sizes,
group,
sync_op,
use_calc_stream,
)
raise RuntimeError( raise RuntimeError(
"paddle.distributed.stream.alltoall is only supported in dygraph mode now." "paddle.distributed.stream.alltoall_single is only supported in dygraph mode now."
) )
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.framework as framework
from paddle.distributed import collective
def _alltoall_single_in_dygraph(
out_tensor,
in_tensor,
out_split_sizes,
in_split_sizes,
group,
sync_op,
use_calc_stream,
):
group = collective._get_default_group() if group is None else group
if out_split_sizes is None:
out_split_sizes = []
if in_split_sizes is None:
in_split_sizes = []
if use_calc_stream:
return group.process_group.alltoall_single_on_calc_stream(
in_tensor, out_tensor, in_split_sizes, out_split_sizes
)
task = group.process_group.alltoall_single(
in_tensor, out_tensor, in_split_sizes, out_split_sizes, sync_op
)
if sync_op:
task.wait()
return task
def alltoall_single(
out_tensor,
in_tensor,
out_split_sizes=None,
in_split_sizes=None,
group=None,
sync_op=True,
use_calc_stream=False,
):
"""
Split and Scatter the splitted input tensor to the out tensor across devices.
Args:
out_tensor(Tensor): The output tensor. Its data type should be the same as the input.
in_tensor (Tensor): The input tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool.
out_split_sizes (List[int], optional): Split sizes of out_tensor for dim[0]. If not given, dim[0] of out_tensor must be divisible
by group size and out_tensor will be gathered averagely from all participators. If none is given, use a empty list as default.
in_split_sizes (List[int], optional): Split sizes of in_tensor for dim[0]. If not given, dim[0] of in_tensor must be divisible
by group size and in_tensor will be scattered averagely to all participators. If none is given, use a empty list as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning.
Returns:
Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.distributed as dist
dist.init_parallel_env()
local_rank = dist.get_rank()
# case 1
output = paddle.empty([2], dtype="int64")
if local_rank == 0:
data = paddle.to_tensor([0, 1])
else:
data = paddle.to_tensor([2, 3])
task = dist.stream.alltoall_single(output, data, sync_op=False)
task.wait()
out = output.numpy()
# [0, 2] (2 GPUs, out for rank 0)
# [1, 3] (2 GPUs, out for rank 1)
# case 2
size = dist.get_world_size()
output = paddle.empty([(local_rank + 1) * size, size], dtype='float32')
if local_rank == 0:
data = paddle.to_tensor([[0., 0.], [0., 0.], [0., 0.]])
else:
data = paddle.to_tensor([[1., 1.], [1., 1.], [1., 1.]])
out_split_sizes = [local_rank + 1 for i in range(size)]
in_split_sizes = [i + 1 for i in range(size)]
task = dist.stream.alltoall_single(output,
data,
out_split_sizes,
in_split_sizes,
sync_op=False)
task.wait()
out = output.numpy()
# [[0., 0.], [1., 1.]] (2 GPUs, out for rank 0)
# [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] (2 GPUs, out for rank 1)
"""
if group is not None and not group.is_member():
raise RuntimeError(
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream:
raise RuntimeError(
"use_calc_stream can only be true in sync op behavior."
)
if framework.in_dygraph_mode():
return _alltoall_single_in_dygraph(
out_tensor,
in_tensor,
out_split_sizes,
in_split_sizes,
group,
sync_op,
use_calc_stream,
)
raise RuntimeError(
"paddle.distributed.stream.alltoall_single is only supported in dygraph mode now."
)
...@@ -13,29 +13,74 @@ ...@@ -13,29 +13,74 @@
# limitations under the License. # limitations under the License.
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed import collective import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
def _broadcast_in_dygraph(tensor, src, group, sync_op, use_calc_stream): _get_global_group,
group = collective._get_default_group() if group is None else group _warn_cur_rank_not_in_group,
_get_or_throw_group_rank,
)
def _broadcast_in_dygraph(
tensor, src_rank_in_group, group, sync_op, use_calc_stream
):
if use_calc_stream: if use_calc_stream:
return group.process_group.broadcast_on_calc_stream(tensor, src) return group.process_group.broadcast_on_calc_stream(
tensor, src_rank_in_group
)
task = group.process_group.broadcast(tensor, src, sync_op) task = group.process_group.broadcast(tensor, src_rank_in_group, sync_op)
if sync_op: if sync_op:
task.wait() task.wait()
return task return task
def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): def _broadcast_in_static_mode(
tensor, src_rank_in_group, group, sync_op, use_calc_stream
):
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
],
'broadcast',
)
op_type = 'c_broadcast'
helper = layer_helper.LayerHelper(op_type, **locals())
ring_id = 0 if group is None else group.id
helper.append_op(
type=op_type,
inputs={'X': [tensor]},
outputs={'Out': [tensor]},
attrs={
'root': src_rank_in_group,
'use_calc_stream': sync_op,
'ring_id': ring_id,
},
)
return None
def broadcast(tensor, src, group=None, sync_op=True, use_calc_stream=False):
""" """
Broadcast a tensor to all devices. Broadcast a tensor to all devices.
Args: Args:
tensor (Tensor): The tensor to broadcast. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type. tensor (Tensor): The tensor to broadcast. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type.
src (int, optional): Rank of the source device. If none is given, use `0` as default. src (int, optional): Rank of the source device.
group (Group, optional): Communicate in which group. If none is given, use the global group as default. group (Group, optional): Communicate in which group. If none is given, use the global group as default.
sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default. sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
...@@ -65,10 +110,8 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -65,10 +110,8 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
out = data.numpy() out = data.numpy()
# [[1, 2, 3], [1, 2, 3]] (2 GPUs) # [[1, 2, 3], [1, 2, 3]] (2 GPUs)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -76,10 +119,14 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -76,10 +119,14 @@ def broadcast(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
) )
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
src_rank_in_group = _get_or_throw_group_rank(src, group)
return _broadcast_in_dygraph( return _broadcast_in_dygraph(
tensor, src_rank_in_group, group, sync_op, use_calc_stream
)
else:
assert group is None, "Group can not be used in static mode for now."
return _broadcast_in_static_mode(
tensor, src, group, sync_op, use_calc_stream tensor, src, group, sync_op, use_calc_stream
) )
raise RuntimeError(
"paddle.distributed.stream.broadcast is only supported in dygraph mode now."
)
...@@ -13,21 +13,56 @@ ...@@ -13,21 +13,56 @@
# limitations under the License. # limitations under the License.
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed import collective import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
def _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream): _get_global_group,
group = collective._get_default_group() if group is None else group _warn_cur_rank_not_in_group,
_get_or_throw_group_rank,
)
def _recv_in_dygraph(
tensor, src_rank_in_group, group, sync_op, use_calc_stream
):
if use_calc_stream: if use_calc_stream:
return group.process_group.recv_on_calc_stream(tensor, src) return group.process_group.recv_on_calc_stream(
tensor, src_rank_in_group
)
task = group.process_group.recv(tensor, src, sync_op) task = group.process_group.recv(tensor, src_rank_in_group, sync_op)
if sync_op: if sync_op:
task.wait() task.wait()
return task return task
def _recv_in_static_mode(
tensor, src_rank_in_group, group, sync_op, use_calc_stream
):
op_type = 'recv_v2'
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
['float16', 'float32', 'float64', 'int32', 'int64'],
'recv',
)
ring_id = 0 if group is None else group.id
helper = layer_helper.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
outputs={'Out': [tensor]},
attrs={
'ring_id': ring_id,
'peer': src_rank_in_group,
'out_shape': tensor.shape,
'dtype': tensor.dtype,
'use_calc_stream': sync_op,
},
)
return None
def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
""" """
...@@ -44,9 +79,6 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -44,9 +79,6 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
Returns: Returns:
Return a task object. Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -66,10 +98,8 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -66,10 +98,8 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
out = data.numpy() out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs) # [[4, 5, 6], [4, 5, 6]] (2 GPUs)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -77,8 +107,14 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -77,8 +107,14 @@ def recv(tensor, src=0, group=None, sync_op=True, use_calc_stream=False):
) )
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
return _recv_in_dygraph(tensor, src, group, sync_op, use_calc_stream) group = _get_global_group() if group is None else group
src_rank_in_group = _get_or_throw_group_rank(src, group)
raise RuntimeError( return _recv_in_dygraph(
"paddle.distributed.stream.recv is only supported in dygraph mode now." tensor, src_rank_in_group, group, sync_op, use_calc_stream
) )
else:
assert group is None, "Group can not be used in static mode for now."
return _recv_in_static_mode(
tensor, src, group, sync_op, use_calc_stream
)
...@@ -13,23 +13,70 @@ ...@@ -13,23 +13,70 @@
# limitations under the License. # limitations under the License.
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed.communication.group import _get_global_group import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
_get_or_throw_group_rank,
)
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
def _reduce_in_dygraph(tensor, dst, op, group, sync_op, use_calc_stream): def _reduce_in_dygraph(
tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream
):
op_type = _get_reduce_op(op, "reduce") op_type = _get_reduce_op(op, "reduce")
group = _get_global_group() if group is None else group
if use_calc_stream: if use_calc_stream:
return group.process_group.reduce_on_calc_stream(tensor, dst, op_type) return group.process_group.reduce_on_calc_stream(
tensor, dst_rank_in_group, op_type
)
task = group.process_group.reduce(tensor, dst, op_type, sync_op) task = group.process_group.reduce(
tensor, dst_rank_in_group, op_type, sync_op
)
if sync_op: if sync_op:
task.wait() task.wait()
return task return task
def _reduce_in_static_mode(
tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream
):
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
],
'reduce',
)
op_type = _get_reduce_op(op, "reduce")
ring_id = 0 if group is None else group.id
helper = layer_helper.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [tensor]},
outputs={'Out': [tensor]},
attrs={
'ring_id': ring_id,
'use_calc_stream': sync_op,
'root_id': dst_rank_in_group,
},
)
return None
def reduce( def reduce(
tensor, tensor,
dst=0, dst=0,
...@@ -77,10 +124,8 @@ def reduce( ...@@ -77,10 +124,8 @@ def reduce(
# [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0) # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
# [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1) # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -88,10 +133,13 @@ def reduce( ...@@ -88,10 +133,13 @@ def reduce(
) )
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
dst_rank_in_group = _get_or_throw_group_rank(dst, group)
return _reduce_in_dygraph( return _reduce_in_dygraph(
tensor, dst_rank_in_group, op, group, sync_op, use_calc_stream
)
else:
assert group is None, "Group can not be used in static mode for now."
return _reduce_in_static_mode(
tensor, dst, op, group, sync_op, use_calc_stream tensor, dst, op, group, sync_op, use_calc_stream
) )
raise RuntimeError(
"paddle.distributed.stream.reduce is only supported in dygraph mode now."
)
...@@ -14,7 +14,10 @@ ...@@ -14,7 +14,10 @@
import paddle import paddle
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed.communication.group import _get_global_group from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
)
from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
...@@ -104,7 +107,7 @@ def reduce_scatter( ...@@ -104,7 +107,7 @@ def reduce_scatter(
Args: Args:
tensor (Tensor): The output tensor on each rank. The result will overwrite this tenor after communication. Support tensor (Tensor): The output tensor on each rank. The result will overwrite this tenor after communication. Support
float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type. float16, float32, float64, int32, int64, int8, uint8 or bool as the input data type.
tensor_list (List[Tensor]]): The input to scatter. tensor_or_tensor_list (Union[Tensor, List[Tensor]]): The input to scatter.
If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors. If it is a tensor, it should be correctly-sized. If it is a list, it should contain correctly-sized tensors.
op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default. op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
group (Group, optional): Communicate in which group. If none is given, use the global group as default. group (Group, optional): Communicate in which group. If none is given, use the global group as default.
...@@ -137,10 +140,8 @@ def reduce_scatter( ...@@ -137,10 +140,8 @@ def reduce_scatter(
# [4, 6] (2 GPUs, out for rank 0) # [4, 6] (2 GPUs, out for rank 0)
# [8, 10] (2 GPUs, out for rank 1) # [8, 10] (2 GPUs, out for rank 1)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -220,10 +221,8 @@ def _reduce_scatter_base( ...@@ -220,10 +221,8 @@ def _reduce_scatter_base(
# [1, 2, 3] (2 GPUs, out for rank 0) # [1, 2, 3] (2 GPUs, out for rank 0)
# [4, 5, 6] (2 GPUs, out for rank 1) # [4, 5, 6] (2 GPUs, out for rank 1)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
......
...@@ -12,10 +12,17 @@ ...@@ -12,10 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import warnings
import paddle import paddle
import paddle.distributed as dist import paddle.distributed as dist
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed import collective import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
_get_global_group,
_warn_cur_rank_not_in_group,
_get_or_throw_group_rank,
)
def _check_tensor_shape(tensor, shape, nranks=1): def _check_tensor_shape(tensor, shape, nranks=1):
...@@ -38,26 +45,19 @@ def _check_tensor_list_shape(tensor_list, shape, nranks=1): ...@@ -38,26 +45,19 @@ def _check_tensor_list_shape(tensor_list, shape, nranks=1):
def _scatter_tensor_in_dygraph( def _scatter_tensor_in_dygraph(
out_tensor, in_tensor, src, group, sync_op, use_calc_stream out_tensor, in_tensor, src_rank_in_group, group, sync_op, use_calc_stream
): ):
group = collective._get_default_group() if group is None else group
src_rank = group.get_group_rank(src)
if src_rank == -1:
raise RuntimeError("Src rank out of group.")
nranks = group.nranks nranks = group.nranks
rank = dist.get_rank() if group.rank == src_rank_in_group:
if rank == src_rank:
_check_tensor_shape(out_tensor, in_tensor.shape, nranks) _check_tensor_shape(out_tensor, in_tensor.shape, nranks)
if use_calc_stream: if use_calc_stream:
return group.process_group.scatter_tensor_on_calc_stream( return group.process_group.scatter_tensor_on_calc_stream(
in_tensor, out_tensor, src in_tensor, out_tensor, src_rank_in_group
) )
task = group.process_group.scatter_tensor( task = group.process_group.scatter_tensor(
in_tensor, out_tensor, src, sync_op in_tensor, out_tensor, src_rank_in_group, sync_op
) )
if sync_op: if sync_op:
task.wait() task.wait()
...@@ -66,17 +66,10 @@ def _scatter_tensor_in_dygraph( ...@@ -66,17 +66,10 @@ def _scatter_tensor_in_dygraph(
def _scatter_in_dygraph( def _scatter_in_dygraph(
tensor, tensor_list, src, group, sync_op, use_calc_stream tensor, tensor_list, src_rank_in_group, group, sync_op, use_calc_stream
): ):
group = collective._get_default_group() if group is None else group
src_rank = group.get_group_rank(src)
if src_rank == -1:
raise RuntimeError("Src rank out of group.")
nranks = group.nranks nranks = group.nranks
rank = dist.get_rank() if group.rank == src_rank_in_group:
if rank == src_rank:
if len(tensor_list) == 0: if len(tensor_list) == 0:
raise RuntimeError( raise RuntimeError(
"The tensor_list should not be empty on src rank." "The tensor_list should not be empty on src rank."
...@@ -87,16 +80,76 @@ def _scatter_in_dygraph( ...@@ -87,16 +80,76 @@ def _scatter_in_dygraph(
if use_calc_stream: if use_calc_stream:
return group.process_group.scatter_on_calc_stream( return group.process_group.scatter_on_calc_stream(
tensor_list, tensor, src tensor_list, tensor, src_rank_in_group
) )
task = group.process_group.scatter(tensor_list, tensor, src, sync_op) task = group.process_group.scatter(
tensor_list, tensor, src_rank_in_group, sync_op
)
if sync_op: if sync_op:
task.wait() task.wait()
return task return task
def _scatter_in_static_mode(
tensor,
tensor_or_tensor_list,
src_rank_in_group,
group,
sync_op,
use_calc_stream,
):
nranks = dist.get_world_size() if group is None else group.nranks
rank = dist.get_rank()
input_tensor = tensor_or_tensor_list
if isinstance(tensor_or_tensor_list, list):
tensor_list = tensor_or_tensor_list
if rank == src_rank_in_group:
if len(tensor_list) == 0:
raise RuntimeError(
"The tensor_list should not be empty on src rank."
)
else:
tensor_list = [tensor for _ in range(nranks)]
input_tensor = paddle.concat(tensor_list, axis=0)
ring_id = 0 if group is None else group.id
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
],
'scatter',
)
op_type = 'c_scatter'
helper = layer_helper.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [input_tensor]},
outputs={'Out': [tensor]},
attrs={
'ring_id': ring_id,
'root': src_rank_in_group,
'use_calc_stream': sync_op,
'nranks': nranks,
},
)
return None
def scatter( def scatter(
tensor, tensor,
tensor_or_tensor_list=None, tensor_or_tensor_list=None,
...@@ -146,25 +199,34 @@ def scatter( ...@@ -146,25 +199,34 @@ def scatter(
# [1, 2, 3] (2 GPUs, out for rank 0) # [1, 2, 3] (2 GPUs, out for rank 0)
# [4, 5, 6] (2 GPUs, out for rank 1) # [4, 5, 6] (2 GPUs, out for rank 1)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
"use_calc_stream can only be true in sync op behavior." "use_calc_stream can only be true in sync op behavior."
) )
if tensor_or_tensor_list is None: # NOTE(liyurui): Only the source rank needs to specific the tensor_or_tensor_list argument.
raise RuntimeError("The input should be specified.") # Other ranks which pass this argument in will be ignored with a warning.
# If a tensor_list passed in, we need to concat it to a tensor before invoke C++ API.
# If a tensor passed in, concat is not needed.
# The passed in type for non-src rank is meaningless, for it will be ignored.
if src != dist.get_rank():
if tensor_or_tensor_list is not None:
warnings.warn(
"Specific `tensor_or_tensor_list` is meaningless for rank which is not src."
)
tensor_or_tensor_list = []
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
group = _get_global_group() if group is None else group
src_rank_in_group = _get_or_throw_group_rank(src, group)
if paddle.is_tensor(tensor_or_tensor_list): if paddle.is_tensor(tensor_or_tensor_list):
return _scatter_tensor_in_dygraph( return _scatter_tensor_in_dygraph(
tensor, tensor,
tensor_or_tensor_list, tensor_or_tensor_list,
src, src_rank_in_group,
group, group,
sync_op, sync_op,
use_calc_stream, use_calc_stream,
...@@ -173,12 +235,19 @@ def scatter( ...@@ -173,12 +235,19 @@ def scatter(
return _scatter_in_dygraph( return _scatter_in_dygraph(
tensor, tensor,
tensor_or_tensor_list, tensor_or_tensor_list,
src, src_rank_in_group,
group, group,
sync_op, sync_op,
use_calc_stream, use_calc_stream,
) )
else:
raise RuntimeError( assert group is None, "Group can not be used in static mode for now."
"paddle.distributed.stream.scatter is only supported in dygraph mode now."
) return _scatter_in_static_mode(
tensor,
tensor_or_tensor_list,
src,
group,
sync_op,
use_calc_stream,
)
...@@ -13,21 +13,55 @@ ...@@ -13,21 +13,55 @@
# limitations under the License. # limitations under the License.
import paddle.fluid.framework as framework import paddle.fluid.framework as framework
from paddle.distributed import collective import paddle.fluid.data_feeder as data_feeder
import paddle.fluid.layer_helper as layer_helper
from paddle.distributed.communication.group import (
def _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream): _get_global_group,
group = collective._get_default_group() if group is None else group _warn_cur_rank_not_in_group,
_get_or_throw_group_rank,
)
def _send_in_dygraph(
tensor, dst_rank_in_group, group, sync_op, use_calc_stream
):
if use_calc_stream: if use_calc_stream:
return group.process_group.send_on_calc_stream(tensor, dst) return group.process_group.send_on_calc_stream(
tensor, dst_rank_in_group
)
task = group.process_group.send(tensor, dst, sync_op) task = group.process_group.send(tensor, dst_rank_in_group, sync_op)
if sync_op: if sync_op:
task.wait() task.wait()
return task return task
def _send_in_static_mode(
tensor, dst_rank_in_group, group, sync_op, use_calc_stream
):
op_type = 'send_v2'
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
['float16', 'float32', 'float64', 'int32', 'int64'],
'send',
)
ring_id = 0 if group is None else group.id
helper = layer_helper.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'X': [tensor]},
attrs={
'ring_id': ring_id,
'peer': dst_rank_in_group,
'use_calc_stream': sync_op,
},
)
return None
def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
""" """
...@@ -44,9 +78,6 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -44,9 +78,6 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
Returns: Returns:
Return a task object. Return a task object.
Warning:
This API only supports the dygraph mode now.
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -66,10 +97,8 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -66,10 +97,8 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
out = data.numpy() out = data.numpy()
# [[4, 5, 6], [4, 5, 6]] (2 GPUs) # [[4, 5, 6], [4, 5, 6]] (2 GPUs)
""" """
if group is not None and not group.is_member(): if _warn_cur_rank_not_in_group(group):
raise RuntimeError( return
"The group should not be None and all ranks which invoke this operation should be the member of this group."
)
if not sync_op and use_calc_stream: if not sync_op and use_calc_stream:
raise RuntimeError( raise RuntimeError(
...@@ -77,8 +106,14 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False): ...@@ -77,8 +106,14 @@ def send(tensor, dst=0, group=None, sync_op=True, use_calc_stream=False):
) )
if framework.in_dygraph_mode(): if framework.in_dygraph_mode():
return _send_in_dygraph(tensor, dst, group, sync_op, use_calc_stream) group = _get_global_group() if group is None else group
dst_rank_in_group = _get_or_throw_group_rank(dst, group)
raise RuntimeError( return _send_in_dygraph(
"paddle.distributed.stream.send is only supported in dygraph mode now." tensor, dst_rank_in_group, group, sync_op, use_calc_stream
) )
else:
assert group is None, "Group can not be used in static mode for now."
return _send_in_static_mode(
tensor, dst, group, sync_op, use_calc_stream
)
...@@ -22,7 +22,7 @@ from paddle.fluid.layer_helper import LayerHelper ...@@ -22,7 +22,7 @@ from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.data_feeder import check_variable_and_dtype from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle.fluid.dygraph import layers from paddle.fluid.dygraph import layers
from paddle.distributed import collective from paddle.distributed import collective
from ....communication.reduce import ReduceOp from ....communication.reduce import ReduceOp, _get_reduce_op
from paddle.fluid.data_feeder import check_dtype from paddle.fluid.data_feeder import check_dtype
import paddle.fluid.dygraph_utils as dygraph_utils import paddle.fluid.dygraph_utils as dygraph_utils
...@@ -61,7 +61,7 @@ def _c_identity(tensor, group=None): ...@@ -61,7 +61,7 @@ def _c_identity(tensor, group=None):
@staticmethod @staticmethod
def backward(ctx, dy): def backward(ctx, dy):
op_type = collective._get_reduce_op(ReduceOp.SUM, "_c_identity") op_type = _get_reduce_op(ReduceOp.SUM, "_c_identity")
group.process_group.allreduce_on_calc_stream(dy, op_type) group.process_group.allreduce_on_calc_stream(dy, op_type)
return dy return dy
...@@ -254,7 +254,7 @@ def _mp_allreduce( ...@@ -254,7 +254,7 @@ def _mp_allreduce(
ctx.ring_id = group.id ctx.ring_id = group.id
if use_calc_stream: if use_calc_stream:
op_type = collective._get_reduce_op(op, "_mp_allreduce") op_type = _get_reduce_op(op, "_mp_allreduce")
group.process_group.allreduce_on_calc_stream( group.process_group.allreduce_on_calc_stream(
tensor, op_type tensor, op_type
) )
......
...@@ -27,13 +27,13 @@ import numpy as np ...@@ -27,13 +27,13 @@ import numpy as np
from collections import OrderedDict from collections import OrderedDict
import paddle import paddle
import paddle.distributed as dist
from paddle.fluid import core from paddle.fluid import core
from paddle.optimizer import Optimizer from paddle.optimizer import Optimizer
from paddle.fluid.clip import ClipGradByGlobalNorm from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.distributed.collective import ( from paddle.distributed.collective import (
_get_global_group, _get_global_group,
new_group, new_group,
broadcast,
wait, wait,
) )
...@@ -169,7 +169,7 @@ class ShardingOptimizerStage2(Optimizer): ...@@ -169,7 +169,7 @@ class ShardingOptimizerStage2(Optimizer):
""" """
for p in self._local_params: for p in self._local_params:
broadcast( dist.broadcast(
p, src=self._global_root_rank, group=self.group, sync_op=True p, src=self._global_root_rank, group=self.group, sync_op=True
) )
...@@ -456,7 +456,7 @@ class ShardingOptimizerStage2(Optimizer): ...@@ -456,7 +456,7 @@ class ShardingOptimizerStage2(Optimizer):
# Exchange all the shards with the other ranks # Exchange all the shards with the other ranks
for dtype_per_rank in self.param_storages.values(): for dtype_per_rank in self.param_storages.values():
for dst_rank, internal_storage in dtype_per_rank.items(): for dst_rank, internal_storage in dtype_per_rank.items():
broadcast( dist.broadcast(
tensor=internal_storage.buffer, tensor=internal_storage.buffer,
src=self.group.ranks[dst_rank], src=self.group.ranks[dst_rank],
group=self.group, group=self.group,
......
...@@ -28,6 +28,7 @@ import warnings ...@@ -28,6 +28,7 @@ import warnings
from collections import OrderedDict from collections import OrderedDict
import paddle import paddle
import paddle.distributed as dist
from paddle.fluid import core from paddle.fluid import core
from paddle.optimizer import Optimizer from paddle.optimizer import Optimizer
from paddle.fluid.clip import ClipGradByGlobalNorm from paddle.fluid.clip import ClipGradByGlobalNorm
...@@ -38,7 +39,6 @@ HybridParallelClipGrad = ( ...@@ -38,7 +39,6 @@ HybridParallelClipGrad = (
) )
from paddle.distributed.collective import ( from paddle.distributed.collective import (
_get_global_group, _get_global_group,
broadcast,
new_group, new_group,
) )
...@@ -206,12 +206,12 @@ class GroupShardedOptimizerStage2(Optimizer): ...@@ -206,12 +206,12 @@ class GroupShardedOptimizerStage2(Optimizer):
""" """
for p in self._local_params: for p in self._local_params:
broadcast( dist.broadcast(
p, src=self._global_root_rank, group=self._group, sync_op=True p, src=self._global_root_rank, group=self._group, sync_op=True
) )
if self._dp_group: if self._dp_group:
broadcast( dist.broadcast(
p, p,
src=self._dp_group.ranks[0], src=self._dp_group.ranks[0],
group=self._dp_group, group=self._dp_group,
...@@ -562,7 +562,7 @@ class GroupShardedOptimizerStage2(Optimizer): ...@@ -562,7 +562,7 @@ class GroupShardedOptimizerStage2(Optimizer):
else: else:
for dtype_per_rank in self.param_storages.values(): for dtype_per_rank in self.param_storages.values():
for dst_rank, internal_storage in dtype_per_rank.items(): for dst_rank, internal_storage in dtype_per_rank.items():
broadcast( dist.broadcast(
tensor=internal_storage.buffer, tensor=internal_storage.buffer,
src=self._group.ranks[dst_rank], src=self._group.ranks[dst_rank],
group=self._group, group=self._group,
...@@ -590,7 +590,7 @@ class GroupShardedOptimizerStage2(Optimizer): ...@@ -590,7 +590,7 @@ class GroupShardedOptimizerStage2(Optimizer):
if x.trainable: if x.trainable:
group = self._broadcast_groups[group_idx] group = self._broadcast_groups[group_idx]
group_idx = (group_idx + 1) % self._number_of_broadcast_groups group_idx = (group_idx + 1) % self._number_of_broadcast_groups
task = broadcast( task = dist.broadcast(
tensor=x, tensor=x,
src=group.ranks[self._param2rank[x.name]], src=group.ranks[self._param2rank[x.name]],
group=group, group=group,
......
...@@ -27,6 +27,7 @@ from functools import reduce ...@@ -27,6 +27,7 @@ from functools import reduce
from types import MethodType from types import MethodType
import paddle import paddle
import paddle.distributed as dist
from paddle import nn from paddle import nn
from paddle.distributed import collective from paddle.distributed import collective
from paddle.distributed.utils.log_utils import get_logger from paddle.distributed.utils.log_utils import get_logger
...@@ -324,12 +325,12 @@ class GroupShardedStage2(nn.Layer): ...@@ -324,12 +325,12 @@ class GroupShardedStage2(nn.Layer):
""" """
for buffer in self._layer.buffers(include_sublayers=True): for buffer in self._layer.buffers(include_sublayers=True):
collective.broadcast( dist.broadcast(
buffer, self._global_root_rank, self._group, sync_op=True buffer, self._global_root_rank, self._group, sync_op=True
) )
if self._dp_group and self._dp_group.nranks > 1: if self._dp_group and self._dp_group.nranks > 1:
collective.broadcast( dist.broadcast(
buffer, buffer,
self._dp_group.ranks[0], self._dp_group.ranks[0],
self._dp_group, self._dp_group,
...@@ -402,7 +403,7 @@ class GroupShardedStage2(nn.Layer): ...@@ -402,7 +403,7 @@ class GroupShardedStage2(nn.Layer):
# Synchronize the reduce parameter gradient asynchronize # Synchronize the reduce parameter gradient asynchronize
self._sharding_optimizers[0]._update_task( self._sharding_optimizers[0]._update_task(
collective.reduce( dist.reduce(
tensor=param.grad, tensor=param.grad,
dst=self._group.ranks[dst_rank], dst=self._group.ranks[dst_rank],
group=self._group, group=self._group,
...@@ -415,7 +416,7 @@ class GroupShardedStage2(nn.Layer): ...@@ -415,7 +416,7 @@ class GroupShardedStage2(nn.Layer):
not self._reduce_overlap not self._reduce_overlap
), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.' ), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.'
# TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2. # TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2.
collective.all_reduce( dist.all_reduce(
tensor=param.grad, tensor=param.grad,
group=self._dp_group, group=self._dp_group,
sync_op=True, sync_op=True,
...@@ -469,7 +470,7 @@ class GroupShardedStage2(nn.Layer): ...@@ -469,7 +470,7 @@ class GroupShardedStage2(nn.Layer):
grad_storage.sent = True grad_storage.sent = True
# Synchronize the reduce parameter gradient asynchronize # Synchronize the reduce parameter gradient asynchronize
self._sharding_optimizers[0]._update_task( self._sharding_optimizers[0]._update_task(
collective.reduce( dist.reduce(
tensor=grad_storage.buffer, tensor=grad_storage.buffer,
dst=self._group.ranks[grad_storage.destination], dst=self._group.ranks[grad_storage.destination],
group=self._group, group=self._group,
...@@ -482,7 +483,7 @@ class GroupShardedStage2(nn.Layer): ...@@ -482,7 +483,7 @@ class GroupShardedStage2(nn.Layer):
not self._reduce_overlap not self._reduce_overlap
), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.' ), 'dp + stage2 hybrid parallel only Synchronize due to the new communication lib.'
# TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2. # TODO(wuhuachao):after the new communication lib upgrading, overlapping the comm of dp + stage2.
collective.all_reduce( dist.all_reduce(
tensor=grad_storage.buffer, tensor=grad_storage.buffer,
group=self._dp_group, group=self._dp_group,
sync_op=True, sync_op=True,
......
...@@ -18,6 +18,7 @@ from types import MethodType ...@@ -18,6 +18,7 @@ from types import MethodType
from collections import OrderedDict from collections import OrderedDict
import paddle import paddle
import paddle.distributed as dist
from paddle import nn from paddle import nn
from paddle.autograd import PyLayer from paddle.autograd import PyLayer
import paddle.fluid.core as core import paddle.fluid.core as core
...@@ -196,7 +197,7 @@ class GroupShardedStage3(nn.Layer): ...@@ -196,7 +197,7 @@ class GroupShardedStage3(nn.Layer):
""" """
for p in self._layer.parameters(): for p in self._layer.parameters():
collective.broadcast( dist.broadcast(
p, src=self._global_root_rank, group=self._group, sync_op=True p, src=self._global_root_rank, group=self._group, sync_op=True
) )
...@@ -493,7 +494,7 @@ class GroupShardedStage3(nn.Layer): ...@@ -493,7 +494,7 @@ class GroupShardedStage3(nn.Layer):
""" """
for buffer in self._layer.buffers(include_sublayers=True): for buffer in self._layer.buffers(include_sublayers=True):
collective.broadcast( dist.broadcast(
buffer, self._global_root_rank, self._group, sync_op=True buffer, self._global_root_rank, self._group, sync_op=True
) )
...@@ -536,7 +537,7 @@ class GroupShardedStage3(nn.Layer): ...@@ -536,7 +537,7 @@ class GroupShardedStage3(nn.Layer):
# 2.Handle unslice param # 2.Handle unslice param
for grad_storage in self._grad_storages.values(): for grad_storage in self._grad_storages.values():
grad_storage.buffer.scale_(scale=self._world_size_scaling) grad_storage.buffer.scale_(scale=self._world_size_scaling)
collective.all_reduce(tensor=grad_storage.buffer, group=self._group) dist.all_reduce(tensor=grad_storage.buffer, group=self._group)
if self._offload: if self._offload:
for param in list(self._unslice_params): for param in list(self._unslice_params):
param._clear_data() param._clear_data()
...@@ -600,7 +601,7 @@ class GroupShardedStage3(nn.Layer): ...@@ -600,7 +601,7 @@ class GroupShardedStage3(nn.Layer):
if param.name in self._task_flow.full_grad.keys(): if param.name in self._task_flow.full_grad.keys():
full_grad = self._task_flow.full_grad[param.name] full_grad = self._task_flow.full_grad[param.name]
# Only support sync allreduce current rank's layer now # Only support sync allreduce current rank's layer now
collective.all_reduce(tensor=full_grad, group=self._group) dist.all_reduce(tensor=full_grad, group=self._group)
start, end = self._param2buffer[param.name][self._rank] start, end = self._param2buffer[param.name][self._rank]
if param.bw_storage is None: if param.bw_storage is None:
......
...@@ -30,8 +30,9 @@ from collections import deque ...@@ -30,8 +30,9 @@ from collections import deque
from types import MethodType from types import MethodType
import paddle import paddle
import paddle.distributed as dist
from paddle import nn from paddle import nn
from paddle.distributed import collective as dist from paddle.distributed import collective as collective
from paddle.distributed.collective import _get_global_group from paddle.distributed.collective import _get_global_group
from ...utils.internal_storage import GradStorage from ...utils.internal_storage import GradStorage
...@@ -92,7 +93,7 @@ class ShardingStage2(nn.Layer): ...@@ -92,7 +93,7 @@ class ShardingStage2(nn.Layer):
# Communication related attributes # Communication related attributes
self._group = ( self._group = (
dist.new_group(_get_global_group().ranks) collective.new_group(_get_global_group().ranks)
if group is None if group is None
else group else group
) )
...@@ -317,7 +318,7 @@ class ShardingStage2(nn.Layer): ...@@ -317,7 +318,7 @@ class ShardingStage2(nn.Layer):
buffer, self._global_root_rank, self._group, sync_op=True buffer, self._global_root_rank, self._group, sync_op=True
) )
# Multi stream operation will be supported later # Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) collective.wait(tensor=buffer, group=self._group, use_calc_stream=True)
def __getattr__(self, name): def __getattr__(self, name):
"""Forward missing attributes to wrapped layer.""" """Forward missing attributes to wrapped layer."""
...@@ -381,7 +382,7 @@ class ShardingStage2(nn.Layer): ...@@ -381,7 +382,7 @@ class ShardingStage2(nn.Layer):
) )
# Multi stream operation will be supported later # Multi stream operation will be supported later
dist.wait( collective.wait(
tensor=param.grad, tensor=param.grad,
group=self._group, group=self._group,
use_calc_stream=True, use_calc_stream=True,
...@@ -447,7 +448,7 @@ class ShardingStage2(nn.Layer): ...@@ -447,7 +448,7 @@ class ShardingStage2(nn.Layer):
) )
# Multi stream operation will be supported later # Multi stream operation will be supported later
dist.wait( collective.wait(
tensor=grad_storage.buffer, tensor=grad_storage.buffer,
group=self._group, group=self._group,
use_calc_stream=True, use_calc_stream=True,
......
...@@ -18,12 +18,13 @@ from types import MethodType ...@@ -18,12 +18,13 @@ from types import MethodType
from collections import OrderedDict from collections import OrderedDict
import paddle import paddle
import paddle.distributed as dist
from paddle import nn from paddle import nn
from paddle.autograd import PyLayer from paddle.autograd import PyLayer
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid.framework import ParamBase from paddle.fluid.framework import ParamBase
from paddle.fluid.clip import ClipGradByGlobalNorm from paddle.fluid.clip import ClipGradByGlobalNorm
from paddle.distributed import collective as dist from paddle.distributed import collective
from paddle.distributed.collective import _get_global_group from paddle.distributed.collective import _get_global_group
from .sharding_utils import Type, ShardingClipGrad, device_guard from .sharding_utils import Type, ShardingClipGrad, device_guard
...@@ -101,7 +102,7 @@ class ShardingStage3(nn.Layer): ...@@ -101,7 +102,7 @@ class ShardingStage3(nn.Layer):
# Communication group establishment # Communication group establishment
self._group = ( self._group = (
dist.new_group(_get_global_group().ranks) collective.new_group(_get_global_group().ranks)
if group is None if group is None
else group else group
) )
...@@ -183,7 +184,7 @@ class ShardingStage3(nn.Layer): ...@@ -183,7 +184,7 @@ class ShardingStage3(nn.Layer):
) )
# Multi stream operation will be supported later # Multi stream operation will be supported later
dist.wait(tensor=p, group=self._group, use_calc_stream=True) collective.wait(tensor=p, group=self._group, use_calc_stream=True)
def _clear_gradients(self): def _clear_gradients(self):
assert len(self._trainable_params.keys()) > 0 assert len(self._trainable_params.keys()) > 0
...@@ -484,7 +485,7 @@ class ShardingStage3(nn.Layer): ...@@ -484,7 +485,7 @@ class ShardingStage3(nn.Layer):
buffer, self._global_root_rank, self._group, sync_op=True buffer, self._global_root_rank, self._group, sync_op=True
) )
# Multi stream operation will be supported later # Multi stream operation will be supported later
dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) collective.wait(tensor=buffer, group=self._group, use_calc_stream=True)
def __getattr__(self, name): def __getattr__(self, name):
"""Forward missing attributes to wrapped layer.""" """Forward missing attributes to wrapped layer."""
...@@ -528,7 +529,7 @@ class ShardingStage3(nn.Layer): ...@@ -528,7 +529,7 @@ class ShardingStage3(nn.Layer):
dist.all_reduce( dist.all_reduce(
tensor=grad_storage.buffer, group=self._group, sync_op=True tensor=grad_storage.buffer, group=self._group, sync_op=True
) )
dist.wait( collective.wait(
tensor=grad_storage.buffer, tensor=grad_storage.buffer,
group=self._group, group=self._group,
use_calc_stream=True, use_calc_stream=True,
...@@ -600,7 +601,7 @@ class ShardingStage3(nn.Layer): ...@@ -600,7 +601,7 @@ class ShardingStage3(nn.Layer):
dist.all_reduce( dist.all_reduce(
tensor=full_grad, group=self._group, sync_op=True tensor=full_grad, group=self._group, sync_op=True
) )
dist.wait( collective.wait(
tensor=full_grad, group=self._group, use_calc_stream=True tensor=full_grad, group=self._group, use_calc_stream=True
) )
...@@ -945,7 +946,7 @@ def _allgather_buffer( ...@@ -945,7 +946,7 @@ def _allgather_buffer(
# Allgather current layer in the 1st step synchronously # Allgather current layer in the 1st step synchronously
if sync_wait: if sync_wait:
with paddle.amp.auto_cast(enable=False): with paddle.amp.auto_cast(enable=False):
dist.wait( collective.wait(
tensor=full_param, tensor=full_param,
group=group, group=group,
use_calc_stream=use_calc_stream, use_calc_stream=use_calc_stream,
......
...@@ -17,6 +17,7 @@ import unittest ...@@ -17,6 +17,7 @@ import unittest
import paddle import paddle
import numpy as np import numpy as np
import paddle.distributed as dist import paddle.distributed as dist
from paddle.distributed.communication.reduce_scatter import _reduce_scatter_base
class TestCollectiveReduceScatter(unittest.TestCase): class TestCollectiveReduceScatter(unittest.TestCase):
...@@ -75,9 +76,7 @@ class TestCollectiveReduceScatter(unittest.TestCase): ...@@ -75,9 +76,7 @@ class TestCollectiveReduceScatter(unittest.TestCase):
# [1, 2, 3, 4] # Rank-1 # [1, 2, 3, 4] # Rank-1
output = paddle.empty(shape=[2], dtype=input.dtype) output = paddle.empty(shape=[2], dtype=input.dtype)
task = paddle.distributed.collective._reduce_scatter_base( task = _reduce_scatter_base(output, input, sync_op=False)
output, input, sync_op=False
)
task.wait() task.wait()
......
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import paddle.distributed as dist
from paddle.fluid.clip import ClipGradBase, _squared_l2_norm from paddle.fluid.clip import ClipGradBase, _squared_l2_norm
from paddle.fluid.dygraph import base as imperative_base from paddle.fluid.dygraph import base as imperative_base
from paddle.fluid import core, layers from paddle.fluid import core, layers
from paddle.distributed import collective
class ClipGradForMOEByGlobalNorm(ClipGradBase): class ClipGradForMOEByGlobalNorm(ClipGradBase):
...@@ -185,9 +185,9 @@ class ClipGradForMOEByGlobalNorm(ClipGradBase): ...@@ -185,9 +185,9 @@ class ClipGradForMOEByGlobalNorm(ClipGradBase):
moe_params_grads, sum_dtype moe_params_grads, sum_dtype
) )
if global_norm_var_moe is not None: if global_norm_var_moe is not None:
collective.all_reduce( dist.all_reduce(
global_norm_var_moe, global_norm_var_moe,
op=collective.ReduceOp.SUM, op=dist.ReduceOp.SUM,
group=self.moe_group, group=self.moe_group,
) )
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册