# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle
import paddle.fluid as fluid

paddle.enable_static()


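# This test exercises the standalone fleet executor on a single device by
# attaching a pipeline configuration to an otherwise ordinary static program.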
class TestFleetExecutor(unittest.TestCase):
    def fake_fleet_opt(self):
        # TODO: Fake config for coverage only; will be removed in the future
        import paddle.distributed.fleet as fleet

        strategy = fleet.DistributedStrategy()
        strategy.sharding_configs = {
            "dp_degree": 1,
            "mp_degree": 1,
            "pp_degree": 1,
        }
        strategy.pipeline_configs = {"accumulate_steps": 1}
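        # Minimal fleet_opt: degenerate parallelism (all degrees are 1), a
        # single micro batch, and the 1F1B pipeline scheduler.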
        fleet_opt = {
            "dist_strategy": strategy.sharding_configs,
            "num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
            "scheduler": "1F1B",
        }
        return fleet_opt

    def run_fleet_executor(self, place, x_data, y_data):
        exe = paddle.static.Executor(place)
        empty_program = paddle.static.Program()
        with fluid.program_guard(empty_program, empty_program):
            x = fluid.layers.data(
                name='x', shape=x_data.shape, dtype=x_data.dtype
            )
            y = fluid.layers.data(
                name='y', shape=y_data.shape, dtype=y_data.dtype
            )
            z = x + y
            a = 2 * x + 3 * y
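            # The schedule and optimizer below only add backward and update
            # ops to the program; the loss value itself is never checked.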
            loss = paddle.mean(a)
            base_lr = 0.1
            passes = [30, 60, 80, 90]
            steps_per_pass = 10
            bd = [steps_per_pass * p for p in passes]
            lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
            lr_val = paddle.optimizer.lr.PiecewiseDecay(
                boundaries=bd, values=lr
            )
            opt = paddle.optimizer.AdamW(
                learning_rate=lr_val,
                grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0),
            )
            opt.minimize(loss)
        # TODO: section_program will be removed in the future
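        # Attaching _pipeline_opt makes exe.run dispatch to the fleet
        # executor instead of the default single-program path.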
        empty_program._pipeline_opt = {
            "fleet_opt": self.fake_fleet_opt(),
            "section_program": empty_program,
        }
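        # Fetch both elementwise outputs so they can be checked numerically.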
        res = exe.run(
            empty_program,
            feed={'x': x_data, 'y': y_data},
            fetch_list=[z.name, a.name],
        )
        return res

    def test_executor_on_single_device(self):
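        # Runs only on CUDA builds; compares the fetched results for
        # z = x + y and a = 2x + 3y against NumPy references.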
        if fluid.is_compiled_with_cuda():
            shape = (10000, 3462)
            x_data = np.random.rand(*shape)
            y_data = np.random.rand(*shape)
            z_data = x_data + y_data
            a_data = 2 * x_data + 3 * y_data
            res = self.run_fleet_executor(fluid.CUDAPlace(0), x_data, y_data)
            np.testing.assert_allclose(res[0], z_data, rtol=1e-05)
            np.testing.assert_allclose(res[1], a_data, rtol=1e-05)


if __name__ == "__main__":
    unittest.main()