diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
index b23942b114f3be59af1e544344ce45ab73a2fdf0..097c9799b70f232568b3ac1ebfda9360984e27e2 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.cc
@@ -293,6 +293,14 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
     std::vector<phi::DenseTensor>& inputs,
     std::vector<phi::DenseTensor>& outputs,
     const AllreduceOptions& opts) {
+  return AllReduce(inputs, outputs, opts, true);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupGloo::AllReduce(
+    std::vector<phi::DenseTensor>& inputs,
+    std::vector<phi::DenseTensor>& outputs,
+    const AllreduceOptions& opts,
+    bool sync_op) {
   auto tag = next_tag();
   std::shared_ptr<GlooTask> task;
   auto context = get_context();
diff --git a/paddle/fluid/distributed/collective/ProcessGroupGloo.h b/paddle/fluid/distributed/collective/ProcessGroupGloo.h
index 95ce18c1d8217e36e12e74bfbee22c54d984dc46..d911da91eb1a32bc25811595f81bee4529de8546 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupGloo.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupGloo.h
@@ -120,6 +120,12 @@ class ProcessGroupGloo : public ProcessGroup {
       std::vector<phi::DenseTensor>& outputs,
       const AllreduceOptions& opts = AllreduceOptions()) override;
 
+  std::shared_ptr<ProcessGroup::Task> AllReduce(
+      std::vector<phi::DenseTensor>& inputs,
+      std::vector<phi::DenseTensor>& outputs,
+      const AllreduceOptions& opts,
+      bool sync_op) override;
+
   std::shared_ptr<ProcessGroup::Task> Barrier(
       const BarrierOptions& = BarrierOptions()) override;
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index 45d6b006528117e4940fef9d8bcc580102c19e40..41cb3256c8f5d573afcc50d311d879e250edf70b 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -52,54 +52,12 @@ from .fleet.layers.mpu.mp_ops import _c_softmax_with_cross_entropy
 from .fleet.layers.mpu.mp_ops import _linear
 from .fleet.layers.mpu.mp_ops import _parallel_linear
 from .fleet.layers.mpu.mp_ops import _parallel_embedding
-from .communication.comm_utils import ReduceOp
+from .communication.group import Group, _add_new_group
+from .communication.all_reduce import all_reduce
+from .communication.reduce import _get_reduce_op, ReduceOp
 
 __all__ = []
 
-
-class Group():
-    """
-    The abstract representation of group.
- """ - - def __init__(self, rank, rank_num, id=0, ranks=[], pg=None, name=None): - self.rank = rank - self.nranks = rank_num - self.id = id - self.ranks = ranks - self.pg = pg - self.name = name - - def is_member(self): - if self.rank < 0: - return False - if self.nranks < 2: - return False - return True - - def get_group_rank(self, rank): - if self.is_member() and rank in self.ranks: - return self.ranks.index(rank) - else: - return -1 - - @property - def process_group(self): - return self.pg - - @property - def world_size(self): - return self.nranks if self.rank >= 0 else -1 - - def __repr__(self): - debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format( - self.rank, self.nranks, self.id) - debug_str += ", ".join(map(str, self.ranks)) - debug_str += "; name: " - debug_str += self.name if self.name else "None" - return debug_str - - _global_env = None @@ -147,9 +105,8 @@ def _get_group_map(): global _group_map if _global_env_gid not in _group_map: genv = _get_global_env() - _group_map[_global_env_gid] = Group(genv.rank, - genv.world_size, - ranks=list(range(genv.world_size))) + _group_map[_global_env_gid] = Group(genv.rank, 0, + list(range(genv.world_size))) return _group_map @@ -197,19 +154,6 @@ def _new_ring_id(): return len(_get_group_map()) + max(_get_global_env().nrings, 9) -def _get_reduce_op(reduce_op, func_name): - if reduce_op == ReduceOp.SUM: - return core.ReduceOp.SUM - elif reduce_op == ReduceOp.MAX: - return core.ReduceOp.MAX - elif reduce_op == ReduceOp.MIN: - return core.ReduceOp.MIN - elif reduce_op == ReduceOp.PROD: - return core.ReduceOp.PRODUCT - else: - raise ValueError("Unknown reduce_op type for {}.".format(func_name)) - - def get_group(id=0): """ @@ -451,10 +395,13 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): else: rank = -1 pg = None - group = Group(rank, size, id=gid, ranks=ranks, pg=pg, name=group_name) + group = Group(rank, gid, ranks, pg=pg, name=group_name) _group_map_by_name[group_name] = group _group_map[gid] = group _group_map_backend[group] = backend + #TODO: The method below is a new method for group management, will replace the previous + # three in the future. + _add_new_group(group) # TODO(shenliang03): This is a temporary solution to solve the problem of # hang caused by tcp @@ -476,13 +423,13 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): ring_id = _new_ring_id() if global_rank not in ranks: - gp = Group(-1, -1, ring_id, ranks) + gp = Group(-1, ring_id, ranks) _group_map[ring_id] = gp else: ranks = sorted(ranks) group_rank = ranks.index(global_rank) group_size = len(ranks) - gp = Group(group_rank, group_size, ring_id, ranks) + gp = Group(group_rank, ring_id, ranks) _group_map[ring_id] = gp if group_size >= 2: @@ -748,104 +695,6 @@ def broadcast(tensor, src, group=None, sync_op=True): }) -def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): - """ - - Reduce a tensor over all ranks so that all get the result. - As shown below, one process is started with a GPU and the data of this process is represented - by its group rank. The reduce operator is sum. Through all_reduce operator, - each GPU will have the sum of the data from all GPUs. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png - :width: 800 - :alt: all_reduce - :align: center - - Args: - tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type - should be float16, float32, float64, int32, int64, int8, uint8 or bool. 
-        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
-        group (Group, optional): The group instance return by new_group or None for global default group.
-        sync_op (bool, optional): Wether this op is a sync op. Default value is True.
-
-    Returns:
-        None.
-
-    Examples:
-        .. code-block:: python
-
-            # required: distributed
-            import paddle
-            import paddle.distributed as dist
-
-            dist.init_parallel_env()
-            if dist.get_rank() == 0:
-                data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
-            else:
-                data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
-            dist.all_reduce(data)
-            print(data)
-            # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
-    """
-    if group is not None and not group.is_member():
-        return
-
-    if in_dygraph_mode():
-        op_type = _get_reduce_op(op, "all_reduce")
-        group = _get_default_group() if group is None else group
-        task = group.process_group.allreduce(tensor, op_type)
-        if sync_op:
-            task.wait()
-            return None
-        else:
-            return task
-
-    use_calc_stream = sync_op
-    ring_id = 0 if group is None else group.id
-    if _non_static_mode():
-        if op == ReduceOp.SUM:
-            return _legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream',
-                                                  use_calc_stream, 'ring_id',
-                                                  ring_id)
-        elif op == ReduceOp.MAX:
-            return _legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream',
-                                                  use_calc_stream, 'ring_id',
-                                                  ring_id)
-        elif op == ReduceOp.MIN:
-            return _legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream',
-                                                  use_calc_stream, 'ring_id',
-                                                  ring_id)
-        elif op == ReduceOp.PROD:
-            return _legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream',
-                                                   use_calc_stream, 'ring_id',
-                                                   ring_id)
-        else:
-            raise ValueError("Unknown parameter: {}.".format(op))
-
-    check_variable_and_dtype(tensor, 'tensor', [
-        'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8',
-        'bool'
-    ], 'all_reduce')
-    if op == ReduceOp.SUM:
-        op_type = 'c_allreduce_sum'
-    elif op == ReduceOp.MAX:
-        op_type = 'c_allreduce_max'
-    elif op == ReduceOp.MIN:
-        op_type = 'c_allreduce_min'
-    elif op == ReduceOp.PROD:
-        op_type = 'c_allreduce_prod'
-    if not isinstance(ring_id, int):
-        raise ValueError("The type of 'ring_id' for all_reduce should be int.")
-    helper = LayerHelper(op_type, **locals())
-    helper.append_op(type=op_type,
-                     inputs={'X': [tensor]},
-                     outputs={'Out': [tensor]},
-                     attrs={
-                         'ring_id': ring_id,
-                         'use_calc_stream': use_calc_stream
-                     })
-
-
 def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
     """
diff --git a/python/paddle/distributed/communication/all_reduce.py b/python/paddle/distributed/communication/all_reduce.py
new file mode 100644
index 0000000000000000000000000000000000000000..737e0cbbfb56c070687fa18b9df30df0cec890d0
--- /dev/null
+++ b/python/paddle/distributed/communication/all_reduce.py
@@ -0,0 +1,87 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle
+import paddle.fluid.framework as framework
+from paddle.distributed.communication import stream as stream
+from paddle.distributed.communication.reduce import ReduceOp
+
+
+def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True):
+    """
+
+    Reduce a tensor over all ranks so that all get the result.
+    As shown below, one process is started with a GPU and the data of this process is represented
+    by its group rank. The reduce operator is sum. Through all_reduce operator,
+    each GPU will have the sum of the data from all GPUs.
+
+    .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png
+        :width: 800
+        :alt: all_reduce
+        :align: center
+
+    Args:
+        tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
+            should be float16, float32, float64, int32, int64, int8, uint8 or bool.
+        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
+        group (Group, optional): The group instance returned by new_group or None for global default group.
+        sync_op (bool, optional): Whether this op is a sync op. Default value is True.
+
+    Returns:
+        Return a task object.
+
+    Examples:
+        .. code-block:: python
+
+            # required: distributed
+            import paddle
+            import paddle.distributed as dist
+
+            dist.init_parallel_env()
+            if dist.get_rank() == 0:
+                data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
+            else:
+                data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
+            dist.all_reduce(data)
+            print(data)
+            # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
+    """
+    if not framework._in_legacy_dygraph():
+        return stream.all_reduce(tensor,
+                                 op=op,
+                                 group=group,
+                                 sync_op=sync_op,
+                                 use_calc_stream=False)
+
+    # code below will be removed after we remove the old dygraph
+    use_calc_stream = sync_op
+    ring_id = 0 if group is None else group.id
+    if op == ReduceOp.SUM:
+        return paddle._legacy_C_ops.c_allreduce_sum_(tensor, 'use_calc_stream',
+                                                     use_calc_stream, 'ring_id',
+                                                     ring_id)
+    elif op == ReduceOp.MAX:
+        return paddle._legacy_C_ops.c_allreduce_max_(tensor, 'use_calc_stream',
+                                                     use_calc_stream, 'ring_id',
+                                                     ring_id)
+    elif op == ReduceOp.MIN:
+        return paddle._legacy_C_ops.c_allreduce_min_(tensor, 'use_calc_stream',
+                                                     use_calc_stream, 'ring_id',
+                                                     ring_id)
+    elif op == ReduceOp.PROD:
+        return paddle._legacy_C_ops.c_allreduce_prod_(tensor, 'use_calc_stream',
+                                                      use_calc_stream,
+                                                      'ring_id', ring_id)
+    else:
+        raise ValueError("Unknown parameter: {}.".format(op))
diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py
new file mode 100644
index 0000000000000000000000000000000000000000..6b4e545b245d1e987fedcd62e3331a96354930d6
--- /dev/null
+++ b/python/paddle/distributed/communication/group.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Group():
+    """
+    The abstract representation of group.
+ """ + + def __init__(self, rank_in_group, id, ranks, pg=None, name=None): + self._rank_in_group = rank_in_group + self._world_size = len(ranks) if rank_in_group >= 0 else -1 + self._id = id + self._ranks = ranks + self._pg = pg + self._name = name + + @property + def rank(self): + return self._rank_in_group + + @property + def ranks(self): + return self._ranks + + @property + def nranks(self): + return len(self._ranks) + + @property + def name(self): + return self._name + + @property + def process_group(self): + return self._pg + + @property + def world_size(self): + return self._world_size + + @property + def id(self): + return self._id + + def is_member(self): + if self.rank < 0: + return False + if self.nranks < 2: + return False + return True + + def get_group_rank(self, rank): + if self.is_member(): + return self.ranks.index(rank) + else: + return -1 + + def __repr__(self): + debug_str = "rank: {}, nranks: {}, id: {}, ranks: ".format( + self.rank, self.nranks, self.id) + debug_str += ", ".join(map(str, self.ranks)) + debug_str += "; name: " + debug_str += self.name if self.name else "None" + return debug_str + + +class _GroupManager(): + global_group_id = 0 + group_map_by_id = {} + + +def _get_global_group(): + if _GroupManager.global_group_id not in _GroupManager.group_map_by_id: + raise RuntimeError("The global group is not initialized.") + return _GroupManager.group_map_by_id[_GroupManager.global_group_id] + + +def _add_new_group(group): + if group.id in _GroupManager.group_map_by_id: + raise RuntimeError("The group with id {} already exist.".format( + group.id)) + _GroupManager.group_map_by_id[group.id] = group diff --git a/python/paddle/distributed/communication/comm_utils.py b/python/paddle/distributed/communication/reduce.py similarity index 59% rename from python/paddle/distributed/communication/comm_utils.py rename to python/paddle/distributed/communication/reduce.py index 62e1bcb4cca94d87e804b4580cd1c3928821b19a..5caa5bebedfd8115f34252c3f490b49c32131778 100644 --- a/python/paddle/distributed/communication/comm_utils.py +++ b/python/paddle/distributed/communication/reduce.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import paddle.fluid.framework as framework
+import paddle.fluid.core as core
+
 
 class ReduceOp:
     """
@@ -48,3 +51,26 @@ class ReduceOp:
     MIN = 2
     PROD = 3
     AVG = 4
+
+
+def _get_reduce_op(reduce_op, func_name):
+    if framework.in_dygraph_mode():
+        if reduce_op == ReduceOp.SUM:
+            return core.ReduceOp.SUM
+        elif reduce_op == ReduceOp.MAX:
+            return core.ReduceOp.MAX
+        elif reduce_op == ReduceOp.MIN:
+            return core.ReduceOp.MIN
+        elif reduce_op == ReduceOp.PROD:
+            return core.ReduceOp.PRODUCT
+    else:
+        if reduce_op == ReduceOp.SUM:
+            return 'c_allreduce_sum'
+        elif reduce_op == ReduceOp.MAX:
+            return 'c_allreduce_max'
+        elif reduce_op == ReduceOp.MIN:
+            return 'c_allreduce_min'
+        elif reduce_op == ReduceOp.PROD:
+            return 'c_allreduce_prod'
+
+    raise ValueError("Unknown reduce_op type for {}.".format(func_name))
diff --git a/python/paddle/distributed/communication/stream/all_reduce.py b/python/paddle/distributed/communication/stream/all_reduce.py
index e4cfa6d3218c23e8cb2fbd43aff23cfe92e85aa4..965a6ae89008a3c362afa7a171572df0d8f19427 100644
--- a/python/paddle/distributed/communication/stream/all_reduce.py
+++ b/python/paddle/distributed/communication/stream/all_reduce.py
@@ -13,12 +13,16 @@
 # limitations under the License.
 
 import paddle.fluid.framework as framework
-from paddle.distributed import collective
+import paddle.fluid.data_feeder as data_feeder
+import paddle.fluid.layer_helper as layer_helper
+from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
+from paddle.distributed.communication.group import _get_global_group
 
 
 def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
-    op_type = collective._get_reduce_op(op, "all_reduce")
-    group = collective._get_default_group() if group is None else group
+    op_type = _get_reduce_op(op, "all_reduce")
+
+    group = _get_global_group() if group is None else group
 
     if use_calc_stream:
         return group.process_group.allreduce_on_calc_stream(tensor, op_type)
@@ -29,8 +33,34 @@ def _all_reduce_in_dygraph(tensor, op, group, sync_op, use_calc_stream):
     return task
 
 
+def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream):
+    data_feeder.check_variable_and_dtype(tensor, 'tensor', [
+        'float16', 'float32', 'float64', 'int32', 'int64', 'int8', 'uint8',
+        'bool'
+    ], 'all_reduce')
+
+    op_type = _get_reduce_op(op, "all_reduce")
+    ring_id = 0 if group is None else group.id
+
+    if not isinstance(ring_id, int):
+        raise ValueError("The type of 'ring_id' for all_reduce should be int.")
+
+    # TODO: Support task and use task.wait in static mode
+    # Use use_calc_stream rather than sync_op
+    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper.append_op(type=op_type,
+                     inputs={'X': [tensor]},
+                     outputs={'Out': [tensor]},
+                     attrs={
+                         'ring_id': ring_id,
+                         'use_calc_stream': sync_op
+                     })
+
+    return None
+
+
 def all_reduce(tensor,
-               op=collective.ReduceOp.SUM,
+               op=ReduceOp.SUM,
                group=None,
                sync_op=True,
                use_calc_stream=False):
@@ -41,7 +71,7 @@ def all_reduce(tensor,
 
     Args:
         tensor (Tensor): The input tensor on each rank. The result will overwrite this tenor after communication. Support float16, float32, float64, int32 or int64 as the input data type.
-        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
+        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The reduction used. If none is given, use ReduceOp.SUM as default.
         group (Group, optional): Communicate in which group. If none is given, use the global group as default.
         sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
         use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
@@ -50,9 +80,6 @@ def all_reduce(tensor,
     Returns:
         Return a task object.
 
-    Warning:
-        This API only supports the dygraph mode now.
-
     Examples:
         .. code-block:: python
 
@@ -84,7 +111,6 @@ def all_reduce(tensor,
     if framework.in_dygraph_mode():
         return _all_reduce_in_dygraph(tensor, op, group, sync_op,
                                       use_calc_stream)
-
-    raise RuntimeError(
-        "paddle.distributed.stream.all_reduce is only supported in dygraph mode now."
-    )
+    else:
+        return _all_reduce_in_static_mode(tensor, op, group, sync_op,
+                                          use_calc_stream)
diff --git a/python/paddle/distributed/fleet/base/topology.py b/python/paddle/distributed/fleet/base/topology.py
index 0f72bfe9be28dd8e0d167bf393ec9ccd4c4119aa..b841542312ef88f47910dd36f55b0e5f211cac58 100644
--- a/python/paddle/distributed/fleet/base/topology.py
+++ b/python/paddle/distributed/fleet/base/topology.py
@@ -377,8 +377,8 @@ class _CommunicateGroup(object):
 
     def set_comm_group(self, group_name, group_rank, group_size, ring_id,
                        group_ranks):
-        group = paddle.distributed.collective.Group(group_rank, group_size,
-                                                    ring_id, group_ranks)
+        group = paddle.distributed.collective.Group(group_rank, ring_id,
+                                                    group_ranks)
         self.groups[group_name] = group
 
     def get_group(self, group_name):
diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py
index 3506851e1db347fd810f5caf1fee77ad5f70f073..18e7b6617783e295070886852c75390e0eb4d339 100644
--- a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py
+++ b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py
@@ -22,7 +22,7 @@ from paddle.fluid.layer_helper import LayerHelper
 from paddle.fluid.data_feeder import check_variable_and_dtype
 from paddle.fluid.dygraph import layers
 from paddle.distributed import collective
-from ....communication.comm_utils import ReduceOp
+from ....communication.reduce import ReduceOp
 from paddle.fluid.data_feeder import check_dtype
 import paddle.fluid.dygraph_utils as dygraph_utils
diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py
index 8a22526d3c2e50b77800904ae9b536226bc258a7..19a24488f9442943d361fda9f00624e2d6c1a071 100644
--- a/python/paddle/distributed/parallel.py
+++ b/python/paddle/distributed/parallel.py
@@ -43,6 +43,7 @@ from paddle.distributed.collective import _set_default_store
 from paddle.distributed.collective import _new_process_group_impl
 from paddle.distributed.collective import Group
 from paddle.distributed.collective import _set_group_map_backend
+from paddle.distributed.communication.group import _add_new_group
 
 __all__ = []
 
@@ -258,15 +259,11 @@ def init_parallel_env():
                                      _default_group_name,
                                      pg_options=None)
         ranks = list(range(world_size))
-        group = Group(rank,
-                      world_size,
-                      id=0,
-                      ranks=ranks,
-                      pg=pg,
-                      name=_default_group_name)
+        group = Group(rank, 0, ranks, pg=pg, name=_default_group_name)
         _set_group_map_by_name(_default_group_name, group)
         _set_group_map(0, group)
         _set_group_map_backend(group, backend)
+        _add_new_group(group)
         parallel_helper._set_parallel_ctx(True)
         paddle.distributed.barrier(group=group)
diff --git a/python/paddle/incubate/distributed/models/moe/moe_layer.py b/python/paddle/incubate/distributed/models/moe/moe_layer.py
index 58b026a3b2a306ac40eb6f05b11a71f33b546712..7c11a3e6393cc7824de56fd6ba9e9f54715008e9 100644
--- a/python/paddle/incubate/distributed/models/moe/moe_layer.py
+++ b/python/paddle/incubate/distributed/models/moe/moe_layer.py
@@ -265,7 +265,6 @@ class MoELayer(nn.Layer):
             from paddle.distributed import fleet
 
             moe_group = Group(fleet.worker_index(),
-                              fleet.worker_num(),
                               0,
                               list(range(fleet.worker_num())))
             mp_group = None
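
For context, a minimal usage sketch of the entry points this diff touches (not part of the diff itself). It assumes a multi-process launch, e.g. via paddle.distributed.launch with two GPUs, and reuses the tensor values from the docstring example above; the synchronous call goes through the relocated paddle.distributed.all_reduce, and the asynchronous call goes through the communication.stream module imported the same way the new all_reduce.py does.

    # Hedged sketch, assuming a 2-rank launch; values mirror the docstring example.
    import paddle
    import paddle.distributed as dist
    from paddle.distributed.communication import stream

    dist.init_parallel_env()

    if dist.get_rank() == 0:
        data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
    else:
        data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])

    # Synchronous path: now implemented in
    # python/paddle/distributed/communication/all_reduce.py.
    dist.all_reduce(data)
    print(data)  # [[5, 7, 9], [5, 7, 9]] on both ranks

    # Asynchronous path through the stream API extended by this diff.
    task = stream.all_reduce(data, sync_op=False)
    task.wait()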