Unverified commit 7d46d9f9, authored by Wen Sun, committed by GitHub

refactor: rm fluid deps in fleet (#49724)
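The change is mechanical: references to the legacy paddle.fluid namespace in the fleet meta-parallel code are swapped for their public counterparts (paddle.nn.Layer, paddle.framework). A minimal sketch of the substitution pattern, for illustration only; the Scale2 class below is made up and not part of the diff:

# Before: internals were imported from the legacy fluid namespace.
# from paddle.fluid.dygraph.layers import Layer
# from paddle.fluid.framework import in_dygraph_mode
# import paddle.fluid.core as core

# After: the same symbols are reached through the public packages.
import paddle.nn as nn                # nn.Layer replaces fluid's Layer
import paddle.framework as framework  # framework.in_dygraph_mode, framework.core

class Scale2(nn.Layer):
    def forward(self, x):
        # framework.in_dygraph_mode() replaces the fluid helper of the same name
        if framework.in_dygraph_mode():
            return x * 2
        return x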

Parent b53888e7
@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from paddle.fluid.dygraph.layers import Layer
+import paddle.nn as nn
__all__ = []
-class MetaParallelBase(Layer):
+class MetaParallelBase(nn.Layer):
def __init__(self, layers, hcg, strategy):
super().__init__(layers.full_name() + "_meta_parallel_base")
self._layers = layers
......
@@ -45,8 +45,8 @@ import re
from functools import partial
import paddle
-from paddle.fluid.dygraph.layers import Layer
-from paddle.fluid.framework import in_dygraph_mode
+import paddle.framework as framework
+import paddle.nn as nn
from paddle.incubate.distributed.fleet import recompute_hybrid
from ...utils.log_util import layer_to_str, logger
@@ -60,7 +60,7 @@ class LayerDesc:
self.inputs = inputs
self.kwargs = kwargs
-if not issubclass(layer_func, Layer):
+if not issubclass(layer_func, nn.Layer):
raise TypeError(
"The input(layer_func) should be a derived class of Layer."
)
@@ -151,7 +151,7 @@ class SegmentLayers:
regex = re.compile(layername, re.IGNORECASE)
for idx, layer in enumerate(self._layers_desc):
name = None
-if isinstance(layer, Layer):
+if isinstance(layer, nn.Layer):
name = layer.__class__.__name__
elif isinstance(layer, LayerDesc):
name = layer.layer_func.__name__
@@ -180,7 +180,7 @@ class SegmentLayers:
return result
-class PipelineLayerChunk(Layer):
+class PipelineLayerChunk(nn.Layer):
def __init__(self):
super().__init__()
self.run_function = []
@@ -189,7 +189,7 @@ class PipelineLayerChunk(Layer):
# This method is used to unify codes in _build_layer_impl.
# For 1f1b scheduler, it will call append method of a List.
# For interleave scheduler, it will call append method of this class.
-if isinstance(sublayer, Layer):
+if isinstance(sublayer, nn.Layer):
self.add_sublayer(str(len(self.run_function)), sublayer)
self.run_function.append(sublayer)
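The comment above explains the design: _build_layer_impl can write sublayers into either a plain Python list (1f1b scheduler) or a PipelineLayerChunk (interleave scheduler), because both expose append(). A tiny illustration under that assumption; the helper below is hypothetical and not part of the diff:

def _append_all(container, sublayers):
    # container is a list or a PipelineLayerChunk; both support append(),
    # so the builder code does not need to branch on the scheduler type.
    for sub in sublayers:
        container.append(sub)
    return container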
@@ -206,7 +206,7 @@ class PipelineLayerChunk(Layer):
)
-class PipelineLayer(Layer):
+class PipelineLayer(nn.Layer):
"""PipelineLayer
Args:
layers(Iterable): A sequence of layers description to define the structure for pipeline.
@@ -220,9 +220,8 @@ class PipelineLayer(Layer):
Examples:
.. code-block:: python
import paddle.nn as nn
-from paddle.distributed import fleet
-from paddle.fluid.dygraph.layers import Layer
import paddle.nn.functional as F
+from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer
pipeline_parallel_size = 2
@@ -241,7 +240,7 @@ class PipelineLayer(Layer):
hcg = fleet.get_hybrid_communicate_group()
-class ReshapeHelp(Layer):
+class ReshapeHelp(nn.Layer):
def __init__(self, shape):
super().__init__()
self.shape = shape
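The docstring example is truncated by the diff view above. A hedged sketch of how the pieces fit together, assuming a two-rank pipeline-parallel job launched with paddle.distributed.launch; the SimplePipe class and its layer sizes are illustrative and not taken from the file:

import paddle.nn as nn
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer

pipeline_parallel_size = 2
strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
    "dp_degree": 1,
    "mp_degree": 1,
    "pp_degree": pipeline_parallel_size,
}
fleet.init(is_collective=True, strategy=strategy)
hcg = fleet.get_hybrid_communicate_group()

class SimplePipe(PipelineLayer):
    def __init__(self, num_classes=10, **kwargs):
        # Each LayerDesc defers construction until the owning pipeline stage builds it.
        descs = [
            LayerDesc(nn.Linear, 784, 512),
            LayerDesc(nn.ReLU),
            LayerDesc(nn.Linear, 512, num_classes),
        ]
        super().__init__(layers=descs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

model = SimplePipe(num_stages=pipeline_parallel_size, topology=hcg.topology())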
@@ -500,14 +499,14 @@ class PipelineLayer(Layer):
for key, comm in self.shared_comm.items():
param = getattr(self.shared_layers[key], comm['weight_attr'])
# need use trace_op to allreduce weight
-if in_dygraph_mode():
+if framework.in_dygraph_mode():
with paddle.framework.no_grad():
paddle.distributed.all_reduce(
param.grad, group=comm['group']
)
else:
with paddle.framework.no_grad():
-paddle.fluid.framework._dygraph_tracer().trace_op(
+framework._dygraph_tracer().trace_op(
type="c_allreduce_sum",
inputs={'X': param._grad_ivar()},
outputs={'Out': param._grad_ivar()},
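The hunk above keeps the intent of the comment: gradients of weights shared across pipeline stages (for example tied embeddings) are summed over the shared-parameter group before the optimizer step. A standalone sketch of that pattern using only public APIs; the function name is hypothetical and the process group is assumed to exist already:

import paddle
import paddle.distributed as dist
import paddle.framework as framework

def sync_shared_weight_grad(param, group):
    # After backward(), each stage holds only its partial gradient for a tied
    # weight; all-reduce makes every stage apply the same update.
    if framework.in_dygraph_mode():
        with paddle.framework.no_grad():
            dist.all_reduce(param.grad, group=group)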
@@ -627,7 +626,7 @@ class PipelineLayer(Layer):
for index, layer in enumerate(self._layers_desc[start:end]):
layer_index = start + index
-if isinstance(layer, Layer):
+if isinstance(layer, nn.Layer):
run_function.append(layer)
if self._num_virtual_pipeline_stages == 1:
# Only add sublayer for 1f1b scheduler,
@@ -729,7 +728,7 @@ class PipelineLayer(Layer):
):
return False
-params = [f.parameters() for f in funcs if isinstance(f, Layer)]
+params = [f.parameters() for f in funcs if isinstance(f, nn.Layer)]
return any(len(list(p)) > 0 for p in params)
def save_state_dict(self, path):
......
@@ -12,9 +12,7 @@
# See the License for the specific language governing permissions and
import paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-import paddle.fluid.framework as framework
+import paddle.framework as framework
from ..meta_optimizers.dygraph_optimizer import HybridParallelOptimizer
from ..utils.hybrid_parallel_util import (
@@ -208,7 +206,7 @@ class PipelineParallel(MetaParallelBase):
), 'optimizer should be HybridParallelOptimizer subclass.'
assert (
-fluid.framework._dygraph_tracer()._has_grad
+framework._dygraph_tracer()._has_grad
), 'Please enable the generation of gradients.'
if self.is_pipeline_first_stage(
@@ -308,7 +306,7 @@ class PipelineParallel(MetaParallelBase):
labels = self._load_micro_batch(self.micro_batch_id)
output_tensor = self._layers._loss_fn(output_tensor, labels)
assert isinstance(
-output_tensor, (paddle.Tensor, core.eager.Tensor)
+output_tensor, (paddle.Tensor, framework.core.eager.Tensor)
), "Currently, loss_fn should obtain Paddle.Tensor dtype"
with paddle.amp.auto_cast(enable=False):
......
@@ -15,8 +15,7 @@
import numpy as np
import paddle
-import paddle.fluid.core as core
-from paddle.fluid.framework import in_dygraph_mode
+import paddle.framework as framework
from ...utils.log_util import logger
from .utils import number_2_dtype, paddle_2_number
@@ -138,7 +137,7 @@ class SendRecvMeta:
def send_meta(self, tensor, group):
dst_rank = _hcg._get_p2p_next_rank()
-if isinstance(tensor, (paddle.Tensor, core.eager.Tensor)):
+if isinstance(tensor, (paddle.Tensor, framework.core.eager.Tensor)):
tensor_type = paddle.to_tensor([0])
# send tensor type
paddle.distributed.send(tensor_type, dst=dst_rank, group=group)
@@ -153,11 +152,13 @@ class SendRecvMeta:
paddle.distributed.send(nums, dst=dst_rank, group=group)
for d in tensor:
-assert isinstance(d, (paddle.Tensor, core.eager.Tensor))
+assert isinstance(
+    d, (paddle.Tensor, framework.core.eager.Tensor)
+)
self._send_dims_shape_dtype(d, group=group)
def set_send_message(self, tensor):
-if isinstance(tensor, (paddle.Tensor, core.eager.Tensor)):
+if isinstance(tensor, (paddle.Tensor, framework.core.eager.Tensor)):
self.send_shape_message = tensor.shape
self.send_dtype_message = paddle_2_number(tensor.dtype)
elif isinstance(tensor, tuple):
@@ -188,7 +189,7 @@ def _partial_send_op(
tensor, group, use_calc_stream, ring_id, dst, nranks, rank_id
):
dst_rank_in_group = dst if group is None else group.get_group_rank(dst)
-if in_dygraph_mode():
+if framework.in_dygraph_mode():
group = (
paddle.distributed.collective._get_default_group()
if group is None
@@ -259,7 +260,7 @@ def recv_partial(
else:
if use_calc_stream:
recv_op = paddle.distributed.recv
-elif in_dygraph_mode():
+elif framework.in_dygraph_mode():
recv_op = paddle.distributed.irecv
return recv_op(tensor.detach(), src=src_rank, group=group)
@@ -480,7 +481,7 @@ def _p2p_helper(
tasks.append(task)
if not sync_recv:
-if in_dygraph_mode():
+if framework.in_dygraph_mode():
# wait irecv tasks in eager dygraph mode with new comm library
for task in tasks:
assert task is not None
......
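In the last hunk the asynchronous receives are awaited explicitly: in eager dygraph mode paddle.distributed.irecv returns a task handle whose wait() blocks until the buffer is filled. A standalone sketch of that point-to-point pattern, assuming the process group is already initialized (e.g. via paddle.distributed.init_parallel_env()) and at least two ranks are running; the function name and shapes are illustrative:

import paddle
import paddle.distributed as dist

def exchange(tensor, peer):
    # isend/irecv return task handles; the receiver must wait() before
    # reading the buffer, mirroring the p2p helper in the diff.
    buf = paddle.empty_like(tensor)
    send_task = dist.isend(tensor, dst=peer)
    recv_task = dist.irecv(buf, src=peer)
    send_task.wait()
    recv_task.wait()
    return buf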