diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py index 0d96c57c2437fac37daeb799bdc32bb27c16c7eb..f7a1a28aa91ca62fc1168657530b07e4c21d1186 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py @@ -14,7 +14,7 @@ import unittest import time -import paddle.fluid as fluid +import tempfile import copy import os import numpy as np @@ -145,7 +145,10 @@ def train(): engine.predict(test_dataset, batch_size, fetch_list=['label']) # save - engine.save('./mlp_inf', training=False, mode='predict') + temp_dir = tempfile.TemporaryDirectory() + model_filename = os.path.join(temp_dir.name, 'mlp_inf') + engine.save(model_filename, training=False, mode='predict') + temp_dir.cleanup() if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py index 4ff72173382da5552a0e7c6cbb7845f8a1b4a3d1..09ec5131402d094e6ddc14148d1de12d2b1d04de 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import unittest import os import sys @@ -77,16 +78,45 @@ cluster_json = """ } """ +mapping_josn = """ +[ + { + "hostname": "machine1", + "addr": "127.0.0.1", + "port": "768", + "ranks": + { + "0": [1], + "1": [0] + } + } +] +""" + class TestAutoParallelReLaunch(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_relaunch(self): - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") + mapping_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_rank_mapping.json") + cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) + mapping_josn_object = json.loads(mapping_josn) + with open(mapping_json_path, "w") as mapping_josn_file: + json.dump(mapping_josn_object, mapping_josn_file) + + file_dir = os.path.dirname(os.path.abspath(__file__)) launch_model_path = os.path.join(file_dir, "auto_parallel_relaunch_model.py") @@ -96,24 +126,15 @@ class TestAutoParallelReLaunch(unittest.TestCase): coverage_args = [] cmd = [sys.executable, "-u"] + coverage_args + [ - "-m", "launch", "--cluster_topo_path", cluster_json_path, - "--enable_auto_mapping", "True", launch_model_path + "-m", "launch", "--log_dir", self.temp_dir.name, + "--cluster_topo_path", cluster_json_path, "--rank_mapping_path", + mapping_json_path, "--enable_auto_mapping", "True", + launch_model_path ] process = subprocess.Popen(cmd) process.wait() self.assertEqual(process.returncode, 0) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - rank_mapping_json_path = os.path.join( - file_dir, "auto_parallel_rank_mapping.json") - if os.path.exists(rank_mapping_json_path): - os.remove(rank_mapping_json_path) - log_path = os.path.join(file_dir, "log") - if os.path.exists(log_path): - shutil.rmtree(log_path) - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py index 5b6f898d5b7d1c209622cf947f6951aded368f57..dd9b0110dbebddda4228c0126a59c5d7839d6b45 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import unittest import os import json @@ -1968,10 +1969,17 @@ multi_cluster_json = """{ class TestCluster(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_single_machine(self): # Build cluster - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster_single.json") + cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) @@ -1989,14 +1997,10 @@ class TestCluster(unittest.TestCase): self.assertTrue(devices == [0, 1, 2, 3]) self.assertTrue(involved_machine_count == 1) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - def test_multi_machine(self): # Build cluster - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster_multi.json") cluster_json_object = json.loads(multi_cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) @@ -2014,10 +2018,6 @@ class TestCluster(unittest.TestCase): self.assertTrue(devices == [5, 6, 7, 10]) self.assertTrue(involved_machine_count == 2) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py index 0d3f193e8bce86d2a93b7624769c412890d3422f..215385787880c3f6d92f2e63fc47651a0bc5f8f5 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py @@ -15,6 +15,7 @@ import unittest import os import json +import tempfile import paddle from paddle.distributed.auto_parallel.cluster import Cluster @@ -32,10 +33,16 @@ from test_cluster import cluster_json, multi_cluster_json class TestCommOpCost(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_comm_cost(self): # Build cluster - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster0.json") cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) @@ -92,14 +99,10 @@ class TestCommOpCost(unittest.TestCase): comm_context=comm_context) self.assertTrue(identity_op_cost.time >= 0) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - def test_cross_machine_comm_cost(self): # Build cluster - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster1.json") cluster_json_object = json.loads(multi_cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) @@ -151,10 +154,6 @@ class TestCommOpCost(unittest.TestCase): comm_context=comm_context) self.assertTrue(recv_op_cost.time > 0) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py index b8ad54cbb79e1714de5be72cfa842d420b59efd1..8d5051a3d48d4f022aeed22f1a38ee8de909e64e 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import unittest import os import sys @@ -31,24 +32,17 @@ class TestEngineAPI(unittest.TestCase): else: coverage_args = [] + tmp_dir = tempfile.TemporaryDirectory() cmd = [sys.executable, "-u"] + coverage_args + [ - "-m", "launch", "--gpus", "0,1", launch_model_path + "-m", "launch", "--gpus", "0,1", "--log_dir", tmp_dir.name, + launch_model_path ] process = subprocess.Popen(cmd) process.wait() self.assertEqual(process.returncode, 0) - # Remove unnecessary files - log_path = os.path.join(file_dir, "log") - if os.path.exists(log_path): - shutil.rmtree(log_path) - files_path = [path for path in os.listdir('.') if '.pd' in path] - for path in files_path: - if os.path.exists(path): - os.remove(path) - if os.path.exists('rank_mapping.csv'): - os.remove('rank_mapping.csv') + tmp_dir.cleanup() if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py index 911f20f11491290d7da24792071220a4cbbf3624..fe46131225759dad1c651db95ca658e25c02bd59 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py @@ -15,6 +15,7 @@ import unittest import os import json +import tempfile import paddle import paddle.distributed.auto_parallel.cost as cost_model @@ -36,6 +37,12 @@ def check_cost(cost): class TestCost(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_base_cost(self): cost = cost_model.Cost(memory=100, flops=200, time=0.5) self.assertTrue(check_cost(cost)) @@ -65,8 +72,8 @@ class TestCost(unittest.TestCase): def test_comm_cost(self): # Build cluster - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) @@ -85,10 +92,6 @@ class TestCost(unittest.TestCase): op_desc=desc, comm_context=CommContext(cluster)) self.assertTrue(check_cost(allreduce_cost.cost)) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - def test_cost_estimator(self): train_program = paddle.static.Program() cost_estimator = cost_model.CostEstimator(train_program) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py index 88ad5f98bf7d2cb413ad630c2c241276452ace7a..bc1ebd6688edba5ca7b8cf51d26b6063f05ae563 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import unittest import os import sys @@ -23,14 +24,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage class TestPlannerReLaunch(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_relaunch_with_planner(self): - from test_auto_parallel_relaunch import cluster_json - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + from test_auto_parallel_relaunch import cluster_json, mapping_josn + + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") + mapping_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_rank_mapping.json") + cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) + mapping_json_object = json.loads(mapping_josn) + with open(mapping_json_path, "w") as mapping_json_file: + json.dump(mapping_json_object, mapping_json_file) + + file_dir = os.path.dirname(os.path.abspath(__file__)) launch_model_path = os.path.join( file_dir, "auto_parallel_relaunch_with_gpt_planner.py") @@ -40,28 +56,15 @@ class TestPlannerReLaunch(unittest.TestCase): coverage_args = [] cmd = [sys.executable, "-u"] + coverage_args + [ - "-m", "launch", "--cluster_topo_path", cluster_json_path, - "--enable_auto_mapping", "True", launch_model_path + "-m", "launch", "--log_dir", self.temp_dir.name, + "--cluster_topo_path", cluster_json_path, "--rank_mapping_path", + mapping_json_path, "--enable_auto_mapping", "True", + launch_model_path ] process = subprocess.Popen(cmd) process.wait() self.assertEqual(process.returncode, 0) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - rank_mapping_json_path = os.path.join( - file_dir, "auto_parallel_rank_mapping.json") - if os.path.exists(rank_mapping_json_path): - os.remove(rank_mapping_json_path) - files_path = [path for path in os.listdir('.') if '.pkl' in path] - for path in files_path: - if os.path.exists(path): - os.remove(path) - log_path = os.path.join(file_dir, "log") - if os.path.exists(log_path): - shutil.rmtree(log_path) - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py index b6fc0d7a1fa4121a44c2933db757a5fad06255ac..efcc313a2a4caab39adcfe919317f86e98d3347c 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import unittest import os import sys @@ -23,14 +24,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage class TestPlannerReLaunch(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_relaunch_with_planner(self): - from test_auto_parallel_relaunch import cluster_json - file_dir = os.path.dirname(os.path.abspath(__file__)) - cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json") + from test_auto_parallel_relaunch import cluster_json, mapping_josn + + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") + mapping_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_rank_mapping.json") + cluster_json_object = json.loads(cluster_json) with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) + mapping_json_object = json.loads(mapping_josn) + with open(mapping_json_path, "w") as mapping_json_file: + json.dump(mapping_json_object, mapping_json_file) + + file_dir = os.path.dirname(os.path.abspath(__file__)) launch_model_path = os.path.join( file_dir, "auto_parallel_relaunch_with_planner.py") @@ -40,24 +56,15 @@ class TestPlannerReLaunch(unittest.TestCase): coverage_args = [] cmd = [sys.executable, "-u"] + coverage_args + [ - "-m", "launch", "--cluster_topo_path", cluster_json_path, - "--enable_auto_mapping", "True", launch_model_path + "-m", "launch", "--log_dir", self.temp_dir.name, + "--cluster_topo_path", cluster_json_path, "--rank_mapping_path", + mapping_json_path, "--enable_auto_mapping", "True", + launch_model_path ] process = subprocess.Popen(cmd) process.wait() self.assertEqual(process.returncode, 0) - # Remove unnecessary files - if os.path.exists(cluster_json_path): - os.remove(cluster_json_path) - rank_mapping_json_path = os.path.join( - file_dir, "auto_parallel_rank_mapping.json") - if os.path.exists(rank_mapping_json_path): - os.remove(rank_mapping_json_path) - log_path = os.path.join(file_dir, "log") - if os.path.exists(log_path): - shutil.rmtree(log_path) - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py index 7ef5516bc047ea46c2cfa6d61395a11d23c290f9..176f41934160620ac65743ce23dcf21d362c7393 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py +++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py @@ -14,6 +14,7 @@ from __future__ import print_function +import tempfile import unittest import os import json @@ -201,15 +202,21 @@ cluster_json = """ class TestAutoParallelCluster(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_cluster(self): - cluster_json_file = "" + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") cluster_json_object = json.loads(cluster_json) - with open("./auto_parallel_cluster.json", "w") as cluster_json_file: + with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) cluster = Cluster() - cluster.build_from_file("./auto_parallel_cluster.json") - os.remove("./auto_parallel_cluster.json") + cluster.build_from_file(cluster_json_path) self.assertEqual(len(cluster.get_all_devices("GPU")), 4) self.assertEqual(len(cluster.get_all_devices("CPU")), 2) diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py index a147b0f1f376a49dab64848ff077a59fff4b0d30..36923212fdfa168624c63ddf1436f7b99887d4ae 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py +++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py @@ -14,6 +14,7 @@ from __future__ import print_function +import tempfile import unittest import os import json @@ -527,14 +528,20 @@ def get_device_local_ids(machine): class TestAutoParallelMapper(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + + def tearDown(self): + self.temp_dir.cleanup() + def test_mapper_dp_mp_pp(self): - cluster_json_file = "" + cluster_json_path = os.path.join(self.temp_dir.name, + "auto_parallel_cluster.json") cluster_json_object = json.loads(cluster_json) - with open("./auto_parallel_cluster.json", "w") as cluster_json_file: + with open(cluster_json_path, "w") as cluster_json_file: json.dump(cluster_json_object, cluster_json_file) cluster = Cluster() - cluster.build_from_file("./auto_parallel_cluster.json") - os.remove("./auto_parallel_cluster.json") + cluster.build_from_file(cluster_json_path) global _global_parallel_strategy _global_parallel_strategy = "dp_mp_pp" diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py index 96738a466626e5cbcadc4b775fe5396b8e2ac657..00ba2151fcba51aea9b8125b72f95a523f1a8dd3 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py +++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py @@ -892,25 +892,6 @@ class TestGPTPartitioner(unittest.TestCase): auto_parallel_main_prog, auto_parallel_startup_prog, params_grads = partitioner.partition( complete_train_program, startup_program, params_grads) - with open("./test_auto_parallel_partitioner_serial_main_new.txt", - "w") as fw: - fw.write(str(train_program)) - with open("./test_auto_parallel_partitioner_serial_startup_new.txt", - "w") as fw: - fw.write(str(startup_program)) - - from paddle.distributed.auto_parallel.dist_context import set_default_distributed_context - set_default_distributed_context(dist_context) - with open("./test_auto_parallel_partitioner_main_new.txt1", "w") as fw: - fw.write(str(auto_parallel_main_prog)) - with open("./test_auto_parallel_partitioner_startup_new.txt1", - "w") as fw: - fw.write(str(auto_parallel_startup_prog)) - # with open("./test_auto_parallel_partitioner_main_completed.txt", "w") as fw: - # from paddle.distributed.auto_parallel.completion import Completer - # completer = Completer() - # completer.complete_forward_annotation(auto_parallel_main_prog) - # fw.write(str(auto_parallel_main_prog)) nrank = 4 # col parallel weights = [