From 2c8739e8ea9c16396470d1e68d16332bb8677b33 Mon Sep 17 00:00:00 2001
From: zhaoyingli <86812880+zhaoyinglia@users.noreply.github.com>
Date: Thu, 9 Jun 2022 16:07:29 +0800
Subject: [PATCH] use tempfile to place temporary files (#43316)

---
 .../unittests/auto_parallel/engine_api.py     |  7 ++-
 .../test_auto_parallel_relaunch.py            | 51 +++++++++++++------
 .../unittests/auto_parallel/test_cluster.py   | 24 ++++-----
 .../unittests/auto_parallel/test_comm_cost.py | 23 ++++-----
 .../auto_parallel/test_engine_api.py          | 16 ++----
 .../auto_parallel/test_new_cost_model.py      | 15 +++---
 .../test_relaunch_with_gpt_planner.py         | 43 ++++++++--------
 .../test_relaunch_with_planner.py             | 39 ++++++++------
 .../unittests/test_auto_parallel_cluster.py   | 15 ++++--
 .../unittests/test_auto_parallel_mapper.py    | 15 ++++--
 .../test_auto_parallel_partitioner_gpt.py     | 19 -------
 11 files changed, 146 insertions(+), 121 deletions(-)

diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py
index 0d96c57c243..f7a1a28aa91 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/engine_api.py
@@ -14,7 +14,7 @@
 
 import unittest
 import time
-import paddle.fluid as fluid
+import tempfile
 import copy
 import os
 import numpy as np
@@ -145,7 +145,10 @@ def train():
     engine.predict(test_dataset, batch_size, fetch_list=['label'])
 
     # save
-    engine.save('./mlp_inf', training=False, mode='predict')
+    temp_dir = tempfile.TemporaryDirectory()
+    model_filename = os.path.join(temp_dir.name, 'mlp_inf')
+    engine.save(model_filename, training=False, mode='predict')
+    temp_dir.cleanup()
 
 
 if __name__ == "__main__":
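The engine_api.py hunk above manages the temporary directory's lifetime by hand: create it, build the output path under temp_dir.name, save, then call cleanup(). A minimal sketch of the same lifecycle in isolation — save_model() is a hypothetical stand-in for engine.save(), which is not reproduced here:

    import os
    import tempfile

    def save_model(path):
        # Hypothetical stand-in for engine.save(); it only writes a file.
        with open(path + ".pdmodel", "w") as f:
            f.write("model")

    temp_dir = tempfile.TemporaryDirectory()
    try:
        model_filename = os.path.join(temp_dir.name, "mlp_inf")
        save_model(model_filename)
    finally:
        # Explicit cleanup removes the directory and everything inside it,
        # mirroring the temp_dir.cleanup() call added above.
        temp_dir.cleanup()

The try/finally is an extra safeguard not present in the patch; the patched script simply saves and cleans up in sequence.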
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py
index 4ff72173382..09ec5131402 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_auto_parallel_relaunch.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 import unittest
 import os
 import sys
@@ -77,16 +78,45 @@ cluster_json = """
 }
 """
 
+mapping_json = """
+[
+    {
+        "hostname": "machine1",
+        "addr": "127.0.0.1",
+        "port": "768",
+        "ranks":
+        {
+            "0": [1],
+            "1": [0]
+        }
+    }
+]
+"""
+
 
 class TestAutoParallelReLaunch(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch(self):
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")
+
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
 
+        mapping_json_object = json.loads(mapping_json)
+        with open(mapping_json_path, "w") as mapping_json_file:
+            json.dump(mapping_json_object, mapping_json_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(file_dir,
                                          "auto_parallel_relaunch_model.py")
 
@@ -96,24 +126,15 @@ class TestAutoParallelReLaunch(unittest.TestCase):
             coverage_args = []
 
         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(
-            file_dir, "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)
-
 
 if __name__ == "__main__":
     unittest.main()
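This file introduces the setUp/tearDown pairing that the rest of the patch repeats in every test class: each test method gets a fresh directory, and the directory is removed even when the test body fails, because tearDown still runs after a failed test. A minimal sketch of the pattern with a stand-in config payload:

    import json
    import os
    import tempfile
    import unittest

    class TempDirTestCase(unittest.TestCase):

        def setUp(self):
            # Runs before every test method: one fresh directory per test.
            self.temp_dir = tempfile.TemporaryDirectory()

        def tearDown(self):
            # Runs after every test method, pass or fail.
            self.temp_dir.cleanup()

        def test_write_config(self):
            config_path = os.path.join(self.temp_dir.name, "cluster.json")
            with open(config_path, "w") as f:
                json.dump({"machines": []}, f)
            self.assertTrue(os.path.exists(config_path))

    if __name__ == "__main__":
        unittest.main()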
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py
index 5b6f898d5b7..dd9b0110dbe 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_cluster.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 import unittest
 import os
 import json
@@ -1968,10 +1969,17 @@ multi_cluster_json = """{
 
 class TestCluster(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_single_machine(self):
         # Build cluster
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster_single.json")
+
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
@@ -1989,14 +1997,10 @@ class TestCluster(unittest.TestCase):
         self.assertTrue(devices == [0, 1, 2, 3])
         self.assertTrue(involved_machine_count == 1)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-
     def test_multi_machine(self):
         # Build cluster
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster_multi.json")
         cluster_json_object = json.loads(multi_cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
@@ -2014,10 +2018,6 @@ class TestCluster(unittest.TestCase):
         self.assertTrue(devices == [5, 6, 7, 10])
         self.assertTrue(involved_machine_count == 2)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py
index 0d3f193e8bc..21538578788 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_comm_cost.py
@@ -15,6 +15,7 @@
 import unittest
 import os
 import json
+import tempfile
 
 import paddle
 from paddle.distributed.auto_parallel.cluster import Cluster
@@ -32,10 +33,16 @@ from test_cluster import cluster_json, multi_cluster_json
 
 class TestCommOpCost(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_comm_cost(self):
         # Build cluster
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster0.json")
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
@@ -92,14 +99,10 @@ class TestCommOpCost(unittest.TestCase):
                                            comm_context=comm_context)
         self.assertTrue(identity_op_cost.time >= 0)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-
     def test_cross_machine_comm_cost(self):
         # Build cluster
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster1.json")
         cluster_json_object = json.loads(multi_cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
@@ -151,10 +154,6 @@ class TestCommOpCost(unittest.TestCase):
                                        comm_context=comm_context)
         self.assertTrue(recv_op_cost.time > 0)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-
 
 if __name__ == "__main__":
     unittest.main()
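Because setUp runs once per test method, test_comm_cost and test_cross_machine_comm_cost above each receive their own directory; the distinct file names (auto_parallel_cluster0.json, auto_parallel_cluster1.json, and the _single/_multi suffixes in test_cluster.py) therefore appear to serve readability rather than collision avoidance. A small demonstration of that per-method isolation:

    import tempfile
    import unittest

    class IsolationDemo(unittest.TestCase):
        seen_dirs = []

        def setUp(self):
            self.temp_dir = tempfile.TemporaryDirectory()
            IsolationDemo.seen_dirs.append(self.temp_dir.name)

        def tearDown(self):
            self.temp_dir.cleanup()

        def test_first(self):
            self.assertTrue(self.temp_dir.name)

        def test_second(self):
            self.assertTrue(self.temp_dir.name)

        @classmethod
        def tearDownClass(cls):
            # Each test method saw a different temporary directory.
            assert len(set(cls.seen_dirs)) == len(cls.seen_dirs)

    if __name__ == "__main__":
        unittest.main()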
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
index b8ad54cbb79..8d5051a3d48 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_engine_api.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 import unittest
 import os
 import sys
@@ -31,24 +32,17 @@ class TestEngineAPI(unittest.TestCase):
         else:
             coverage_args = []
 
+        tmp_dir = tempfile.TemporaryDirectory()
         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--gpus", "0,1", launch_model_path
+            "-m", "launch", "--gpus", "0,1", "--log_dir", tmp_dir.name,
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)
 
-        # Remove unnecessary files
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)
-        files_path = [path for path in os.listdir('.') if '.pd' in path]
-        for path in files_path:
-            if os.path.exists(path):
-                os.remove(path)
-        if os.path.exists('rank_mapping.csv'):
-            os.remove('rank_mapping.csv')
+        tmp_dir.cleanup()
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py
index 911f20f1149..fe461312257 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_new_cost_model.py
@@ -15,6 +15,7 @@
 import unittest
 import os
 import json
+import tempfile
 
 import paddle
 import paddle.distributed.auto_parallel.cost as cost_model
@@ -36,6 +37,12 @@ def check_cost(cost):
 
 class TestCost(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_base_cost(self):
         cost = cost_model.Cost(memory=100, flops=200, time=0.5)
         self.assertTrue(check_cost(cost))
@@ -65,8 +72,8 @@ class TestCost(unittest.TestCase):
 
     def test_comm_cost(self):
         # Build cluster
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
@@ -85,10 +92,6 @@ class TestCost(unittest.TestCase):
             op_desc=desc, comm_context=CommContext(cluster))
         self.assertTrue(check_cost(allreduce_cost.cost))
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-
     def test_cost_estimator(self):
         train_program = paddle.static.Program()
         cost_estimator = cost_model.CostEstimator(train_program)
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py
index 88ad5f98bf7..bc1ebd6688e 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_gpt_planner.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 import unittest
 import os
 import sys
@@ -23,14 +24,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage
 
 class TestPlannerReLaunch(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch_with_planner(self):
-        from test_auto_parallel_relaunch import cluster_json
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        from test_auto_parallel_relaunch import cluster_json, mapping_json
+
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")
+
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
 
+        mapping_json_object = json.loads(mapping_json)
+        with open(mapping_json_path, "w") as mapping_json_file:
+            json.dump(mapping_json_object, mapping_json_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(
             file_dir, "auto_parallel_relaunch_with_gpt_planner.py")
 
@@ -40,28 +56,15 @@ class TestPlannerReLaunch(unittest.TestCase):
             coverage_args = []
 
         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(
-            file_dir, "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        files_path = [path for path in os.listdir('.') if '.pkl' in path]
-        for path in files_path:
-            if os.path.exists(path):
-                os.remove(path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)
-
 
 if __name__ == "__main__":
     unittest.main()
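test_engine_api.py and the relaunch tests route the launcher's output by pointing --log_dir into the temporary directory, so the log folder the launch module writes is reclaimed along with everything else instead of being deleted by hand afterwards. A sketch of that launch-and-check shape, assuming the same "-m launch" entry point these tests invoke and a placeholder trainer.py script:

    import subprocess
    import sys
    import tempfile

    with tempfile.TemporaryDirectory() as log_dir:
        # trainer.py is a placeholder; the tests pass their real model scripts.
        cmd = [
            sys.executable, "-u", "-m", "launch",
            "--log_dir", log_dir, "trainer.py"
        ]
        process = subprocess.Popen(cmd)
        process.wait()
        assert process.returncode == 0
    # On exiting the with block the directory, launcher logs included, is gone.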
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py
index b6fc0d7a1fa..efcc313a2a4 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_relaunch_with_planner.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import tempfile
 import unittest
 import os
 import sys
@@ -23,14 +24,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage
 
 class TestPlannerReLaunch(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch_with_planner(self):
-        from test_auto_parallel_relaunch import cluster_json
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        from test_auto_parallel_relaunch import cluster_json, mapping_json
+
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")
+
         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
 
+        mapping_json_object = json.loads(mapping_json)
+        with open(mapping_json_path, "w") as mapping_json_file:
+            json.dump(mapping_json_object, mapping_json_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(
             file_dir, "auto_parallel_relaunch_with_planner.py")
 
@@ -40,24 +56,15 @@ class TestPlannerReLaunch(unittest.TestCase):
             coverage_args = []
 
         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
        ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)
 
-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(
-            file_dir, "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)
-
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py
index 7ef5516bc04..176f4193416 100644
--- a/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py
+++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_cluster.py
@@ -14,6 +14,7 @@
 
 from __future__ import print_function
 
+import tempfile
 import unittest
 import os
 import json
@@ -201,15 +202,21 @@ cluster_json = """
 
 class TestAutoParallelCluster(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_cluster(self):
-        cluster_json_file = ""
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
         cluster_json_object = json.loads(cluster_json)
-        with open("./auto_parallel_cluster.json", "w") as cluster_json_file:
+        with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
 
         cluster = Cluster()
-        cluster.build_from_file("./auto_parallel_cluster.json")
-        os.remove("./auto_parallel_cluster.json")
+        cluster.build_from_file(cluster_json_path)
 
         self.assertEqual(len(cluster.get_all_devices("GPU")), 4)
         self.assertEqual(len(cluster.get_all_devices("CPU")), 2)
diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py
index a147b0f1f37..36923212fdf 100644
--- a/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py
+++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_mapper.py
@@ -14,6 +14,7 @@
 
 from __future__ import print_function
 
+import tempfile
 import unittest
 import os
 import json
@@ -527,14 +528,20 @@ def get_device_local_ids(machine):
 
 class TestAutoParallelMapper(unittest.TestCase):
 
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_mapper_dp_mp_pp(self):
-        cluster_json_file = ""
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
         cluster_json_object = json.loads(cluster_json)
-        with open("./auto_parallel_cluster.json", "w") as cluster_json_file:
+        with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)
         cluster = Cluster()
-        cluster.build_from_file("./auto_parallel_cluster.json")
-        os.remove("./auto_parallel_cluster.json")
+        cluster.build_from_file(cluster_json_path)
 
         global _global_parallel_strategy
         _global_parallel_strategy = "dp_mp_pp"
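The two tests above previously wrote ./auto_parallel_cluster.json relative to whatever the current working directory happened to be, then removed it by repeating the literal string; the dead cluster_json_file = "" assignment was immediately shadowed by the with target. The rewrite keeps one path variable rooted in the per-test directory. The same write-then-read round trip in isolation, with a stand-in payload instead of the real topology string:

    import json
    import os
    import tempfile

    cluster_json = '{"machines": []}'  # stand-in for the full topology JSON

    with tempfile.TemporaryDirectory() as temp_dir:
        cluster_json_path = os.path.join(temp_dir, "auto_parallel_cluster.json")
        with open(cluster_json_path, "w") as cluster_json_file:
            json.dump(json.loads(cluster_json), cluster_json_file)
        # The tests hand this same path to Cluster().build_from_file(...).
        assert os.path.exists(cluster_json_path)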
diff --git a/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py b/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py
index 96738a46662..00ba2151fcb 100644
--- a/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py
+++ b/python/paddle/fluid/tests/unittests/test_auto_parallel_partitioner_gpt.py
@@ -892,25 +892,6 @@ class TestGPTPartitioner(unittest.TestCase):
         auto_parallel_main_prog, auto_parallel_startup_prog, params_grads = partitioner.partition(
             complete_train_program, startup_program, params_grads)
 
-        with open("./test_auto_parallel_partitioner_serial_main_new.txt",
-                  "w") as fw:
-            fw.write(str(train_program))
-        with open("./test_auto_parallel_partitioner_serial_startup_new.txt",
-                  "w") as fw:
-            fw.write(str(startup_program))
-
-        from paddle.distributed.auto_parallel.dist_context import set_default_distributed_context
-        set_default_distributed_context(dist_context)
-        with open("./test_auto_parallel_partitioner_main_new.txt1", "w") as fw:
-            fw.write(str(auto_parallel_main_prog))
-        with open("./test_auto_parallel_partitioner_startup_new.txt1",
-                  "w") as fw:
-            fw.write(str(auto_parallel_startup_prog))
-        # with open("./test_auto_parallel_partitioner_main_completed.txt", "w") as fw:
-        #     from paddle.distributed.auto_parallel.completion import Completer
-        #     completer = Completer()
-        #     completer.complete_forward_annotation(auto_parallel_main_prog)
-        #     fw.write(str(auto_parallel_main_prog))
         nrank = 4
         # col parallel
         weights = [
-- 
GitLab
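Every deleted block in this patch followed the fragile "create next to the source tree, remove at the end" style, where any assertion failure before the removal lines left files behind for the next run. The setUp/tearDown pairing adopted here fixes that; an equivalent alternative, not used in this patch, is unittest's addCleanup, sketched below:

    import os
    import tempfile
    import unittest

    class TempDirCleanupCase(unittest.TestCase):

        def setUp(self):
            self.temp_dir = tempfile.TemporaryDirectory()
            # Registered cleanups run after tearDown, in LIFO order,
            # whether the test passes or fails.
            self.addCleanup(self.temp_dir.cleanup)

        def test_dir_exists(self):
            self.assertTrue(os.path.isdir(self.temp_dir.name))

    if __name__ == "__main__":
        unittest.main()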