# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle
import paddle.nn.functional as F
from paddle import nn, static
from paddle.distributed import fleet
from paddle.distributed.auto_parallel.static.completion import Completer
from paddle.distributed.auto_parallel.static.dist_context import (
    DistributedContext,
)
from paddle.distributed.fleet import auto

paddle.enable_static()

batch_size = 4
epoch_num = 10
hidden_size = 1024
sequence_len = 512
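# 2 x 2 process mesh with dimensions named 'x' and 'y'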
_g_process_mesh = auto.ProcessMesh([[0, 1], [2, 3]], dim_names=['x', 'y'])


def get_random_inputs_and_labels(input_shape, label_shape):
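    # Random float32 tensors of the requested shapes serve as synthetic data.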
    input = np.random.random(size=input_shape).astype('float32')
    label = np.random.random(size=label_shape).astype('float32')
    return input, label


def batch_generator_creator():
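    # Reader that yields random (input, label) batches for the static-graph DataLoader.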
    def __reader__():
        for _ in range(batch_size):
            batch_input, batch_label = get_random_inputs_and_labels(
                [batch_size, sequence_len, hidden_size],
                [batch_size, sequence_len, 1],
            )
            yield batch_input, batch_label

    return __reader__


class MLPLayer(nn.Layer):
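    # Feed-forward block (LayerNorm -> Linear -> GELU -> Linear) whose weights
    # are annotated with auto-parallel sharding in forward().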
    def __init__(
        self,
        hidden_size=1024,
        intermediate_size=4 * 1024,
        dropout_ratio=0.1,
        initializer_range=0.02,
    ):
        super().__init__()
        d_model = hidden_size
        dim_feedforward = intermediate_size
        param_initializer = nn.initializer.Normal(
            mean=0.0, std=initializer_range
        )

        self.norm = nn.LayerNorm(d_model, epsilon=1e-5)
        self.linear0 = nn.Linear(
            d_model,
            dim_feedforward,
            weight_attr=paddle.ParamAttr(initializer=param_initializer),
            bias_attr=None,
        )
        self.linear1 = nn.Linear(
            dim_feedforward,
            d_model,
            weight_attr=paddle.ParamAttr(initializer=param_initializer),
            bias_attr=None,
        )

    def forward(self, input):
        out = self.norm(input)
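        # shard linear0's weight along mesh dim 'x' on its output dimension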
        auto.shard_tensor(
            self.linear0.weight, _g_process_mesh[:, 0], [None, 'x']
        )
        out = self.linear0(out)
        out = F.gelu(out, approximate=True)
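        # shard linear1's weight along mesh dim 'x' on its input dimension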
        auto.shard_tensor(
            self.linear1.weight, _g_process_mesh[:, 1], ['x', None]
        )
        out = self.linear1(out)

        return out


def loop_cond(i, loop_len, input_array):
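    # keep iterating while the counter is below loop_len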
    return i < loop_len


def loop_body(i, loop_len, input_array):
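    # Read the previous iteration's output, pass it through two MLP blocks,
    # then advance the counter and append the new prediction to the array.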
    pre_input = paddle.tensor.array_read(array=input_array, i=i)
    mlp_while0 = MLPLayer(
        hidden_size=hidden_size,
        intermediate_size=4 * hidden_size,
        dropout_ratio=0.1,
        initializer_range=0.02,
    )

    mlp_while1 = MLPLayer(
        hidden_size=hidden_size,
        intermediate_size=4 * hidden_size,
        dropout_ratio=0.1,
        initializer_range=0.02,
    )

    output = mlp_while0(pre_input)
    cur_pred = mlp_while1(output)
    # update the loop condition
    i = paddle.increment(x=i, value=1)
    paddle.tensor.array_write(cur_pred, array=input_array, i=i)
    return i, loop_len, input_array


def get_program():
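    # Build a static program: an MLP, a while_loop stacking further MLP blocks
    # on a tensor array, and a mean squared error loss.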
    dist_strategy = fleet.DistributedStrategy()
    dist_strategy.semi_auto = True
    # fleet.init(is_collective=True, strategy=dist_strategy)

    train_program = static.Program()
    start_program = static.Program()
    with static.program_guard(train_program, start_program):

        # loop counter
        i = paddle.full(shape=[1], fill_value=0, dtype='int64')
        # number of loop iterations
        loop_len = paddle.full(shape=[1], fill_value=epoch_num, dtype='int64')

        # input
        input = static.data(
            name="input",
            shape=[batch_size, sequence_len, hidden_size],
            dtype='float32',
        )
        label = static.data(
            name="label", shape=[batch_size, sequence_len, 1], dtype='float32'
        )
        data_holder = [input, label]
        # dataloader
        dataloader = paddle.io.DataLoader.from_generator(
            feed_list=data_holder, capacity=4 * batch_size, iterable=False
        )
        dataloader.set_batch_generator(
            batch_generator_creator(), places=paddle.static.cuda_places()
        )
        # data dist_attr
        auto.shard_tensor(input, _g_process_mesh[:, 0], [None, None, None])
        auto.shard_tensor(label, _g_process_mesh[:, 0], [None, None, None])

        mlp_start = MLPLayer(
            hidden_size=hidden_size,
            intermediate_size=4 * hidden_size,
            dropout_ratio=0.1,
            initializer_range=0.02,
        )
        pred = mlp_start(input)

        input_array = paddle.tensor.array_write(pred, i)
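        # run loop_body until the counter reaches loop_len; this while_loop
        # sub-block is what the completion tests below exercise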
        i, loop_len, input_array = static.nn.while_loop(
            cond=loop_cond, body=loop_body, loop_vars=[i, loop_len, input_array]
        )
        end_pred = paddle.tensor.array_read(array=input_array, i=i)

        mlp_end = MLPLayer(
            hidden_size=hidden_size,
            intermediate_size=4 * hidden_size,
            dropout_ratio=0.1,
            initializer_range=0.02,
        )
        pred = mlp_end(end_pred)

        error_cost = paddle.nn.functional.square_error_cost(pred, label)
        loss = paddle.mean(error_cost)

    return train_program, start_program, dataloader, i, loss


class TestMLP(unittest.TestCase):
    def test_completer(self):
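        # Forward annotation completion should run without error on a program
        # that contains a while_loop (control-flow sub-block).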
        train_program, start_program, dataloader, i, loss = get_program()
        dist_context = DistributedContext()
        completer = Completer(dist_context)
        complete_train_program = completer.complete_forward_annotation(
            train_program
        )
        # print_program_with_dist_attr(complete_train_program, dist_context)

    def test_completer_by_dist_op(self):
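        # Additionally exercise tensor dist_attr completion driven by dist ops.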
        train_program, start_program, dataloader, i, loss = get_program()
        dist_context = DistributedContext()
        completer = Completer(dist_context)
        complete_train_program = completer.complete_forward_annotation(
            train_program
        )
        complete_train_program = completer._complete_tensor_dist_attr_by_op()


if __name__ == "__main__":
    unittest.main()