# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import numpy as np
import paddle
import paddle.fluid as fluid

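# The fleet executor consumes static-graph Programs, so static mode is
# enabled for the whole module.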
paddle.enable_static()


class TestFleetExecutor(unittest.TestCase):
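    # Builds a minimal fleet_opt dict (dp/mp/pp degrees of 1, a single
    # micro batch, 1F1B scheduling) so the fleet executor code path can
    # be exercised on one device.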
    def fake_fleet_opt(self):
        # TODO: fake fleet configuration used only for coverage; it will
        # be removed in the future
        import paddle.distributed.fleet as fleet

        strategy = fleet.DistributedStrategy()
        strategy.sharding_configs = {
            "dp_degree": 1,
            "mp_degree": 1,
            "pp_degree": 1,
        }
        strategy.pipeline_configs = {"accumulate_steps": 1}
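        # Keep only the pieces the executor consumes: the sharding
        # degrees, the micro-batch count, and the scheduler name.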
        fleet_opt = {
            "dist_strategy": strategy.sharding_configs,
            "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
            "scheduler": "1F1B",
        }
        return fleet_opt

    def run_fleet_executor(self, place, x_data, y_data):
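        # Build a small static program (elementwise ops plus an AdamW
        # optimizer with a piecewise LR schedule) and run it with the
        # fake fleet configuration attached.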
        exe = paddle.static.Executor(place)
        empty_program = paddle.static.Program()
        with fluid.program_guard(empty_program, empty_program):
            x = fluid.layers.data(
                name='x', shape=x_data.shape, dtype=x_data.dtype
            )
            y = fluid.layers.data(
                name='y', shape=y_data.shape, dtype=y_data.dtype
            )
            z = x + y
            a = 2 * x + 3 * y
            loss = paddle.mean(a)
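            # Piecewise decay: the LR drops by 10x at steps 300, 600,
            # 800 and 900 (steps_per_pass * each pass boundary).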
            base_lr = 0.1
            passes = [30, 60, 80, 90]
            steps_per_pass = 10
            bd = [steps_per_pass * p for p in passes]
            lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
            lr_val = paddle.optimizer.lr.PiecewiseDecay(
                boundaries=bd, values=lr
            )
            opt = paddle.optimizer.AdamW(
                learning_rate=lr_val,
                grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0),
            )
            opt.minimize(loss)
        # TODO: section_program will be removed in the future
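        # Setting _pipeline_opt with a "fleet_opt" entry is what makes
        # exe.run dispatch to the fleet executor rather than the default
        # executor path.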
        empty_program._pipeline_opt = {
            "fleet_opt": self.fake_fleet_opt(),
            "section_program": empty_program,
        }
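        # Feed the NumPy inputs and fetch both elementwise results by
        # variable name.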
        res = exe.run(
            empty_program,
            feed={'x': x_data, 'y': y_data},
            fetch_list=[z.name, a.name],
        )
        return res

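    # Runs the program on GPU 0 and checks the fetched outputs against
    # NumPy references; the check only executes when Paddle is built
    # with CUDA support.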
    def test_executor_on_single_device(self):
        if fluid.is_compiled_with_cuda():
            shape = (10000, 3462)
            x_data = np.random.rand(*shape)
            y_data = np.random.rand(*shape)
            z_data = x_data + y_data
            a_data = 2 * x_data + 3 * y_data
            res = self.run_fleet_executor(fluid.CUDAPlace(0), x_data, y_data)
            np.testing.assert_allclose(res[0], z_data, rtol=1e-05)
            np.testing.assert_allclose(res[1], a_data, rtol=1e-05)


if __name__ == "__main__":
    unittest.main()