# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import os
import paddle.fluid as fluid
import paddle.nn as nn
import numpy as np


class TestFleetBase(unittest.TestCase):
    def setUp(self):
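        # Describe the distributed environment through environment variables
        # (trainer endpoint, trainer count, parameter-server endpoints) so the
        # role maker can be exercised without launching real processes.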
        os.environ["POD_IP"] = "127.0.0.1"
        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36000"
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
            "127.0.0.1:36001,127.0.0.2:36002"

    def test_init(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)

    def test_is_first_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        if fleet.is_first_worker():
            print("test fleet first worker done.")

    def test_worker_index(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        print(fleet.worker_index())

    def test_worker_num(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        print(fleet.worker_num())

    def test_is_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        if fleet.is_worker():
            print("test fleet is worker")

    def test_worker_endpoints(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        self.assertEqual(
            "127.0.0.1:36000", fleet.worker_endpoints(to_string=True))
        self.assertEqual(["127.0.0.1:36000"], fleet.worker_endpoints())

    def test_server_num(self):
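        # TRAINING_ROLE=PSERVER makes PaddleCloudRoleMaker treat this process
        # as a parameter server whose own endpoint is POD_IP:PADDLE_PORT.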
        os.environ["TRAINING_ROLE"] = "PSERVER"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["POD_IP"] = "127.0.0.1"

        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        self.assertEqual(2, fleet.server_num())

    def test_server_index(self):
        os.environ["TRAINING_ROLE"] = "PSERVER"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["POD_IP"] = "127.0.0.1"

        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        self.assertEqual(0, fleet.server_index())

    def test_server_endpoints(self):
        os.environ["TRAINING_ROLE"] = "PSERVER"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["POD_IP"] = "127.0.0.1"

        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        if fleet.is_server():
            self.assertEqual(
                "127.0.0.1:36001,127.0.0.2:36002",
                fleet.server_endpoints(to_string=True))
            self.assertEqual(["127.0.0.1:36001", "127.0.0.2:36002"],
                             fleet.server_endpoints())

    def test_is_server(self):
        os.environ["TRAINING_ROLE"] = "PSERVER"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["POD_IP"] = "127.0.0.1"

        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        self.assertTrue(fleet.is_server())

    def test_util(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        self.assertIsNotNone(fleet.util)

    def test_barrier_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        if fleet.is_worker():
            fleet.barrier_worker()

    def test_init_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)

        with self.assertRaises(ValueError):
            if fleet.is_worker():
                fleet.init_worker()

    def test_stop_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        with self.assertRaises(ValueError):
            if fleet.is_worker():
                fleet.stop_worker()

    def test_distributed_optimizer(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)

        optimizer = paddle.optimizer.SGD(learning_rate=0.001)
        optimizer = fleet.distributed_optimizer(optimizer)

    def test_exception(self):
        self.assertRaises(Exception, fleet.init_worker)


class TestFleetDygraph(unittest.TestCase):
    def setUp(self):
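        # Pretend to be trainer 0 of a two-trainer collective job.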
        os.environ[
            "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36213,127.0.0.1:36214"
        os.environ["PADDLE_CURRENT_ENDPOINTS"] = "127.0.0.1:36213"
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ["PADDLE_TRAINER_ID"] = "0"

    def test_dygraph_method(self):
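        # Exercise the dygraph-mode fleet wrappers (distributed_optimizer,
        # distributed_model) plus the optimizer lr/state_dict round trip.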
        paddle.disable_static()
        value = np.arange(26).reshape(2, 13).astype("float32")
        a = fluid.dygraph.to_variable(value)
        layer = paddle.nn.Linear(13, 5)
        adam = paddle.optimizer.Adam(
            learning_rate=0.01, parameters=layer.parameters())
        # fleet.init() is intentionally skipped: this unit test cannot launch
        # a real distributed task.
        adam = fleet.distributed_optimizer(adam)
        dp_layer = fleet.distributed_model(layer)
        lr = 0.001
        adam.set_lr(lr)
        cur_lr = adam.get_lr()
        self.assertEqual(lr, cur_lr)
        state_dict = adam.state_dict()
        adam.set_state_dict(state_dict)

        final_strategy = fleet._final_strategy()


class TestFleetBaseSingleError(unittest.TestCase):
    def setUp(self):
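        # Drop the endpoint list left over from other test cases so that
        # fleet.init() falls back to non-distributed (single-process) mode.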
        os.environ.pop("PADDLE_TRAINER_ENDPOINTS", None)

    def gen_data(self):
        return {
            "x": np.random.random(size=(128, 32)).astype('float32'),
            "y": np.random.randint(
                2, size=(128, 1)).astype('int64')
        }

    def test_single_run_collective_minimize(self):
        def test_single_error():
            # The static-graph APIs below (paddle.static.data, fluid.layers.fc)
            # require static mode; Paddle 2.x starts in dynamic mode, so enable
            # it here (assumed necessary when this test runs on its own).
            paddle.enable_static()
            input_x = paddle.static.data(
                name="x", shape=[-1, 32], dtype='float32')
            input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')

            fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
            prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
            cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
            avg_cost = paddle.mean(x=cost)
            fleet.init(is_collective=True)

        # In non-distributed mode (launched directly with `python`), fleet.init
        # raises an error when more than one GPU is visible.
        if fluid.core.is_compiled_with_cuda() and \
                fluid.core.get_cuda_device_count() > 1:
            self.assertRaises(ValueError, test_single_error)
        else:
            test_single_error()


if __name__ == "__main__":
    unittest.main()