Unverified  Commit 0c51f241 authored by zhaoyingli, committed by GitHub

Place all save/load paths into a temporary directory (#43451)

* use tempfile to place temporary files (pattern sketched below)

* update

* revert test_communicator

* fix test_dist_base
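The change is mechanical across every touched test: paths hard-coded under `/tmp` or the working directory move into a `tempfile.TemporaryDirectory()` created in `setUp` and removed in `tearDown`, so parallel CI runs stop racing on shared names like `/tmp/tr0_err.log`. A minimal sketch of the pattern (class and file names here are illustrative, not from the patch):

```python
import os
import tempfile
import unittest


class ExampleTest(unittest.TestCase):

    def setUp(self):
        # Private scratch directory for everything this test writes.
        self.temp_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        # Deletes the directory and its contents, even if the test failed.
        self.temp_dir.cleanup()

    def test_writes_a_log(self):
        log_path = os.path.join(self.temp_dir.name, "example.log")
        with open(log_path, "w") as f:
            f.write("hello\n")
        self.assertTrue(os.path.exists(log_path))
```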
Parent fcd32950
......@@ -23,6 +23,7 @@ import subprocess
import traceback
import functools
import pickle
+import tempfile
from contextlib import closing
import paddle
import paddle.fluid as fluid
......@@ -99,6 +100,11 @@ class TestDistBase(unittest.TestCase):
self._find_free_port(), self._find_free_port())
self._python_interp = sys.executable
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def _find_free_port(self):
def __free_port():
......@@ -158,8 +164,12 @@ class TestDistBase(unittest.TestCase):
tr_cmd = "%s %s"
tr0_cmd = tr_cmd % (self._python_interp, model_file)
tr1_cmd = tr_cmd % (self._python_interp, model_file)
tr0_pipe = open("/tmp/tr0_err_%d.log" % os.getpid(), "w")
tr1_pipe = open("/tmp/tr1_err_%d.log" % os.getpid(), "w")
path0 = os.path.join(self.temp_dir.name,
"/tmp/tr0_err_%d.log" % os.getpid())
path1 = os.path.join(self.temp_dir.name,
"/tmp/tr1_err_%d.log" % os.getpid())
tr0_pipe = open(path0, "w")
tr1_pipe = open(path1, "w")
#print(tr0_cmd)
tr0_proc = subprocess.Popen(tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
......@@ -178,9 +188,9 @@ class TestDistBase(unittest.TestCase):
# close trainer file
tr0_pipe.close()
tr1_pipe.close()
with open("/tmp/tr0_err_%d.log" % os.getpid(), "r") as f:
with open(path0, "r") as f:
sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
with open("/tmp/tr1_err_%d.log" % os.getpid(), "r") as f:
with open(path1, "r") as f:
sys.stderr.write('trainer 1 stderr file: %s\n' % f.read())
return pickle.loads(tr0_out), pickle.loads(
tr1_out), tr0_proc.pid, tr1_proc.pid
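A detail worth calling out in the hunk above: `os.path.join` discards all preceding components when a later component is absolute, so the name joined onto the temporary directory must stay relative — the old `/tmp/` prefix has to be dropped, not carried along. A quick demonstration:

```python
import os

base = "/scratch/test-run"  # stands in for self.temp_dir.name
os.path.join(base, "/tmp/tr0_err.log")  # -> '/tmp/tr0_err.log' (base discarded)
os.path.join(base, "tr0_err.log")       # -> '/scratch/test-run/tr0_err.log'
```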
......
......@@ -23,6 +23,7 @@ import subprocess
import traceback
import functools
import pickle
+import tempfile
from contextlib import closing
import paddle.fluid as fluid
import paddle.fluid.unique_name as nameGen
......@@ -144,6 +145,11 @@ class TestDistBase(unittest.TestCase):
self._find_free_port(), self._find_free_port())
self._python_interp = sys.executable
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def _find_free_port(self):
def __free_port():
......@@ -183,8 +189,10 @@ class TestDistBase(unittest.TestCase):
tr_cmd = "%s %s"
tr0_cmd = tr_cmd % (self._python_interp, model_file)
tr1_cmd = tr_cmd % (self._python_interp, model_file)
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
path0 = os.path.join(self.temp_dir.name, "/tmp/tr0_err.log")
path1 = os.path.join(self.temp_dir.name, "/tmp/tr1_err.log")
tr0_pipe = open(path0, "wb")
tr1_pipe = open(path1, "wb")
#print(tr0_cmd)
tr0_proc = subprocess.Popen(tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
......
......@@ -19,6 +19,7 @@ import unittest
import time
import threading
import numpy
+import tempfile
import paddle
......@@ -32,7 +33,9 @@ import paddle.distributed.fleet as fleet
class TestCommunicator(unittest.TestCase):
def test_communicator_ps_gpu(self):
with open("test_communicator_ps_gpu.txt", "w") as f:
temp_dir = tempfile.TemporaryDirectory()
path = os.path.join(temp_dir.name, "test_communicator_ps_gpu.txt")
with open(path, "w") as f:
data = "1 0.6 1 0.7\n"
f.write(data)
......@@ -94,7 +97,7 @@ class TestCommunicator(unittest.TestCase):
self.assertTrue(False)
time.sleep(10)
fleet.stop_worker()
os.remove("./test_communicator_ps_gpu.txt")
temp_dir.cleanup()
if __name__ == '__main__':
......
......@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import print_function
-import time
+import tempfile
import ast
import unittest
......@@ -872,6 +872,11 @@ class TestDistBase(unittest.TestCase):
self._after_setup_config()
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def _find_free_port(self):
def __free_port():
......@@ -915,8 +920,10 @@ class TestDistBase(unittest.TestCase):
print(ps0_cmd)
print(ps1_cmd)
-        ps0_pipe = open(log_name + "_ps0_err.log", "wb")
-        ps1_pipe = open(log_name + "_ps1_err.log", "wb")
+        path0 = os.path.join(self.temp_dir.name, log_name + "_ps0_err.log")
+        path1 = os.path.join(self.temp_dir.name, log_name + "_ps1_err.log")
+        ps0_pipe = open(path0, "wb")
+        ps1_pipe = open(path1, "wb")
print_to_err(type(self).__name__, "going to start pserver process 0")
ps0_proc = subprocess.Popen(ps0_cmd.strip().split(" "),
......@@ -994,7 +1001,8 @@ class TestDistBase(unittest.TestCase):
print("local_cmd: {}, env: {}".format(cmd, env_local))
if check_error_log:
-            err_log = open(log_name + "_local.log", "wb")
+            path = os.path.join(self.temp_dir.name, log_name + "_local.log")
+            err_log = open(path, "wb")
local_proc = subprocess.Popen(cmd.split(" "),
stdout=subprocess.PIPE,
stderr=err_log,
......@@ -1080,8 +1088,11 @@ class TestDistBase(unittest.TestCase):
print("tr0_cmd: {}, env: {}".format(tr0_cmd, env0))
print("tr1_cmd: {}, env: {}".format(tr1_cmd, env1))
-        tr0_pipe = open(log_name + "_tr0_err.log", "wb")
-        tr1_pipe = open(log_name + "_tr1_err.log", "wb")
+        path0 = os.path.join(self.temp_dir.name, log_name + "_tr0_err.log")
+        path1 = os.path.join(self.temp_dir.name, log_name + "_tr1_err.log")
+        tr0_pipe = open(path0, "wb")
+        tr1_pipe = open(path1, "wb")
print_to_err(type(self).__name__, "going to start trainer process 0")
tr0_proc = subprocess.Popen(tr0_cmd.strip().split(" "),
......@@ -1297,7 +1308,9 @@ class TestDistBase(unittest.TestCase):
print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
self._use_hallreduce, tr_cmd, tr_env))
-        tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb")
+        path = os.path.join(self.temp_dir.name,
+                            log_name + "_tr{}_err.log".format(i))
+        tr_pipe = open(path, "wb")
print_to_err(
type(self).__name__,
......@@ -1358,7 +1371,9 @@ class TestDistBase(unittest.TestCase):
print("use_hallreduce:{} tr_cmd:{}, env: {}".format(
self._use_hallreduce, tr_cmd, tr_env))
-        tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb")
+        path = os.path.join(self.temp_dir.name,
+                            log_name + "_tr{}_err.log".format(i))
+        tr_pipe = open(path, "wb")
print_to_err(
type(self).__name__,
......@@ -1403,7 +1418,8 @@ class TestDistBase(unittest.TestCase):
tr_env['FLAGS_cudnn_deterministic'] = '0'
print("tr_cmd:{}, env: {}".format(tr_cmd, tr_env))
tr_pipe = open("/tmp/" + "tr{}_err.log".format(i), "wb")
path = os.path.join(self.temp_dir.name + "tr{}_err.log".format(i))
tr_pipe = open(path, "wb")
print_to_err(
type(self).__name__,
......
......@@ -57,13 +57,15 @@ class TestDistMnistNCCL2DGC(TestDistBase):
def tearDown(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
-            result = count_of_sparse_all_reduce_calls(
-                'test_dist_mnist_dgc_nccl_tr0_err.log')
+            log_file = os.path.join(self.temp_dir.name,
+                                    'test_dist_mnist_dgc_nccl_tr0_err.log')
+            result = count_of_sparse_all_reduce_calls(log_file)
            # only 1 layer uses dgc now; run_step=5, rampup_begin_step=2, so 1 * (5 - 2) = 3
            # temporarily disable this check: in the python3 CI the log is right, but the
            # result is wrong, possibly because in multi-process mode the log is not written in time.
# self.assertEqual(result, 3)
+        self.temp_dir.cleanup()
class TestDistMnistNCCL2DGCMultiCards(TestDistBase):
......@@ -86,10 +88,13 @@ class TestDistMnistNCCL2DGCMultiCards(TestDistBase):
def tearDown(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
-            result = count_of_sparse_all_reduce_calls(
+            log_file = os.path.join(
+                self.temp_dir.name,
                'test_dist_mnist_dgc_nccl_dgc_2cards_local.log')
+            result = count_of_sparse_all_reduce_calls(log_file)
# same as above, but use two cards
self.assertEqual(result, 6)
+        self.temp_dir.cleanup()
if __name__ == "__main__":
......
......@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import tempfile
import unittest
from paddle.dataset.common import download, DATA_HOME
from paddle.distributed.fleet.dataset import TreeIndex
......@@ -113,6 +115,12 @@ class TestTreeIndex(unittest.TestCase):
class TestIndexSampler(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def test_layerwise_sampler(self):
path = download(
"https://paddlerec.bj.bcebos.com/tree-based/data/mini_tree.pb",
......@@ -120,7 +128,8 @@ class TestIndexSampler(unittest.TestCase):
tdm_layer_counts = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
#tree = TreeIndex("demo", path)
file_name = "test_in_memory_dataset_tdm_sample_run.txt"
file_name = os.path.join(self.temp_dir.name,
"test_in_memory_dataset_tdm_sample_run.txt")
with open(file_name, "w") as f:
#data = "29 d 29 d 29 29 29 29 29 29 29 29 29 29 29 29\n"
data = "1 1 1 15 15 15\n"
......
......@@ -18,6 +18,7 @@ import sys
import shutil
import unittest
import paddle
+import tempfile
def get_test_file():
......@@ -38,12 +39,13 @@ def run_test(clip_after_allreduce=True,
max_global_norm=-1.0,
gradient_merge_steps=1,
use_master_acc_grad=True):
+    temp_dir = tempfile.TemporaryDirectory()
if not paddle.is_compiled_with_cuda():
return
if os.name == 'nt':
return
args = locals()
-    log_dir = 'log_{}'.format(os.getpid())
+    log_dir = os.path.join(temp_dir.name, 'log_{}'.format(os.getpid()))
cmd = [
sys.executable,
'-u',
......@@ -62,15 +64,15 @@ def run_test(clip_after_allreduce=True,
os.environ['USE_MASTER_ACC_GRAD'] = str(1 if use_master_acc_grad else 0)
touch_file_env = 'SUCCESS_TOUCH_FILE'
-    touch_file_name = 'distributed_fused_lamb_touch_file_{}'.format(os.getpid())
+    touch_file_name = os.path.join(
+        temp_dir.name,
+        'distributed_fused_lamb_touch_file_{}'.format(os.getpid()))
    os.environ[touch_file_env] = touch_file_name
-    remove_file_if_exists(touch_file_name)
try:
assert os.system(cmd) == 0 and os.path.exists(
touch_file_name), 'Test failed when {}'.format(args)
finally:
-        remove_file_if_exists(touch_file_name)
-        remove_file_if_exists(log_dir)
+        temp_dir.cleanup()
class TestDistributedFusedLambWithClip(unittest.TestCase):
......
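The touch-file handshake in `run_test` above works like this: the parent exports a path through the `SUCCESS_TOUCH_FILE` environment variable, the child creates that file only on success, and the parent asserts the file exists once the command returns. A self-contained sketch of the same handshake, with an illustrative inline child instead of the real test script:

```python
import os
import subprocess
import sys
import tempfile

with tempfile.TemporaryDirectory() as scratch:
    touch_file = os.path.join(scratch, "success_touch_file")
    env = dict(os.environ, SUCCESS_TOUCH_FILE=touch_file)
    # Child: signal success by creating the file named in the env var.
    child = "import os; open(os.environ['SUCCESS_TOUCH_FILE'], 'w').close()"
    ret = subprocess.call([sys.executable, "-c", child], env=env)
    assert ret == 0 and os.path.exists(touch_file), "child did not signal success"
```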
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import tempfile
import unittest
import paddle
......@@ -74,15 +75,19 @@ class TestFleetBase(unittest.TestCase):
compiled_prog = fluid.compiler.CompiledProgram(
fluid.default_main_program())
+        temp_dir = tempfile.TemporaryDirectory()
fleet.init_worker()
fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
fleet.fleet.save(dirname="/tmp",
fleet.fleet.save(dirname=temp_dir.name,
feed=['x', 'y'],
fetch=[avg_cost])
fleet.fleet.save(dirname=temp_dir.name,
feed=[input_x, input_y],
fetch=[avg_cost])
fleet.fleet.save(dirname="/tmp")
fleet.fleet.save(dirname=temp_dir.name)
fleet.load_model(path="/tmp", mode=0)
fleet.load_model(path="/tmp", mode=1)
fleet.load_model(path=temp_dir.name, mode=0)
fleet.load_model(path=temp_dir.name, mode=1)
temp_dir.cleanup()
if __name__ == "__main__":
......
......@@ -34,12 +34,15 @@ print("test")
class TestCollectiveLauncher(unittest.TestCase):
def setUp(self):
-        file_dir = os.path.dirname(os.path.abspath(__file__))
-        self.code_path = os.path.join(file_dir, "fake_python_for_elastic.py")
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.code_path = os.path.join(self.temp_dir.name,
+                                      "fake_python_for_elastic.py")
        with open(self.code_path, "w") as f:
            f.write(fake_python_code)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def test_launch(self):
class Argument:
......@@ -58,7 +61,7 @@ class TestCollectiveLauncher(unittest.TestCase):
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script = self.code_path
training_script_args = ["--use_amp false"]
log_dir = None
......@@ -97,7 +100,7 @@ class TestCollectiveLauncher(unittest.TestCase):
run_mode = "cpuonly"
servers = None
rank_mapping_path = None
training_script = "fake_python_for_elastic.py"
training_script = self.code_path
training_script_args = ["--use_amp false"]
log_dir = None
......
......@@ -16,6 +16,7 @@ import unittest
import paddle
import numpy as np
import os
+import tempfile
from paddle.fluid import core
paddle.enable_static()
......@@ -23,11 +24,18 @@ paddle.enable_static()
class TestDistModelRun(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        # step 6: clean up the env, delete the saved model and params
+        print('cleaned up the env')
+        self.temp_dir.cleanup()
def test_dist_model_run(self):
# step 0: declare folder to save the model and params
-        folder = './dist_model_run_test/'
-        file = 'inf'
-        path_prefix = folder + file
+        path_prefix = os.path.join(self.temp_dir.name,
+                                   "dist_model_run_test/inf")
# step 1: saving the inference model and params
x = paddle.static.data(name='x', shape=[28, 28], dtype='float32')
......@@ -80,12 +88,6 @@ class TestDistModelRun(unittest.TestCase):
# step 5: compare two results
self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst))
-        # step 6: clean up the env, delete the saved model and params
-        os.remove(path_prefix + '.pdiparams')
-        os.remove(path_prefix + '.pdmodel')
-        os.rmdir(folder)
-        print('cleaned up the env')
if __name__ == '__main__':
unittest.main()
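`path_prefix` here is a file-name prefix rather than a directory: saving produces `<prefix>.pdmodel` and `<prefix>.pdiparams`, which is exactly what the removed step-6 cleanup deleted by hand before `os.rmdir`. A sketch of the save side under the temporary directory, assuming the Paddle 2.x static-graph API the test already uses:

```python
import os
import tempfile
import paddle

paddle.enable_static()
temp_dir = tempfile.TemporaryDirectory()
path_prefix = os.path.join(temp_dir.name, "dist_model_run_test/inf")

x = paddle.static.data(name='x', shape=[28, 28], dtype='float32')
out = paddle.static.nn.fc(x, 10)
exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(paddle.static.default_startup_program())
paddle.static.save_inference_model(path_prefix, [x], [out], exe)

assert os.path.exists(path_prefix + '.pdmodel')
assert os.path.exists(path_prefix + '.pdiparams')
temp_dir.cleanup()  # removes model and params in one call
```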
......@@ -17,6 +17,7 @@ from __future__ import print_function
import paddle
import os
import unittest
+import tempfile
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
......@@ -28,7 +29,10 @@ class TestCloudRoleMaker2(unittest.TestCase):
def setUp(self):
"""Set up, set envs."""
-        pass
+        self.temp_dir = tempfile.TemporaryDirectory()
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def test_pslib_2(self):
"""Test cases for pslib."""
......@@ -37,6 +41,8 @@ class TestCloudRoleMaker2(unittest.TestCase):
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import RoleMakerBase
+        paddle.enable_static()
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
......@@ -155,17 +161,19 @@ class TestCloudRoleMaker2(unittest.TestCase):
role23 = GeneralRoleMaker(path="./test_gloo_23")
role23._get_size()
role23._get_size()
with open("test_fleet_gloo_role_maker_1.txt", "w") as f:
path = os.path.join(self.temp_dir.name,
"test_fleet_gloo_role_maker_1.txt")
with open(path, "w") as f:
data = "1 1 1 1\n"
f.write(data)
dataset = paddle.distributed.InMemoryDataset()
dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"])
dataset.set_filelist([path])
dataset._set_use_var([show, label])
dataset.load_into_memory()
dataset.get_memory_data_size(fleet)
dataset.get_shuffle_data_size(fleet)
os.remove("./test_fleet_gloo_role_maker_1.txt")
class TmpClass():
"""
......
......@@ -25,6 +25,7 @@ import paddle.fluid.core as core
import numpy as np
import os
import unittest
+import tempfile
class TestDatasetWithStat(unittest.TestCase):
......@@ -36,12 +37,15 @@ class TestDatasetWithStat(unittest.TestCase):
self.drop_last = False
def test_dataset_run_with_stat(self):
with open("test_in_memory_dataset_run_a.txt", "w") as f:
temp_dir = tempfile.TemporaryDirectory()
path_a = os.path.join(temp_dir.name, "test_in_memory_dataset_run_a.txt")
path_b = os.path.join(temp_dir.name, "test_in_memory_dataset_run_b.txt")
with open(path_a, "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
f.write(data)
with open("test_in_memory_dataset_run_b.txt", "w") as f:
with open(path_b, "w") as f:
data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
......@@ -65,10 +69,7 @@ class TestDatasetWithStat(unittest.TestCase):
dataset = paddle.distributed.InMemoryDataset()
dataset._set_batch_size(32)
dataset._set_thread(3)
-        dataset.set_filelist([
-            "test_in_memory_dataset_run_a.txt",
-            "test_in_memory_dataset_run_b.txt"
-        ])
+        dataset.set_filelist([path_a, path_b])
dataset._set_pipe_command("cat")
dataset._set_use_var(slots_vars)
dataset.load_into_memory()
......@@ -100,8 +101,7 @@ class TestDatasetWithStat(unittest.TestCase):
# total 56 keys
print(int_stat["STAT_total_feasign_num_in_mem"])
os.remove("./test_in_memory_dataset_run_a.txt")
os.remove("./test_in_memory_dataset_run_b.txt")
temp_dir.cleanup()
if __name__ == '__main__':
......
......@@ -17,6 +17,7 @@ import subprocess
import sys, os
import json
import shutil
+import tempfile
import random
......@@ -59,13 +60,18 @@ def get_files(pth, prefix):
class Collective_Test(unittest.TestCase):
def setUp(self):
-        write_file(pyname, colpyfile)
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.path = os.path.join(self.temp_dir.name, pyname)
+        write_file(self.path, colpyfile)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def pdrun(self, args, env=None):
cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"]
if args:
cmd.extend(args.split(" "))
-        cmd.extend([pyname])
+        cmd.extend([self.path])
        env = os.environ.copy()
        # virtual devices for testing
env.update({'CUDA_VISIBLE_DEVICES': '0,1,2,3,4,5,6,7'})
......@@ -73,30 +79,30 @@ class Collective_Test(unittest.TestCase):
return proc
def test_collective_1(self):
args = "--job_id test1"
log_dir = tempfile.TemporaryDirectory()
args = "--job_id test1 --log_dir {}".format(log_dir.name)
p = self.pdrun(args)
p.wait()
self.assertTrue(p.poll() == 0)
log_dir.cleanup()
def test_collective_2(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-        args = "--job_id test2 --devices 0,1,2"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id test2 --devices 0,1,2 --log_dir {}".format(
+            log_dir.name)
p = self.pdrun(args)
p.wait()
self.assertTrue(p.poll() == 0)
-        c = get_files('log', 'test2')
+        c = get_files(log_dir.name, 'test2')
        self.assertTrue(len(c) == 4)
+        log_dir.cleanup()
def test_collective_3(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
+        log_dir = tempfile.TemporaryDirectory()
port = random.randrange(6000, 8000)
args = "--job_id test3 --devices 0,1 --master 127.0.0.1:{} --nnodes 2".format(
port)
args = "--job_id test3 --devices 0,1 --log_dir {} --master 127.0.0.1:{} --nnodes 2".format(
log_dir.name, port)
p1 = self.pdrun(args)
p2 = self.pdrun(args)
p1.wait()
......@@ -104,48 +110,54 @@ class Collective_Test(unittest.TestCase):
self.assertTrue(p1.poll() == 0)
self.assertTrue(p2.poll() == 0)
-        c = get_files('log', 'test3')
+        c = get_files(log_dir.name, 'test3')
        self.assertTrue(len(c) == 6)
+        log_dir.cleanup()
class PS_Test(unittest.TestCase):
def setUp(self):
-        write_file(pyname, pspyfile)
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.path = os.path.join(self.temp_dir.name, pyname)
+        write_file(self.path, pspyfile)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
def pdrun(self, args, env=None):
cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"]
if args:
cmd.extend(args.split(" "))
-        cmd.extend([pyname])
+        cmd.extend([self.path])
        proc = subprocess.Popen(cmd, env=env)
return proc
def test_ps_1(self):
args = "--run_mode ps"
log_dir = tempfile.TemporaryDirectory()
args = "--run_mode ps --log_dir {}".format(log_dir.name)
p = self.pdrun(args)
p.wait()
self.assertTrue(p.poll() == 0)
log_dir.cleanup()
def test_ps_2(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-        args = "--job_id ps2 --server_num=2 --trainer_num=2"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id ps2 --server_num=2 --trainer_num=2 --log_dir {}".format(
+            log_dir.name)
p = self.pdrun(args)
p.wait()
self.assertTrue(p.poll() == 0)
-        c = get_files('log', 'ps2')
+        c = get_files(log_dir.name, 'ps2')
        self.assertTrue(len(c) == 5)
+        log_dir.cleanup()
def test_ps_3(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
+        log_dir = tempfile.TemporaryDirectory()
port = random.randrange(6000, 8000)
args = "--job_id ps3 --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format(
port)
args = "--job_id ps3 --log_dir {} --master 127.0.0.1:{} --nnodes 2 --server_num=1 --trainer_num=1".format(
log_dir.name, port)
p1 = self.pdrun(args)
p2 = self.pdrun(args)
p1.wait()
......@@ -153,20 +165,21 @@ class PS_Test(unittest.TestCase):
self.assertTrue(p1.poll() == 0)
self.assertTrue(p2.poll() == 0)
-        c = get_files('log', 'ps3')
+        c = get_files(log_dir.name, 'ps3')
        self.assertTrue(len(c) == 6)
+        log_dir.cleanup()
def test_ps_4(self):
-        if os.path.exists('./log'):
-            shutil.rmtree('./log')
-        args = "--job_id ps4 --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903"
+        log_dir = tempfile.TemporaryDirectory()
+        args = "--job_id ps4 --log_dir {} --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903".format(
+            log_dir.name)
p1 = self.pdrun(args)
p1.wait()
self.assertTrue(p1.poll() == 0)
-        c = get_files('log', 'ps4')
+        c = get_files(log_dir.name, 'ps4')
        self.assertTrue(len(c) == 5)
+        log_dir.cleanup()
if __name__ == '__main__':
......
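All of the launch tests above reduce to the same recipe: point `--log_dir` at a fresh temporary directory, run `python -m paddle.distributed.launch`, then count the per-job log files in that directory instead of in a shared `./log`. Roughly (`train.py` is a placeholder for the generated test script):

```python
import subprocess
import sys
import tempfile

log_dir = tempfile.TemporaryDirectory()
cmd = [sys.executable, "-m", "paddle.distributed.launch",
       "--job_id", "demo", "--log_dir", log_dir.name, "train.py"]
subprocess.run(cmd)  # writes the demo job's logs under log_dir.name
log_dir.cleanup()    # nothing left behind in the working directory
```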