From 81eaa97de21a1e2da2bc1f6043951a80af960e1b Mon Sep 17 00:00:00 2001
From: Yuang Liu
Date: Fri, 26 Aug 2022 21:06:51 +0800
Subject: [PATCH] [dygraph hybrid pp for interleave] Virtual pipeline layer
 forward function (#45444)

---
 .../parallel_layers/pp_layers.py              | 35 +++++++++--
 ...id_parallel_pp_layer_with_virtual_stage.py | 63 +++++++++++--------
 2 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
index 7bc7f69be5a..371a8b3e041 100755
--- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
+++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py
@@ -172,17 +172,26 @@ class PipelineLayerChunk(Layer):
 
     def __init__(self):
         super(PipelineLayerChunk, self).__init__()
-        self.functions = []
+        self.run_function = []
 
     def append(self, sublayer):
         # This method is used to unify the code in _build_layer_impl.
         # For the 1f1b scheduler, it will call the append method of a list.
         # For the interleave scheduler, it will call the append method of this class.
         if isinstance(sublayer, Layer):
-            self.add_sublayer(str(len(self.functions)), sublayer)
-            self.functions.append(sublayer)
+            self.add_sublayer(str(len(self.run_function)), sublayer)
+            self.run_function.append(sublayer)
 
-    # TODO (Yuang Liu) forward function implement
+    def get_run_function(self):
+        return self.run_function
+
+    def forward(self, *args, **kwargs):
+        # Users shouldn't call PipelineLayerChunk directly, since all the logic related to
+        # recompute lives in the forward function of PipelineLayer. Any direct call will bring
+        # unexpected behavior under recompute circumstances.
+        raise NotImplementedError(
+            "The forward function of PipelineLayerChunk cannot be called directly. "
+            "Please call forward function of PipelineLayer.")
 
 
 class PipelineLayer(Layer):
@@ -520,8 +529,22 @@ class PipelineLayer(Layer):
 
         return execute_func
 
-    def forward(self, input):
-        # TODO(Yuang Liu): forward function for interleave scheduler
+    def forward(self, input, chunk_id=None):
+        if chunk_id is not None:
+            assert isinstance(chunk_id, int), "chunk_id should be an int"
+            assert self._num_virtual_pipeline_stages > 1, \
+                "chunk_id is only valid when using virtual pipeline stage"
+            assert chunk_id < len(self._model_chunks), \
+                "The virtual pipeline only has {} chunks, " \
+                "but received chunk_id {}.".format(len(self._model_chunks), chunk_id)
+            # Get the target model chunk.
+            model_chunk = self._model_chunks[chunk_id]
+            # Update self.run_function to the target chunk's run functions.
+            # Runs for 1f1b and interleave are similar: just handle all functions in self.run_function.
+            # The only difference is that, for 1f1b, self.run_function has already been initialized
+            # during build_layer, while for interleave it is updated to the target functions at every run.
+            self.run_function = model_chunk.get_run_function()
+
         if self._recompute_interval == 0:
             input = self.forward_function(0, len(self.run_function))(input)
         else:
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer_with_virtual_stage.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer_with_virtual_stage.py
index 86d699bb4a4..0ff14ad5f54 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer_with_virtual_stage.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_layer_with_virtual_stage.py
@@ -33,33 +33,24 @@ class ReshapeHelp(Layer):
         return x.reshape(shape=self.shape)
 
 
-class FakeAlexNetPipeDesc(PipelineLayer):
+class MLPForVirtualStageLayerTest(PipelineLayer):
 
     def __init__(self, num_classes=10, **kwargs):
         self.num_classes = num_classes
         decs = [
-            LayerDesc(nn.Conv2D, 1, 64, kernel_size=11, stride=4, padding=5),
-            LayerDesc(nn.Conv2D, 64, 64, kernel_size=11, stride=4, padding=5),
-            LayerDesc(nn.ReLU),
-            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
-            LayerDesc(nn.Conv2D, 64, 192, kernel_size=5, padding=2),
-            LayerDesc(nn.Conv2D, 192, 192, kernel_size=5, padding=2),
-            F.relu,
-            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
-            LayerDesc(nn.Conv2D, 192, 384, kernel_size=3, padding=1),
-            F.relu,
-            LayerDesc(nn.Conv2D, 384, 256, kernel_size=3, padding=1),
-            F.relu,
-            LayerDesc(nn.Conv2D, 256, 256, kernel_size=3, padding=1),
-            LayerDesc(nn.Conv2D, 256, 256, kernel_size=3, padding=1),
-            F.relu,
-            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
-            LayerDesc(ReshapeHelp, shape=[-1, 256]),
-            LayerDesc(nn.Linear, 256, self.num_classes),  # classifier
+            LayerDesc(nn.Linear, 2, self.num_classes),
+            LayerDesc(nn.Linear, self.num_classes, 2),
+            LayerDesc(nn.Linear, 2, self.num_classes),
+            LayerDesc(nn.Linear, self.num_classes, 2),
+            LayerDesc(nn.Linear, 2, self.num_classes),
+            LayerDesc(nn.Linear, self.num_classes, 2),
+            LayerDesc(nn.Linear, 2, self.num_classes),
+            LayerDesc(nn.Linear, self.num_classes, 2),
         ]
-        super(FakeAlexNetPipeDesc, self).__init__(layers=decs,
-                                                  loss_fn=nn.CrossEntropyLoss(),
-                                                  **kwargs)
+        super(MLPForVirtualStageLayerTest,
+              self).__init__(layers=decs,
+                             loss_fn=nn.CrossEntropyLoss(),
+                             **kwargs)
 
 
 class TestPipeLayerAPI(unittest.TestCase):
@@ -73,16 +64,38 @@ class TestPipeLayerAPI(unittest.TestCase):
             "pp_degree": self.pipeline_parallel_size
         }
         fleet.init(is_collective=True, strategy=strategy)
+        self.rank = fleet.worker_index()
         self.hcg = fleet.get_hybrid_communicate_group()
 
     def test_pipelayer_desc(self):
-        pipe_model = FakeAlexNetPipeDesc(seg_method="layer:Conv2D",
-                                         num_stages=self.pipeline_parallel_size,
-                                         num_virtual_pipeline_stages=2)
+        pipe_model = MLPForVirtualStageLayerTest(
+            seg_method="layer:Linear",
+            num_stages=self.pipeline_parallel_size,
+            num_virtual_pipeline_stages=2,
+            recompute_interval=1)
         assert len(pipe_model.parameters()) > 0
         model_chunks = pipe_model.get_model_chunks()
         assert model_chunks is not None
         assert len(model_chunks) == 2
+
+        optimizer = paddle.optimizer.SGD(parameters=pipe_model.parameters())
+
+        try:
+            model_chunks[0](paddle.to_tensor([1., 2.]))
+        except NotImplementedError:
+            pass
+
+        # fake calls to the forward function of the virtual pipeline layer
+        for i in range(len(model_chunks)):
+            out = pipe_model(paddle.to_tensor([1., 2.]), chunk_id=i)
+            assert list(out.shape) == [2]
+            out = F.relu(out)
+            loss = paddle.mean(out)
+            loss.backward()
+
+        optimizer.step()
+
+        # just make sure the model can be wrapped by distributed_model
        dist_model = fleet.distributed_model(pipe_model)
-- 
GitLab
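For context, a minimal usage sketch of the new chunk_id argument of PipelineLayer.forward, modeled on the test above. It assumes a two-GPU job launched with paddle.distributed.launch and the same fleet hybrid configuration as the test (pp_degree=2); the script name and the TinyVirtualPipeMLP class are hypothetical and only stand in for a real pipeline model.

# pipeline_chunk_demo.py (hypothetical); launch with e.g.
#   python -m paddle.distributed.launch --gpus "0,1" pipeline_chunk_demo.py
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer


class TinyVirtualPipeMLP(PipelineLayer):
    # Hypothetical model: eight Linear layers, mirroring the test above.

    def __init__(self, **kwargs):
        decs = [LayerDesc(nn.Linear, 2, 2) for _ in range(8)]
        super(TinyVirtualPipeMLP, self).__init__(layers=decs,
                                                 loss_fn=nn.CrossEntropyLoss(),
                                                 **kwargs)


strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 1, "pp_degree": 2}
fleet.init(is_collective=True, strategy=strategy)

model = TinyVirtualPipeMLP(seg_method="layer:Linear",
                           num_stages=2,
                           num_virtual_pipeline_stages=2)
optimizer = paddle.optimizer.SGD(parameters=model.parameters())

# Each rank holds several model chunks. Run the local chunks one by one through
# PipelineLayer.forward with chunk_id; calling a PipelineLayerChunk object
# directly raises NotImplementedError by design.
for chunk_id in range(len(model.get_model_chunks())):
    out = model(paddle.to_tensor([1., 2.]), chunk_id=chunk_id)
    loss = paddle.mean(F.relu(out))
    loss.backward()
optimizer.step()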