# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()


class TestFleetExecutor(unittest.TestCase):
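    """Single-device smoke test for running a program through the fleet executor."""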

    def fake_fleet_opt(self):
        # TODO: This fake strategy, added only for coverage, will be removed in the future
        import paddle.distributed.fleet as fleet
        strategy = fleet.DistributedStrategy()
        strategy.sharding_configs = {
            "dp_degree": 1,
            "mp_degree": 1,
            "pp_degree": 1
        }
        strategy.pipeline_configs = {"accumulate_steps": 1}
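        # Collapse the strategy into the plain dict the fleet executor
        # consumes: all parallel degrees are 1 and there is one micro
        # batch, so "1F1B" scheduling degenerates to single-device runs.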
        fleet_opt = {
            "dist_strategy": strategy.sharding_configs,
            "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
            "scheduler": "1F1B"
        }
        return fleet_opt

    def run_fleet_executor(self, place, x_data, y_data):
        exe = paddle.static.Executor(place)
        empty_program = paddle.static.Program()
        with fluid.program_guard(empty_program, empty_program):
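            # Build a small static program: two fed inputs, two
            # element-wise outputs, and an optimizer step so both the
            # forward pass and the parameter-update path are exercised.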
            x = fluid.layers.data(name='x',
                                  shape=x_data.shape,
                                  dtype=x_data.dtype)
            y = fluid.layers.data(name='y',
                                  shape=y_data.shape,
                                  dtype=y_data.dtype)
            z = x + y
            a = 2 * x + 3 * y
            loss = paddle.mean(a)
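            # Piecewise decay: shrink the learning rate by 10x at each
            # boundary, where boundaries are measured in steps
            # (steps_per_pass * pass milestone).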
            base_lr = 0.1
            passes = [30, 60, 80, 90]
            steps_per_pass = 10
            bd = [steps_per_pass * p for p in passes]
            lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
            lr_val = paddle.optimizer.lr.PiecewiseDecay(boundaries=bd,
                                                        values=lr)
            opt = paddle.optimizer.AdamW(
                learning_rate=lr_val,
                grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0))
            opt.minimize(loss)
        # TODO: section_program will be removed in the future
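        # Attaching _pipeline_opt with a "fleet_opt" entry is what steers
        # exe.run below toward the fleet executor rather than the default
        # single-process execution path.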
        empty_program._pipeline_opt = {
            "fleet_opt": self.fake_fleet_opt(),
            "section_program": empty_program
        }
        res = exe.run(empty_program,
                      feed={
                          'x': x_data,
                          'y': y_data
                      },
                      fetch_list=[z.name, a.name])
        return res

    def test_executor_on_single_device(self):
        if fluid.is_compiled_with_cuda():
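            # Feed random inputs and compare the fetched outputs against
            # references computed directly in NumPy.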
            shape = (10000, 3462)
            x_data = np.random.rand(*shape)
            y_data = np.random.rand(*shape)
            z_data = x_data + y_data
            a_data = 2 * x_data + 3 * y_data
            res = self.run_fleet_executor(fluid.CUDAPlace(0), x_data, y_data)
            self.assertTrue(np.allclose(res[0], z_data))
            self.assertTrue(np.allclose(res[1], a_data))


if __name__ == "__main__":
    unittest.main()