test_communicator_async.py 2.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import unittest
18
import time
19 20
import threading
import numpy
21

22
import paddle
23 24 25
import paddle.fluid as fluid
from paddle.fluid.communicator import Communicator

26 27
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
28
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
29

30 31
paddle.enable_static()

32 33

class TestCommunicator(unittest.TestCase):
34
    def net(self):
T
tangwei12 已提交
35
        x = fluid.layers.data(name='x', shape=[1], dtype='float32')
36 37
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')

T
tangwei12 已提交
38
        cost = fluid.layers.square_error_cost(input=x, label=y)
39 40 41
        avg_cost = fluid.layers.mean(cost)
        return avg_cost

42
    def test_communicator_async(self):
43 44 45 46 47 48 49 50 51 52
        role = role_maker.UserDefinedRoleMaker(
            current_id=0,
            role=role_maker.Role.WORKER,
            worker_num=2,
            server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])

        fleet.init(role)
        avg_cost = self.net()

        optimizer = fluid.optimizer.SGD(0.01)
53
        strategy = StrategyFactory.create_async_strategy()
54 55 56
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(avg_cost)

57
        fleet.init_worker()
58
        time.sleep(10)
59
        fleet.stop_worker()
60 61 62 63


if __name__ == '__main__':
    unittest.main()