diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index aa4fb445b4d75c7dc800e412a18376bede9bd446..7e51ecf07e5992a3eb05027f118466c38bfb9b11 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -13,6 +13,7 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_se_resnext) list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding) list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer) list(APPEND DIST_TEST_OPS test_listen_and_serv_op) +list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS}) #remove distribute unittests. list(APPEND MIXED_DIST_TEST_OPS test_dgc_op) @@ -36,7 +37,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_base) list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_2) list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_3) list(APPEND MIXED_DIST_TEST_OPS test_fleet_recompute_meta_optimizer) -list(APPEND MIXED_DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_pipeline_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_amp_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_gradient_merge_meta_optimizer) @@ -454,8 +454,7 @@ if(WITH_DISTRIBUTE) py_test_modules(test_fleet_base_2 MODULES test_fleet_base_2 ENVS ${dist_ENVS}) py_test_modules(test_fleet_base_3 MODULES test_fleet_base_3 ENVS ${dist_ENVS}) py_test_modules(test_fleet_recompute_meta_optimizer MODULES test_fleet_recompute_meta_optimizer ENVS ${dist_ENVS}) - py_test_modules(test_fleet_graph_execution_meta_optimizer MODULES test_fleet_graph_execution_meta_optimizer ENVS ${dist_ENVS}) - py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS}) + py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS}) py_test_modules(test_fleet_gradient_merge_meta_optimizer MODULES test_fleet_gradient_merge_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_amp_meta_optimizer MODULES test_fleet_amp_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_pipeline_meta_optimizer MODULES test_fleet_pipeline_meta_optimizer ENVS ${dist_ENVS}) @@ -490,6 +489,7 @@ if(WITH_DISTRIBUTE) bash_test_modules(test_launch_ps START_BASH test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) bash_test_modules(test_fleet_launch START_BASH test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + # port range (20000, 23000) is reserved for dist-ops set(dist_ut_port 20001) foreach(TEST_OP ${DIST_TEST_OPS}) bash_test_modules(${TEST_OP} START_BASH dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}") diff --git a/python/paddle/fluid/tests/unittests/launch_function_helper.py b/python/paddle/fluid/tests/unittests/launch_function_helper.py index ecfe39b80e9051d332bb8fd2a05de2fa53770e46..13041827ffeabd3d6b79e4f34a67bd09624e54f6 100644 --- a/python/paddle/fluid/tests/unittests/launch_function_helper.py +++ b/python/paddle/fluid/tests/unittests/launch_function_helper.py @@ -15,6 +15,7 @@ from multiprocessing import Pool, Process import os import socket from contextlib import closing +import psutil def launch_func(func, env_dict): @@ -24,6 +25,21 @@ def launch_func(func, env_dict): return proc +def wait(procs, timeout=None): + # wait + decents = [] + for p in procs: + for child in psutil.Process(p.pid).children(recursive=True): + decents.append(child) + + gone, alive = psutil.wait_procs(decents, timeout=timeout) + for p in alive: + p.kill() + for p in gone: + if p.returncode != 0: + sys.exit(1) + + def _find_free_port(port_set): def __free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: diff --git a/python/paddle/fluid/tests/unittests/test_fleet_graph_execution_meta_optimizer.py b/python/paddle/fluid/tests/unittests/test_fleet_graph_execution_meta_optimizer.py index 2ac1dfc0303defaadd48d5d7457643405a4f09a0..9eec73116cc283b58d3ee39cefb9256e12d4ef15 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_graph_execution_meta_optimizer.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_graph_execution_meta_optimizer.py @@ -15,25 +15,37 @@ import unittest import paddle import os -from launch_function_helper import launch_func, _find_free_port +from launch_function_helper import launch_func, wait, _find_free_port class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): + def setUp(self): + try: + self._dist_ut_port_0 = int(os.environ["PADDLE_DIST_UT_PORT"]) + self._dist_ut_port_1 = self._dist_ut_port_0 + 1 + except Exception as e: + self._dist_ut_port_0 = _find_free_port(set()) + self._dist_ut_port_1 = _find_free_port(set()) + def test_graph_execution_optimizer_not_apply(self): + port_a = self._dist_ut_port_0 + port_b = self._dist_ut_port_1 node_a = { "PADDLE_TRAINER_ID": "0", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36003", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } node_b = { "PADDLE_TRAINER_ID": "1", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36004", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } @@ -65,14 +77,11 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): proc_a.start() proc_b = launch_func(node_func, node_b) proc_b.start() - proc_a.join() - proc_b.join() + wait([proc_a, proc_b]) def test_graph_execution_optimizer(self): - - port_set = set() - port_a = _find_free_port(port_set) - port_b = _find_free_port(port_set) + port_a = self._dist_ut_port_0 + 2 + port_b = self._dist_ut_port_1 + 2 node_a = { "PADDLE_TRAINER_ID": "0", @@ -138,24 +147,27 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): proc_a.start() proc_b = launch_func(node_func, node_b) proc_b.start() - proc_a.join() - proc_b.join() + wait([proc_a, proc_b]) def test_graph_execution_optimizer_not_apply_v2(self): + port_a = self._dist_ut_port_0 + 4 + port_b = self._dist_ut_port_1 + 4 node_a = { "PADDLE_TRAINER_ID": "0", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36003", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } node_b = { "PADDLE_TRAINER_ID": "1", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36004", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36003,127.0.0.1:36004", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } @@ -187,24 +199,27 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): proc_a.start() proc_b = launch_func(node_func, node_b) proc_b.start() - proc_a.join() - proc_b.join() + wait([proc_a, proc_b]) def test_graph_execution_optimizer(self): + port_a = self._dist_ut_port_0 + 6 + port_b = self._dist_ut_port_1 + 6 node_a = { "PADDLE_TRAINER_ID": "0", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36001", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_a), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } node_b = { "PADDLE_TRAINER_ID": "1", - "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:36002", + "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:{}".format(port_b), "PADDLE_TRAINERS_NUM": "2", - "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002", + "PADDLE_TRAINER_ENDPOINTS": + "127.0.0.1:{},127.0.0.1:{}".format(port_a, port_b), "http_proxy": "", "https_proxy": "" } @@ -253,8 +268,7 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): proc_a.start() proc_b = launch_func(node_func, node_b) proc_b.start() - proc_a.join() - proc_b.join() + wait([proc_a, proc_b]) if __name__ == "__main__": diff --git a/python/requirements.txt b/python/requirements.txt index 28c84c1d630a66077a737a48bb8a26200ec48f0b..5e81ec680897024e7c32d193bef1716e9b25b4a4 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -23,3 +23,4 @@ objgraph astor pathlib netifaces +psutil