hybrid_parallel_pp_layer.py
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

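# This file exercises the PipelineLayer API of hybrid (pipeline) parallelism:
# the same AlexNet is built as a plain Layer, as an eager layer list
# (AlexNetPipe.to_layers), and as lazy LayerDesc descriptors, and the tests
# check that parameters are partitioned consistently across two stages.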
import unittest

import numpy as np

import paddle.nn as nn
import paddle.nn.functional as F
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer
from paddle.fluid.dygraph.layers import Layer
from paddle.nn import Sequential


class ReshapeHelp(Layer):
    def __init__(self, shape):
        super().__init__()
        self.shape = shape

    def forward(self, x):
        return x.reshape(shape=self.shape)


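# A small AlexNet variant for single-channel inputs. forward() takes the label
# as a second argument and returns the loss directly, so the final pipeline
# stage can emit the scalar to backpropagate.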
class AlexNet(Layer):
    def __init__(self, num_classes=10):
        super().__init__()
        self.features = Sequential(
            nn.Conv2D(1, 64, kernel_size=11, stride=4, padding=5),
            nn.ReLU(),
            nn.MaxPool2D(kernel_size=2, stride=2),
            nn.Conv2D(64, 192, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2D(kernel_size=2, stride=2),
            nn.Conv2D(192, 384, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2D(384, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2D(256, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2D(kernel_size=2, stride=2),
        )

        self.reshape_layer = ReshapeHelp(shape=[-1, 256])
        self.classifier = nn.Linear(256, num_classes)
        self.loss_fn = nn.loss.CrossEntropyLoss()

    def forward(self, x, y):
        x = self.features(x)
        x = self.reshape_layer(x)
        x = self.classifier(x)
        return self.loss_fn(x, y)


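# Eager construction: to_layers() flattens the sub-layers into the ordered
# list that PipelineLayer(layers=...) consumes directly.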
class AlexNetPipe(AlexNet):
    def to_layers(self):
        feat = [self.features[i] for i in range(len(self.features))]
        loss_fn = [self.reshape_layer, self.classifier]
        feat.extend(loss_fn)
        return feat


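# Lazy construction: a LayerDesc records a layer class plus its arguments and
# defers instantiation until PipelineLayer assigns the segment to a stage, so
# each rank only materializes the parameters of its own segment.
# Parameter-free ops such as F.relu can be passed as plain callables.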
class AlexNetPipeDesc(PipelineLayer):
    def __init__(self, num_classes=10, **kwargs):
        self.num_classes = num_classes
        decs = [
            LayerDesc(nn.Conv2D, 1, 64, kernel_size=11, stride=4, padding=5),
            LayerDesc(nn.ReLU),
            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
            LayerDesc(nn.Conv2D, 64, 192, kernel_size=5, padding=2),
            F.relu,
            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
            LayerDesc(nn.Conv2D, 192, 384, kernel_size=3, padding=1),
            F.relu,
            LayerDesc(nn.Conv2D, 384, 256, kernel_size=3, padding=1),
            F.relu,
            LayerDesc(nn.Conv2D, 256, 256, kernel_size=3, padding=1),
            F.relu,
            LayerDesc(nn.MaxPool2D, kernel_size=2, stride=2),
            LayerDesc(ReshapeHelp, shape=[-1, 256]),
            LayerDesc(nn.Linear, 256, self.num_classes),  # classifier
        ]
        super().__init__(layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs)


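# These tests assume two ranks, e.g. started with
# `python -m paddle.distributed.launch --gpus "0,1" hybrid_parallel_pp_layer.py`.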
class TestPipeLayerAPI(unittest.TestCase):
    def setUp(self):
        strategy = fleet.DistributedStrategy()
        self.pipeline_parallel_size = 2
        strategy.hybrid_configs = {
            "dp_degree": 1,
            "mp_degree": 1,
            "pp_degree": self.pipeline_parallel_size,
        }
        fleet.init(is_collective=True, strategy=strategy)
        self.hcg = fleet.get_hybrid_communicate_group()

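    # The 15-entry descriptor list splits uniformly across the two stages,
    # leaving three parameterized layers (weight + bias each) on every rank,
    # hence the expected 6 parameters below.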
    def test_pipelayer_desc(self):
        pipe_model = AlexNetPipeDesc(num_stages=self.pipeline_parallel_size)
        np.testing.assert_array_equal(len(pipe_model.parameters()), 6)

    def test_pipelayer_sequential(self):
        init_net = AlexNetPipe()
        pipe_model = PipelineLayer(
            layers=init_net.to_layers(),
            num_stages=self.pipeline_parallel_size,
            loss_fn=nn.CrossEntropyLoss(),
        )
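        # PipelineLayer keeps only this rank's segment of the layer list, so
        # each stage's parameters should match the corresponding half of the
        # full model, both by name and by value.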
        stage_id = self.hcg.get_stage_id()
        init_parameters = init_net.parameters()
        pipe_parameters = pipe_model.parameters()
        part_number = len(init_parameters) // 2

        if stage_id == 0:
            for idx in range(part_number):
                param_a = init_parameters[idx]
                param_b = pipe_parameters[idx]
                np.testing.assert_array_equal(param_a.name, param_b.name)
                np.testing.assert_allclose(param_a.numpy(), param_b.numpy())

        elif stage_id == 1:
            for idx in range(part_number):
                param_a = init_parameters[idx + part_number]
                param_b = pipe_parameters[idx]

                np.testing.assert_array_equal(param_a.name, param_b.name)
                np.testing.assert_allclose(param_a.numpy(), param_b.numpy())


if __name__ == '__main__':
    unittest.main()