# test_fleet_elastic_manager.py
#   Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
import time
import unittest
import argparse

from paddle.distributed.fleet.elastic.manager import ElasticManager
from paddle.distributed.fleet.elastic.manager import ELASTIC_TIMEOUT


class MockLease:
    """Lease stub whose ``refresh`` does nothing and never fails."""

    def refresh(self):
        """No-op refresh; always returns ``None``."""
        return None


class MockEtcdClient:
    """Fake etcd client that answers every call with a canned value.

    Writes, deletes and watch cancellations are discarded; reads return
    fixed data; watch registrations return the id ``"host_watch"``.
    """

    def __init__(self, lease=None):
        # Lease object that lease() hands back; when falsy, lease() builds
        # a MockLease instead.
        self._lease = lease

    def put(self, key, value, lease=None):
        """Discard the write."""
        return None

    def get(self, key):
        """Return the fixed pair ``("0", "0")`` regardless of ``key``."""
        return "0", "0"

    def delete_prefix(self, key):
        """Discard the prefix delete."""
        return None

    def get_prefix(self, key_prefix):
        """Return a new list holding the two fixed host endpoints."""
        return ["10.10.10.1:6001", "10.10.10.2:6001"]

    def add_watch_callback(self, *args, **kwargs):
        """Pretend to register a watch; the callback is never invoked."""
        return "host_watch"

    def add_watch_prefix_callback(self, key_prefix, callback, **kwargs):
        """Invoke ``callback(None)`` once right away, then return the watch id."""
        callback(None)
        return "host_watch"

    def cancel_watch(self, watch_id):
        """Discard the cancellation."""
        return None

    def delete(self, key):
        """Discard the delete."""
        return None

    def lease(self, ttl):
        """Return the lease given at construction, or a new MockLease."""
        return self._lease or MockLease()


class TestElasticManager(unittest.TestCase):
    """Exercise ElasticManager against MockEtcdClient instead of a live etcd.

    NOTE: these tests write to ``os.environ`` and never restore it, so a
    value set by one test stays visible to the tests that run after it.
    """

    def setUp(self):
        """Give every test a fresh mock etcd client."""
        self.etcd_client = MockEtcdClient()

    @staticmethod
    def _make_args(np, **class_attrs):
        """Build the launch-argument object handed to ElasticManager.

        A new ``Argument`` class is defined on every call, so attributes
        never leak between tests.

        Args:
            np: value of the ``np`` attribute, e.g. ``"2"`` or ``"2:4"``.
            **class_attrs: extra attributes to define on the class.

        Returns:
            A new ``Argument`` instance whose attributes all live on its
            class (the instance ``__dict__`` starts empty).
        """

        class Argument:
            elastic_server = "127.0.0.1:2379"
            job_id = "test_job_id_123"
            gpus = "0"
            nproc_per_node = 1
            host = None
            curr_host = None
            ips = None
            scale = None
            force = None
            backend = 'gloo'

        # Set after the class body: a class body cannot read the enclosing
        # function's ``np`` while also binding a name called ``np``.
        Argument.np = np
        for attr_name, attr_value in class_attrs.items():
            setattr(Argument, attr_name, attr_value)
        return Argument()

    def test_elastic_manager_init(self):
        """Construct a manager whose lease raises ValueError on refresh."""
        args = self._make_args("2")

        class _MockLease():
            def refresh(self):
                raise ValueError("valid error, this only for unittest")

        etcd_client = MockEtcdClient(lease=_MockLease())
        # Only construction is exercised; nothing is asserted afterwards.
        ElasticManager(args, etcd_client=etcd_client)

    def test_match_faulttolerance(self):
        """With np="2", _match accepts two hosts and rejects a single one."""
        args = self._make_args("2")
        args.ips = "10.10.10.1,10.10.10.2"
        elastic = ElasticManager(args, self.etcd_client)
        os.environ['FLAGS_START_PORT'] = "6001"

        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        os.environ[
            'PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001,10.10.10.2:6001"
        self.assertEqual(elastic._match(hosts), True)

        hosts = ["10.10.10.1:6001"]
        args.ips = "10.10.10.1"
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = "10.10.10.1:6001"
        self.assertEqual(elastic._match(hosts), False)

    def test_match_elastic(self):
        """With np="2:4", check _match for one to four live hosts."""
        os.environ['PADDLE_ELASTIC_TIMEOUT'] = "60"
        args = self._make_args("2:4")
        args.ips = "10.10.10.1,10.10.10.2,10.10.10.3,10.10.10.4"
        os.environ['FLAGS_START_PORT'] = "6001"
        four_endpoints = (
            "10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001,10.10.10.4:6001")
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = four_endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = four_endpoints
        elastic = ElasticManager(args, self.etcd_client)

        # Manager configured with four ips: only all four hosts match.
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        self.assertEqual(elastic._match(hosts), False)

        hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001",
            "10.10.10.4:6001"
        ]
        self.assertEqual(elastic._match(hosts), True)

        hosts = ["10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"]
        self.assertEqual(elastic._match(hosts), False)

        hosts = ["10.10.10.1:6001"]
        self.assertEqual(elastic._match(hosts), False)

        # Rebuild the manager with two ips: two hosts now match.
        args.ips = "10.10.10.1,10.10.10.2"
        two_endpoints = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = two_endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = two_endpoints
        elastic = ElasticManager(args, self.etcd_client)
        hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        self.assertEqual(elastic._match(hosts), True)

        # TODO: test timeout (an earlier draft slept 60s and then
        # re-asserted that _match(hosts) is True; it was left disabled).

    def test_update_hosts_for_faulttolerance(self):
        """_update_hosts rewrites PADDLE_TRAINERS when np="0"."""
        args = self._make_args("0")
        os.environ['FLAGS_START_PORT'] = "6001"
        os.environ['PADDLE_ELASTIC_NP'] = "2"
        os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2"
        endpoints = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = endpoints
        elastic = ElasticManager(args, self.etcd_client)

        # Host list identical to the environment: trainers stay unchanged.
        os.environ['PADDLE_TRAINER_ID'] = "0"
        elastic.curr_host = "10.10.10.1:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.2:6001"]
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2")

        # 10.10.10.3 takes the place of 10.10.10.2.
        elastic.curr_host = "10.10.10.3:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"]
        os.environ['PADDLE_TRAINER_ID'] = "1"
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3")

        # Same hosts again, this time with trainer id "-1".
        elastic.curr_host = "10.10.10.3:6001"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.3:6001"]
        os.environ['PADDLE_TRAINER_ID'] = "-1"
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.3")

    def test_update_hosts_for_elastic(self):
        """_update_hosts rewrites the trainer env vars when np="2:4".

        Checks on ``elastic.all_host_endpoints`` were disabled in the
        original test and are not asserted here.
        """
        #######################
        #  elastic, scale up  #
        #######################
        args = self._make_args("2:4")

        os.environ['FLAGS_START_PORT'] = "6001"
        os.environ['PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.2"
        endpoints = "10.10.10.1:6001,10.10.10.2:6001"
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = endpoints
        elastic = ElasticManager(args, self.etcd_client)
        # add 10.10.10.3:6001
        elastic.curr_host = "10.10.10.1:6001"
        elastic.hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"
        ]
        elastic._update_hosts()
        self.assertEqual(
            os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.2,10.10.10.3")

        #######################
        #  elastic, scale in  #
        #######################
        os.environ[
            'PADDLE_TRAINERS'] = "10.10.10.0,10.10.10.1,10.10.10.2,10.10.10.3"
        endpoints = (
            "10.10.10.0:6000,10.10.10.1:6001,10.10.10.2:6001,10.10.10.3:6001")
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = endpoints
        os.environ['POD_IP'] = "10.10.10.1"
        os.environ['TRAINER_PORTS_NUM'] = "4"
        os.environ['PADDLE_TRAINER_ID'] = "1"
        os.environ['PADDLE_PORT'] = "6001"
        args = self._make_args("2:4")
        elastic = ElasticManager(args, self.etcd_client)
        # The environment lists four trainers but only three hosts are
        # alive; 10.10.10.0 is the one absent from the expected values.
        elastic.curr_host = "10.10.10.1:6001"
        elastic.hosts = [
            "10.10.10.1:6001", "10.10.10.2:6001", "10.10.10.3:6001"
        ]
        elastic._update_hosts()
        self.assertEqual(
            os.getenv('PADDLE_TRAINERS'), "10.10.10.3,10.10.10.1,10.10.10.2")
        self.assertEqual(
            os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'),
            "10.10.10.3:6001,10.10.10.1:6001,10.10.10.2:6001")

        ############
        # Four endpoints on a single IP, of which two remain alive.
        os.environ[
            'PADDLE_TRAINERS'] = "10.10.10.1,10.10.10.1,10.10.10.1,10.10.10.1"
        endpoints = (
            "10.10.10.1:6001,10.10.10.1:6002,10.10.10.1:6003,10.10.10.1:6004")
        os.environ['DISTRIBUTED_TRAINER_ENDPOINTS'] = endpoints
        os.environ['PADDLE_TRAINER_ENDPOINTS'] = endpoints
        os.environ['POD_IP'] = "10.10.10.1"
        os.environ['TRAINER_PORTS_NUM'] = "4"
        os.environ['PADDLE_PORT'] = "6001"
        args = self._make_args("2:4")
        elastic = ElasticManager(args, self.etcd_client)
        elastic.curr_host = "10.10.10.1:6001"
        os.environ['PADDLE_TRAINER_ID'] = "-1"
        elastic.hosts = ["10.10.10.1:6001", "10.10.10.1:6003"]
        elastic._update_hosts()
        self.assertEqual(os.getenv('PADDLE_TRAINERS'), "10.10.10.1,10.10.10.1")
        self.assertEqual(
            os.getenv('DISTRIBUTED_TRAINER_ENDPOINTS'),
            "10.10.10.1:6001,10.10.10.1:6003")

    def test_exit(self):
        """Call exit() on a freshly built manager; nothing is asserted."""
        args = self._make_args("2")
        elastic = ElasticManager(args, self.etcd_client)
        elastic.exit()

    def test_pre_hook(self):
        """Call pre_hook() with no hook set, then with "hostname"."""
        args = self._make_args("2", elastic_pre_hook=None)
        elastic = ElasticManager(args, self.etcd_client)
        elastic.pre_hook()

        args.elastic_pre_hook = "hostname"
        elastic.pre_hook()

# Run the whole module through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()