diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 9a5b7d48509e750fd51250961418a43a830d68db..e53c49b13e0330e7e59644d366fefb1246142259 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,7 +15,6 @@ if(NOT WITH_DISTRIBUTE) list(REMOVE_ITEM TEST_OPS test_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_simple_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) - list(REMOVE_ITEM TEST_OPS test_pserver_run_empty_optimize_block) LIST(REMOVE_ITEM TEST_OPS test_dist_mnist) LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec) endif(NOT WITH_DISTRIBUTE) @@ -75,7 +74,6 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$ if(WITH_DISTRIBUTE) py_test_modules(test_dist_train MODULES test_dist_train SERIAL) set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20) - set_tests_properties(test_pserver_run_empty_optimize_block PROPERTIES TIMEOUT 20) if(NOT APPLE) set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) diff --git a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py index 48b52a5412eb99fbc7a5c8534a766ede4954e849..a0358f8b401e301312b5b9c0b18733d4275045e3 100644 --- a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py +++ b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py @@ -55,6 +55,46 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): exe.run(pserver_prog) +def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers, + trainer_id): + x = fluid.layers.data(name='x', shape=[1], dtype='float32') + y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + + # loss function + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + + # optimizer + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) + sgd_optimizer.minimize(avg_cost) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + + ps1 = ip + ":" + str(int(port) + 1) + ps2 = ip + ":" + port + pserver_endpoints = ps1 + "," + ps2 + + config = fluid.DistributeTranspilerConfig() + config.slice_var_up = False + t = fluid.DistributeTranspiler(config=config) + t.transpile( + trainer_id, + pservers=pserver_endpoints, + trainers=trainers, + sync_mode=sync_mode) + pserver_prog = t.get_pserver_program(ps2) + + # pserver2 have no parameter + assert (len(pserver_prog.blocks) == 2) + assert (len(pserver_prog.blocks[1].ops) == 0) + + pserver_startup = t.get_startup_program(ps2, pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + + class TestListenAndServOp(OpTest): def setUp(self): self.ps_timeout = 5 @@ -63,9 +103,9 @@ class TestListenAndServOp(OpTest): self.trainers = 1 self.trainer_id = 0 - def _start_pserver(self, use_cuda, sync_mode): + def _start_pserver(self, use_cuda, sync_mode, pserver_func): p = Process( - target=run_pserver, + target=pserver_func, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, self.trainer_id)) p.daemon = True @@ -92,7 +132,24 @@ class TestListenAndServOp(OpTest): def test_handle_signal_in_serv_op(self): # run pserver on CPU in sync mode - p1 = self._start_pserver(False, True) + p1 = self._start_pserver(False, True, run_pserver) + self._wait_ps_ready(p1.pid) + + # raise SIGTERM to pserver + os.kill(p1.pid, signal.SIGINT) + p1.join() + + # run pserver on CPU in async mode + p2 = self._start_pserver(False, False, run_pserver) + self._wait_ps_ready(p2.pid) + + # raise SIGTERM to pserver + os.kill(p2.pid, signal.SIGTERM) + p2.join() + + def test_list_and_serv_run_empty_optimize_block(self): + # run pserver on CPU in sync mode + p1 = self._start_pserver(False, True, run_pserver_with_empty_block) self._wait_ps_ready(p1.pid) # raise SIGTERM to pserver @@ -100,7 +157,7 @@ class TestListenAndServOp(OpTest): p1.join() # run pserver on CPU in async mode - p2 = self._start_pserver(False, False) + p2 = self._start_pserver(False, False, run_pserver_with_empty_block) self._wait_ps_ready(p2.pid) # raise SIGTERM to pserver diff --git a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py deleted file mode 100644 index 197ce9de563225472b8f384f541ccde9a3adb592..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import paddle -import paddle.fluid as fluid -import os -import signal -import subprocess -import time -import unittest -from multiprocessing import Process -from op_test import OpTest - - -def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): - x = fluid.layers.data(name='x', shape=[1], dtype='float32') - y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) - y = fluid.layers.data(name='y', shape=[1], dtype='float32') - - # loss function - cost = fluid.layers.square_error_cost(input=y_predict, label=y) - avg_cost = fluid.layers.mean(cost) - - # optimizer - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_cost) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - - ps1 = ip + ":" + str(int(port) + 1) - ps2 = ip + ":" + port - pserver_endpoints = ps1 + "," + ps2 - - config = fluid.DistributeTranspilerConfig() - config.slice_var_up = False - t = fluid.DistributeTranspiler(config=config) - t.transpile( - trainer_id, - pservers=pserver_endpoints, - trainers=trainers, - sync_mode=sync_mode) - pserver_prog = t.get_pserver_program(ps2) - - # pserver2 have no parameter - assert (len(pserver_prog.blocks), 2) - assert (len(pserver_prog.blocks[1].ops), 0) - - pserver_startup = t.get_startup_program(ps2, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) - - -class TestListenAndServOp(OpTest): - def setUp(self): - self.ps_timeout = 5 - self.ip = "127.0.0.1" - self.port = "0" - self.trainers = 1 - self.trainer_id = 0 - - def _start_pserver(self, use_cuda, sync_mode): - p = Process( - target=run_pserver, - args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, - self.trainer_id)) - p.daemon = True - p.start() - return p - - def _wait_ps_ready(self, pid): - start_left_time = self.ps_timeout - sleep_time = 0.5 - while True: - assert start_left_time >= 0, "wait ps ready failed" - time.sleep(sleep_time) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - start_left_time -= sleep_time - - def test_handle_signal_in_serv_op(self): - # run pserver on CPU in sync mode - p1 = self._start_pserver(False, True) - self._wait_ps_ready(p1.pid) - - # raise SIGTERM to pserver - os.kill(p1.pid, signal.SIGINT) - p1.join() - - # run pserver on CPU in async mode - p2 = self._start_pserver(False, False) - self._wait_ps_ready(p2.pid) - - # raise SIGTERM to pserver - os.kill(p2.pid, signal.SIGTERM) - p2.join() - - -if __name__ == '__main__': - unittest.main()