未验证 提交 fc5acdd0 编写于 作者: G gongweibao 提交者: GitHub

Fix fleet ut timeout issue. (#26694)

上级 32ae8e81
......@@ -13,6 +13,7 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_se_resnext)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer)
list(APPEND DIST_TEST_OPS test_listen_and_serv_op)
list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op)
......@@ -36,7 +37,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_base)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_2)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_3)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_recompute_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_pipeline_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_meta_optimizer)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_gradient_merge_meta_optimizer)
......@@ -454,8 +454,7 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_fleet_base_2 MODULES test_fleet_base_2 ENVS ${dist_ENVS})
py_test_modules(test_fleet_base_3 MODULES test_fleet_base_3 ENVS ${dist_ENVS})
py_test_modules(test_fleet_recompute_meta_optimizer MODULES test_fleet_recompute_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_graph_execution_meta_optimizer MODULES test_fleet_graph_execution_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS})
py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS})
py_test_modules(test_fleet_gradient_merge_meta_optimizer MODULES test_fleet_gradient_merge_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS})
py_test_modules(test_fleet_pipeline_meta_optimizer MODULES test_fleet_pipeline_meta_optimizer ENVS ${dist_ENVS})
......@@ -490,6 +489,7 @@ if(WITH_DISTRIBUTE)
bash_test_modules(test_launch_ps START_BASH test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch START_BASH test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
# port range (20000, 23000) is reserved for dist-ops
set(dist_ut_port 20001)
foreach(TEST_OP ${DIST_TEST_OPS})
bash_test_modules(${TEST_OP} START_BASH dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}")
......
......@@ -15,6 +15,7 @@ from multiprocessing import Pool, Process
import os
import socket
from contextlib import closing
import psutil
def launch_func(func, env_dict):
......@@ -24,6 +25,21 @@ def launch_func(func, env_dict):
return proc
def wait(procs, timeout=None):
# wait
decents = []
for p in procs:
for child in psutil.Process(p.pid).children(recursive=True):
decents.append(child)
gone, alive = psutil.wait_procs(decents, timeout=timeout)
for p in alive:
p.kill()
for p in gone:
if p.returncode != 0:
sys.exit(1)
def _find_free_port(port_set):
def __free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
......
......@@ -15,25 +15,37 @@
import unittest
import paddle
import os
from launch_function_helper import launch_func, _find_free_port
from launch_function_helper import launch_func, wait, _find_free_port
class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
def setUp(self):
try:
self._dist_ut_port_0 = int(os.environ["PADDLE_DIST_UT_PORT"])
self._dist_ut_port_1 = self._dist_ut_port_0 + 1
except Exception as e:
self._dist_ut_port_0 = _find_free_port(set())
self._dist_ut_port_1 = _find_free_port(set())
def test_graph_execution_optimizer_not_apply(self):
port_a = self._dist_ut_port_0
port_b = self._dist_ut_port_1
node_a = {
"PADDLE_TRAINER_ID": "0",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36003",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
node_b = {
"PADDLE_TRAINER_ID": "1",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36004",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
......@@ -65,14 +77,11 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
proc_a.start()
proc_b = launch_func(node_func, node_b)
proc_b.start()
proc_a.join()
proc_b.join()
wait([proc_a, proc_b])
def test_graph_execution_optimizer(self):
port_set = set()
port_a = _find_free_port(port_set)
port_b = _find_free_port(port_set)
port_a = self._dist_ut_port_0 + 2
port_b = self._dist_ut_port_1 + 2
node_a = {
"PADDLE_TRAINER_ID": "0",
......@@ -138,24 +147,27 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
proc_a.start()
proc_b = launch_func(node_func, node_b)
proc_b.start()
proc_a.join()
proc_b.join()
wait([proc_a, proc_b])
def test_graph_execution_optimizer_not_apply_v2(self):
port_a = self._dist_ut_port_0 + 4
port_b = self._dist_ut_port_1 + 4
node_a = {
"PADDLE_TRAINER_ID": "0",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36003",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
node_b = {
"PADDLE_TRAINER_ID": "1",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36004",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
......@@ -187,24 +199,27 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
proc_a.start()
proc_b = launch_func(node_func, node_b)
proc_b.start()
proc_a.join()
proc_b.join()
wait([proc_a, proc_b])
def test_graph_execution_optimizer(self):
port_a = self._dist_ut_port_0 + 6
port_b = self._dist_ut_port_1 + 6
node_a = {
"PADDLE_TRAINER_ID": "0",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36001",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
node_b = {
"PADDLE_TRAINER_ID": "1",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36002",
"PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b),
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002",
"PADDLE_TRAINER_ENDPOINTS":
"127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b),
"http_proxy": "",
"https_proxy": ""
}
......@@ -253,8 +268,7 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
proc_a.start()
proc_b = launch_func(node_func, node_b)
proc_b.start()
proc_a.join()
proc_b.join()
wait([proc_a, proc_b])
if __name__ == "__main__":
......
......@@ -23,3 +23,4 @@ objgraph
astor
pathlib
netifaces
psutil
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册