From e0b50269ea2598d4478c830e4ff5dad5129effd2 Mon Sep 17 00:00:00 2001
From: Wen Sun <35923278+HermitSun@users.noreply.github.com>
Date: Wed, 11 Jan 2023 17:57:33 +0800
Subject: [PATCH] refactor: rm fluid deps in distributed communication (#49722)

---
 .../paddle/distributed/communication/all_gather.py | 2 +-
 .../distributed/communication/batch_isend_irecv.py | 7 +++----
 python/paddle/distributed/communication/broadcast.py | 2 +-
 python/paddle/distributed/communication/group.py | 12 +++++-------
 python/paddle/distributed/communication/reduce.py | 11 +++++------
 python/paddle/distributed/communication/scatter.py | 2 +-
 .../distributed/communication/stream/all_gather.py | 5 ++---
 .../distributed/communication/stream/all_reduce.py | 5 ++---
 .../distributed/communication/stream/all_to_all.py | 5 ++---
 .../distributed/communication/stream/broadcast.py | 5 ++---
 .../paddle/distributed/communication/stream/recv.py | 5 ++---
 .../distributed/communication/stream/reduce.py | 5 ++---
 .../communication/stream/reduce_scatter.py | 2 +-
 .../distributed/communication/stream/scatter.py | 5 ++---
 .../paddle/distributed/communication/stream/send.py | 5 ++---
 15 files changed, 33 insertions(+), 45 deletions(-)

diff --git a/python/paddle/distributed/communication/all_gather.py b/python/paddle/distributed/communication/all_gather.py
index 18f4bbab7c..791586104d 100644
--- a/python/paddle/distributed/communication/all_gather.py
+++ b/python/paddle/distributed/communication/all_gather.py
@@ -16,7 +16,7 @@ import numpy as np
 
 import paddle
 import paddle.distributed.communication.stream as stream
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 
 from .serialization_utils import (
     convert_object_to_tensor,
diff --git a/python/paddle/distributed/communication/batch_isend_irecv.py b/python/paddle/distributed/communication/batch_isend_irecv.py
index a85fdcbacb..1dab6f533b 100644
--- a/python/paddle/distributed/communication/batch_isend_irecv.py
+++ b/python/paddle/distributed/communication/batch_isend_irecv.py
@@ -15,8 +15,7 @@
 import contextlib
 
 import paddle.distributed as dist
-import paddle.fluid.core as core
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _warn_cur_rank_not_in_group,
@@ -79,12 +78,12 @@ class P2POp:
 @contextlib.contextmanager
 def _with_batch_p2p_guard(backend):
     if backend == "NCCL":
-        core.ProcessGroupNCCL.group_start()
+        framework.core.ProcessGroupNCCL.group_start()
     try:
         yield
     finally:
         if backend == "NCCL":
-            core.ProcessGroupNCCL.group_end()
+            framework.core.ProcessGroupNCCL.group_end()
 
 
 def _check_p2p_op_list(p2p_op_list):
diff --git a/python/paddle/distributed/communication/broadcast.py b/python/paddle/distributed/communication/broadcast.py
index fd6c2219c8..2e5dde826b 100644
--- a/python/paddle/distributed/communication/broadcast.py
+++ b/python/paddle/distributed/communication/broadcast.py
@@ -15,7 +15,7 @@
 import paddle
 import paddle.distributed as dist
 import paddle.distributed.communication.stream as stream
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 
 from .serialization_utils import (
     convert_object_to_tensor,
diff --git a/python/paddle/distributed/communication/group.py b/python/paddle/distributed/communication/group.py
index f0236a2bdb..70f6c93654 100644
--- a/python/paddle/distributed/communication/group.py
+++ b/python/paddle/distributed/communication/group.py
@@ -16,9 +16,7 @@ import warnings
 
 import paddle
 import paddle.distributed as dist
-import paddle.fluid.core as core
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 
 
 class Group:
@@ -239,7 +237,7 @@ def _sync_calc_stream(tensor):
         return paddle._legacy_C_ops.c_sync_calc_stream(tensor, tensor)
     else:
         op_type = 'c_sync_calc_stream'
-        helper = layer_helper.LayerHelper(op_type, **locals())
+        helper = framework.LayerHelper(op_type, **locals())
         helper.append_op(
             type=op_type,
             inputs={'X': [tensor]},
@@ -254,7 +252,7 @@ def _sync_comm_stream(tensor, ring_id=0):
         )
     else:
         op_type = 'c_sync_comm_stream'
-        helper = layer_helper.LayerHelper(op_type, **locals())
+        helper = framework.LayerHelper(op_type, **locals())
         helper.append_op(
             type=op_type,
             inputs={'X': [tensor]},
@@ -325,7 +323,7 @@ def barrier(group=None):
     if framework.in_dygraph_mode():
         group = _get_global_group() if group is None else group
         place = framework._current_expected_place()
-        if isinstance(place, core.CPUPlace):
+        if isinstance(place, framework.CPUPlace):
             task = group.process_group.barrier()
         else:
             device_id = place.get_device_id()
@@ -344,7 +342,7 @@ def barrier(group=None):
     op_type = 'barrier'
     if not isinstance(ring_id, int):
         raise ValueError("The type of 'group' for barrier must be int.")
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         inputs={'X': [barrier_tensor]},
diff --git a/python/paddle/distributed/communication/reduce.py b/python/paddle/distributed/communication/reduce.py
index 973e003e06..9d4f9548d3 100644
--- a/python/paddle/distributed/communication/reduce.py
+++ b/python/paddle/distributed/communication/reduce.py
@@ -14,8 +14,7 @@
 
 import paddle
 import paddle.distributed.communication.stream as stream
-import paddle.fluid.core as core
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 
 
 class ReduceOp:
@@ -59,13 +58,13 @@ class ReduceOp:
 def _get_reduce_op(reduce_op, func_name):
     if framework.in_dygraph_mode():
         if reduce_op == ReduceOp.SUM:
-            return core.ReduceOp.SUM
+            return framework.core.ReduceOp.SUM
         elif reduce_op == ReduceOp.MAX:
-            return core.ReduceOp.MAX
+            return framework.core.ReduceOp.MAX
         elif reduce_op == ReduceOp.MIN:
-            return core.ReduceOp.MIN
+            return framework.core.ReduceOp.MIN
         elif reduce_op == ReduceOp.PROD:
-            return core.ReduceOp.PRODUCT
+            return framework.core.ReduceOp.PRODUCT
     else:
         if reduce_op == ReduceOp.SUM:
             return 'c_{}_sum'.format(func_name)
diff --git a/python/paddle/distributed/communication/scatter.py b/python/paddle/distributed/communication/scatter.py
index ee5886c414..455bb5d1cf 100644
--- a/python/paddle/distributed/communication/scatter.py
+++ b/python/paddle/distributed/communication/scatter.py
@@ -17,7 +17,7 @@ import numpy as np
 import paddle
 import paddle.distributed as dist
 import paddle.distributed.communication.stream as stream
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 
 from .serialization_utils import (
     convert_object_to_tensor,
diff --git a/python/paddle/distributed/communication/stream/all_gather.py b/python/paddle/distributed/communication/stream/all_gather.py
index 779a3c8f64..f5d21a35da 100644
--- a/python/paddle/distributed/communication/stream/all_gather.py
+++ b/python/paddle/distributed/communication/stream/all_gather.py
@@ -15,8 +15,7 @@
 import paddle
 import paddle.distributed as dist
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import _get_global_group
 
 
@@ -62,7 +61,7 @@ def _all_gather_in_dygraph(
 
 def _all_gather_in_static_mode(tensor_list, tensor, group, sync_op):
     op_type = 'c_allgather'
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     out = helper.create_variable_for_type_inference(dtype=tensor.dtype)
     for elem in tensor_list:
         data_feeder.check_variable_and_dtype(
diff --git a/python/paddle/distributed/communication/stream/all_reduce.py b/python/paddle/distributed/communication/stream/all_reduce.py
index 412085b1b1..786e4284c7 100644
--- a/python/paddle/distributed/communication/stream/all_reduce.py
+++ b/python/paddle/distributed/communication/stream/all_reduce.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _warn_cur_rank_not_in_group,
@@ -60,7 +59,7 @@ def _all_reduce_in_static_mode(tensor, op, group, sync_op, use_calc_stream):
 
     # TODO: Support task and use task.wait in static graph mode
     # Use use_calc_stream rather than sync_op
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         inputs={'X': [tensor]},
diff --git a/python/paddle/distributed/communication/stream/all_to_all.py b/python/paddle/distributed/communication/stream/all_to_all.py
index d64ccb742e..4a804cf4ab 100644
--- a/python/paddle/distributed/communication/stream/all_to_all.py
+++ b/python/paddle/distributed/communication/stream/all_to_all.py
@@ -15,8 +15,7 @@
 import paddle
 import paddle.distributed as dist
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _warn_cur_rank_not_in_group,
@@ -73,7 +72,7 @@ def _all_to_all_in_static_mode(
     op_type = 'alltoall'
     ring_id = 0 if group is None else group.id
     nranks = dist.get_world_size()
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
 
     in_tensor = in_tensor_or_tensor_list
     if isinstance(in_tensor_or_tensor_list, list):
diff --git a/python/paddle/distributed/communication/stream/broadcast.py b/python/paddle/distributed/communication/stream/broadcast.py
index cb6fbc75d1..c7f6fc3203 100644
--- a/python/paddle/distributed/communication/stream/broadcast.py
+++ b/python/paddle/distributed/communication/stream/broadcast.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _get_or_throw_group_rank,
@@ -57,7 +56,7 @@ def _broadcast_in_static_mode(
     )
 
     op_type = 'c_broadcast'
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     ring_id = 0 if group is None else group.id
 
     helper.append_op(
diff --git a/python/paddle/distributed/communication/stream/recv.py b/python/paddle/distributed/communication/stream/recv.py
index fcd007e6d3..7b623d6720 100644
--- a/python/paddle/distributed/communication/stream/recv.py
+++ b/python/paddle/distributed/communication/stream/recv.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _get_or_throw_group_rank,
@@ -48,7 +47,7 @@ def _recv_in_static_mode(
         'recv',
     )
     ring_id = 0 if group is None else group.id
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         outputs={'Out': [tensor]},
diff --git a/python/paddle/distributed/communication/stream/reduce.py b/python/paddle/distributed/communication/stream/reduce.py
index 8bd81bd586..e81ff4de9c 100644
--- a/python/paddle/distributed/communication/stream/reduce.py
+++ b/python/paddle/distributed/communication/stream/reduce.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _get_or_throw_group_rank,
@@ -63,7 +62,7 @@ def _reduce_in_static_mode(
 
     op_type = _get_reduce_op(op, "reduce")
     ring_id = 0 if group is None else group.id
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         inputs={'X': [tensor]},
diff --git a/python/paddle/distributed/communication/stream/reduce_scatter.py b/python/paddle/distributed/communication/stream/reduce_scatter.py
index 3442365863..b0776246d0 100644
--- a/python/paddle/distributed/communication/stream/reduce_scatter.py
+++ b/python/paddle/distributed/communication/stream/reduce_scatter.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import paddle
-import paddle.fluid.framework as framework
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _warn_cur_rank_not_in_group,
diff --git a/python/paddle/distributed/communication/stream/scatter.py b/python/paddle/distributed/communication/stream/scatter.py
index 6f332fbbd6..db2010e411 100644
--- a/python/paddle/distributed/communication/stream/scatter.py
+++ b/python/paddle/distributed/communication/stream/scatter.py
@@ -17,8 +17,7 @@ import warnings
 import paddle
 import paddle.distributed as dist
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _get_or_throw_group_rank,
@@ -113,7 +112,7 @@ def _scatter_in_static_mode(
     )
 
     op_type = 'c_scatter'
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         inputs={'X': [input_tensor]},
diff --git a/python/paddle/distributed/communication/stream/send.py b/python/paddle/distributed/communication/stream/send.py
index e18a9a5738..30a965e821 100644
--- a/python/paddle/distributed/communication/stream/send.py
+++ b/python/paddle/distributed/communication/stream/send.py
@@ -13,8 +13,7 @@
 # limitations under the License.
 
 import paddle.fluid.data_feeder as data_feeder
-import paddle.fluid.framework as framework
-import paddle.fluid.layer_helper as layer_helper
+import paddle.framework as framework
 from paddle.distributed.communication.group import (
     _get_global_group,
     _get_or_throw_group_rank,
@@ -49,7 +48,7 @@ def _send_in_static_mode(
     )
 
     ring_id = 0 if group is None else group.id
-    helper = layer_helper.LayerHelper(op_type, **locals())
+    helper = framework.LayerHelper(op_type, **locals())
     helper.append_op(
         type=op_type,
         inputs={'X': [tensor]},
--
GitLab
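
For context, the short sketch below is a hypothetical illustration (it is not part of the patch) of the import migration applied above: paddle.framework stands in for the removed paddle.fluid.framework, paddle.fluid.core, and paddle.fluid.layer_helper imports, and is assumed to expose core, LayerHelper, in_dygraph_mode, and CPUPlace in the same way the patched files use them.

    # Hypothetical example, not taken from the patch: the replacement imports in use.
    import paddle.framework as framework  # stands in for paddle.fluid.framework, .core and .layer_helper

    def _demo_reduce_op():
        # framework.core exposes the old paddle.fluid.core bindings,
        # e.g. the ReduceOp enum that the patched reduce.py now reads from it.
        return framework.core.ReduceOp.SUM

    if __name__ == "__main__":
        # in_dygraph_mode() and CPUPlace replace their paddle.fluid counterparts,
        # matching the usage in the patched group.py.
        print(framework.in_dygraph_mode())
        print(framework.CPUPlace())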