diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index e9ed861106617b5f76418dcf736c7b3fc72bd348..171eab28da1f18c086359435d2c06c05a44cfce4 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -31,7 +31,7 @@ from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.fluid.executor import _to_name_str, global_scope from paddle.fluid.framework import Operator from paddle.fluid.framework import _current_expected_place as _get_device -from paddle.fluid.framework import _non_static_mode +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.layers.utils import flatten from paddle.metric import Metric from paddle.static import InputSpec @@ -300,7 +300,7 @@ class Engine: return inputs_spec, labels_spec def _prepare_data_tensor(self, inputs_spec, labels_spec, inputs, labels): - if _non_static_mode() or self._dygraph_mode: + if in_dygraph_mode() or self._dygraph_mode: raise ValueError("Only support static graph mode.") if inputs_spec: @@ -512,7 +512,7 @@ class Engine: self._has_prepared[mode] = True def _build(self, mode): - if _non_static_mode() or self._dygraph_mode: + if in_dygraph_mode() or self._dygraph_mode: paddle.disable_static() self._dygraph_mode = True self._logger.info("Building model with 'to_static' method.") @@ -1713,7 +1713,7 @@ class Engine: self._build(mode) self._plan(mode) else: - if _non_static_mode() or self._dygraph_mode: + if in_dygraph_mode() or self._dygraph_mode: raise ValueError( "Please call `prepare()` or `fit()` or `evaluate()` or `predict()` before calling `cost()`." ) diff --git a/python/paddle/distributed/auto_parallel/process_group.py b/python/paddle/distributed/auto_parallel/process_group.py index 63e5e6ff4ce3ff0944790c674025db0c0e6551d4..d7e2638f102a052c5e029f20e11498b3b758c1db 100644 --- a/python/paddle/distributed/auto_parallel/process_group.py +++ b/python/paddle/distributed/auto_parallel/process_group.py @@ -17,8 +17,8 @@ from collections import OrderedDict import paddle import paddle.fluid.core as core from paddle import _legacy_C_ops +from paddle.fluid.framework import in_dygraph_mode -from ...fluid.framework import _non_static_mode from ...fluid.layers.tensor import fill_constant from ..collective import _get_global_env, _new_ring_id @@ -154,7 +154,7 @@ class ProcessGroup: ) tmp = ( paddle.to_tensor([1], dtype="int32") - if _non_static_mode() + if in_dygraph_mode() else fill_constant([0], dtype="int32", value="1") ) # use legacy ops diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 6b9075de20a0bf9f92b37e96e4d73181d57ea033..83777bbb2f924108ef24cc409181f81fd555e51f 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -18,7 +18,7 @@ import paddle # (TODO: GhostScreaming) It will be removed later. 
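A semantic note that the swaps above (engine.py, process_group.py) and throughout this patch rely on: `_non_static_mode()` was true in both legacy and eager dygraph, while `in_dygraph_mode()` is true only in eager mode, so the two checks coincide exactly once legacy dygraph is gone. A minimal sketch of the branch as it now reads in process_group.py:

    import paddle
    from paddle.fluid.framework import in_dygraph_mode
    from paddle.fluid.layers.tensor import fill_constant

    tmp = (
        paddle.to_tensor([1], dtype="int32")                  # eager dygraph
        if in_dygraph_mode()
        else fill_constant([0], dtype="int32", value="1")     # static graph
    )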
import paddle.fluid.core as core -from paddle.framework import _non_static_mode, in_dygraph_mode +from paddle.framework import in_dygraph_mode from .communication.group import Group, _add_new_group, is_initialized from .fleet.layers.mpu.mp_ops import _c_concat # noqa: F401 @@ -301,7 +301,7 @@ def new_group(ranks=None, backend=None, timeout=_default_timeout): # hang caused by cross-creation of new_group tmp = ( paddle.to_tensor([1], dtype="int32") - if _non_static_mode() + if in_dygraph_mode() else paddle.full([0], 1, dtype="int32") ) paddle.distributed.all_reduce(tmp, sync_op=True) diff --git a/python/paddle/distributed/communication/all_gather.py b/python/paddle/distributed/communication/all_gather.py index e6540a327426b9ec87cf342e2fba24691c262c64..45496c0e30d962244d93beebd3de9c95034ad182 100644 --- a/python/paddle/distributed/communication/all_gather.py +++ b/python/paddle/distributed/communication/all_gather.py @@ -18,7 +18,6 @@ import pickle import numpy as np import paddle -import paddle.distributed as dist import paddle.distributed.communication.stream as stream import paddle.fluid.framework as framework @@ -64,38 +63,7 @@ def all_gather(tensor_list, tensor, group=None, sync_op=True): print(tensor_list) # [[[4, 5, 6], [4, 5, 6]], [[1, 2, 3], [1, 2, 3]]] (2 GPUs) """ - if not framework._in_legacy_dygraph(): - return stream.all_gather(tensor_list, tensor, group, sync_op) - - # NOTE: uncomment code below when having fully complex support - # def convert_to_complex(list_of_tensor): - # list_of_complex = [] - # for tensor in list_of_tensor: - # list_of_complex.append(paddle.as_complex(tensor)) - # return list_of_complex - - # is_input_complex = (tensor.dtype == paddle.complex64 - # or tensor.dtype == paddle.complex128) - # if is_input_complex: - # tensor = paddle.as_real(tensor) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - - ring_id = 0 if group is None else group.id - nranks = dist.get_world_size() - out = paddle._legacy_C_ops.c_allgather( - tensor, - 'use_calc_stream', - sync_op, - 'ring_id', - ring_id, - 'nranks', - nranks, - ) - tensor_list.clear() - tensor_list.extend(paddle.split(out, nranks, 0)) + return stream.all_gather(tensor_list, tensor, group, sync_op) def _convert_object_to_tensor(obj): diff --git a/python/paddle/distributed/communication/all_reduce.py b/python/paddle/distributed/communication/all_reduce.py index 0eea15db70ab9938e808f729d16bc304c179beea..423b7e6120531e948ee07c65f21a34b82bca3c85 100644 --- a/python/paddle/distributed/communication/all_reduce.py +++ b/python/paddle/distributed/communication/all_reduce.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
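With the legacy-dygraph branch deleted, `all_gather` above is a thin wrapper over the stream API. A usage sketch mirroring the docstring (assumes a 2-GPU launch with `paddle.distributed.launch`):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    tensor_list = []
    if dist.get_rank() == 0:
        data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
    else:
        data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
    dist.all_gather(tensor_list, data)   # tensor_list holds one tensor per rank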
-import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework from paddle.distributed.communication.reduce import ReduceOp @@ -57,31 +55,6 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True): print(data) # [[5, 7, 9], [5, 7, 9]] (2 GPUs) """ - if not framework._in_legacy_dygraph(): - return stream.all_reduce( - tensor, op=op, group=group, sync_op=sync_op, use_calc_stream=False - ) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - if op == ReduceOp.SUM: - return paddle._legacy_C_ops.c_allreduce_sum_( - tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id - ) - elif op == ReduceOp.MAX: - return paddle._legacy_C_ops.c_allreduce_max_( - tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id - ) - elif op == ReduceOp.MIN: - return paddle._legacy_C_ops.c_allreduce_min_( - tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id - ) - elif op == ReduceOp.PROD: - return paddle._legacy_C_ops.c_allreduce_prod_( - tensor, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id - ) - else: - raise ValueError("Unknown parameter: {}.".format(op)) + return stream.all_reduce( + tensor, op=op, group=group, sync_op=sync_op, use_calc_stream=False + ) diff --git a/python/paddle/distributed/communication/all_to_all.py b/python/paddle/distributed/communication/all_to_all.py index d8465341aa627794776481e64317c7ec32e88ffa..8878273175d6922a4a901ba14fc80f37ac61de6f 100644 --- a/python/paddle/distributed/communication/all_to_all.py +++ b/python/paddle/distributed/communication/all_to_all.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True): @@ -59,22 +57,9 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True): # [[[1, 2, 3], [4, 5, 6]], [[13, 14, 15], [16, 17, 18]]] (2 GPUs, out for rank 0) # [[[7, 8, 9], [10, 11, 12]], [[19, 20, 21], [22, 23, 24]]] (2 GPUs, out for rank 1) """ - if not framework._in_legacy_dygraph(): - return stream.alltoall( - out_tensor_list, in_tensor_list, group, sync_op, False - ) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - ring_id = 0 if group is None else group.id - temp = paddle.concat(in_tensor_list, axis=0) - nranks = len(in_tensor_list) - use_calc_stream = sync_op - out = paddle._legacy_C_ops.alltoall( - temp, 'use_calc_stream', use_calc_stream, 'ring_id', ring_id + return stream.alltoall( + out_tensor_list, in_tensor_list, group, sync_op, False ) - out_tensor_list.extend(paddle.split(out, nranks, 0)) def alltoall_single( @@ -149,13 +134,12 @@ def alltoall_single( # output for rank 1: [[0., 0.], [0., 0.], [1., 1.], [1., 1.]] """ - if not framework._in_legacy_dygraph(): - return stream.alltoall_single( - out_tensor, - in_tensor, - out_split_sizes, - in_split_sizes, - group, - sync_op, - False, - ) + return stream.alltoall_single( + out_tensor, + in_tensor, + out_split_sizes, + in_split_sizes, + group, + sync_op, + False, + ) diff --git a/python/paddle/distributed/communication/broadcast.py b/python/paddle/distributed/communication/broadcast.py index 7a04e839cdb7fa05942cfbadbad5e3f77152b664..eccd3bf98363348b4b5877dd0ce9e6bdb27581de 100644 --- a/python/paddle/distributed/communication/broadcast.py +++ b/python/paddle/distributed/communication/broadcast.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework def broadcast(tensor, src, group=None, sync_op=True): @@ -55,31 +53,10 @@ def broadcast(tensor, src, group=None, sync_op=True): print(data) # [[1, 2, 3], [1, 2, 3]] (2 GPUs) """ - if not framework._in_legacy_dygraph(): - return stream.broadcast( - tensor, - src, - group=group, - sync_op=sync_op, - use_calc_stream=False, - ) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - use_calc_stream = sync_op - ring_id = 0 if group is None else group.id - - gsrc = src if group is None else group.get_group_rank(src) - assert gsrc >= 0, "src rank out of group, need global rank" - - return paddle._legacy_C_ops.c_broadcast( - tensor, + return stream.broadcast( tensor, - 'root', - gsrc, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, + src, + group=group, + sync_op=sync_op, + use_calc_stream=False, ) diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py index 85db479674699b8d103b054b726a3eabdab3c56f..a48c0c080f0e502c2df626c1a16e589c491c5a33 100644 --- a/python/paddle/distributed/communication/group.py +++ b/python/paddle/distributed/communication/group.py @@ -19,6 +19,7 @@ import paddle.distributed as dist import paddle.fluid.core as core import paddle.fluid.framework as framework import paddle.fluid.layer_helper as layer_helper +from paddle.fluid.framework import in_dygraph_mode class Group: @@ -235,32 +236,32 @@ def get_group(id=0): def _sync_calc_stream(tensor): - if framework._non_static_mode(): + if in_dygraph_mode(): return paddle._legacy_C_ops.c_sync_calc_stream(tensor, tensor) - - op_type = 'c_sync_calc_stream' - helper = layer_helper.LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [tensor]}, - outputs={'Out': [tensor]}, - ) + else: + op_type = 'c_sync_calc_stream' + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [tensor]}, + outputs={'Out': [tensor]}, + ) def _sync_comm_stream(tensor, ring_id=0): - if framework._non_static_mode(): + if in_dygraph_mode(): return paddle._legacy_C_ops.c_sync_comm_stream( [tensor], [tensor], 'ring_id', ring_id ) - - op_type = 'c_sync_comm_stream' - helper = layer_helper.LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [tensor]}, - outputs={'Out': [tensor]}, - attrs={'ring_id': ring_id}, - ) + else: + op_type = 'c_sync_comm_stream' + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [tensor]}, + outputs={'Out': [tensor]}, + attrs={'ring_id': ring_id}, + ) def wait(tensor, group=None, use_calc_stream=True): @@ -336,18 +337,18 @@ def barrier(group=None): ring_id = 0 if group is None else group.id barrier_tensor = paddle.full([1], 1, dtype="int32") - if framework._non_static_mode(): + if in_dygraph_mode(): return paddle._legacy_C_ops.barrier( barrier_tensor, barrier_tensor, 'ring_id', ring_id ) - - op_type = 'barrier' - if not isinstance(ring_id, int): - raise ValueError("The type of 'group' for barrier must be int.") - helper = layer_helper.LayerHelper(op_type, **locals()) - helper.append_op( - type=op_type, - inputs={'X': [barrier_tensor]}, - outputs={'Out': [barrier_tensor]}, - attrs={'ring_id': ring_id}, - ) + else: + op_type = 'barrier' + if not isinstance(ring_id, int): + raise ValueError("The type of 'group' for barrier 
must be int.") + helper = layer_helper.LayerHelper(op_type, **locals()) + helper.append_op( + type=op_type, + inputs={'X': [barrier_tensor]}, + outputs={'Out': [barrier_tensor]}, + attrs={'ring_id': ring_id}, + ) diff --git a/python/paddle/distributed/communication/recv.py b/python/paddle/distributed/communication/recv.py index a340b26aef4fd3825633f4e512d044ea46fbb8b6..3451969508e49d7a04921fe3530a50dafb759865 100644 --- a/python/paddle/distributed/communication/recv.py +++ b/python/paddle/distributed/communication/recv.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework def recv(tensor, src=0, group=None, sync_op=True): @@ -48,29 +46,8 @@ def recv(tensor, src=0, group=None, sync_op=True): print(data) # [7, 8, 9] (2 GPUs) """ - if not framework._in_legacy_dygraph(): - return stream.recv( - tensor, src=src, group=group, sync_op=sync_op, use_calc_stream=False - ) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - use_calc_stream = sync_op - gsrc = src if group is None else group.get_group_rank(src) - ring_id = 0 if group is None else group.id - return paddle._legacy_C_ops.recv_v2( - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - src, - 'dtype', - tensor.dtype, - 'out_shape', - tensor.shape, + return stream.recv( + tensor, src=src, group=group, sync_op=sync_op, use_calc_stream=False ) diff --git a/python/paddle/distributed/communication/reduce.py b/python/paddle/distributed/communication/reduce.py index 01002a1f58f8d4176426550ce816475131a6d964..973e003e0604e6612ce80e8d5860c539d09b6df5 100644 --- a/python/paddle/distributed/communication/reduce.py +++ b/python/paddle/distributed/communication/reduce.py @@ -121,16 +121,14 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True): # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0) # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1) """ - - if not framework._in_legacy_dygraph(): - return stream.reduce( - tensor, - dst=dst, - op=op, - group=group, - sync_op=sync_op, - use_calc_stream=False, - ) + return stream.reduce( + tensor, + dst=dst, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) # code below will be removed after we remove the old dygraph if group is not None and not group.is_member(): diff --git a/python/paddle/distributed/communication/reduce_scatter.py b/python/paddle/distributed/communication/reduce_scatter.py index e919fc971c8e1407615548e2cba387ce4435e69c..a7744f16e16c916428bd5473c0733ed5ff172fe6 100644 --- a/python/paddle/distributed/communication/reduce_scatter.py +++ b/python/paddle/distributed/communication/reduce_scatter.py @@ -13,7 +13,6 @@ # limitations under the License. 
import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework from paddle.distributed.communication.reduce import ReduceOp from paddle.distributed.communication.stream.reduce_scatter import ( _reduce_scatter_base as _reduce_scatter_base_stream, @@ -62,15 +61,14 @@ def reduce_scatter( # [8, 10] (2 GPUs, out for rank 1) """ - if not framework._in_legacy_dygraph(): - return stream.reduce_scatter( - tensor, - tensor_list, - op=op, - group=group, - sync_op=sync_op, - use_calc_stream=False, - ) + return stream.reduce_scatter( + tensor, + tensor_list, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) def _reduce_scatter_base( @@ -111,12 +109,11 @@ def _reduce_scatter_base( # [5, 7] (2 GPUs, out for rank 1) """ - if not framework._in_legacy_dygraph(): - return _reduce_scatter_base_stream( - output, - input, - op=op, - group=group, - sync_op=sync_op, - use_calc_stream=False, - ) + return _reduce_scatter_base_stream( + output, + input, + op=op, + group=group, + sync_op=sync_op, + use_calc_stream=False, + ) diff --git a/python/paddle/distributed/communication/scatter.py b/python/paddle/distributed/communication/scatter.py index a0a9aeb0de296082c94f0ad7ce4eca62ef121e6e..f551811721490c6cc5a457dba1fd99d43ffdf06f 100644 --- a/python/paddle/distributed/communication/scatter.py +++ b/python/paddle/distributed/communication/scatter.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework -from paddle.distributed.communication.group import _get_global_group def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True): @@ -61,34 +58,4 @@ def scatter(tensor, tensor_list=None, src=0, group=None, sync_op=True): # [1, 2, 3] [10, 11, 12] (2 GPUs, out for rank 0) # [4, 5, 6] [4, 5, 6] (2 GPUs, out for rank 1) """ - if not framework._in_legacy_dygraph(): - return stream.scatter(tensor, tensor_list, src, group, sync_op) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - ring_id = 0 if group is None else group.id - gsrc = src if group is None else group.get_group_rank(src) - rank = _get_global_group().rank if group is None else group.rank - nranks = _get_global_group().nranks if group is None else group.nranks - assert gsrc >= 0, "src rank out of group, need global rank" - - if rank != gsrc: - tensor_list = [] - for _ in range(nranks): - tensor_list.append(tensor) - temp = paddle.concat(tensor_list, axis=0) - - use_calc_stream = sync_op - return framework._legacy_C_ops.c_scatter( - temp, - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'nranks', - nranks, - 'root', - gsrc, - ) + return stream.scatter(tensor, tensor_list, src, group, sync_op) diff --git a/python/paddle/distributed/communication/send.py b/python/paddle/distributed/communication/send.py index 0d78fb854802369e1ba287246529ea762e10ab84..3ca6caf7289a6111e649402159d4d53f3fee0d98 100644 --- a/python/paddle/distributed/communication/send.py +++ b/python/paddle/distributed/communication/send.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
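The point-to-point pair, `recv` above and `send` below, keeps its public signature; only the legacy fallback goes away. A paired usage sketch matching the docstrings (2 GPUs):

    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    if dist.get_rank() == 0:
        data = paddle.to_tensor([7, 8, 9])
        dist.send(data, dst=1)
    else:
        data = paddle.to_tensor([1, 2, 3])
        dist.recv(data, src=0)   # data is now [7, 8, 9]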
-import paddle import paddle.distributed.communication.stream as stream -import paddle.fluid.framework as framework def send(tensor, dst=0, group=None, sync_op=True): @@ -48,27 +46,8 @@ def send(tensor, dst=0, group=None, sync_op=True): print(data) # [7, 8, 9] (2 GPUs) """ - if not framework._in_legacy_dygraph(): - return stream.send( - tensor, dst=dst, group=group, sync_op=sync_op, use_calc_stream=False - ) - - # code below will be removed after we remove the old dygraph - if group is not None and not group.is_member(): - return - use_calc_stream = sync_op - gdst = dst if group is None else group.get_group_rank(dst) - assert gdst >= 0, "dst rank out of group, need global rank" - ring_id = 0 if group is None else group.id - - return paddle._legacy_C_ops.send_v2( - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - gdst, + return stream.send( + tensor, dst=dst, group=group, sync_op=sync_op, use_calc_stream=False ) diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py index 6efcadbb51a5d56dbd570ef032974ccc89fb2005..1eae55c12dce8dfbda643b8ab2f111731bbfcebd 100755 --- a/python/paddle/distributed/fleet/fleet.py +++ b/python/paddle/distributed/fleet/fleet.py @@ -18,6 +18,7 @@ import os import paddle from paddle.fluid import compiler from paddle.fluid.dygraph import parallel_helper +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.ir import apply_build_strategy from paddle.fluid.wrapped_decorator import wrap_decorator from paddle.framework import _global_flags @@ -280,7 +281,7 @@ class Fleet: "CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program." ) - if paddle.framework._non_static_mode(): + if in_dygraph_mode(): if self.worker_num() == 1: # if worker_num is 1, should construct default topology & hcg self._topology = tp.CommunicateTopology() @@ -1255,7 +1256,7 @@ class Fleet: ) else: if ( - paddle.framework._non_static_mode() + in_dygraph_mode() or self._role_maker._is_non_distributed() or self._is_collective ): @@ -1271,247 +1272,288 @@ class Fleet: context["user_defined_strategy"] = copy.deepcopy( self._user_defined_strategy ) - if paddle.framework._non_static_mode(): + if in_dygraph_mode(): # imitate target optimizer retrieval target_opt = self.user_defined_optimizer self._context = context return target_opt.minimize(loss) - - # cache original feed forward program - self.origin_main_program = loss.block.program - # add distributed attr - if not hasattr(self.origin_main_program, "distributed_info_"): - setattr(self.origin_main_program, "distributed_info_", dict()) - self.origin_main_program.distributed_info_[ - "dp_degree" - ] = self._user_defined_strategy.sharding_configs["dp_degree"] - self.origin_main_program.distributed_info_[ - "mp_degree" - ] = self._user_defined_strategy.sharding_configs["mp_degree"] - self.origin_main_program.distributed_info_[ - "pp_degree" - ] = self._user_defined_strategy.sharding_configs["pp_degree"] - self.origin_main_program.distributed_info_[ - "sharding_degree" - ] = self._user_defined_strategy.sharding_configs["sharding_degree"] - - context["origin_main_program"] = self.origin_main_program - context["origin_main_programs"] = [self.origin_main_program] - context["loss"] = loss - if startup_program is None: - self.origin_startup_program = ( - paddle.static.default_startup_program().clone(for_test=False) - ) - startup_program = paddle.static.default_startup_program() else: - self.origin_startup_program = 
startup_program.clone(for_test=False) - - context["origin_startup_program"] = startup_program - context["origin_startup_programs"] = [startup_program] - context["role_maker"] = self._role_maker + # cache original feed forward program + self.origin_main_program = loss.block.program + # add distributed attr + if not hasattr(self.origin_main_program, "distributed_info_"): + setattr(self.origin_main_program, "distributed_info_", dict()) + self.origin_main_program.distributed_info_[ + "dp_degree" + ] = self._user_defined_strategy.sharding_configs["dp_degree"] + self.origin_main_program.distributed_info_[ + "mp_degree" + ] = self._user_defined_strategy.sharding_configs["mp_degree"] + self.origin_main_program.distributed_info_[ + "pp_degree" + ] = self._user_defined_strategy.sharding_configs["pp_degree"] + self.origin_main_program.distributed_info_[ + "sharding_degree" + ] = self._user_defined_strategy.sharding_configs[ + "sharding_degree" + ] - # Use the auto-parallel's routines instead - if ( - self._user_defined_strategy.semi_auto - or self._user_defined_strategy.auto_search - ): - from ..auto_parallel.parallelizer import AutoParallelizer - - auto_parallelizer = AutoParallelizer(self) - ( - optimize_ops, - params_grads, - dist_startup_prog, - dist_main_prog, - ) = auto_parallelizer.parallelize( - loss, startup_program, parameter_list, no_grad_set - ) + context["origin_main_program"] = self.origin_main_program + context["origin_main_programs"] = [self.origin_main_program] + context["loss"] = loss + if startup_program is None: + self.origin_startup_program = ( + paddle.static.default_startup_program().clone( + for_test=False + ) + ) + startup_program = paddle.static.default_startup_program() + else: + self.origin_startup_program = startup_program.clone( + for_test=False + ) - return optimize_ops, params_grads, dist_startup_prog, dist_main_prog + context["origin_startup_program"] = startup_program + context["origin_startup_programs"] = [startup_program] + context["role_maker"] = self._role_maker - context["user_defined_strategy"] = copy.deepcopy( - self._user_defined_strategy - ) - copy_user_defined_strategy = copy.deepcopy(self._user_defined_strategy) + # Use the auto-parallel's routines instead + if ( + self._user_defined_strategy.semi_auto + or self._user_defined_strategy.auto_search + ): + from ..auto_parallel.parallelizer import AutoParallelizer + + auto_parallelizer = AutoParallelizer(self) + ( + optimize_ops, + params_grads, + dist_startup_prog, + dist_main_prog, + ) = auto_parallelizer.parallelize( + loss, startup_program, parameter_list, no_grad_set + ) - can_not_apply_optimizer_list = [] - # fix set collective and fleet ps gpu error - if ( - self._is_collective - and len(self._user_defined_strategy.sparse_table_configs) > 0 - ): - context["use_fleet_ps"] = True - from .meta_optimizers import ParameterServerOptimizer + return ( + optimize_ops, + params_grads, + dist_startup_prog, + dist_main_prog, + ) - meta_optimizer = ParameterServerOptimizer( - self.user_defined_optimizer + context["user_defined_strategy"] = copy.deepcopy( + self._user_defined_strategy ) - meta_optimizer._set_basic_info( - loss, - self._role_maker, - self.user_defined_optimizer, - copy_user_defined_strategy, + copy_user_defined_strategy = copy.deepcopy( + self._user_defined_strategy ) - can_not_apply_optimizer_list.append(meta_optimizer) - from .meta_optimizers import ParameterServerGraphOptimizer - graph_optimizer = ParameterServerGraphOptimizer( - self.user_defined_optimizer - ) - graph_optimizer._set_basic_info( - 
loss, - self._role_maker, - self.user_defined_optimizer, - copy_user_defined_strategy, - ) - can_not_apply_optimizer_list.append(graph_optimizer) - else: - # compile time - distributed_optimizer_list = ( - MetaOptimizerFactory()._get_valid_meta_optimizers( + can_not_apply_optimizer_list = [] + # fix set collective and fleet ps gpu error + if ( + self._is_collective + and len(self._user_defined_strategy.sparse_table_configs) > 0 + ): + context["use_fleet_ps"] = True + from .meta_optimizers import ParameterServerOptimizer + + meta_optimizer = ParameterServerOptimizer( self.user_defined_optimizer ) - ) - # trigger the auto-parallel in very strict condition - # strategy = DistributedStrategy() - # strategy.auto = True - # optimizer = paddle.optimizer.SGD(learning_rate=0.1) - # optimizer = fleet.distributed_optimizer(optimizer, strategy) - if copy_user_defined_strategy._is_strict_auto(): - # turn on all the strategy for each optimizer - for opt in distributed_optimizer_list: - opt._enable_strategy(copy_user_defined_strategy, context) - - valid_optimizer_list = [] - valid_graph_optimizer_list = [] - # recall meta optimizers for ranking - for opt in distributed_optimizer_list: - opt._set_basic_info( + meta_optimizer._set_basic_info( loss, self._role_maker, self.user_defined_optimizer, copy_user_defined_strategy, ) - if opt._can_apply() and not opt._is_graph_out(): - valid_optimizer_list.append(opt) - elif opt._can_apply() and opt._is_graph_out(): - valid_graph_optimizer_list.append(opt) - else: - can_not_apply_optimizer_list.append(opt) - # combine recalled meta optimizers to be a valid meta optimizer - ( - meta_optimizer, - graph_optimizer, - ) = self.strategy_compiler.generate_optimizer( - loss, - self._role_maker, - self.user_defined_optimizer, - copy_user_defined_strategy, - valid_optimizer_list, - valid_graph_optimizer_list, - ) + can_not_apply_optimizer_list.append(meta_optimizer) + from .meta_optimizers import ParameterServerGraphOptimizer - valid_strategy = self.strategy_compiler._get_valid_strategy( - copy_user_defined_strategy, can_not_apply_optimizer_list - ) + graph_optimizer = ParameterServerGraphOptimizer( + self.user_defined_optimizer + ) + graph_optimizer._set_basic_info( + loss, + self._role_maker, + self.user_defined_optimizer, + copy_user_defined_strategy, + ) + can_not_apply_optimizer_list.append(graph_optimizer) + else: + # compile time + distributed_optimizer_list = ( + MetaOptimizerFactory()._get_valid_meta_optimizers( + self.user_defined_optimizer + ) + ) + # trigger the auto-parallel in very strict condition + # strategy = DistributedStrategy() + # strategy.auto = True + # optimizer = paddle.optimizer.SGD(learning_rate=0.1) + # optimizer = fleet.distributed_optimizer(optimizer, strategy) + if copy_user_defined_strategy._is_strict_auto(): + # turn on all the strategy for each optimizer + for opt in distributed_optimizer_list: + opt._enable_strategy( + copy_user_defined_strategy, context + ) + + valid_optimizer_list = [] + valid_graph_optimizer_list = [] + # recall meta optimizers for ranking + for opt in distributed_optimizer_list: + opt._set_basic_info( + loss, + self._role_maker, + self.user_defined_optimizer, + copy_user_defined_strategy, + ) + if opt._can_apply() and not opt._is_graph_out(): + valid_optimizer_list.append(opt) + elif opt._can_apply() and opt._is_graph_out(): + valid_graph_optimizer_list.append(opt) + else: + can_not_apply_optimizer_list.append(opt) + # combine recalled meta optimizers to be a valid meta optimizer + ( + meta_optimizer, + graph_optimizer, 
+ ) = self.strategy_compiler.generate_optimizer( + loss, + self._role_maker, + self.user_defined_optimizer, + copy_user_defined_strategy, + valid_optimizer_list, + valid_graph_optimizer_list, + ) - context["valid_strategy"] = copy.deepcopy(valid_strategy) - logger.debug("valid_strategy: " + str(context["valid_strategy"])) - logger.debug( - "user_defined_strategy: " + str(context["user_defined_strategy"]) - ) + valid_strategy = self.strategy_compiler._get_valid_strategy( + copy_user_defined_strategy, can_not_apply_optimizer_list + ) - applied_meta_list = self.strategy_compiler._get_applied_meta_list() - applied_graph_list = self.strategy_compiler._get_applied_graph_list() + context["valid_strategy"] = copy.deepcopy(valid_strategy) + logger.debug("valid_strategy: " + str(context["valid_strategy"])) + logger.debug( + "user_defined_strategy: " + + str(context["user_defined_strategy"]) + ) - context['applied_meta_list'] = applied_meta_list - context['applied_graph_list'] = applied_graph_list + applied_meta_list = self.strategy_compiler._get_applied_meta_list() + applied_graph_list = ( + self.strategy_compiler._get_applied_graph_list() + ) - self._context = context + context['applied_meta_list'] = applied_meta_list + context['applied_graph_list'] = applied_graph_list - self.valid_strategy = valid_strategy - self.valid_strategy._enable_env() + self._context = context - optimize_ops = [] - params_grads = [] + self.valid_strategy = valid_strategy + self.valid_strategy._enable_env() - if self._role_maker._is_non_distributed() and not self._is_collective: - if self._runtime_handle is None: - self._runtime_handle = RuntimeFactory()._create_runtime(context) + optimize_ops = [] + params_grads = [] - compiled_program = compiler.CompiledProgram( - self.origin_main_program - ).with_data_parallel(loss_name=loss.name, share_vars_from=None) - loss.block.program._graph = compiled_program - return self.user_defined_optimizer.minimize( - loss, startup_program, parameter_list, no_grad_set=no_grad_set - ) + if ( + self._role_maker._is_non_distributed() + and not self._is_collective + ): + if self._runtime_handle is None: + self._runtime_handle = RuntimeFactory()._create_runtime( + context + ) - if meta_optimizer: - logger.debug( - "before minimize program id: " + str(id(loss.block.program)) - ) - optimize_ops, params_grads = meta_optimizer.minimize( - loss, startup_program, parameter_list, no_grad_set=no_grad_set - ) - logger.debug( - "after minimize program id: " + str(id(loss.block.program)) - ) - default_program = paddle.static.default_main_program() - logger.debug("default program id: " + str(id(default_program))) + compiled_program = compiler.CompiledProgram( + self.origin_main_program + ).with_data_parallel(loss_name=loss.name, share_vars_from=None) + loss.block.program._graph = compiled_program + return self.user_defined_optimizer.minimize( + loss, + startup_program, + parameter_list, + no_grad_set=no_grad_set, + ) - if id(default_program) != id(loss.block.program): - paddle.framework.switch_main_program(loss.block.program) - logger.debug( - "default program id after switch: " + str(id(default_program)) - ) + if meta_optimizer: + logger.debug( + "before minimize program id: " + str(id(loss.block.program)) + ) + optimize_ops, params_grads = meta_optimizer.minimize( + loss, + startup_program, + parameter_list, + no_grad_set=no_grad_set, + ) + logger.debug( + "after minimize program id: " + str(id(loss.block.program)) + ) + default_program = paddle.static.default_main_program() + logger.debug("default program 
id: " + str(id(default_program))) + + if id(default_program) != id(loss.block.program): + paddle.framework.switch_main_program(loss.block.program) + logger.debug( + "default program id after switch: " + + str(id(default_program)) + ) - else: - optimize_ops, params_grads = self.user_defined_optimizer.minimize( - loss, startup_program, parameter_list, no_grad_set=no_grad_set - ) + else: + ( + optimize_ops, + params_grads, + ) = self.user_defined_optimizer.minimize( + loss, + startup_program, + parameter_list, + no_grad_set=no_grad_set, + ) - context["program_optimize_ops"] = optimize_ops - context["program_params_grads"] = params_grads + context["program_optimize_ops"] = optimize_ops + context["program_params_grads"] = params_grads - if graph_optimizer: - logger.debug( - "before graph minimize program id: " - + str(id(loss.block.program)) - ) - optimize_ops, params_grads = graph_optimizer.minimize( - loss, startup_program, parameter_list, no_grad_set=no_grad_set - ) - # since we do not encourage users to use graph operations - # if a graph optimizer takes effect, mostly - # optimizers_ops and params_grads are None - # i.e. users can not modify current computation graph anymore - context["graph_optimize_ops"] = optimize_ops - context["graph_optimize_grads"] = params_grads - else: - apply_ir_passes(loss.block.program, startup_program, self) + if graph_optimizer: + logger.debug( + "before graph minimize program id: " + + str(id(loss.block.program)) + ) + optimize_ops, params_grads = graph_optimizer.minimize( + loss, + startup_program, + parameter_list, + no_grad_set=no_grad_set, + ) + # since we do not encourage users to use graph operations + # if a graph optimizer takes effect, mostly + # optimizers_ops and params_grads are None + # i.e. users can not modify current computation graph anymore + context["graph_optimize_ops"] = optimize_ops + context["graph_optimize_grads"] = params_grads + else: + apply_ir_passes(loss.block.program, startup_program, self) - if not self._role_maker._is_heter_parameter_server_mode: - program = paddle.static.default_main_program() - opt_info = {} if program._fleet_opt is None else program._fleet_opt - opt_info["mpi_size"] = self.worker_num() - opt_info["mpi_rank"] = self.worker_index() - for ( - k, - v, - ) in self._user_defined_strategy.trainer_desc_configs.items(): - if v or k not in opt_info: - opt_info[k] = v - program._fleet_opt = opt_info + if not self._role_maker._is_heter_parameter_server_mode: + program = paddle.static.default_main_program() + opt_info = ( + {} if program._fleet_opt is None else program._fleet_opt + ) + opt_info["mpi_size"] = self.worker_num() + opt_info["mpi_rank"] = self.worker_index() + for ( + k, + v, + ) in self._user_defined_strategy.trainer_desc_configs.items(): + if v or k not in opt_info: + opt_info[k] = v + program._fleet_opt = opt_info - if self._runtime_handle is None: - self._runtime_handle = RuntimeFactory()._create_runtime(context) + if self._runtime_handle is None: + self._runtime_handle = RuntimeFactory()._create_runtime(context) - import paddle.distributed.fleet as fleet + import paddle.distributed.fleet as fleet - fleet.util._set_strategy(context["valid_strategy"]) + fleet.util._set_strategy(context["valid_strategy"]) - return optimize_ops, params_grads + return optimize_ops, params_grads def _minimize_losses_impl( self, diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py index 
67e2d96de6a38a63077b6aa6c2943ef79af64ad8..f992a10ed36c8e8d99e78bb0763a7bf6638da9e4 100644 --- a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py +++ b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py @@ -18,13 +18,7 @@ from paddle.common_ops_import import dygraph_utils from paddle.distributed import collective from paddle.fluid import core from paddle.fluid.data_feeder import check_dtype, check_variable_and_dtype -from paddle.framework import ( - LayerHelper, - _in_legacy_dygraph, - _varbase_creator, - in_dygraph_mode, - in_dynamic_mode, -) +from paddle.framework import LayerHelper, _varbase_creator, in_dygraph_mode from paddle.nn import Layer from ....communication.reduce import ReduceOp, _get_reduce_op @@ -69,39 +63,29 @@ def _c_identity(tensor, group=None): return dy return c_identity_eager.apply(tensor) + else: + op_type = 'c_identity' + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - elif _in_legacy_dygraph(): - return _legacy_C_ops.c_identity( + check_variable_and_dtype( tensor, - 'use_calc_stream', - True, - 'ring_id', - ring_id, - 'use_model_parallel', - True, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + '_c_identity', ) - op_type = 'c_identity' - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - '_c_identity', - ) - helper.append_op( - type=op_type, - inputs={'X': tensor}, - outputs={'Out': out}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': True, - 'use_model_parallel': True, - }, - ) - return out + helper.append_op( + type=op_type, + inputs={'X': tensor}, + outputs={'Out': out}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': True, + 'use_model_parallel': True, + }, + ) + return out def _c_concat(tensor, group=None): @@ -125,7 +109,7 @@ def _c_concat(tensor, group=None): rank = group.rank nranks = group.nranks - if in_dynamic_mode(): + if in_dygraph_mode(): return _legacy_C_ops.c_concat( tensor, 'ring_id', @@ -139,31 +123,31 @@ def _c_concat(tensor, group=None): 'use_model_parallel', True, ) + else: + op_type = 'c_concat' + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - op_type = 'c_concat' - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - '_c_concat', - ) + check_variable_and_dtype( + tensor, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + '_c_concat', + ) - helper.append_op( - type=op_type, - inputs={'X': tensor}, - outputs={'Out': out}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': True, - 'use_model_parallel': True, - 'nranks': nranks, - 'rank': rank, - }, - ) - return out + helper.append_op( + type=op_type, + inputs={'X': tensor}, + outputs={'Out': out}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': True, + 'use_model_parallel': True, + 'nranks': nranks, + 'rank': rank, + }, + ) + return out def _c_split(tensor, group=None): @@ -191,7 +175,7 @@ def _c_split(tensor, group=None): else group.nranks ) - if in_dynamic_mode(): + if in_dygraph_mode(): return _legacy_C_ops.c_split( tensor, 'use_calc_stream', @@ -205,31 +189,31 @@ def _c_split(tensor, group=None): 'use_model_parallel', True, ) + else: + op_type = 'c_split' + helper = 
LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - op_type = 'c_split' - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - '_c_split', - ) + check_variable_and_dtype( + tensor, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + '_c_split', + ) - helper.append_op( - type=op_type, - inputs={'X': tensor}, - outputs={'Out': out}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': True, - 'rank': rank, - 'nranks': nranks, - 'use_model_parallel': True, - }, - ) - return out + helper.append_op( + type=op_type, + inputs={'X': tensor}, + outputs={'Out': out}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': True, + 'rank': rank, + 'nranks': nranks, + 'use_model_parallel': True, + }, + ) + return out def _mp_allreduce( @@ -286,41 +270,29 @@ def _mp_allreduce( return mp_allreduce_eager.apply( tensor, group, use_calc_stream, use_model_parallel ) + else: + ring_id = 0 if group is None else group.id + op_type = 'mp_allreduce_sum' + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - ring_id = 0 if group is None else group.id - if _in_legacy_dygraph(): - if op == ReduceOp.SUM: - return _legacy_C_ops.mp_allreduce_sum_( - tensor, - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - ) - else: - raise ValueError("Unknown parameter: {}.".format(op)) - - op_type = 'mp_allreduce_sum' - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=tensor.dtype) - - check_variable_and_dtype( - tensor, - 'tensor', - ['float16', 'float32', 'float64', 'int32', 'int64'], - op_type, - ) + check_variable_and_dtype( + tensor, + 'tensor', + ['float16', 'float32', 'float64', 'int32', 'int64'], + op_type, + ) - helper.append_op( - type=op_type, - inputs={'X': tensor}, - outputs={'Out': out}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream, - }, - ) - return out + helper.append_op( + type=op_type, + inputs={'X': tensor}, + outputs={'Out': out}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': use_calc_stream, + }, + ) + return out def _c_lookup_table(table, index, start_index=0, name=None): @@ -337,23 +309,23 @@ def _c_lookup_table(table, index, start_index=0, name=None): Returns: Tensor. 
""" - if in_dynamic_mode(): + if in_dygraph_mode(): return _legacy_C_ops.c_embedding( table, index, "start_index", start_index ) - - op_type = 'c_embedding' - helper = LayerHelper(op_type, **locals()) - dtype = helper.input_dtype(input_param_name='table') - check_variable_and_dtype(index, 'input', ['int32', 'int64'], op_type) - tmp = helper.create_variable_for_type_inference(dtype) - helper.append_op( - type='c_embedding', - inputs={'Ids': index, 'W': table}, - outputs={'Out': tmp}, - attrs={"start_index": start_index}, - ) - return tmp + else: + op_type = 'c_embedding' + helper = LayerHelper(op_type, **locals()) + dtype = helper.input_dtype(input_param_name='table') + check_variable_and_dtype(index, 'input', ['int32', 'int64'], op_type) + tmp = helper.create_variable_for_type_inference(dtype) + helper.append_op( + type='c_embedding', + inputs={'Ids': index, 'W': table}, + outputs={'Out': tmp}, + attrs={"start_index": start_index}, + ) + return tmp class _Linear(Layer): @@ -426,7 +398,7 @@ def _c_softmax_with_cross_entropy( if input_dims - 1 == label_dims: label = paddle.unsqueeze(label, axis=-1) - if in_dynamic_mode(): + if in_dygraph_mode(): softmax, loss = _legacy_C_ops.c_softmax_with_cross_entropy( logits, label, 'ring_id', ring_id, 'rank', rank, 'nranks', nranks ) @@ -434,33 +406,33 @@ def _c_softmax_with_cross_entropy( return loss else: return loss, softmax + else: + attrs = { + 'ring_id': ring_id, + 'rank': rank, + 'nranks': nranks, + } + helper = LayerHelper('c_softmax_with_cross_entropy', **locals()) + softmax = helper.create_variable_for_type_inference(dtype=logits.dtype) + loss = helper.create_variable_for_type_inference(dtype=logits.dtype) + helper.append_op( + type='c_softmax_with_cross_entropy', + inputs={'Logits': logits, 'Label': label}, + outputs={'Softmax': softmax, 'Loss': loss}, + attrs=attrs, + ) - attrs = { - 'ring_id': ring_id, - 'rank': rank, - 'nranks': nranks, - } - helper = LayerHelper('c_softmax_with_cross_entropy', **locals()) - softmax = helper.create_variable_for_type_inference(dtype=logits.dtype) - loss = helper.create_variable_for_type_inference(dtype=logits.dtype) - helper.append_op( - type='c_softmax_with_cross_entropy', - inputs={'Logits': logits, 'Label': label}, - outputs={'Softmax': softmax, 'Loss': loss}, - attrs=attrs, - ) - - if return_softmax: - return loss, softmax + if return_softmax: + return loss, softmax - return loss + return loss def _linear(x, weight, bias=None, name=None): """ Fuction Linear """ - if in_dynamic_mode(): + if in_dygraph_mode(): pre_bias = _varbase_creator(dtype=x.dtype) _legacy_C_ops.matmul( x, @@ -827,7 +799,7 @@ def split( supported_operations ) ) - if in_dynamic_mode(): + if in_dygraph_mode(): raise ValueError( "paddle.distributed.split cannot be used in dynamic " "graph mode, plese use ParallelEmbedding, ParallelRowLinear, " diff --git a/python/paddle/distributed/fleet/layers/mpu/random.py b/python/paddle/distributed/fleet/layers/mpu/random.py index e7674b08be47a7a555c1814c7f7412b0fa0d35bb..7b89330d951c87dcce7276a0fa0f6e672d64a38d 100644 --- a/python/paddle/distributed/fleet/layers/mpu/random.py +++ b/python/paddle/distributed/fleet/layers/mpu/random.py @@ -20,7 +20,8 @@ import paddle from paddle import _legacy_C_ops from paddle.fluid import core from paddle.fluid.data_feeder import check_variable_and_dtype -from paddle.framework import LayerHelper, in_dynamic_mode +from paddle.fluid.framework import in_dygraph_mode +from paddle.framework import LayerHelper from paddle.static import Variable __all__ = [] @@ -211,7 
+212,7 @@ def dropout( ) # semantic transfer # dygraph using tracker, doesn't need determinate seed - if in_dynamic_mode(): + if in_dygraph_mode(): out, mask = _legacy_C_ops.dropout( x, 'dropout_prob', @@ -226,34 +227,34 @@ def dropout( mode, ) return out + else: + seed = determinate_seed(rng_name) - seed = determinate_seed(rng_name) - - if isinstance(p, Variable) and not p.shape != [1]: - raise TypeError( - "Required p.shape == [1] if type(p) is Variable, but received p.shape = {}".format( - p.shape + if isinstance(p, Variable) and not p.shape != [1]: + raise TypeError( + "Required p.shape == [1] if type(p) is Variable, but received p.shape = {}".format( + p.shape + ) ) - ) - helper = LayerHelper('dropout', **locals()) - check_variable_and_dtype( - x, 'x', ['float16', 'float32', 'float64'], 'dropout' - ) + helper = LayerHelper('dropout', **locals()) + check_variable_and_dtype( + x, 'x', ['float16', 'float32', 'float64'], 'dropout' + ) - out = helper.create_variable_for_type_inference(dtype=x.dtype) - mask = helper.create_variable_for_type_inference( - dtype=core.VarDesc.VarType.UINT8, stop_gradient=True - ) + out = helper.create_variable_for_type_inference(dtype=x.dtype) + mask = helper.create_variable_for_type_inference( + dtype=core.VarDesc.VarType.UINT8, stop_gradient=True + ) - helper.append_op( - type='dropout', - inputs={'X': [x], 'Seed': seed}, - outputs={'Out': [out], 'Mask': [mask]}, - attrs={ - 'dropout_prob': p, - 'is_test': not training, - 'dropout_implementation': mode, - }, - ) - return out + helper.append_op( + type='dropout', + inputs={'X': [x], 'Seed': seed}, + outputs={'Out': [out], 'Mask': [mask]}, + attrs={ + 'dropout_prob': p, + 'is_test': not training, + 'dropout_implementation': mode, + }, + ) + return out diff --git a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py index 3bf6480480bed247f47fa9b37e1f7d756e31eaca..3f41ebaa96d07aa10de6e38bf2c80791f2c4b24d 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dgc_optimizer.py @@ -19,10 +19,10 @@ from .meta_optimizer_base import MetaOptimizerBase __all__ = [] import paddle -from paddle import framework from paddle.common_ops_import import LayerHelper from paddle.fluid.clip import GradientClipByNorm, append_gradient_clip_ops from paddle.fluid.dygraph import base as imperative_base +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.optimizer import Momentum, Optimizer from paddle.framework import core from paddle.static import create_global_var @@ -46,7 +46,7 @@ class DGCMomentumOptimizer(Optimizer): grad_clip=None, name=None, ): - if framework._non_static_mode(): + if in_dygraph_mode(): raise Exception("In dygraph, don't support DGCMomentumOptimizer.") assert ( diff --git a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py index bbe1e5c6137648066ccdd94d4a74f860afffc97e..621455735905caa9003d4f8704905421731a332e 100644 --- a/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py +++ b/python/paddle/distributed/fleet/meta_parallel/pp_utils/p2p_communication.py @@ -16,8 +16,7 @@ import numpy as np import paddle import paddle.fluid.core as core -from paddle import _legacy_C_ops -from paddle.fluid.framework import _in_legacy_dygraph, in_dygraph_mode +from paddle.fluid.framework import in_dygraph_mode from ...utils.log_util 
import logger from .utils import number_2_dtype, paddle_2_number @@ -189,21 +188,7 @@ def _partial_send_op( tensor, group, use_calc_stream, ring_id, dst, nranks, rank_id ): dst_rank_in_group = dst if group is None else group.get_group_rank(dst) - if _in_legacy_dygraph(): - return _legacy_C_ops.partial_send( - tensor.detach(), - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - dst_rank_in_group, - 'num', - nranks, - 'id', - rank_id, - ) - elif in_dygraph_mode(): + if in_dygraph_mode(): group = ( paddle.distributed.collective._get_default_group() if group is None @@ -234,12 +219,7 @@ def send_partial( tensor, group, use_calc_stream, ring_id, dst_rank, nranks, rank_id ) else: - if _in_legacy_dygraph(): - send_op = lambda x, dst, group: paddle.distributed.send( - x, dst, group, use_calc_stream - ) - elif in_dygraph_mode(): - send_op = paddle.distributed.isend + send_op = paddle.distributed.isend return send_op(tensor.detach(), dst=dst_rank, group=group) @@ -247,37 +227,17 @@ def _partial_recv_op( tensor, group, use_calc_stream, ring_id, src, nranks, rank_id ): src_rank_in_group = src if group is None else group.get_group_rank(src) - if _in_legacy_dygraph(): - assert use_calc_stream - return _legacy_C_ops.partial_recv( - tensor.detach(), - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'peer', - src_rank_in_group, - 'num', - nranks, - 'id', - rank_id, - 'dtype', - tensor.dtype, - 'out_shape', - tensor.shape, - ) - elif in_dygraph_mode(): - group = ( - paddle.distributed.collective._get_default_group() - if group is None - else group - ) - comm_op = ( - group.process_group.recv_partial_on_calc_stream - if use_calc_stream - else group.process_group.recv_partial - ) - return comm_op(tensor, src_rank_in_group, nranks, rank_id) + group = ( + paddle.distributed.collective._get_default_group() + if group is None + else group + ) + comm_op = ( + group.process_group.recv_partial_on_calc_stream + if use_calc_stream + else group.process_group.recv_partial + ) + return comm_op(tensor, src_rank_in_group, nranks, rank_id) def recv_partial( @@ -297,7 +257,7 @@ def recv_partial( tensor, group, use_calc_stream, ring_id, src_rank, nranks, rank_id ) else: - if _in_legacy_dygraph() or use_calc_stream: + if use_calc_stream: recv_op = paddle.distributed.recv elif in_dygraph_mode(): recv_op = paddle.distributed.irecv @@ -307,30 +267,17 @@ def recv_partial( def _partial_allgather_op( tensor, group, use_calc_stream, ring_id, nranks, rank_id ): - if _in_legacy_dygraph(): - return _legacy_C_ops.partial_allgather_( - tensor.detach(), - 'use_calc_stream', - use_calc_stream, - 'ring_id', - ring_id, - 'nranks', - nranks, - 'rank', - rank_id, - ) - elif in_dygraph_mode(): - group = ( - paddle.distributed.collective._get_default_group() - if group is None - else group - ) - comm_op = ( - group.process_group.all_gather_partial_on_calc_stream - if use_calc_stream - else group.process_group.all_gather_partial - ) - return comm_op(tensor, tensor, nranks, rank_id) + group = ( + paddle.distributed.collective._get_default_group() + if group is None + else group + ) + comm_op = ( + group.process_group.all_gather_partial_on_calc_stream + if use_calc_stream + else group.process_group.all_gather_partial + ) + return comm_op(tensor, tensor, nranks, rank_id) def allgather_partial( diff --git a/python/paddle/distributed/fleet/optimizer.py b/python/paddle/distributed/fleet/optimizer.py index e6e2696d52e5d2c6cfe04288d9c59ca711ee5b1c..89b0456fe6abf0603e96d4a09603ef3e3049ea2b 100755 --- 
a/python/paddle/distributed/fleet/optimizer.py +++ b/python/paddle/distributed/fleet/optimizer.py @@ -14,8 +14,8 @@ import copy -import paddle from paddle.distributed import fleet +from paddle.fluid.framework import in_dygraph_mode from .meta_optimizers import HeterParallelOptimizer, HybridParallelOptimizer from .utils.log_util import logger @@ -74,7 +74,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None): def distributed_optimizer(*args, **kwargs): - if paddle.framework._non_static_mode(): + if in_dygraph_mode(): return _dygraph_distributed_optimizer(*args, **kwargs) else: return fleet.fleet.distributed_optimizer(*args, **kwargs) diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_inference.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_inference.py index d0981c045f1068d02d163efb713fed81cbb315d5..4502a8ddf4122f002a9a8b5486b84446f5f610b3 100644 --- a/python/paddle/distributed/fleet/utils/hybrid_parallel_inference.py +++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_inference.py @@ -20,7 +20,8 @@ import paddle.distributed.fleet as fleet # (TODO: GhostScreaming) It will be removed later. import paddle.fluid.core as core -from paddle.framework import Block, Program, _non_static_mode +from paddle.fluid.framework import in_dygraph_mode +from paddle.framework import Block, Program class HybridParallelInferenceHelper: @@ -205,7 +206,7 @@ class HybridParallelInferenceHelper: elif core.is_compiled_with_cuda(): self._device = "gpu" assert self._device, "Only gpu and npu are supported." - assert not _non_static_mode(), "Only static mode is supported." + assert not in_dygraph_mode(), "Only static mode is supported." op_maker = core.op_proto_and_checker_maker self._op_role = op_maker.OpRole diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py index 2b0653ea35dc64b81bf05511f029fef40c50ac88..29f3ee0719d412ebccca6acbbc15a888449f5ecc 100644 --- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py +++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py @@ -18,7 +18,6 @@ from paddle import framework # (TODO: GhostScreaming) It will be removed later. 
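`distributed_optimizer`, rewritten above, now branches exactly once, on eager dygraph. A usage sketch under collective mode (strategy argument omitted for brevity):

    import paddle
    import paddle.distributed.fleet as fleet

    fleet.init(is_collective=True)
    optimizer = paddle.optimizer.SGD(learning_rate=0.1)
    optimizer = fleet.distributed_optimizer(optimizer)
    # eager dygraph -> _dygraph_distributed_optimizer(optimizer)
    # static graph  -> fleet.fleet.distributed_optimizer(optimizer)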
from paddle.fluid import core from paddle.framework import ( - _in_legacy_dygraph, _split_tensors, build_groups, in_dygraph_mode, @@ -215,39 +214,12 @@ def sharding_reduce_gradients(parameter_list, hcg): sharding_nrank = hcg.get_sharding_parallel_group().nranks for param in parameter_list: if param.trainable and (param._grad_ivar() is not None): - if in_dygraph_mode(): - param.grad.scale_(1.0 / sharding_nrank) - paddle.distributed.all_reduce( - param.grad, - group=hcg.get_sharding_parallel_group(), - sync_op=True, - ) - - elif _in_legacy_dygraph(): - g_var = param._grad_ivar() - # need use trace_op to allreduce - # paddle.distributed.all_reduce( - # g_var, group=hcg.get_sharding_parallel_group(), use_calc_stream=True) - paddle.fluid.framework._dygraph_tracer().trace_op( - type="c_allreduce_sum", - inputs={'X': g_var}, - outputs={'Out': g_var}, - attrs={ - 'ring_id': hcg.get_sharding_parallel_group().id, - 'use_calc_stream': True, - }, - ) - - # grad / sharding_rank - div_factor = paddle.to_tensor( - sharding_nrank, dtype=g_var.dtype - ) - paddle.fluid.framework._dygraph_tracer().trace_op( - type="elementwise_div", - inputs={'X': g_var, 'Y': div_factor}, - outputs={'Out': g_var}, - attrs={'axis': -1}, - ) + param.grad.scale_(1.0 / sharding_nrank) + paddle.distributed.all_reduce( + param.grad, + group=hcg.get_sharding_parallel_group(), + sync_op=True, + ) def broadcast_sharding_parameters(model, hcg): diff --git a/python/paddle/distributed/models/moe/utils.py b/python/paddle/distributed/models/moe/utils.py index a021c741c6383626dd510153946e0f62e2c1d78a..383520f19db338c07cc2520ed2033f2220e5274c 100644 --- a/python/paddle/distributed/models/moe/utils.py +++ b/python/paddle/distributed/models/moe/utils.py @@ -13,9 +13,8 @@ # limitations under the License. 
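The surviving eager path in `sharding_reduce_gradients` above averages gradients by pre-scaling each rank's gradient by `1.0 / sharding_nrank` before a SUM all-reduce. A single-process sketch of the identity this relies on:

    import paddle

    n = 4
    grads = [paddle.to_tensor([1.0, 2.0]) for _ in range(n)]
    pre_scaled = sum(g / n for g in grads)   # scale first, then SUM (all_reduce order)
    post_scaled = sum(grads) / n             # SUM first, then divide once
    # Both equal the mean gradient, so scale_(1.0 / sharding_nrank) followed
    # by a SUM all_reduce produces the sharding-group average in one collective.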
from paddle import _legacy_C_ops -from paddle.fluid import core from paddle.fluid.data_feeder import check_variable_and_dtype -from paddle.fluid.framework import _in_legacy_dygraph, in_dygraph_mode +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.layer_helper import LayerHelper @@ -43,8 +42,6 @@ def _number_count(numbers, upper_range): """ if in_dygraph_mode(): return _legacy_C_ops.number_count(numbers, 'upper_range', upper_range) - elif _in_legacy_dygraph(): - return core.ops.number_count(numbers, 'upper_range', upper_range) else: op_type = 'number_count' @@ -92,8 +89,6 @@ def _assign_pos(x, cum_count): """ if in_dygraph_mode(): return _legacy_C_ops.assign_pos(x, cum_count, cum_count[-1]) - elif _in_legacy_dygraph(): - return core.ops.assign_pos(x, cum_count, cum_count[-1]) else: op_type = 'assign_pos' @@ -129,8 +124,6 @@ def _random_routing(topk_idx, topk_value, prob, topk=2): if topk == 2: if in_dygraph_mode(): return _legacy_C_ops.random_routing(prob, topk_value, topk_idx) - elif _in_legacy_dygraph(): - return core.ops.random_routing(prob, topk_value, topk_idx) else: raise RuntimeError("Not supporting static mode now") else: @@ -162,10 +155,6 @@ def _limit_by_capacity(expert_count, capacity, n_worker): return _legacy_C_ops.limit_by_capacity( expert_count, capacity, 'n_worker', n_worker ) - elif _in_legacy_dygraph(): - return core.ops.limit_by_capacity( - expert_count, capacity, 'n_worker', n_worker - ) else: op_type = 'limit_by_capacity' @@ -211,32 +200,29 @@ def _prune_gate_by_capacity(gate_idx, expert_count, n_expert, n_worker): return _legacy_C_ops.prune_gate_by_capacity( gate_idx, expert_count, "n_expert", n_expert, "n_worker", n_worker ) - elif _in_legacy_dygraph(): - return core.ops.prune_gate_by_capacity( - gate_idx, expert_count, "n_expert", n_expert, "n_worker", n_worker + else: + check_variable_and_dtype( + gate_idx, + 'GateIdx', + ['int32', 'int64'], + 'paddle.distributed.utils.prune_gate_by_capacity', + ) + check_variable_and_dtype( + expert_count, + 'ExpertCount', + ['int32', 'int64'], + 'paddle.distributed.utils.prune_gate_by_capacity', ) - check_variable_and_dtype( - gate_idx, - 'GateIdx', - ['int32', 'int64'], - 'paddle.distributed.utils.prune_gate_by_capacity', - ) - check_variable_and_dtype( - expert_count, - 'ExpertCount', - ['int32', 'int64'], - 'paddle.distributed.utils.prune_gate_by_capacity', - ) - - helper = LayerHelper('prune_gate_by_capacity', **locals()) - new_gate_idx = helper.create_variable_for_type_inference( - dtype=gate_idx.dtype - ) - helper.append_op( - type='prune_gate_by_capacity', - inputs={'GateIdx': gate_idx, "ExpertCount": expert_count}, - outputs={'NewGateIdx': new_gate_idx}, - attrs={"n_expert": n_expert, "n_worker": n_worker}, - ) - - return new_gate_idx + + helper = LayerHelper('prune_gate_by_capacity', **locals()) + new_gate_idx = helper.create_variable_for_type_inference( + dtype=gate_idx.dtype + ) + helper.append_op( + type='prune_gate_by_capacity', + inputs={'GateIdx': gate_idx, "ExpertCount": expert_count}, + outputs={'NewGateIdx': new_gate_idx}, + attrs={"n_expert": n_expert, "n_worker": n_worker}, + ) + + return new_gate_idx diff --git a/python/paddle/distributed/utils/moe_utils.py b/python/paddle/distributed/utils/moe_utils.py index 298ae9fc916e83236bf88ba86948c6507ac43c68..6266537a409feb9255dabd91ed0101bee0e3e1f1 100644 --- a/python/paddle/distributed/utils/moe_utils.py +++ b/python/paddle/distributed/utils/moe_utils.py @@ -14,7 +14,7 @@ from paddle import _legacy_C_ops from paddle.fluid.data_feeder import 
check_variable_and_dtype -from paddle.fluid.framework import _non_static_mode +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.layer_helper import LayerHelper @@ -103,7 +103,7 @@ def global_scatter( return ring_id = 0 if group is None else group.id - if _non_static_mode(): + if in_dygraph_mode(): return _legacy_C_ops.global_scatter( x, local_count, @@ -220,7 +220,7 @@ def global_gather( return ring_id = 0 if group is None else group.id - if _non_static_mode(): + if in_dygraph_mode(): return _legacy_C_ops.global_gather( x, local_count, diff --git a/python/paddle/distribution/dirichlet.py b/python/paddle/distribution/dirichlet.py index bb89d323c24f2a41c05d3483a4f7d33196940678..0c6290df57b132dbff6c08239828b1a4b5cd32c1 100644 --- a/python/paddle/distribution/dirichlet.py +++ b/python/paddle/distribution/dirichlet.py @@ -15,7 +15,7 @@ import paddle from paddle.distribution import exponential_family from paddle.fluid.data_feeder import check_variable_and_dtype -from paddle.fluid.framework import _in_legacy_dygraph, in_dygraph_mode +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.layer_helper import LayerHelper @@ -166,8 +166,6 @@ def _dirichlet(concentration, name=None): if in_dygraph_mode(): return paddle._C_ops.dirichlet(concentration) - elif _in_legacy_dygraph(): - return paddle._legacy_C_ops.dirichlet(concentration) else: helper = LayerHelper(op_type, **locals()) out = helper.create_variable_for_type_inference( diff --git a/python/paddle/distribution/distribution.py b/python/paddle/distribution/distribution.py index ae4cb2f9b16ef69d4babbffab2058026d3a62f4e..c879e7a3bbffadf42acf783df5f1ad38cf9f80b7 100644 --- a/python/paddle/distribution/distribution.py +++ b/python/paddle/distribution/distribution.py @@ -24,13 +24,9 @@ import warnings import numpy as np import paddle -from paddle import _C_ops, _legacy_C_ops +from paddle import _C_ops from paddle.fluid.data_feeder import check_variable_and_dtype, convert_dtype -from paddle.fluid.framework import ( - _in_legacy_dygraph, - _non_static_mode, - in_dygraph_mode, -) +from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.layers import tensor @@ -221,7 +217,7 @@ class Distribution: Returns: value (Tensor): Change value's dtype if value's dtype is different from param. """ - if _non_static_mode(): + if in_dygraph_mode(): if value.dtype != param.dtype and convert_dtype(value.dtype) in [ 'float32', 'float64', @@ -229,12 +225,7 @@ class Distribution: warnings.warn( "dtype of input 'value' needs to be the same as parameters of distribution class. dtype of 'value' will be converted." 
) - if in_dygraph_mode(): - return _C_ops.cast(value, param.dtype) - if _in_legacy_dygraph(): - return _legacy_C_ops.cast( - value, 'in_dtype', value.dtype, 'out_dtype', param.dtype - ) + return _C_ops.cast(value, param.dtype) return value check_variable_and_dtype( diff --git a/python/paddle/distribution/uniform.py b/python/paddle/distribution/uniform.py index 706ff73ee83ff5a887dfe66dd642cce07c060e1d..d830e1ac65ca29feb1233d21a117fca4c82725cb 100644 --- a/python/paddle/distribution/uniform.py +++ b/python/paddle/distribution/uniform.py @@ -15,14 +15,10 @@ import numpy as np import paddle -from paddle import _C_ops, _legacy_C_ops +from paddle import _C_ops from paddle.distribution import distribution from paddle.fluid.data_feeder import check_type, convert_dtype -from paddle.fluid.framework import ( - _in_legacy_dygraph, - _non_static_mode, - in_dygraph_mode, -) +from paddle.fluid.framework import _non_static_mode, in_dygraph_mode from paddle.fluid.layers import tensor from paddle.tensor import random @@ -210,33 +206,23 @@ class Uniform(distribution.Distribution): """ value = self._check_values_dtype_in_probs(self.low, value) - if _non_static_mode(): + if in_dygraph_mode(): # ensure value in [low, high] lb_bool = self.low < value ub_bool = value < self.high - if in_dygraph_mode(): - lb = _C_ops.cast(lb_bool, value.dtype) - ub = _C_ops.cast(ub_bool, value.dtype) - return paddle.log(lb * ub) - paddle.log(self.high - self.low) - - if _in_legacy_dygraph(): - lb = _legacy_C_ops.cast( - lb_bool, 'in_dtype', lb_bool.dtype, 'out_dtype', value.dtype - ) - ub = _legacy_C_ops.cast( - ub_bool, 'in_dtype', ub_bool.dtype, 'out_dtype', value.dtype - ) - return paddle.log(lb * ub) - paddle.log(self.high - self.low) - - name = self.name + '_log_prob' - lb_bool = self.low < value - ub_bool = value < self.high - lb = tensor.cast(lb_bool, dtype=value.dtype) - ub = tensor.cast(ub_bool, dtype=value.dtype) - return paddle.subtract( - paddle.log(lb * ub), paddle.log(self.high - self.low), name=name - ) + lb = _C_ops.cast(lb_bool, value.dtype) + ub = _C_ops.cast(ub_bool, value.dtype) + return paddle.log(lb * ub) - paddle.log(self.high - self.low) + else: + name = self.name + '_log_prob' + lb_bool = self.low < value + ub_bool = value < self.high + lb = tensor.cast(lb_bool, dtype=value.dtype) + ub = tensor.cast(ub_bool, dtype=value.dtype) + return paddle.subtract( + paddle.log(lb * ub), paddle.log(self.high - self.low), name=name + ) def probs(self, value): """Probability density/mass function. 
@@ -249,30 +235,19 @@ class Uniform(distribution.Distribution): """ value = self._check_values_dtype_in_probs(self.low, value) - if _non_static_mode(): + if in_dygraph_mode(): lb_bool = self.low < value ub_bool = value < self.high - - if in_dygraph_mode(): - lb = _C_ops.cast(lb_bool, value.dtype) - ub = _C_ops.cast(ub_bool, value.dtype) - return (lb * ub) / (self.high - self.low) - - if _in_legacy_dygraph(): - lb = _legacy_C_ops.cast( - lb_bool, 'in_dtype', lb_bool.dtype, 'out_dtype', value.dtype - ) - ub = _legacy_C_ops.cast( - ub_bool, 'in_dtype', ub_bool.dtype, 'out_dtype', value.dtype - ) - return (lb * ub) / (self.high - self.low) - - name = self.name + '_probs' - lb_bool = self.low < value - ub_bool = value < self.high - lb = tensor.cast(lb_bool, dtype=value.dtype) - ub = tensor.cast(ub_bool, dtype=value.dtype) - return paddle.divide((lb * ub), (self.high - self.low), name=name) + lb = _C_ops.cast(lb_bool, value.dtype) + ub = _C_ops.cast(ub_bool, value.dtype) + return (lb * ub) / (self.high - self.low) + else: + name = self.name + '_probs' + lb_bool = self.low < value + ub_bool = value < self.high + lb = tensor.cast(lb_bool, dtype=value.dtype) + ub = tensor.cast(ub_bool, dtype=value.dtype) + return paddle.divide((lb * ub), (self.high - self.low), name=name) def entropy(self): r"""Shannon entropy in nats.
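
Note on the predicate swap used throughout this patch: historically `_non_static_mode()` was true under either dygraph engine (eager or the legacy tracer), while `in_dygraph_mode()` is true only under the eager engine. Replacing the former with the latter is behavior-preserving only on the assumption that the legacy dygraph engine is gone, which is exactly what the removal of the `_in_legacy_dygraph()` branches in this patch relies on. A minimal sanity check of the surviving predicate (illustrative only, not part of the patch):

    import paddle
    from paddle.fluid.framework import in_dygraph_mode

    paddle.disable_static()  # eager (dygraph) mode, the default since Paddle 2.0
    assert in_dygraph_mode()

    paddle.enable_static()   # static graph mode
    assert not in_dygraph_mode()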
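Every wrapper touched here collapses from a three-way dispatch (eager / legacy dygraph / static) to the same two-way shape. Condensed from the patched `_prune_gate_by_capacity` above, minus its dtype checks, the resulting pattern looks like this (a sketch of the post-patch shape, not the verbatim file contents):

    from paddle import _legacy_C_ops
    from paddle.fluid.framework import in_dygraph_mode
    from paddle.fluid.layer_helper import LayerHelper

    def _prune_gate_by_capacity(gate_idx, expert_count, n_expert, n_worker):
        if in_dygraph_mode():
            # eager mode: call the compiled kernel directly
            return _legacy_C_ops.prune_gate_by_capacity(
                gate_idx, expert_count, "n_expert", n_expert, "n_worker", n_worker
            )
        # static mode: append the op to the program for later execution
        helper = LayerHelper('prune_gate_by_capacity', **locals())
        new_gate_idx = helper.create_variable_for_type_inference(
            dtype=gate_idx.dtype
        )
        helper.append_op(
            type='prune_gate_by_capacity',
            inputs={'GateIdx': gate_idx, 'ExpertCount': expert_count},
            outputs={'NewGateIdx': new_gate_idx},
            attrs={'n_expert': n_expert, 'n_worker': n_worker},
        )
        return new_gate_idx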
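The `sharding_reduce_gradients` rewrite keeps only the eager path: each gradient is pre-scaled by 1/nranks and then summed with all_reduce, which yields the same group mean as the removed `c_allreduce_sum` plus `elementwise_div` trace_op pair. That kernel, extracted into a standalone sketch (the helper name is hypothetical; the calls are taken from the added lines above):

    import paddle.distributed as dist

    def _average_grad_across_group(grad, group):
        # scale first so the summed result equals the mean across the group
        grad.scale_(1.0 / group.nranks)
        dist.all_reduce(grad, group=group, sync_op=True)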