Unverified · Commit a5ccc713 authored by zhaoyingli, committed by GitHub

place all save/load paths into a temporary directory (#43652)

Parent: 0f16ccf5
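Every hunk below applies the same pattern: tests that previously wrote logs, datasets, and checkpoints into the working directory or a hard-coded `/tmp` path now create a `tempfile.TemporaryDirectory()` (usually in `setUp`) and remove it afterwards (usually in `tearDown`). A minimal sketch of that pattern; the class and file names here are placeholders, not code from this commit:

```python
import os
import tempfile
import unittest


class ArtifactTest(unittest.TestCase):
    def setUp(self):
        # All files written by the test go under this directory.
        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        # Removes the directory and everything inside it.
        self.temp_dir.cleanup()

    def test_write_artifact(self):
        path = os.path.join(self.temp_dir.name, "data.txt")
        with open(path, "w") as f:
            f.write("1 0.6 1 0.7\n")
        self.assertTrue(os.path.exists(path))


if __name__ == '__main__':
    unittest.main()
```

An equivalent form is `self.addCleanup(self.temp_dir.cleanup)` right after creating the directory in `setUp`, which also runs if a later part of `setUp` fails.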
@@ -23,6 +23,7 @@ import subprocess
 import traceback
 import functools
 import pickle
+import tempfile
 from contextlib import closing
 import paddle
 import paddle.fluid as fluid
@@ -97,6 +98,11 @@ class TestDistBase(unittest.TestCase):
             self._find_free_port(), self._find_free_port())
         self._python_interp = sys.executable
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
 
     def _find_free_port(self):
         def __free_port():
             with closing(socket.socket(socket.AF_INET,
@@ -155,9 +161,13 @@ class TestDistBase(unittest.TestCase):
         tr_cmd = "%s %s"
         tr0_cmd = tr_cmd % (self._python_interp, model_file)
         tr1_cmd = tr_cmd % (self._python_interp, model_file)
-        tr0_pipe = open("/tmp/tr0_err_%d.log" % os.getpid(), "w")
-        tr1_pipe = open("/tmp/tr1_err_%d.log" % os.getpid(), "w")
+        path0 = os.path.join(self.temp_dir.name,
+                             "tr0_err_%d.log" % os.getpid())
+        path1 = os.path.join(self.temp_dir.name,
+                             "tr1_err_%d.log" % os.getpid())
+        tr0_pipe = open(path0, "w")
+        tr1_pipe = open(path1, "w")
         #print(tr0_cmd)
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(),
             stdout=subprocess.PIPE,
@@ -177,9 +187,9 @@ class TestDistBase(unittest.TestCase):
         # close trainer file
         tr0_pipe.close()
         tr1_pipe.close()
-        with open("/tmp/tr0_err_%d.log" % os.getpid(), "r") as f:
+        with open(path0, "r") as f:
             sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
-        with open("/tmp/tr1_err_%d.log" % os.getpid(), "r") as f:
+        with open(path1, "r") as f:
             sys.stderr.write('trainer 1 stderr file: %s\n' % f.read())
         return pickle.loads(tr0_out), pickle.loads(
             tr1_out), tr0_proc.pid, tr1_proc.pid
......
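One caveat for this pattern, relevant to the hunk above: `os.path.join` discards every component before an absolute one, so the name joined onto `temp_dir.name` must be relative (`"tr0_err.log"`, not `"/tmp/tr0_err.log"`), or the file silently lands outside the temporary directory. This is standard-library behavior, demonstrable in isolation:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    # Relative second component: the log lands inside the temporary directory.
    print(os.path.join(d, "tr0_err.log"))       # <tmpdir>/tr0_err.log
    # Absolute second component: everything before it is discarded.
    print(os.path.join(d, "/tmp/tr0_err.log"))  # /tmp/tr0_err.log
```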
@@ -23,6 +23,7 @@ import subprocess
 import traceback
 import functools
 import pickle
+import tempfile
 from contextlib import closing
 import paddle.fluid as fluid
 import paddle.fluid.unique_name as nameGen
@@ -145,6 +146,11 @@ class TestDistBase(unittest.TestCase):
             self._find_free_port(), self._find_free_port())
         self._python_interp = sys.executable
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
 
     def _find_free_port(self):
         def __free_port():
             with closing(socket.socket(socket.AF_INET,
@@ -183,9 +189,11 @@ class TestDistBase(unittest.TestCase):
         tr_cmd = "%s %s"
         tr0_cmd = tr_cmd % (self._python_interp, model_file)
         tr1_cmd = tr_cmd % (self._python_interp, model_file)
-        tr0_pipe = open("/tmp/tr0_err.log", "wb")
-        tr1_pipe = open("/tmp/tr1_err.log", "wb")
+        path0 = os.path.join(self.temp_dir.name, "tr0_err.log")
+        path1 = os.path.join(self.temp_dir.name, "tr1_err.log")
+        tr0_pipe = open(path0, "wb")
+        tr1_pipe = open(path1, "wb")
         #print(tr0_cmd)
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(),
             stdout=subprocess.PIPE,
......
@@ -19,6 +19,7 @@ import unittest
 import time
 import threading
 import numpy
+import tempfile
 import paddle
 
 paddle.enable_static()
@@ -30,7 +31,9 @@ import paddle.distributed.fleet as fleet
 class TestCommunicator(unittest.TestCase):
     def test_communicator_ps_gpu(self):
-        with open("test_communicator_ps_gpu.txt", "w") as f:
+        temp_dir = tempfile.TemporaryDirectory()
+        path = os.path.join(temp_dir.name, "test_communicator_ps_gpu.txt")
+        with open(path, "w") as f:
             data = "1 0.6 1 0.7\n"
             f.write(data)
@@ -90,7 +93,7 @@ class TestCommunicator(unittest.TestCase):
             self.assertTrue(False)
 
         time.sleep(10)
         fleet.stop_worker()
-        os.remove("./test_communicator_ps_gpu.txt")
+        temp_dir.cleanup()
 
 
 if __name__ == '__main__':
......
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from __future__ import print_function
-import time
+import tempfile
 import ast
 import unittest
@@ -867,6 +867,11 @@ class TestDistBase(unittest.TestCase):
         self._after_setup_config()
 
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def _find_free_port(self):
         def __free_port():
             with closing(socket.socket(socket.AF_INET,
@@ -909,8 +914,10 @@ class TestDistBase(unittest.TestCase):
         print(ps0_cmd)
         print(ps1_cmd)
-        ps0_pipe = open(log_name + "_ps0_err.log", "wb")
-        ps1_pipe = open(log_name + "_ps1_err.log", "wb")
+        path0 = os.path.join(self.temp_dir.name, log_name + "_ps0_err.log")
+        path1 = os.path.join(self.temp_dir.name, log_name + "_ps1_err.log")
+        ps0_pipe = open(path0, "wb")
+        ps1_pipe = open(path1, "wb")
 
         print_to_err(type(self).__name__, "going to start pserver process 0")
         ps0_proc = subprocess.Popen(
@@ -990,7 +997,8 @@ class TestDistBase(unittest.TestCase):
         print("local_cmd: {}, env: {}".format(cmd, env_local))
 
         if check_error_log:
-            err_log = open(log_name + "_local.log", "wb")
+            path = os.path.join(self.temp_dir.name, log_name + "_local.log")
+            err_log = open(path, "wb")
             local_proc = subprocess.Popen(
                 cmd.split(" "),
                 stdout=subprocess.PIPE,
@@ -1076,8 +1084,11 @@ class TestDistBase(unittest.TestCase):
         print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
         print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
-        tr0_pipe = open(log_name + "_tr0_err.log", "wb")
-        tr1_pipe = open(log_name + "_tr1_err.log", "wb")
+
+        path0 = os.path.join(self.temp_dir.name, log_name + "_tr0_err.log")
+        path1 = os.path.join(self.temp_dir.name, log_name + "_tr1_err.log")
+        tr0_pipe = open(path0, "wb")
+        tr1_pipe = open(path1, "wb")
 
         print_to_err(type(self).__name__, "going to start trainer process 0")
         tr0_proc = subprocess.Popen(
@@ -1293,7 +1304,9 @@ class TestDistBase(unittest.TestCase):
             print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
                 self._use_hallreduce, tr_cmd, tr_env))
-            tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb")
+            path = os.path.join(self.temp_dir.name,
+                                log_name + "_tr{}_err.log".format(i))
+            tr_pipe = open(path, "wb")
 
             print_to_err(
                 type(self).__name__,
@@ -1355,7 +1368,9 @@ class TestDistBase(unittest.TestCase):
             print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
                 self._use_hallreduce, tr_cmd, tr_env))
-            tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb")
+            path = os.path.join(self.temp_dir.name,
+                                log_name + "_tr{}_err.log".format(i))
+            tr_pipe = open(path, "wb")
 
             print_to_err(
                 type(self).__name__,
@@ -1401,7 +1416,8 @@ class TestDistBase(unittest.TestCase):
             tr_env['FLAGS_cudnn_deterministic'] = '0'
             print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env))
-            tr_pipe = open("/tmp/" + "tr{}_err.log".format(i), "wb")
+            path = os.path.join(self.temp_dir.name, "tr{}_err.log".format(i))
+            tr_pipe = open(path, "wb")
 
             print_to_err(
                 type(self).__name__,
......
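The trainer hunks above all follow one capture-and-replay scheme: open a log file inside the temporary directory, hand it to `Popen` as `stderr`, and echo its contents to the test's own stderr afterwards. A self-contained sketch of that scheme; the child command is a stand-in for the real trainer process:

```python
import os
import subprocess
import sys
import tempfile

cmd = [sys.executable, "-c", "import sys; sys.stderr.write('child error\\n')"]

with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "tr0_err.log")
    # The child's stderr is redirected into the temporary log file.
    with open(log_path, "wb") as pipe:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=pipe)
        out, _ = proc.communicate()
    # Replay the captured log for diagnostics, as the tests above do.
    with open(log_path, "r") as f:
        sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
```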
@@ -57,13 +57,15 @@ class TestDistMnistNCCL2DGC(TestDistBase):
     def tearDown(self):
         import paddle.fluid as fluid
         if fluid.core.is_compiled_with_cuda():
-            result = count_of_sparse_all_reduce_calls(
-                'test_dist_mnist_dgc_nccl_tr0_err.log')
+            log_file = os.path.join(self.temp_dir.name,
+                                    'test_dist_mnist_dgc_nccl_tr0_err.log')
+            result = count_of_sparse_all_reduce_calls(log_file)
             # only 1 layer use dgc now, run_step=5, rampup_begin_step=2, so 1 * (5 - 2) = 3
             # temp close this test. In python3 CI, the log is right, but the result
             # has a problem, may be in multi process mode, log is not written in time.
             # self.assertEqual(result, 3)
+        self.temp_dir.cleanup()
 
 
 class TestDistMnistNCCL2DGCMultiCards(TestDistBase):
@@ -86,10 +88,13 @@ class TestDistMnistNCCL2DGCMultiCards(TestDistBase):
     def tearDown(self):
         import paddle.fluid as fluid
         if fluid.core.is_compiled_with_cuda():
-            result = count_of_sparse_all_reduce_calls(
-                'test_dist_mnist_dgc_nccl_dgc_2cards_local.log')
+            log_file = os.path.join(
+                self.temp_dir.name,
+                'test_dist_mnist_dgc_nccl_dgc_2cards_local.log')
+            result = count_of_sparse_all_reduce_calls(log_file)
             # same as above, but use two cards
             self.assertEqual(result, 6)
+        self.temp_dir.cleanup()
 
 
 if __name__ == "__main__":
......
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
+import tempfile
 import unittest
 from paddle.dataset.common import download, DATA_HOME
 from paddle.distributed.fleet.dataset import TreeIndex
@@ -102,6 +104,12 @@ class TestTreeIndex(unittest.TestCase):
 class TestIndexSampler(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_layerwise_sampler(self):
         path = download(
             "https://paddlerec.bj.bcebos.com/tree-based/data/mini_tree.pb",
@@ -109,7 +117,8 @@ class TestIndexSampler(unittest.TestCase):
         tdm_layer_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
         #tree = TreeIndex("demo", path)
-        file_name = "test_in_memory_dataset_tdm_sample_run.txt"
+        file_name = os.path.join(self.temp_dir.name,
+                                 "test_in_memory_dataset_tdm_sample_run.txt")
         with open(file_name, "w") as f:
             #data = "29 d 29 d 29 29 29 29 29 29 29 29 29 29 29 29\n"
             data = "1 1 1 15 15 15\n"
......
@@ -18,6 +18,7 @@ import sys
 import shutil
 import unittest
 import paddle
+import tempfile
 
 
 def get_test_file():
@@ -35,12 +36,13 @@ def remove_file_if_exists(file_name):
 def run_test(clip_after_allreduce=True, max_global_norm=-1.0):
+    temp_dir = tempfile.TemporaryDirectory()
     if not paddle.is_compiled_with_cuda():
         return
     if os.name == 'nt':
         return
     args = locals()
-    log_dir = 'log_{}'.format(os.getpid())
+    log_dir = os.path.join(temp_dir.name, 'log_{}'.format(os.getpid()))
     cmd = [
         sys.executable,
         '-u',
@@ -57,15 +59,15 @@ def run_test(clip_after_allreduce=True, max_global_norm=-1.0):
     os.environ['MAX_GLOBAL_NORM'] = str(max_global_norm)
 
     touch_file_env = 'SUCCESS_TOUCH_FILE'
-    touch_file_name = 'distributed_fused_lamb_touch_file_{}'.format(os.getpid())
+    touch_file_name = os.path.join(
+        temp_dir.name,
+        'distributed_fused_lamb_touch_file_{}'.format(os.getpid()))
     os.environ[touch_file_env] = touch_file_name
-    remove_file_if_exists(touch_file_name)
     try:
         assert os.system(cmd) == 0 and os.path.exists(
             touch_file_name), 'Test failed when {}'.format(args)
     finally:
-        remove_file_if_exists(touch_file_name)
-        remove_file_if_exists(log_dir)
+        temp_dir.cleanup()
 
 
 class TestDistributedFusedLambWithClip(unittest.TestCase):
......
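`run_test` pairs the `TemporaryDirectory` with `try`/`finally`; the context-manager form of the same class gives an identical guarantee with less ceremony. A minimal sketch, with a hypothetical marker-file name:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    touch_file = os.path.join(tmp, "success_touch_file")  # hypothetical name
    open(touch_file, "w").close()
    assert os.path.exists(touch_file)
# The directory and its contents are removed on exit, even if the body raises.
assert not os.path.exists(touch_file)
```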
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import tempfile
 import unittest
 import paddle
 
 paddle.enable_static()
@@ -69,14 +70,17 @@ class TestFleetBase(unittest.TestCase):
         compiled_prog = fluid.compiler.CompiledProgram(
             fluid.default_main_program())
+
+        temp_dir = tempfile.TemporaryDirectory()
         fleet.init_worker()
-        fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
-        fleet.fleet.save(
-            dirname="/tmp", feed=[input_x, input_y], fetch=[avg_cost])
-        fleet.fleet.save(dirname="/tmp")
+        fleet.fleet.save(
+            dirname=temp_dir.name, feed=['x', 'y'], fetch=[avg_cost])
+        fleet.fleet.save(
+            dirname=temp_dir.name, feed=[input_x, input_y], fetch=[avg_cost])
+        fleet.fleet.save(dirname=temp_dir.name)
 
-        fleet.load_model(path="/tmp", mode=0)
-        fleet.load_model(path="/tmp", mode=1)
+        fleet.load_model(path=temp_dir.name, mode=0)
+        fleet.load_model(path=temp_dir.name, mode=1)
+        temp_dir.cleanup()
 
 
 if __name__ == "__main__":
......
@@ -33,12 +33,15 @@ print("test")
 class TestCollectiveLauncher(unittest.TestCase):
     def setUp(self):
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-
-        self.code_path = os.path.join(file_dir, "fake_python_for_elastic.py")
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.code_path = os.path.join(self.temp_dir.name,
+                                      "fake_python_for_elastic.py")
         with open(self.code_path, "w") as f:
             f.write(fake_python_code)
 
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
     def test_launch(self):
         class Argument:
             elastic_server = "127.0.0.1:2379"
@@ -56,7 +59,7 @@ class TestCollectiveLauncher(unittest.TestCase):
             run_mode = "cpuonly"
             servers = None
             rank_mapping_path = None
-            training_script = "fake_python_for_elastic.py"
+            training_script = self.code_path
             training_script_args = ["--use_amp false"]
             log_dir = None
@@ -94,7 +97,7 @@ class TestCollectiveLauncher(unittest.TestCase):
             run_mode = "cpuonly"
             servers = None
             rank_mapping_path = None
-            training_script = "fake_python_for_elastic.py"
+            training_script = self.code_path
             training_script_args = ["--use_amp false"]
             log_dir = None
......
@@ -16,17 +16,25 @@ import unittest
 import paddle
 import numpy as np
 import os
+import tempfile
 from paddle.fluid import core
 
 paddle.enable_static()
 
 
 class TestDistModelRun(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        # step 6: clean up the env, delete the saved model and params
+        print('cleaned up the env')
+        self.temp_dir.cleanup()
+
     def test_dist_model_run(self):
         # step 0: declare folder to save the model and params
-        folder = './dist_model_run_test/'
-        file = 'inf'
-        path_prefix = folder + file
+        path_prefix = os.path.join(self.temp_dir.name,
+                                   "dist_model_run_test/inf")
 
         # step 1: saving the inference model and params
         x = paddle.static.data(name='x', shape=[28, 28], dtype='float32')
@@ -75,12 +83,6 @@ class TestDistModelRun(unittest.TestCase):
         # step 5: compare two results
         self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst))
 
-        # step 6: clean up the env, delete the saved model and params
-        os.remove(path_prefix + '.pdiparams')
-        os.remove(path_prefix + '.pdmodel')
-        os.rmdir(folder)
-        print('cleaned up the env')
-
 
 if __name__ == '__main__':
     unittest.main()
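The deleted step 6 needed one `os.remove` per artifact plus `os.rmdir`, which only works on an empty directory; `TemporaryDirectory.cleanup()` removes the whole tree in one call. A short demonstration with hypothetical file names:

```python
import os
import tempfile

td = tempfile.TemporaryDirectory()
nested = os.path.join(td.name, "dist_model_run_test")
os.makedirs(nested)
open(os.path.join(nested, "inf.pdmodel"), "w").close()

td.cleanup()  # recursive removal; os.rmdir would fail on the non-empty dir
assert not os.path.exists(td.name)
```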
@@ -17,6 +17,7 @@ from __future__ import print_function
 import paddle
 import os
 import unittest
+import tempfile
 import paddle.fluid.incubate.fleet.base.role_maker as role_maker
@@ -28,7 +29,10 @@ class TestCloudRoleMaker2(unittest.TestCase):
     def setUp(self):
         """Set up, set envs."""
-        pass
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
 
     def test_pslib_2(self):
         """Test cases for pslib."""
@@ -37,6 +41,8 @@ class TestCloudRoleMaker2(unittest.TestCase):
         from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
         from paddle.fluid.incubate.fleet.base.role_maker import RoleMakerBase
+
+        paddle.enable_static()
         os.environ["POD_IP"] = "127.0.0.1"
         os.environ["PADDLE_PORT"] = "36001"
         os.environ["TRAINING_ROLE"] = "TRAINER"
@@ -155,17 +161,19 @@ class TestCloudRoleMaker2(unittest.TestCase):
         role23 = GeneralRoleMaker(path="./test_gloo_23")
         role23._get_size()
         role23._get_size()
-        with open("test_fleet_gloo_role_maker_1.txt", "w") as f:
+
+        path = os.path.join(self.temp_dir.name,
+                            "test_fleet_gloo_role_maker_1.txt")
+        with open(path, "w") as f:
             data = "1 1 1 1\n"
             f.write(data)
 
         dataset = paddle.distributed.InMemoryDataset()
-        dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"])
+        dataset.set_filelist([path])
         dataset._set_use_var([show, label])
         dataset.load_into_memory()
         dataset.get_memory_data_size(fleet)
         dataset.get_shuffle_data_size(fleet)
-        os.remove("./test_fleet_gloo_role_maker_1.txt")
 
         class TmpClass():
             """
......
@@ -24,6 +24,7 @@ import paddle.fluid.core as core
 import numpy as np
 import os
 import unittest
+import tempfile
 
 
 class TestDatasetWithStat(unittest.TestCase):
@@ -35,12 +36,15 @@ class TestDatasetWithStat(unittest.TestCase):
         self.drop_last = False
 
     def test_dataset_run_with_stat(self):
-        with open("test_in_memory_dataset_run_a.txt", "w") as f:
+        temp_dir = tempfile.TemporaryDirectory()
+        path_a = os.path.join(temp_dir.name, "test_in_memory_dataset_run_a.txt")
+        path_b = os.path.join(temp_dir.name, "test_in_memory_dataset_run_b.txt")
+        with open(path_a, "w") as f:
             data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
             data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
             data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
             f.write(data)
-        with open("test_in_memory_dataset_run_b.txt", "w") as f:
+        with open(path_b, "w") as f:
             data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
             data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
             data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
@@ -62,10 +66,7 @@ class TestDatasetWithStat(unittest.TestCase):
         dataset = paddle.distributed.InMemoryDataset()
         dataset._set_batch_size(32)
         dataset._set_thread(3)
-        dataset.set_filelist([
-            "test_in_memory_dataset_run_a.txt",
-            "test_in_memory_dataset_run_b.txt"
-        ])
+        dataset.set_filelist([path_a, path_b])
         dataset._set_pipe_command("cat")
         dataset._set_use_var(slots_vars)
         dataset.load_into_memory()
@@ -99,8 +100,7 @@ class TestDatasetWithStat(unittest.TestCase):
         # total 56 keys
         print(int_stat["STAT_total_feasign_num_in_mem"])
 
-        os.remove("./test_in_memory_dataset_run_a.txt")
-        os.remove("./test_in_memory_dataset_run_b.txt")
+        temp_dir.cleanup()
 
 
 if __name__ == '__main__':
......
@@ -17,6 +17,7 @@ import subprocess
 import sys, os
 import json
 import shutil
+import tempfile
 import random
@@ -57,13 +58,18 @@ def get_files(pth, prefix):
 class Collective_Test(unittest.TestCase):
     def setUp(self):
-        write_file(pyname, colpyfile)
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.path = os.path.join(self.temp_dir.name, pyname)
+        write_file(self.path, colpyfile)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
 
     def pdrun(self, args, env=None):
         cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"]
         if args:
             cmd.extend(args.split(" "))
-        cmd.extend([pyname])
+        cmd.extend([self.path])
         env = os.environ.copy()
         # virtual devices for testing
         env.update({'CUDA_VISIBLE_DEVICES': '0,1,2,3,4,5,6,7'})
@@ -71,30 +77,30 @@ class Collective_Test(unittest.TestCase):
         return proc
 
     def test_collective_1(self):
-        args = "--job_id test1"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id test1 --log_dir {}".format(log_dir.name)
         p = self.pdrun(args)
         p.wait()
         self.assertTrue(p.poll() == 0)
+        log_dir.cleanup()
 
     def test_collective_2(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-
-        args = "--job_id test2 --devices 0,1,2"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id test2 --devices 0,1,2 --log_dir {}".format(
+            log_dir.name)
         p = self.pdrun(args)
         p.wait()
         self.assertTrue(p.poll() == 0)
 
-        c = get_files('log', 'test2')
+        c = get_files(log_dir.name, 'test2')
         self.assertTrue(len(c) == 4)
+        log_dir.cleanup()
 
     def test_collective_3(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-
+        log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id test3 --devices 0,1 --master 127.0.0.1:{} --np 2".format(
-            port)
+        args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2".format(
+            log_dir.name, port)
         p1 = self.pdrun(args)
         p2 = self.pdrun(args)
         p1.wait()
@@ -102,47 +108,53 @@
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)
 
-        c = get_files('log', 'test3')
+        c = get_files(log_dir.name, 'test3')
         self.assertTrue(len(c) == 6)
+        log_dir.cleanup()
 
 
 class PS_Test(unittest.TestCase):
     def setUp(self):
-        write_file(pyname, pspyfile)
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.path = os.path.join(self.temp_dir.name, pyname)
+        write_file(self.path, pspyfile)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
 
     def pdrun(self, args, env=None):
         cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"]
         if args:
             cmd.extend(args.split(" "))
-        cmd.extend([pyname])
+        cmd.extend([self.path])
         proc = subprocess.Popen(cmd, env=env)
         return proc
 
     def test_ps_1(self):
-        args = "--run_mode ps"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--run_mode ps --log_dir {}".format(log_dir.name)
         p = self.pdrun(args)
         p.wait()
         self.assertTrue(p.poll() == 0)
+        log_dir.cleanup()
 
     def test_ps_2(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-
-        args = "--job_id ps2 --server_num=2 --trainer_num=2"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id ps2 --server_num=2 --trainer_num=2 --log_dir {}".format(
+            log_dir.name)
         p = self.pdrun(args)
         p.wait()
         self.assertTrue(p.poll() == 0)
 
-        c = get_files('log', 'ps2')
+        c = get_files(log_dir.name, 'ps2')
         self.assertTrue(len(c) == 5)
+        log_dir.cleanup()
 
     def test_ps_3(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-
+        log_dir = tempfile.TemporaryDirectory()
         port = random.randrange(6000, 8000)
-        args = "--job_id ps3 --master 127.0.0.1:{} --np 2 --server_num=1 --trainer_num=1".format(
-            port)
+        args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format(
+            log_dir.name, port)
         p1 = self.pdrun(args)
         p2 = self.pdrun(args)
         p1.wait()
@@ -150,20 +162,21 @@ class PS_Test(unittest.TestCase):
         self.assertTrue(p1.poll() == 0)
         self.assertTrue(p2.poll() == 0)
 
-        c = get_files('log', 'ps3')
+        c = get_files(log_dir.name, 'ps3')
         self.assertTrue(len(c) == 6)
+        log_dir.cleanup()
 
     def test_ps_4(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-
-        args = "--job_id ps4 --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id ps4 --log_dir {} --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903".format(
+            log_dir.name)
         p1 = self.pdrun(args)
         p1.wait()
         self.assertTrue(p1.poll() == 0)
 
-        c = get_files('log', 'ps4')
+        c = get_files(log_dir.name, 'ps4')
         self.assertTrue(len(c) == 5)
+        log_dir.cleanup()
 
 
 if __name__ == '__main__':
......
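For reference, the shape of these launcher tests as a standalone script: write the trainer file into one temporary directory, point `--log_dir` at another, and inspect the logs it produces. A hedged sketch assuming paddle is installed; `train_demo.py` and its body are stand-ins, not files from this commit:

```python
import os
import subprocess
import sys
import tempfile

script_body = "print('hello from trainer')\n"

with tempfile.TemporaryDirectory() as workdir, \
        tempfile.TemporaryDirectory() as log_dir:
    script = os.path.join(workdir, "train_demo.py")
    with open(script, "w") as f:
        f.write(script_body)
    cmd = [
        sys.executable, "-m", "paddle.distributed.launch",
        "--job_id", "demo", "--devices", "0", "--log_dir", log_dir, script
    ]
    proc = subprocess.Popen(cmd, env=os.environ.copy())
    proc.wait()
    assert proc.poll() == 0
    print(sorted(os.listdir(log_dir)))  # the job's log files
```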