Unverified commit 0f16ccf5, authored by zhaoyingli, committed by GitHub

[Cherry-Pick] place all save/load path into temporary directory (#43316) (#43651)

* place all save/load path into temporary directory

* rm no need unittest
Parent bfe21ff3
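
The recurring idiom in this change is a per-test `tempfile.TemporaryDirectory` created in `setUp`, used for every generated artifact, and released in `tearDown`, replacing the hand-rolled `os.remove`/`shutil.rmtree` cleanup blocks each test previously carried. A minimal sketch of the idiom (the test body and names are illustrative, not taken from the diff):

    import os
    import json
    import tempfile
    import unittest


    class TestWithTempDir(unittest.TestCase):
        def setUp(self):
            # One private directory per test; nothing is written into the repo.
            self.temp_dir = tempfile.TemporaryDirectory()

        def tearDown(self):
            # Deletes the directory and everything under it, pass or fail.
            self.temp_dir.cleanup()

        def test_write_artifact(self):
            path = os.path.join(self.temp_dir.name, "artifact.json")
            with open(path, "w") as f:
                json.dump({"ok": True}, f)
            self.assertTrue(os.path.exists(path))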
@@ -14,7 +14,7 @@
 import unittest
 import time
-import paddle.fluid as fluid
+import tempfile
 import copy
 import os
 import numpy as np
@@ -128,9 +128,15 @@ def train():
     engine.fit(dataset,
                batch_size=batch_size,
                steps_per_epoch=batch_num * batch_size)
-    engine.save('./mlp')
-    engine.load('./mlp')
-    engine.save('./mlp_inf', training=False, mode='predict')
+
+    # save
+    temp_dir = tempfile.TemporaryDirectory()
+    model_filename0 = os.path.join(temp_dir.name, 'mlp')
+    model_filename1 = os.path.join(temp_dir.name, 'mlp_inf')
+    engine.save(model_filename0)
+    engine.load(model_filename0)
+    engine.save(model_filename1, training=False, mode='predict')
+    temp_dir.cleanup()


 if __name__ == "__main__":
...
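
This first hunk calls `temp_dir.cleanup()` explicitly because the paths live in a plain `train()` function rather than a `TestCase`. Where a directory is only needed for one block, the context-manager form of `TemporaryDirectory` is an equivalent idiom that also cleans up when an intervening call raises; a sketch, with `engine` assumed from the surrounding test:

    import os
    import tempfile

    with tempfile.TemporaryDirectory() as temp_dir:
        # Inside the block, temp_dir is the directory path as a string.
        model_filename = os.path.join(temp_dir, 'mlp')
        engine.save(model_filename)   # `engine` comes from the enclosing test
        engine.load(model_filename)
    # Here the directory and its contents are already removed.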
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import tempfile
 import unittest
 import os
 import sys
@@ -77,15 +78,44 @@
 }
 """

+mapping_josn = """
+[
+  {
+    "hostname": "machine1",
+    "addr": "127.0.0.1",
+    "port": "768",
+    "ranks":
+      {
+        "0": [1],
+        "1": [0]
+      }
+  }
+]
+"""
+

 class TestAutoParallelReLaunch(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch(self):
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")

         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)

+        mapping_josn_object = json.loads(mapping_josn)
+        with open(mapping_json_path, "w") as mapping_josn_file:
+            json.dump(mapping_josn_object, mapping_josn_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(file_dir,
                                          "auto_parallel_relaunch_model.py")
@@ -95,24 +125,15 @@ class TestAutoParallelReLaunch(unittest.TestCase):
             coverage_args = []

         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)

-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(file_dir,
-                                              "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)


 if __name__ == "__main__":
     unittest.main()
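
The `mapping_josn` fixture added above (its spelling follows the source identifier) maps process ranks to local device ids per machine: on `machine1`, rank 0 runs on device 1 and rank 1 on device 0. A short sketch of reading the structure back, assuming `mapping_josn` is in scope:

    import json

    machines = json.loads(mapping_josn)
    for machine in machines:
        host, addr = machine["hostname"], machine["addr"]
        for rank, device_ids in machine["ranks"].items():
            # Keys are rank numbers as strings; values are lists of device ids.
            print("rank %s on %s (%s) -> devices %s" % (rank, host, addr, device_ids))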
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import tempfile
 import unittest
 import os
 import sys
@@ -30,24 +31,17 @@ class TestEngineAPI(unittest.TestCase):
         else:
             coverage_args = []

+        tmp_dir = tempfile.TemporaryDirectory()
         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--gpus", "0,1", launch_model_path
+            "-m", "launch", "--gpus", "0,1", "--log_dir", tmp_dir.name,
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)

-        # Remove unnecessary files
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)
-        files_path = [path for path in os.listdir('.') if '.pd' in path]
-        for path in files_path:
-            if os.path.exists(path):
-                os.remove(path)
-        if os.path.exists('rank_mapping.csv'):
-            os.remove('rank_mapping.csv')
+        tmp_dir.cleanup()

     def test_engine_predict(self):
         file_dir = os.path.dirname(os.path.abspath(__file__))
...
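
Unlike the `setUp`/`tearDown` variant used elsewhere in this change, `tmp_dir.cleanup()` here runs inline after the assertion, so a failing `assertEqual` would leave the directory to the finalizer. `unittest`'s `addCleanup` is a standard-library way to guarantee the cleanup regardless of outcome; a sketch (the class name is taken from the hunk header, the body is abbreviated):

    import tempfile
    import unittest


    class TestEngineAPI(unittest.TestCase):
        def test_engine_api(self):
            tmp_dir = tempfile.TemporaryDirectory()
            # Registered cleanups run even if an assertion fails mid-test.
            self.addCleanup(tmp_dir.cleanup)
            # ... build cmd with "--log_dir", tmp_dir.name, launch, then assert ...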
@@ -13,6 +13,9 @@
 # limitations under the License.

 import unittest
+import os
+import json
+import tempfile

 import paddle
 import paddle.distributed.auto_parallel.cost as cost_model
@@ -30,6 +33,12 @@ def check_cost(cost):
 class TestCost(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_base_cost(self):
         cost = cost_model.Cost(memory=100, flops=200, time=0.5)
         self.assertTrue(check_cost(cost))
...
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import tempfile
 import unittest
 import os
 import sys
@@ -22,14 +23,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage
 class TestPlannerReLaunch(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch_with_planner(self):
-        from test_auto_parallel_relaunch import cluster_json
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        from test_auto_parallel_relaunch import cluster_json, mapping_josn
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")

         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)

+        mapping_json_object = json.loads(mapping_josn)
+        with open(mapping_json_path, "w") as mapping_json_file:
+            json.dump(mapping_json_object, mapping_json_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(
             file_dir, "auto_parallel_relaunch_with_gpt_planner.py")
@@ -39,24 +55,15 @@ class TestPlannerReLaunch(unittest.TestCase):
             coverage_args = []

         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)

-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(file_dir,
-                                              "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)


 if __name__ == "__main__":
     unittest.main()
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import tempfile
 import unittest
 import os
 import sys
@@ -22,14 +23,29 @@ from paddle.distributed.fleet.launch_utils import run_with_coverage
 class TestPlannerReLaunch(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_relaunch_with_planner(self):
-        from test_auto_parallel_relaunch import cluster_json
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        cluster_json_path = os.path.join(file_dir, "auto_parallel_cluster.json")
+        from test_auto_parallel_relaunch import cluster_json, mapping_josn
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
+        mapping_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_rank_mapping.json")

         cluster_json_object = json.loads(cluster_json)
         with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)

+        mapping_json_object = json.loads(mapping_josn)
+        with open(mapping_json_path, "w") as mapping_json_file:
+            json.dump(mapping_json_object, mapping_json_file)
+
+        file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(
             file_dir, "auto_parallel_relaunch_with_planner.py")
@@ -39,24 +55,15 @@ class TestPlannerReLaunch(unittest.TestCase):
             coverage_args = []

         cmd = [sys.executable, "-u"] + coverage_args + [
-            "-m", "launch", "--cluster_topo_path", cluster_json_path,
-            "--enable_auto_mapping", "True", launch_model_path
+            "-m", "launch", "--log_dir", self.temp_dir.name,
+            "--cluster_topo_path", cluster_json_path, "--rank_mapping_path",
+            mapping_json_path, "--enable_auto_mapping", "True",
+            launch_model_path
         ]
         process = subprocess.Popen(cmd)
         process.wait()
         self.assertEqual(process.returncode, 0)

-        # Remove unnecessary files
-        if os.path.exists(cluster_json_path):
-            os.remove(cluster_json_path)
-        rank_mapping_json_path = os.path.join(file_dir,
-                                              "auto_parallel_rank_mapping.json")
-        if os.path.exists(rank_mapping_json_path):
-            os.remove(rank_mapping_json_path)
-        log_path = os.path.join(file_dir, "log")
-        if os.path.exists(log_path):
-            shutil.rmtree(log_path)


 if __name__ == "__main__":
     unittest.main()
@@ -14,6 +14,7 @@
 from __future__ import print_function

+import tempfile
 import unittest
 import os
 import json
@@ -200,15 +201,21 @@ cluster_json = """
 class TestAutoParallelCluster(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_cluster(self):
-        cluster_json_file = ""
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
         cluster_json_object = json.loads(cluster_json)
-        with open("./auto_parallel_cluster.json", "w") as cluster_json_file:
+        with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)

         cluster = Cluster()
-        cluster.build_from_file("./auto_parallel_cluster.json")
-        os.remove("./auto_parallel_cluster.json")
+        cluster.build_from_file(cluster_json_path)

         self.assertEqual(len(cluster.get_all_devices("GPU")), 4)
         self.assertEqual(len(cluster.get_all_devices("CPU")), 2)
...
@@ -14,6 +14,7 @@
 from __future__ import print_function

+import tempfile
 import unittest
 import os
 import json
@@ -523,14 +524,20 @@ def get_device_local_ids(machine):
 class TestAutoParallelMapper(unittest.TestCase):

+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_mapper_dp_mp_pp(self):
-        cluster_json_file = ""
+        cluster_json_path = os.path.join(self.temp_dir.name,
+                                         "auto_parallel_cluster.json")
         cluster_json_object = json.loads(cluster_json)
-        with open("./auto_parallel_cluster.json", "w") as cluster_json_file:
+        with open(cluster_json_path, "w") as cluster_json_file:
             json.dump(cluster_json_object, cluster_json_file)

         cluster = Cluster()
-        cluster.build_from_file("./auto_parallel_cluster.json")
-        os.remove("./auto_parallel_cluster.json")
+        cluster.build_from_file(cluster_json_path)

         global _global_parallel_strategy
         _global_parallel_strategy = "dp_mp_pp"
...
@@ -901,25 +901,6 @@ class TestGPTPartitioner(unittest.TestCase):
         auto_parallel_main_prog, auto_parallel_startup_prog, params_grads = partitioner.partition(
             complete_train_program, startup_program, params_grads)

-        with open("./test_auto_parallel_partitioner_serial_main_new.txt",
-                  "w") as fw:
-            fw.write(str(train_program))
-        with open("./test_auto_parallel_partitioner_serial_startup_new.txt",
-                  "w") as fw:
-            fw.write(str(startup_program))
-        from paddle.distributed.auto_parallel.dist_context import set_default_distributed_context
-        set_default_distributed_context(dist_context)
-        with open("./test_auto_parallel_partitioner_main_new.txt1", "w") as fw:
-            fw.write(str(auto_parallel_main_prog))
-        with open("./test_auto_parallel_partitioner_startup_new.txt1",
-                  "w") as fw:
-            fw.write(str(auto_parallel_startup_prog))
-        # with open("./test_auto_parallel_partitioner_main_completed.txt", "w") as fw:
-        #     from paddle.distributed.auto_parallel.completion import Completer
-        #     completer = Completer()
-        #     completer.complete_forward_annotation(auto_parallel_main_prog)
-        #     fw.write(str(auto_parallel_main_prog))
         nrank = 4
         # col parallel
         weights = [
...
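
The block deleted above dumped serialized programs beside the test sources purely for debugging. If such dumps are ever needed again, the same temporary-directory pattern keeps them out of the source tree; a sketch, with the program variables assumed from the surrounding test:

    import os
    import tempfile

    with tempfile.TemporaryDirectory() as dump_dir:
        for name, prog in [("serial_main", train_program),
                           ("dist_main", auto_parallel_main_prog)]:
            # str() renders a Paddle program as readable text, as the old code did.
            with open(os.path.join(dump_dir, name + ".txt"), "w") as fw:
                fw.write(str(prog))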