diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index e53c49b13e0330e7e59644d366fefb1246142259..9a5b7d48509e750fd51250961418a43a830d68db 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,6 +15,7 @@ if(NOT WITH_DISTRIBUTE) list(REMOVE_ITEM TEST_OPS test_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_simple_dist_transpiler) list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) + list(REMOVE_ITEM TEST_OPS test_pserver_run_empty_optimize_block) LIST(REMOVE_ITEM TEST_OPS test_dist_mnist) LIST(REMOVE_ITEM TEST_OPS test_dist_word2vec) endif(NOT WITH_DISTRIBUTE) @@ -74,6 +75,7 @@ py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=$ if(WITH_DISTRIBUTE) py_test_modules(test_dist_train MODULES test_dist_train SERIAL) set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20) + set_tests_properties(test_pserver_run_empty_optimize_block PROPERTIES TIMEOUT 20) if(NOT APPLE) set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 200) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) diff --git a/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py new file mode 100644 index 0000000000000000000000000000000000000000..197ce9de563225472b8f384f541ccde9a3adb592 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_pserver_run_empty_optimize_block.py @@ -0,0 +1,117 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
import os
import signal
import subprocess
import time
import unittest
from multiprocessing import Process
from op_test import OpTest


def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
    """Build a one-layer regression net, transpile it and run one pserver.

    Two pserver endpoints are derived from ``ip``/``port``: ``ip:(port+1)``
    and ``ip:port``. The program for the second endpoint (``ip:port``) is
    checked to contain an empty optimize block and is then executed, so this
    call normally blocks inside ``exe.run(pserver_prog)`` until the process
    is signalled.

    Args:
        use_cuda: run on ``fluid.CUDAPlace(0)`` when true, else on CPU.
        sync_mode: forwarded to ``DistributeTranspiler.transpile``.
        ip: host part of both pserver endpoints (string).
        port: port of the served endpoint, as a string.
        trainers: number of trainers passed to the transpiler.
        trainer_id: id of the current trainer passed to the transpiler.

    Raises:
        AssertionError: if the pserver program does not have exactly two
            blocks, or its second block contains any op.
    """
    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')

    # loss function
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)

    # optimizer
    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
    sgd_optimizer.minimize(avg_cost)

    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    exe = fluid.Executor(place)

    ps1 = ip + ":" + str(int(port) + 1)
    ps2 = ip + ":" + port
    pserver_endpoints = ps1 + "," + ps2

    config = fluid.DistributeTranspilerConfig()
    config.slice_var_up = False
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=sync_mode)
    pserver_prog = t.get_pserver_program(ps2)

    # pserver2 has no parameter, so its optimize block must be empty.
    # NOTE: these must be real comparisons. `assert (a, b)` asserts a
    # non-empty tuple, which is always true and checks nothing.
    assert len(pserver_prog.blocks) == 2, (
        "expected 2 blocks in pserver program, got %d" %
        len(pserver_prog.blocks))
    assert len(pserver_prog.blocks[1].ops) == 0, (
        "expected an empty optimize block, got %d ops" %
        len(pserver_prog.blocks[1].ops))

    pserver_startup = t.get_startup_program(ps2, pserver_prog)
    exe.run(pserver_startup)
    exe.run(pserver_prog)


class TestListenAndServOp(OpTest):
    """Start a pserver whose optimize block is empty and stop it by signal."""

    def setUp(self):
        # Seconds to wait for the pserver's port file to appear.
        self.ps_timeout = 5
        self.ip = "127.0.0.1"
        # The port is kept as a string; run_pserver concatenates it into
        # the endpoint.
        self.port = "0"
        self.trainers = 1
        self.trainer_id = 0

    def _start_pserver(self, use_cuda, sync_mode):
        """Launch ``run_pserver`` in a daemon child process and return it."""
        p = Process(
            target=run_pserver,
            args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
                  self.trainer_id))
        p.daemon = True
        p.start()
        return p

    def _wait_ps_ready(self, pid):
        """Poll until ``/tmp/paddle.<pid>.port`` exists.

        Sleeps 0.5s between polls and fails with an AssertionError once the
        ``self.ps_timeout`` budget has been used up.
        """
        start_left_time = self.ps_timeout
        sleep_time = 0.5
        while True:
            assert start_left_time >= 0, "wait ps ready failed"
            time.sleep(sleep_time)
            try:
                # The listen_and_serv_op touches a file containing the listen
                # port in the /tmp directory once it is ready to process RPC
                # calls.
                os.stat("/tmp/paddle.%d.port" % pid)
                return
            except OSError:
                # Port file not there yet; charge this poll to the budget.
                start_left_time -= sleep_time

    def test_handle_signal_in_serv_op(self):
        # run pserver on CPU in sync mode
        p1 = self._start_pserver(False, True)
        self._wait_ps_ready(p1.pid)

        # raise SIGINT to pserver
        os.kill(p1.pid, signal.SIGINT)
        p1.join()

        # run pserver on CPU in async mode
        p2 = self._start_pserver(False, False)
        self._wait_ps_ready(p2.pid)

        # raise SIGTERM to pserver
        os.kill(p2.pid, signal.SIGTERM)
        p2.join()


if __name__ == '__main__':
    unittest.main()