From 57f0f0f2dcb15127a41b8dd5b2404f47ac39e89d Mon Sep 17 00:00:00 2001
From: gongweibao
Date: Mon, 2 Sep 2019 17:26:07 +0800
Subject: [PATCH] Delete pserver complete file before executor running. (#19468)

---
 .../distributed_ops/fl_listen_and_serv_op.cc  |  2 +
 .../fluid/tests/unittests/CMakeLists.txt      |  9 ++--
 .../fluid/tests/unittests/dist_test_utils.py  | 27 ++++++++++
 .../fluid/tests/unittests/test_dist_train.py  |  5 +-
 .../unittests/test_fl_listen_and_serv_op.py   |  2 +
 .../test_hsigmoid_remote_table_op.py          |  2 +
 .../tests/unittests/test_listen_and_serv.sh   | 49 +++++++++++++++++++
 .../unittests/test_listen_and_serv_op.py      | 31 +++++++++++-
 .../unittests/test_lookup_remote_table_op.py  |  2 +
 .../unittests/test_nce_remote_table_op.py     |  2 +
 10 files changed, 123 insertions(+), 8 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/dist_test_utils.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_listen_and_serv.sh

diff --git a/paddle/fluid/operators/distributed_ops/fl_listen_and_serv_op.cc b/paddle/fluid/operators/distributed_ops/fl_listen_and_serv_op.cc
index 9ede49c9716..07c864eefe2 100644
--- a/paddle/fluid/operators/distributed_ops/fl_listen_and_serv_op.cc
+++ b/paddle/fluid/operators/distributed_ops/fl_listen_and_serv_op.cc
@@ -265,6 +265,8 @@ class FlListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
 void FlSignalHandler::StopAndExit(int signal_num) {
   // Do not use VLOG here, since the device used for printing may already be released.
   // exit will release internally allocated resources.
+  auto file_path = string::Sprintf("/tmp/paddle.%d.port", ::getpid());
+  remove(file_path.c_str());
   exit(0);
 }
 
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 42f1ae3a0bc..3021b8e7ef3 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -242,14 +242,15 @@ if(WITH_DISTRIBUTE)
     py_test_modules(test_lookup_remote_table_op MODULES test_lookup_remote_table_op ENVS ${dist_ENVS})
     py_test_modules(test_hsigmoid_remote_table_op MODULES test_hsigmoid_remote_table_op ENVS ${dist_ENVS})
     py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS})
-    py_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv_op ENVS ${dist_ENVS})
-    set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20 LABELS "RUN_TYPE=EXCLUSIVE")
-    set_tests_properties(test_listen_and_serv_op test_nce_remote_table_op test_hsigmoid_remote_table_op
-        PROPERTIES LABELS "RUN_TYPE=DIST")
+    #py_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv_op ENVS ${dist_ENVS})
     if(WITH_DGC)
         py_test_modules(test_dgc_op MODULES test_dgc_op)
     endif()
     if(NOT APPLE)
+        bash_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv.sh)
+        set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 100 LABELS "RUN_TYPE=EXCLUSIVE")
+        set_tests_properties(test_listen_and_serv_op test_nce_remote_table_op test_hsigmoid_remote_table_op PROPERTIES LABELS "RUN_TYPE=DIST")
+
         set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
         set_tests_properties(test_dist_mnist_dgc_nccl PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
         set_tests_properties(test_dist_mnist_hallreduce PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
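Background for the two C++ lines above: a pserver advertises readiness by writing /tmp/paddle.<pid>.port once its RPC server is up, and the unit tests poll for that file before talking to the server. If a previous run leaves the file behind (for example because the process was SIGKILLed and no handler ran), the next test can see a stale file and proceed before the server exists. A minimal sketch of the polling side, assuming only that the file appears when the server is ready; the helper name and timeout values are illustrative, not part of this patch:

    import os
    import time


    def wait_ps_ready(pid, timeout_s=5.0, interval_s=0.1):
        """Poll until the pserver with `pid` has written its readiness file."""
        port_file = "/tmp/paddle.%d.port" % pid
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            # The server creates this file only after it has bound its port.
            if os.path.isfile(port_file):
                return
            time.sleep(interval_s)
        raise RuntimeError("pserver %d never wrote %s" % (pid, port_file))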
diff --git a/python/paddle/fluid/tests/unittests/dist_test_utils.py b/python/paddle/fluid/tests/unittests/dist_test_utils.py
new file mode 100644
index 00000000000..7725a07aa5a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/dist_test_utils.py
@@ -0,0 +1,27 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os, errno
+
+
+def silentremove(filename):
+    try:
+        os.remove(filename)
+    except OSError as e:  # this would be "except OSError, e:" before Python 2.6
+        if e.errno != errno.ENOENT:  # errno.ENOENT = no such file or directory
+            raise  # re-raise exception if a different error occurred
+
+
+def remove_ps_flag(pid):
+    silentremove("/tmp/paddle.%d.port" % pid)
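Note that silentremove swallows only ENOENT, so clearing a possibly-absent file is idempotent while real failures (such as EACCES) still propagate. It also explains the SIGKILL-to-SIGINT switch in the next file: SIGKILL cannot be caught, so a cleanup handler like FlSignalHandler::StopAndExit never gets a chance to remove the port file. A rough Python analogue of that handler, as a sketch only (the real cleanup is the two C++ lines added above; this assumes dist_test_utils is importable from the test directory):

    import os
    import signal
    import sys

    from dist_test_utils import remove_ps_flag


    def _stop_and_exit(signum, frame):
        # Mirror FlSignalHandler::StopAndExit: drop the readiness marker
        # before exiting so the next run does not see a stale port file.
        remove_ps_flag(os.getpid())
        sys.exit(0)


    # SIGKILL is uncatchable, which is why the tests now send SIGINT/SIGTERM.
    signal.signal(signal.SIGINT, _stop_and_exit)
    signal.signal(signal.SIGTERM, _stop_and_exit)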
diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py
index 6c49ee757e6..e9f39f10904 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_train.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_train.py
@@ -28,6 +28,7 @@ from paddle.fluid.layers.io import ListenAndServ
 from paddle.fluid.layers.io import Recv
 from paddle.fluid.layers.io import Send
 import paddle.fluid.layers.ops as ops
+from dist_test_utils import *
 
 from paddle.fluid import core
 
@@ -38,6 +39,7 @@ RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
 
 class TestSendOp(unittest.TestCase):
     def test_send(self):
+        remove_ps_flag(os.getpid())
         # Run init_serv in a thread
         place = fluid.CPUPlace()
         # NOTE: python thread will not work here due to GIL.
@@ -55,8 +57,7 @@ class TestSendOp(unittest.TestCase):
         self.run_local(place)
         self.assertTrue(numpy.allclose(self.local_out, self.dist_out))
 
-        # FIXME(typhoonzero): find a way to gracefully shutdown the server.
-        os.kill(p.pid, signal.SIGKILL)
+        os.kill(p.pid, signal.SIGINT)
         p.join()
 
     def _wait_ps_ready(self, pid):
diff --git a/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py b/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py
index 3641501894a..fa393074a48 100644
--- a/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py
+++ b/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py
@@ -27,6 +27,7 @@ from op_test import OpTest
 import numpy
 import urllib
 import sys
+from dist_test_utils import *
 
 
 def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
@@ -65,6 +66,7 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
 
 
 def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
+    remove_ps_flag(os.getpid())
     x = fluid.layers.data(name='x', shape=[1], dtype='float32')
     y_predict = fluid.layers.fc(input=x, size=1, act=None)
     y = fluid.layers.data(name='y', shape=[1], dtype='float32')
diff --git a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py
index da343dd503a..014d30486d7 100644
--- a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py
+++ b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py
@@ -25,9 +25,11 @@ import paddle.fluid as fluid
 import paddle.fluid.core as core
 from paddle.fluid.op import Operator
 from paddle.fluid.framework import Program, program_guard
+from dist_test_utils import *
 
 
 def run_pserver(pserver_id, use_cuda, sync_mode):
+    remove_ps_flag(os.getpid())
     scope = fluid.core.Scope()
     program = Program()
     with fluid.scope_guard(scope):
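The three files above all follow the same pattern: any function about to run as a pserver subprocess first calls remove_ps_flag(os.getpid()) so the readiness poll cannot be satisfied by a leftover file, and the parent now stops the server with a catchable signal instead of SIGKILL. A condensed sketch of that lifecycle, with pserver_main standing in for the tests' real run_pserver helpers and a sleep as a placeholder for the blocking server loop:

    import os
    import signal
    import time
    from multiprocessing import Process

    from dist_test_utils import remove_ps_flag


    def pserver_main():
        # Clear any stale readiness marker first, as run_pserver does above.
        remove_ps_flag(os.getpid())
        # ... build the fluid program and run listen_and_serv here ...
        time.sleep(30)  # placeholder for the blocking server loop


    if __name__ == '__main__':
        p = Process(target=pserver_main)
        p.start()
        # A wait_ps_ready-style poll for /tmp/paddle.<pid>.port goes here.
        os.kill(p.pid, signal.SIGINT)  # catchable, unlike SIGKILL
        p.join()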
diff --git a/python/paddle/fluid/tests/unittests/test_listen_and_serv.sh b/python/paddle/fluid/tests/unittests/test_listen_and_serv.sh
new file mode 100644
index 00000000000..f47e869f9b7
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_listen_and_serv.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+unset https_proxy http_proxy
+
+nohup python -u test_listen_and_serv_op.py > test_listen_and_serv_op.log 2>&1 &
+pid=$!
+
+flag1=test_handle_signal_in_serv_op.flag
+flag2=test_list_and_serv_run_empty_optimize_block.flag
+
+for i in {1..10}; do
+    sleep 3s
+    if [[ -f "${flag1}" && -f "${flag2}" ]]; then
+        echo "test_listen_and_serv_op exit"
+        exit 0
+    fi
+done
+
+echo "test_listen_and_serv_op.log content"
+cat test_listen_and_serv_op.log
+
+# display system status
+for i in {1..4}; do
+    sleep 2
+    top -b -n1 | head -n 50
+    echo "${i}"
+    top -b -n1 -i | head -n 50
+    nvidia-smi
+done
+
+# display /tmp/paddle.* files
+ls -l /tmp/paddle.*
+
+if ! pgrep -f test_listen_and_serv_op; then
+    exit 1
+fi
+
+kill -9 $pid
+
+echo "after kill ${pid}"
+
+# display system status
+for i in {1..4}; do
+    sleep 2
+    top -b -n1 | head -n 50
+    top -b -n1 -i | head -n 50
+    nvidia-smi
+done
+
+exit 1
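The script budgets 10 polls of 3 s each, i.e. 30 s, for the unittest to produce both flag files; the CTest TIMEOUT of 100 s set in the CMake change leaves headroom for the diagnostic dumps that follow a miss. The handshake is one-directional: the Python side writes the flags (gen_complete_file_flag in the next file) and the shell side only polls. An equivalent of the shell loop as a reusable Python helper, a sketch with illustrative names and defaults:

    import os
    import time


    def wait_for_flags(flag_files, retries=10, interval_s=3.0):
        # Mirror the shell loop: up to `retries` polls, `interval_s` apart;
        # succeed only once every flag file exists.
        for _ in range(retries):
            time.sleep(interval_s)
            if all(os.path.isfile(f) for f in flag_files):
                return True
        return False


    # Usage matching test_listen_and_serv.sh:
    done = wait_for_flags(["test_handle_signal_in_serv_op.flag",
                           "test_list_and_serv_run_empty_optimize_block.flag"])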
"test_list_and_serv_run_empty_optimize_block.flag") if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py index 7aad9a3bcd5..1b02c8d19ad 100644 --- a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py +++ b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py @@ -25,9 +25,11 @@ import paddle.fluid as fluid import paddle.fluid.core as core from paddle.fluid.op import Operator from paddle.fluid.framework import Program, program_guard +from dist_test_utils import * def run_pserver(pserver_id, use_cuda, sync_mode): + remove_ps_flag(os.getgid()) scope = fluid.core.Scope() program = Program() with fluid.scope_guard(scope): diff --git a/python/paddle/fluid/tests/unittests/test_nce_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_nce_remote_table_op.py index d24532b95fb..3ec69923a11 100644 --- a/python/paddle/fluid/tests/unittests/test_nce_remote_table_op.py +++ b/python/paddle/fluid/tests/unittests/test_nce_remote_table_op.py @@ -25,6 +25,7 @@ import paddle.fluid as fluid import paddle.fluid.core as core from paddle.fluid.op import Operator from paddle.fluid.framework import Program, program_guard +from dist_test_utils import * def nce(input, weight, bias, sample_weight, labels, num_classes, @@ -67,6 +68,7 @@ def nce(input, weight, bias, sample_weight, labels, num_classes, def run_pserver(pserver_id, use_cuda, sync_mode): + remove_ps_flag(os.getpid()) scope = fluid.core.Scope() program = Program() with fluid.scope_guard(scope): -- GitLab