# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

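# Unit tests for paddle.distributed.spawn on Cambricon MLU devices. These
# exercise the internal spawn helpers (_get_subprocess_env_list,
# _options_valid_check, _get_default_nprocs) and run a small data-parallel
# training step across all visible MLUs via the CNCL backend.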
from __future__ import print_function
import unittest
import os

import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
from paddle.distributed.spawn import (
    _get_subprocess_env_list,
    _options_valid_check,
    _get_default_nprocs,
)
from paddle.fluid import core


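# Minimal two-layer network used as the model for the data-parallel run.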
class LinearNet(nn.Layer):

    def __init__(self):
        super(LinearNet, self).__init__()
        self._linear1 = nn.Linear(10, 10)
        self._linear2 = nn.Linear(10, 1)

    def forward(self, x):
        return self._linear2(self._linear1(x))


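# Per-process training entry point executed by dist.spawn in each worker.
# Returns this worker's rank, read from the PADDLE_TRAINER_ID environment
# variable that Paddle sets for every spawned trainer process.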
def train(print_result=False):
    # 1. initialize parallel environment
    dist.init_parallel_env()

    # 2. create data parallel layer & optimizer
    layer = LinearNet()
    dp_layer = paddle.DataParallel(layer)

    loss_fn = nn.MSELoss()
    adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())

    # 3. run layer
    inputs = paddle.randn([10, 10], 'float32')
    outputs = dp_layer(inputs)
    labels = paddle.randn([10, 1], 'float32')
    loss = loss_fn(outputs, labels)

    if print_result:
        print("Rank:", int(os.getenv("PADDLE_TRAINER_ID")))

    loss.backward()
    adam.step()
    adam.clear_grad()

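    # Return the rank so the parent process can collect it from the spawn
    # context's per-process return queues (see TestSpawn.test_spawn below).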
    return int(os.getenv("PADDLE_TRAINER_ID"))


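# Validates spawn option handling, subprocess environment construction,
# and a full multi-device spawn run on MLU.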
class TestSpawn(unittest.TestCase):

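    # Requesting more processes than there are MLU devices should fail.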
    def test_nprocs_greater_than_device_num_error(self):
        with self.assertRaises(RuntimeError):
            _get_subprocess_env_list(nprocs=100, options=dict())

    def test_selected_devices_error(self):
        with self.assertRaises(ValueError):
            options = dict()
            options['selected_devices'] = "100,101"
            _get_subprocess_env_list(nprocs=2, options=options)

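    # With nprocs=1, the generated environment should describe a single
    # trainer: rank 0 in a world of size 1.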
    def test_get_correct_env(self):
        options = dict()
        options['print_config'] = True
        env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
        self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
        self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')

    def test_nprocs_not_equal_to_selected_devices(self):
        with self.assertRaises(ValueError):
            options = dict()
            options['selected_devices'] = "100,101,102"
            _get_subprocess_env_list(nprocs=2, options=options)

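    # _options_valid_check accepts known option keys such as
    # 'selected_devices' and raises ValueError for unrecognized ones.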
    def test_options_valid_check(self):
        options = dict()
        options['selected_devices'] = "100,101,102"
        _options_valid_check(options)

        with self.assertRaises(ValueError):
            options['error'] = "error"
            _options_valid_check(options)

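    # The default process count should match the number of visible MLUs.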
    def test_get_default_nprocs(self):
        paddle.set_device('mlu')
        nprocs = _get_default_nprocs()
        self.assertEqual(nprocs, core.get_mlu_device_count())

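    # End-to-end check: spawn one training process per MLU over the CNCL
    # backend and verify that the collected ranks are exactly 0..n-1.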
    def test_spawn(self):
        num_devs = core.get_mlu_device_count()
        context = dist.spawn(train, backend='cncl', nprocs=num_devs)
        rank_list = []
        for i in range(num_devs):
            rank_list.append(context.return_queues[i].get())
        rank_list.sort()
        self.assertEqual(rank_list, list(range(num_devs)))


if __name__ == '__main__':
    unittest.main()