# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
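
"""Single-process tests for paddle.distributed.fleet in dygraph, collective, and parameter-server modes."""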

import numpy as np
import os

# Pin the test to a single GPU so it does not occupy every visible device.
cuda_visible_devices = os.getenv('CUDA_VISIBLE_DEVICES')
if cuda_visible_devices is None or cuda_visible_devices == "":
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
else:
    os.environ['CUDA_VISIBLE_DEVICES'] = cuda_visible_devices.split(',')[0]
import paddle
import paddle.distributed.fleet as fleet
import paddle.fluid as fluid
import unittest
import paddle.nn as nn


class LinearNet(nn.Layer):
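    """A small two-layer feed-forward network used by the dygraph single-card test."""
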
    def __init__(self):
        super().__init__()
        self._linear1 = nn.Linear(10, 10)
        self._linear2 = nn.Linear(10, 1)

    def forward(self, x):
        return self._linear2(self._linear1(x))


class TestFleetDygraphSingle(unittest.TestCase):
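    """Checks that fleet dygraph APIs (distributed_model / distributed_optimizer) train correctly with a single trainer."""
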
    def setUp(self):
        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36213"
        os.environ["PADDLE_CURRENT_ENDPOINTS"] = "127.0.0.1:36213"
        os.environ["PADDLE_TRAINERS_NUM"] = "1"
        os.environ["PADDLE_TRAINER_ID"] = "0"

    def test_dygraph_single(self):
        paddle.disable_static()
        paddle.distributed.init_parallel_env()

        layer = LinearNet()
        loss_fn = nn.MSELoss()
        adam = paddle.optimizer.Adam(
            learning_rate=0.001, parameters=layer.parameters()
        )

        adam = fleet.distributed_optimizer(adam)
        dp_layer = fleet.distributed_model(layer)
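        # A couple of training steps: forward, MSE loss, backward, and parameter update.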
        for step in range(2):
            inputs = paddle.randn([10, 10], 'float32')
            outputs = dp_layer(inputs)
            labels = paddle.randn([10, 1], 'float32')
            loss = loss_fn(outputs, labels)
            loss.backward()
            adam.step()
            adam.clear_grad()


class TestFleetBaseSingleRunCollective(unittest.TestCase):
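    """Checks that a static-graph collective job can be trained end to end by a single process."""
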
    def setUp(self):
        pass

    def gen_data(self):
        return {
            "x": np.random.random(size=(128, 32)).astype('float32'),
            "y": np.random.randint(2, size=(128, 1)).astype('int64'),
        }

    def test_single_run_collective_minimize(self):
        paddle.enable_static()
        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')

        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
        avg_cost = paddle.mean(x=cost)

        fleet.init(is_collective=True)
        optimizer = fluid.optimizer.SGD(learning_rate=0.001)
        optimizer = fleet.distributed_optimizer(optimizer)
        optimizer.minimize(avg_cost)

        place = (
            fluid.CUDAPlace(0)
            if paddle.fluid.is_compiled_with_cuda()
            else fluid.CPUPlace()
        )

        exe = fluid.Executor(place)
        exe.run(paddle.static.default_startup_program())

        for i in range(10):
            cost_val = exe.run(feed=self.gen_data(), fetch_list=[avg_cost.name])
            print("cost of step[{}] = {}".format(i, cost_val))


class TestFleetBaseSingleRunPS(unittest.TestCase):
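    """Checks fleet parameter-server mode (fleet.init() without is_collective) running in a single process."""
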
    def setUp(self):
        pass

    def gen_data(self):
        return {
            "x": np.random.random(size=(128, 32)).astype('float32'),
            "y": np.random.randint(2, size=(128, 1)).astype('int64'),
        }

    def test_single_run_ps_minimize(self):
        paddle.enable_static()
        input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
        input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')

        fc_1 = fluid.layers.fc(input=input_x, size=64, act='tanh')
        prediction = fluid.layers.fc(input=fc_1, size=2, act='softmax')
        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
        avg_cost = paddle.mean(x=cost)

        fleet.init()
        strategy = paddle.distributed.fleet.DistributedStrategy()
        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(avg_cost)
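        # In parameter-server mode the same script acts as a server or a worker,
        # depending on the role fleet assigns to this process.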
        if fleet.is_server():
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            exe.run(paddle.static.default_startup_program())
            step = 10
            for i in range(step):
                cost_val = exe.run(
                    program=fluid.default_main_program(),
                    feed=self.gen_data(),
                    fetch_list=[avg_cost.name],
                )
                print(
                    "worker_index: %d, step%d cost = %f"
                    % (fleet.worker_index(), i, cost_val[0])
                )


if __name__ == "__main__":
    unittest.main()