未验证 提交 57f0f0f2 编写于 作者: G gongweibao 提交者: GitHub

Delete pserver complete file before executor running. (#19468)

上级 4a7e6deb
...@@ -265,6 +265,8 @@ class FlListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -265,6 +265,8 @@ class FlListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
void FlSignalHandler::StopAndExit(int signal_num) { void FlSignalHandler::StopAndExit(int signal_num) {
// Do not use VLOG here for the device for printing maybe already released. // Do not use VLOG here for the device for printing maybe already released.
// exit will release interal allocated resoureces. // exit will release interal allocated resoureces.
auto file_path = string::Sprintf("/tmp/paddle.%d.port", ::getpid());
remove(file_path.c_str());
exit(0); exit(0);
} }
......
...@@ -242,14 +242,15 @@ if(WITH_DISTRIBUTE) ...@@ -242,14 +242,15 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_lookup_remote_table_op MODULES test_lookup_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_lookup_remote_table_op MODULES test_lookup_remote_table_op ENVS ${dist_ENVS})
py_test_modules(test_hsigmoid_remote_table_op MODULES test_hsigmoid_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_hsigmoid_remote_table_op MODULES test_hsigmoid_remote_table_op ENVS ${dist_ENVS})
py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS})
py_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv_op ENVS ${dist_ENVS}) #py_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv_op ENVS ${dist_ENVS})
set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 20 LABELS "RUN_TYPE=EXCLUSIVE")
set_tests_properties(test_listen_and_serv_op test_nce_remote_table_op test_hsigmoid_remote_table_op
PROPERTIES LABELS "RUN_TYPE=DIST")
if(WITH_DGC) if(WITH_DGC)
py_test_modules(test_dgc_op MODULES test_dgc_op) py_test_modules(test_dgc_op MODULES test_dgc_op)
endif() endif()
if(NOT APPLE) if(NOT APPLE)
bash_test_modules(test_listen_and_serv_op MODULES test_listen_and_serv.sh)
set_tests_properties(test_listen_and_serv_op PROPERTIES TIMEOUT 100 LABELS "RUN_TYPE=EXCLUSIVE")
set_tests_properties(test_listen_and_serv_op test_nce_remote_table_op test_hsigmoid_remote_table_op PROPERTIES LABELS "RUN_TYPE=DIST")
set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_dist_mnist PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
set_tests_properties(test_dist_mnist_dgc_nccl PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_dist_mnist_dgc_nccl PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
set_tests_properties(test_dist_mnist_hallreduce PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_dist_mnist_hallreduce PROPERTIES TIMEOUT 350 LABELS "RUN_TYPE=EXCLUSIVE")
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, errno
def silentremove(filename):
try:
os.remove(filename)
except OSError as e: # this would be "except OSError, e:" before Python 2.6
if e.errno != errno.ENOENT: # errno.ENOENT = no such file or directory
raise # re-raise exception if a different error occurred
def remove_ps_flag(pid):
silentremove("/tmp/paddle.%d.port" % pid)
...@@ -28,6 +28,7 @@ from paddle.fluid.layers.io import ListenAndServ ...@@ -28,6 +28,7 @@ from paddle.fluid.layers.io import ListenAndServ
from paddle.fluid.layers.io import Recv from paddle.fluid.layers.io import Recv
from paddle.fluid.layers.io import Send from paddle.fluid.layers.io import Send
import paddle.fluid.layers.ops as ops import paddle.fluid.layers.ops as ops
from dist_test_utils import *
from paddle.fluid import core from paddle.fluid import core
...@@ -38,6 +39,7 @@ RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC ...@@ -38,6 +39,7 @@ RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
class TestSendOp(unittest.TestCase): class TestSendOp(unittest.TestCase):
def test_send(self): def test_send(self):
remove_ps_flag(os.getpid())
# Run init_serv in a thread # Run init_serv in a thread
place = fluid.CPUPlace() place = fluid.CPUPlace()
# NOTE: python thread will not work here due to GIL. # NOTE: python thread will not work here due to GIL.
...@@ -55,8 +57,7 @@ class TestSendOp(unittest.TestCase): ...@@ -55,8 +57,7 @@ class TestSendOp(unittest.TestCase):
self.run_local(place) self.run_local(place)
self.assertTrue(numpy.allclose(self.local_out, self.dist_out)) self.assertTrue(numpy.allclose(self.local_out, self.dist_out))
# FIXME(typhoonzero): find a way to gracefully shutdown the server. os.kill(p.pid, signal.SIGINT)
os.kill(p.pid, signal.SIGKILL)
p.join() p.join()
def _wait_ps_ready(self, pid): def _wait_ps_ready(self, pid):
......
...@@ -27,6 +27,7 @@ from op_test import OpTest ...@@ -27,6 +27,7 @@ from op_test import OpTest
import numpy import numpy
import urllib import urllib
import sys import sys
from dist_test_utils import *
def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
...@@ -65,6 +66,7 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -65,6 +66,7 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
remove_ps_flag(os.getpid())
x = fluid.layers.data(name='x', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) y_predict = fluid.layers.fc(input=x, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32')
......
...@@ -25,9 +25,11 @@ import paddle.fluid as fluid ...@@ -25,9 +25,11 @@ import paddle.fluid as fluid
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid.op import Operator from paddle.fluid.op import Operator
from paddle.fluid.framework import Program, program_guard from paddle.fluid.framework import Program, program_guard
from dist_test_utils import *
def run_pserver(pserver_id, use_cuda, sync_mode): def run_pserver(pserver_id, use_cuda, sync_mode):
remove_ps_flag(os.getpid())
scope = fluid.core.Scope() scope = fluid.core.Scope()
program = Program() program = Program()
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
......
#!/bin/bash
unset https_proxy http_proxy
nohup python -u test_listen_and_serv_op.py > test_listen_and_serv_op.log 2>&1 &
pid=$!
flag1=test_handle_signal_in_serv_op.flag
flag2=test_list_and_serv_run_empty_optimize_block.flag
for i in {1..10}; do
sleep 3s
if [[ -f "${flag1}" && -f "${flag2}" ]]; then
echo "test_listen_and_serv_op exit"
exit 0
fi
done
echo "test_listen_and_serv_op.log context"
cat test_listen_and_serv_op.log
#display system context
for i in {1..4}; do
sleep 2
top -b -n1 | head -n 50
echo "${i}"
top -b -n1 -i | head -n 50
nvidia-smi
done
#display /tmp/files
ls -l /tmp/paddle.*
if ! pgrep -x test_listen_and_serv_op; then
exit 1
fi
kill -9 $pid
echo "after kill ${pid}"
#display system context
for i in {1..4}; do
sleep 2
top -b -n1 | head -n 50
top -b -n1 -i | head -n 50
nvidia-smi
done
exit 1
...@@ -14,9 +14,13 @@ ...@@ -14,9 +14,13 @@
from __future__ import print_function from __future__ import print_function
from dist_test_utils import *
silentremove("test_handle_signal_in_serv_op.flag")
silentremove("test_list_and_serv_run_empty_optimize_block.flag")
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import os
import signal import signal
import subprocess import subprocess
import time import time
...@@ -26,6 +30,7 @@ from op_test import OpTest ...@@ -26,6 +30,7 @@ from op_test import OpTest
def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
remove_ps_flag(os.getpid())
x = fluid.layers.data(name='x', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) y_predict = fluid.layers.fc(input=x, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32')
...@@ -56,6 +61,7 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -56,6 +61,7 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers, def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
trainer_id): trainer_id):
remove_ps_flag(os.getpid())
x = fluid.layers.data(name='x', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False) y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
y = fluid.layers.data(name='y', shape=[1], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32')
...@@ -92,7 +98,12 @@ def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers, ...@@ -92,7 +98,12 @@ def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
exe.run(pserver_prog) exe.run(pserver_prog)
class TestListenAndServOp(OpTest): def gen_complete_file_flag(flag_file):
with open(flag_file, "w") as f:
f.write("complete")
class TestListenAndServOp(unittest.TestCase):
def setUp(self): def setUp(self):
self.ps_timeout = 5 self.ps_timeout = 5
self.ip = "127.0.0.1" self.ip = "127.0.0.1"
...@@ -130,36 +141,52 @@ class TestListenAndServOp(OpTest): ...@@ -130,36 +141,52 @@ class TestListenAndServOp(OpTest):
def test_handle_signal_in_serv_op(self): def test_handle_signal_in_serv_op(self):
# run pserver on CPU in sync mode # run pserver on CPU in sync mode
p1 = self._start_pserver(False, True, run_pserver) p1 = self._start_pserver(False, True, run_pserver)
print("test_handle_signal_in_serv_op before _wait_ps_ready")
self._wait_ps_ready(p1.pid) self._wait_ps_ready(p1.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p1.pid, signal.SIGINT) os.kill(p1.pid, signal.SIGINT)
print("test_handle_signal_in_serv_op after kill pid:", p1.pid)
p1.join() p1.join()
# run pserver on CPU in async mode # run pserver on CPU in async mode
p2 = self._start_pserver(False, False, run_pserver) p2 = self._start_pserver(False, False, run_pserver)
print("test_handle_signal_in_serv_op after start p2 pid:", p2.pid)
self._wait_ps_ready(p2.pid) self._wait_ps_ready(p2.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p2.pid, signal.SIGTERM) os.kill(p2.pid, signal.SIGTERM)
print("test_handle_signal_in_serv_op before join p2 pid:", p2.pid)
p2.join() p2.join()
gen_complete_file_flag("test_handle_signal_in_serv_op.flag")
def test_list_and_serv_run_empty_optimize_block(self): def test_list_and_serv_run_empty_optimize_block(self):
# run pserver on CPU in sync mode # run pserver on CPU in sync mode
p1 = self._start_pserver(False, True, run_pserver_with_empty_block) p1 = self._start_pserver(False, True, run_pserver_with_empty_block)
print(
"test_list_and_serv_run_empty_optimize_block before _wait_ps_ready")
self._wait_ps_ready(p1.pid) self._wait_ps_ready(p1.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p1.pid, signal.SIGINT) os.kill(p1.pid, signal.SIGINT)
print("test_list_and_serv_run_empty_optimize_block after kill pid:",
p1.pid)
p1.join() p1.join()
# run pserver on CPU in async mode # run pserver on CPU in async mode
p2 = self._start_pserver(False, False, run_pserver_with_empty_block) p2 = self._start_pserver(False, False, run_pserver_with_empty_block)
print("test_list_and_serv_run_empty_optimize_block after start p2 pid:",
p2.pid)
self._wait_ps_ready(p2.pid) self._wait_ps_ready(p2.pid)
# raise SIGTERM to pserver # raise SIGTERM to pserver
os.kill(p2.pid, signal.SIGTERM) os.kill(p2.pid, signal.SIGTERM)
print("test_list_and_serv_run_empty_optimize_block before join p2 pid:",
p2.pid)
p2.join() p2.join()
gen_complete_file_flag(
"test_list_and_serv_run_empty_optimize_block.flag")
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -25,9 +25,11 @@ import paddle.fluid as fluid ...@@ -25,9 +25,11 @@ import paddle.fluid as fluid
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid.op import Operator from paddle.fluid.op import Operator
from paddle.fluid.framework import Program, program_guard from paddle.fluid.framework import Program, program_guard
from dist_test_utils import *
def run_pserver(pserver_id, use_cuda, sync_mode): def run_pserver(pserver_id, use_cuda, sync_mode):
remove_ps_flag(os.getgid())
scope = fluid.core.Scope() scope = fluid.core.Scope()
program = Program() program = Program()
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
......
...@@ -25,6 +25,7 @@ import paddle.fluid as fluid ...@@ -25,6 +25,7 @@ import paddle.fluid as fluid
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid.op import Operator from paddle.fluid.op import Operator
from paddle.fluid.framework import Program, program_guard from paddle.fluid.framework import Program, program_guard
from dist_test_utils import *
def nce(input, weight, bias, sample_weight, labels, num_classes, def nce(input, weight, bias, sample_weight, labels, num_classes,
...@@ -67,6 +68,7 @@ def nce(input, weight, bias, sample_weight, labels, num_classes, ...@@ -67,6 +68,7 @@ def nce(input, weight, bias, sample_weight, labels, num_classes,
def run_pserver(pserver_id, use_cuda, sync_mode): def run_pserver(pserver_id, use_cuda, sync_mode):
remove_ps_flag(os.getpid())
scope = fluid.core.Scope() scope = fluid.core.Scope()
program = Program() program = Program()
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册