diff --git a/python/paddle/distributed/auto_parallel/process_group.py b/python/paddle/distributed/auto_parallel/process_group.py index 5b0d5e286ff7790af6767eceea84d5833580c297..10ff0d36fcee9554d3a398013b4b3de554fd0972 100644 --- a/python/paddle/distributed/auto_parallel/process_group.py +++ b/python/paddle/distributed/auto_parallel/process_group.py @@ -151,7 +151,7 @@ class ProcessGroup: tmp = paddle.to_tensor( [1], dtype="int32") if _non_static_mode() else fill_constant( [0], dtype="int32", value="1") - paddle.distributed.all_reduce(tmp, use_calc_stream=True, group=self) + paddle.distributed.all_reduce(tmp, sync_op=True, group=self) paddle.distributed.wait(tmp, group=self) paddle.enable_static() diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 5f263ba8f0b17238991ccc945a2837ae5c46f076..e1ee362cadfd7bd92d7863cbd3d212a6d4b81883 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -414,7 +414,7 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): paddle.distributed.init_parallel_env() tindata = paddle.randn(shape=[2, 3]) gp = paddle.distributed.new_group([2,4,6]) - paddle.distributed.all_reduce(tindata, group=gp, use_calc_stream=False) + paddle.distributed.all_reduce(tindata, group=gp, sync_op=False) """ global _custom_gid @@ -521,7 +521,7 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): tmp = paddle.to_tensor( [1], dtype="int32") if _non_static_mode() else fill_constant( [0], dtype="int32", value="1") - paddle.distributed.all_reduce(tmp, use_calc_stream=True) + paddle.distributed.all_reduce(tmp, sync_op=True) paddle.distributed.wait(tmp) return gp @@ -617,7 +617,7 @@ def wait(tensor, group=None, use_calc_stream=True): paddle.distributed.init_parallel_env() tindata = paddle.randn(shape=[2, 3]) - paddle.distributed.all_reduce(tindata, use_calc_stream=True) + paddle.distributed.all_reduce(tindata, sync_op=True) paddle.distributed.wait(tindata) """ @@ -665,7 +665,7 @@ def _sync_comm_stream(tensor, ring_id=0): ) -def broadcast(tensor, src, group=None, use_calc_stream=True): +def broadcast(tensor, src, group=None, sync_op=True): """ Broadcast a tensor from the source to all others. @@ -681,9 +681,8 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): tensor (Tensor): The Tensor to send if current rank is the source, or the Tensor to receive otherwise. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. src (int): The source rank. - group (Group): The group instance return by new_group or None for global default group. - use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False). - Default to True. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: None. 
@@ -716,12 +715,13 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): gsrc = group.get_group_rank(src) assert gsrc >= 0, ("src rank out of group, need global rank") task = group.process_group.broadcast(tensor, gsrc) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op ring_id = ring_id = 0 if group is None else group.id gsrc = src if group is None else group.get_group_rank(src) assert gsrc >= 0, ("src rank out of group, need global rank") @@ -748,7 +748,7 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): }) -def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): +def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): """ Reduce a tensor over all ranks so that all get the result. @@ -764,10 +764,9 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): Args: tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default value is ReduceOp.SUM. - group (Group): The group instance return by new_group or None for global default group. - use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False). - Default to True. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: None. @@ -795,12 +794,13 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): op_type = _get_reduce_op(op, "all_reduce") group = _get_default_group() if group is None else group task = group.process_group.allreduce(tensor, op_type) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op ring_id = 0 if group is None else group.id if _non_static_mode(): if op == ReduceOp.SUM: @@ -846,7 +846,7 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): }) -def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): +def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True): """ Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented @@ -862,10 +862,9 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. dst (int): The destination rank id. - op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default value is ReduceOp.SUM. - group (Group): The group instance return by new_group or None for global default group. - use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False). - Default to True. + op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: None. 
@@ -896,12 +895,13 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): gdst = group.get_group_rank(dst) assert gdst >= 0, ("dst rank out of group, need global rank") task = group.process_group.reduce(tensor, gdst, op_type) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op ring_id = 0 if group is None else group.id gdst = dst if group is None else group.get_group_rank(dst) assert gdst >= 0, ("dst rank out of group, need global rank") @@ -953,7 +953,7 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): }) -def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): +def all_gather(tensor_list, tensor, group=None, sync_op=True): """ Gather tensors from all participators and all get the result. As shown @@ -971,9 +971,8 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128. tensor (Tensor): The Tensor to send. Its data type should be float16, float32, float64, int32, int64, int8, uint8, bool, complex64 or complex128. - group (Group): The group instance return by new_group or None for global default group. - use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False). - Default to True. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: None. @@ -1027,6 +1026,7 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): tensor_list.extend(list_of_tensor) return + use_calc_stream = sync_op ring_id = 0 if group is None else group.id nranks = _get_global_group().nranks if group is None else group.nranks @@ -1137,7 +1137,7 @@ def all_gather_object(object_list, obj, group=None): _convert_tensor_to_object(tensor, list_len_of_tensor[i])) -def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): +def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True): """ Scatter a tensor to all participators. As shown below, one process is started with a GPU and the source of the scatter @@ -1154,9 +1154,8 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): tensor_list (list|tuple): A list/tuple of Tensors to scatter. Every element in the list must be a Tensor whose data type should be float16, float32, float64, int32, int64, int8, uint8 or bool. Default value is None. src (int): The source rank id. Default value is 0. - group (Group): The group instance return by new_group or None for global default group. - use_calc_stream (bool): Wether to use calculation stream (True) or communication stream (False). - Default to True. + group (Group, optional): The group instance return by new_group or None for global default group. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: None. 
@@ -1206,12 +1205,13 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): temp = paddle.concat(tensor_list, axis=0) if in_dygraph_mode(): task = group.process_group.scatter(temp, tensor, gsrc) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op if _non_static_mode(): return _legacy_C_ops.c_scatter(temp, tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id, @@ -1233,7 +1233,7 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): }) -def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True): +def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True): """ Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list. As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1. @@ -1251,8 +1251,8 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True): out_tensor_list (list): A list of output Tensors. The data type of its elements should be the same as the data type of the input Tensors. group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Whether to use calculation stream (True) or communication stream. Default: True. - + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + Returns: None. @@ -1301,6 +1301,7 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True): out_tensor_list.extend(paddle.split(out, nranks, 0)) return + use_calc_stream = sync_op if _non_static_mode(): out = _legacy_C_ops.alltoall(temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id) @@ -1339,7 +1340,7 @@ def alltoall_single(in_tensor, in_split_sizes=None, out_split_sizes=None, group=None, - use_calc_stream=True): + sync_op=True): """ Scatter a single input tensor to all participators and gather the received tensors in out_tensor. @@ -1354,11 +1355,11 @@ def alltoall_single(in_tensor, out_split_sizes (list[int], optional): Split sizes of ``out_tensor`` for dim[0]. If not given, dim[0] of ``out_tensor`` must be divisible by group size and ``out_tensor`` will be gathered averagely from all participators. Default: None. group (Group, optional): The group instance return by ``new_group`` or None for global default group. Default: None. - use_calc_stream (bool, optional): Whether to use calculation stream (True) or communication stream. Default: True. - + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + Returns: - None, if ``use_calc_stream`` is set to ``True``; ``Task`` of ``group``, if ``use_calc_stream`` is set to ``False``. - + None, if ``sync_op`` is set to ``True``; ``Task`` of ``group``, if ``sync_op`` is set to ``False``. + Examples: .. 
code-block:: python @@ -1396,7 +1397,7 @@ def alltoall_single(in_tensor, output, in_split_sizes, out_split_sizes, - use_calc_stream=False, + sync_op=False, group=group) task.wait() print(output) @@ -1419,7 +1420,7 @@ def alltoall_single(in_tensor, task = group.process_group.alltoall_single(in_tensor, out_tensor, in_split_sizes, out_split_sizes) - if use_calc_stream: + if sync_op: task.wait() return else: @@ -1430,7 +1431,7 @@ def _get_group_rank(global_rank, group=None): return global_rank if group is None else group.get_group_rank(global_rank) -def send(tensor, dst=0, group=None, use_calc_stream=True): +def send(tensor, dst=0, group=None, sync_op=True): """ Send a tensor to the receiver. @@ -1439,8 +1440,8 @@ def send(tensor, dst=0, group=None, use_calc_stream=True): should be float16, float32, float64, int32, int64, int8, uint8 or bool. dst (int): The destination rank id. group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default: True. - + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + Returns: None. @@ -1469,12 +1470,13 @@ def send(tensor, dst=0, group=None, use_calc_stream=True): backend = _group_map_backend[group] assert backend != 'gloo', ("backend gloo is not supported yet") task = group.process_group.send(tensor, dst) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op ring_id = 0 if group is None else group.id if _non_static_mode(): @@ -1495,7 +1497,7 @@ def send(tensor, dst=0, group=None, use_calc_stream=True): }) -def recv(tensor, src=0, group=None, use_calc_stream=True): +def recv(tensor, src=0, group=None, sync_op=True): """ Receive a tensor to the sender. @@ -1504,8 +1506,8 @@ def recv(tensor, src=0, group=None, use_calc_stream=True): should be float16, float32, float64, int32, int64, int8, uint8 or bool. src (int): The source rank id. group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Whether to use calculate stream or communication stream. Default: True. - + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + Returns: None. @@ -1535,12 +1537,13 @@ def recv(tensor, src=0, group=None, use_calc_stream=True): backend = _group_map_backend[group] assert backend != 'gloo', ("backend gloo is not supported yet") task = group.process_group.recv(tensor, src) - if use_calc_stream: + if sync_op: task.wait() return None else: return task + use_calc_stream = sync_op ring_id = 0 if group is None else group.id if _non_static_mode(): @@ -1811,7 +1814,7 @@ def reduce_scatter(tensor, tensor_list, op=ReduceOp.SUM, group=None, - use_calc_stream=True): + sync_op=True): """ Reduces, then scatters a list of tensors to all processes in a group @@ -1822,13 +1825,13 @@ def reduce_scatter(tensor, op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM. group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Whether this op should be an async op. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. Returns: - Async task handle, if use_calc_stream is set to False. - None, if use_calc_stream or if not part of the group. 
- - Warning: + Async task handle, if sync_op is set to False. + None, if sync_op or if not part of the group. + + Warning: This API only supports the dygraph mode. @@ -1866,7 +1869,7 @@ def reduce_scatter(tensor, temp = paddle.concat(tensor_list, axis=0) task = group.process_group._reduce_scatter_base(tensor, temp, op_type) - if use_calc_stream: + if sync_op: task.wait() return None else: @@ -1879,7 +1882,7 @@ def _reduce_scatter_base(output, input, op=ReduceOp.SUM, group=None, - use_calc_stream=True): + sync_op=True): """ Reduces, then scatters a flattened tensor to all processes in a group. @@ -1890,11 +1893,11 @@ def _reduce_scatter_base(output, op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD): Optional. The operation used. Default: ReduceOp.SUM. group (ProcessGroup, optional): The process group to work on. If None, the default process group will be used. - use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream (False). - Default to True. + sync_op (bool, optional): Whether this op is a sync op. The default value is True. + Returns: - Async task handle, if use_calc_stream is set to False. - None, if use_calc_stream or if not part of the group. + Async task handle, if sync_op is set to False. + None, if sync_op or if not part of the group. Examples: .. code-block:: python @@ -1925,7 +1928,7 @@ def _reduce_scatter_base(output, op_type = _get_reduce_op(op, "_reduce_scatter_base") group = _get_default_group() if group is None else group task = group.process_group._reduce_scatter_base(output, input, op_type) - if use_calc_stream: + if sync_op: task.wait() return None else: diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py index 8a6ec33b39b7395236559ecdc6325a510135fea3..d34f1cad2703322fe10e17591545641aa3708115 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/dygraph_sharding_optimizer.py @@ -146,7 +146,7 @@ class DygraphShardingOptimizer(object): # instead of the relative logic rank id within group src=self._hcg.get_sharding_parallel_group().ranks[rank], group=self._hcg.get_sharding_parallel_group(), - use_calc_stream=True) + sync_op=True) def _update_trainable(self): """ diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py index 3359e63b1deff8183faaca8e139e2d431eb710dd..ad5cbf83ecb862b264ba1f3e7b410de898d6e839 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/sharding_optimizer_stage2.py @@ -150,7 +150,7 @@ class ShardingOptimizerStage2(Optimizer): broadcast(p, src=self._global_root_rank, group=self.group, - use_calc_stream=True) + sync_op=True) # Multi stream operation will be supported later wait(tensor=p, group=self.group, use_calc_stream=True) @@ -415,7 +415,7 @@ class ShardingOptimizerStage2(Optimizer): broadcast(tensor=internal_storage.buffer, src=self.group.ranks[dst_rank], group=self.group, - use_calc_stream=True) + sync_op=True) # Multi stream operation will be supported later wait(tensor=internal_storage.buffer, diff --git 
a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py index 537885bfad349779fe1b791c5693f2c8c5b114bb..02a1b421526dfc88a39edf1b82c394f2c816187a 100755 --- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py +++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py @@ -377,18 +377,18 @@ class PipelineParallel(MetaParallelBase): 1) if loss.dtype == paddle.float32 else paddle.to_tensor(0) paddle.distributed.broadcast(is_fp32, src=self.global_rank, - use_calc_stream=True, + sync_op=True, group=self.pp_group) paddle.distributed.broadcast(loss, src=self.global_rank, - use_calc_stream=True, + sync_op=True, group=self.pp_group) else: is_fp32 = paddle.to_tensor(1) paddle.distributed.broadcast( is_fp32, src=self._hcg.get_rank_from_stage(self.num_stages - 1), - use_calc_stream=True, + sync_op=True, group=self.pp_group) loss = paddle.zeros(shape=[ 1 @@ -397,7 +397,7 @@ class PipelineParallel(MetaParallelBase): paddle.distributed.broadcast( loss, src=self._hcg.get_rank_from_stage(self.num_stages - 1), - use_calc_stream=True, + sync_op=True, group=self.pp_group) return loss diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py index 7bdbe2ce32e47b1b0efb757ec96ed651b2c45eb7..19d3245982a96aedaf37e1d757ba690cbdf4178f 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_optimizer_stage2.py @@ -155,7 +155,7 @@ class GroupShardedOptimizerStage2(Optimizer): broadcast(p, src=self._global_root_rank, group=self._group, - use_calc_stream=True) + sync_op=True) def _generate_master_params(self, trainable_params): if self.offload: @@ -413,4 +413,4 @@ class GroupShardedOptimizerStage2(Optimizer): broadcast(tensor=internal_storage.buffer, src=self._group.ranks[dst_rank], group=self._group, - use_calc_stream=True) + sync_op=True) diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py index cf1ca83d5f636fa0cfe132949354c5ac278e7d6c..d47f50b292db26e0a0930a58623cb1d05b3ecaeb 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py @@ -287,7 +287,7 @@ class GroupShardedStage2(nn.Layer): collective.broadcast(buffer, self._global_root_rank, self._group, - use_calc_stream=True) + sync_op=True) def __getattr__(self, name): """Forward missing attributes to wrapped layer.""" diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py index abc5e0549ae668e62b6bade4b350163a0d7e9d48..b628378140f785f678f8939171f4ea1370ab2cf2 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage3.py @@ -181,7 +181,7 @@ class GroupShardedStage3(nn.Layer): collective.broadcast(p, src=self._global_root_rank, group=self._group, - use_calc_stream=True) + sync_op=True) def _clear_gradients(self): assert len(self._trainable_params.keys()) > 0 @@ -446,7 +446,7 @@ class GroupShardedStage3(nn.Layer): collective.broadcast(buffer, 
self._global_root_rank, self._group, - use_calc_stream=True) + sync_op=True) def __getattr__(self, name): """Forward missing attributes to wrapped layer.""" diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py index 7834e6d93984e3a48dfa92b7f306855a2a69f936..a08e67456e5e6f77e3dd1358d004927b47f75f40 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage2.py @@ -285,7 +285,7 @@ class ShardingStage2(nn.Layer): dist.broadcast(buffer, self._global_root_rank, self._group, - use_calc_stream=True) + sync_op=True) # Multi stream operation will be supported later dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) @@ -340,7 +340,7 @@ class ShardingStage2(nn.Layer): tensor=param.grad, dst=self._group.ranks[dst_rank], group=self._group, - use_calc_stream=True), + sync_op=True), callback=cleanup)) # Multi stream operation will be supported later @@ -396,7 +396,7 @@ class ShardingStage2(nn.Layer): tensor=grad_storage.buffer, dst=self._group.ranks[grad_storage.destination], group=self._group, - use_calc_stream=True), + sync_op=True), callback=cleanup)) # Multi stream operation will be supported later diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py index 67d48c8abba1b4af3abdbbac354d8ef6e6629547..5e0c3743dd3f88ff5ada250cb5697da8cc14f8dd 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/sharding_stage3.py @@ -170,7 +170,7 @@ class ShardingStage3(nn.Layer): dist.broadcast(p, src=self._global_root_rank, group=self._group, - use_calc_stream=True) + sync_op=True) # Multi stream operation will be supported later dist.wait(tensor=p, group=self._group, use_calc_stream=True) @@ -435,7 +435,7 @@ class ShardingStage3(nn.Layer): dist.broadcast(buffer, self._global_root_rank, self._group, - use_calc_stream=True) + sync_op=True) # Multi stream operation will be supported later dist.wait(tensor=buffer, group=self._group, use_calc_stream=True) @@ -478,7 +478,7 @@ class ShardingStage3(nn.Layer): grad_storage.buffer.scale_(scale=self._world_size_scaling) dist.all_reduce(tensor=grad_storage.buffer, group=self._group, - use_calc_stream=True) + sync_op=True) dist.wait(tensor=grad_storage.buffer, group=self._group, use_calc_stream=True) @@ -541,7 +541,7 @@ class ShardingStage3(nn.Layer): # Only support sync allreduce current rank's layer now dist.all_reduce(tensor=full_grad, group=self._group, - use_calc_stream=True) + sync_op=True) dist.wait(tensor=full_grad, group=self._group, use_calc_stream=True) diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py index b7b5bc8608be5bcb1b4b6b7cde4857c9f6ac7002..e7bd434b94fd32c19daa99defefe979058e99355 100644 --- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py +++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py @@ -94,7 +94,7 @@ def _broadcast_data_help(data, shape, dtype, hcg): paddle.distributed.broadcast(shape_gpu, src=src_rank, group=model_parallel_group, - use_calc_stream=True) + sync_op=True) if mp_rank != 0: input_data = paddle.zeros(shape_gpu, dtype=dtype) @@ -104,7 +104,7 @@ def _broadcast_data_help(data, shape, dtype, hcg): 
paddle.distributed.broadcast(input_data, src=src_rank, group=model_parallel_group, - use_calc_stream=True) + sync_op=True) if mp_rank != 0: if in_dygraph_mode(): @@ -186,7 +186,7 @@ def sharding_reduce_gradients(parameter_list, hcg): paddle.distributed.all_reduce( param.grad, group=hcg.get_sharding_parallel_group(), - use_calc_stream=True) + sync_op=True) elif _in_legacy_dygraph(): g_var = param._grad_ivar() diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 91f22842a456120d1d73e7c5facc6aecc5f0272a..7b1ea80d872955d7a207f5f62b1acb810203d38d 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -420,7 +420,7 @@ def sync_params_buffers(model, paddle.distributed.broadcast(coalesced_var, src=src_rank, group=comm_group, - use_calc_stream=True) + sync_op=True) for coalesced_var, origin_vars, var_shapes in coalesced_vars: var_len = [np.prod(v_shape) for v_shape in var_shapes] diff --git a/python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py b/python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py index 9d4e21aefff56c975825b1758ec4031568a32fff..b773826169963b6f163aa6f2271ca21d38e32c96 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_allreduce_new_group_api.py @@ -49,9 +49,7 @@ class TestCollectiveAllreduceNewGroupAPI(TestCollectiveAPIRunnerBase): shape=[10, 1000], dtype='float32') gp = paddle.distributed.new_group([0, 1]) - paddle.distributed.all_reduce(tindata, - group=gp, - use_calc_stream=True) + paddle.distributed.all_reduce(tindata, group=gp, sync_op=True) return [tindata] diff --git a/python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py b/python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py index cb6777d20bc25bbf50fcdd0c1820fd55a18ffb4b..3b52ac0e03ff6f99cc635927bcc2b4a9c2980bfc 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_alltoall_single.py @@ -69,7 +69,7 @@ class TestCollectiveAllToAllSingle(unittest.TestCase): output, in_split_sizes, out_split_sizes, - use_calc_stream=False, + sync_op=False, group=group) task.wait() diff --git a/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py b/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py index 0e36296e4089cf17200e02f074dd2500bcc67044..f10aff4752bd2646e9bd8a6c2d8f381abd36b7d8 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_reduce_scatter.py @@ -83,8 +83,9 @@ class TestCollectiveReduceScatter(unittest.TestCase): # [1, 2, 3, 4] # Rank-1 output = paddle.empty(shape=[2], dtype=input.dtype) - task = paddle.distributed.collective._reduce_scatter_base( - output, input, use_calc_stream=False) + task = paddle.distributed.collective._reduce_scatter_base(output, + input, + sync_op=False) task.wait() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py index f290705c312e09b77e9359bc7fe29d11648b7deb..d84ad27e2636d238d5dba282fdb88f34736e4364 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py +++ 
b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_communicate_group.py @@ -53,24 +53,21 @@ class TestNewGroupAPI(object): paddle.distributed.scatter(result, [self.tensor2, self.tensor1], src=dp_src_rank, group=dp_gp, - use_calc_stream=True) + sync_op=True) if dp_rank == 0: assert np.array_equal(result, self.tensor2) elif dp_rank == 1: assert np.array_equal(result, self.tensor1) print("test scatter api ok") - paddle.distributed.broadcast(result, - src=1, - group=dp_gp, - use_calc_stream=True) + paddle.distributed.broadcast(result, src=1, group=dp_gp, sync_op=True) assert np.array_equal(result, self.tensor1) print("test broadcast api ok") paddle.distributed.reduce(result, dst=dp_src_rank, group=dp_gp, - use_calc_stream=True) + sync_op=True) if dp_rank == 0: assert np.array_equal(result, paddle.add(self.tensor1, self.tensor1)) @@ -78,7 +75,7 @@ class TestNewGroupAPI(object): assert np.array_equal(result, self.tensor1) print("test reduce api ok") - paddle.distributed.all_reduce(result, use_calc_stream=True) + paddle.distributed.all_reduce(result, sync_op=True) assert np.array_equal( result, paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1)) @@ -92,7 +89,7 @@ class TestNewGroupAPI(object): paddle.distributed.all_gather(result, self.tensor1, group=dp_gp, - use_calc_stream=True) + sync_op=True) assert np.array_equal(result[0], self.tensor1) assert np.array_equal(result[1], self.tensor1) print("test all_gather api ok") diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/new_group.py b/python/paddle/fluid/tests/unittests/collective/fleet/new_group.py index 56ef510c3047fee701fc618d21a11d7639e48256..28a2568f8e594c36e58110024bb386c86edd25fd 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/new_group.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/new_group.py @@ -36,21 +36,18 @@ class TestNewGroupAPI(object): paddle.distributed.scatter(result, [self.tensor2, self.tensor1], src=0, group=gp, - use_calc_stream=True) + sync_op=True) if gp.rank == 0: assert np.array_equal(result, self.tensor2) elif gp.rank == 1: assert np.array_equal(result, self.tensor1) print("test scatter api ok") - paddle.distributed.broadcast(result, - src=1, - group=gp, - use_calc_stream=True) + paddle.distributed.broadcast(result, src=1, group=gp, sync_op=True) assert np.array_equal(result, self.tensor1) print("test broadcast api ok") - paddle.distributed.reduce(result, dst=0, group=gp, use_calc_stream=True) + paddle.distributed.reduce(result, dst=0, group=gp, sync_op=True) if gp.rank == 0: assert np.array_equal(result, paddle.add(self.tensor1, self.tensor1)) @@ -58,7 +55,7 @@ class TestNewGroupAPI(object): assert np.array_equal(result, self.tensor1) print("test reduce api ok") - paddle.distributed.all_reduce(result, use_calc_stream=True) + paddle.distributed.all_reduce(result, sync_op=True) assert np.array_equal( result, paddle.add(paddle.add(self.tensor1, self.tensor1), self.tensor1)) @@ -72,7 +69,7 @@ class TestNewGroupAPI(object): paddle.distributed.all_gather(result, self.tensor1, group=gp, - use_calc_stream=True) + sync_op=True) assert np.array_equal(result[0], self.tensor1) assert np.array_equal(result[1], self.tensor1) print("test all_gather api ok") diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py index 1635eb6c951bd7ef8174d1fbff2f7a69b69696f9..ac6bbcd3ce82a37ff76845f671f6c86e10376576 100644 --- 
a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py @@ -90,13 +90,13 @@ class TestProcessGroupFp32(unittest.TestCase): if pg.rank() == 0: task = dist.all_reduce(tensor_x, dist.ReduceOp.MAX, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, max_result) else: task = dist.all_reduce(tensor_y, dist.ReduceOp.MAX, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_y, max_result) @@ -115,13 +115,13 @@ class TestProcessGroupFp32(unittest.TestCase): if pg.rank() == 0: task = dist.all_reduce(tensor_x, dist.ReduceOp.MIN, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, min_result) else: task = dist.all_reduce(tensor_y, dist.ReduceOp.MIN, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_y, min_result) @@ -140,13 +140,13 @@ class TestProcessGroupFp32(unittest.TestCase): if pg.rank() == 0: task = dist.all_reduce(tensor_x, dist.ReduceOp.PROD, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, prod_result) else: task = dist.all_reduce(tensor_y, dist.ReduceOp.PROD, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_y, prod_result) @@ -162,7 +162,7 @@ class TestProcessGroupFp32(unittest.TestCase): broadcast_result = paddle.assign(tensor_x) if pg.rank() == 0: - task = dist.broadcast(tensor_x, 0, use_calc_stream=False) + task = dist.broadcast(tensor_x, 0, sync_op=False) task.synchronize() paddle.device.cuda.synchronize() assert task.is_completed() @@ -205,9 +205,7 @@ class TestProcessGroupFp32(unittest.TestCase): paddle.empty_like(tensor_x), paddle.empty_like(tensor_x) ] - task = dist.all_gather(tensor_out_list, - tensor_y, - use_calc_stream=False) + task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) paddle.device.cuda.synchronize() tensor_out = paddle.concat(tensor_out_list) out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) @@ -224,9 +222,7 @@ class TestProcessGroupFp32(unittest.TestCase): # rank 1 else: tensor_out_list = [] - task = dist.all_gather(tensor_out_list, - tensor_y, - use_calc_stream=False) + task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) paddle.device.cuda.synchronize() tensor_out = paddle.concat(tensor_out_list) out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) @@ -310,11 +306,11 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_y = paddle.to_tensor(y) sum_result = tensor_x + tensor_y if pg.rank() == 0: - task = dist.reduce(tensor_x, 0, use_calc_stream=True) + task = dist.reduce(tensor_x, 0, sync_op=True) paddle.device.cuda.synchronize() # rank 1 else: - task = dist.reduce(tensor_y, 0, use_calc_stream=False) + task = dist.reduce(tensor_y, 0, sync_op=False) task.wait() paddle.device.cuda.synchronize() if pg.rank() == 0: @@ -335,14 +331,14 @@ class TestProcessGroupFp32(unittest.TestCase): task = dist.reduce(tensor_x, 0, dist.ReduceOp.MAX, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, max_result) else: task = dist.reduce(tensor_y, 0, dist.ReduceOp.MAX, - use_calc_stream=False) + sync_op=False) task.wait() print("test reduce max api ok") @@ -361,14 +357,14 @@ class TestProcessGroupFp32(unittest.TestCase): task = dist.reduce(tensor_x, 0, dist.ReduceOp.MIN, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, min_result) else: task = dist.reduce(tensor_y, 0, 
dist.ReduceOp.MIN, - use_calc_stream=False) + sync_op=False) task.wait() print("test reduce min api ok") @@ -387,14 +383,14 @@ class TestProcessGroupFp32(unittest.TestCase): task = dist.reduce(tensor_x, 0, dist.ReduceOp.PROD, - use_calc_stream=False) + sync_op=False) task.wait() assert np.array_equal(tensor_x, prod_result) else: task = dist.reduce(tensor_y, 0, dist.ReduceOp.PROD, - use_calc_stream=False) + sync_op=False) task.wait() print("test reduce prod api ok") @@ -408,14 +404,12 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_y = paddle.to_tensor(y) if pg.rank() == 0: in_1, in_2 = paddle.split(tensor_x, 2) - task = dist.scatter(tensor_y, [in_1, in_2], - 0, - use_calc_stream=True) + task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) #task.wait() paddle.device.cuda.synchronize() # rank 1 else: - task = dist.scatter(tensor_y, [], 0, use_calc_stream=False) + task = dist.scatter(tensor_y, [], 0, sync_op=False) task.wait() paddle.device.cuda.synchronize() out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) @@ -436,10 +430,10 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_y = paddle.to_tensor(y) if pg.rank() == 0: - task = dist.send(tensor_x, 1, use_calc_stream=False) + task = dist.send(tensor_x, 1, sync_op=False) task.wait() else: - task = dist.recv(tensor_y, 0, use_calc_stream=False) + task = dist.recv(tensor_y, 0, sync_op=False) task.wait() assert np.array_equal(tensor_y, tensor_x) @@ -454,9 +448,9 @@ class TestProcessGroupFp32(unittest.TestCase): tensor_y = paddle.to_tensor(y) if pg.rank() == 0: - task = dist.send(tensor_x, 1, use_calc_stream=True) + task = dist.send(tensor_x, 1, sync_op=True) else: - task = dist.recv(tensor_y, 0, use_calc_stream=True) + task = dist.recv(tensor_y, 0, sync_op=True) assert np.array_equal(tensor_y, tensor_x) print("test send api ok")