# pipeline_mnist_multi_device.py
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from functools import reduce

from legacy_test import nets
from legacy_test.test_dist_base import TestDistRunnerBase, runtime_main

import paddle
from paddle import fluid
from paddle.distributed import fleet

# Everything below builds a static-graph program, so static mode is enabled
# before any layer or program is touched.
paddle.enable_static()

DTYPE = "float32"
# NOTE(review): presumably pre-downloads MNIST at import time so the readers
# built in get_model() find it locally -- confirm.
paddle.dataset.mnist.fetch()

# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1


def cnn_model(data):
    """Build the two-stage CNN with two softmax heads used by this test.

    The two conv+pool stages are created under whichever device guard is
    active at the call site; the two fully connected heads and their sum are
    explicitly placed under ``fluid.device_guard("gpu:1")``.

    Args:
        data: Input variable fed to the first conv+pool stage.

    Returns:
        The sum of two softmax FC outputs: one computed from the second
        conv+pool stage and one computed from the first.
    """
    # All learnable weights use the same constant initializer (0.01).
    conv_pool_1 = nets.simple_img_conv_pool(
        input=data,
        filter_size=5,
        num_filters=20,
        pool_size=2,
        pool_stride=2,
        act="relu",
        param_attr=fluid.ParamAttr(
            initializer=paddle.nn.initializer.Constant(value=0.01)
        ),
    )
    conv_pool_2 = nets.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu",
        param_attr=fluid.ParamAttr(
            initializer=paddle.nn.initializer.Constant(value=0.01)
        ),
    )

    # Number of output classes for both FC heads.
    SIZE = 10

    with fluid.device_guard("gpu:1"):
        predict = paddle.static.nn.fc(
            x=conv_pool_2,
            size=SIZE,
            activation="softmax",
            weight_attr=fluid.param_attr.ParamAttr(
                initializer=paddle.nn.initializer.Constant(value=0.01)
            ),
        )
        # To cover @RENAMED@GRADIENT
        # (conv_pool_1 now feeds both conv_pool_2 and this second head.)
        predict2 = paddle.static.nn.fc(
            x=conv_pool_1,
            size=SIZE,
            activation="softmax",
            weight_attr=fluid.param_attr.ParamAttr(
                initializer=paddle.nn.initializer.Constant(value=0.01)
            ),
        )
        predict += predict2
    return predict


class TestDistMnist2x2(TestDistRunnerBase):
    """Runner that builds the MNIST CNN split across ``gpu:0`` and ``gpu:1``."""

    @staticmethod
    def _mnist_readers(batch_size):
        """Return ``(train_reader, test_reader)`` batched by ``batch_size``.

        Both readers draw from ``paddle.dataset.mnist.test()``.
        NOTE(review): presumably the test split is used for training too in
        order to keep the test small -- confirm.
        """
        train_reader = paddle.batch(
            paddle.dataset.mnist.test(), batch_size=batch_size
        )
        test_reader = paddle.batch(
            paddle.dataset.mnist.test(), batch_size=batch_size
        )
        return train_reader, test_reader

    def get_model(self, batch_size=2, use_dgc=False, dist_strategy=None):
        """Build the training program and return the pieces the driver needs.

        Args:
            batch_size: Micro-batch size on the pipeline path. On the
                non-distributed path the readers use
                ``batch_size * acc_steps`` instead.
            use_dgc: Accepted for signature compatibility; not read by this
                method.
            dist_strategy: When truthy, a non-iterable ``DataLoader`` is
                created and the optimizer is wrapped by
                ``fleet.distributed_optimizer`` with pipeline and AMP
                enabled. Otherwise the plain Momentum optimizer is used.

        Returns:
            ``(inference_program, avg_cost, train_reader, test_reader,
            batch_acc, predict)``, with ``data_loader`` appended as a seventh
            element when ``dist_strategy`` is truthy.
        """
        # Only created on the distributed path; bound here so the name always
        # exists.
        data_loader = None

        # Input data
        with fluid.device_guard("gpu:0"):
            images = paddle.static.data(
                name='pixel', shape=[-1, 1, 28, 28], dtype=DTYPE
            )
            label = paddle.static.data(
                name='label', shape=[-1, 1], dtype='int64'
            )

            if dist_strategy:
                data_loader = fluid.io.DataLoader.from_generator(
                    feed_list=[images, label],
                    capacity=64,
                    use_double_buffer=False,
                    iterable=False,
                )
            # Train program
            predict = cnn_model(images)
        with fluid.device_guard("gpu:1"):
            # cnn_model already applies softmax, hence use_softmax=False.
            cost = paddle.nn.functional.cross_entropy(
                input=predict, label=label, reduction='none', use_softmax=False
            )
            avg_cost = paddle.mean(x=cost)

        # Evaluator
        with fluid.device_guard("gpu:1"):
            batch_size_tensor = paddle.tensor.create_tensor(dtype='int64')
            batch_acc = paddle.static.accuracy(
                input=predict, label=label, total=batch_size_tensor
            )

        # Snapshot the main program before minimize() is called on it.
        inference_program = fluid.default_main_program().clone()

        # Piecewise-constant learning rate: multiplied by 0.1 at each boundary
        # (boundaries are expressed in steps).
        base_lr = self.lr
        passes = [30, 60, 80, 90]
        steps_per_pass = 10
        bd = [steps_per_pass * p for p in passes]
        lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
        lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr)
        opt = fluid.optimizer.Momentum(
            learning_rate=lr_val,
            momentum=0.9,
            grad_clip=paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0),
        )

        acc_steps = 2  # accumulated steps for pipeline
        if dist_strategy:
            # Reader
            train_reader, test_reader = self._mnist_readers(batch_size)
            fleet.init(is_collective=True)
            strategy = fleet.DistributedStrategy()
            strategy.pipeline = True
            strategy.amp = True
            strategy.pipeline_configs = {
                'micro_batch_size': batch_size,
                'schedule_mode': 'F-then-B',
                'accumulate_steps': acc_steps,
            }
            dist_opt = fleet.distributed_optimizer(
                optimizer=opt, strategy=strategy
            )
            dist_opt.minimize(avg_cost)
        else:
            opt.minimize(avg_cost)
            # Reader
            # NOTE(review): the batch is scaled by acc_steps, presumably so a
            # single step here sees as many samples as one accumulated
            # pipeline step -- confirm.
            train_reader, test_reader = self._mnist_readers(
                batch_size * acc_steps
            )

        outputs = (
            inference_program,
            avg_cost,
            train_reader,
            test_reader,
            batch_acc,
            predict,
        )
        if dist_strategy:
            return (*outputs, data_loader)
        return outputs


# Entry point when run as a script: pass the runner class to runtime_main
# (imported from legacy_test.test_dist_base).
if __name__ == "__main__":
    runtime_main(TestDistMnist2x2)